hyperhive/hive-c0re/src/dashboard.rs
müde d06b598c56 kick_agent on every rebuild + apply path
agents weren't being woken with the 'you were rebuilt — check
/state/ for notes, --continue intact' system message after
several recent rebuild surfaces:

- auto_update::rebuild_agent — used by the dashboard rebuild
  button, admin-CLI rebuild via lifecycle_action, the startup
  rev-scan, AND the new meta-input update batch loop. kick
  moves *into* rebuild_agent's success arm so all four
  paths benefit. (the dashboard's lifecycle_action extra
  closure was already firing kick — now it's a no-op for the
  rebuild path since rebuild_agent does it.)
- actions::run_apply_commit — apply-commit approve flow built
  + tagged deployed/<id> but never kicked. add kick on
  success with the more specific 'config update applied' hint.
- server.rs::HostRequest::Rebuild — the admin-CLI direct path
  calls lifecycle::rebuild bypassing rebuild_agent. add kick
  on success.

dashboard's restart / start lifecycle_action extras still
kick via their own closures since they don't route through
rebuild_agent. stop / kill / destroy intentionally don't
kick — there's nothing to wake.
2026-05-16 04:20:01 +02:00

1358 lines
48 KiB
Rust

//! Hyperhive dashboard. Lists managed containers (with deep-links to each
//! container's web UI), pending approvals (with unified diff vs the applied
//! repo, plus approve/deny buttons), and the manager.
use std::convert::Infallible;
use std::fmt::Write as _;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use anyhow::{Context, Result};
use axum::extract::Form;
use axum::{
Router,
extract::{Path as AxumPath, State},
http::{HeaderMap, StatusCode},
response::{
Html, IntoResponse, Redirect, Response,
sse::{Event, KeepAlive, Sse},
},
routing::{get, post},
};
use hive_sh4re::Approval;
use serde::{Deserialize, Serialize};
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::{Stream, StreamExt};
use crate::actions;
use crate::coordinator::Coordinator;
use crate::lifecycle::{self, AGENT_PREFIX, MANAGER_NAME};
const MANAGER_PORT: u16 = 8000;
#[derive(Clone)]
struct AppState {
coord: Arc<Coordinator>,
}
pub async fn serve(port: u16, coord: Arc<Coordinator>) -> Result<()> {
let app = Router::new()
.route("/", get(serve_index))
.route("/static/dashboard.css", get(serve_css))
.route("/static/app.js", get(serve_app_js))
.route("/api/state", get(api_state))
.route("/approve/{id}", post(post_approve))
.route("/deny/{id}", post(post_deny))
.route("/destroy/{name}", post(post_destroy))
.route("/kill/{name}", post(post_kill))
.route("/restart/{name}", post(post_restart))
.route("/start/{name}", post(post_start))
.route("/rebuild/{name}", post(post_rebuild))
.route("/update-all", post(post_update_all))
.route("/answer-question/{id}", post(post_answer_question))
.route("/cancel-question/{id}", post(post_cancel_question))
.route("/purge-tombstone/{name}", post(post_purge_tombstone))
.route("/api/journal/{name}", get(get_journal))
.route("/api/agent-config/{name}", get(get_agent_config))
.route("/request-spawn", post(post_request_spawn))
.route("/op-send", post(post_op_send))
.route("/meta-update", post(post_meta_update))
.route("/messages/stream", get(messages_stream))
.with_state(AppState { coord });
let addr = SocketAddr::from(([0, 0, 0, 0], port));
let listener = bind_with_retry(addr).await?;
tracing::info!(%port, "dashboard listening");
axum::serve(listener, app).await?;
Ok(())
}
// ---------------------------------------------------------------------------
// Static asset handlers: the dashboard is an SPA. `GET /` returns the
// (static) shell; `GET /static/*` serves the CSS + JS app; `GET /api/state`
// returns the current snapshot as JSON. The JS app fetches state on load,
// re-fetches after every async-form submit, and listens on
// `/messages/stream` for broker traffic.
// ---------------------------------------------------------------------------
/// `SO_REUSEADDR` bind with retry. Mirrors the per-agent variant —
/// hive-c0re restarts also race the previous process's socket release.
async fn bind_with_retry(addr: SocketAddr) -> Result<tokio::net::TcpListener> {
let mut delay_ms = 250u64;
let mut attempts = 0u32;
loop {
match try_bind(addr) {
Ok(l) => return Ok(l),
Err(e) if e.kind() == std::io::ErrorKind::AddrInUse && attempts < 12 => {
tracing::warn!(
%addr, attempt = attempts + 1,
"dashboard: AddrInUse, retrying in {delay_ms}ms"
);
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
attempts += 1;
delay_ms = (delay_ms * 2).min(2000);
}
Err(e) => {
return Err(e).with_context(|| format!("bind dashboard on {addr}"));
}
}
}
}
fn try_bind(addr: SocketAddr) -> std::io::Result<tokio::net::TcpListener> {
let sock = match addr {
SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
};
sock.set_reuseaddr(true)?;
sock.bind(addr)?;
sock.listen(1024)
}
async fn serve_index() -> impl IntoResponse {
Html(include_str!("../assets/index.html"))
}
async fn serve_css() -> impl IntoResponse {
(
[("content-type", "text/css")],
include_str!("../assets/dashboard.css"),
)
}
async fn serve_app_js() -> impl IntoResponse {
(
[("content-type", "application/javascript")],
include_str!("../assets/app.js"),
)
}
#[derive(Serialize)]
struct StateSnapshot {
hostname: String,
manager_port: u16,
any_stale: bool,
containers: Vec<ContainerView>,
transients: Vec<TransientView>,
approvals: Vec<ApprovalView>,
/// Last 30 resolved approvals (approved / denied / failed), newest-
/// first. Drives the "history" tab on the approvals section.
approval_history: Vec<ApprovalHistoryView>,
/// Latest messages addressed to `operator` — surfaces agent replies
/// asynchronously so the operator can see them without watching the
/// live panel during a turn.
operator_inbox: Vec<hive_sh4re::InboxRow>,
/// Pending operator questions (currently only from the manager).
/// `ask_operator` returns immediately with the id; on `/answer-question`
/// we mark the row answered and fire `HelperEvent::OperatorAnswered`
/// into the manager's inbox.
questions: Vec<crate::operator_questions::OpQuestion>,
/// State dirs (config history + claude creds + /state/ notes) that
/// survive after a destroy-without-purge. The operator can re-spawn
/// with the same name to resume, or PURG3 to wipe them.
tombstones: Vec<TombstoneView>,
/// Sub-agents whose FNV-1a hashed web UI port collides with at
/// least one other agent. Operator resolves by renaming. The
/// dashboard renders a banner at the top listing each cluster.
port_conflicts: Vec<PortConflict>,
/// Inputs in `meta/flake.lock` the operator can selectively
/// `nix flake update`. Hyperhive first, then `agent-<n>` rows.
meta_inputs: Vec<MetaInputView>,
}
#[derive(Serialize)]
struct PortConflict {
port: u16,
/// All agent names sharing this port (sorted, ≥2 entries).
agents: Vec<String>,
}
#[derive(Serialize)]
struct TombstoneView {
name: String,
/// Bytes used by the state dir tree. Cheap-ish to compute; let the
/// operator know how much they're holding onto.
state_bytes: u64,
/// Mtime (unix seconds) of the state dir; rough "last seen".
last_seen: i64,
has_creds: bool,
}
#[derive(Serialize)]
#[allow(clippy::struct_excessive_bools)]
struct ContainerView {
/// Logical agent name (no `h-` prefix). Used in action URLs.
name: String,
/// Container name as nixos-container sees it (`h-foo`, `hm1nd`).
container: String,
is_manager: bool,
port: u16,
running: bool,
needs_update: bool,
needs_login: bool,
/// When a lifecycle action is in flight on this container, the kind
/// (`starting`, `stopping`, etc.) so the JS can render a spinner +
/// disable other buttons.
#[serde(skip_serializing_if = "Option::is_none")]
pending: Option<&'static str>,
/// First 12 chars of the sha the meta flake currently has locked
/// for this agent's input. Reflects what's actually deployed; can
/// differ from `applied/<n>/main` only between
/// `meta::prepare_deploy` and `finalize_deploy` (≤ build duration).
#[serde(skip_serializing_if = "Option::is_none")]
deployed_sha: Option<String>,
}
#[derive(Serialize)]
struct TransientView {
name: String,
kind: &'static str,
secs: u64,
}
#[derive(Serialize)]
struct ApprovalHistoryView {
id: i64,
agent: String,
kind: &'static str,
/// First 12 chars of the canonical sha (preferred) or
/// manager-supplied ref. None for resolved spawn approvals.
sha_short: Option<String>,
/// `approved` / `denied` / `failed`.
status: &'static str,
/// Unix seconds. Renders as a relative time on the dashboard.
resolved_at: i64,
/// Operator-supplied deny reason (for `denied`) or build error
/// (for `failed`). None on `approved`.
#[serde(skip_serializing_if = "Option::is_none")]
note: Option<String>,
}
#[derive(Serialize)]
struct ApprovalView {
id: i64,
agent: String,
kind: &'static str,
/// First 12 chars of the `commit_ref`, for `ApplyCommit` only.
sha_short: Option<String>,
/// Pre-rendered syntax-coloured diff HTML, for `ApplyCommit` only.
diff_html: Option<String>,
}
/// Replace silent `.unwrap_or_default()` on the data sources behind
/// `/api/state` so that whichever query degrades surfaces in journald
/// instead of leaving the operator staring at an empty list. The
/// dashboard still degrades to a sensible default value; the warn
/// is just the diagnostic breadcrumb the old code swallowed.
fn log_default<T, E>(what: &str, result: std::result::Result<T, E>) -> T
where
T: Default,
E: std::fmt::Debug,
{
match result {
Ok(v) => v,
Err(e) => {
tracing::warn!(target: "api_state", source = %what, error = ?e, "snapshot source failed; using default");
T::default()
}
}
}
async fn api_state(headers: HeaderMap, State(state): State<AppState>) -> axum::Json<StateSnapshot> {
let host = headers
.get("host")
.and_then(|h| h.to_str().ok())
.unwrap_or("localhost");
let hostname = host.split(':').next().unwrap_or(host).to_owned();
let raw_containers = log_default("nixos-container list", lifecycle::list().await);
let current_rev = crate::auto_update::current_flake_rev(&state.coord.hyperhive_flake);
let transient_snapshot = state.coord.transient_snapshot();
let pending_approvals = gc_orphans(
&state.coord,
log_default("approvals.pending", state.coord.approvals.pending()),
);
let (containers, any_stale) =
build_container_views(&raw_containers, current_rev.as_deref(), &transient_snapshot).await;
let transients = build_transient_views(&raw_containers, &transient_snapshot);
let approvals = build_approval_views(pending_approvals).await;
let approval_history = log_default(
"approvals.recent_resolved",
state.coord.approvals.recent_resolved(30),
)
.into_iter()
.map(history_view)
.collect();
let tombstones = build_tombstone_views(&state.coord, &containers, &transient_snapshot);
let port_conflicts = build_port_conflicts(&containers);
let operator_inbox = log_default(
"broker.recent_for(operator)",
state
.coord
.broker
.recent_for(hive_sh4re::OPERATOR_RECIPIENT, 50),
);
let questions = log_default("questions.pending", state.coord.questions.pending());
axum::Json(StateSnapshot {
hostname,
manager_port: MANAGER_PORT,
any_stale,
containers,
transients,
approvals,
approval_history,
meta_inputs: read_meta_inputs(),
operator_inbox,
questions,
tombstones,
port_conflicts,
})
}
/// Group live containers by their assigned web UI port; clusters with
/// more than one member are port-hash collisions the operator needs
/// to resolve by renaming. Manager (fixed at 8000) and sub-agents
/// (8100..8999) can't collide with each other — collisions are
/// strictly between sub-agents.
fn build_port_conflicts(containers: &[ContainerView]) -> Vec<PortConflict> {
let mut by_port: std::collections::BTreeMap<u16, Vec<String>> =
std::collections::BTreeMap::new();
for c in containers {
by_port.entry(c.port).or_default().push(c.name.clone());
}
by_port
.into_iter()
.filter(|(_, agents)| agents.len() > 1)
.map(|(port, mut agents)| {
agents.sort();
PortConflict { port, agents }
})
.collect()
}
/// Build `ContainerView`s for every live nixos-container. Returns the
/// list and whether any container is stale (drives the "↻ UPD4TE 4LL"
/// banner).
async fn build_container_views(
raw_containers: &[String],
current_rev: Option<&str>,
transient_snapshot: &std::collections::HashMap<String, crate::coordinator::TransientState>,
) -> (Vec<ContainerView>, bool) {
let mut out = Vec::new();
let mut any_stale = false;
let locked = read_meta_locked_revs();
for c in raw_containers {
let (logical, is_manager) = if c == MANAGER_NAME {
(MANAGER_NAME.to_owned(), true)
} else if let Some(n) = c.strip_prefix(AGENT_PREFIX) {
(n.to_owned(), false)
} else {
continue;
};
let needs_update =
current_rev.is_some_and(|rev| crate::auto_update::agent_needs_update(&logical, rev));
if needs_update {
any_stale = true;
}
let needs_login =
!is_manager && !claude_has_session(&Coordinator::agent_claude_dir(&logical));
let pending = transient_snapshot
.get(&logical)
.map(|st| transient_label(st.kind));
let deployed_sha = locked
.get(&format!("agent-{logical}"))
.map(|s| s[..s.len().min(12)].to_owned());
out.push(ContainerView {
port: lifecycle::agent_web_port(&logical),
running: lifecycle::is_running(&logical).await,
container: c.clone(),
name: logical,
is_manager,
needs_update,
needs_login,
pending,
deployed_sha,
});
}
(out, any_stale)
}
/// Map of node name → locked sha for nodes the **root** of meta
/// directly depends on (`hyperhive`, `agent-<n>`). Used by the
/// container row to render its `deployed:<sha12>` chip per agent.
/// Distinct from `read_meta_inputs()` which walks deeper for the
/// flake-input update form.
fn read_meta_locked_revs() -> std::collections::HashMap<String, String> {
let mut out = std::collections::HashMap::new();
let Ok(raw) = std::fs::read_to_string("/var/lib/hyperhive/meta/flake.lock") else {
return out;
};
let Ok(json) = serde_json::from_str::<serde_json::Value>(&raw) else {
return out;
};
let Some(nodes) = json.get("nodes").and_then(|v| v.as_object()) else {
return out;
};
let Some(root_name) = json.get("root").and_then(|v| v.as_str()) else {
return out;
};
let Some(root_inputs) = nodes
.get(root_name)
.and_then(|n| n.get("inputs"))
.and_then(|v| v.as_object())
else {
return out;
};
for alias in root_inputs.keys() {
let target_name = match root_inputs.get(alias) {
Some(serde_json::Value::String(s)) => s.clone(),
_ => continue,
};
if let Some(rev) = nodes
.get(&target_name)
.and_then(|n| n.get("locked"))
.and_then(|v| v.get("rev"))
.and_then(|v| v.as_str())
{
out.insert(alias.clone(), rev.to_owned());
}
}
out
}
#[derive(Serialize, Clone)]
struct MetaInputView {
/// Input key in meta's `flake.nix` — `hyperhive`, `agent-<n>`, etc.
name: String,
/// Full locked sha. Not displayed verbatim; the dashboard
/// truncates to the first 12 chars for the chip.
rev: String,
/// Unix seconds — `locked.lastModified`. Drives the relative
/// "2h ago" timestamp on each input row.
last_modified: i64,
/// `original.url` if available, for the tooltip / row meta text.
#[serde(skip_serializing_if = "Option::is_none")]
url: Option<String>,
}
/// Walk `flake.lock`'s `nodes` graph from `root` and emit one
/// `MetaInputView` per fetched input, up to two levels deep. That
/// surfaces the direct meta inputs (`hyperhive`, `agent-<n>`) AND
/// the agent flakes' own inputs (`agent-dmatrix/mcp-matrix`,
/// `hyperhive/nixpkgs`, etc.) so the operator can bump them
/// individually from the UI. Deeper transitive nodes aren't shown
/// to keep the panel readable — bumping the level-2 entry will
/// re-fetch its own sub-inputs anyway. Names are slash-separated
/// paths from root, which is the syntax `nix flake update` accepts
/// for transitive inputs.
///
/// Inputs that resolve via a `follows` chain (lock value is an
/// Array of strings) are skipped — they're aliases, not their own
/// fetched derivation, and updating them does nothing.
fn read_meta_inputs() -> Vec<MetaInputView> {
let mut out = Vec::new();
let Ok(raw) = std::fs::read_to_string("/var/lib/hyperhive/meta/flake.lock") else {
return out;
};
let Ok(json) = serde_json::from_str::<serde_json::Value>(&raw) else {
return out;
};
let Some(nodes) = json.get("nodes").and_then(|v| v.as_object()) else {
return out;
};
let Some(root_name) = json.get("root").and_then(|v| v.as_str()) else {
return out;
};
walk_meta_inputs(nodes, root_name, "", 0, 2, &mut out);
// hyperhive first, then alphabetical (sub-paths sort under their
// parent, which gives a tidy 'agent-foo, agent-foo/bar' grouping).
out.sort_by(|a, b| match (a.name.as_str(), b.name.as_str()) {
("hyperhive", _) => std::cmp::Ordering::Less,
(_, "hyperhive") => std::cmp::Ordering::Greater,
_ => a.name.cmp(&b.name),
});
out
}
fn walk_meta_inputs(
nodes: &serde_json::Map<String, serde_json::Value>,
node_name: &str,
prefix: &str,
depth: u32,
max_depth: u32,
out: &mut Vec<MetaInputView>,
) {
if depth >= max_depth {
return;
}
let Some(node) = nodes.get(node_name) else {
return;
};
let Some(inputs_map) = node.get("inputs").and_then(|v| v.as_object()) else {
return;
};
for (alias, target) in inputs_map {
// Inputs map value is either a string (node name) or an
// array (a `follows` chain). The latter just aliases another
// node — we can't `nix flake update` it directly, so skip.
let target_name = match target {
serde_json::Value::String(s) => s.clone(),
_ => continue,
};
let Some(target_node) = nodes.get(&target_name) else {
continue;
};
let path = if prefix.is_empty() {
alias.clone()
} else {
format!("{prefix}/{alias}")
};
if let Some(rev) = target_node
.get("locked")
.and_then(|v| v.get("rev"))
.and_then(|v| v.as_str())
{
let last_modified = target_node
.get("locked")
.and_then(|v| v.get("lastModified"))
.and_then(serde_json::Value::as_i64)
.unwrap_or(0);
let url = target_node
.get("original")
.and_then(|v| v.get("url"))
.and_then(|v| v.as_str())
.map(str::to_owned);
out.push(MetaInputView {
name: path.clone(),
rev: rev.to_owned(),
last_modified,
url,
});
}
walk_meta_inputs(nodes, &target_name, &path, depth + 1, max_depth, out);
}
}
/// Transient state for agents whose container does NOT yet exist
/// (`Spawning`). Lifecycle ops on existing containers surface as
/// `ContainerView.pending` inline; this list only catches pre-creation.
fn build_transient_views(
raw_containers: &[String],
transient_snapshot: &std::collections::HashMap<String, crate::coordinator::TransientState>,
) -> Vec<TransientView> {
transient_snapshot
.iter()
.filter(|(name, _)| {
!raw_containers
.iter()
.any(|c| c == &format!("{AGENT_PREFIX}{name}") || c == *name)
})
.map(|(name, st)| TransientView {
name: name.clone(),
kind: transient_label(st.kind),
secs: st.since.elapsed().as_secs(),
})
.collect()
}
/// Render each pending approval into its dashboard view (short sha +
/// unified diff for `ApplyCommit`, just the name for `Spawn`).
/// Project a resolved sqlite row into the lean shape the dashboard
/// history tab consumes — no `diff_html` (rendering 30 of them
/// per /api/state poll would mean 30 git diffs per refresh).
fn history_view(a: Approval) -> ApprovalHistoryView {
let displayed = a.fetched_sha.as_deref().unwrap_or(&a.commit_ref);
let sha_short = if displayed.is_empty() {
None
} else {
Some(displayed[..displayed.len().min(12)].to_owned())
};
let status = match a.status {
hive_sh4re::ApprovalStatus::Approved => "approved",
hive_sh4re::ApprovalStatus::Denied => "denied",
hive_sh4re::ApprovalStatus::Failed => "failed",
// Pending shouldn't appear in recent_resolved, but be defensive.
hive_sh4re::ApprovalStatus::Pending => "pending",
};
let kind = match a.kind {
hive_sh4re::ApprovalKind::ApplyCommit => "apply_commit",
hive_sh4re::ApprovalKind::Spawn => "spawn",
};
ApprovalHistoryView {
id: a.id,
agent: a.agent,
kind,
sha_short,
status,
resolved_at: a.resolved_at.unwrap_or(0),
note: a.note,
}
}
async fn build_approval_views(approvals: Vec<Approval>) -> Vec<ApprovalView> {
let mut out = Vec::with_capacity(approvals.len());
for a in approvals {
out.push(match a.kind {
hive_sh4re::ApprovalKind::ApplyCommit => {
// Prefer the canonical fetched sha from applied;
// commit_ref is only the manager's claim and may be
// amended out from under us.
let displayed = a.fetched_sha.as_deref().unwrap_or(&a.commit_ref);
let sha = displayed[..displayed.len().min(12)].to_owned();
let diff = approval_diff(&a.agent, a.id).await;
ApprovalView {
id: a.id,
agent: a.agent,
kind: "apply_commit",
sha_short: Some(sha),
diff_html: Some(render_diff_lines(&diff)),
}
}
hive_sh4re::ApprovalKind::Spawn => ApprovalView {
id: a.id,
agent: a.agent,
kind: "spawn",
sha_short: None,
diff_html: None,
},
});
}
out
}
/// State-dir names that don't appear in the live container list (and
/// aren't the manager). Each one surfaces in the dashboard as a row
/// with R3V1V3 + PURG3 actions.
fn build_tombstone_views(
coord: &Coordinator,
containers: &[ContainerView],
transient_snapshot: &std::collections::HashMap<String, crate::coordinator::TransientState>,
) -> Vec<TombstoneView> {
let _ = coord; // kept_state_names is a free fn but takes &self by future plan
let live: std::collections::HashSet<&str> = containers
.iter()
.map(|c| c.name.as_str())
.chain(transient_snapshot.keys().map(String::as_str))
.collect();
Coordinator::kept_state_names()
.into_iter()
.filter(|name| name != MANAGER_NAME && !live.contains(name.as_str()))
.map(|name| {
let root = Coordinator::agent_state_root(&name);
let state_bytes = dir_size_bytes(&root);
let last_seen = std::fs::metadata(&root)
.and_then(|m| m.modified())
.ok()
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.and_then(|d| i64::try_from(d.as_secs()).ok())
.unwrap_or(0);
let has_creds = claude_has_session(&Coordinator::agent_claude_dir(&name));
TombstoneView {
name,
state_bytes,
last_seen,
has_creds,
}
})
.collect()
}
/// Sum the byte size of every regular file under `root`. Cheap to compute
/// for typical agent state (config repo + claude creds + notes file —
/// usually a few MB); fine to do inline on each /api/state. Returns 0 on
/// any error.
fn dir_size_bytes(root: &Path) -> u64 {
fn walk(p: &Path, acc: &mut u64) {
let Ok(rd) = std::fs::read_dir(p) else { return };
for entry in rd.flatten() {
let Ok(ft) = entry.file_type() else { continue };
if ft.is_dir() {
walk(&entry.path(), acc);
} else if ft.is_file()
&& let Ok(meta) = entry.metadata()
{
*acc += meta.len();
}
}
}
let mut total = 0u64;
walk(root, &mut total);
total
}
async fn messages_stream(
State(state): State<AppState>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let rx = state.coord.broker.subscribe();
let stream = BroadcastStream::new(rx).filter_map(|res| {
// Drop lagged events. Browsers reconnect; nothing to do here.
let event = res.ok()?;
let json = serde_json::to_string(&event).ok()?;
Some(Ok(Event::default().data(json)))
});
Sse::new(stream).keep_alive(KeepAlive::default())
}
async fn post_approve(State(state): State<AppState>, AxumPath(id): AxumPath<i64>) -> Response {
match actions::approve(state.coord.clone(), id).await {
Ok(()) => Redirect::to("/").into_response(),
Err(e) => error_response(&format!("approve {id} failed: {e:#}")),
}
}
#[derive(Deserialize, Default)]
struct DenyForm {
#[serde(default)]
note: Option<String>,
}
async fn post_deny(
State(state): State<AppState>,
AxumPath(id): AxumPath<i64>,
Form(form): Form<DenyForm>,
) -> Response {
let note = form
.note
.as_deref()
.map(str::trim)
.filter(|s| !s.is_empty());
match actions::deny(&state.coord, id, note).await {
Ok(()) => Redirect::to("/").into_response(),
Err(e) => error_response(&format!("deny {id} failed: {e:#}")),
}
}
#[derive(Deserialize)]
struct RequestSpawnForm {
name: String,
}
#[derive(Deserialize)]
struct AnswerForm {
answer: String,
}
async fn post_answer_question(
State(state): State<AppState>,
AxumPath(id): AxumPath<i64>,
Form(form): Form<AnswerForm>,
) -> Response {
let answer = form.answer.trim();
if answer.is_empty() {
return error_response("answer: required");
}
match state.coord.questions.answer(id, answer) {
Ok((question, asker)) => {
tracing::info!(%id, %asker, "operator answered question");
state.coord.notify_agent(
&asker,
&hive_sh4re::HelperEvent::OperatorAnswered {
id,
question,
answer: answer.to_owned(),
},
);
Redirect::to("/").into_response()
}
Err(e) => error_response(&format!("answer {id} failed: {e:#}")),
}
}
/// Resolve a pending operator question with a sentinel answer when
/// the operator decides not to / can't answer. The manager harness
/// receives an `OperatorAnswered` event with `answer = "[cancelled]"`
/// so it can fall back on whatever default it had. Same code path as
/// a real answer — just lets the operator close the loop instead of
/// letting the question dangle forever.
async fn post_cancel_question(
State(state): State<AppState>,
AxumPath(id): AxumPath<i64>,
) -> Response {
const SENTINEL: &str = "[cancelled]";
match state.coord.questions.answer(id, SENTINEL) {
Ok((question, asker)) => {
tracing::info!(%id, %asker, "operator cancelled question");
state.coord.notify_agent(
&asker,
&hive_sh4re::HelperEvent::OperatorAnswered {
id,
question,
answer: SENTINEL.to_owned(),
},
);
Redirect::to("/").into_response()
}
Err(e) => error_response(&format!("cancel-question {id} failed: {e:#}")),
}
}
#[derive(Deserialize)]
struct JournalQuery {
/// Optional systemd unit filter — e.g. `hive-ag3nt.service`. When
/// omitted, returns the full machine journal.
#[serde(default)]
unit: Option<String>,
/// Number of trailing lines to return. Capped at 5000.
#[serde(default)]
lines: Option<u32>,
}
/// Shell out to `journalctl -M <container> -b` and return its text
/// output. Operator-only by virtue of the dashboard being host-bound;
/// hive-c0re already runs as root in its systemd unit so journalctl
/// has the access it needs.
async fn get_journal(
AxumPath(name): AxumPath<String>,
axum::extract::Query(q): axum::extract::Query<JournalQuery>,
) -> Response {
// Validate the container name against the list of managed
// containers so we don't shell out with arbitrary input.
let container = strip_container_prefix(&name);
let prefixed = if container == lifecycle::MANAGER_NAME {
container.clone()
} else {
format!("{}{container}", lifecycle::AGENT_PREFIX)
};
let live = lifecycle::list().await.unwrap_or_default();
if !live.iter().any(|c| c == &prefixed) {
return error_response(&format!("journal: no managed container {prefixed:?}"));
}
let lines = q.lines.unwrap_or(500).min(5000);
let mut cmd = tokio::process::Command::new("journalctl");
cmd.args([
"-M",
&prefixed,
"-b",
"--no-pager",
"--output=short-iso",
"--lines",
])
.arg(lines.to_string());
if let Some(u) = q.unit.as_deref().filter(|s| !s.is_empty()) {
// accept hive-ag3nt[.service] / hive-m1nd[.service] — anything
// else we refuse, again to keep the shell-out tight.
let allowed = ["hive-ag3nt.service", "hive-m1nd.service"];
let unit = if u.ends_with(".service") {
u.to_owned()
} else {
format!("{u}.service")
};
if !allowed.contains(&unit.as_str()) {
return error_response(&format!("journal: unknown unit {unit:?}"));
}
cmd.args(["-u", &unit]);
}
match cmd.output().await {
Ok(out) => {
// Combine stdout + stderr — journalctl emits to both on errors.
let mut body = String::from_utf8_lossy(&out.stdout).into_owned();
if !out.status.success() {
body.push_str("\n--- stderr ---\n");
body.push_str(&String::from_utf8_lossy(&out.stderr));
}
([("content-type", "text/plain; charset=utf-8")], body).into_response()
}
Err(e) => error_response(&format!("journalctl spawn: {e}")),
}
}
/// Show the current `agent.nix` from the applied repo — the file
/// the container actually builds against. Read-only; the manager
/// can't influence what this returns (that path goes through the
/// approval queue).
async fn get_agent_config(AxumPath(name): AxumPath<String>) -> Response {
let logical = strip_container_prefix(&name);
// Constrain to managed containers — same shape as the journal
// endpoint, prevents arbitrary filesystem reads.
let live = lifecycle::list().await.unwrap_or_default();
let prefixed = if logical == lifecycle::MANAGER_NAME {
logical.clone()
} else {
format!("{}{logical}", lifecycle::AGENT_PREFIX)
};
if !live.iter().any(|c| c == &prefixed) {
return error_response(&format!("agent-config: no managed container {prefixed:?}"));
}
let path = Coordinator::agent_applied_dir(&logical).join("agent.nix");
match std::fs::read_to_string(&path) {
Ok(body) => ([("content-type", "text/plain; charset=utf-8")], body).into_response(),
Err(e) => error_response(&format!("read {}: {e}", path.display())),
}
}
async fn post_purge_tombstone(
State(state): State<AppState>,
AxumPath(name): AxumPath<String>,
) -> Response {
if name == lifecycle::MANAGER_NAME {
return error_response("refusing to purge the manager's state");
}
// Sanity: refuse to purge if a live container still exists with this
// name. The dashboard already filters tombstones to non-live names,
// but the operator could send a stale POST.
let live = lifecycle::list().await.unwrap_or_default();
if live
.iter()
.any(|c| c == &format!("{}{name}", lifecycle::AGENT_PREFIX) || c == &name)
{
return error_response(&format!(
"refusing to purge {name}: container still exists — use DESTR0Y first"
));
}
let mut errors = Vec::new();
for dir in [
Coordinator::agent_state_root(&name),
Coordinator::agent_applied_dir(&name),
] {
if dir.exists()
&& let Err(e) = std::fs::remove_dir_all(&dir)
{
errors.push(format!("{}: {e}", dir.display()));
}
}
let _ = state
.coord
.approvals
.fail_pending_for_agent(&name, "agent state purged");
if errors.is_empty() {
tracing::info!(%name, "tombstone purged");
Redirect::to("/").into_response()
} else {
error_response(&format!("purge {name} partial: {}", errors.join(", ")))
}
}
/// Operator-side compose form on the dashboard terminal. Drops a
/// message into the broker as `{from: "operator", to, body}`. Same
/// shape that per-agent web UIs use via `OperatorMsg`, but here the
/// operator picks the recipient explicitly with `@name`. No
/// validation that `to` resolves to a known agent — broker accepts
/// arbitrary recipients (and the agent's inbox grows whether or not
/// they exist, which is fine for spawn-then-greet flows).
#[derive(Deserialize)]
struct OpSendForm {
to: String,
body: String,
}
/// Form for `POST /meta-update`. Inputs ride in as a comma-separated
/// list under the `inputs` field — the JS submitter joins the
/// checked boxes since axum's `Form` extractor doesn't natively
/// decode repeated keys without a helper.
#[derive(Deserialize)]
struct MetaUpdateForm {
inputs: String,
}
/// Bulk-update selected meta flake inputs, then rebuild the affected
/// agents in the background. Idempotent w.r.t. selection — choosing
/// an input that's already at the latest sha is a no-op (no commit,
/// no rebuild ripple). Returns immediately after queueing the work;
/// dashboard polls for progress via container `pending` spinners +
/// the meta-inputs row sha update.
async fn post_meta_update(
State(state): State<AppState>,
Form(form): Form<MetaUpdateForm>,
) -> Response {
let inputs: Vec<String> = form
.inputs
.split(',')
.map(|s| s.trim().to_owned())
.filter(|s| !s.is_empty())
.collect();
if inputs.is_empty() {
return error_response("meta-update: no inputs selected");
}
let coord = state.coord.clone();
let inputs_clone = inputs.clone();
tokio::spawn(async move {
run_meta_update(&coord, &inputs_clone).await;
});
Redirect::to("/").into_response()
}
/// Background task: run `nix flake update <inputs>` in meta + commit,
/// then rebuild every agent whose input was touched (or all agents
/// when `hyperhive` was bumped, since that's the shared base). Each
/// rebuild fires `Rebuilt { ok, note, ... }` to the manager so the
/// operator and manager get the same feedback they'd see from an
/// auto-update / manual dashboard rebuild.
async fn run_meta_update(coord: &Arc<crate::coordinator::Coordinator>, inputs: &[String]) {
tracing::info!(?inputs, "meta-update: starting");
if let Err(e) = crate::meta::lock_update(inputs).await {
tracing::warn!(error = ?e, "meta-update: lock_update failed");
return;
}
// Decide which agents to rebuild. Inputs are slash-paths from
// the meta root — `hyperhive`, `hyperhive/nixpkgs`,
// `agent-coder`, `agent-coder/mcp-matrix`, etc. Anything in the
// hyperhive subtree affects every agent (shared base); anything
// in `agent-<n>/...` only the named agent.
let touched_hyperhive = inputs
.iter()
.any(|i| i == "hyperhive" || i.starts_with("hyperhive/"));
let touched_agents: Vec<String> = inputs
.iter()
.filter_map(|i| i.strip_prefix("agent-"))
.map(|rest| rest.split('/').next().unwrap_or(rest).to_owned())
.collect();
let agents_to_rebuild: Vec<String> = if touched_hyperhive {
crate::lifecycle::list()
.await
.unwrap_or_default()
.into_iter()
.filter_map(|c| {
if c == crate::lifecycle::MANAGER_NAME {
Some(crate::lifecycle::MANAGER_NAME.to_owned())
} else {
c.strip_prefix(crate::lifecycle::AGENT_PREFIX)
.map(str::to_owned)
}
})
.collect()
} else {
touched_agents
};
let current_rev = crate::auto_update::current_flake_rev(&coord.hyperhive_flake)
.unwrap_or_default();
// Sequential rebuild loop — the META_LOCK guards meta-side
// races but parallel nix builds also serialise via nix-daemon,
// so sequential is just as fast in practice and keeps logs
// readable.
for name in agents_to_rebuild {
tracing::info!(%name, "meta-update: rebuilding agent");
if let Err(e) = crate::auto_update::rebuild_agent(coord, &name, &current_rev).await {
tracing::warn!(%name, error = ?e, "meta-update: rebuild failed");
// continue: surface each per-agent failure via its own
// Rebuilt event; don't abort the whole batch.
}
}
tracing::info!("meta-update: done");
}
async fn post_op_send(State(state): State<AppState>, Form(form): Form<OpSendForm>) -> Response {
let to = form.to.trim().to_owned();
let body = form.body.trim().to_owned();
if to.is_empty() {
return error_response("op-send: `to` required");
}
if body.is_empty() {
return error_response("op-send: `body` required");
}
if let Err(e) = state.coord.broker.send(&hive_sh4re::Message {
from: hive_sh4re::OPERATOR_RECIPIENT.to_owned(),
to: to.clone(),
body,
}) {
return error_response(&format!("op-send to {to} failed: {e:#}"));
}
Redirect::to("/").into_response()
}
async fn post_request_spawn(
State(state): State<AppState>,
Form(form): Form<RequestSpawnForm>,
) -> Response {
let name = form.name.trim().to_owned();
if name.is_empty() {
return error_response("spawn: `name` required");
}
match state
.coord
.approvals
.submit_kind(&name, hive_sh4re::ApprovalKind::Spawn, "")
{
Ok(id) => {
tracing::info!(%id, %name, "operator: spawn approval queued via dashboard");
Redirect::to("/").into_response()
}
Err(e) => error_response(&format!("request-spawn {name} failed: {e:#}")),
}
}
async fn post_rebuild(State(state): State<AppState>, AxumPath(name): AxumPath<String>) -> Response {
let Some(current_rev) = crate::auto_update::current_flake_rev(&state.coord.hyperhive_flake)
else {
return error_response(
"rebuild: hyperhive_flake has no canonical path; manual rebuild only via `hive-c0re rebuild`",
);
};
let coord = state.coord.clone();
lifecycle_action(
&state,
&name,
crate::coordinator::TransientKind::Rebuilding,
"rebuild",
move |n| {
let coord = coord.clone();
let rev = current_rev.clone();
async move { crate::auto_update::rebuild_agent(&coord, &n, &rev).await }
},
// rebuild_agent fires kick_agent on success itself, so the
// extra-closure is a no-op here.
|_, _| {},
)
.await
}
/// Common shape for the simple lifecycle action handlers (start /
/// stop / restart / rebuild): strip the container prefix, mark
/// transient for the duration so the dashboard can spinner, run the
/// lifecycle op, clear transient, redirect on success or surface the
/// error. `verb` only appears in the error message; `extra` runs on
/// success after `clear_transient` for handlers that need follow-up
/// (e.g. `kill` also unregisters the agent + fires `HelperEvent`).
async fn lifecycle_action<F, Fut>(
state: &AppState,
name: &str,
kind: crate::coordinator::TransientKind,
verb: &str,
body: F,
extra: impl FnOnce(&AppState, &str),
) -> Response
where
F: FnOnce(String) -> Fut,
Fut: std::future::Future<Output = anyhow::Result<()>>,
{
let logical = strip_container_prefix(name);
state.coord.set_transient(&logical, kind);
let result = body(logical.clone()).await;
state.coord.clear_transient(&logical);
match result {
Ok(()) => {
extra(state, &logical);
Redirect::to("/").into_response()
}
Err(e) => error_response(&format!("{verb} {logical} failed: {e:#}")),
}
}
async fn post_kill(State(state): State<AppState>, AxumPath(name): AxumPath<String>) -> Response {
let logical = strip_container_prefix(&name);
if logical == lifecycle::MANAGER_NAME {
return error_response("kill: refusing to stop the manager");
}
lifecycle_action(
&state,
&name,
crate::coordinator::TransientKind::Stopping,
"kill",
|n| async move { lifecycle::kill(&n).await },
|s, n| {
s.coord.unregister_agent(n);
s.coord.notify_manager(&hive_sh4re::HelperEvent::Killed {
agent: n.to_owned(),
});
},
)
.await
}
async fn post_restart(State(state): State<AppState>, AxumPath(name): AxumPath<String>) -> Response {
lifecycle_action(
&state,
&name,
crate::coordinator::TransientKind::Restarting,
"restart",
|n| async move { lifecycle::restart(&n).await },
|s, n| s.coord.kick_agent(n, "container restarted"),
)
.await
}
async fn post_start(State(state): State<AppState>, AxumPath(name): AxumPath<String>) -> Response {
lifecycle_action(
&state,
&name,
crate::coordinator::TransientKind::Starting,
"start",
|n| async move { lifecycle::start(&n).await },
|s, n| s.coord.kick_agent(n, "container started"),
)
.await
}
async fn post_update_all(State(state): State<AppState>) -> Response {
let Some(current_rev) = crate::auto_update::current_flake_rev(&state.coord.hyperhive_flake)
else {
return error_response("update-all: hyperhive_flake has no canonical path");
};
let containers = lifecycle::list().await.unwrap_or_default();
let mut errors = Vec::new();
for container in containers {
let logical = if container == lifecycle::MANAGER_NAME {
lifecycle::MANAGER_NAME.to_owned()
} else if let Some(n) = container.strip_prefix(lifecycle::AGENT_PREFIX) {
n.to_owned()
} else {
continue;
};
if !crate::auto_update::agent_needs_update(&logical, &current_rev) {
continue;
}
if let Err(e) =
crate::auto_update::rebuild_agent(&state.coord, &logical, &current_rev).await
{
errors.push(format!("{logical}: {e:#}"));
}
}
if errors.is_empty() {
Redirect::to("/").into_response()
} else {
error_response(&format!(
"update-all partial failure:\n{}",
errors.join("\n")
))
}
}
fn transient_label(k: crate::coordinator::TransientKind) -> &'static str {
use crate::coordinator::TransientKind::{
Destroying, Rebuilding, Restarting, Spawning, Starting, Stopping,
};
match k {
Spawning => "spawning",
Starting => "starting",
Stopping => "stopping",
Restarting => "restarting",
Rebuilding => "rebuilding",
Destroying => "destroying",
}
}
/// Convert either a logical name or a container name back to the logical
/// name. Sub-agents are `h-foo` → `foo`; manager stays `hm1nd`.
fn strip_container_prefix(name: &str) -> String {
name.strip_prefix(lifecycle::AGENT_PREFIX)
.unwrap_or(name)
.to_owned()
}
#[derive(Deserialize, Default)]
struct DestroyForm {
#[serde(default)]
purge: Option<String>,
}
async fn post_destroy(
State(state): State<AppState>,
AxumPath(name): AxumPath<String>,
Form(form): Form<DestroyForm>,
) -> Response {
// Checkbox semantics: any non-empty value (axum sends "on") = purge.
let purge = form.purge.as_deref().is_some_and(|v| !v.is_empty());
match actions::destroy(&state.coord, &name, purge).await {
Ok(()) => Redirect::to("/").into_response(),
Err(e) => error_response(&format!("destroy {name} failed: {e:#}")),
}
}
fn error_response(message: &str) -> Response {
// Plain text — the JS app surfaces this in an alert(), so HTML
// wrapping would just clutter the message.
(StatusCode::INTERNAL_SERVER_ERROR, message.to_owned()).into_response()
}
/// Filter out approvals whose agent state dir was wiped out from under us
/// (e.g. by a test script's cleanup). Marks them failed so they fall out of
/// `pending` on next render.
fn gc_orphans(coord: &Coordinator, approvals: Vec<Approval>) -> Vec<Approval> {
approvals
.into_iter()
.filter(|a| {
// Spawn approvals are for not-yet-existent agents; the proposed
// dir is supposed to be missing.
if matches!(a.kind, hive_sh4re::ApprovalKind::Spawn) {
return true;
}
if Coordinator::agent_proposed_dir(&a.agent).exists() {
true
} else {
let _ = coord.approvals.mark_failed(a.id, "agent state dir missing");
tracing::info!(id = a.id, agent = %a.agent, "auto-failed orphan approval");
false
}
})
.collect()
}
/// Render a unified diff with per-line CSS classes so the dashboard can
/// colour adds / dels / hunk headers / context. Each line becomes a
/// `<span>` tagged by its leading character; the wrapping `<pre>` keeps
/// whitespace intact.
fn render_diff_lines(diff: &str) -> String {
let mut out = String::new();
for raw in diff.lines() {
let cls = match raw.as_bytes().first() {
// file headers (`--- a/...` / `+++ b/...`) come before any
// line starting with a single `+`/`-`. similar-rs emits them
// with the doubled prefix.
_ if raw.starts_with("--- ") => "diff-file",
_ if raw.starts_with("+++ ") => "diff-file",
Some(b'@') => "diff-hunk",
Some(b'+') => "diff-add",
Some(b'-') => "diff-del",
_ => "diff-ctx",
};
let _ = writeln!(out, "<span class=\"{cls}\">{}</span>", html_escape(raw),);
}
out
}
/// Host-side mirror of `hive_ag3nt::login::has_session`. Returns true if the
/// agent's bound `~/.claude/` dir on disk contains any regular file. The
/// dashboard reads this each render so logins driven from the agent web UI
/// (Phase 8 step 4) reflect within one auto-refresh cycle.
fn claude_has_session(dir: &Path) -> bool {
let Ok(entries) = std::fs::read_dir(dir) else {
return false;
};
entries
.flatten()
.any(|e| e.file_type().is_ok_and(|t| t.is_file()))
}
/// Multi-file unified diff between the currently-deployed tree and
/// the proposal for this approval. Runs against the applied repo
/// since the canonical proposal commit lives there (manager-side
/// amendments don't move it). Empty output means proposal == main —
/// a no-op approval.
async fn approval_diff(agent: &str, approval_id: i64) -> String {
let applied = Coordinator::agent_applied_dir(agent);
if !applied.join(".git").exists() {
return format!("(no applied git repo at {})", applied.display());
}
let proposal_ref = format!("refs/tags/proposal/{approval_id}");
match git_diff_main_to(&applied, &proposal_ref).await {
Ok(s) if s.is_empty() => "(proposal matches currently-deployed tree)".to_owned(),
Ok(s) => s,
Err(e) => format!("(error: {e:#})"),
}
}
async fn git_diff_main_to(applied_dir: &Path, target_ref: &str) -> Result<String> {
let out = lifecycle::git_command()
.current_dir(applied_dir)
.args(["diff", &format!("refs/heads/main..{target_ref}")])
.output()
.await
.with_context(|| format!("spawn `git diff` in {}", applied_dir.display()))?;
if !out.status.success() {
anyhow::bail!(
"git diff main..{target_ref} failed: {}",
String::from_utf8_lossy(&out.stderr).trim()
);
}
Ok(String::from_utf8_lossy(&out.stdout).into_owned())
}
fn html_escape(s: &str) -> String {
s.replace('&', "&amp;")
.replace('<', "&lt;")
.replace('>', "&gt;")
}