use std::path::PathBuf; use std::sync::Arc; use anyhow::{Result, bail}; use clap::{Parser, Subcommand}; use hive_sh4re::{HostRequest, HostResponse}; mod actions; mod agent_server; mod approvals; mod auto_update; mod broker; mod client; mod container_view; mod coordinator; mod crash_watch; mod dashboard; mod dashboard_events; mod events_vacuum; mod forge; mod lifecycle; mod limits; mod manager_server; mod meta; mod migrate; mod open_threads; mod operator_questions; mod questions; mod reminder_scheduler; mod server; use coordinator::Coordinator; #[derive(Parser)] #[command(name = "hive-c0re", about = "hyperhive coordinator daemon and CLI")] struct Cli { /// Path to the host admin socket. #[arg(long, global = true, default_value = "/run/hyperhive/host.sock")] socket: PathBuf, #[command(subcommand)] cmd: Cmd, } #[derive(Subcommand)] enum Cmd { /// Run the coordinator daemon. Serve { /// URL of the hyperhive flake. Inlined into each per-agent /// `flake.nix` as the `hyperhive` input. #[arg(long, default_value = "/etc/hyperhive")] hyperhive_flake: String, /// Path to the sqlite message store. #[arg(long, default_value = "/var/lib/hyperhive/broker.sqlite")] db: PathBuf, /// Dashboard HTTP port. #[arg(long, default_value_t = 7000)] dashboard_port: u16, /// Operator pronouns (free text). Threaded into each /// container's harness via `HIVE_OPERATOR_PRONOUNS` so the /// system prompt can mention them. Default: `she/her`. #[arg(long, default_value = "she/her")] operator_pronouns: String, }, /// Spawn a new agent container directly (`hive-agent-`). Bypasses /// the approval queue — use only as an operator on the host. For /// approval-gated spawns, use `request-spawn` instead. Spawn { name: String }, /// Queue a spawn request as an approval. The container is created on /// `approve ` (CLI) or the dashboard's APPR0VE button. RequestSpawn { name: String }, /// Stop a managed container (graceful). Kill { name: String }, /// Tear down a sub-agent container. Container is removed; persistent /// state (config repos + Claude credentials) is kept by default. Pass /// `--purge` to also wipe the agent's state dirs (config + creds + /// notes). No undo. Destroy { name: String, #[arg(long)] purge: bool, }, /// Apply pending config to a managed container. Rebuild { name: String }, /// List managed containers. List, /// List pending approval requests submitted by the manager. Pending, /// Approve a pending request by id; the action runs immediately. Approve { id: i64 }, /// Deny a pending request by id. Deny { id: i64 }, } #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), ) .init(); let cli = Cli::parse(); match cli.cmd { Cmd::Serve { hyperhive_flake, db, dashboard_port, operator_pronouns, } => { let coord = Arc::new(Coordinator::open( &db, hyperhive_flake, dashboard_port, operator_pronouns, )?); manager_server::start(coord.clone())?; // Idempotent pre-flight: rewrite pre-meta-layout applied // repos, ensure proposed repos carry the `applied` // remote, bootstrap the meta repo, repoint containers at // `meta#` (one-shot, guarded by a marker file). // Runs before manager auto-spawn so the new manager is // built against meta from the first attempt. if let Err(e) = migrate::run(&coord).await { tracing::warn!(error = ?e, "startup migration failed"); } // Auto-create the manager container if it isn't there yet. Block // on this — without hm1nd the system has no manager harness. // Failures are logged but allowed: a broken auto-spawn shouldn't // make the dashboard unreachable for debugging. if let Err(e) = auto_update::ensure_manager(&coord).await { tracing::warn!(error = ?e, "auto-spawn manager failed"); } // Auto-update in the background — don't block service start. // Sub-agent rebuilds can take tens of seconds; we want the admin // socket up immediately. let update_coord = coord.clone(); tokio::spawn(async move { if let Err(e) = auto_update::run(update_coord).await { tracing::warn!(error = ?e, "auto-update task failed"); } }); // Forge user sweep: ensure every existing container has a // forgejo user + access token. No-op when the hive-forge // container isn't running. Backgrounded — touches the // forge state dir via `nixos-container run` which is slow. tokio::spawn(async move { forge::ensure_all().await; }); // Periodic broker vacuum: drop delivered messages older than // 30 days. Undelivered messages are always kept (still in // flight). Runs hourly; first sweep happens immediately. let vacuum_coord = coord.clone(); tokio::spawn(async move { let interval_secs = 3600u64; let keep_secs: i64 = 30 * 24 * 3600; loop { match vacuum_coord.broker.vacuum_delivered(keep_secs) { Ok(0) => {} Ok(n) => tracing::info!(removed = n, "broker vacuum"), Err(e) => tracing::warn!(error = ?e, "broker vacuum failed"), } tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await; } }); // Per-agent events.sqlite vacuum: host-side so the harness // doesn't need any retention wiring of its own. events_vacuum::spawn(coord.clone()); // Container crash watcher: emits HelperEvent::ContainerCrash // when a previously-running container goes away without an // operator-initiated transient state. crash_watch::spawn(coord.clone()); // Reminder scheduler: drains due reminders + handles // file_path payload persistence. See reminder_scheduler.rs. reminder_scheduler::spawn(coord.clone()); // Forward every broker event onto the unified dashboard // channel with a freshly-stamped seq, so the dashboard SSE // sees broker messages + future mutation events on one // stream with one monotonic seq. The broker's intra-process // channel (used by `recv_blocking`) stays untouched. spawn_broker_to_dashboard_forwarder(coord.clone()); let dash_coord = coord.clone(); tokio::spawn(async move { if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await { tracing::error!(error = ?e, "dashboard failed"); } }); server::serve(&cli.socket, coord).await } Cmd::Spawn { name } => { render(client::request(&cli.socket, HostRequest::Spawn { name }).await?) } Cmd::RequestSpawn { name } => { render(client::request(&cli.socket, HostRequest::RequestSpawn { name }).await?) } Cmd::Kill { name } => { render(client::request(&cli.socket, HostRequest::Kill { name }).await?) } Cmd::Destroy { name, purge } => { render(client::request(&cli.socket, HostRequest::Destroy { name, purge }).await?) } Cmd::Rebuild { name } => { render(client::request(&cli.socket, HostRequest::Rebuild { name }).await?) } Cmd::List => render(client::request(&cli.socket, HostRequest::List).await?), Cmd::Pending => render(client::request(&cli.socket, HostRequest::Pending).await?), Cmd::Approve { id } => { render(client::request(&cli.socket, HostRequest::Approve { id }).await?) } Cmd::Deny { id } => render(client::request(&cli.socket, HostRequest::Deny { id }).await?), } } /// Re-emit every broker `MessageEvent` onto the dashboard channel as /// a `DashboardEvent::Sent` / `Delivered` with a freshly-stamped seq. /// Background task; runs for the life of the process. On a lagged /// broker subscription we just keep going — the dashboard channel is /// best-effort presentation plumbing, the broker keeps its own sqlite /// log for replay. fn spawn_broker_to_dashboard_forwarder(coord: Arc) { use broker::MessageEvent; use dashboard_events::DashboardEvent; let mut rx = coord.broker.subscribe(); tokio::spawn(async move { loop { match rx.recv().await { Ok(MessageEvent::Sent { from, to, body, at }) => { let file_refs = dashboard::scan_validated_paths(&body); coord.emit_dashboard_event(DashboardEvent::Sent { seq: coord.next_seq(), from, to, body, at, file_refs, }); } Ok(MessageEvent::Delivered { from, to, body, at }) => { let file_refs = dashboard::scan_validated_paths(&body); coord.emit_dashboard_event(DashboardEvent::Delivered { seq: coord.next_seq(), from, to, body, at, file_refs, }); } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { tracing::warn!(skipped = n, "broker-to-dashboard forwarder lagged"); } Err(tokio::sync::broadcast::error::RecvError::Closed) => break, } } }); } fn render(resp: HostResponse) -> Result<()> { println!("{}", serde_json::to_string_pretty(&resp)?); if !resp.ok { bail!(resp.error.unwrap_or_else(|| "request failed".to_owned())); } Ok(()) }