//! Hyperhive dashboard. Lists managed containers (with deep-links to each //! container's web UI), pending approvals (with unified diff vs the applied //! repo, plus approve/deny buttons), and the manager. use std::convert::Infallible; use std::fmt::Write as _; use std::net::SocketAddr; use std::path::Path; use std::sync::Arc; use anyhow::{Context, Result}; use axum::extract::Form; use axum::{ Router, extract::{Path as AxumPath, State}, http::{HeaderMap, StatusCode}, response::{ Html, IntoResponse, Redirect, Response, sse::{Event, KeepAlive, Sse}, }, routing::{get, post}, }; use hive_sh4re::Approval; use serde::{Deserialize, Serialize}; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::{Stream, StreamExt}; use crate::actions; use crate::coordinator::Coordinator; use crate::lifecycle::{self, AGENT_PREFIX, MANAGER_NAME}; const MANAGER_PORT: u16 = 8000; #[derive(Clone)] struct AppState { coord: Arc, } pub async fn serve(port: u16, coord: Arc) -> Result<()> { let app = Router::new() .route("/", get(serve_index)) .route("/static/dashboard.css", get(serve_css)) .route("/static/app.js", get(serve_app_js)) .route("/api/state", get(api_state)) .route("/approve/{id}", post(post_approve)) .route("/deny/{id}", post(post_deny)) .route("/destroy/{name}", post(post_destroy)) .route("/kill/{name}", post(post_kill)) .route("/restart/{name}", post(post_restart)) .route("/start/{name}", post(post_start)) .route("/rebuild/{name}", post(post_rebuild)) .route("/update-all", post(post_update_all)) .route("/answer-question/{id}", post(post_answer_question)) .route("/cancel-question/{id}", post(post_cancel_question)) .route("/purge-tombstone/{name}", post(post_purge_tombstone)) .route("/api/journal/{name}", get(get_journal)) .route("/api/agent-config/{name}", get(get_agent_config)) .route("/request-spawn", post(post_request_spawn)) .route("/op-send", post(post_op_send)) .route("/meta-update", post(post_meta_update)) .route("/messages/stream", get(messages_stream)) 
.with_state(AppState { coord }); let addr = SocketAddr::from(([0, 0, 0, 0], port)); let listener = bind_with_retry(addr).await?; tracing::info!(%port, "dashboard listening"); axum::serve(listener, app).await?; Ok(()) } // --------------------------------------------------------------------------- // Static asset handlers: the dashboard is an SPA. `GET /` returns the // (static) shell; `GET /static/*` serves the CSS + JS app; `GET /api/state` // returns the current snapshot as JSON. The JS app fetches state on load, // re-fetches after every async-form submit, and listens on // `/messages/stream` for broker traffic. // --------------------------------------------------------------------------- /// `SO_REUSEADDR` bind with retry. Mirrors the per-agent variant — /// hive-c0re restarts also race the previous process's socket release. async fn bind_with_retry(addr: SocketAddr) -> Result { let mut delay_ms = 250u64; let mut attempts = 0u32; loop { match try_bind(addr) { Ok(l) => return Ok(l), Err(e) if e.kind() == std::io::ErrorKind::AddrInUse && attempts < 12 => { tracing::warn!( %addr, attempt = attempts + 1, "dashboard: AddrInUse, retrying in {delay_ms}ms" ); tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; attempts += 1; delay_ms = (delay_ms * 2).min(2000); } Err(e) => { return Err(e).with_context(|| format!("bind dashboard on {addr}")); } } } } fn try_bind(addr: SocketAddr) -> std::io::Result { let sock = match addr { SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?, SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?, }; sock.set_reuseaddr(true)?; sock.bind(addr)?; sock.listen(1024) } async fn serve_index() -> impl IntoResponse { Html(include_str!("../assets/index.html")) } async fn serve_css() -> impl IntoResponse { ( [("content-type", "text/css")], include_str!("../assets/dashboard.css"), ) } async fn serve_app_js() -> impl IntoResponse { ( [("content-type", "application/javascript")], include_str!("../assets/app.js"), ) } 
#[derive(Serialize)] struct StateSnapshot { hostname: String, manager_port: u16, any_stale: bool, containers: Vec, transients: Vec, approvals: Vec, /// Last 30 resolved approvals (approved / denied / failed), newest- /// first. Drives the "history" tab on the approvals section. approval_history: Vec, /// Latest messages addressed to `operator` — surfaces agent replies /// asynchronously so the operator can see them without watching the /// live panel during a turn. operator_inbox: Vec, /// Pending operator questions (currently only from the manager). /// `ask_operator` returns immediately with the id; on `/answer-question` /// we mark the row answered and fire `HelperEvent::OperatorAnswered` /// into the manager's inbox. questions: Vec, /// State dirs (config history + claude creds + /state/ notes) that /// survive after a destroy-without-purge. The operator can re-spawn /// with the same name to resume, or PURG3 to wipe them. tombstones: Vec, /// Sub-agents whose FNV-1a hashed web UI port collides with at /// least one other agent. Operator resolves by renaming. The /// dashboard renders a banner at the top listing each cluster. port_conflicts: Vec, /// Inputs in `meta/flake.lock` the operator can selectively /// `nix flake update`. Hyperhive first, then `agent-` rows. meta_inputs: Vec, } #[derive(Serialize)] struct PortConflict { port: u16, /// All agent names sharing this port (sorted, ≥2 entries). agents: Vec, } #[derive(Serialize)] struct TombstoneView { name: String, /// Bytes used by the state dir tree. Cheap-ish to compute; let the /// operator know how much they're holding onto. state_bytes: u64, /// Mtime (unix seconds) of the state dir; rough "last seen". last_seen: i64, has_creds: bool, } #[derive(Serialize)] #[allow(clippy::struct_excessive_bools)] struct ContainerView { /// Logical agent name (no `h-` prefix). Used in action URLs. name: String, /// Container name as nixos-container sees it (`h-foo`, `hm1nd`). 
container: String, is_manager: bool, port: u16, running: bool, needs_update: bool, needs_login: bool, /// When a lifecycle action is in flight on this container, the kind /// (`starting`, `stopping`, etc.) so the JS can render a spinner + /// disable other buttons. #[serde(skip_serializing_if = "Option::is_none")] pending: Option<&'static str>, /// First 12 chars of the sha the meta flake currently has locked /// for this agent's input. Reflects what's actually deployed; can /// differ from `applied//main` only between /// `meta::prepare_deploy` and `finalize_deploy` (≤ build duration). #[serde(skip_serializing_if = "Option::is_none")] deployed_sha: Option, } #[derive(Serialize)] struct TransientView { name: String, kind: &'static str, secs: u64, } #[derive(Serialize)] struct ApprovalHistoryView { id: i64, agent: String, kind: &'static str, /// First 12 chars of the canonical sha (preferred) or /// manager-supplied ref. None for resolved spawn approvals. sha_short: Option, /// `approved` / `denied` / `failed`. status: &'static str, /// Unix seconds. Renders as a relative time on the dashboard. resolved_at: i64, /// Operator-supplied deny reason (for `denied`) or build error /// (for `failed`). None on `approved`. #[serde(skip_serializing_if = "Option::is_none")] note: Option, } #[derive(Serialize)] struct ApprovalView { id: i64, agent: String, kind: &'static str, /// First 12 chars of the `commit_ref`, for `ApplyCommit` only. sha_short: Option, /// Pre-rendered syntax-coloured diff HTML, for `ApplyCommit` only. diff_html: Option, } /// Replace silent `.unwrap_or_default()` on the data sources behind /// `/api/state` so that whichever query degrades surfaces in journald /// instead of leaving the operator staring at an empty list. The /// dashboard still degrades to a sensible default value; the warn /// is just the diagnostic breadcrumb the old code swallowed. 
fn log_default(what: &str, result: std::result::Result) -> T where T: Default, E: std::fmt::Debug, { match result { Ok(v) => v, Err(e) => { tracing::warn!(target: "api_state", source = %what, error = ?e, "snapshot source failed; using default"); T::default() } } } async fn api_state(headers: HeaderMap, State(state): State) -> axum::Json { let host = headers .get("host") .and_then(|h| h.to_str().ok()) .unwrap_or("localhost"); let hostname = host.split(':').next().unwrap_or(host).to_owned(); let raw_containers = log_default("nixos-container list", lifecycle::list().await); let current_rev = crate::auto_update::current_flake_rev(&state.coord.hyperhive_flake); let transient_snapshot = state.coord.transient_snapshot(); let pending_approvals = gc_orphans( &state.coord, log_default("approvals.pending", state.coord.approvals.pending()), ); let (containers, any_stale) = build_container_views(&raw_containers, current_rev.as_deref(), &transient_snapshot).await; let transients = build_transient_views(&raw_containers, &transient_snapshot); let approvals = build_approval_views(pending_approvals).await; let approval_history = log_default( "approvals.recent_resolved", state.coord.approvals.recent_resolved(30), ) .into_iter() .map(history_view) .collect(); let tombstones = build_tombstone_views(&state.coord, &containers, &transient_snapshot); let port_conflicts = build_port_conflicts(&containers); let operator_inbox = log_default( "broker.recent_for(operator)", state .coord .broker .recent_for(hive_sh4re::OPERATOR_RECIPIENT, 50), ); let questions = log_default("questions.pending", state.coord.questions.pending()); axum::Json(StateSnapshot { hostname, manager_port: MANAGER_PORT, any_stale, containers, transients, approvals, approval_history, meta_inputs: read_meta_inputs(), operator_inbox, questions, tombstones, port_conflicts, }) } /// Group live containers by their assigned web UI port; clusters with /// more than one member are port-hash collisions the operator needs /// to 
resolve by renaming. Manager (fixed at 8000) and sub-agents /// (8100..8999) can't collide with each other — collisions are /// strictly between sub-agents. fn build_port_conflicts(containers: &[ContainerView]) -> Vec { let mut by_port: std::collections::BTreeMap> = std::collections::BTreeMap::new(); for c in containers { by_port.entry(c.port).or_default().push(c.name.clone()); } by_port .into_iter() .filter(|(_, agents)| agents.len() > 1) .map(|(port, mut agents)| { agents.sort(); PortConflict { port, agents } }) .collect() } /// Build `ContainerView`s for every live nixos-container. Returns the /// list and whether any container is stale (drives the "↻ UPD4TE 4LL" /// banner). async fn build_container_views( raw_containers: &[String], current_rev: Option<&str>, transient_snapshot: &std::collections::HashMap, ) -> (Vec, bool) { let mut out = Vec::new(); let mut any_stale = false; let locked = read_meta_locked_revs(); for c in raw_containers { let (logical, is_manager) = if c == MANAGER_NAME { (MANAGER_NAME.to_owned(), true) } else if let Some(n) = c.strip_prefix(AGENT_PREFIX) { (n.to_owned(), false) } else { continue; }; let needs_update = current_rev.is_some_and(|rev| crate::auto_update::agent_needs_update(&logical, rev)); if needs_update { any_stale = true; } let needs_login = !is_manager && !claude_has_session(&Coordinator::agent_claude_dir(&logical)); let pending = transient_snapshot .get(&logical) .map(|st| transient_label(st.kind)); let deployed_sha = locked .get(&format!("agent-{logical}")) .map(|s| s[..s.len().min(12)].to_owned()); out.push(ContainerView { port: lifecycle::agent_web_port(&logical), running: lifecycle::is_running(&logical).await, container: c.clone(), name: logical, is_manager, needs_update, needs_login, pending, deployed_sha, }); } (out, any_stale) } /// Map of node name → locked sha for nodes the **root** of meta /// directly depends on (`hyperhive`, `agent-`). Used by the /// container row to render its `deployed:` chip per agent. 
/// Distinct from `read_meta_inputs()` which walks deeper for the /// flake-input update form. fn read_meta_locked_revs() -> std::collections::HashMap { let mut out = std::collections::HashMap::new(); let Ok(raw) = std::fs::read_to_string("/var/lib/hyperhive/meta/flake.lock") else { return out; }; let Ok(json) = serde_json::from_str::(&raw) else { return out; }; let Some(nodes) = json.get("nodes").and_then(|v| v.as_object()) else { return out; }; let Some(root_name) = json.get("root").and_then(|v| v.as_str()) else { return out; }; let Some(root_inputs) = nodes .get(root_name) .and_then(|n| n.get("inputs")) .and_then(|v| v.as_object()) else { return out; }; for alias in root_inputs.keys() { let target_name = match root_inputs.get(alias) { Some(serde_json::Value::String(s)) => s.clone(), _ => continue, }; if let Some(rev) = nodes .get(&target_name) .and_then(|n| n.get("locked")) .and_then(|v| v.get("rev")) .and_then(|v| v.as_str()) { out.insert(alias.clone(), rev.to_owned()); } } out } #[derive(Serialize, Clone)] struct MetaInputView { /// Input key in meta's `flake.nix` — `hyperhive`, `agent-`, etc. name: String, /// Full locked sha. Not displayed verbatim; the dashboard /// truncates to the first 12 chars for the chip. rev: String, /// Unix seconds — `locked.lastModified`. Drives the relative /// "2h ago" timestamp on each input row. last_modified: i64, /// `original.url` if available, for the tooltip / row meta text. #[serde(skip_serializing_if = "Option::is_none")] url: Option, } /// Walk `flake.lock`'s `nodes` graph from `root` and emit one /// `MetaInputView` per fetched input, up to two levels deep. That /// surfaces the direct meta inputs (`hyperhive`, `agent-`) AND /// the agent flakes' own inputs (`agent-dmatrix/mcp-matrix`, /// `hyperhive/nixpkgs`, etc.) so the operator can bump them /// individually from the UI. Deeper transitive nodes aren't shown /// to keep the panel readable — bumping the level-2 entry will /// re-fetch its own sub-inputs anyway. 
Names are slash-separated /// paths from root, which is the syntax `nix flake update` accepts /// for transitive inputs. /// /// Inputs that resolve via a `follows` chain (lock value is an /// Array of strings) are skipped — they're aliases, not their own /// fetched derivation, and updating them does nothing. fn read_meta_inputs() -> Vec { let mut out = Vec::new(); let Ok(raw) = std::fs::read_to_string("/var/lib/hyperhive/meta/flake.lock") else { return out; }; let Ok(json) = serde_json::from_str::(&raw) else { return out; }; let Some(nodes) = json.get("nodes").and_then(|v| v.as_object()) else { return out; }; let Some(root_name) = json.get("root").and_then(|v| v.as_str()) else { return out; }; walk_meta_inputs(nodes, root_name, "", 0, 2, &mut out); // hyperhive first, then alphabetical (sub-paths sort under their // parent, which gives a tidy 'agent-foo, agent-foo/bar' grouping). out.sort_by(|a, b| match (a.name.as_str(), b.name.as_str()) { ("hyperhive", _) => std::cmp::Ordering::Less, (_, "hyperhive") => std::cmp::Ordering::Greater, _ => a.name.cmp(&b.name), }); out } fn walk_meta_inputs( nodes: &serde_json::Map, node_name: &str, prefix: &str, depth: u32, max_depth: u32, out: &mut Vec, ) { if depth >= max_depth { return; } let Some(node) = nodes.get(node_name) else { return; }; let Some(inputs_map) = node.get("inputs").and_then(|v| v.as_object()) else { return; }; for (alias, target) in inputs_map { // Inputs map value is either a string (node name) or an // array (a `follows` chain). The latter just aliases another // node — we can't `nix flake update` it directly, so skip. 
let target_name = match target { serde_json::Value::String(s) => s.clone(), _ => continue, }; let Some(target_node) = nodes.get(&target_name) else { continue; }; let path = if prefix.is_empty() { alias.clone() } else { format!("{prefix}/{alias}") }; if let Some(rev) = target_node .get("locked") .and_then(|v| v.get("rev")) .and_then(|v| v.as_str()) { let last_modified = target_node .get("locked") .and_then(|v| v.get("lastModified")) .and_then(serde_json::Value::as_i64) .unwrap_or(0); let url = target_node .get("original") .and_then(|v| v.get("url")) .and_then(|v| v.as_str()) .map(str::to_owned); out.push(MetaInputView { name: path.clone(), rev: rev.to_owned(), last_modified, url, }); } walk_meta_inputs(nodes, &target_name, &path, depth + 1, max_depth, out); } } /// Transient state for agents whose container does NOT yet exist /// (`Spawning`). Lifecycle ops on existing containers surface as /// `ContainerView.pending` inline; this list only catches pre-creation. fn build_transient_views( raw_containers: &[String], transient_snapshot: &std::collections::HashMap, ) -> Vec { transient_snapshot .iter() .filter(|(name, _)| { !raw_containers .iter() .any(|c| c == &format!("{AGENT_PREFIX}{name}") || c == *name) }) .map(|(name, st)| TransientView { name: name.clone(), kind: transient_label(st.kind), secs: st.since.elapsed().as_secs(), }) .collect() } /// Render each pending approval into its dashboard view (short sha + /// unified diff for `ApplyCommit`, just the name for `Spawn`). /// Project a resolved sqlite row into the lean shape the dashboard /// history tab consumes — no `diff_html` (rendering 30 of them /// per /api/state poll would mean 30 git diffs per refresh). 
fn history_view(a: Approval) -> ApprovalHistoryView { let displayed = a.fetched_sha.as_deref().unwrap_or(&a.commit_ref); let sha_short = if displayed.is_empty() { None } else { Some(displayed[..displayed.len().min(12)].to_owned()) }; let status = match a.status { hive_sh4re::ApprovalStatus::Approved => "approved", hive_sh4re::ApprovalStatus::Denied => "denied", hive_sh4re::ApprovalStatus::Failed => "failed", // Pending shouldn't appear in recent_resolved, but be defensive. hive_sh4re::ApprovalStatus::Pending => "pending", }; let kind = match a.kind { hive_sh4re::ApprovalKind::ApplyCommit => "apply_commit", hive_sh4re::ApprovalKind::Spawn => "spawn", }; ApprovalHistoryView { id: a.id, agent: a.agent, kind, sha_short, status, resolved_at: a.resolved_at.unwrap_or(0), note: a.note, } } async fn build_approval_views(approvals: Vec) -> Vec { let mut out = Vec::with_capacity(approvals.len()); for a in approvals { out.push(match a.kind { hive_sh4re::ApprovalKind::ApplyCommit => { // Prefer the canonical fetched sha from applied; // commit_ref is only the manager's claim and may be // amended out from under us. let displayed = a.fetched_sha.as_deref().unwrap_or(&a.commit_ref); let sha = displayed[..displayed.len().min(12)].to_owned(); let diff = approval_diff(&a.agent, a.id).await; ApprovalView { id: a.id, agent: a.agent, kind: "apply_commit", sha_short: Some(sha), diff_html: Some(render_diff_lines(&diff)), } } hive_sh4re::ApprovalKind::Spawn => ApprovalView { id: a.id, agent: a.agent, kind: "spawn", sha_short: None, diff_html: None, }, }); } out } /// State-dir names that don't appear in the live container list (and /// aren't the manager). Each one surfaces in the dashboard as a row /// with R3V1V3 + PURG3 actions. 
fn build_tombstone_views( coord: &Coordinator, containers: &[ContainerView], transient_snapshot: &std::collections::HashMap, ) -> Vec { let _ = coord; // kept_state_names is a free fn but takes &self by future plan let live: std::collections::HashSet<&str> = containers .iter() .map(|c| c.name.as_str()) .chain(transient_snapshot.keys().map(String::as_str)) .collect(); Coordinator::kept_state_names() .into_iter() .filter(|name| name != MANAGER_NAME && !live.contains(name.as_str())) .map(|name| { let root = Coordinator::agent_state_root(&name); let state_bytes = dir_size_bytes(&root); let last_seen = std::fs::metadata(&root) .and_then(|m| m.modified()) .ok() .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) .and_then(|d| i64::try_from(d.as_secs()).ok()) .unwrap_or(0); let has_creds = claude_has_session(&Coordinator::agent_claude_dir(&name)); TombstoneView { name, state_bytes, last_seen, has_creds, } }) .collect() } /// Sum the byte size of every regular file under `root`. Cheap to compute /// for typical agent state (config repo + claude creds + notes file — /// usually a few MB); fine to do inline on each /api/state. Returns 0 on /// any error. fn dir_size_bytes(root: &Path) -> u64 { fn walk(p: &Path, acc: &mut u64) { let Ok(rd) = std::fs::read_dir(p) else { return }; for entry in rd.flatten() { let Ok(ft) = entry.file_type() else { continue }; if ft.is_dir() { walk(&entry.path(), acc); } else if ft.is_file() && let Ok(meta) = entry.metadata() { *acc += meta.len(); } } } let mut total = 0u64; walk(root, &mut total); total } async fn messages_stream( State(state): State, ) -> Sse>> { let rx = state.coord.broker.subscribe(); let stream = BroadcastStream::new(rx).filter_map(|res| { // Drop lagged events. Browsers reconnect; nothing to do here. 
let event = res.ok()?; let json = serde_json::to_string(&event).ok()?; Some(Ok(Event::default().data(json))) }); Sse::new(stream).keep_alive(KeepAlive::default()) } async fn post_approve(State(state): State, AxumPath(id): AxumPath) -> Response { match actions::approve(state.coord.clone(), id).await { Ok(()) => Redirect::to("/").into_response(), Err(e) => error_response(&format!("approve {id} failed: {e:#}")), } } #[derive(Deserialize, Default)] struct DenyForm { #[serde(default)] note: Option, } async fn post_deny( State(state): State, AxumPath(id): AxumPath, Form(form): Form, ) -> Response { let note = form .note .as_deref() .map(str::trim) .filter(|s| !s.is_empty()); match actions::deny(&state.coord, id, note).await { Ok(()) => Redirect::to("/").into_response(), Err(e) => error_response(&format!("deny {id} failed: {e:#}")), } } #[derive(Deserialize)] struct RequestSpawnForm { name: String, } #[derive(Deserialize)] struct AnswerForm { answer: String, } async fn post_answer_question( State(state): State, AxumPath(id): AxumPath, Form(form): Form, ) -> Response { let answer = form.answer.trim(); if answer.is_empty() { return error_response("answer: required"); } match state.coord.questions.answer(id, answer) { Ok((question, asker)) => { tracing::info!(%id, %asker, "operator answered question"); state.coord.notify_agent( &asker, &hive_sh4re::HelperEvent::OperatorAnswered { id, question, answer: answer.to_owned(), }, ); Redirect::to("/").into_response() } Err(e) => error_response(&format!("answer {id} failed: {e:#}")), } } /// Resolve a pending operator question with a sentinel answer when /// the operator decides not to / can't answer. The manager harness /// receives an `OperatorAnswered` event with `answer = "[cancelled]"` /// so it can fall back on whatever default it had. Same code path as /// a real answer — just lets the operator close the loop instead of /// letting the question dangle forever. 
async fn post_cancel_question( State(state): State, AxumPath(id): AxumPath, ) -> Response { const SENTINEL: &str = "[cancelled]"; match state.coord.questions.answer(id, SENTINEL) { Ok((question, asker)) => { tracing::info!(%id, %asker, "operator cancelled question"); state.coord.notify_agent( &asker, &hive_sh4re::HelperEvent::OperatorAnswered { id, question, answer: SENTINEL.to_owned(), }, ); Redirect::to("/").into_response() } Err(e) => error_response(&format!("cancel-question {id} failed: {e:#}")), } } #[derive(Deserialize)] struct JournalQuery { /// Optional systemd unit filter — e.g. `hive-ag3nt.service`. When /// omitted, returns the full machine journal. #[serde(default)] unit: Option, /// Number of trailing lines to return. Capped at 5000. #[serde(default)] lines: Option, } /// Shell out to `journalctl -M -b` and return its text /// output. Operator-only by virtue of the dashboard being host-bound; /// hive-c0re already runs as root in its systemd unit so journalctl /// has the access it needs. async fn get_journal( AxumPath(name): AxumPath, axum::extract::Query(q): axum::extract::Query, ) -> Response { // Validate the container name against the list of managed // containers so we don't shell out with arbitrary input. 
let container = strip_container_prefix(&name); let prefixed = if container == lifecycle::MANAGER_NAME { container.clone() } else { format!("{}{container}", lifecycle::AGENT_PREFIX) }; let live = lifecycle::list().await.unwrap_or_default(); if !live.iter().any(|c| c == &prefixed) { return error_response(&format!("journal: no managed container {prefixed:?}")); } let lines = q.lines.unwrap_or(500).min(5000); let mut cmd = tokio::process::Command::new("journalctl"); cmd.args([ "-M", &prefixed, "-b", "--no-pager", "--output=short-iso", "--lines", ]) .arg(lines.to_string()); if let Some(u) = q.unit.as_deref().filter(|s| !s.is_empty()) { // accept hive-ag3nt[.service] / hive-m1nd[.service] — anything // else we refuse, again to keep the shell-out tight. let allowed = ["hive-ag3nt.service", "hive-m1nd.service"]; let unit = if u.ends_with(".service") { u.to_owned() } else { format!("{u}.service") }; if !allowed.contains(&unit.as_str()) { return error_response(&format!("journal: unknown unit {unit:?}")); } cmd.args(["-u", &unit]); } match cmd.output().await { Ok(out) => { // Combine stdout + stderr — journalctl emits to both on errors. let mut body = String::from_utf8_lossy(&out.stdout).into_owned(); if !out.status.success() { body.push_str("\n--- stderr ---\n"); body.push_str(&String::from_utf8_lossy(&out.stderr)); } ([("content-type", "text/plain; charset=utf-8")], body).into_response() } Err(e) => error_response(&format!("journalctl spawn: {e}")), } } /// Show the current `agent.nix` from the applied repo — the file /// the container actually builds against. Read-only; the manager /// can't influence what this returns (that path goes through the /// approval queue). async fn get_agent_config(AxumPath(name): AxumPath) -> Response { let logical = strip_container_prefix(&name); // Constrain to managed containers — same shape as the journal // endpoint, prevents arbitrary filesystem reads. 
let live = lifecycle::list().await.unwrap_or_default(); let prefixed = if logical == lifecycle::MANAGER_NAME { logical.clone() } else { format!("{}{logical}", lifecycle::AGENT_PREFIX) }; if !live.iter().any(|c| c == &prefixed) { return error_response(&format!("agent-config: no managed container {prefixed:?}")); } let path = Coordinator::agent_applied_dir(&logical).join("agent.nix"); match std::fs::read_to_string(&path) { Ok(body) => ([("content-type", "text/plain; charset=utf-8")], body).into_response(), Err(e) => error_response(&format!("read {}: {e}", path.display())), } } async fn post_purge_tombstone( State(state): State, AxumPath(name): AxumPath, ) -> Response { if name == lifecycle::MANAGER_NAME { return error_response("refusing to purge the manager's state"); } // Sanity: refuse to purge if a live container still exists with this // name. The dashboard already filters tombstones to non-live names, // but the operator could send a stale POST. let live = lifecycle::list().await.unwrap_or_default(); if live .iter() .any(|c| c == &format!("{}{name}", lifecycle::AGENT_PREFIX) || c == &name) { return error_response(&format!( "refusing to purge {name}: container still exists — use DESTR0Y first" )); } let mut errors = Vec::new(); for dir in [ Coordinator::agent_state_root(&name), Coordinator::agent_applied_dir(&name), ] { if dir.exists() && let Err(e) = std::fs::remove_dir_all(&dir) { errors.push(format!("{}: {e}", dir.display())); } } let _ = state .coord .approvals .fail_pending_for_agent(&name, "agent state purged"); if errors.is_empty() { tracing::info!(%name, "tombstone purged"); Redirect::to("/").into_response() } else { error_response(&format!("purge {name} partial: {}", errors.join(", "))) } } /// Operator-side compose form on the dashboard terminal. Drops a /// message into the broker as `{from: "operator", to, body}`. Same /// shape that per-agent web UIs use via `OperatorMsg`, but here the /// operator picks the recipient explicitly with `@name`. 
No /// validation that `to` resolves to a known agent — broker accepts /// arbitrary recipients (and the agent's inbox grows whether or not /// they exist, which is fine for spawn-then-greet flows). #[derive(Deserialize)] struct OpSendForm { to: String, body: String, } /// Form for `POST /meta-update`. Inputs ride in as a comma-separated /// list under the `inputs` field — the JS submitter joins the /// checked boxes since axum's `Form` extractor doesn't natively /// decode repeated keys without a helper. #[derive(Deserialize)] struct MetaUpdateForm { inputs: String, } /// Bulk-update selected meta flake inputs, then rebuild the affected /// agents in the background. Idempotent w.r.t. selection — choosing /// an input that's already at the latest sha is a no-op (no commit, /// no rebuild ripple). Returns immediately after queueing the work; /// dashboard polls for progress via container `pending` spinners + /// the meta-inputs row sha update. async fn post_meta_update( State(state): State, Form(form): Form, ) -> Response { let inputs: Vec = form .inputs .split(',') .map(|s| s.trim().to_owned()) .filter(|s| !s.is_empty()) .collect(); if inputs.is_empty() { return error_response("meta-update: no inputs selected"); } let coord = state.coord.clone(); let inputs_clone = inputs.clone(); tokio::spawn(async move { run_meta_update(&coord, &inputs_clone).await; }); Redirect::to("/").into_response() } /// Background task: run `nix flake update ` in meta + commit, /// then rebuild every agent whose input was touched (or all agents /// when `hyperhive` was bumped, since that's the shared base). Each /// rebuild fires `Rebuilt { ok, note, ... }` to the manager so the /// operator and manager get the same feedback they'd see from an /// auto-update / manual dashboard rebuild. 
async fn run_meta_update(coord: &Arc, inputs: &[String]) { tracing::info!(?inputs, "meta-update: starting"); if let Err(e) = crate::meta::lock_update(inputs).await { tracing::warn!(error = ?e, "meta-update: lock_update failed"); return; } // Decide which agents to rebuild. Inputs are slash-paths from // the meta root — `hyperhive`, `hyperhive/nixpkgs`, // `agent-coder`, `agent-coder/mcp-matrix`, etc. Anything in the // hyperhive subtree affects every agent (shared base); anything // in `agent-/...` only the named agent. let touched_hyperhive = inputs .iter() .any(|i| i == "hyperhive" || i.starts_with("hyperhive/")); let touched_agents: Vec = inputs .iter() .filter_map(|i| i.strip_prefix("agent-")) .map(|rest| rest.split('/').next().unwrap_or(rest).to_owned()) .collect(); let agents_to_rebuild: Vec = if touched_hyperhive { crate::lifecycle::list() .await .unwrap_or_default() .into_iter() .filter_map(|c| { if c == crate::lifecycle::MANAGER_NAME { Some(crate::lifecycle::MANAGER_NAME.to_owned()) } else { c.strip_prefix(crate::lifecycle::AGENT_PREFIX) .map(str::to_owned) } }) .collect() } else { touched_agents }; let current_rev = crate::auto_update::current_flake_rev(&coord.hyperhive_flake) .unwrap_or_default(); // Sequential rebuild loop — the META_LOCK guards meta-side // races but parallel nix builds also serialise via nix-daemon, // so sequential is just as fast in practice and keeps logs // readable. for name in agents_to_rebuild { tracing::info!(%name, "meta-update: rebuilding agent"); if let Err(e) = crate::auto_update::rebuild_agent(coord, &name, ¤t_rev).await { tracing::warn!(%name, error = ?e, "meta-update: rebuild failed"); // continue: surface each per-agent failure via its own // Rebuilt event; don't abort the whole batch. 
} } tracing::info!("meta-update: done"); } async fn post_op_send(State(state): State, Form(form): Form) -> Response { let to = form.to.trim().to_owned(); let body = form.body.trim().to_owned(); if to.is_empty() { return error_response("op-send: `to` required"); } if body.is_empty() { return error_response("op-send: `body` required"); } if let Err(e) = state.coord.broker.send(&hive_sh4re::Message { from: hive_sh4re::OPERATOR_RECIPIENT.to_owned(), to: to.clone(), body, }) { return error_response(&format!("op-send to {to} failed: {e:#}")); } Redirect::to("/").into_response() } async fn post_request_spawn( State(state): State, Form(form): Form, ) -> Response { let name = form.name.trim().to_owned(); if name.is_empty() { return error_response("spawn: `name` required"); } match state .coord .approvals .submit_kind(&name, hive_sh4re::ApprovalKind::Spawn, "") { Ok(id) => { tracing::info!(%id, %name, "operator: spawn approval queued via dashboard"); Redirect::to("/").into_response() } Err(e) => error_response(&format!("request-spawn {name} failed: {e:#}")), } } async fn post_rebuild(State(state): State, AxumPath(name): AxumPath) -> Response { let Some(current_rev) = crate::auto_update::current_flake_rev(&state.coord.hyperhive_flake) else { return error_response( "rebuild: hyperhive_flake has no canonical path; manual rebuild only via `hive-c0re rebuild`", ); }; let coord = state.coord.clone(); lifecycle_action( &state, &name, crate::coordinator::TransientKind::Rebuilding, "rebuild", move |n| { let coord = coord.clone(); let rev = current_rev.clone(); async move { crate::auto_update::rebuild_agent(&coord, &n, &rev).await } }, // rebuild_agent fires kick_agent on success itself, so the // extra-closure is a no-op here. 
|_, _| {}, ) .await } /// Common shape for the simple lifecycle action handlers (start / /// stop / restart / rebuild): strip the container prefix, mark /// transient for the duration so the dashboard can spinner, run the /// lifecycle op, clear transient, redirect on success or surface the /// error. `verb` only appears in the error message; `extra` runs on /// success after `clear_transient` for handlers that need follow-up /// (e.g. `kill` also unregisters the agent + fires `HelperEvent`). async fn lifecycle_action( state: &AppState, name: &str, kind: crate::coordinator::TransientKind, verb: &str, body: F, extra: impl FnOnce(&AppState, &str), ) -> Response where F: FnOnce(String) -> Fut, Fut: std::future::Future>, { let logical = strip_container_prefix(name); state.coord.set_transient(&logical, kind); let result = body(logical.clone()).await; state.coord.clear_transient(&logical); match result { Ok(()) => { extra(state, &logical); Redirect::to("/").into_response() } Err(e) => error_response(&format!("{verb} {logical} failed: {e:#}")), } } async fn post_kill(State(state): State, AxumPath(name): AxumPath) -> Response { let logical = strip_container_prefix(&name); if logical == lifecycle::MANAGER_NAME { return error_response("kill: refusing to stop the manager"); } lifecycle_action( &state, &name, crate::coordinator::TransientKind::Stopping, "kill", |n| async move { lifecycle::kill(&n).await }, |s, n| { s.coord.unregister_agent(n); s.coord.notify_manager(&hive_sh4re::HelperEvent::Killed { agent: n.to_owned(), }); }, ) .await } async fn post_restart(State(state): State, AxumPath(name): AxumPath) -> Response { lifecycle_action( &state, &name, crate::coordinator::TransientKind::Restarting, "restart", |n| async move { lifecycle::restart(&n).await }, |s, n| s.coord.kick_agent(n, "container restarted"), ) .await } async fn post_start(State(state): State, AxumPath(name): AxumPath) -> Response { lifecycle_action( &state, &name, crate::coordinator::TransientKind::Starting, 
"start", |n| async move { lifecycle::start(&n).await }, |s, n| s.coord.kick_agent(n, "container started"), ) .await } async fn post_update_all(State(state): State) -> Response { let Some(current_rev) = crate::auto_update::current_flake_rev(&state.coord.hyperhive_flake) else { return error_response("update-all: hyperhive_flake has no canonical path"); }; let containers = lifecycle::list().await.unwrap_or_default(); let mut errors = Vec::new(); for container in containers { let logical = if container == lifecycle::MANAGER_NAME { lifecycle::MANAGER_NAME.to_owned() } else if let Some(n) = container.strip_prefix(lifecycle::AGENT_PREFIX) { n.to_owned() } else { continue; }; if !crate::auto_update::agent_needs_update(&logical, ¤t_rev) { continue; } if let Err(e) = crate::auto_update::rebuild_agent(&state.coord, &logical, ¤t_rev).await { errors.push(format!("{logical}: {e:#}")); } } if errors.is_empty() { Redirect::to("/").into_response() } else { error_response(&format!( "update-all partial failure:\n{}", errors.join("\n") )) } } fn transient_label(k: crate::coordinator::TransientKind) -> &'static str { use crate::coordinator::TransientKind::{ Destroying, Rebuilding, Restarting, Spawning, Starting, Stopping, }; match k { Spawning => "spawning", Starting => "starting", Stopping => "stopping", Restarting => "restarting", Rebuilding => "rebuilding", Destroying => "destroying", } } /// Convert either a logical name or a container name back to the logical /// name. Sub-agents are `h-foo` → `foo`; manager stays `hm1nd`. fn strip_container_prefix(name: &str) -> String { name.strip_prefix(lifecycle::AGENT_PREFIX) .unwrap_or(name) .to_owned() } #[derive(Deserialize, Default)] struct DestroyForm { #[serde(default)] purge: Option, } async fn post_destroy( State(state): State, AxumPath(name): AxumPath, Form(form): Form, ) -> Response { // Checkbox semantics: any non-empty value (axum sends "on") = purge. 
let purge = form.purge.as_deref().is_some_and(|v| !v.is_empty()); match actions::destroy(&state.coord, &name, purge).await { Ok(()) => Redirect::to("/").into_response(), Err(e) => error_response(&format!("destroy {name} failed: {e:#}")), } } fn error_response(message: &str) -> Response { // Plain text — the JS app surfaces this in an alert(), so HTML // wrapping would just clutter the message. (StatusCode::INTERNAL_SERVER_ERROR, message.to_owned()).into_response() } /// Filter out approvals whose agent state dir was wiped out from under us /// (e.g. by a test script's cleanup). Marks them failed so they fall out of /// `pending` on next render. fn gc_orphans(coord: &Coordinator, approvals: Vec) -> Vec { approvals .into_iter() .filter(|a| { // Spawn approvals are for not-yet-existent agents; the proposed // dir is supposed to be missing. if matches!(a.kind, hive_sh4re::ApprovalKind::Spawn) { return true; } if Coordinator::agent_proposed_dir(&a.agent).exists() { true } else { let _ = coord.approvals.mark_failed(a.id, "agent state dir missing"); tracing::info!(id = a.id, agent = %a.agent, "auto-failed orphan approval"); false } }) .collect() } /// Render a unified diff with per-line CSS classes so the dashboard can /// colour adds / dels / hunk headers / context. Each line becomes a /// `` tagged by its leading character; the wrapping `
` keeps
/// whitespace intact.
///
/// Classification, first match wins: lines starting with `--- ` or `+++ `
/// are file headers (`diff-file`); then a leading `@` is `diff-hunk`, `+`
/// is `diff-add`, `-` is `diff-del`; everything else, including empty
/// lines, is `diff-ctx`. Line content goes through `html_escape` before it
/// is wrapped, and every emitted span is followed by a newline.
fn render_diff_lines(diff: &str) -> String {
    let mut out = String::new();
    for raw in diff.lines() {
        // File headers have to be recognised before the single-character
        // prefixes, otherwise `--- a/x` / `+++ b/x` would be styled as a
        // deletion / addition. Side effect of this ordering: a changed
        // line whose own content begins with `-- ` or `++ ` is also
        // styled as a file header.
        let cls = if raw.starts_with("--- ") || raw.starts_with("+++ ") {
            "diff-file"
        } else {
            match raw.as_bytes().first() {
                Some(b'@') => "diff-hunk",
                Some(b'+') => "diff-add",
                Some(b'-') => "diff-del",
                _ => "diff-ctx",
            }
        };
        // Writing into a `String` cannot fail, so the result is discarded.
        let _ = writeln!(
            out,
            "<span class=\"{cls}\">{}</span>",
            html_escape(raw),
        );
    }
    out
}

/// Reports whether the agent's bound `~/.claude/` directory holds a session.
///
/// Host-side counterpart of `hive_ag3nt::login::has_session`: the answer is
/// `true` as soon as one direct child of `dir` is a regular file. The scan
/// is not recursive, and entries whose type cannot be read are ignored. A
/// directory that cannot be listed (missing, unreadable) counts as "no
/// session". The dashboard calls this on every render so that logins
/// performed through the agent web UI (Phase 8 step 4) show up within one
/// auto-refresh cycle.
fn claude_has_session(dir: &Path) -> bool {
    match std::fs::read_dir(dir) {
        Err(_) => false,
        Ok(listing) => {
            for item in listing {
                // Skip entries that failed to enumerate.
                let Ok(item) = item else { continue };
                if matches!(item.file_type(), Ok(kind) if kind.is_file()) {
                    return true;
                }
            }
            false
        }
    }
}

/// Produces the multi-file unified diff from the currently-deployed tree
/// to the proposal behind `approval_id`.
///
/// The comparison runs inside the agent's applied repo, because that is
/// where the canonical proposal commit lives (manager-side amendments
/// don't move it); the proposal is addressed as the tag
/// `refs/tags/proposal/{approval_id}`.
///
/// Never fails: special cases come back as parenthesised placeholder text
/// instead — no `.git` in the applied dir, a git error, or an empty diff
/// (proposal == main, i.e. a no-op approval).
async fn approval_diff(agent: &str, approval_id: i64) -> String {
    let repo = Coordinator::agent_applied_dir(agent);
    if !repo.join(".git").exists() {
        return format!("(no applied git repo at {})", repo.display());
    }

    let tag = format!("refs/tags/proposal/{approval_id}");
    let diff = match git_diff_main_to(&repo, &tag).await {
        Ok(text) => text,
        Err(e) => return format!("(error: {e:#})"),
    };

    if diff.is_empty() {
        "(proposal matches currently-deployed tree)".to_owned()
    } else {
        diff
    }
}

async fn git_diff_main_to(applied_dir: &Path, target_ref: &str) -> Result {
    let out = lifecycle::git_command()
        .current_dir(applied_dir)
        .args(["diff", &format!("refs/heads/main..{target_ref}")])
        .output()
        .await
        .with_context(|| format!("spawn `git diff` in {}", applied_dir.display()))?;
    if !out.status.success() {
        anyhow::bail!(
            "git diff main..{target_ref} failed: {}",
            String::from_utf8_lossy(&out.stderr).trim()
        );
    }
    Ok(String::from_utf8_lossy(&out.stdout).into_owned())
}

/// Escapes `s` for inclusion in HTML element content.
///
/// Replaces `&` with `&amp;`, `<` with `&lt;` and `>` with `&gt;`; every
/// other character is copied through unchanged. Quotes are not escaped, so
/// the result is meant for text nodes, not attribute values.
fn html_escape(s: &str) -> String {
    // Single pass: each input character is inspected exactly once, so an
    // `&` produced by an earlier replacement can never be escaped twice.
    let mut out = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '&' => out.push_str("&amp;"),
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            other => out.push(other),
        }
    }
    out
}