//! Runtime state + config shared between the host admin socket, the manager //! socket, and the per-agent sockets: the broker, configured `agent_flake`, //! and the map of registered agent sockets. use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use anyhow::{Context, Result}; use tokio::sync::broadcast; use crate::agent_server::{self, AgentSocket}; use crate::approvals::Approvals; use crate::broker::Broker; use crate::container_view::{self, ContainerView}; use crate::dashboard_events::DashboardEvent; use crate::operator_questions::OperatorQuestions; /// Capacity of the dashboard event channel. Slow browser subscribers /// (idle tab, throttled connection) drop frames past this — that's /// fine, the seq dedupe makes a reconnect resync safe. const DASHBOARD_CHANNEL: usize = 256; const AGENT_RUNTIME_ROOT: &str = "/run/hyperhive/agents"; const MANAGER_RUNTIME_ROOT: &str = "/run/hyperhive/manager"; /// Manager-editable per-agent config repos. Bind-mounted RW into the manager /// container as `/agents//`. Hive-c0re only writes to these on first /// spawn (initial commit); after that it's manager-only. const AGENT_STATE_ROOT: &str = "/var/lib/hyperhive/agents"; /// Hive-c0re-only authoritative per-agent config repos. Containers build from /// these. Manager has no filesystem access; the only way to update is via /// `request_apply_commit` + user approval. const APPLIED_STATE_ROOT: &str = "/var/lib/hyperhive/applied"; pub struct Coordinator { pub broker: Arc, pub approvals: Arc, pub questions: Arc, /// URL of the hyperhive flake (no fragment). Inlined into per-agent /// `flake.nix` files as `inputs.hyperhive.url`. pub hyperhive_flake: String, /// TCP port the host's hive-c0re dashboard listens on. Inlined into /// each per-agent flake so the agent's web UI can build the right /// rebuild-button URL pointing back at the dashboard. pub dashboard_port: u16, /// Operator pronouns (free text) — `she/her` by default, set via /// the NixOS module option `services.hive-c0re.operatorPronouns`. /// Reaches each container as the `HIVE_OPERATOR_PRONOUNS` env var /// (injected into systemd.services..environment by the /// meta flake); the harness substitutes it into the agent / /// manager system prompt at boot. pub operator_pronouns: String, agents: Mutex>, /// Agents whose lifecycle action (currently just spawn) is in flight. /// Read by the dashboard to render a spinner; cleared when the action /// resolves (success or failure). transient: Mutex>, /// Unified wire-facing event channel feeding the dashboard SSE /// stream. Carries broker messages (mirrored from `broker.subscribe` /// by the forwarder task in `main.rs`) and dashboard-only mutation /// events (approval added/resolved, question added/answered, etc.). /// Snapshot endpoints capture `event_seq` before reading state so /// the client can dedupe its buffered live traffic against the /// snapshot. dashboard_events: broadcast::Sender, event_seq: AtomicU64, /// Last container snapshot seen by `rescan_containers_and_emit`, /// keyed by `ContainerView.name`. The rescan diffs a fresh /// `container_view::build_all` against this map and emits one /// `ContainerStateChanged` per added/changed row and one /// `ContainerRemoved` per disappeared row. Async — guarded by a /// tokio mutex so the rescan can `await` `lifecycle::list` / /// `is_running` without blocking other coordinator paths. last_containers: tokio::sync::Mutex>, } /// Per-agent in-progress state that the dashboard surfaces between approve /// click and container ready. #[derive(Debug, Clone)] pub struct TransientState { pub kind: TransientKind, pub since: std::time::Instant, } /// RAII handle returned by `Coordinator::transient_guard`. Cleared on /// drop — including drop-via-cancellation, the path that bare /// `set_transient` / `clear_transient` pairs leaked through. Holds an /// `Arc` so the guard is freely returnable / movable. pub struct TransientGuard { coord: Arc, name: String, } impl Drop for TransientGuard { fn drop(&mut self) { self.coord.clear_transient(&self.name); } } #[derive(Debug, Clone, Copy)] pub enum TransientKind { /// `lifecycle::spawn` is running (nixos-container create + update + start). Spawning, /// `lifecycle::start` is running. Starting, /// `lifecycle::kill` is running. Stopping, /// `lifecycle::restart` is running. Restarting, /// `lifecycle::rebuild` is running (nixos-container update). Rebuilding, /// `actions::destroy` is running. Destroying, } impl TransientKind { /// Wire/UI label. Matches the strings the dashboard already /// renders in the transient spinner. pub fn as_str(self) -> &'static str { match self { TransientKind::Spawning => "spawning", TransientKind::Starting => "starting", TransientKind::Stopping => "stopping", TransientKind::Restarting => "restarting", TransientKind::Rebuilding => "rebuilding", TransientKind::Destroying => "destroying", } } } impl Coordinator { pub fn open( db_path: &Path, hyperhive_flake: String, dashboard_port: u16, operator_pronouns: String, ) -> Result { let broker = Broker::open(db_path).context("open broker")?; let approvals = Approvals::open(db_path).context("open approvals")?; let questions = OperatorQuestions::open(db_path).context("open operator_questions")?; let (dashboard_events, _) = broadcast::channel(DASHBOARD_CHANNEL); Ok(Self { broker: Arc::new(broker), approvals: Arc::new(approvals), questions: Arc::new(questions), hyperhive_flake, dashboard_port, operator_pronouns, agents: Mutex::new(HashMap::new()), transient: Mutex::new(HashMap::new()), dashboard_events, event_seq: AtomicU64::new(0), last_containers: tokio::sync::Mutex::new(HashMap::new()), }) } /// Subscribe to the unified dashboard event channel. Used by the /// `/dashboard/stream` SSE handler and by the broker-to-dashboard /// forwarder task. pub fn dashboard_subscribe(&self) -> broadcast::Receiver { self.dashboard_events.subscribe() } /// Stamp the next sequence number. Each emission of a /// `DashboardEvent` should fill its `seq` with `next_seq()` so the /// frame the wire carries is the one the client uses to dedupe. pub fn next_seq(&self) -> u64 { self.event_seq.fetch_add(1, Ordering::SeqCst) + 1 } /// Current high-water seq. Snapshot endpoints read this *before* /// gathering state so the (snapshot.seq, snapshot) pair satisfies: /// any frame with `seq > snapshot.seq` is post-snapshot. The seq /// captured here may grow during snapshot construction — clients /// may double-apply such events, which renderers must tolerate. pub fn current_seq(&self) -> u64 { self.event_seq.load(Ordering::SeqCst) } /// Broadcast a freshly-built `DashboardEvent` (caller fills `seq` /// via `next_seq()`). Returns silently when there are no /// subscribers — the dashboard channel is best-effort presentation /// plumbing, not a delivery guarantee. pub fn emit_dashboard_event(&self, event: DashboardEvent) { let _ = self.dashboard_events.send(event); } /// Emit `ApprovalAdded` immediately after the row is inserted in /// sqlite. Caller passes the diff text it already computed (or /// `None` for spawn approvals which carry no diff). pub fn emit_approval_added( &self, id: i64, agent: &str, approval_kind: &'static str, sha_short: Option, diff: Option, description: Option, ) { self.emit_dashboard_event(DashboardEvent::ApprovalAdded { seq: self.next_seq(), id, agent: agent.to_owned(), approval_kind, sha_short, diff, description, }); } /// Emit `ApprovalResolved` after `mark_approved` / `mark_denied` / /// `mark_failed` lands. `resolved_at` is stamped from the system /// clock here so call sites don't repeat the conversion; if you /// already have an authoritative timestamp from the db update, /// the tiny skew between "row updated" and "event emitted" is /// presentation-only and doesn't matter to clients. pub fn emit_approval_resolved( &self, id: i64, agent: &str, approval_kind: &'static str, sha_short: Option, status: &'static str, note: Option, description: Option, ) { let resolved_at = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .ok() .and_then(|d| i64::try_from(d.as_secs()).ok()) .unwrap_or(0); self.emit_dashboard_event(DashboardEvent::ApprovalResolved { seq: self.next_seq(), id, agent: agent.to_owned(), approval_kind, sha_short, status, resolved_at, note, description, }); } /// Emit `QuestionAdded` after a question is inserted. Fires for /// both operator-targeted (`target = None`) and peer-to-peer /// (`target = Some(agent)`) threads — the dashboard surfaces /// both, distinguishing visually + offering operator override. pub fn emit_question_added( &self, id: i64, asker: &str, question: &str, options: &[String], multi: bool, deadline_at: Option, target: Option<&str>, ) { let asked_at = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .ok() .and_then(|d| i64::try_from(d.as_secs()).ok()) .unwrap_or(0); let question_refs = crate::dashboard::scan_validated_paths(question); self.emit_dashboard_event(DashboardEvent::QuestionAdded { seq: self.next_seq(), id, asker: asker.to_owned(), question: question.to_owned(), options: options.to_vec(), multi, asked_at, deadline_at, target: target.map(str::to_owned), question_refs, }); } /// Emit `QuestionResolved` when a question transitions to /// answered (operator answer, peer answer, operator override on /// a peer thread, operator cancel, or ttl watchdog). Both /// operator-targeted and peer threads fire so the dashboard's /// derived store can move the row from pending to history. pub fn emit_question_resolved( &self, id: i64, answer: &str, answerer: &str, cancelled: bool, target: Option<&str>, ) { let answered_at = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .ok() .and_then(|d| i64::try_from(d.as_secs()).ok()) .unwrap_or(0); let answer_refs = crate::dashboard::scan_validated_paths(answer); self.emit_dashboard_event(DashboardEvent::QuestionResolved { seq: self.next_seq(), id, answer: answer.to_owned(), answerer: answerer.to_owned(), answered_at, cancelled, target: target.map(str::to_owned), answer_refs, }); } /// Rebuild the per-container snapshot, diff it against the last /// one cached on `self`, and emit one /// `DashboardEvent::ContainerStateChanged` per added/changed row /// and one `DashboardEvent::ContainerRemoved` per disappeared row. /// Call after any mutation that could affect what /// `nixos-container list` returns or what a row's /// `running` / `needs_update` / `needs_login` / `deployed_sha` /// resolves to — lifecycle ops, destroy, approve (post-spawn), /// rebuild, meta-update, and the crash-watcher's periodic poll. /// Cheap when nothing changed (one `nixos-container list` + a /// HashMap diff + zero emits). pub async fn rescan_containers_and_emit(self: &Arc) { let fresh = container_view::build_all(self).await; let mut last = self.last_containers.lock().await; let mut changed_or_new = Vec::new(); let mut removed = Vec::new(); // Diff into change vs. add. for view in &fresh { match last.get(&view.name) { Some(prev) if prev == view => {} // unchanged _ => changed_or_new.push(view.clone()), } } // Anything in `last` but not in `fresh` is gone. let fresh_names: std::collections::HashSet<&str> = fresh.iter().map(|c| c.name.as_str()).collect(); for name in last.keys() { if !fresh_names.contains(name.as_str()) { removed.push(name.clone()); } } // Rebuild the cache from the fresh snapshot. last.clear(); for c in fresh { last.insert(c.name.clone(), c); } drop(last); for c in changed_or_new { self.emit_dashboard_event(DashboardEvent::ContainerStateChanged { seq: self.next_seq(), container: c, }); } for name in removed { self.emit_dashboard_event(DashboardEvent::ContainerRemoved { seq: self.next_seq(), name, }); } } /// Read-only snapshot of the last cached container view. Used by /// `/api/state` to cold-load page-open clients without re-running /// `nixos-container list` themselves; the /// `rescan_containers_and_emit` calls keep this fresh. pub async fn containers_snapshot(&self) -> Vec { let last = self.last_containers.lock().await; let mut out: Vec = last.values().cloned().collect(); out.sort_by(|a, b| a.name.cmp(&b.name)); out } pub fn register_agent(self: &Arc, name: &str) -> Result { // Idempotent: drop any existing listener so re-registration (e.g. on rebuild, // or after a hive-c0re restart cleared /run/hyperhive) gets a fresh socket. self.unregister_agent(name); let agent_dir = Self::agent_dir(name); std::fs::create_dir_all(&agent_dir) .with_context(|| format!("create agent dir {}", agent_dir.display()))?; let socket_path = Self::socket_path(name); // Hand the full Coordinator to the per-agent socket — it // needs broker + operator_questions to handle the agent-side // `ask` / `answer` tools, not just the broker. let socket = agent_server::start(name, &socket_path, self.clone())?; self.agents.lock().unwrap().insert(name.to_owned(), socket); Ok(agent_dir) } pub fn unregister_agent(&self, name: &str) { if let Some(socket) = self.agents.lock().unwrap().remove(name) { socket.handle.abort(); let _ = std::fs::remove_file(&socket.path); } } pub fn list_agents(&self) -> Vec { self.agents.lock().unwrap().keys().cloned().collect() } /// Mark an agent as in-progress (only one state per agent for now). /// /// Prefer `transient_guard` when possible — it auto-clears on drop /// even if the surrounding future is cancelled (HTTP request /// aborted, runtime shutdown mid-rebuild, panic between set and /// clear). The bare `set_transient` / `clear_transient` pair leaks /// the transient on any of those paths and the dashboard then /// shows the agent stuck in "rebuilding…" forever. pub fn set_transient(&self, name: &str, kind: TransientKind) { self.transient.lock().unwrap().insert( name.to_owned(), TransientState { kind, since: std::time::Instant::now(), }, ); // Live-update dashboards. `since_unix` is wall-clock so the // browser can tick "Ns spawning…" without polling. The // intra-process map keeps using `Instant` for monotonicity. let since_unix = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .ok() .and_then(|d| i64::try_from(d.as_secs()).ok()) .unwrap_or(0); self.emit_dashboard_event(DashboardEvent::TransientSet { seq: self.next_seq(), name: name.to_owned(), transient_kind: kind.as_str(), since_unix, }); } pub fn clear_transient(&self, name: &str) { let removed = self.transient.lock().unwrap().remove(name).is_some(); if removed { self.emit_dashboard_event(DashboardEvent::TransientCleared { seq: self.next_seq(), name: name.to_owned(), }); } } /// Set a transient state and return a guard that clears it on drop. /// Use this from any path where the surrounding future could be /// cancelled or panic between set and clear (HTTP handlers, spawned /// tasks). The guard's `Drop` runs even on task cancellation, so /// the dashboard's spinner can't get pinned forever. pub fn transient_guard(self: &Arc, name: &str, kind: TransientKind) -> TransientGuard { self.set_transient(name, kind); TransientGuard { coord: self.clone(), name: name.to_owned(), } } pub fn transient_snapshot(&self) -> HashMap { self.transient.lock().unwrap().clone() } /// Drop a system message into the given agent's inbox. Wakes the /// turn loop with a "you were just (re)started" hint — operator /// caused the transition, agent picks up where it left off /// (notes are in the bind-mounted state dir, last turn is in /// --continue's session). Best-effort; broker errors are logged /// but don't propagate. pub fn kick_agent(&self, name: &str, reason: &str) { // Sub-agents bind their state at /agents//state. The // manager has both /state (legacy mount) and /agents // bind-mounted, so /agents//state resolves there too — // use that uniformly so the wake message has one canonical // path that works everywhere. let body = format!( "{reason}\n\nYou were just (re)started by the operator. \ If you were mid-task, check `/agents/{name}/state/` for \ your notes and pick up where you left off. claude's \ `--continue` session is intact, so prior context is \ still in your window." ); if let Err(e) = self.broker.send(&hive_sh4re::Message { from: hive_sh4re::SYSTEM_SENDER.to_owned(), to: name.to_owned(), body, }) { tracing::warn!(error = ?e, %name, "kick_agent: broker.send failed"); } } /// Push a `HelperEvent` into the manager's inbox. Encoded as JSON in /// `Message::body`; sender = `SYSTEM_SENDER`. The manager harness /// recognises the sender and parses the body. Best-effort: a serde or /// broker error is logged but does not propagate. pub fn notify_manager(&self, event: &hive_sh4re::HelperEvent) { self.notify_agent(hive_sh4re::MANAGER_AGENT, event); } /// Push a `HelperEvent` into an arbitrary agent's inbox. Encoded /// the same way as `notify_manager` (sender = `SYSTEM_SENDER`, /// body = JSON-encoded event). Used to route `QuestionAnswered` /// events back to the agent that called `ask`, `QuestionAsked` /// events to the target of a peer question, etc. pub fn notify_agent(&self, agent: &str, event: &hive_sh4re::HelperEvent) { let body = match serde_json::to_string(event) { Ok(s) => s, Err(e) => { tracing::warn!(error = ?e, "failed to encode helper event"); return; } }; if let Err(e) = self.broker.send(&hive_sh4re::Message { from: hive_sh4re::SYSTEM_SENDER.to_owned(), to: agent.to_owned(), body, }) { tracing::warn!(error = ?e, target = %agent, "failed to push helper event"); } } /// Deliver `body` to every currently-registered agent, appending the /// standard broadcast hint. Returns a list of per-agent error strings /// for any that failed (empty = all ok). The sender's own inbox is /// included — the hint text tells agents to ignore if no action needed. pub fn broadcast_send(&self, from: &str, body: &str) -> Vec { const HINT: &str = "\n\n⚠️ _hint: this was a broadcast and may not need any action from you_"; let broadcast_body = format!("{body}{HINT}"); let mut errors = Vec::new(); for agent_name in self.list_agents() { if let Err(e) = self.broker.send(&hive_sh4re::Message { from: from.to_owned(), to: agent_name.clone(), body: broadcast_body.clone(), }) { errors.push(format!("{agent_name}: {e}")); } } errors } pub fn agent_dir(name: &str) -> PathBuf { PathBuf::from(format!("{AGENT_RUNTIME_ROOT}/{name}")) } pub fn socket_path(name: &str) -> PathBuf { Self::agent_dir(name).join("mcp.sock") } pub fn manager_dir() -> PathBuf { PathBuf::from(MANAGER_RUNTIME_ROOT) } pub fn manager_socket_path() -> PathBuf { Self::manager_dir().join("mcp.sock") } /// Ensure a runtime dir + (for sub-agents) per-agent socket exists. For /// the manager, `manager_server::start` owns the socket — just return /// the dir. For sub-agents this is `register_agent` (creates a fresh /// listener bound to `socket_path(name)`). Source directory of the /// `/run/hive/mcp.sock` bind that ends up in `set_nspawn_flags`. pub fn ensure_runtime(self: &Arc, name: &str) -> Result { if name == crate::lifecycle::MANAGER_NAME { let dir = Self::manager_dir(); std::fs::create_dir_all(&dir) .with_context(|| format!("create manager dir {}", dir.display()))?; return Ok(dir); } self.register_agent(name) } /// Per-agent state root (parent of `config/`, future `prompts/`, etc.). pub fn agent_state_root(name: &str) -> PathBuf { PathBuf::from(format!("{AGENT_STATE_ROOT}/{name}")) } /// Manager-editable proposed config repo. Bind-mounted into the manager /// container as `/agents//config/`. pub fn agent_proposed_dir(name: &str) -> PathBuf { Self::agent_state_root(name).join("config") } /// Per-agent Claude credentials dir. Bind-mounted RW into the agent /// container at `/root/.claude` so OAuth state survives container /// destroy/recreate. Each agent owns its own token lineage — sharing /// would break on the first refresh-token rotation. pub fn agent_claude_dir(name: &str) -> PathBuf { Self::agent_state_root(name).join("claude") } /// Per-agent durable knowledge dir. Bind-mounted RW into the agent /// container at `/state`. Survives destroy/recreate alongside the /// claude dir. Agents are told (via the system prompt) to write /// long-lived notes / scratch state here. pub fn agent_notes_dir(name: &str) -> PathBuf { Self::agent_state_root(name).join("state") } /// Authoritative applied config repo. Hive-c0re-only. pub fn agent_applied_dir(name: &str) -> PathBuf { PathBuf::from(format!("{APPLIED_STATE_ROOT}/{name}")) } /// Enumerate names that have a persistent state dir under /// `/var/lib/hyperhive/agents/` (i.e. config / claude creds / /// notes survive). Includes both currently-existing containers and /// destroyed-but-kept tombstones; callers filter the latter by /// subtracting `lifecycle::list()`. #[must_use] pub fn kept_state_names() -> Vec { let Ok(rd) = std::fs::read_dir(AGENT_STATE_ROOT) else { return Vec::new(); }; let mut out: Vec = rd .flatten() .filter(|e| e.file_type().is_ok_and(|t| t.is_dir())) .filter_map(|e| e.file_name().into_string().ok()) .collect(); out.sort(); out } }