//! Hyperhive dashboard. Lists managed containers (with deep-links to each //! container's web UI), pending approvals (with unified diff vs the applied //! repo, plus approve/deny buttons), and the manager. use std::convert::Infallible; use std::net::SocketAddr; use std::path::Path; use std::sync::Arc; use anyhow::{Context, Result}; use axum::extract::Form; use axum::{ Router, extract::{Path as AxumPath, State}, http::{HeaderMap, StatusCode}, response::{ Html, IntoResponse, Response, sse::{Event, KeepAlive, Sse}, }, routing::{get, post}, }; use hive_sh4re::Approval; use serde::{Deserialize, Serialize}; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::{Stream, StreamExt}; use crate::actions; use crate::container_view::{ContainerView, claude_has_session}; use crate::coordinator::Coordinator; use crate::lifecycle::{self, MANAGER_NAME}; const MANAGER_PORT: u16 = 8000; #[derive(Clone)] struct AppState { coord: Arc, } pub async fn serve(port: u16, coord: Arc) -> Result<()> { let app = Router::new() .route("/", get(serve_index)) .route("/static/dashboard.css", get(serve_css)) .route("/static/app.js", get(serve_app_js)) .route("/favicon.svg", get(serve_favicon)) .route("/api/state", get(api_state)) .route("/approve/{id}", post(post_approve)) .route("/deny/{id}", post(post_deny)) .route("/destroy/{name}", post(post_destroy)) .route("/kill/{name}", post(post_kill)) .route("/restart/{name}", post(post_restart)) .route("/start/{name}", post(post_start)) .route("/rebuild/{name}", post(post_rebuild)) .route("/update-all", post(post_update_all)) .route("/answer-question/{id}", post(post_answer_question)) .route("/cancel-question/{id}", post(post_cancel_question)) .route("/purge-tombstone/{name}", post(post_purge_tombstone)) .route("/api/journal/{name}", get(get_journal)) .route("/api/approval-diff/{id}", get(get_approval_diff)) .route("/api/state-file", get(get_state_file)) .route("/api/reminders", get(api_reminders)) .route("/api/agent/{name}/links", 
get(get_agent_links)) .route("/cancel-reminder/{id}", post(post_cancel_reminder)) .route("/retry-reminder/{id}", post(post_retry_reminder)) .route("/request-spawn", post(post_request_spawn)) .route("/op-send", post(post_op_send)) .route("/meta-update", post(post_meta_update)) .route("/dashboard/stream", get(dashboard_stream)) .route("/dashboard/history", get(dashboard_history)) .route("/static/hive-fr0nt.js", get(serve_shared_js)) .route("/static/marked.js", get(serve_marked_js)) .with_state(AppState { coord }); let addr = SocketAddr::from(([0, 0, 0, 0], port)); let listener = bind_with_retry(addr).await?; tracing::info!(%port, "dashboard listening"); axum::serve(listener, app).await?; Ok(()) } // --------------------------------------------------------------------------- // Static asset handlers: the dashboard is an SPA. `GET /` returns the // (static) shell; `GET /static/*` serves the CSS + JS app; `GET /api/state` // returns the current snapshot as JSON. The JS app fetches state on load, // re-fetches after every async-form submit, and listens on // `/dashboard/stream` for the unified live event channel. // --------------------------------------------------------------------------- /// `SO_REUSEADDR` bind with retry. Mirrors the per-agent variant in /// `hive-ag3nt::web_ui::bind_with_retry`: hive-c0re restarts also /// race the previous process's socket release, and the retry has no /// attempt cap — capping was the proximate cause of issue #324 /// (silent give-up on a long stale socket). Genuine port collisions /// don't reach this layer (dashboard is bound to a fixed configured /// port, no per-agent hashing), so any persistent `AddrInUse` always /// reflects a recoverable stale socket. WARN for the first dozen /// attempts; INFO after that to avoid spamming the journal during a /// long hold; INFO on eventual success when we did have to retry. 
async fn bind_with_retry(addr: SocketAddr) -> Result { let mut delay_ms = 250u64; let mut attempts = 0u32; loop { match try_bind(addr) { Ok(l) => { if attempts > 0 { tracing::info!( %addr, attempts, "dashboard: bind succeeded after retry" ); } return Ok(l); } Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => { let attempt = attempts + 1; if attempt <= 12 { tracing::warn!( %addr, attempt, "dashboard: AddrInUse, retrying in {delay_ms}ms" ); } else { tracing::info!( %addr, attempt, "dashboard: AddrInUse still holding, retrying in {delay_ms}ms" ); } tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; attempts += 1; delay_ms = (delay_ms * 2).min(2000); } Err(e) => { return Err(e).with_context(|| format!("bind dashboard on {addr}")); } } } } fn try_bind(addr: SocketAddr) -> std::io::Result { let sock = match addr { SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?, SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?, }; sock.set_reuseaddr(true)?; sock.bind(addr)?; sock.listen(1024) } async fn serve_index() -> impl IntoResponse { Html(include_str!("../assets/index.html")) } async fn serve_css() -> impl IntoResponse { // Prepend the shared palette/typography so per-page styles only need // to declare what's actually page-specific. One HTTP request, no // per-asset cache to invalidate. let body = format!( "{}\n{}\n{}", hive_fr0nt::BASE_CSS, hive_fr0nt::TERMINAL_CSS, include_str!("../assets/dashboard.css"), ); ([("content-type", "text/css")], body) } async fn serve_app_js() -> impl IntoResponse { ( [("content-type", "application/javascript")], include_str!("../assets/app.js"), ) } /// Dashboard favicon — the hyperhive mark. Static: the dashboard /// represents the whole hive, so it always uses the project logo /// (per-agent pages serve their own configurable `/icon` instead). 
async fn serve_favicon() -> impl IntoResponse { ( [("content-type", "image/svg+xml")], include_str!("../../branding/hyperhive.svg"), ) } async fn serve_shared_js() -> impl IntoResponse { ( [("content-type", "application/javascript")], hive_fr0nt::TERMINAL_JS, ) } /// Vendored `marked` bundle — the side panel renders markdown file /// previews with it. async fn serve_marked_js() -> impl IntoResponse { ( [("content-type", "application/javascript")], hive_fr0nt::MARKED_JS, ) } #[derive(Serialize)] struct StateSnapshot { /// Broker seq at the moment this snapshot was assembled. Clients /// dedupe their buffered SSE traffic against this value: any /// `MessageEvent` with `seq <= snapshot.seq` is already reflected in /// the snapshot (or pre-dates it); anything with `seq > snapshot.seq` /// is post-snapshot and should be applied. Set to 0 in the /// pre-emit case (no events ever fired) — clients treat that as /// "apply everything you've buffered". seq: u64, hostname: String, manager_port: u16, any_stale: bool, containers: Vec, transients: Vec, approvals: Vec, /// Last 30 resolved approvals (approved / denied / failed), newest- /// first. Drives the "history" tab on the approvals section. approval_history: Vec, /// Pending operator-targeted questions (`target IS NULL`). Any /// agent can `ask` the operator and `ask` returns immediately with /// the id; on `/answer-question` we mark the row answered and /// fire `HelperEvent::QuestionAnswered` back into the asker's /// inbox. Peer-to-peer questions live in the same table but never /// surface here (see `OperatorQuestions::pending`). questions: Vec, /// Last 20 answered questions, newest-first. question_history: Vec, /// State dirs (config history + claude creds + /state/ notes) that /// survive after a destroy-without-purge. The operator can re-spawn /// with the same name to resume, or PURG3 to wipe them. tombstones: Vec, /// Sub-agents whose FNV-1a hashed web UI port collides with at /// least one other agent. 
Operator resolves by renaming. The /// dashboard renders a banner at the top listing each cluster. port_conflicts: Vec, /// Inputs in `meta/flake.lock` the operator can selectively /// `nix flake update`. Hyperhive first, then `agent-` rows. meta_inputs: Vec, /// True while a dashboard-triggered `meta-update` (flake lock bump + /// agent rebuild ripple) is running in the background. Lets a /// client that cold-loads mid-update render the META INPUTS panel's /// disabled "updating…" state; live transitions arrive via the /// `MetaUpdateRunning` event (issue #259). meta_update_running: bool, /// Current state of the global rebuild queue — pending + running /// long-lived ops (rebuild / meta-update / spawn) plus the most /// recent few terminal entries the queue retains for history. /// Live transitions arrive via the `RebuildQueueChanged` event. /// See `rebuild_queue.rs`. rebuild_queue: Vec, /// Whether the hive-forge container is up. When true the dashboard /// links each container's config + each approval's commit into the /// forge's `agent-configs` repos. forge_present: bool, } /// `OpQuestion` + computed `question_refs` / `answer_refs`. Built /// from the snapshot read; the live channel attaches the same /// fields directly on `QuestionAdded` / `QuestionResolved`. #[derive(Serialize)] struct QuestionView { #[serde(flatten)] inner: crate::operator_questions::OpQuestion, #[serde(skip_serializing_if = "Vec::is_empty")] question_refs: Vec, #[serde(skip_serializing_if = "Vec::is_empty")] answer_refs: Vec, } impl QuestionView { fn from_question(q: crate::operator_questions::OpQuestion) -> Self { let question_refs = scan_validated_paths(&q.question); let answer_refs = q .answer .as_deref() .map(scan_validated_paths) .unwrap_or_default(); Self { inner: q, question_refs, answer_refs, } } } #[derive(Serialize)] struct PortConflict { port: u16, /// All agent names sharing this port (sorted, ≥2 entries). 
agents: Vec, } #[derive(Serialize, Clone, Debug)] pub(crate) struct TombstoneView { pub name: String, /// Bytes used by the state dir tree. Cheap-ish to compute; let the /// operator know how much they're holding onto. pub state_bytes: u64, /// Mtime (unix seconds) of the state dir; rough "last seen". pub last_seen: i64, pub has_creds: bool, } #[derive(Serialize)] struct TransientView { name: String, kind: &'static str, secs: u64, } #[derive(Serialize)] struct ApprovalHistoryView { id: i64, agent: String, kind: &'static str, /// First 12 chars of the canonical sha (preferred) or /// manager-supplied ref. None for resolved spawn approvals. sha_short: Option, /// `approved` / `denied` / `failed`. status: &'static str, /// Unix seconds. Renders as a relative time on the dashboard. resolved_at: i64, /// Operator-supplied deny reason (for `denied`) or build error /// (for `failed`). None on `approved`. #[serde(skip_serializing_if = "Option::is_none")] note: Option, } #[derive(Serialize)] struct ApprovalView { id: i64, agent: String, kind: &'static str, /// First 12 chars of the `commit_ref`, for `ApplyCommit` only. sha_short: Option, /// Raw unified diff text, for `ApplyCommit` only. The client splits /// on `\n` and per-line classifies (`+` / `-` / `@@` / `--- ` / `+++ ` /// → diff-add / diff-del / diff-hunk / diff-file). Shipping raw /// instead of pre-rendered HTML saves bytes on the wire (no /// per-line `` markup) and removes the only HTML-escape /// surface from the snapshot. diff: Option, /// Manager-supplied description shown on the approval card. #[serde(skip_serializing_if = "Option::is_none")] description: Option, /// Unix seconds the approval was queued. Rendered as a relative /// time on the card so the operator can spot a stale request. 
(#272) requested_at: i64, } /// Replace silent `.unwrap_or_default()` on the data sources behind /// `/api/state` so that whichever query degrades surfaces in journald /// instead of leaving the operator staring at an empty list. The /// dashboard still degrades to a sensible default value; the warn /// is just the diagnostic breadcrumb the old code swallowed. fn log_default(what: &str, result: std::result::Result) -> T where T: Default, E: std::fmt::Debug, { match result { Ok(v) => v, Err(e) => { tracing::warn!(target: "api_state", source = %what, error = ?e, "snapshot source failed; using default"); T::default() } } } async fn api_state(headers: HeaderMap, State(state): State) -> axum::Json { let host = headers .get("host") .and_then(|h| h.to_str().ok()) .unwrap_or("localhost"); let hostname = host.split(':').next().unwrap_or(host).to_owned(); // Capture the unified dashboard-channel seq *before* any read so the // dedupe contract is "events with seq > snapshot.seq are // post-snapshot, never missed." An event landing during snapshot // construction may be doubly applied (snapshot caught the write + // client also applies the SSE frame) — that's a renderer's problem // to make idempotent, not ours to avoid here. let seq = state.coord.current_seq(); // Refresh the coordinator's cached container snapshot before // reading. Cold-load clients then see whatever the latest rescan // produced; live clients converge via the matching // `ContainerStateChanged` / `ContainerRemoved` events the rescan // emits. 
state.coord.rescan_containers_and_emit().await; let containers = state.coord.containers_snapshot().await; let any_stale = containers.iter().any(|c| c.needs_update); let transient_snapshot = state.coord.transient_snapshot(); let pending_approvals = gc_orphans( &state.coord, log_default("approvals.pending", state.coord.approvals.pending()), ); let transients = build_transient_views(&containers, &transient_snapshot); let approvals = build_approval_views(pending_approvals).await; let approval_history = log_default( "approvals.recent_resolved", state.coord.approvals.recent_resolved(30), ) .into_iter() .map(history_view) .collect(); let tombstones = build_tombstone_views(&state.coord, &containers, &transient_snapshot); let port_conflicts = build_port_conflicts(&containers); // operator_inbox used to be served here as a 50-row array; the // dashboard now derives it client-side from the message stream // (terminal backfill + live SSE), so the snapshot stops shipping it. // Both operator-targeted and peer threads now surface on the // dashboard. Client filters by target client-side. Each row is // wrapped in QuestionView so the snapshot carries the same // file_refs the live event variants attach. 
let questions: Vec = log_default("questions.pending_all", state.coord.questions.pending_all()) .into_iter() .map(QuestionView::from_question) .collect(); let question_history: Vec = log_default( "questions.recent_answered_all", state.coord.questions.recent_answered_all(20), ) .into_iter() .map(QuestionView::from_question) .collect(); axum::Json(StateSnapshot { seq, hostname, manager_port: MANAGER_PORT, any_stale, containers, transients, approvals, approval_history, meta_inputs: read_meta_inputs(), meta_update_running: state.coord.meta_update_in_progress(), questions, question_history, tombstones, port_conflicts, rebuild_queue: state.coord.rebuild_queue.snapshot(), forge_present: crate::forge::is_present().await, }) } /// Group live containers by their assigned web UI port; clusters with /// more than one member are port-hash collisions the operator needs /// to resolve by renaming. Manager (fixed at 8000) and sub-agents /// (8100..8999) can't collide with each other — collisions are /// strictly between sub-agents. fn build_port_conflicts(containers: &[ContainerView]) -> Vec { let mut by_port: std::collections::BTreeMap> = std::collections::BTreeMap::new(); for c in containers { by_port.entry(c.port).or_default().push(c.name.clone()); } by_port .into_iter() .filter(|(_, agents)| agents.len() > 1) .map(|(port, mut agents)| { agents.sort(); PortConflict { port, agents } }) .collect() } #[derive(Serialize, Clone, Debug)] pub(crate) struct MetaInputView { /// Input key in meta's `flake.nix` — `hyperhive`, `agent-`, etc. pub name: String, /// Full locked sha. Not displayed verbatim; the dashboard /// truncates to the first 12 chars for the chip. pub rev: String, /// Unix seconds — `locked.lastModified`. Drives the relative /// "2h ago" timestamp on each input row. pub last_modified: i64, /// `original.url` if available, for the tooltip / row meta text. 
#[serde(skip_serializing_if = "Option::is_none")] pub url: Option, } /// Walk `flake.lock`'s `nodes` graph from `root` and emit one /// `MetaInputView` per fetched input, at **every** depth. That /// surfaces the direct meta inputs (`hyperhive`, `agent-`), the /// agent flakes' own inputs (`agent-dmatrix/mcp-matrix`, /// `hyperhive/nixpkgs`), and any deeper transitive inputs — so the /// operator can bump any of them individually. Names are /// slash-separated paths from root, the syntax `nix flake update` /// accepts for transitive inputs. /// /// Filtering (see issue #275): /// - Inputs that resolve via a `follows` chain (lock value is an /// array) are skipped — they alias another node, not their own /// fetched derivation, so updating them does nothing. /// - A node is emitted only when it carries a `locked.rev`. /// - Each fetched node is walked exactly once (a `visited` set): /// the lock graph shares nodes (many flakes reference one /// nixpkgs), so without this a shared subtree re-walks per parent /// and a cycle would recurse forever. The result is a spanning /// tree — every input shown once, at its shallowest path. fn read_meta_inputs() -> Vec { let mut out = Vec::new(); let Ok(raw) = std::fs::read_to_string("/var/lib/hyperhive/meta/flake.lock") else { return out; }; let Ok(json) = serde_json::from_str::(&raw) else { return out; }; let Some(nodes) = json.get("nodes").and_then(|v| v.as_object()) else { return out; }; let Some(root_name) = json.get("root").and_then(|v| v.as_str()) else { return out; }; let mut visited = std::collections::HashSet::new(); visited.insert(root_name.to_owned()); walk_meta_inputs(nodes, root_name, "", &mut visited, &mut out); // hyperhive first, then alphabetical. String-sorting the // slash-paths puts every node directly above its own children // (`agent-foo`, `agent-foo/bar`, `agent-foo/bar/baz`), so the // result is a pre-order traversal the tree renderer can consume. 
out.sort_by(|a, b| match (a.name.as_str(), b.name.as_str()) { ("hyperhive", _) => std::cmp::Ordering::Less, (_, "hyperhive") => std::cmp::Ordering::Greater, _ => a.name.cmp(&b.name), }); out } fn walk_meta_inputs( nodes: &serde_json::Map, node_name: &str, prefix: &str, visited: &mut std::collections::HashSet, out: &mut Vec, ) { let Some(node) = nodes.get(node_name) else { return; }; let Some(inputs_map) = node.get("inputs").and_then(|v| v.as_object()) else { return; }; // Two passes: claim (and emit) every direct input of this node // before descending into any of them. A shallow input that a // deeper flake also references then keeps its shallow path // rather than being captured first by the deep walk. let mut to_recurse: Vec<(String, String)> = Vec::new(); for (alias, target) in inputs_map { // Inputs map value is either a string (node name) or an // array (a `follows` chain). The latter just aliases another // node — we can't `nix flake update` it directly, so skip. let serde_json::Value::String(target_name) = target else { continue; }; // Walk each fetched node once — guards shared subtrees and // cycles, and keeps the panel free of duplicate rows. 
if !visited.insert(target_name.clone()) { continue; } let Some(target_node) = nodes.get(target_name) else { continue; }; let path = if prefix.is_empty() { alias.clone() } else { format!("{prefix}/{alias}") }; if let Some(rev) = target_node .get("locked") .and_then(|v| v.get("rev")) .and_then(|v| v.as_str()) { let last_modified = target_node .get("locked") .and_then(|v| v.get("lastModified")) .and_then(serde_json::Value::as_i64) .unwrap_or(0); let url = target_node .get("original") .and_then(|v| v.get("url")) .and_then(|v| v.as_str()) .map(str::to_owned); out.push(MetaInputView { name: path.clone(), rev: rev.to_owned(), last_modified, url, }); } to_recurse.push((target_name.clone(), path)); } for (target_name, path) in to_recurse { walk_meta_inputs(nodes, &target_name, &path, visited, out); } } /// Transient state for agents whose container does NOT yet exist /// (`Spawning`). Lifecycle ops on existing containers surface as /// `ContainerView.pending` inline; this list only catches pre-creation. fn build_transient_views( containers: &[ContainerView], transient_snapshot: &std::collections::HashMap, ) -> Vec { transient_snapshot .iter() .filter(|(name, _)| !containers.iter().any(|c| &c.name == *name)) .map(|(name, st)| TransientView { name: name.clone(), kind: transient_label(st.kind), secs: st.since.elapsed().as_secs(), }) .collect() } /// Render each pending approval into its dashboard view (short sha + /// unified diff for `ApplyCommit`, just the name for `Spawn`). /// Project a resolved sqlite row into the lean shape the dashboard /// history tab consumes — no `diff_html` (rendering 30 of them /// per /api/state poll would mean 30 git diffs per refresh). 
fn history_view(a: Approval) -> ApprovalHistoryView { let displayed = a.fetched_sha.as_deref().unwrap_or(&a.commit_ref); let sha_short = if displayed.is_empty() { None } else { Some(displayed[..displayed.len().min(12)].to_owned()) }; let status = match a.status { hive_sh4re::ApprovalStatus::Approved => "approved", hive_sh4re::ApprovalStatus::Denied => "denied", hive_sh4re::ApprovalStatus::Failed => "failed", // Pending shouldn't appear in recent_resolved, but be defensive. hive_sh4re::ApprovalStatus::Pending => "pending", }; let kind = match a.kind { hive_sh4re::ApprovalKind::ApplyCommit => "apply_commit", hive_sh4re::ApprovalKind::Spawn => "spawn", hive_sh4re::ApprovalKind::InitConfig => "init_config", hive_sh4re::ApprovalKind::UpdateMetaInputs => "update_meta_inputs", }; ApprovalHistoryView { id: a.id, agent: a.agent, kind, sha_short, status, resolved_at: a.resolved_at.unwrap_or(0), note: a.note, } } async fn build_approval_views(approvals: Vec) -> Vec { let mut out = Vec::with_capacity(approvals.len()); for a in approvals { out.push(match a.kind { hive_sh4re::ApprovalKind::ApplyCommit => { // Prefer the canonical fetched sha from applied; // commit_ref is only the manager's claim and may be // amended out from under us. 
let displayed = a.fetched_sha.as_deref().unwrap_or(&a.commit_ref); let sha = displayed[..displayed.len().min(12)].to_owned(); let diff = approval_diff(&a.agent, a.id).await; ApprovalView { id: a.id, agent: a.agent.clone(), kind: "apply_commit", sha_short: Some(sha), diff: Some(diff), description: a.description, requested_at: a.requested_at, } } hive_sh4re::ApprovalKind::Spawn => ApprovalView { id: a.id, agent: a.agent, kind: "spawn", sha_short: None, diff: None, description: a.description, requested_at: a.requested_at, }, hive_sh4re::ApprovalKind::InitConfig => ApprovalView { id: a.id, agent: a.agent, kind: "init_config", sha_short: None, diff: None, description: a.description, requested_at: a.requested_at, }, hive_sh4re::ApprovalKind::UpdateMetaInputs => ApprovalView { id: a.id, agent: a.agent, kind: "update_meta_inputs", sha_short: None, diff: None, description: a.description, requested_at: a.requested_at, }, }); } out } /// State-dir names that don't appear in the live container list (and /// aren't the manager). Each one surfaces in the dashboard as a row /// with R3V1V3 + PURG3 actions. 
fn build_tombstone_views( coord: &Coordinator, containers: &[ContainerView], transient_snapshot: &std::collections::HashMap, ) -> Vec { let _ = coord; // kept_state_names is a free fn but takes &self by future plan let live: std::collections::HashSet<&str> = containers .iter() .map(|c| c.name.as_str()) .chain(transient_snapshot.keys().map(String::as_str)) .collect(); Coordinator::kept_state_names() .into_iter() .filter(|name| name != MANAGER_NAME && !live.contains(name.as_str())) .map(|name| { let root = Coordinator::agent_state_root(&name); let state_bytes = dir_size_bytes(&root); let last_seen = std::fs::metadata(&root) .and_then(|m| m.modified()) .ok() .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) .and_then(|d| i64::try_from(d.as_secs()).ok()) .unwrap_or(0); let has_creds = claude_has_session(&Coordinator::agent_claude_dir(&name)); TombstoneView { name, state_bytes, last_seen, has_creds, } }) .collect() } /// Sum the byte size of every regular file under `root`. Cheap to compute /// for typical agent state (config repo + claude creds + notes file — /// usually a few MB); fine to do inline on each /api/state. Returns 0 on /// any error. fn dir_size_bytes(root: &Path) -> u64 { fn walk(p: &Path, acc: &mut u64) { let Ok(rd) = std::fs::read_dir(p) else { return }; for entry in rd.flatten() { let Ok(ft) = entry.file_type() else { continue }; if ft.is_dir() { walk(&entry.path(), acc); } else if ft.is_file() && let Ok(meta) = entry.metadata() { *acc += meta.len(); } } } let mut total = 0u64; walk(root, &mut total); total } async fn dashboard_history(State(state): State) -> Response { // Backfill source for the dashboard terminal. Returns up to ~200 // historical broker messages (no other event kinds are persisted) // converted to `DashboardEvent::Sent` JSON so the client can replay // through the same dispatch path as live frames. Wrapped in // `{ seq, events }`: the seq is the dashboard channel's high-water // mark at fetch time. 
Clients use it to dedupe their buffered live // SSE traffic (drop anything with `seq <= history_seq`) so a frame // that lands between SSE-subscribe and history-fetch isn't shown // twice and isn't lost. Historical rows carry `seq = 0`; the // boundary seq is what closes the dedupe window. const HISTORY_LIMIT: u64 = 200; let seq = state.coord.current_seq(); match state.coord.broker.recent_all(HISTORY_LIMIT) { Ok(mut messages) => { messages.reverse(); let events: Vec = messages .into_iter() .map(|m| match m { crate::broker::MessageEvent::Sent { id, from, to, body, at, in_reply_to } => { let file_refs = scan_validated_paths(&body); crate::dashboard_events::DashboardEvent::Sent { seq: 0, id, from, to, body, at, in_reply_to, file_refs, } } crate::broker::MessageEvent::Delivered { id, from, to, body, at, in_reply_to } => { let file_refs = scan_validated_paths(&body); crate::dashboard_events::DashboardEvent::Delivered { seq: 0, id, from, to, body, at, in_reply_to, file_refs, } } }) .collect(); axum::Json(serde_json::json!({ "seq": seq, "events": events })).into_response() } Err(e) => error_response(&format!("dashboard/history failed: {e:#}")), } } async fn dashboard_stream( State(state): State, ) -> Sse>> { let rx = state.coord.dashboard_subscribe(); let stream = BroadcastStream::new(rx).filter_map(|res| { // Drop lagged frames. Browsers reconnect; the seq dedupe on // reconnect skips any frame already reflected in the snapshot. let event = res.ok()?; let json = serde_json::to_string(&event).ok()?; Some(Ok(Event::default().data(json))) }); Sse::new(stream).keep_alive(KeepAlive::default()) } async fn post_approve(State(state): State, AxumPath(id): AxumPath) -> Response { match actions::approve(state.coord.clone(), id).await { // 200 instead of 303 — `actions::approve` fires // `ApprovalResolved` (success path) or the eventual failure // event, both of which the dashboard's derived store applies // live. The matching form carries `data-no-refresh`. 
Ok(()) => (StatusCode::OK, "ok").into_response(), Err(e) => error_response(&format!("approve {id} failed: {e:#}")), } } #[derive(Deserialize, Default)] struct DenyForm { #[serde(default)] note: Option, } async fn post_deny( State(state): State, AxumPath(id): AxumPath, Form(form): Form, ) -> Response { let note = form .note .as_deref() .map(str::trim) .filter(|s| !s.is_empty()); match actions::deny(&state.coord, id, note).await { Ok(()) => (StatusCode::OK, "ok").into_response(), Err(e) => error_response(&format!("deny {id} failed: {e:#}")), } } #[derive(Deserialize)] struct RequestSpawnForm { name: String, } #[derive(Deserialize)] struct AnswerForm { answer: String, } /// Attach a permissive CORS header so the per-agent web UI — served on /// a different port — can POST an operator answer here and read the /// result. The dashboard has no auth, so `*` exposes nothing a plain /// cross-origin form-POST couldn't already reach. This shim disappears /// once the unifying gateway makes the agent page same-origin; see /// `docs/boundary.md`. 
fn with_cors(mut resp: Response) -> Response {
    let headers = resp.headers_mut();
    headers.insert(
        axum::http::header::ACCESS_CONTROL_ALLOW_ORIGIN,
        axum::http::HeaderValue::from_static("*"),
    );
    resp
}

/// Record the operator's answer to question `id`. On success, the asker
/// gets a `QuestionAnswered` helper event and the dashboard channel gets
/// the matching resolution. Every response, including validation errors,
/// carries the CORS header (see `with_cors`).
async fn post_answer_question(
    State(state): State<AppState>,
    AxumPath(id): AxumPath<i64>,
    Form(form): Form<AnswerForm>,
) -> Response {
    let answer = form.answer.trim();
    if answer.is_empty() {
        return with_cors(error_response("answer: required"));
    }
    let operator = hive_sh4re::OPERATOR_RECIPIENT;
    let resp = match state.coord.questions.answer(id, answer, operator) {
        Err(e) => error_response(&format!("answer {id} failed: {e:#}")),
        Ok((question, asker, target)) => {
            tracing::info!(%id, %asker, "operator answered question");
            let answered = hive_sh4re::HelperEvent::QuestionAnswered {
                id,
                question,
                answer: answer.to_owned(),
                answerer: operator.to_owned(),
            };
            state.coord.notify_agent(&asker, &answered);
            // `false`: a real answer, not a cancellation.
            state
                .coord
                .emit_question_resolved(id, answer, operator, false, target.as_deref());
            (StatusCode::OK, "ok").into_response()
        }
    };
    with_cors(resp)
}

/// Resolve a pending operator question with a sentinel answer when
/// the operator decides not to / can't answer. The asker harness
/// receives a `QuestionAnswered` event with `answer = "[cancelled]"`
/// so it can fall back on whatever default it had. Same code path as
/// a real answer — just lets the operator close the loop instead of
/// letting the question dangle forever.
async fn post_cancel_question( State(state): State, AxumPath(id): AxumPath, ) -> Response { const SENTINEL: &str = "[cancelled]"; match state .coord .questions .answer(id, SENTINEL, hive_sh4re::OPERATOR_RECIPIENT) { Ok((question, asker, target)) => { tracing::info!(%id, %asker, "operator cancelled question"); state.coord.emit_question_resolved( id, SENTINEL, hive_sh4re::OPERATOR_RECIPIENT, true, target.as_deref(), ); state.coord.notify_agent_from( hive_sh4re::OPERATOR_RECIPIENT, &asker, &hive_sh4re::HelperEvent::QuestionAnswered { id, question, answer: SENTINEL.to_owned(), answerer: hive_sh4re::OPERATOR_RECIPIENT.to_owned(), }, ); (StatusCode::OK, "ok").into_response() } Err(e) => error_response(&format!("cancel-question {id} failed: {e:#}")), } } #[derive(Deserialize)] struct JournalQuery { /// Optional systemd unit filter — e.g. `hive-ag3nt.service`. When /// omitted, returns the full machine journal. #[serde(default)] unit: Option, /// Number of trailing lines to return. Capped at 5000. #[serde(default)] lines: Option, } /// Shell out to `journalctl -M -b` and return its text /// output. Operator-only by virtue of the dashboard being host-bound; /// hive-c0re already runs as root in its systemd unit so journalctl /// has the access it needs. async fn get_journal( AxumPath(name): AxumPath, axum::extract::Query(q): axum::extract::Query, ) -> Response { // Validate the container name against the list of managed // containers so we don't shell out with arbitrary input. 
let container = strip_container_prefix(&name); let prefixed = if container == lifecycle::MANAGER_NAME { container.clone() } else { format!("{}{container}", lifecycle::AGENT_PREFIX) }; let live = lifecycle::list().await.unwrap_or_default(); if !live.iter().any(|c| c == &prefixed) { return error_response(&format!("journal: no managed container {prefixed:?}")); } let lines = q.lines.unwrap_or(500).min(5000); let mut cmd = tokio::process::Command::new("journalctl"); cmd.args([ "-M", &prefixed, "-b", "--no-pager", "--output=short-iso", "--lines", ]) .arg(lines.to_string()); if let Some(u) = q.unit.as_deref().filter(|s| !s.is_empty()) { // accept hive-ag3nt[.service] / hive-m1nd[.service] — anything // else we refuse, again to keep the shell-out tight. let allowed = ["hive-ag3nt.service", "hive-m1nd.service"]; let unit = if u.ends_with(".service") { u.to_owned() } else { format!("{u}.service") }; if !allowed.contains(&unit.as_str()) { return error_response(&format!("journal: unknown unit {unit:?}")); } cmd.args(["-u", &unit]); } match cmd.output().await { Ok(out) => { // Combine stdout + stderr — journalctl emits to both on errors. let mut body = String::from_utf8_lossy(&out.stdout).into_owned(); if !out.status.success() { body.push_str("\n--- stderr ---\n"); body.push_str(&String::from_utf8_lossy(&out.stderr)); } ([("content-type", "text/plain; charset=utf-8")], body).into_response() } Err(e) => error_response(&format!("journalctl spawn: {e}")), } } #[derive(Deserialize)] struct StateFileQuery { path: String, } /// Bounded-size read of a file under one of two allow-listed /// roots: `/var/lib/hyperhive/agents//state/` (per-agent durable /// notes — the only writable path agents have outside their /// container) and `/var/lib/hyperhive/shared/` (shared docs). 
Both /// path forms are accepted: /// - canonical host: `/var/lib/hyperhive/agents/alice/state/foo.md` /// - container view: `/agents/alice/state/foo.md` /// - shared: `/shared/foo.md` /// /// `/state/...` on its own is *not* accepted — the in-container /// mount is ambiguous from the host's perspective (we don't know /// which agent's `/state` it refers to) and using it would silently /// resolve to the wrong file. /// /// Path is canonicalised before the allow-list check so `..` /// traversal and symlink games can't escape the roots. Files larger /// than `MAX_BYTES` are truncated with a banner so a runaway log /// can't OOM the browser. /// Resolve a caller-supplied path string to a canonical host path /// that has been verified against the allow-list. Returns `Err` /// with a human-readable reason for every failure mode (path /// outside roots, canonicalize failure, escape via symlink, /// per-agent subdir not `state`, symlink anywhere below the root, /// file not world-readable). Shared by `get_state_file` (read) and /// `scan_validated_paths` (linkify candidates in message bodies) /// so both apply identical security rules and the linkifier /// doesn't render a path the reader will refuse to serve. /// /// Defense-in-depth layers (in order): /// 1. Caller-supplied prefix has to match the allow-list (agents/ /// or shared/), else reject without touching the fs. /// 2. No symlinks below the matched root. Walked pre-canonicalize /// via `symlink_metadata` on each component so a sub-agent that /// plants `ln -s /var/lib/hyperhive/agents/other/state/secret /// /agents/me/state/peek` can't proxy a different agent's file /// through this endpoint (canonicalize would happily resolve /// the symlink to a path inside the allow-list). /// 3. Canonicalize is run anyway as a belt-and-braces check — /// resolves `..`/`.` traversal and rejects if the result /// escapes the roots. /// 4. 
Under `AGENTS_ROOT`, the second path component must be /// `state/` — agents' applied/proposed git repos and config dirs /// are off-limits. /// 5. The target's metadata is fetched once and returned to the /// caller so they don't restat. If the target is a regular /// file it must be world-readable (mode & 0o004); a 0600 file /// inside `state/` could leak through this endpoint to anyone /// holding the dashboard URL otherwise. fn resolve_state_path( raw: &str, ) -> std::result::Result<(std::path::PathBuf, std::fs::Metadata), String> { use std::os::unix::fs::PermissionsExt as _; const AGENTS_ROOT: &str = "/var/lib/hyperhive/agents"; const SHARED_ROOT: &str = "/var/lib/hyperhive/shared"; let raw = raw.trim(); let (mapped, root): (std::path::PathBuf, &str) = if let Some(rest) = raw.strip_prefix("/agents/") { ( std::path::PathBuf::from(format!("{AGENTS_ROOT}/{rest}")), AGENTS_ROOT, ) } else if let Some(rest) = raw.strip_prefix("/shared/") { ( std::path::PathBuf::from(format!("{SHARED_ROOT}/{rest}")), SHARED_ROOT, ) } else if let Some(rest) = raw.strip_prefix(&format!("{AGENTS_ROOT}/")) { ( std::path::PathBuf::from(format!("{AGENTS_ROOT}/{rest}")), AGENTS_ROOT, ) } else if let Some(rest) = raw.strip_prefix(&format!("{SHARED_ROOT}/")) { ( std::path::PathBuf::from(format!("{SHARED_ROOT}/{rest}")), SHARED_ROOT, ) } else { return Err(format!("path not in allow-list: {raw}")); }; reject_symlinks_below(std::path::Path::new(root), &mapped)?; let canonical = std::fs::canonicalize(&mapped) .map_err(|e| format!("{}: {e}", mapped.display()))?; if !(canonical.starts_with(AGENTS_ROOT) || canonical.starts_with(SHARED_ROOT)) { return Err(format!( "resolved path escapes allow-list: {}", canonical.display() )); } if let Ok(rel) = canonical.strip_prefix(AGENTS_ROOT) { let mut components = rel.components(); let _agent = components.next(); let dir = components.next().and_then(|c| c.as_os_str().to_str()); if dir != Some("state") { return Err(format!( "only per-agent state/ is readable 
here ({} dir not allowed)", dir.unwrap_or("(root)") )); } } let meta = std::fs::metadata(&canonical) .map_err(|e| format!("stat {}: {e}", canonical.display()))?; if meta.is_file() { let mode = meta.permissions().mode(); if mode & 0o004 == 0 { return Err(format!( "{} not world-readable (mode 0{:o}); refusing to proxy non-public file", canonical.display(), mode & 0o777, )); } } Ok((canonical, meta)) } /// Walk every path component under `root` and refuse if any of /// them is a symlink. The roots themselves (`AGENTS_ROOT`, /// `SHARED_ROOT`) are hive-c0re-owned and assumed trusted; only /// the parts the agent / operator can plant matter. Components /// that don't exist yet are skipped — `canonicalize` reports /// non-existence separately, and missing-component checks would /// just race the filesystem. fn reject_symlinks_below( root: &std::path::Path, mapped: &std::path::Path, ) -> std::result::Result<(), String> { let Ok(rel) = mapped.strip_prefix(root) else { return Ok(()); }; let mut cumulative = root.to_path_buf(); for component in rel.components() { match component { std::path::Component::Normal(name) => { cumulative.push(name); match std::fs::symlink_metadata(&cumulative) { Ok(m) if m.file_type().is_symlink() => { return Err(format!( "symlink at {} not allowed (canonicalize would resolve it past the \ allow-list check; refuse outright)", cumulative.display() )); } Ok(_) | Err(_) => {} } } std::path::Component::ParentDir => { return Err(format!( "path contains `..` traversal below {}; refuse outright", root.display() )); } _ => {} } } Ok(()) } #[cfg(test)] mod tests { use super::*; use std::os::unix::fs::symlink; /// Make a unique tmp subdir for the calling test. Caller is responsible /// for cleanup (we leak on panic, fine for ephemeral CI runs). 
fn tmproot(tag: &str) -> std::path::PathBuf { let ts = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map(|d| d.as_nanos()) .unwrap_or(0); let p = std::env::temp_dir().join(format!("hyperhive-test-{tag}-{ts}")); std::fs::create_dir_all(&p).unwrap(); p } #[test] fn reject_symlinks_below_accepts_plain_dirs_and_files() { let root = tmproot("symlink-ok"); std::fs::create_dir_all(root.join("alice/state")).unwrap(); std::fs::write(root.join("alice/state/notes.md"), b"hi").unwrap(); assert!(reject_symlinks_below(&root, &root.join("alice/state/notes.md")).is_ok()); } #[test] fn reject_symlinks_below_rejects_leaf_symlink() { let root = tmproot("symlink-leaf"); std::fs::create_dir_all(root.join("alice/state")).unwrap(); // Plant a symlink that points anywhere; resolve_state_path's // canonicalize would happily resolve it past the allow-list // check, so we have to refuse at the un-canonical layer. symlink("/etc/shadow", root.join("alice/state/peek")).unwrap(); let err = reject_symlinks_below(&root, &root.join("alice/state/peek")).unwrap_err(); assert!(err.contains("symlink at"), "msg = {err}"); assert!(err.contains("peek"), "msg = {err}"); } #[test] fn reject_symlinks_below_rejects_directory_symlink_in_middle() { let root = tmproot("symlink-mid"); std::fs::create_dir_all(root.join("real/state")).unwrap(); std::fs::write(root.join("real/state/secret.md"), b"hi").unwrap(); // alice's "state" dir is actually a symlink to real/state — a // sub-agent shouldn't be able to plant this and proxy real's // private files via the dashboard. 
std::fs::create_dir_all(root.join("alice")).unwrap(); symlink(root.join("real/state"), root.join("alice/state")).unwrap(); let err = reject_symlinks_below(&root, &root.join("alice/state/secret.md")).unwrap_err(); assert!(err.contains("symlink at"), "msg = {err}"); } #[test] fn reject_symlinks_below_rejects_parent_dir_traversal() { let root = tmproot("symlink-dotdot"); // `..` doesn't survive canonicalize anyway, but we want a // friendlier error than "path escapes allow-list" — refusing // upfront also avoids walking ancestors with `symlink_metadata`. let p = root.join("alice/state/../escape"); let err = reject_symlinks_below(&root, &p).unwrap_err(); assert!(err.contains("`..`"), "msg = {err}"); } #[test] fn reject_symlinks_below_passes_through_when_path_not_under_root() { // resolve_state_path's earlier allow-list check would reject // this; reject_symlinks_below stays a no-op so the caller // surfaces the better-fit error. let root = std::path::Path::new("/var/lib/hyperhive/agents"); assert!(reject_symlinks_below(root, std::path::Path::new("/etc/shadow")).is_ok()); } } /// Snapshot the current tombstone list and emit a /// `TombstonesChanged` event. Call after any mutation that could /// add or remove a tombstone (`actions::destroy`, /// `post_purge_tombstone`, spawn finalisation). Cheap — the list /// is tiny. pub(crate) async fn emit_tombstones_snapshot(coord: &Arc) { let containers = coord.containers_snapshot().await; let transient_snapshot = coord.transient_snapshot(); let tombstones = build_tombstone_views(coord, &containers, &transient_snapshot); coord.emit_dashboard_event( crate::dashboard_events::DashboardEvent::TombstonesChanged { seq: coord.next_seq(), tombstones, }, ); } /// Snapshot meta/flake.lock's root inputs + emit /// `MetaInputsChanged`. Call after any mutation that bumps a lock /// (`run_meta_update`, `auto_update::rebuild_agent`). 
pub(crate) fn emit_meta_inputs_snapshot(coord: &Coordinator) { let inputs = read_meta_inputs(); coord.emit_dashboard_event( crate::dashboard_events::DashboardEvent::MetaInputsChanged { seq: coord.next_seq(), inputs, }, ); } /// Scan `body` for path-shaped tokens, validate each against the /// allow-list, return the unique set of tokens that resolve to a /// regular file. Called at broker-message ingest time so the /// dashboard event already carries the verified set — no client- /// side probe endpoint required, and historical messages get the /// same treatment on `/dashboard/history` backfill. /// /// Tokenisation: split on whitespace + a handful of trailing /// punctuation chars (`,;:)]}`) that commonly follow paths in /// natural-language text but aren't part of the path itself. Any /// token starting with `/agents/`, `/shared/`, or /// `/var/lib/hyperhive/{agents,shared}/` is a candidate. The /// allow-list + `is_file` check happens via the same /// `resolve_state_path` helper the read endpoint uses, so the /// security rules can't drift. pub(crate) fn scan_validated_paths(body: &str) -> Vec { const PREFIXES: [&str; 4] = [ "/agents/", "/shared/", "/var/lib/hyperhive/agents/", "/var/lib/hyperhive/shared/", ]; let mut out = Vec::::new(); for raw in body.split(|c: char| c.is_whitespace()) { // Trim trailing natural-language punctuation that wouldn't // be part of any real path. Inline rather than via a regex // dep — the set is small and the call is hot. let token = raw.trim_end_matches([',', ';', ':', ')', ']', '}', '.', '\'', '"']); if token.is_empty() { continue; } if !PREFIXES.iter().any(|p| token.starts_with(p)) { continue; } // Cheap dedupe — typical message has 0-3 refs. 
if out.iter().any(|s| s == token) { continue; } if let Ok((_canonical, meta)) = resolve_state_path(token) && meta.is_file() { out.push(token.to_owned()); } } out } async fn get_state_file( axum::extract::Query(q): axum::extract::Query, ) -> Response { const MAX_BYTES: usize = 1 << 20; // 1 MiB let (canonical, meta) = match resolve_state_path(&q.path) { Ok(pair) => pair, Err(e) => return error_response(&format!("state-file: {e}")), }; if !meta.is_file() { return error_response(&format!( "state-file: {} is not a regular file", canonical.display() )); } let size = meta.len(); let bytes = match std::fs::read(&canonical) { Ok(b) => b, Err(e) => return error_response(&format!("state-file: read {}: {e}", canonical.display())), }; // Raster images: serve the raw bytes with their real content-type // so the dashboard can render them in an . Not truncated — // a clipped binary is corrupt, so over-cap images are rejected // instead. (SVG stays on the text path: it's text, and the client // renders it via a data: URI.) if let Some(ct) = image_content_type(&canonical) { if bytes.len() > MAX_BYTES { return error_response(&format!( "state-file: image {} is {size} bytes, over the {MAX_BYTES}-byte preview cap", canonical.display() )); } return ([("content-type", ct)], bytes).into_response(); } let truncated = bytes.len() > MAX_BYTES; let body_bytes = if truncated { &bytes[..MAX_BYTES] } else { &bytes[..] }; let mut body = String::from_utf8_lossy(body_bytes).into_owned(); if truncated { use std::fmt::Write as _; let _ = write!(body, "\n\n--- truncated at {MAX_BYTES} of {size} bytes ---\n"); } ([("content-type", "text/plain; charset=utf-8")], body).into_response() } /// Content-type for a raster image the dashboard can preview in an /// ``, keyed off the file extension. `None` for non-image, SVG, /// and text files (SVG is served on the text path and rendered /// client-side via a `data:` URI). 
fn image_content_type(path: &Path) -> Option<&'static str> {
    // Extension → MIME type for the raster formats browsers render in
    // an `<img>`. Extension matching is ASCII case-insensitive.
    const RASTER_TYPES: [(&str, &str); 8] = [
        ("png", "image/png"),
        ("jpg", "image/jpeg"),
        ("jpeg", "image/jpeg"),
        ("gif", "image/gif"),
        ("webp", "image/webp"),
        ("bmp", "image/bmp"),
        ("ico", "image/x-icon"),
        ("avif", "image/avif"),
    ];
    let ext = path.extension()?.to_str()?;
    RASTER_TYPES
        .iter()
        .find(|(known, _)| ext.eq_ignore_ascii_case(known))
        .map(|&(_, mime)| mime)
}

/// `GET /api/reminders`: JSON list of pending reminders from the broker,
/// or a plain-text error.
async fn api_reminders(State(state): State<AppState>) -> Response {
    let pending = state.coord.broker.list_pending_reminders();
    pending.map_or_else(
        |e| error_response(&format!("reminders: {e:#}")),
        |rows| axum::Json(rows).into_response(),
    )
}

/// Same-origin proxy that fetches the named agent's
/// `GET /api/state` and forwards only the `links` field to the
/// dashboard JS (issue #262). Lets the agent backend stay the
/// single source of truth for its own nav links: the dashboard
/// card's icon-only strip and the per-agent page's labelled row
/// render the same list, no shared Rust wire type required, no
/// CORS surface on the agent side.
///
/// Failure modes (agent down, slow response, malformed JSON) all
/// degrade to an empty list so the dashboard still renders.
async fn get_agent_links(AxumPath(name): AxumPath) -> Response { let port = lifecycle::agent_web_port(&name); let url = format!("http://127.0.0.1:{port}/api/state"); let client = match reqwest::Client::builder() .timeout(std::time::Duration::from_secs(2)) .build() { Ok(c) => c, Err(e) => { tracing::warn!(%name, error = %e, "agent-links proxy: client build failed"); return axum::Json(serde_json::json!([])).into_response(); } }; match client.get(&url).send().await { Ok(resp) if resp.status().is_success() => match resp.json::().await { Ok(body) => { let links = body.get("links").cloned().unwrap_or_else(|| serde_json::json!([])); axum::Json(links).into_response() } Err(e) => { tracing::debug!(%name, error = %e, "agent-links proxy: response parse failed"); axum::Json(serde_json::json!([])).into_response() } }, Ok(resp) => { tracing::debug!(%name, status = %resp.status(), "agent-links proxy: non-2xx"); axum::Json(serde_json::json!([])).into_response() } Err(e) => { tracing::debug!(%name, error = %e, "agent-links proxy: fetch failed"); axum::Json(serde_json::json!([])).into_response() } } } async fn post_cancel_reminder( State(state): State, AxumPath(id): AxumPath, ) -> Response { match state.coord.broker.cancel_reminder(id) { Ok(0) => error_response(&format!("reminder {id} not pending (already delivered?)")), Ok(_) => { tracing::info!(%id, "operator cancelled reminder"); (StatusCode::OK, "ok").into_response() } Err(e) => error_response(&format!("cancel reminder {id} failed: {e:#}")), } } /// Reset a pending reminder's failure state so the scheduler /// retries it on the next tick. Useful when the failure was /// transient (sqlite lock contention, disk full → freed up) and /// the operator wants delivery to resume immediately instead of /// the row sitting in attempt-count-capped purgatory. 
async fn post_retry_reminder( State(state): State, AxumPath(id): AxumPath, ) -> Response { match state.coord.broker.reset_reminder_failure(id) { Ok(0) => error_response(&format!("reminder {id} not pending (already delivered?)")), Ok(_) => { tracing::info!(%id, "operator reset reminder failure for retry"); (StatusCode::OK, "ok").into_response() } Err(e) => error_response(&format!("retry reminder {id} failed: {e:#}")), } } async fn post_purge_tombstone( State(state): State, AxumPath(name): AxumPath, ) -> Response { if name == lifecycle::MANAGER_NAME { return error_response("refusing to purge the manager's state"); } // Sanity: refuse to purge if a live container still exists with this // name. The dashboard already filters tombstones to non-live names, // but the operator could send a stale POST. let live = lifecycle::list().await.unwrap_or_default(); if live .iter() .any(|c| c == &format!("{}{name}", lifecycle::AGENT_PREFIX) || c == &name) { return error_response(&format!( "refusing to purge {name}: container still exists — use DESTR0Y first" )); } let mut errors = Vec::new(); for dir in [ Coordinator::agent_state_root(&name), Coordinator::agent_applied_dir(&name), ] { if dir.exists() && let Err(e) = std::fs::remove_dir_all(&dir) { errors.push(format!("{}: {e}", dir.display())); } } let _ = state .coord .approvals .fail_pending_for_agent(&name, "agent state purged"); if errors.is_empty() { tracing::info!(%name, "tombstone purged"); // Fire the post-purge tombstones snapshot so dashboards // drop the row live; matching form carries // `data-no-refresh`. emit_tombstones_snapshot(&state.coord).await; (StatusCode::OK, "ok").into_response() } else { error_response(&format!("purge {name} partial: {}", errors.join(", "))) } } /// Operator-side compose form on the dashboard terminal. Drops a /// message into the broker as `{from: "operator", to, body}`. 
/// Same shape that per-agent web UIs use via `OperatorMsg`, but here the
/// operator picks the recipient explicitly with `@name`. No
/// validation that `to` resolves to a known agent — broker accepts
/// arbitrary recipients (and the agent's inbox grows whether or not
/// they exist, which is fine for spawn-then-greet flows). `post_op_send`
/// treats `to = "*"` as a broadcast.
#[derive(Deserialize)]
struct OpSendForm {
    /// Recipient name, or `*` for a broadcast. Trimmed by the handler.
    to: String,
    /// Message text. Trimmed by the handler and must be non-empty.
    body: String,
}

/// Form for `POST /meta-update`. Inputs ride in as a comma-separated
/// list under the `inputs` field — the JS submitter joins the
/// checked boxes since axum's `Form` extractor doesn't natively
/// decode repeated keys without a helper.
#[derive(Deserialize)]
struct MetaUpdateForm {
    /// Comma-separated flake input names; blanks are ignored.
    inputs: String,
}

/// `POST /meta-update`: queue a bulk update of the selected meta flake
/// inputs as a single `MetaUpdate` rebuild-queue entry and return
/// immediately. A rebuild-queue snapshot is emitted so dashboards see
/// the queued work without polling. An empty selection is an error.
/// NOTE(review): the queue worker is expected to treat an input that is
/// already at the latest sha as a no-op (no commit, no rebuild ripple)
/// — not visible from this handler; confirm.
async fn post_meta_update(
    State(state): State,
    Form(form): Form,
) -> Response {
    // The submitter joins the checked boxes with commas; trim each entry
    // and drop empties so stray separators are tolerated.
    let inputs: Vec = form
        .inputs
        .split(',')
        .map(|s| s.trim().to_owned())
        .filter(|s| !s.is_empty())
        .collect();
    if inputs.is_empty() {
        return error_response("meta-update: no inputs selected");
    }
    // One queue entry carries the whole selection. NOTE(review):
    // "hyperhive" appears to name the meta flake as the queue target —
    // confirm against the rebuild queue.
    state.coord.rebuild_queue.enqueue_with_inputs(
        crate::rebuild_queue::QueueKind::MetaUpdate,
        "hyperhive".to_owned(),
        crate::rebuild_queue::QueueSource::Manual,
        format!("meta-update via dashboard ({})", inputs.join(", ")),
        None,
        inputs,
    );
    state.coord.emit_rebuild_queue_snapshot();
    (StatusCode::OK, "ok").into_response()
}

/// `POST /op-send`: send a message from the operator. `to` and `body`
/// are trimmed and both required; `to = "*"` goes through
/// `broadcast_send` (any per-recipient errors are joined into one
/// error response), anything else is a single broker `send`.
async fn post_op_send(State(state): State, Form(form): Form) -> Response {
    let to = form.to.trim().to_owned();
    let body = form.body.trim().to_owned();
    if to.is_empty() {
        return error_response("op-send: `to` required");
    }
    if body.is_empty() {
        return error_response("op-send: `body` required");
    }
    if to == "*" {
        let errors = state
            .coord
            .broadcast_send(hive_sh4re::OPERATOR_RECIPIENT, &body);
        if !errors.is_empty() {
            return error_response(&format!("op-send broadcast partial fail: {}", errors.join("; ")));
        }
    } else if let Err(e) = state.coord.broker.send(&hive_sh4re::Message {
        from: hive_sh4re::OPERATOR_RECIPIENT.to_owned(),
        to: to.clone(),
        body,
        in_reply_to: None,
    }) {
        return error_response(&format!("op-send to {to} failed: {e:#}"));
    }
    // 200 instead of 303 → the client doesn't refetch /api/state. The
    // broker `send` already emitted a `MessageEvent` which the
    // dashboard channel forwarder mirrors as `DashboardEvent::Sent`,
    // and the page's terminal + inbox derive from that stream — so the
    // operator's send shows up the same way an agent's send does, with
    // no full-state refresh in between.
    (axum::http::StatusCode::OK, "ok").into_response()
}

/// `POST /request-spawn`: queue a `Spawn` approval for the trimmed,
/// non-empty `name` and announce it on the dashboard event channel.
async fn post_request_spawn(
    State(state): State,
    Form(form): Form,
) -> Response {
    let name = form.name.trim().to_owned();
    if name.is_empty() {
        return error_response("spawn: `name` required");
    }
    match state
        .coord
        .approvals
        .submit_kind(&name, hive_sh4re::ApprovalKind::Spawn, "", None)
    {
        Ok(id) => {
            tracing::info!(%id, %name, "operator: spawn approval queued via dashboard");
            // Notify the dashboard event channel so live subscribers
            // can append the row without a snapshot refetch. Spawn
            // approvals carry no diff/sha, hence the `None`s.
            state
                .coord
                .emit_approval_added(id, &name, "spawn", None, None, None);
            (StatusCode::OK, "ok").into_response()
        }
        Err(e) => error_response(&format!("request-spawn {name} failed: {e:#}")),
    }
}

/// `POST /rebuild/{name}`: enqueue a manual rebuild for the agent
/// (container prefix stripped) and emit a rebuild-queue snapshot.
/// Returns as soon as the work is queued.
async fn post_rebuild(State(state): State, AxumPath(name): AxumPath) -> Response {
    let logical = strip_container_prefix(&name);
    state.coord.rebuild_queue.enqueue(
        crate::rebuild_queue::QueueKind::Rebuild,
        logical,
        crate::rebuild_queue::QueueSource::Manual,
        "manual via dashboard ↻ R3BU1LD button".to_owned(),
        None,
    );
    state.coord.emit_rebuild_queue_snapshot();
    (StatusCode::OK, "ok").into_response()
}

/// Common shape for the simple lifecycle action handlers (kill /
/// restart / start): strip the container prefix, hold a transient
/// guard for the duration so the dashboard can spinner, run the
/// lifecycle op, then drop the guard. On success it runs `extra`,
/// rescans containers (pushing the change over SSE) and returns
/// `200 ok`; on failure it surfaces the error. `verb` only appears in
/// the error message; `extra` is for handlers that need follow-up
/// (e.g. `kill` also unregisters the agent + fires `HelperEvent`).
async fn lifecycle_action( state: &AppState, name: &str, kind: crate::coordinator::TransientKind, verb: &str, body: F, extra: impl FnOnce(&AppState, &str), ) -> Response where F: FnOnce(String) -> Fut, Fut: std::future::Future>, { let logical = strip_container_prefix(name); let guard = state.coord.transient_guard(&logical, kind); let result = body(logical.clone()).await; drop(guard); match result { Ok(()) => { extra(state, &logical); // Rescan so the running/needs_login/needs_update flip on // the affected row lands on every dashboard's SSE channel // without waiting for a snapshot poll. 200 + matching // `data-no-refresh` on the form skip the post-submit // /api/state refetch. state.coord.rescan_containers_and_emit().await; (StatusCode::OK, "ok").into_response() } Err(e) => error_response(&format!("{verb} {logical} failed: {e:#}")), } } async fn post_kill(State(state): State, AxumPath(name): AxumPath) -> Response { let logical = strip_container_prefix(&name); if logical == lifecycle::MANAGER_NAME { return error_response("kill: refusing to stop the manager"); } lifecycle_action( &state, &name, crate::coordinator::TransientKind::Stopping, "kill", |n| async move { lifecycle::kill(&n).await }, |s, n| { s.coord.unregister_agent(n); s.coord.notify_manager(&hive_sh4re::HelperEvent::Killed { agent: n.to_owned(), }); }, ) .await } async fn post_restart(State(state): State, AxumPath(name): AxumPath) -> Response { lifecycle_action( &state, &name, crate::coordinator::TransientKind::Restarting, "restart", |n| async move { lifecycle::restart(&n).await }, |s, n| s.coord.kick_agent(n, "container restarted"), ) .await } async fn post_start(State(state): State, AxumPath(name): AxumPath) -> Response { lifecycle_action( &state, &name, crate::coordinator::TransientKind::Starting, "start", |n| async move { lifecycle::start(&n).await }, |s, n| s.coord.kick_agent(n, "container started"), ) .await } async fn post_update_all(State(state): State) -> Response { let containers = 
lifecycle::list().await.unwrap_or_default(); for container in containers { let logical = if container == lifecycle::MANAGER_NAME { lifecycle::MANAGER_NAME.to_owned() } else if let Some(n) = container.strip_prefix(lifecycle::AGENT_PREFIX) { n.to_owned() } else { continue; }; state.coord.rebuild_queue.enqueue( crate::rebuild_queue::QueueKind::Rebuild, logical, crate::rebuild_queue::QueueSource::Manual, "manual via dashboard 🌀 UPDATE ALL".to_owned(), None, ); } state.coord.emit_rebuild_queue_snapshot(); (StatusCode::OK, "ok").into_response() } fn transient_label(k: crate::coordinator::TransientKind) -> &'static str { use crate::coordinator::TransientKind::{ Destroying, Rebuilding, Restarting, Spawning, Starting, Stopping, }; match k { Spawning => "spawning", Starting => "starting", Stopping => "stopping", Restarting => "restarting", Rebuilding => "rebuilding", Destroying => "destroying", } } /// Convert either a logical name or a container name back to the logical /// name. Sub-agents are `h-foo` → `foo`; manager stays `hm1nd`. fn strip_container_prefix(name: &str) -> String { name.strip_prefix(lifecycle::AGENT_PREFIX) .unwrap_or(name) .to_owned() } #[derive(Deserialize, Default)] struct DestroyForm { #[serde(default)] purge: Option, } async fn post_destroy( State(state): State, AxumPath(name): AxumPath, Form(form): Form, ) -> Response { // Checkbox semantics: any non-empty value (axum sends "on") = purge. let purge = form.purge.as_deref().is_some_and(|v| !v.is_empty()); // `actions::destroy` rescans the container list on success, so the // `ContainerRemoved` event lands before we return 200. The matching // form carries `data-no-refresh`. 
match actions::destroy(&state.coord, &name, purge).await { Ok(()) => (StatusCode::OK, "ok").into_response(), Err(e) => error_response(&format!("destroy {name} failed: {e:#}")), } } fn error_response(message: &str) -> Response { // Plain text — the JS app surfaces this in an alert(), so HTML // wrapping would just clutter the message. (StatusCode::INTERNAL_SERVER_ERROR, message.to_owned()).into_response() } /// Filter out approvals whose agent state dir was wiped out from under us /// (e.g. by a test script's cleanup). Marks them failed so they fall out of /// `pending` on next render. fn gc_orphans(coord: &Coordinator, approvals: Vec) -> Vec { approvals .into_iter() .filter(|a| { // Spawn and InitConfig approvals are for not-yet-existent agents; // the proposed dir is supposed to be missing. if matches!( a.kind, hive_sh4re::ApprovalKind::Spawn | hive_sh4re::ApprovalKind::InitConfig ) { return true; } if Coordinator::agent_proposed_dir(&a.agent).exists() { true } else { let note = "agent state dir missing"; let _ = coord.approvals.mark_failed(a.id, note); tracing::info!(id = a.id, agent = %a.agent, "auto-failed orphan approval"); let sha_short = a .fetched_sha .as_deref() .map(|s| s[..s.len().min(12)].to_owned()); coord.emit_approval_resolved( a.id, &a.agent, "apply_commit", sha_short, "failed", Some(note.to_owned()), a.description.clone(), ); false } }) .collect() } /// Multi-file unified diff between the currently-deployed tree and /// the proposal for this approval. Runs against the applied repo /// since the canonical proposal commit lives there (manager-side /// amendments don't move it). Empty output means proposal == main — /// a no-op approval. /// /// `pub(crate)` so the manager-socket handler can pre-compute the /// diff once at submission time and embed it in the `ApprovalAdded` /// dashboard event (instead of forcing the dashboard to wait a /// `/api/state` cycle to see the diff for newly-queued approvals). 
pub(crate) async fn approval_diff(agent: &str, approval_id: i64) -> String {
    let applied = Coordinator::agent_applied_dir(agent);
    if !applied.join(".git").exists() {
        return format!("(no applied git repo at {})", applied.display());
    }
    let proposal_ref = format!("refs/tags/proposal/{approval_id}");
    match git_diff_refs(&applied, "refs/heads/main", &proposal_ref).await {
        Ok(s) if s.is_empty() => "(proposal matches currently-deployed tree)".to_owned(),
        Ok(s) => s,
        Err(e) => format!("(error: {e:#})"),
    }
}

/// Run `git diff <base_ref>..<target_ref>` in `applied_dir` and return
/// its stdout (lossy UTF-8). A spawn failure or non-zero exit becomes
/// an error carrying git's trimmed stderr.
async fn git_diff_refs(applied_dir: &Path, base_ref: &str, target_ref: &str) -> Result<String> {
    let range = format!("{base_ref}..{target_ref}");
    let out = lifecycle::git_command()
        .current_dir(applied_dir)
        // `--no-color`: the dashboard classifies diff lines client-side,
        // so a `color.diff=always` git config must not inject ANSI
        // escapes. Trailing `--`: the range is always a revision, never
        // a pathspec, which keeps "unknown ref" errors unambiguous.
        .args(["diff", "--no-color", &range, "--"])
        .output()
        .await
        .with_context(|| format!("spawn `git diff` in {}", applied_dir.display()))?;
    if !out.status.success() {
        anyhow::bail!(
            "git diff {range} failed: {}",
            String::from_utf8_lossy(&out.stderr).trim()
        );
    }
    Ok(String::from_utf8_lossy(&out.stdout).into_owned())
}

/// Numeric ids of `<prefix>/<n>` tags in the applied repo (e.g.
/// `proposal/3` → `3`). Unparseable suffixes are skipped, and any git
/// failure yields an empty list. Used to resolve the `approved` /
/// `previous` diff bases for an approval.
async fn tag_ids(applied_dir: &Path, prefix: &str) -> Vec<i64> {
    let Ok(out) = lifecycle::git_command()
        .current_dir(applied_dir)
        .args(["tag", "-l", &format!("{prefix}/*")])
        .output()
        .await
    else {
        return Vec::new();
    };
    if !out.status.success() {
        return Vec::new();
    }
    let strip = format!("{prefix}/");
    String::from_utf8_lossy(&out.stdout)
        .lines()
        .filter_map(|l| l.trim().strip_prefix(&strip))
        .filter_map(|s| s.parse::<i64>().ok())
        .collect()
}

/// Query string for `GET /api/approval-diff/{id}`.
#[derive(Deserialize)]
struct DiffBaseQuery {
    /// `applied` (running tree — default), `approved` (the
    /// highest-numbered approved proposal other than this one), or
    /// `previous` (the prior queued proposal for this agent).
    base: Option<String>,
}

/// On-demand unified diff for one `ApplyCommit` approval against a
/// chosen base.
`applied` = `applied/main` (what's running); /// `approved` = the most recent earlier `approved/` tag (the last /// proposal the operator OK'd, even if its build then failed); /// `previous` = the prior queued `proposal/` (the incremental /// delta when the manager chains proposals). Returns the raw diff /// text — the dashboard classifies lines client-side. async fn get_approval_diff( State(state): State, AxumPath(id): AxumPath, axum::extract::Query(q): axum::extract::Query, ) -> Response { let base = q.base.as_deref().unwrap_or("applied"); let approval = match state.coord.approvals.get(id) { Ok(Some(a)) => a, Ok(None) => return error_response(&format!("approval {id} not found")), Err(e) => return error_response(&format!("approval {id}: {e:#}")), }; if !matches!(approval.kind, hive_sh4re::ApprovalKind::ApplyCommit) { return error_response("spawn approvals carry no commit to diff"); } let applied = Coordinator::agent_applied_dir(&approval.agent); if !applied.join(".git").exists() { return plain_text(format!("(no applied git repo at {})", applied.display())); } let target = format!("refs/tags/proposal/{id}"); let base_ref = match base { "applied" => Some("refs/heads/main".to_owned()), "approved" => { let ids = tag_ids(&applied, "approved").await; ids.into_iter() .filter(|&n| n != id) .max() .map(|n| format!("refs/tags/approved/{n}")) } "previous" => { let ids = tag_ids(&applied, "proposal").await; ids.into_iter() .filter(|&n| n < id) .max() .map(|n| format!("refs/tags/proposal/{n}")) } other => return error_response(&format!("unknown diff base {other:?}")), }; let Some(base_ref) = base_ref else { return plain_text(match base { "approved" => "(no earlier approved proposal to diff against)".to_owned(), _ => "(no previous proposal to diff against)".to_owned(), }); }; match git_diff_refs(&applied, &base_ref, &target).await { Ok(s) if s.is_empty() => plain_text("(identical — no changes vs this base)".to_owned()), Ok(s) => plain_text(s), Err(e) => 
error_response(&format!("git diff: {e:#}")), } } fn plain_text(body: String) -> Response { (StatusCode::OK, body).into_response() }