hyperhive/hive-ag3nt/src/web_ui.rs
müde d3f90f4cc0 stats: per-agent /stats page with chart.js trends + breakdowns
new hive-ag3nt::stats module reads turn_stats.sqlite read-only and
aggregates over 24h/7d/30d windows (hourly/daily buckets) — turn
rate, p50/p95/avg duration, ctx tokens (avg/max), cost token
components, top tools, wake mix, result mix. served by the agent
itself so per-MCP extensions can register more providers without
the host knowing their schemas.

/stats route + /api/stats?window=... on the per-agent web ui.
chart.js v4.4.4 pulled from jsdelivr (SRI hash deferred). nav
links: 📊 chip on the dashboard container row + 📊 stats → on
the per-agent header.

todo housekeeping: softened damocles-area note at the top,
new reverse-proxy + deferred reminder-rollup items, removed
the two telemetry-ui items absorbed by this page.
2026-05-19 00:27:01 +02:00

628 lines
24 KiB
Rust

//! Per-container HTTP UI. SPA shape: `GET /` returns a static shell;
//! `GET /static/*` serves CSS + JS; `GET /api/state` returns the page
//! state as JSON; the JS app renders. Live events stream on
//! `/events/stream`. Action POSTs (`/send`, `/login/*`, `/api/*`) answer
//! with a plain-text `200 OK` on success or a plain-text `500` carrying
//! the error message — the JS app decides what to re-fetch afterwards.
//! `GET /stats` serves the stats page, backed by `GET /api/stats`.
use std::convert::Infallible;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use anyhow::{Context, Result};
use axum::{
Form, Router,
extract::State,
http::StatusCode,
response::{
IntoResponse, Response,
sse::{Event, KeepAlive, Sse},
},
routing::{get, post},
};
use serde::{Deserialize, Serialize};
use tokio_stream::{Stream, StreamExt, wrappers::BroadcastStream};
use crate::client;
use crate::events::Bus;
use crate::login::LoginState;
use crate::login_session::{LoginSession, drop_if_finished};
use crate::mcp;
use crate::turn::TurnFiles;
/// Live login state for the web UI. The harness updates this in place as it
/// transitions between `NeedsLogin` and `Online`; the UI reads on each
/// render.
///
/// Guarded by a `std::sync::Mutex`: `api_state` copies the value out in a
/// single statement, so the guard is never held across an `.await`.
pub type LoginStateCell = Arc<Mutex<LoginState>>;
/// Shared turn lock. The serve loop acquires this (as an async mutex) for the
/// duration of every `drive_turn` call. The `/api/compact` handler calls
/// `try_lock_owned()` and rejects immediately if a turn is in flight,
/// preventing concurrent access to the claude session. On success the owned
/// guard is moved into the spawned compaction task and held until it ends.
pub type TurnLock = Arc<tokio::sync::Mutex<()>>;
/// Shared state handed to every axum handler in this module. Cloned per
/// request by axum (`#[derive(Clone)]`).
#[derive(Clone)]
struct AppState {
/// Agent label, echoed back in the `/api/state` snapshot.
label: String,
/// Shared login state; read on every `/api/state` request.
login: LoginStateCell,
/// The login session started via `/login/start`, if any. `None` when no
/// login flow is running (or after `/login/cancel` took it).
session: Arc<Mutex<Option<Arc<LoginSession>>>>,
/// Live event bus: source of the SSE stream and history, and of the
/// turn-state / model / token-usage readouts in `/api/state`.
bus: Bus,
/// Socket path passed to `client::request` for inbox, loose-ends and
/// operator-message calls.
socket: PathBuf,
/// Same `TurnFiles` the harness's turn loop uses. Shared so
/// `/api/compact` re-uses the exact MCP config / system prompt /
/// settings claude saw on the last regular turn — keeps the
/// session shape identical across compact + normal turns.
files: TurnFiles,
/// Prevents `/api/compact` from racing with an in-flight normal turn.
turn_lock: TurnLock,
}
impl AppState {
/// Wire-protocol flavor for this UI, read from the shared `TurnFiles`.
/// Handlers branch on it to pick the agent or manager request/response
/// types when talking to the socket.
fn flavor(&self) -> Flavor {
self.files.flavor
}
}
/// Which wire protocol the per-agent UI's `/send` handler should speak.
/// Sub-agent → `AgentRequest::OperatorMsg`; manager →
/// `ManagerRequest::OperatorMsg`. Reuses the MCP-side enum so a
/// single value drives both the send protocol and (in
/// `post_compact`) the allowed-tools surface claude sees.
/// The inbox (`recent_inbox`) and loose-ends (`api_loose_ends`) lookups
/// branch on the same value.
pub type Flavor = mcp::Flavor;
pub async fn serve(
label: String,
port: u16,
login: LoginStateCell,
bus: Bus,
socket: PathBuf,
files: TurnFiles,
turn_lock: TurnLock,
) -> Result<()> {
let state = AppState {
label,
login,
session: Arc::new(Mutex::new(None)),
bus,
socket,
files,
turn_lock,
};
let app = Router::new()
.route("/", get(serve_index))
.route("/static/agent.css", get(serve_css))
.route("/static/app.js", get(serve_app_js))
.route("/static/hive-fr0nt.js", get(serve_shared_js))
.route("/static/marked.js", get(serve_marked_js))
.route("/api/state", get(api_state))
.route("/events/stream", get(events_stream))
.route("/events/history", get(events_history))
.route("/send", post(post_send))
.route("/login/start", post(post_login_start))
.route("/login/code", post(post_login_code))
.route("/login/cancel", post(post_login_cancel))
.route("/api/cancel", post(post_cancel_turn))
.route("/api/compact", post(post_compact))
.route("/api/model", post(post_set_model))
.route("/api/new-session", post(post_new_session))
.route("/api/loose-ends", get(api_loose_ends))
.route("/stats", get(serve_stats))
.route("/static/stats.js", get(serve_stats_js))
.route("/api/stats", get(api_stats))
.with_state(state);
let addr = SocketAddr::from(([0, 0, 0, 0], port));
let listener = bind_with_retry(addr, "web UI").await?;
tracing::info!(%port, "web UI listening");
axum::serve(listener, app).await?;
Ok(())
}
// ---------------------------------------------------------------------------
// Static assets + state snapshot
// ---------------------------------------------------------------------------
/// Bind a TCP listener (with `SO_REUSEADDR`, see `try_bind`), retrying
/// while the address is in use.
///
/// Only `AddrInUse` is retried: up to 12 times, with a delay that starts
/// at 250ms, doubles each round and is capped at 2s (~20s in total). This
/// covers nspawn / systemd restarts where the previous harness has not yet
/// released the port. Any other error — or `AddrInUse` once the retries
/// are used up — is returned with a `bind <label> on <addr>` context.
async fn bind_with_retry(addr: SocketAddr, label: &str) -> Result<tokio::net::TcpListener> {
    const MAX_RETRIES: u32 = 12;
    const MAX_BACKOFF_MS: u64 = 2000;

    let mut backoff_ms: u64 = 250;
    let mut retries: u32 = 0;
    loop {
        let err = match try_bind(addr) {
            Ok(listener) => return Ok(listener),
            Err(err) => err,
        };
        let retryable = err.kind() == std::io::ErrorKind::AddrInUse && retries < MAX_RETRIES;
        if !retryable {
            return Err(err).with_context(|| format!("bind {label} on {addr}"));
        }
        tracing::warn!(
            %addr, attempt = retries + 1,
            "{label}: AddrInUse, retrying in {backoff_ms}ms"
        );
        tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
        retries += 1;
        backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS);
    }
}
/// One bind attempt: create a socket of the matching address family, set
/// `SO_REUSEADDR`, bind it to `addr` and start listening with a backlog
/// of 1024. Every I/O error is passed straight back to the caller.
fn try_bind(addr: SocketAddr) -> std::io::Result<tokio::net::TcpListener> {
    let socket = if addr.is_ipv4() {
        tokio::net::TcpSocket::new_v4()?
    } else {
        tokio::net::TcpSocket::new_v6()?
    };
    socket.set_reuseaddr(true)?;
    socket.bind(addr)?;
    socket.listen(1024)
}
/// `GET /` — the SPA shell, embedded into the binary at compile time.
async fn serve_index() -> impl IntoResponse {
    let headers = [("content-type", "text/html; charset=utf-8")];
    let shell = include_str!("../assets/index.html");
    (headers, shell)
}
/// `GET /static/agent.css` — the shared base and terminal styles followed
/// by this page's own stylesheet, newline-separated in one response.
/// Bundling them means the page CSS only declares what is page-specific
/// and the browser makes a single request.
async fn serve_css() -> impl IntoResponse {
    let sheet = format!(
        "{base}\n{terminal}\n{page}",
        base = hive_fr0nt::BASE_CSS,
        terminal = hive_fr0nt::TERMINAL_CSS,
        page = include_str!("../assets/agent.css"),
    );
    let headers = [("content-type", "text/css")];
    (headers, sheet)
}
/// `GET /static/app.js` — the page's JS app, embedded at compile time.
async fn serve_app_js() -> impl IntoResponse {
    let headers = [("content-type", "application/javascript")];
    let script = include_str!("../assets/app.js");
    (headers, script)
}
/// `GET /static/hive-fr0nt.js` — the terminal JS exported by `hive_fr0nt`.
async fn serve_shared_js() -> impl IntoResponse {
    let headers = [("content-type", "application/javascript")];
    (headers, hive_fr0nt::TERMINAL_JS)
}
/// `GET /static/marked.js` — the `marked` bundle exported by `hive_fr0nt`.
async fn serve_marked_js() -> impl IntoResponse {
    let headers = [("content-type", "application/javascript")];
    (headers, hive_fr0nt::MARKED_JS)
}
/// `GET /stats` — the stats page shell, embedded at compile time.
async fn serve_stats() -> impl IntoResponse {
    let headers = [("content-type", "text/html; charset=utf-8")];
    let page = include_str!("../assets/stats.html");
    (headers, page)
}
/// `GET /static/stats.js` — the stats page script, embedded at compile time.
async fn serve_stats_js() -> impl IntoResponse {
    let headers = [("content-type", "application/javascript")];
    let script = include_str!("../assets/stats.js");
    (headers, script)
}
/// Query string accepted by `GET /api/stats`.
#[derive(Deserialize)]
struct StatsQuery {
/// Requested aggregation window. Optional: when absent the handler
/// falls back to `"24h"`. The raw string is interpreted by
/// `crate::stats::Window::parse`.
window: Option<String>,
}
async fn api_stats(
State(_state): State<AppState>,
axum::extract::Query(q): axum::extract::Query<StatsQuery>,
) -> axum::Json<crate::stats::Snapshot> {
let window = crate::stats::Window::parse(q.window.as_deref().unwrap_or("24h"));
axum::Json(crate::stats::snapshot_default(window))
}
/// JSON body of `GET /api/state`: everything the JS app needs to render
/// the page, assembled fresh on every request by `api_state`.
#[derive(Serialize)]
struct StateSnapshot {
/// Bus seq at the moment this snapshot was assembled. Clients dedupe
/// their buffered SSE traffic against this value: events with
/// `seq <= snapshot.seq` are already reflected (or pre-date the
/// snapshot); `seq > snapshot.seq` is post-snapshot. Reset to 0 on
/// harness restart — clients treat reconnect as a fresh world.
seq: u64,
/// Label of the agent this UI belongs to.
label: String,
/// Port taken from the `HIVE_DASHBOARD_PORT` environment variable;
/// 7000 when the variable is unset or not a valid `u16`.
dashboard_port: u16,
/// `"online"` | `"needs_login_idle"` | `"needs_login_in_progress"`.
status: &'static str,
/// Present when `status == "needs_login_in_progress"`.
session: Option<SessionView>,
/// Last N messages addressed to this agent, newest-first. Pulled
/// from the broker via the per-agent socket on each render.
/// Empty on transport failure.
inbox: Vec<hive_sh4re::InboxRow>,
/// Authoritative turn-loop state from the harness and the unix
/// timestamp the state was entered. The JS computes the age
/// client-side off this rather than tracking it from SSE events.
turn_state: crate::events::TurnState,
turn_state_since: i64,
/// Currently-active claude model name. Reflected on the page so
/// the operator can see what they just switched to (and what's
/// in flight). Mutable at runtime via `POST /api/model`.
model: String,
/// Last-inference token usage from the most recent completed
/// turn — represents the current context-window size at turn-end.
/// `null` until the first turn finishes.
ctx_usage: Option<crate::events::TokenUsage>,
/// Cumulative token usage across the most recent turn's inferences
/// (cost signal). `null` until the first turn finishes.
cost_usage: Option<crate::events::TokenUsage>,
}
/// Serializable view of the in-progress login session, embedded in
/// `StateSnapshot` while a login flow is running. Each field is read from
/// the matching `LoginSession` accessor when the snapshot is built.
#[derive(Serialize)]
struct SessionView {
/// First `https://…` claude emitted on stdout, if any.
url: Option<String>,
/// Accumulated stdout + stderr.
output: String,
/// Whether the session reports itself as finished.
finished: bool,
/// Exit note reported by the session, if it has one.
exit_note: Option<String>,
}
/// `GET /api/loose-ends` — forward this agent's loose-ends list from the
/// per-agent socket. The page shows it as a collapsible section so the
/// operator can see what is pending against the agent (questions it
/// asked, peer questions aimed at it, reminders it scheduled, approvals
/// for the manager) — the same data `mcp__hyperhive__get_loose_ends`
/// exposes inside the container.
///
/// Responds with `{"loose_ends": [...]}`; a broker-side error, an
/// unexpected reply or a transport failure becomes a plain-text 500.
async fn api_loose_ends(State(state): State<AppState>) -> Response {
    // Normalise both wire flavors to "list or error text" first, then
    // build the HTTP response in one place.
    let fetched: Result<Vec<hive_sh4re::LooseEnd>, String> = match state.flavor() {
        Flavor::Agent => {
            let reply = client::request::<_, hive_sh4re::AgentResponse>(
                &state.socket,
                &hive_sh4re::AgentRequest::GetLooseEnds,
            )
            .await;
            match reply {
                Ok(hive_sh4re::AgentResponse::LooseEnds { loose_ends }) => Ok(loose_ends),
                Ok(hive_sh4re::AgentResponse::Err { message }) => {
                    Err(format!("get_loose_ends: {message}"))
                }
                Ok(other) => Err(format!("unexpected response: {other:?}")),
                Err(e) => Err(format!("transport: {e:#}")),
            }
        }
        Flavor::Manager => {
            let reply = client::request::<_, hive_sh4re::ManagerResponse>(
                &state.socket,
                &hive_sh4re::ManagerRequest::GetLooseEnds,
            )
            .await;
            match reply {
                Ok(hive_sh4re::ManagerResponse::LooseEnds { loose_ends }) => Ok(loose_ends),
                Ok(hive_sh4re::ManagerResponse::Err { message }) => {
                    Err(format!("get_loose_ends: {message}"))
                }
                Ok(other) => Err(format!("unexpected response: {other:?}")),
                Err(e) => Err(format!("transport: {e:#}")),
            }
        }
    };
    match fetched {
        Ok(loose_ends) => {
            axum::Json(serde_json::json!({ "loose_ends": loose_ends })).into_response()
        }
        Err(reason) => error_response(&reason),
    }
}
/// `GET /api/state` — assemble the page snapshot the JS app renders from.
async fn api_state(State(state): State<AppState>) -> axum::Json<StateSnapshot> {
    // The bus seq is read first, ahead of every other lookup, so the
    // dedupe contract holds: anything with seq > snapshot.seq is
    // post-snapshot and can never be missed by the client.
    let seq = state.bus.current_seq();

    drop_if_finished(&state.session);
    // Both guards are released at the end of their statement; nothing is
    // held across the `.await` below.
    let login_now = *state.login.lock().unwrap();
    let active_session = state.session.lock().unwrap().clone();
    let (status, session) = match login_now {
        LoginState::Online => ("online", None),
        LoginState::NeedsLogin => match active_session {
            None => ("needs_login_idle", None),
            Some(s) => (
                "needs_login_in_progress",
                Some(SessionView {
                    url: s.url(),
                    output: s.output(),
                    finished: s.finished(),
                    exit_note: s.exit_note(),
                }),
            ),
        },
    };

    // Unset or unparsable → default port.
    let dashboard_port = match std::env::var("HIVE_DASHBOARD_PORT") {
        Ok(raw) => raw.parse::<u16>().unwrap_or(7000),
        Err(_) => 7000,
    };

    let inbox = recent_inbox(&state.socket, state.flavor()).await;
    let (turn_state, turn_state_since) = state.bus.state_snapshot();
    let model = state.bus.model();
    let ctx_usage = state.bus.last_ctx_usage();
    let cost_usage = state.bus.last_cost_usage();

    let snapshot = StateSnapshot {
        seq,
        label: state.label.clone(),
        dashboard_port,
        status,
        session,
        inbox,
        turn_state,
        turn_state_since,
        model,
        ctx_usage,
        cost_usage,
    };
    axum::Json(snapshot)
}
/// Best-effort fetch of the 30 most recent messages addressed to us, via
/// the agent or manager socket depending on `flavor`. Anything other than
/// a well-formed `Recent` reply (transport error, decode error, a
/// different response variant) yields an empty list — the inbox section
/// is decorative, not authoritative.
async fn recent_inbox(socket: &std::path::Path, flavor: Flavor) -> Vec<hive_sh4re::InboxRow> {
    const LIMIT: u64 = 30;
    match flavor {
        Flavor::Agent => {
            let reply = client::request::<_, hive_sh4re::AgentResponse>(
                socket,
                &hive_sh4re::AgentRequest::Recent { limit: LIMIT },
            )
            .await;
            if let Ok(hive_sh4re::AgentResponse::Recent { rows }) = reply {
                rows
            } else {
                Vec::new()
            }
        }
        Flavor::Manager => {
            let reply = client::request::<_, hive_sh4re::ManagerResponse>(
                socket,
                &hive_sh4re::ManagerRequest::Recent { limit: LIMIT },
            )
            .await;
            if let Ok(hive_sh4re::ManagerResponse::Recent { rows }) = reply {
                rows
            } else {
                Vec::new()
            }
        }
    }
}
// ---------------------------------------------------------------------------
// Action handlers
// ---------------------------------------------------------------------------
/// Form body for `POST /send`.
#[derive(Deserialize)]
struct SendForm {
/// Operator message text. `post_send` trims it and rejects the request
/// when nothing is left.
body: String,
}
/// `POST /send` — deliver an operator message to this agent through the
/// socket, using the request type that matches the UI's flavor.
///
/// The body is trimmed first; an empty message is rejected. Responds
/// `200 ok` on success and a plain-text 500 (`send failed: ...`) when the
/// broker reports an error, answers unexpectedly, or the transport fails.
async fn post_send(State(state): State<AppState>, Form(form): Form<SendForm>) -> Response {
    let body = form.body.trim().to_owned();
    if body.is_empty() {
        return error_response("send: `body` required");
    }

    let outcome: Result<(), String> = match state.flavor() {
        Flavor::Agent => {
            let request = hive_sh4re::AgentRequest::OperatorMsg { body };
            let reply =
                client::request::<_, hive_sh4re::AgentResponse>(&state.socket, &request).await;
            match reply {
                Ok(hive_sh4re::AgentResponse::Ok) => Ok(()),
                Ok(hive_sh4re::AgentResponse::Err { message }) => Err(message),
                Ok(other) => Err(format!("unexpected response: {other:?}")),
                Err(e) => Err(format!("transport: {e:#}")),
            }
        }
        Flavor::Manager => {
            let request = hive_sh4re::ManagerRequest::OperatorMsg { body };
            let reply =
                client::request::<_, hive_sh4re::ManagerResponse>(&state.socket, &request).await;
            match reply {
                Ok(hive_sh4re::ManagerResponse::Ok) => Ok(()),
                Ok(hive_sh4re::ManagerResponse::Err { message }) => Err(message),
                Ok(other) => Err(format!("unexpected response: {other:?}")),
                Err(e) => Err(format!("transport: {e:#}")),
            }
        }
    };

    if let Err(reason) = outcome {
        return error_response(&format!("send failed: {reason}"));
    }
    // 200 rather than 303: the client does not refetch /api/state here.
    // The operator message shows up as a broker `Sent` (already visible
    // server-side in the dashboard); on the agent side the resulting
    // `TurnStart` SSE event drives the terminal, and the inbox row has
    // been consumed by the time `TurnEnd` triggers the usual turn-end
    // refresh.
    (axum::http::StatusCode::OK, "ok").into_response()
}
/// `GET /events/history` — the bus's stored events plus the seq
/// high-water mark, as `{"seq": ..., "events": [...]}`.
async fn events_history(State(state): State<AppState>) -> axum::Json<serde_json::Value> {
    // Seq is sampled *before* the history read. That way a client only
    // ever drops buffered events it has already seen in the history, and
    // never loses one that fired between the two reads. Rows carry no
    // seq of their own; the high-water mark is all the dedupe needs.
    let high_water = state.bus.current_seq();
    let backlog = state.bus.history();
    let payload = serde_json::json!({ "seq": high_water, "events": backlog });
    axum::Json(payload)
}
async fn events_stream(
State(state): State<AppState>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
tracing::info!("sse: client subscribed");
let rx = state.bus.subscribe();
// Drop a "hello" note into the bus so every new subscriber sees at
// least one event immediately and can clear the connecting placeholder.
state.bus.emit(crate::events::LiveEvent::Note {
text: "live stream attached".into(),
});
let stream = BroadcastStream::new(rx).filter_map(|res| {
let ev = res.ok()?;
let json = serde_json::to_string(&ev).ok()?;
Some(Ok(Event::default().data(json)))
});
Sse::new(stream).keep_alive(KeepAlive::default())
}
/// `POST /login/start` — start a login session unless one is already
/// running.
///
/// A finished session is cleared first. If a session is still present the
/// call is a no-op that answers `200 ok`. Otherwise a new `LoginSession`
/// is started and stored, and a `needs_login_in_progress` status is
/// emitted on the bus. A failed start becomes a plain-text 500.
///
/// The "is one running?" check, the start and the store all happen under
/// a single acquisition of the session lock. Checking and storing under
/// separate acquisitions would let two concurrent requests (e.g. a
/// double-click) both see an empty slot and both start a session, with
/// the second overwriting the first. No `.await` occurs while the guard
/// is held.
async fn post_login_start(State(state): State<AppState>) -> Response {
    // Takes the session lock itself, so it must run before we grab the guard.
    drop_if_finished(&state.session);
    let started = {
        let mut slot = state.session.lock().unwrap();
        if slot.is_some() {
            return (axum::http::StatusCode::OK, "ok").into_response();
        }
        match LoginSession::start() {
            Ok(session) => {
                *slot = Some(Arc::new(session));
                Ok(())
            }
            Err(e) => Err(e),
        }
    };
    match started {
        Ok(()) => {
            // Flip status from needs_login_idle → needs_login_in_progress
            // so the web UI's badge + polling kick in (polling is still
            // the right tool for the streaming session output during
            // the login flow itself; events drop the poll for
            // *everything else*).
            state.bus.emit_status("needs_login_in_progress");
            (axum::http::StatusCode::OK, "ok").into_response()
        }
        Err(e) => error_response(&format!("login start failed: {e:#}")),
    }
}
/// Form body for `POST /login/code`.
#[derive(Deserialize)]
struct CodeForm {
/// Code entered by the operator; passed unmodified to
/// `LoginSession::submit_code`.
code: String,
}
/// `POST /login/code` — hand the submitted code to the running login
/// session. Answers `200 ok` on success, and a plain-text 500 when no
/// session is running or the submission fails.
async fn post_login_code(State(state): State<AppState>, Form(form): Form<CodeForm>) -> Response {
    // Clone the `Arc` out in its own statement so the std mutex guard is
    // gone before the `.await` below.
    let active = state.session.lock().unwrap().clone();
    match active {
        None => error_response("no login session running"),
        Some(session) => match session.submit_code(&form.code).await {
            Ok(_) => (axum::http::StatusCode::OK, "ok").into_response(),
            Err(e) => error_response(&format!("submit code failed: {e:#}")),
        },
    }
}
/// `POST /login/cancel` — abandon the current login session, if any.
/// The session is removed from the shared slot, its stdin is closed and
/// it is killed. A `needs_login_idle` status is emitted whether or not a
/// session existed, and the response is always `200 ok`.
async fn post_login_cancel(State(state): State<AppState>) -> Response {
    // Take the session in its own statement: the guard must be released
    // before `close_stdin().await`.
    let taken = state.session.lock().unwrap().take();
    if let Some(running) = taken {
        running.close_stdin().await;
        running.kill();
    }
    // LoginState itself is untouched; with the session gone the UI is
    // back at needs_login_idle.
    state.bus.emit_status("needs_login_idle");
    (axum::http::StatusCode::OK, "ok").into_response()
}
/// Form body for `POST /api/model`.
#[derive(Deserialize)]
struct ModelForm {
/// Requested model name; `post_set_model` trims it and rejects empty input.
model: String,
}
/// Switch the model for future turns. The current turn (if any)
/// keeps its model; `/model <name>` applies starting with the next
/// `recv` cycle. Empty / whitespace-only inputs are rejected. No
/// claude-side validation — we just hand the string through to
/// `claude --model <name>`; an unknown model surfaces as a turn
/// failure in the live panel and the operator can revert.
async fn post_set_model(State(state): State<AppState>, Form(form): Form<ModelForm>) -> Response {
let name = form.model.trim();
if name.is_empty() {
return error_response("model: name required");
}
state.bus.set_model(name);
state.bus.emit(crate::events::LiveEvent::Note {
text: format!("operator: /model — claude model set to '{name}' for future turns"),
});
tracing::info!(%name, "operator set model");
(axum::http::StatusCode::OK, "ok").into_response()
}
/// Operator-initiated session compaction. Spawns `turn::compact_session`
/// in the background — the HTTP handler returns immediately so the
/// async-form spinner can clear. Output (claude's compaction stream,
/// the "/compact done" note) lands in the live event panel like any
/// other turn. If a regular turn is in flight (the shared turn lock is
/// already held) the request is rejected up front with an error
/// response; a failure inside the compaction itself is surfaced as a
/// Note on the bus.
async fn post_compact(State(state): State<AppState>) -> Response {
// Clone the Arc before locking so the guard's lifetime is tied to the
// clone (which we can move into the spawn) rather than to `state`.
let lock = state.turn_lock.clone();
// Reject immediately if a normal turn is in flight — concurrent access
// to the claude session is unsafe and produces garbled output.
let Ok(guard) = lock.try_lock_owned() else {
return error_response("turn in flight — wait for it to finish before compacting");
};
let bus = state.bus.clone();
let files = state.files.clone();
tokio::spawn(async move {
let _guard = guard; // keep lock alive for the duration of compaction
bus.emit(crate::events::LiveEvent::Note {
text: "operator: /compact — running on persistent session".into(),
});
bus.set_state(crate::events::TurnState::Compacting);
let r = crate::turn::compact_session(&files, &bus).await;
// Return to Idle before reporting, whether compaction succeeded or not.
bus.set_state(crate::events::TurnState::Idle);
if let Err(e) = r {
bus.emit(crate::events::LiveEvent::Note {
text: format!("/compact failed: {e:#}"),
});
}
});
(axum::http::StatusCode::OK, "ok").into_response()
}
/// Arm a one-shot: the next claude turn drops `--continue`, starting a
/// fresh session. Subsequent turns resume normal `--continue`
/// behavior. Idempotent before the next turn fires — calling twice
/// still results in a single fresh start. Useful when the
/// session-resume context is poisoned (claude went off the rails,
/// hit an unrecoverable refusal, etc.) and a full reset is cheaper
/// than asking claude to forget mid-stream.
async fn post_new_session(State(state): State<AppState>) -> Response {
state.bus.request_new_session();
state.bus.emit(crate::events::LiveEvent::Note {
text: "operator: new session armed — next turn runs without --continue".into(),
});
(axum::http::StatusCode::OK, "ok").into_response()
}
/// Cancel the in-flight claude turn. Coarse-grained: shells out
/// `pkill -INT claude` since there's at most one claude per container.
/// SIGINT (not SIGTERM) so claude flushes anything in-flight and emits a
/// final result row. Emits a Note so the operator sees the cancel
/// landed; the actual state transition back to `idle` happens when
/// `run_claude` wakes up and the harness emits `TurnEnd`.
///
/// Always answers `200 ok`: the outcome of the `pkill` call (signal sent,
/// nothing to interrupt, unexpected exit status, failure to run `pkill`)
/// is reported only through the Note text.
async fn post_cancel_turn(State(state): State<AppState>) -> Response {
let out = tokio::process::Command::new("pkill")
.args(["-INT", "claude"])
.output()
.await;
let note = match out {
Ok(o) if o.status.success() => "operator: /cancel — sent SIGINT to claude".to_owned(),
// Exit status 1 is treated as "no process matched", not as an error.
Ok(o) if o.status.code() == Some(1) => {
"operator: /cancel — no claude process to interrupt".to_owned()
}
Ok(o) => format!(
"operator: /cancel — pkill exited {} stderr={}",
o.status,
String::from_utf8_lossy(&o.stderr).trim()
),
Err(e) => format!("operator: /cancel — pkill failed: {e}"),
};
state.bus.emit(crate::events::LiveEvent::Note { text: note });
(axum::http::StatusCode::OK, "ok").into_response()
}
/// Build the module's uniform failure response: HTTP 500 with `message`
/// as the plain-text body. The JS app shows the text in an `alert()`, so
/// wrapping it in HTML would only add noise.
fn error_response(message: &str) -> Response {
    let body = String::from(message);
    (StatusCode::INTERNAL_SERVER_ERROR, body).into_response()
}