1985 lines
76 KiB
Rust
1985 lines
76 KiB
Rust
//! Hyperhive dashboard. Lists managed containers (with deep-links to each
|
|
//! container's web UI), pending approvals (with unified diff vs the applied
|
|
//! repo, plus approve/deny buttons), and the manager.
|
|
|
|
use std::convert::Infallible;
|
|
use std::net::SocketAddr;
|
|
use std::path::Path;
|
|
use std::sync::Arc;
|
|
|
|
use anyhow::{Context, Result};
|
|
use axum::extract::Form;
|
|
use axum::{
|
|
Router,
|
|
extract::{Path as AxumPath, State},
|
|
http::{HeaderMap, StatusCode},
|
|
response::{
|
|
Html, IntoResponse, Response,
|
|
sse::{Event, KeepAlive, Sse},
|
|
},
|
|
routing::{get, post},
|
|
};
|
|
use hive_sh4re::Approval;
|
|
use serde::{Deserialize, Serialize};
|
|
use tokio_stream::wrappers::BroadcastStream;
|
|
use tokio_stream::{Stream, StreamExt};
|
|
|
|
use crate::actions;
|
|
use crate::container_view::{ContainerView, claude_has_session};
|
|
use crate::coordinator::Coordinator;
|
|
use crate::lifecycle::{self, MANAGER_NAME};
|
|
|
|
/// Port reported to clients as `manager_port` in the `/api/state`
/// snapshot (the manager's web UI port).
const MANAGER_PORT: u16 = 8000;
|
|
|
|
#[derive(Clone)]
|
|
struct AppState {
|
|
coord: Arc<Coordinator>,
|
|
}
|
|
|
|
pub async fn serve(port: u16, coord: Arc<Coordinator>) -> Result<()> {
|
|
let app = Router::new()
|
|
.route("/", get(serve_index))
|
|
.route("/static/dashboard.css", get(serve_css))
|
|
.route("/static/app.js", get(serve_app_js))
|
|
.route("/favicon.svg", get(serve_favicon))
|
|
.route("/api/state", get(api_state))
|
|
.route("/approve/{id}", post(post_approve))
|
|
.route("/deny/{id}", post(post_deny))
|
|
.route("/destroy/{name}", post(post_destroy))
|
|
.route("/kill/{name}", post(post_kill))
|
|
.route("/restart/{name}", post(post_restart))
|
|
.route("/start/{name}", post(post_start))
|
|
.route("/rebuild/{name}", post(post_rebuild))
|
|
.route("/update-all", post(post_update_all))
|
|
.route("/answer-question/{id}", post(post_answer_question))
|
|
.route("/cancel-question/{id}", post(post_cancel_question))
|
|
.route("/purge-tombstone/{name}", post(post_purge_tombstone))
|
|
.route("/api/journal/{name}", get(get_journal))
|
|
.route("/api/approval-diff/{id}", get(get_approval_diff))
|
|
.route("/api/state-file", get(get_state_file))
|
|
.route("/api/reminders", get(api_reminders))
|
|
.route("/api/agent/{name}/links", get(get_agent_links))
|
|
.route("/cancel-reminder/{id}", post(post_cancel_reminder))
|
|
.route("/retry-reminder/{id}", post(post_retry_reminder))
|
|
.route("/request-spawn", post(post_request_spawn))
|
|
.route("/op-send", post(post_op_send))
|
|
.route("/meta-update", post(post_meta_update))
|
|
.route("/dashboard/stream", get(dashboard_stream))
|
|
.route("/dashboard/history", get(dashboard_history))
|
|
.route("/static/hive-fr0nt.js", get(serve_shared_js))
|
|
.route("/static/marked.js", get(serve_marked_js))
|
|
.with_state(AppState { coord });
|
|
let addr = SocketAddr::from(([0, 0, 0, 0], port));
|
|
let listener = bind_with_retry(addr).await?;
|
|
tracing::info!(%port, "dashboard listening");
|
|
axum::serve(listener, app).await?;
|
|
Ok(())
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Static asset handlers: the dashboard is an SPA. `GET /` returns the
|
|
// (static) shell; `GET /static/*` serves the CSS + JS app; `GET /api/state`
|
|
// returns the current snapshot as JSON. The JS app fetches state on load,
|
|
// re-fetches after every async-form submit, and listens on
|
|
// `/dashboard/stream` for the unified live event channel.
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// `SO_REUSEADDR` bind with retry. Mirrors the per-agent variant in
|
|
/// `hive-ag3nt::web_ui::bind_with_retry`: hive-c0re restarts also
|
|
/// race the previous process's socket release, and the retry has no
|
|
/// attempt cap — capping was the proximate cause of issue #324
|
|
/// (silent give-up on a long stale socket). Genuine port collisions
|
|
/// don't reach this layer (dashboard is bound to a fixed configured
|
|
/// port, no per-agent hashing), so any persistent `AddrInUse` always
|
|
/// reflects a recoverable stale socket. WARN for the first dozen
|
|
/// attempts; INFO after that to avoid spamming the journal during a
|
|
/// long hold; INFO on eventual success when we did have to retry.
|
|
async fn bind_with_retry(addr: SocketAddr) -> Result<tokio::net::TcpListener> {
|
|
let mut delay_ms = 250u64;
|
|
let mut attempts = 0u32;
|
|
loop {
|
|
match try_bind(addr) {
|
|
Ok(l) => {
|
|
if attempts > 0 {
|
|
tracing::info!(
|
|
%addr, attempts,
|
|
"dashboard: bind succeeded after retry"
|
|
);
|
|
}
|
|
return Ok(l);
|
|
}
|
|
Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
|
|
let attempt = attempts + 1;
|
|
if attempt <= 12 {
|
|
tracing::warn!(
|
|
%addr, attempt,
|
|
"dashboard: AddrInUse, retrying in {delay_ms}ms"
|
|
);
|
|
} else {
|
|
tracing::info!(
|
|
%addr, attempt,
|
|
"dashboard: AddrInUse still holding, retrying in {delay_ms}ms"
|
|
);
|
|
}
|
|
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
|
|
attempts += 1;
|
|
delay_ms = (delay_ms * 2).min(2000);
|
|
}
|
|
Err(e) => {
|
|
return Err(e).with_context(|| format!("bind dashboard on {addr}"));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// One bind attempt: open a TCP socket of the family matching `addr`,
/// enable `SO_REUSEADDR`, bind, and start listening with a backlog of
/// 1024.
fn try_bind(addr: SocketAddr) -> std::io::Result<tokio::net::TcpListener> {
    let socket = if addr.is_ipv4() {
        tokio::net::TcpSocket::new_v4()?
    } else {
        tokio::net::TcpSocket::new_v6()?
    };
    socket.set_reuseaddr(true)?;
    socket.bind(addr)?;
    socket.listen(1024)
}
|
|
|
|
async fn serve_index() -> impl IntoResponse {
|
|
Html(include_str!("../assets/index.html"))
|
|
}
|
|
|
|
async fn serve_css() -> impl IntoResponse {
|
|
// Prepend the shared palette/typography so per-page styles only need
|
|
// to declare what's actually page-specific. One HTTP request, no
|
|
// per-asset cache to invalidate.
|
|
let body = format!(
|
|
"{}\n{}\n{}",
|
|
hive_fr0nt::BASE_CSS,
|
|
hive_fr0nt::TERMINAL_CSS,
|
|
include_str!("../assets/dashboard.css"),
|
|
);
|
|
([("content-type", "text/css")], body)
|
|
}
|
|
|
|
async fn serve_app_js() -> impl IntoResponse {
|
|
(
|
|
[("content-type", "application/javascript")],
|
|
include_str!("../assets/app.js"),
|
|
)
|
|
}
|
|
|
|
/// Dashboard favicon — the hyperhive mark. Static: the dashboard
|
|
/// represents the whole hive, so it always uses the project logo
|
|
/// (per-agent pages serve their own configurable `/icon` instead).
|
|
async fn serve_favicon() -> impl IntoResponse {
|
|
(
|
|
[("content-type", "image/svg+xml")],
|
|
include_str!("../../branding/hyperhive.svg"),
|
|
)
|
|
}
|
|
|
|
async fn serve_shared_js() -> impl IntoResponse {
|
|
(
|
|
[("content-type", "application/javascript")],
|
|
hive_fr0nt::TERMINAL_JS,
|
|
)
|
|
}
|
|
|
|
/// Vendored `marked` bundle — the side panel renders markdown file
|
|
/// previews with it.
|
|
async fn serve_marked_js() -> impl IntoResponse {
|
|
(
|
|
[("content-type", "application/javascript")],
|
|
hive_fr0nt::MARKED_JS,
|
|
)
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
struct StateSnapshot {
|
|
/// Broker seq at the moment this snapshot was assembled. Clients
|
|
/// dedupe their buffered SSE traffic against this value: any
|
|
/// `MessageEvent` with `seq <= snapshot.seq` is already reflected in
|
|
/// the snapshot (or pre-dates it); anything with `seq > snapshot.seq`
|
|
/// is post-snapshot and should be applied. Set to 0 in the
|
|
/// pre-emit case (no events ever fired) — clients treat that as
|
|
/// "apply everything you've buffered".
|
|
seq: u64,
|
|
hostname: String,
|
|
manager_port: u16,
|
|
any_stale: bool,
|
|
containers: Vec<ContainerView>,
|
|
transients: Vec<TransientView>,
|
|
approvals: Vec<ApprovalView>,
|
|
/// Last 30 resolved approvals (approved / denied / failed), newest-
|
|
/// first. Drives the "history" tab on the approvals section.
|
|
approval_history: Vec<ApprovalHistoryView>,
|
|
/// Pending operator-targeted questions (`target IS NULL`). Any
|
|
/// agent can `ask` the operator and `ask` returns immediately with
|
|
/// the id; on `/answer-question` we mark the row answered and
|
|
/// fire `HelperEvent::QuestionAnswered` back into the asker's
|
|
/// inbox. Peer-to-peer questions live in the same table but never
|
|
/// surface here (see `OperatorQuestions::pending`).
|
|
questions: Vec<QuestionView>,
|
|
/// Last 20 answered questions, newest-first.
|
|
question_history: Vec<QuestionView>,
|
|
/// State dirs (config history + claude creds + /state/ notes) that
|
|
/// survive after a destroy-without-purge. The operator can re-spawn
|
|
/// with the same name to resume, or PURG3 to wipe them.
|
|
tombstones: Vec<TombstoneView>,
|
|
/// Sub-agents whose FNV-1a hashed web UI port collides with at
|
|
/// least one other agent. Operator resolves by renaming. The
|
|
/// dashboard renders a banner at the top listing each cluster.
|
|
port_conflicts: Vec<PortConflict>,
|
|
/// Inputs in `meta/flake.lock` the operator can selectively
|
|
/// `nix flake update`. Hyperhive first, then `agent-<n>` rows.
|
|
meta_inputs: Vec<MetaInputView>,
|
|
/// True while a dashboard-triggered `meta-update` (flake lock bump +
|
|
/// agent rebuild ripple) is running in the background. Lets a
|
|
/// client that cold-loads mid-update render the META INPUTS panel's
|
|
/// disabled "updating…" state; live transitions arrive via the
|
|
/// `MetaUpdateRunning` event (issue #259).
|
|
meta_update_running: bool,
|
|
/// Whether the hive-forge container is up. When true the dashboard
|
|
/// links each container's config + each approval's commit into the
|
|
/// forge's `agent-configs` repos.
|
|
forge_present: bool,
|
|
}
|
|
|
|
/// `OpQuestion` + computed `question_refs` / `answer_refs`. Built
|
|
/// from the snapshot read; the live channel attaches the same
|
|
/// fields directly on `QuestionAdded` / `QuestionResolved`.
|
|
#[derive(Serialize)]
|
|
struct QuestionView {
|
|
#[serde(flatten)]
|
|
inner: crate::operator_questions::OpQuestion,
|
|
#[serde(skip_serializing_if = "Vec::is_empty")]
|
|
question_refs: Vec<String>,
|
|
#[serde(skip_serializing_if = "Vec::is_empty")]
|
|
answer_refs: Vec<String>,
|
|
}
|
|
|
|
impl QuestionView {
|
|
fn from_question(q: crate::operator_questions::OpQuestion) -> Self {
|
|
let question_refs = scan_validated_paths(&q.question);
|
|
let answer_refs = q
|
|
.answer
|
|
.as_deref()
|
|
.map(scan_validated_paths)
|
|
.unwrap_or_default();
|
|
Self {
|
|
inner: q,
|
|
question_refs,
|
|
answer_refs,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
struct PortConflict {
|
|
port: u16,
|
|
/// All agent names sharing this port (sorted, ≥2 entries).
|
|
agents: Vec<String>,
|
|
}
|
|
|
|
#[derive(Serialize, Clone, Debug)]
|
|
pub(crate) struct TombstoneView {
|
|
pub name: String,
|
|
/// Bytes used by the state dir tree. Cheap-ish to compute; let the
|
|
/// operator know how much they're holding onto.
|
|
pub state_bytes: u64,
|
|
/// Mtime (unix seconds) of the state dir; rough "last seen".
|
|
pub last_seen: i64,
|
|
pub has_creds: bool,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
struct TransientView {
|
|
name: String,
|
|
kind: &'static str,
|
|
secs: u64,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
struct ApprovalHistoryView {
|
|
id: i64,
|
|
agent: String,
|
|
kind: &'static str,
|
|
/// First 12 chars of the canonical sha (preferred) or
|
|
/// manager-supplied ref. None for resolved spawn approvals.
|
|
sha_short: Option<String>,
|
|
/// `approved` / `denied` / `failed`.
|
|
status: &'static str,
|
|
/// Unix seconds. Renders as a relative time on the dashboard.
|
|
resolved_at: i64,
|
|
/// Operator-supplied deny reason (for `denied`) or build error
|
|
/// (for `failed`). None on `approved`.
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
note: Option<String>,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
struct ApprovalView {
|
|
id: i64,
|
|
agent: String,
|
|
kind: &'static str,
|
|
/// First 12 chars of the `commit_ref`, for `ApplyCommit` only.
|
|
sha_short: Option<String>,
|
|
/// Raw unified diff text, for `ApplyCommit` only. The client splits
|
|
/// on `\n` and per-line classifies (`+` / `-` / `@@` / `--- ` / `+++ `
|
|
/// → diff-add / diff-del / diff-hunk / diff-file). Shipping raw
|
|
/// instead of pre-rendered HTML saves bytes on the wire (no
|
|
/// per-line `<span>` markup) and removes the only HTML-escape
|
|
/// surface from the snapshot.
|
|
diff: Option<String>,
|
|
/// Manager-supplied description shown on the approval card.
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
description: Option<String>,
|
|
/// Unix seconds the approval was queued. Rendered as a relative
|
|
/// time on the card so the operator can spot a stale request. (#272)
|
|
requested_at: i64,
|
|
}
|
|
|
|
/// Replace silent `.unwrap_or_default()` on the data sources behind
|
|
/// `/api/state` so that whichever query degrades surfaces in journald
|
|
/// instead of leaving the operator staring at an empty list. The
|
|
/// dashboard still degrades to a sensible default value; the warn
|
|
/// is just the diagnostic breadcrumb the old code swallowed.
|
|
fn log_default<T, E>(what: &str, result: std::result::Result<T, E>) -> T
|
|
where
|
|
T: Default,
|
|
E: std::fmt::Debug,
|
|
{
|
|
match result {
|
|
Ok(v) => v,
|
|
Err(e) => {
|
|
tracing::warn!(target: "api_state", source = %what, error = ?e, "snapshot source failed; using default");
|
|
T::default()
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn api_state(headers: HeaderMap, State(state): State<AppState>) -> axum::Json<StateSnapshot> {
|
|
let host = headers
|
|
.get("host")
|
|
.and_then(|h| h.to_str().ok())
|
|
.unwrap_or("localhost");
|
|
let hostname = host.split(':').next().unwrap_or(host).to_owned();
|
|
|
|
// Capture the unified dashboard-channel seq *before* any read so the
|
|
// dedupe contract is "events with seq > snapshot.seq are
|
|
// post-snapshot, never missed." An event landing during snapshot
|
|
// construction may be doubly applied (snapshot caught the write +
|
|
// client also applies the SSE frame) — that's a renderer's problem
|
|
// to make idempotent, not ours to avoid here.
|
|
let seq = state.coord.current_seq();
|
|
|
|
// Refresh the coordinator's cached container snapshot before
|
|
// reading. Cold-load clients then see whatever the latest rescan
|
|
// produced; live clients converge via the matching
|
|
// `ContainerStateChanged` / `ContainerRemoved` events the rescan
|
|
// emits.
|
|
state.coord.rescan_containers_and_emit().await;
|
|
let containers = state.coord.containers_snapshot().await;
|
|
let any_stale = containers.iter().any(|c| c.needs_update);
|
|
let transient_snapshot = state.coord.transient_snapshot();
|
|
let pending_approvals = gc_orphans(
|
|
&state.coord,
|
|
log_default("approvals.pending", state.coord.approvals.pending()),
|
|
);
|
|
let transients = build_transient_views(&containers, &transient_snapshot);
|
|
let approvals = build_approval_views(pending_approvals).await;
|
|
let approval_history = log_default(
|
|
"approvals.recent_resolved",
|
|
state.coord.approvals.recent_resolved(30),
|
|
)
|
|
.into_iter()
|
|
.map(history_view)
|
|
.collect();
|
|
let tombstones = build_tombstone_views(&state.coord, &containers, &transient_snapshot);
|
|
let port_conflicts = build_port_conflicts(&containers);
|
|
|
|
// operator_inbox used to be served here as a 50-row array; the
|
|
// dashboard now derives it client-side from the message stream
|
|
// (terminal backfill + live SSE), so the snapshot stops shipping it.
|
|
// Both operator-targeted and peer threads now surface on the
|
|
// dashboard. Client filters by target client-side. Each row is
|
|
// wrapped in QuestionView so the snapshot carries the same
|
|
// file_refs the live event variants attach.
|
|
let questions: Vec<QuestionView> =
|
|
log_default("questions.pending_all", state.coord.questions.pending_all())
|
|
.into_iter()
|
|
.map(QuestionView::from_question)
|
|
.collect();
|
|
let question_history: Vec<QuestionView> = log_default(
|
|
"questions.recent_answered_all",
|
|
state.coord.questions.recent_answered_all(20),
|
|
)
|
|
.into_iter()
|
|
.map(QuestionView::from_question)
|
|
.collect();
|
|
|
|
axum::Json(StateSnapshot {
|
|
seq,
|
|
hostname,
|
|
manager_port: MANAGER_PORT,
|
|
any_stale,
|
|
containers,
|
|
transients,
|
|
approvals,
|
|
approval_history,
|
|
meta_inputs: read_meta_inputs(),
|
|
meta_update_running: state.coord.meta_update_in_progress(),
|
|
questions,
|
|
question_history,
|
|
tombstones,
|
|
port_conflicts,
|
|
forge_present: crate::forge::is_present().await,
|
|
})
|
|
}
|
|
|
|
/// Group live containers by their assigned web UI port; clusters with
|
|
/// more than one member are port-hash collisions the operator needs
|
|
/// to resolve by renaming. Manager (fixed at 8000) and sub-agents
|
|
/// (8100..8999) can't collide with each other — collisions are
|
|
/// strictly between sub-agents.
|
|
fn build_port_conflicts(containers: &[ContainerView]) -> Vec<PortConflict> {
|
|
let mut by_port: std::collections::BTreeMap<u16, Vec<String>> =
|
|
std::collections::BTreeMap::new();
|
|
for c in containers {
|
|
by_port.entry(c.port).or_default().push(c.name.clone());
|
|
}
|
|
by_port
|
|
.into_iter()
|
|
.filter(|(_, agents)| agents.len() > 1)
|
|
.map(|(port, mut agents)| {
|
|
agents.sort();
|
|
PortConflict { port, agents }
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
#[derive(Serialize, Clone, Debug)]
|
|
pub(crate) struct MetaInputView {
|
|
/// Input key in meta's `flake.nix` — `hyperhive`, `agent-<n>`, etc.
|
|
pub name: String,
|
|
/// Full locked sha. Not displayed verbatim; the dashboard
|
|
/// truncates to the first 12 chars for the chip.
|
|
pub rev: String,
|
|
/// Unix seconds — `locked.lastModified`. Drives the relative
|
|
/// "2h ago" timestamp on each input row.
|
|
pub last_modified: i64,
|
|
/// `original.url` if available, for the tooltip / row meta text.
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub url: Option<String>,
|
|
}
|
|
|
|
/// Walk `flake.lock`'s `nodes` graph from `root` and emit one
|
|
/// `MetaInputView` per fetched input, at **every** depth. That
|
|
/// surfaces the direct meta inputs (`hyperhive`, `agent-<n>`), the
|
|
/// agent flakes' own inputs (`agent-dmatrix/mcp-matrix`,
|
|
/// `hyperhive/nixpkgs`), and any deeper transitive inputs — so the
|
|
/// operator can bump any of them individually. Names are
|
|
/// slash-separated paths from root, the syntax `nix flake update`
|
|
/// accepts for transitive inputs.
|
|
///
|
|
/// Filtering (see issue #275):
|
|
/// - Inputs that resolve via a `follows` chain (lock value is an
|
|
/// array) are skipped — they alias another node, not their own
|
|
/// fetched derivation, so updating them does nothing.
|
|
/// - A node is emitted only when it carries a `locked.rev`.
|
|
/// - Each fetched node is walked exactly once (a `visited` set):
|
|
/// the lock graph shares nodes (many flakes reference one
|
|
/// nixpkgs), so without this a shared subtree re-walks per parent
|
|
/// and a cycle would recurse forever. The result is a spanning
|
|
/// tree — every input shown once, at its shallowest path.
|
|
fn read_meta_inputs() -> Vec<MetaInputView> {
|
|
let mut out = Vec::new();
|
|
let Ok(raw) = std::fs::read_to_string("/var/lib/hyperhive/meta/flake.lock") else {
|
|
return out;
|
|
};
|
|
let Ok(json) = serde_json::from_str::<serde_json::Value>(&raw) else {
|
|
return out;
|
|
};
|
|
let Some(nodes) = json.get("nodes").and_then(|v| v.as_object()) else {
|
|
return out;
|
|
};
|
|
let Some(root_name) = json.get("root").and_then(|v| v.as_str()) else {
|
|
return out;
|
|
};
|
|
let mut visited = std::collections::HashSet::new();
|
|
visited.insert(root_name.to_owned());
|
|
walk_meta_inputs(nodes, root_name, "", &mut visited, &mut out);
|
|
// hyperhive first, then alphabetical. String-sorting the
|
|
// slash-paths puts every node directly above its own children
|
|
// (`agent-foo`, `agent-foo/bar`, `agent-foo/bar/baz`), so the
|
|
// result is a pre-order traversal the tree renderer can consume.
|
|
out.sort_by(|a, b| match (a.name.as_str(), b.name.as_str()) {
|
|
("hyperhive", _) => std::cmp::Ordering::Less,
|
|
(_, "hyperhive") => std::cmp::Ordering::Greater,
|
|
_ => a.name.cmp(&b.name),
|
|
});
|
|
out
|
|
}
|
|
|
|
fn walk_meta_inputs(
|
|
nodes: &serde_json::Map<String, serde_json::Value>,
|
|
node_name: &str,
|
|
prefix: &str,
|
|
visited: &mut std::collections::HashSet<String>,
|
|
out: &mut Vec<MetaInputView>,
|
|
) {
|
|
let Some(node) = nodes.get(node_name) else {
|
|
return;
|
|
};
|
|
let Some(inputs_map) = node.get("inputs").and_then(|v| v.as_object()) else {
|
|
return;
|
|
};
|
|
// Two passes: claim (and emit) every direct input of this node
|
|
// before descending into any of them. A shallow input that a
|
|
// deeper flake also references then keeps its shallow path
|
|
// rather than being captured first by the deep walk.
|
|
let mut to_recurse: Vec<(String, String)> = Vec::new();
|
|
for (alias, target) in inputs_map {
|
|
// Inputs map value is either a string (node name) or an
|
|
// array (a `follows` chain). The latter just aliases another
|
|
// node — we can't `nix flake update` it directly, so skip.
|
|
let serde_json::Value::String(target_name) = target else {
|
|
continue;
|
|
};
|
|
// Walk each fetched node once — guards shared subtrees and
|
|
// cycles, and keeps the panel free of duplicate rows.
|
|
if !visited.insert(target_name.clone()) {
|
|
continue;
|
|
}
|
|
let Some(target_node) = nodes.get(target_name) else {
|
|
continue;
|
|
};
|
|
let path = if prefix.is_empty() {
|
|
alias.clone()
|
|
} else {
|
|
format!("{prefix}/{alias}")
|
|
};
|
|
if let Some(rev) = target_node
|
|
.get("locked")
|
|
.and_then(|v| v.get("rev"))
|
|
.and_then(|v| v.as_str())
|
|
{
|
|
let last_modified = target_node
|
|
.get("locked")
|
|
.and_then(|v| v.get("lastModified"))
|
|
.and_then(serde_json::Value::as_i64)
|
|
.unwrap_or(0);
|
|
let url = target_node
|
|
.get("original")
|
|
.and_then(|v| v.get("url"))
|
|
.and_then(|v| v.as_str())
|
|
.map(str::to_owned);
|
|
out.push(MetaInputView {
|
|
name: path.clone(),
|
|
rev: rev.to_owned(),
|
|
last_modified,
|
|
url,
|
|
});
|
|
}
|
|
to_recurse.push((target_name.clone(), path));
|
|
}
|
|
for (target_name, path) in to_recurse {
|
|
walk_meta_inputs(nodes, &target_name, &path, visited, out);
|
|
}
|
|
}
|
|
|
|
/// Transient state for agents whose container does NOT yet exist
|
|
/// (`Spawning`). Lifecycle ops on existing containers surface as
|
|
/// `ContainerView.pending` inline; this list only catches pre-creation.
|
|
fn build_transient_views(
|
|
containers: &[ContainerView],
|
|
transient_snapshot: &std::collections::HashMap<String, crate::coordinator::TransientState>,
|
|
) -> Vec<TransientView> {
|
|
transient_snapshot
|
|
.iter()
|
|
.filter(|(name, _)| !containers.iter().any(|c| &c.name == *name))
|
|
.map(|(name, st)| TransientView {
|
|
name: name.clone(),
|
|
kind: transient_label(st.kind),
|
|
secs: st.since.elapsed().as_secs(),
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
/// Render each pending approval into its dashboard view (short sha +
|
|
/// unified diff for `ApplyCommit`, just the name for `Spawn`).
|
|
/// Project a resolved sqlite row into the lean shape the dashboard
|
|
/// history tab consumes — no `diff_html` (rendering 30 of them
|
|
/// per /api/state poll would mean 30 git diffs per refresh).
|
|
fn history_view(a: Approval) -> ApprovalHistoryView {
|
|
let displayed = a.fetched_sha.as_deref().unwrap_or(&a.commit_ref);
|
|
let sha_short = if displayed.is_empty() {
|
|
None
|
|
} else {
|
|
Some(displayed[..displayed.len().min(12)].to_owned())
|
|
};
|
|
let status = match a.status {
|
|
hive_sh4re::ApprovalStatus::Approved => "approved",
|
|
hive_sh4re::ApprovalStatus::Denied => "denied",
|
|
hive_sh4re::ApprovalStatus::Failed => "failed",
|
|
// Pending shouldn't appear in recent_resolved, but be defensive.
|
|
hive_sh4re::ApprovalStatus::Pending => "pending",
|
|
};
|
|
let kind = match a.kind {
|
|
hive_sh4re::ApprovalKind::ApplyCommit => "apply_commit",
|
|
hive_sh4re::ApprovalKind::Spawn => "spawn",
|
|
hive_sh4re::ApprovalKind::InitConfig => "init_config",
|
|
hive_sh4re::ApprovalKind::UpdateMetaInputs => "update_meta_inputs",
|
|
};
|
|
ApprovalHistoryView {
|
|
id: a.id,
|
|
agent: a.agent,
|
|
kind,
|
|
sha_short,
|
|
status,
|
|
resolved_at: a.resolved_at.unwrap_or(0),
|
|
note: a.note,
|
|
}
|
|
}
|
|
|
|
async fn build_approval_views(approvals: Vec<Approval>) -> Vec<ApprovalView> {
|
|
let mut out = Vec::with_capacity(approvals.len());
|
|
for a in approvals {
|
|
out.push(match a.kind {
|
|
hive_sh4re::ApprovalKind::ApplyCommit => {
|
|
// Prefer the canonical fetched sha from applied;
|
|
// commit_ref is only the manager's claim and may be
|
|
// amended out from under us.
|
|
let displayed = a.fetched_sha.as_deref().unwrap_or(&a.commit_ref);
|
|
let sha = displayed[..displayed.len().min(12)].to_owned();
|
|
let diff = approval_diff(&a.agent, a.id).await;
|
|
ApprovalView {
|
|
id: a.id,
|
|
agent: a.agent.clone(),
|
|
kind: "apply_commit",
|
|
sha_short: Some(sha),
|
|
diff: Some(diff),
|
|
description: a.description,
|
|
requested_at: a.requested_at,
|
|
}
|
|
}
|
|
hive_sh4re::ApprovalKind::Spawn => ApprovalView {
|
|
id: a.id,
|
|
agent: a.agent,
|
|
kind: "spawn",
|
|
sha_short: None,
|
|
diff: None,
|
|
description: a.description,
|
|
requested_at: a.requested_at,
|
|
},
|
|
hive_sh4re::ApprovalKind::InitConfig => ApprovalView {
|
|
id: a.id,
|
|
agent: a.agent,
|
|
kind: "init_config",
|
|
sha_short: None,
|
|
diff: None,
|
|
description: a.description,
|
|
requested_at: a.requested_at,
|
|
},
|
|
hive_sh4re::ApprovalKind::UpdateMetaInputs => ApprovalView {
|
|
id: a.id,
|
|
agent: a.agent,
|
|
kind: "update_meta_inputs",
|
|
sha_short: None,
|
|
diff: None,
|
|
description: a.description,
|
|
requested_at: a.requested_at,
|
|
},
|
|
});
|
|
}
|
|
out
|
|
}
|
|
|
|
/// State-dir names that don't appear in the live container list (and
|
|
/// aren't the manager). Each one surfaces in the dashboard as a row
|
|
/// with R3V1V3 + PURG3 actions.
|
|
fn build_tombstone_views(
|
|
coord: &Coordinator,
|
|
containers: &[ContainerView],
|
|
transient_snapshot: &std::collections::HashMap<String, crate::coordinator::TransientState>,
|
|
) -> Vec<TombstoneView> {
|
|
let _ = coord; // kept_state_names is a free fn but takes &self by future plan
|
|
let live: std::collections::HashSet<&str> = containers
|
|
.iter()
|
|
.map(|c| c.name.as_str())
|
|
.chain(transient_snapshot.keys().map(String::as_str))
|
|
.collect();
|
|
Coordinator::kept_state_names()
|
|
.into_iter()
|
|
.filter(|name| name != MANAGER_NAME && !live.contains(name.as_str()))
|
|
.map(|name| {
|
|
let root = Coordinator::agent_state_root(&name);
|
|
let state_bytes = dir_size_bytes(&root);
|
|
let last_seen = std::fs::metadata(&root)
|
|
.and_then(|m| m.modified())
|
|
.ok()
|
|
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
|
|
.and_then(|d| i64::try_from(d.as_secs()).ok())
|
|
.unwrap_or(0);
|
|
let has_creds = claude_has_session(&Coordinator::agent_claude_dir(&name));
|
|
TombstoneView {
|
|
name,
|
|
state_bytes,
|
|
last_seen,
|
|
has_creds,
|
|
}
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
/// Total size in bytes of every regular file beneath `root`.
///
/// Directories are descended into; entries that are neither a
/// directory nor a regular file (per `DirEntry::file_type`) are
/// ignored. Anything that can't be read is skipped, so an unreadable
/// or non-directory `root` yields 0. Cheap for typical agent state
/// (config repo + claude creds + notes file — usually a few MB), so it
/// is fine to run inline on each /api/state.
fn dir_size_bytes(root: &Path) -> u64 {
    let mut total = 0u64;
    // Explicit work list instead of recursion; visiting order doesn't
    // affect the sum.
    let mut pending = vec![root.to_path_buf()];
    while let Some(dir) = pending.pop() {
        let entries = match std::fs::read_dir(&dir) {
            Ok(entries) => entries,
            Err(_) => continue,
        };
        for entry in entries.flatten() {
            match entry.file_type() {
                Ok(kind) if kind.is_dir() => pending.push(entry.path()),
                Ok(kind) if kind.is_file() => {
                    if let Ok(meta) = entry.metadata() {
                        total += meta.len();
                    }
                }
                _ => {}
            }
        }
    }
    total
}
|
|
|
|
async fn dashboard_history(State(state): State<AppState>) -> Response {
|
|
// Backfill source for the dashboard terminal. Returns up to ~200
|
|
// historical broker messages (no other event kinds are persisted)
|
|
// converted to `DashboardEvent::Sent` JSON so the client can replay
|
|
// through the same dispatch path as live frames. Wrapped in
|
|
// `{ seq, events }`: the seq is the dashboard channel's high-water
|
|
// mark at fetch time. Clients use it to dedupe their buffered live
|
|
// SSE traffic (drop anything with `seq <= history_seq`) so a frame
|
|
// that lands between SSE-subscribe and history-fetch isn't shown
|
|
// twice and isn't lost. Historical rows carry `seq = 0`; the
|
|
// boundary seq is what closes the dedupe window.
|
|
const HISTORY_LIMIT: u64 = 200;
|
|
let seq = state.coord.current_seq();
|
|
match state.coord.broker.recent_all(HISTORY_LIMIT) {
|
|
Ok(mut messages) => {
|
|
messages.reverse();
|
|
let events: Vec<crate::dashboard_events::DashboardEvent> = messages
|
|
.into_iter()
|
|
.map(|m| match m {
|
|
crate::broker::MessageEvent::Sent { id, from, to, body, at, in_reply_to } => {
|
|
let file_refs = scan_validated_paths(&body);
|
|
crate::dashboard_events::DashboardEvent::Sent {
|
|
seq: 0,
|
|
id,
|
|
from,
|
|
to,
|
|
body,
|
|
at,
|
|
in_reply_to,
|
|
file_refs,
|
|
}
|
|
}
|
|
crate::broker::MessageEvent::Delivered { id, from, to, body, at, in_reply_to } => {
|
|
let file_refs = scan_validated_paths(&body);
|
|
crate::dashboard_events::DashboardEvent::Delivered {
|
|
seq: 0,
|
|
id,
|
|
from,
|
|
to,
|
|
body,
|
|
at,
|
|
in_reply_to,
|
|
file_refs,
|
|
}
|
|
}
|
|
})
|
|
.collect();
|
|
axum::Json(serde_json::json!({ "seq": seq, "events": events })).into_response()
|
|
}
|
|
Err(e) => error_response(&format!("dashboard/history failed: {e:#}")),
|
|
}
|
|
}
|
|
|
|
async fn dashboard_stream(
|
|
State(state): State<AppState>,
|
|
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
|
|
let rx = state.coord.dashboard_subscribe();
|
|
let stream = BroadcastStream::new(rx).filter_map(|res| {
|
|
// Drop lagged frames. Browsers reconnect; the seq dedupe on
|
|
// reconnect skips any frame already reflected in the snapshot.
|
|
let event = res.ok()?;
|
|
let json = serde_json::to_string(&event).ok()?;
|
|
Some(Ok(Event::default().data(json)))
|
|
});
|
|
Sse::new(stream).keep_alive(KeepAlive::default())
|
|
}
|
|
|
|
async fn post_approve(State(state): State<AppState>, AxumPath(id): AxumPath<i64>) -> Response {
|
|
match actions::approve(state.coord.clone(), id).await {
|
|
// 200 instead of 303 — `actions::approve` fires
|
|
// `ApprovalResolved` (success path) or the eventual failure
|
|
// event, both of which the dashboard's derived store applies
|
|
// live. The matching form carries `data-no-refresh`.
|
|
Ok(()) => (StatusCode::OK, "ok").into_response(),
|
|
Err(e) => error_response(&format!("approve {id} failed: {e:#}")),
|
|
}
|
|
}
|
|
|
|
#[derive(Deserialize, Default)]
|
|
struct DenyForm {
|
|
#[serde(default)]
|
|
note: Option<String>,
|
|
}
|
|
|
|
async fn post_deny(
|
|
State(state): State<AppState>,
|
|
AxumPath(id): AxumPath<i64>,
|
|
Form(form): Form<DenyForm>,
|
|
) -> Response {
|
|
let note = form
|
|
.note
|
|
.as_deref()
|
|
.map(str::trim)
|
|
.filter(|s| !s.is_empty());
|
|
match actions::deny(&state.coord, id, note).await {
|
|
Ok(()) => (StatusCode::OK, "ok").into_response(),
|
|
Err(e) => error_response(&format!("deny {id} failed: {e:#}")),
|
|
}
|
|
}
|
|
|
|
#[derive(Deserialize)]
|
|
struct RequestSpawnForm {
|
|
name: String,
|
|
}
|
|
|
|
#[derive(Deserialize)]
|
|
struct AnswerForm {
|
|
answer: String,
|
|
}
|
|
|
|
/// Attach a permissive CORS header so the per-agent web UI — served on
|
|
/// a different port — can POST an operator answer here and read the
|
|
/// result. The dashboard has no auth, so `*` exposes nothing a plain
|
|
/// cross-origin form-POST couldn't already reach. This shim disappears
|
|
/// once the unifying gateway makes the agent page same-origin; see
|
|
/// `docs/boundary.md`.
|
|
fn with_cors(mut resp: Response) -> Response {
|
|
resp.headers_mut().insert(
|
|
axum::http::header::ACCESS_CONTROL_ALLOW_ORIGIN,
|
|
axum::http::HeaderValue::from_static("*"),
|
|
);
|
|
resp
|
|
}
|
|
|
|
async fn post_answer_question(
|
|
State(state): State<AppState>,
|
|
AxumPath(id): AxumPath<i64>,
|
|
Form(form): Form<AnswerForm>,
|
|
) -> Response {
|
|
let answer = form.answer.trim();
|
|
if answer.is_empty() {
|
|
return with_cors(error_response("answer: required"));
|
|
}
|
|
let resp = match state
|
|
.coord
|
|
.questions
|
|
.answer(id, answer, hive_sh4re::OPERATOR_RECIPIENT)
|
|
{
|
|
Ok((question, asker, target)) => {
|
|
tracing::info!(%id, %asker, "operator answered question");
|
|
state.coord.notify_agent(
|
|
&asker,
|
|
&hive_sh4re::HelperEvent::QuestionAnswered {
|
|
id,
|
|
question,
|
|
answer: answer.to_owned(),
|
|
answerer: hive_sh4re::OPERATOR_RECIPIENT.to_owned(),
|
|
},
|
|
);
|
|
state.coord.emit_question_resolved(
|
|
id,
|
|
answer,
|
|
hive_sh4re::OPERATOR_RECIPIENT,
|
|
false,
|
|
target.as_deref(),
|
|
);
|
|
(StatusCode::OK, "ok").into_response()
|
|
}
|
|
Err(e) => error_response(&format!("answer {id} failed: {e:#}")),
|
|
};
|
|
with_cors(resp)
|
|
}
|
|
|
|
/// Resolve a pending operator question with a sentinel answer when
|
|
/// the operator decides not to / can't answer. The asker harness
|
|
/// receives a `QuestionAnswered` event with `answer = "[cancelled]"`
|
|
/// so it can fall back on whatever default it had. Same code path as
|
|
/// a real answer — just lets the operator close the loop instead of
|
|
/// letting the question dangle forever.
|
|
async fn post_cancel_question(
|
|
State(state): State<AppState>,
|
|
AxumPath(id): AxumPath<i64>,
|
|
) -> Response {
|
|
const SENTINEL: &str = "[cancelled]";
|
|
match state
|
|
.coord
|
|
.questions
|
|
.answer(id, SENTINEL, hive_sh4re::OPERATOR_RECIPIENT)
|
|
{
|
|
Ok((question, asker, target)) => {
|
|
tracing::info!(%id, %asker, "operator cancelled question");
|
|
state.coord.emit_question_resolved(
|
|
id,
|
|
SENTINEL,
|
|
hive_sh4re::OPERATOR_RECIPIENT,
|
|
true,
|
|
target.as_deref(),
|
|
);
|
|
state.coord.notify_agent_from(
|
|
hive_sh4re::OPERATOR_RECIPIENT,
|
|
&asker,
|
|
&hive_sh4re::HelperEvent::QuestionAnswered {
|
|
id,
|
|
question,
|
|
answer: SENTINEL.to_owned(),
|
|
answerer: hive_sh4re::OPERATOR_RECIPIENT.to_owned(),
|
|
},
|
|
);
|
|
(StatusCode::OK, "ok").into_response()
|
|
}
|
|
Err(e) => error_response(&format!("cancel-question {id} failed: {e:#}")),
|
|
}
|
|
}
|
|
|
|
#[derive(Deserialize)]
|
|
struct JournalQuery {
|
|
/// Optional systemd unit filter — e.g. `hive-ag3nt.service`. When
|
|
/// omitted, returns the full machine journal.
|
|
#[serde(default)]
|
|
unit: Option<String>,
|
|
/// Number of trailing lines to return. Capped at 5000.
|
|
#[serde(default)]
|
|
lines: Option<u32>,
|
|
}
|
|
|
|
/// Shell out to `journalctl -M <container> -b` and return its text
|
|
/// output. Operator-only by virtue of the dashboard being host-bound;
|
|
/// hive-c0re already runs as root in its systemd unit so journalctl
|
|
/// has the access it needs.
|
|
async fn get_journal(
|
|
AxumPath(name): AxumPath<String>,
|
|
axum::extract::Query(q): axum::extract::Query<JournalQuery>,
|
|
) -> Response {
|
|
// Validate the container name against the list of managed
|
|
// containers so we don't shell out with arbitrary input.
|
|
let container = strip_container_prefix(&name);
|
|
let prefixed = if container == lifecycle::MANAGER_NAME {
|
|
container.clone()
|
|
} else {
|
|
format!("{}{container}", lifecycle::AGENT_PREFIX)
|
|
};
|
|
let live = lifecycle::list().await.unwrap_or_default();
|
|
if !live.iter().any(|c| c == &prefixed) {
|
|
return error_response(&format!("journal: no managed container {prefixed:?}"));
|
|
}
|
|
let lines = q.lines.unwrap_or(500).min(5000);
|
|
let mut cmd = tokio::process::Command::new("journalctl");
|
|
cmd.args([
|
|
"-M",
|
|
&prefixed,
|
|
"-b",
|
|
"--no-pager",
|
|
"--output=short-iso",
|
|
"--lines",
|
|
])
|
|
.arg(lines.to_string());
|
|
if let Some(u) = q.unit.as_deref().filter(|s| !s.is_empty()) {
|
|
// accept hive-ag3nt[.service] / hive-m1nd[.service] — anything
|
|
// else we refuse, again to keep the shell-out tight.
|
|
let allowed = ["hive-ag3nt.service", "hive-m1nd.service"];
|
|
let unit = if u.ends_with(".service") {
|
|
u.to_owned()
|
|
} else {
|
|
format!("{u}.service")
|
|
};
|
|
if !allowed.contains(&unit.as_str()) {
|
|
return error_response(&format!("journal: unknown unit {unit:?}"));
|
|
}
|
|
cmd.args(["-u", &unit]);
|
|
}
|
|
match cmd.output().await {
|
|
Ok(out) => {
|
|
// Combine stdout + stderr — journalctl emits to both on errors.
|
|
let mut body = String::from_utf8_lossy(&out.stdout).into_owned();
|
|
if !out.status.success() {
|
|
body.push_str("\n--- stderr ---\n");
|
|
body.push_str(&String::from_utf8_lossy(&out.stderr));
|
|
}
|
|
([("content-type", "text/plain; charset=utf-8")], body).into_response()
|
|
}
|
|
Err(e) => error_response(&format!("journalctl spawn: {e}")),
|
|
}
|
|
}
|
|
|
|
#[derive(Deserialize)]
|
|
struct StateFileQuery {
|
|
path: String,
|
|
}
|
|
|
|
/// Bounded-size read of a file under one of two allow-listed
|
|
/// roots: `/var/lib/hyperhive/agents/<n>/state/` (per-agent durable
|
|
/// notes — the only writable path agents have outside their
|
|
/// container) and `/var/lib/hyperhive/shared/` (shared docs). Both
|
|
/// path forms are accepted:
|
|
/// - canonical host: `/var/lib/hyperhive/agents/alice/state/foo.md`
|
|
/// - container view: `/agents/alice/state/foo.md`
|
|
/// - shared: `/shared/foo.md`
|
|
///
|
|
/// `/state/...` on its own is *not* accepted — the in-container
|
|
/// mount is ambiguous from the host's perspective (we don't know
|
|
/// which agent's `/state` it refers to) and using it would silently
|
|
/// resolve to the wrong file.
|
|
///
|
|
/// Path is canonicalised before the allow-list check so `..`
|
|
/// traversal and symlink games can't escape the roots. Files larger
|
|
/// than `MAX_BYTES` are truncated with a banner so a runaway log
|
|
/// can't OOM the browser.
|
|
/// Resolve a caller-supplied path string to a canonical host path
|
|
/// that has been verified against the allow-list. Returns `Err`
|
|
/// with a human-readable reason for every failure mode (path
|
|
/// outside roots, canonicalize failure, escape via symlink,
|
|
/// per-agent subdir not `state`, symlink anywhere below the root,
|
|
/// file not world-readable). Shared by `get_state_file` (read) and
|
|
/// `scan_validated_paths` (linkify candidates in message bodies)
|
|
/// so both apply identical security rules and the linkifier
|
|
/// doesn't render a path the reader will refuse to serve.
|
|
///
|
|
/// Defense-in-depth layers (in order):
|
|
/// 1. Caller-supplied prefix has to match the allow-list (agents/
|
|
/// or shared/), else reject without touching the fs.
|
|
/// 2. No symlinks below the matched root. Walked pre-canonicalize
|
|
/// via `symlink_metadata` on each component so a sub-agent that
|
|
/// plants `ln -s /var/lib/hyperhive/agents/other/state/secret
|
|
/// /agents/me/state/peek` can't proxy a different agent's file
|
|
/// through this endpoint (canonicalize would happily resolve
|
|
/// the symlink to a path inside the allow-list).
|
|
/// 3. Canonicalize is run anyway as a belt-and-braces check —
|
|
/// resolves `..`/`.` traversal and rejects if the result
|
|
/// escapes the roots.
|
|
/// 4. Under `AGENTS_ROOT`, the second path component must be
|
|
/// `state/` — agents' applied/proposed git repos and config dirs
|
|
/// are off-limits.
|
|
/// 5. The target's metadata is fetched once and returned to the
|
|
/// caller so they don't restat. If the target is a regular
|
|
/// file it must be world-readable (mode & 0o004); a 0600 file
|
|
/// inside `state/` could leak through this endpoint to anyone
|
|
/// holding the dashboard URL otherwise.
|
|
fn resolve_state_path(
|
|
raw: &str,
|
|
) -> std::result::Result<(std::path::PathBuf, std::fs::Metadata), String> {
|
|
use std::os::unix::fs::PermissionsExt as _;
|
|
const AGENTS_ROOT: &str = "/var/lib/hyperhive/agents";
|
|
const SHARED_ROOT: &str = "/var/lib/hyperhive/shared";
|
|
let raw = raw.trim();
|
|
let (mapped, root): (std::path::PathBuf, &str) =
|
|
if let Some(rest) = raw.strip_prefix("/agents/") {
|
|
(
|
|
std::path::PathBuf::from(format!("{AGENTS_ROOT}/{rest}")),
|
|
AGENTS_ROOT,
|
|
)
|
|
} else if let Some(rest) = raw.strip_prefix("/shared/") {
|
|
(
|
|
std::path::PathBuf::from(format!("{SHARED_ROOT}/{rest}")),
|
|
SHARED_ROOT,
|
|
)
|
|
} else if let Some(rest) = raw.strip_prefix(&format!("{AGENTS_ROOT}/")) {
|
|
(
|
|
std::path::PathBuf::from(format!("{AGENTS_ROOT}/{rest}")),
|
|
AGENTS_ROOT,
|
|
)
|
|
} else if let Some(rest) = raw.strip_prefix(&format!("{SHARED_ROOT}/")) {
|
|
(
|
|
std::path::PathBuf::from(format!("{SHARED_ROOT}/{rest}")),
|
|
SHARED_ROOT,
|
|
)
|
|
} else {
|
|
return Err(format!("path not in allow-list: {raw}"));
|
|
};
|
|
reject_symlinks_below(std::path::Path::new(root), &mapped)?;
|
|
let canonical = std::fs::canonicalize(&mapped)
|
|
.map_err(|e| format!("{}: {e}", mapped.display()))?;
|
|
if !(canonical.starts_with(AGENTS_ROOT) || canonical.starts_with(SHARED_ROOT)) {
|
|
return Err(format!(
|
|
"resolved path escapes allow-list: {}",
|
|
canonical.display()
|
|
));
|
|
}
|
|
if let Ok(rel) = canonical.strip_prefix(AGENTS_ROOT) {
|
|
let mut components = rel.components();
|
|
let _agent = components.next();
|
|
let dir = components.next().and_then(|c| c.as_os_str().to_str());
|
|
if dir != Some("state") {
|
|
return Err(format!(
|
|
"only per-agent state/ is readable here ({} dir not allowed)",
|
|
dir.unwrap_or("(root)")
|
|
));
|
|
}
|
|
}
|
|
let meta = std::fs::metadata(&canonical)
|
|
.map_err(|e| format!("stat {}: {e}", canonical.display()))?;
|
|
if meta.is_file() {
|
|
let mode = meta.permissions().mode();
|
|
if mode & 0o004 == 0 {
|
|
return Err(format!(
|
|
"{} not world-readable (mode 0{:o}); refusing to proxy non-public file",
|
|
canonical.display(),
|
|
mode & 0o777,
|
|
));
|
|
}
|
|
}
|
|
Ok((canonical, meta))
|
|
}
|
|
|
|
/// Refuse `mapped` if any path component below `root` is a symlink or
/// a `..` segment.
///
/// The roots themselves (`AGENTS_ROOT`, `SHARED_ROOT`) are
/// hive-c0re-owned and assumed trusted; only the parts the agent /
/// operator can plant matter. Components that don't exist (or can't be
/// stat'ed) are skipped — `canonicalize` reports non-existence
/// separately, and missing-component checks would just race the
/// filesystem.
///
/// Returns `Ok(())` untouched when `mapped` is not under `root`, so
/// the caller can surface its own, better-fitting error.
fn reject_symlinks_below(
    root: &std::path::Path,
    mapped: &std::path::Path,
) -> std::result::Result<(), String> {
    use std::path::Component;

    let Ok(rel) = mapped.strip_prefix(root) else {
        return Ok(());
    };

    let mut walked = root.to_path_buf();
    for part in rel.components() {
        if matches!(part, Component::ParentDir) {
            return Err(format!(
                "path contains `..` traversal below {}; refuse outright",
                root.display()
            ));
        }
        // Anything that is neither `..` nor a plain name (e.g. `.`)
        // contributes nothing to the walk.
        let Component::Normal(name) = part else {
            continue;
        };
        walked.push(name);
        // `symlink_metadata` does not follow the link, so this sees
        // the link itself rather than its target.
        let planted_link =
            std::fs::symlink_metadata(&walked).is_ok_and(|m| m.file_type().is_symlink());
        if planted_link {
            return Err(format!(
                "symlink at {} not allowed (canonicalize would resolve it past the \
                 allow-list check; refuse outright)",
                walked.display()
            ));
        }
    }
    Ok(())
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use std::os::unix::fs::symlink;

    /// Unique tmp directory for one test, removed again when the guard
    /// is dropped (including on panic-unwind), so repeated runs don't
    /// pile up `hyperhive-test-*` directories in the temp dir.
    struct TmpRoot(std::path::PathBuf);

    impl TmpRoot {
        /// Create `hyperhive-test-<tag>-<pid>-<nanos>` under the
        /// system temp dir. The pid keeps concurrently running test
        /// binaries from colliding on the timestamp alone.
        fn new(tag: &str) -> Self {
            let ts = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_nanos())
                .unwrap_or(0);
            let dir = std::env::temp_dir().join(format!(
                "hyperhive-test-{tag}-{}-{ts}",
                std::process::id()
            ));
            std::fs::create_dir_all(&dir).unwrap();
            Self(dir)
        }

        /// The directory this guard owns.
        fn path(&self) -> &std::path::Path {
            &self.0
        }
    }

    impl Drop for TmpRoot {
        fn drop(&mut self) {
            // Best-effort: a failed cleanup must not mask the test result.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    #[test]
    fn reject_symlinks_below_accepts_plain_dirs_and_files() {
        let tmp = TmpRoot::new("symlink-ok");
        let root = tmp.path();
        std::fs::create_dir_all(root.join("alice/state")).unwrap();
        std::fs::write(root.join("alice/state/notes.md"), b"hi").unwrap();
        assert!(reject_symlinks_below(root, &root.join("alice/state/notes.md")).is_ok());
    }

    #[test]
    fn reject_symlinks_below_rejects_leaf_symlink() {
        let tmp = TmpRoot::new("symlink-leaf");
        let root = tmp.path();
        std::fs::create_dir_all(root.join("alice/state")).unwrap();
        // Plant a symlink that points anywhere; resolve_state_path's
        // canonicalize would happily resolve it past the allow-list
        // check, so we have to refuse at the un-canonical layer.
        symlink("/etc/shadow", root.join("alice/state/peek")).unwrap();
        let err = reject_symlinks_below(root, &root.join("alice/state/peek")).unwrap_err();
        assert!(err.contains("symlink at"), "msg = {err}");
        assert!(err.contains("peek"), "msg = {err}");
    }

    #[test]
    fn reject_symlinks_below_rejects_directory_symlink_in_middle() {
        let tmp = TmpRoot::new("symlink-mid");
        let root = tmp.path();
        std::fs::create_dir_all(root.join("real/state")).unwrap();
        std::fs::write(root.join("real/state/secret.md"), b"hi").unwrap();
        // alice's "state" dir is actually a symlink to real/state — a
        // sub-agent shouldn't be able to plant this and proxy real's
        // private files via the dashboard.
        std::fs::create_dir_all(root.join("alice")).unwrap();
        symlink(root.join("real/state"), root.join("alice/state")).unwrap();
        let err = reject_symlinks_below(root, &root.join("alice/state/secret.md")).unwrap_err();
        assert!(err.contains("symlink at"), "msg = {err}");
    }

    #[test]
    fn reject_symlinks_below_rejects_parent_dir_traversal() {
        let tmp = TmpRoot::new("symlink-dotdot");
        let root = tmp.path();
        // `..` doesn't survive canonicalize anyway, but we want a
        // friendlier error than "path escapes allow-list" — refusing
        // upfront also avoids walking ancestors with `symlink_metadata`.
        let p = root.join("alice/state/../escape");
        let err = reject_symlinks_below(root, &p).unwrap_err();
        assert!(err.contains("`..`"), "msg = {err}");
    }

    #[test]
    fn reject_symlinks_below_passes_through_when_path_not_under_root() {
        // resolve_state_path's earlier allow-list check would reject
        // this; reject_symlinks_below stays a no-op so the caller
        // surfaces the better-fit error.
        let root = std::path::Path::new("/var/lib/hyperhive/agents");
        assert!(reject_symlinks_below(root, std::path::Path::new("/etc/shadow")).is_ok());
    }
}
|
|
|
|
/// Snapshot the current tombstone list and emit a
|
|
/// `TombstonesChanged` event. Call after any mutation that could
|
|
/// add or remove a tombstone (`actions::destroy`,
|
|
/// `post_purge_tombstone`, spawn finalisation). Cheap — the list
|
|
/// is tiny.
|
|
pub(crate) async fn emit_tombstones_snapshot(coord: &Arc<Coordinator>) {
|
|
let containers = coord.containers_snapshot().await;
|
|
let transient_snapshot = coord.transient_snapshot();
|
|
let tombstones = build_tombstone_views(coord, &containers, &transient_snapshot);
|
|
coord.emit_dashboard_event(
|
|
crate::dashboard_events::DashboardEvent::TombstonesChanged {
|
|
seq: coord.next_seq(),
|
|
tombstones,
|
|
},
|
|
);
|
|
}
|
|
|
|
/// Snapshot meta/flake.lock's root inputs + emit
|
|
/// `MetaInputsChanged`. Call after any mutation that bumps a lock
|
|
/// (`run_meta_update`, `auto_update::rebuild_agent`).
|
|
pub(crate) fn emit_meta_inputs_snapshot(coord: &Coordinator) {
|
|
let inputs = read_meta_inputs();
|
|
coord.emit_dashboard_event(
|
|
crate::dashboard_events::DashboardEvent::MetaInputsChanged {
|
|
seq: coord.next_seq(),
|
|
inputs,
|
|
},
|
|
);
|
|
}
|
|
|
|
/// Scan `body` for path-shaped tokens, validate each against the
|
|
/// allow-list, return the unique set of tokens that resolve to a
|
|
/// regular file. Called at broker-message ingest time so the
|
|
/// dashboard event already carries the verified set — no client-
|
|
/// side probe endpoint required, and historical messages get the
|
|
/// same treatment on `/dashboard/history` backfill.
|
|
///
|
|
/// Tokenisation: split on whitespace + a handful of trailing
|
|
/// punctuation chars (`,;:)]}`) that commonly follow paths in
|
|
/// natural-language text but aren't part of the path itself. Any
|
|
/// token starting with `/agents/`, `/shared/`, or
|
|
/// `/var/lib/hyperhive/{agents,shared}/` is a candidate. The
|
|
/// allow-list + `is_file` check happens via the same
|
|
/// `resolve_state_path` helper the read endpoint uses, so the
|
|
/// security rules can't drift.
|
|
pub(crate) fn scan_validated_paths(body: &str) -> Vec<String> {
|
|
const PREFIXES: [&str; 4] = [
|
|
"/agents/",
|
|
"/shared/",
|
|
"/var/lib/hyperhive/agents/",
|
|
"/var/lib/hyperhive/shared/",
|
|
];
|
|
let mut out = Vec::<String>::new();
|
|
for raw in body.split(|c: char| c.is_whitespace()) {
|
|
// Trim trailing natural-language punctuation that wouldn't
|
|
// be part of any real path. Inline rather than via a regex
|
|
// dep — the set is small and the call is hot.
|
|
let token = raw.trim_end_matches([',', ';', ':', ')', ']', '}', '.', '\'', '"']);
|
|
if token.is_empty() {
|
|
continue;
|
|
}
|
|
if !PREFIXES.iter().any(|p| token.starts_with(p)) {
|
|
continue;
|
|
}
|
|
// Cheap dedupe — typical message has 0-3 refs.
|
|
if out.iter().any(|s| s == token) {
|
|
continue;
|
|
}
|
|
if let Ok((_canonical, meta)) = resolve_state_path(token)
|
|
&& meta.is_file()
|
|
{
|
|
out.push(token.to_owned());
|
|
}
|
|
}
|
|
out
|
|
}
|
|
|
|
async fn get_state_file(
|
|
axum::extract::Query(q): axum::extract::Query<StateFileQuery>,
|
|
) -> Response {
|
|
const MAX_BYTES: usize = 1 << 20; // 1 MiB
|
|
let (canonical, meta) = match resolve_state_path(&q.path) {
|
|
Ok(pair) => pair,
|
|
Err(e) => return error_response(&format!("state-file: {e}")),
|
|
};
|
|
if !meta.is_file() {
|
|
return error_response(&format!(
|
|
"state-file: {} is not a regular file",
|
|
canonical.display()
|
|
));
|
|
}
|
|
let size = meta.len();
|
|
let bytes = match std::fs::read(&canonical) {
|
|
Ok(b) => b,
|
|
Err(e) => return error_response(&format!("state-file: read {}: {e}", canonical.display())),
|
|
};
|
|
// Raster images: serve the raw bytes with their real content-type
|
|
// so the dashboard can render them in an <img>. Not truncated —
|
|
// a clipped binary is corrupt, so over-cap images are rejected
|
|
// instead. (SVG stays on the text path: it's text, and the client
|
|
// renders it via a data: URI.)
|
|
if let Some(ct) = image_content_type(&canonical) {
|
|
if bytes.len() > MAX_BYTES {
|
|
return error_response(&format!(
|
|
"state-file: image {} is {size} bytes, over the {MAX_BYTES}-byte preview cap",
|
|
canonical.display()
|
|
));
|
|
}
|
|
return ([("content-type", ct)], bytes).into_response();
|
|
}
|
|
let truncated = bytes.len() > MAX_BYTES;
|
|
let body_bytes = if truncated { &bytes[..MAX_BYTES] } else { &bytes[..] };
|
|
let mut body = String::from_utf8_lossy(body_bytes).into_owned();
|
|
if truncated {
|
|
use std::fmt::Write as _;
|
|
let _ = write!(body, "\n\n--- truncated at {MAX_BYTES} of {size} bytes ---\n");
|
|
}
|
|
([("content-type", "text/plain; charset=utf-8")], body).into_response()
|
|
}
|
|
|
|
/// Map a file extension to the content-type of a raster image the
/// dashboard can preview in an `<img>`.
///
/// The extension is compared ASCII-case-insensitively. Returns `None`
/// for paths without a (UTF-8) extension and for anything not in the
/// table — including SVG, which is served on the text path and
/// rendered client-side via a `data:` URI.
fn image_content_type(path: &Path) -> Option<&'static str> {
    const RASTER_TYPES: [(&str, &str); 8] = [
        ("png", "image/png"),
        ("jpg", "image/jpeg"),
        ("jpeg", "image/jpeg"),
        ("gif", "image/gif"),
        ("webp", "image/webp"),
        ("bmp", "image/bmp"),
        ("ico", "image/x-icon"),
        ("avif", "image/avif"),
    ];

    let ext = path.extension()?.to_str()?;
    RASTER_TYPES
        .iter()
        .find(|(known, _)| ext.eq_ignore_ascii_case(known))
        .map(|&(_, mime)| mime)
}
|
|
|
|
async fn api_reminders(State(state): State<AppState>) -> Response {
|
|
match state.coord.broker.list_pending_reminders() {
|
|
Ok(rows) => axum::Json(rows).into_response(),
|
|
Err(e) => error_response(&format!("reminders: {e:#}")),
|
|
}
|
|
}
|
|
|
|
/// Same-origin proxy that fetches the named agent's
|
|
/// `GET /api/state` and forwards only the `links` field to the
|
|
/// dashboard JS (issue #262). Lets the agent backend stay the
|
|
/// single source of truth for its own nav links: the dashboard
|
|
/// card's icon-only strip and the per-agent page's labelled row
|
|
/// render the same list, no shared Rust wire type required, no
|
|
/// CORS surface on the agent side.
|
|
///
|
|
/// Failure modes (agent down, slow response, malformed JSON) all
|
|
/// degrade to an empty list so the dashboard still renders.
|
|
async fn get_agent_links(AxumPath(name): AxumPath<String>) -> Response {
|
|
let port = lifecycle::agent_web_port(&name);
|
|
let url = format!("http://127.0.0.1:{port}/api/state");
|
|
let client = match reqwest::Client::builder()
|
|
.timeout(std::time::Duration::from_secs(2))
|
|
.build()
|
|
{
|
|
Ok(c) => c,
|
|
Err(e) => {
|
|
tracing::warn!(%name, error = %e, "agent-links proxy: client build failed");
|
|
return axum::Json(serde_json::json!([])).into_response();
|
|
}
|
|
};
|
|
match client.get(&url).send().await {
|
|
Ok(resp) if resp.status().is_success() => match resp.json::<serde_json::Value>().await {
|
|
Ok(body) => {
|
|
let links = body.get("links").cloned().unwrap_or_else(|| serde_json::json!([]));
|
|
axum::Json(links).into_response()
|
|
}
|
|
Err(e) => {
|
|
tracing::debug!(%name, error = %e, "agent-links proxy: response parse failed");
|
|
axum::Json(serde_json::json!([])).into_response()
|
|
}
|
|
},
|
|
Ok(resp) => {
|
|
tracing::debug!(%name, status = %resp.status(), "agent-links proxy: non-2xx");
|
|
axum::Json(serde_json::json!([])).into_response()
|
|
}
|
|
Err(e) => {
|
|
tracing::debug!(%name, error = %e, "agent-links proxy: fetch failed");
|
|
axum::Json(serde_json::json!([])).into_response()
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn post_cancel_reminder(
|
|
State(state): State<AppState>,
|
|
AxumPath(id): AxumPath<i64>,
|
|
) -> Response {
|
|
match state.coord.broker.cancel_reminder(id) {
|
|
Ok(0) => error_response(&format!("reminder {id} not pending (already delivered?)")),
|
|
Ok(_) => {
|
|
tracing::info!(%id, "operator cancelled reminder");
|
|
(StatusCode::OK, "ok").into_response()
|
|
}
|
|
Err(e) => error_response(&format!("cancel reminder {id} failed: {e:#}")),
|
|
}
|
|
}
|
|
|
|
/// Reset a pending reminder's failure state so the scheduler
|
|
/// retries it on the next tick. Useful when the failure was
|
|
/// transient (sqlite lock contention, disk full → freed up) and
|
|
/// the operator wants delivery to resume immediately instead of
|
|
/// the row sitting in attempt-count-capped purgatory.
|
|
async fn post_retry_reminder(
|
|
State(state): State<AppState>,
|
|
AxumPath(id): AxumPath<i64>,
|
|
) -> Response {
|
|
match state.coord.broker.reset_reminder_failure(id) {
|
|
Ok(0) => error_response(&format!("reminder {id} not pending (already delivered?)")),
|
|
Ok(_) => {
|
|
tracing::info!(%id, "operator reset reminder failure for retry");
|
|
(StatusCode::OK, "ok").into_response()
|
|
}
|
|
Err(e) => error_response(&format!("retry reminder {id} failed: {e:#}")),
|
|
}
|
|
}
|
|
|
|
async fn post_purge_tombstone(
|
|
State(state): State<AppState>,
|
|
AxumPath(name): AxumPath<String>,
|
|
) -> Response {
|
|
if name == lifecycle::MANAGER_NAME {
|
|
return error_response("refusing to purge the manager's state");
|
|
}
|
|
// Sanity: refuse to purge if a live container still exists with this
|
|
// name. The dashboard already filters tombstones to non-live names,
|
|
// but the operator could send a stale POST.
|
|
let live = lifecycle::list().await.unwrap_or_default();
|
|
if live
|
|
.iter()
|
|
.any(|c| c == &format!("{}{name}", lifecycle::AGENT_PREFIX) || c == &name)
|
|
{
|
|
return error_response(&format!(
|
|
"refusing to purge {name}: container still exists — use DESTR0Y first"
|
|
));
|
|
}
|
|
let mut errors = Vec::new();
|
|
for dir in [
|
|
Coordinator::agent_state_root(&name),
|
|
Coordinator::agent_applied_dir(&name),
|
|
] {
|
|
if dir.exists()
|
|
&& let Err(e) = std::fs::remove_dir_all(&dir)
|
|
{
|
|
errors.push(format!("{}: {e}", dir.display()));
|
|
}
|
|
}
|
|
let _ = state
|
|
.coord
|
|
.approvals
|
|
.fail_pending_for_agent(&name, "agent state purged");
|
|
if errors.is_empty() {
|
|
tracing::info!(%name, "tombstone purged");
|
|
// Fire the post-purge tombstones snapshot so dashboards
|
|
// drop the row live; matching form carries
|
|
// `data-no-refresh`.
|
|
emit_tombstones_snapshot(&state.coord).await;
|
|
(StatusCode::OK, "ok").into_response()
|
|
} else {
|
|
error_response(&format!("purge {name} partial: {}", errors.join(", ")))
|
|
}
|
|
}
|
|
|
|
/// Operator-side compose form on the dashboard terminal. Drops a
|
|
/// message into the broker as `{from: "operator", to, body}`. Same
|
|
/// shape that per-agent web UIs use via `OperatorMsg`, but here the
|
|
/// operator picks the recipient explicitly with `@name`. No
|
|
/// validation that `to` resolves to a known agent — broker accepts
|
|
/// arbitrary recipients (and the agent's inbox grows whether or not
|
|
/// they exist, which is fine for spawn-then-greet flows).
|
|
#[derive(Deserialize)]
|
|
struct OpSendForm {
|
|
to: String,
|
|
body: String,
|
|
}
|
|
|
|
/// Form for `POST /meta-update`. Inputs ride in as a comma-separated
|
|
/// list under the `inputs` field — the JS submitter joins the
|
|
/// checked boxes since axum's `Form` extractor doesn't natively
|
|
/// decode repeated keys without a helper.
|
|
#[derive(Deserialize)]
|
|
struct MetaUpdateForm {
|
|
inputs: String,
|
|
}
|
|
|
|
/// Bulk-update selected meta flake inputs, then rebuild the affected
|
|
/// agents in the background. Idempotent w.r.t. selection — choosing
|
|
/// an input that's already at the latest sha is a no-op (no commit,
|
|
/// no rebuild ripple). Returns immediately after queueing the work;
|
|
/// dashboard polls for progress via container `pending` spinners +
|
|
/// the meta-inputs row sha update.
|
|
async fn post_meta_update(
|
|
State(state): State<AppState>,
|
|
Form(form): Form<MetaUpdateForm>,
|
|
) -> Response {
|
|
let inputs: Vec<String> = form
|
|
.inputs
|
|
.split(',')
|
|
.map(|s| s.trim().to_owned())
|
|
.filter(|s| !s.is_empty())
|
|
.collect();
|
|
if inputs.is_empty() {
|
|
return error_response("meta-update: no inputs selected");
|
|
}
|
|
state.coord.rebuild_queue.enqueue_with_inputs(
|
|
crate::rebuild_queue::QueueKind::MetaUpdate,
|
|
"hyperhive".to_owned(),
|
|
crate::rebuild_queue::QueueSource::Manual,
|
|
format!("meta-update via dashboard ({})", inputs.join(", ")),
|
|
None,
|
|
inputs,
|
|
);
|
|
state.coord.emit_rebuild_queue_snapshot();
|
|
(StatusCode::OK, "ok").into_response()
|
|
}
|
|
|
|
async fn post_op_send(State(state): State<AppState>, Form(form): Form<OpSendForm>) -> Response {
|
|
let to = form.to.trim().to_owned();
|
|
let body = form.body.trim().to_owned();
|
|
if to.is_empty() {
|
|
return error_response("op-send: `to` required");
|
|
}
|
|
if body.is_empty() {
|
|
return error_response("op-send: `body` required");
|
|
}
|
|
if to == "*" {
|
|
let errors = state
|
|
.coord
|
|
.broadcast_send(hive_sh4re::OPERATOR_RECIPIENT, &body);
|
|
if !errors.is_empty() {
|
|
return error_response(&format!("op-send broadcast partial fail: {}", errors.join("; ")));
|
|
}
|
|
} else if let Err(e) = state.coord.broker.send(&hive_sh4re::Message {
|
|
from: hive_sh4re::OPERATOR_RECIPIENT.to_owned(),
|
|
to: to.clone(),
|
|
body,
|
|
in_reply_to: None,
|
|
}) {
|
|
return error_response(&format!("op-send to {to} failed: {e:#}"));
|
|
}
|
|
// 200 instead of 303 → the client doesn't refetch /api/state. The
|
|
// broker `send` already emitted a `MessageEvent` which the
|
|
// dashboard channel forwarder mirrors as `DashboardEvent::Sent`,
|
|
// and the page's terminal + inbox derive from that stream — so the
|
|
// operator's send shows up the same way an agent's send does, with
|
|
// no full-state refresh in between.
|
|
(axum::http::StatusCode::OK, "ok").into_response()
|
|
}
|
|
|
|
async fn post_request_spawn(
|
|
State(state): State<AppState>,
|
|
Form(form): Form<RequestSpawnForm>,
|
|
) -> Response {
|
|
let name = form.name.trim().to_owned();
|
|
if name.is_empty() {
|
|
return error_response("spawn: `name` required");
|
|
}
|
|
match state
|
|
.coord
|
|
.approvals
|
|
.submit_kind(&name, hive_sh4re::ApprovalKind::Spawn, "", None)
|
|
{
|
|
Ok(id) => {
|
|
tracing::info!(%id, %name, "operator: spawn approval queued via dashboard");
|
|
// Phase 5b: notify the dashboard event channel so live
|
|
// subscribers can append the row without a snapshot
|
|
// refetch. Spawn approvals carry no diff/sha.
|
|
state
|
|
.coord
|
|
.emit_approval_added(id, &name, "spawn", None, None, None);
|
|
(StatusCode::OK, "ok").into_response()
|
|
}
|
|
Err(e) => error_response(&format!("request-spawn {name} failed: {e:#}")),
|
|
}
|
|
}
|
|
|
|
async fn post_rebuild(State(state): State<AppState>, AxumPath(name): AxumPath<String>) -> Response {
|
|
let logical = strip_container_prefix(&name);
|
|
state.coord.rebuild_queue.enqueue(
|
|
crate::rebuild_queue::QueueKind::Rebuild,
|
|
logical,
|
|
crate::rebuild_queue::QueueSource::Manual,
|
|
"manual via dashboard ↻ R3BU1LD button".to_owned(),
|
|
None,
|
|
);
|
|
state.coord.emit_rebuild_queue_snapshot();
|
|
(StatusCode::OK, "ok").into_response()
|
|
}
|
|
|
|
/// Common shape for the simple lifecycle action handlers (start /
|
|
/// stop / restart / rebuild): strip the container prefix, mark
|
|
/// transient for the duration so the dashboard can spinner, run the
|
|
/// lifecycle op, clear transient, redirect on success or surface the
|
|
/// error. `verb` only appears in the error message; `extra` runs on
|
|
/// success after `clear_transient` for handlers that need follow-up
|
|
/// (e.g. `kill` also unregisters the agent + fires `HelperEvent`).
|
|
async fn lifecycle_action<F, Fut>(
|
|
state: &AppState,
|
|
name: &str,
|
|
kind: crate::coordinator::TransientKind,
|
|
verb: &str,
|
|
body: F,
|
|
extra: impl FnOnce(&AppState, &str),
|
|
) -> Response
|
|
where
|
|
F: FnOnce(String) -> Fut,
|
|
Fut: std::future::Future<Output = anyhow::Result<()>>,
|
|
{
|
|
let logical = strip_container_prefix(name);
|
|
let guard = state.coord.transient_guard(&logical, kind);
|
|
let result = body(logical.clone()).await;
|
|
drop(guard);
|
|
match result {
|
|
Ok(()) => {
|
|
extra(state, &logical);
|
|
// Rescan so the running/needs_login/needs_update flip on
|
|
// the affected row lands on every dashboard's SSE channel
|
|
// without waiting for a snapshot poll. 200 + matching
|
|
// `data-no-refresh` on the form skip the post-submit
|
|
// /api/state refetch.
|
|
state.coord.rescan_containers_and_emit().await;
|
|
(StatusCode::OK, "ok").into_response()
|
|
}
|
|
Err(e) => error_response(&format!("{verb} {logical} failed: {e:#}")),
|
|
}
|
|
}
|
|
|
|
async fn post_kill(State(state): State<AppState>, AxumPath(name): AxumPath<String>) -> Response {
|
|
let logical = strip_container_prefix(&name);
|
|
if logical == lifecycle::MANAGER_NAME {
|
|
return error_response("kill: refusing to stop the manager");
|
|
}
|
|
lifecycle_action(
|
|
&state,
|
|
&name,
|
|
crate::coordinator::TransientKind::Stopping,
|
|
"kill",
|
|
|n| async move { lifecycle::kill(&n).await },
|
|
|s, n| {
|
|
s.coord.unregister_agent(n);
|
|
s.coord.notify_manager(&hive_sh4re::HelperEvent::Killed {
|
|
agent: n.to_owned(),
|
|
});
|
|
},
|
|
)
|
|
.await
|
|
}
|
|
|
|
async fn post_restart(State(state): State<AppState>, AxumPath(name): AxumPath<String>) -> Response {
|
|
lifecycle_action(
|
|
&state,
|
|
&name,
|
|
crate::coordinator::TransientKind::Restarting,
|
|
"restart",
|
|
|n| async move { lifecycle::restart(&n).await },
|
|
|s, n| s.coord.kick_agent(n, "container restarted"),
|
|
)
|
|
.await
|
|
}
|
|
|
|
async fn post_start(State(state): State<AppState>, AxumPath(name): AxumPath<String>) -> Response {
|
|
lifecycle_action(
|
|
&state,
|
|
&name,
|
|
crate::coordinator::TransientKind::Starting,
|
|
"start",
|
|
|n| async move { lifecycle::start(&n).await },
|
|
|s, n| s.coord.kick_agent(n, "container started"),
|
|
)
|
|
.await
|
|
}
|
|
|
|
async fn post_update_all(State(state): State<AppState>) -> Response {
|
|
let containers = lifecycle::list().await.unwrap_or_default();
|
|
for container in containers {
|
|
let logical = if container == lifecycle::MANAGER_NAME {
|
|
lifecycle::MANAGER_NAME.to_owned()
|
|
} else if let Some(n) = container.strip_prefix(lifecycle::AGENT_PREFIX) {
|
|
n.to_owned()
|
|
} else {
|
|
continue;
|
|
};
|
|
state.coord.rebuild_queue.enqueue(
|
|
crate::rebuild_queue::QueueKind::Rebuild,
|
|
logical,
|
|
crate::rebuild_queue::QueueSource::Manual,
|
|
"manual via dashboard 🌀 UPDATE ALL".to_owned(),
|
|
None,
|
|
);
|
|
}
|
|
state.coord.emit_rebuild_queue_snapshot();
|
|
(StatusCode::OK, "ok").into_response()
|
|
}
|
|
|
|
fn transient_label(k: crate::coordinator::TransientKind) -> &'static str {
|
|
use crate::coordinator::TransientKind::{
|
|
Destroying, Rebuilding, Restarting, Spawning, Starting, Stopping,
|
|
};
|
|
match k {
|
|
Spawning => "spawning",
|
|
Starting => "starting",
|
|
Stopping => "stopping",
|
|
Restarting => "restarting",
|
|
Rebuilding => "rebuilding",
|
|
Destroying => "destroying",
|
|
}
|
|
}
|
|
|
|
/// Convert either a logical name or a container name back to the logical
|
|
/// name. Sub-agents are `h-foo` → `foo`; manager stays `hm1nd`.
|
|
fn strip_container_prefix(name: &str) -> String {
|
|
name.strip_prefix(lifecycle::AGENT_PREFIX)
|
|
.unwrap_or(name)
|
|
.to_owned()
|
|
}
|
|
|
|
#[derive(Deserialize, Default)]
|
|
struct DestroyForm {
|
|
#[serde(default)]
|
|
purge: Option<String>,
|
|
}
|
|
|
|
async fn post_destroy(
|
|
State(state): State<AppState>,
|
|
AxumPath(name): AxumPath<String>,
|
|
Form(form): Form<DestroyForm>,
|
|
) -> Response {
|
|
// Checkbox semantics: any non-empty value (axum sends "on") = purge.
|
|
let purge = form.purge.as_deref().is_some_and(|v| !v.is_empty());
|
|
// `actions::destroy` rescans the container list on success, so the
|
|
// `ContainerRemoved` event lands before we return 200. The matching
|
|
// form carries `data-no-refresh`.
|
|
match actions::destroy(&state.coord, &name, purge).await {
|
|
Ok(()) => (StatusCode::OK, "ok").into_response(),
|
|
Err(e) => error_response(&format!("destroy {name} failed: {e:#}")),
|
|
}
|
|
}
|
|
|
|
fn error_response(message: &str) -> Response {
|
|
// Plain text — the JS app surfaces this in an alert(), so HTML
|
|
// wrapping would just clutter the message.
|
|
(StatusCode::INTERNAL_SERVER_ERROR, message.to_owned()).into_response()
|
|
}
|
|
|
|
/// Filter out approvals whose agent state dir was wiped out from under us
|
|
/// (e.g. by a test script's cleanup). Marks them failed so they fall out of
|
|
/// `pending` on next render.
|
|
fn gc_orphans(coord: &Coordinator, approvals: Vec<Approval>) -> Vec<Approval> {
|
|
approvals
|
|
.into_iter()
|
|
.filter(|a| {
|
|
// Spawn and InitConfig approvals are for not-yet-existent agents;
|
|
// the proposed dir is supposed to be missing.
|
|
if matches!(
|
|
a.kind,
|
|
hive_sh4re::ApprovalKind::Spawn | hive_sh4re::ApprovalKind::InitConfig
|
|
) {
|
|
return true;
|
|
}
|
|
if Coordinator::agent_proposed_dir(&a.agent).exists() {
|
|
true
|
|
} else {
|
|
let note = "agent state dir missing";
|
|
let _ = coord.approvals.mark_failed(a.id, note);
|
|
tracing::info!(id = a.id, agent = %a.agent, "auto-failed orphan approval");
|
|
let sha_short = a
|
|
.fetched_sha
|
|
.as_deref()
|
|
.map(|s| s[..s.len().min(12)].to_owned());
|
|
coord.emit_approval_resolved(
|
|
a.id,
|
|
&a.agent,
|
|
"apply_commit",
|
|
sha_short,
|
|
"failed",
|
|
Some(note.to_owned()),
|
|
a.description.clone(),
|
|
);
|
|
false
|
|
}
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
|
|
/// Multi-file unified diff between the currently-deployed tree and
|
|
/// the proposal for this approval. Runs against the applied repo
|
|
/// since the canonical proposal commit lives there (manager-side
|
|
/// amendments don't move it). Empty output means proposal == main —
|
|
/// a no-op approval.
|
|
///
|
|
/// `pub(crate)` so the manager-socket handler can pre-compute the
|
|
/// diff once at submission time and embed it in the `ApprovalAdded`
|
|
/// dashboard event (instead of forcing the dashboard to wait a
|
|
/// `/api/state` cycle to see the diff for newly-queued approvals).
|
|
pub(crate) async fn approval_diff(agent: &str, approval_id: i64) -> String {
|
|
let applied = Coordinator::agent_applied_dir(agent);
|
|
if !applied.join(".git").exists() {
|
|
return format!("(no applied git repo at {})", applied.display());
|
|
}
|
|
let proposal_ref = format!("refs/tags/proposal/{approval_id}");
|
|
match git_diff_refs(&applied, "refs/heads/main", &proposal_ref).await {
|
|
Ok(s) if s.is_empty() => "(proposal matches currently-deployed tree)".to_owned(),
|
|
Ok(s) => s,
|
|
Err(e) => format!("(error: {e:#})"),
|
|
}
|
|
}
|
|
|
|
async fn git_diff_refs(applied_dir: &Path, base_ref: &str, target_ref: &str) -> Result<String> {
|
|
let out = lifecycle::git_command()
|
|
.current_dir(applied_dir)
|
|
.args(["diff", &format!("{base_ref}..{target_ref}")])
|
|
.output()
|
|
.await
|
|
.with_context(|| format!("spawn `git diff` in {}", applied_dir.display()))?;
|
|
if !out.status.success() {
|
|
anyhow::bail!(
|
|
"git diff {base_ref}..{target_ref} failed: {}",
|
|
String::from_utf8_lossy(&out.stderr).trim()
|
|
);
|
|
}
|
|
Ok(String::from_utf8_lossy(&out.stdout).into_owned())
|
|
}
|
|
|
|
/// Numeric ids of `<prefix>/<n>` tags in the applied repo (e.g.
|
|
/// `proposal/3` → `3`). Unparseable suffixes are skipped. Used to
|
|
/// resolve the `approved` / `previous` diff bases for an approval.
|
|
async fn tag_ids(applied_dir: &Path, prefix: &str) -> Vec<i64> {
|
|
let Ok(out) = lifecycle::git_command()
|
|
.current_dir(applied_dir)
|
|
.args(["tag", "-l", &format!("{prefix}/*")])
|
|
.output()
|
|
.await
|
|
else {
|
|
return Vec::new();
|
|
};
|
|
if !out.status.success() {
|
|
return Vec::new();
|
|
}
|
|
let strip = format!("{prefix}/");
|
|
String::from_utf8_lossy(&out.stdout)
|
|
.lines()
|
|
.filter_map(|l| l.trim().strip_prefix(&strip))
|
|
.filter_map(|s| s.parse::<i64>().ok())
|
|
.collect()
|
|
}
|
|
|
|
#[derive(Deserialize)]
|
|
struct DiffBaseQuery {
|
|
/// `applied` (running tree — default), `approved` (most recent
|
|
/// earlier approved proposal), or `previous` (the prior queued
|
|
/// proposal for this agent).
|
|
base: Option<String>,
|
|
}
|
|
|
|
/// On-demand unified diff for one `ApplyCommit` approval against a
|
|
/// chosen base. `applied` = `applied/main` (what's running);
|
|
/// `approved` = the most recent earlier `approved/<n>` tag (the last
|
|
/// proposal the operator OK'd, even if its build then failed);
|
|
/// `previous` = the prior queued `proposal/<n>` (the incremental
|
|
/// delta when the manager chains proposals). Returns the raw diff
|
|
/// text — the dashboard classifies lines client-side.
|
|
async fn get_approval_diff(
|
|
State(state): State<AppState>,
|
|
AxumPath(id): AxumPath<i64>,
|
|
axum::extract::Query(q): axum::extract::Query<DiffBaseQuery>,
|
|
) -> Response {
|
|
let base = q.base.as_deref().unwrap_or("applied");
|
|
let approval = match state.coord.approvals.get(id) {
|
|
Ok(Some(a)) => a,
|
|
Ok(None) => return error_response(&format!("approval {id} not found")),
|
|
Err(e) => return error_response(&format!("approval {id}: {e:#}")),
|
|
};
|
|
if !matches!(approval.kind, hive_sh4re::ApprovalKind::ApplyCommit) {
|
|
return error_response("spawn approvals carry no commit to diff");
|
|
}
|
|
let applied = Coordinator::agent_applied_dir(&approval.agent);
|
|
if !applied.join(".git").exists() {
|
|
return plain_text(format!("(no applied git repo at {})", applied.display()));
|
|
}
|
|
let target = format!("refs/tags/proposal/{id}");
|
|
let base_ref = match base {
|
|
"applied" => Some("refs/heads/main".to_owned()),
|
|
"approved" => {
|
|
let ids = tag_ids(&applied, "approved").await;
|
|
ids.into_iter()
|
|
.filter(|&n| n != id)
|
|
.max()
|
|
.map(|n| format!("refs/tags/approved/{n}"))
|
|
}
|
|
"previous" => {
|
|
let ids = tag_ids(&applied, "proposal").await;
|
|
ids.into_iter()
|
|
.filter(|&n| n < id)
|
|
.max()
|
|
.map(|n| format!("refs/tags/proposal/{n}"))
|
|
}
|
|
other => return error_response(&format!("unknown diff base {other:?}")),
|
|
};
|
|
let Some(base_ref) = base_ref else {
|
|
return plain_text(match base {
|
|
"approved" => "(no earlier approved proposal to diff against)".to_owned(),
|
|
_ => "(no previous proposal to diff against)".to_owned(),
|
|
});
|
|
};
|
|
match git_diff_refs(&applied, &base_ref, &target).await {
|
|
Ok(s) if s.is_empty() => plain_text("(identical — no changes vs this base)".to_owned()),
|
|
Ok(s) => plain_text(s),
|
|
Err(e) => error_response(&format!("git diff: {e:#}")),
|
|
}
|
|
}
|
|
|
|
fn plain_text(body: String) -> Response {
|
|
(StatusCode::OK, body).into_response()
|
|
}
|
|
|