//! Live event stream for the per-agent web UI. The harness emits one //! `LiveEvent` per interesting thing that happens during a turn — wake-up //! (the popped inbox message), every line claude prints on stdout //! (parsed from `--output-format stream-json`), and the turn-end summary. //! The web UI subscribes via SSE and renders rows live. //! //! Channel type is `tokio::sync::broadcast`. New subscribers see only //! future events; the dashboard JS deals with the cold-start case by //! showing "connecting…" until the first event arrives. use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use rusqlite::{Connection, params}; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast; const CHANNEL_CAPACITY: usize = 256; /// Max `LiveEvent`s the `Bus` returns from `history()` and keeps in /// sqlite. Older rows are vacuumed on a periodic sweep. const HISTORY_CAPACITY: usize = 2000; /// Path to the persisted event db. Overridable via `HYPERHIVE_EVENTS_DB` /// for dev / tests; otherwise derived from the agent's state dir. fn events_db_path() -> PathBuf { std::env::var_os("HYPERHIVE_EVENTS_DB").map_or_else( || crate::paths::state_dir().join("hyperhive-events.sqlite"), PathBuf::from, ) } /// Path to the persisted model file. Overridable via `HYPERHIVE_MODEL_FILE` /// for dev / tests; otherwise derived from the agent's state dir. fn model_file_path() -> PathBuf { std::env::var_os("HYPERHIVE_MODEL_FILE").map_or_else( || crate::paths::state_dir().join("hyperhive-model"), PathBuf::from, ) } fn load_model() -> Option { let s = std::fs::read_to_string(model_file_path()).ok()?; let name = s.trim(); if name.is_empty() { None } else { Some(name.to_owned()) } } fn persist_model(name: &str) -> std::io::Result<()> { let path = model_file_path(); if let Some(parent) = path.parent() { let _ = std::fs::create_dir_all(parent); } std::fs::write(path, format!("{name}\n")) } fn now_unix() -> i64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .ok() .and_then(|d| i64::try_from(d.as_secs()).ok()) .unwrap_or(0) } const SCHEMA: &str = " CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY AUTOINCREMENT, ts INTEGER NOT NULL, kind TEXT NOT NULL, payload_json TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_events_ts ON events (ts); "; /// Envelope carried over the broadcast channel: the `LiveEvent` itself /// plus a monotonic per-process seq stamped by `Bus::emit`. SSE consumers /// serialize this directly (seq becomes a sibling of the `kind` tag); /// clients use seq to dedupe their buffered live traffic against the /// snapshot/history responses (drop anything with `seq <= snapshot.seq`). #[derive(Debug, Clone, Serialize)] pub struct BusEvent { pub seq: u64, #[serde(flatten)] pub event: LiveEvent, } /// One row of the agent's live stream. Serialised to JSON for SSE delivery. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "kind", rename_all = "snake_case")] pub enum LiveEvent { /// Harness popped a wake-up message and is about to invoke claude. /// `unread` is the count of *other* messages still in the inbox at /// that moment — surfaced as a badge in the live panel header. TurnStart { from: String, body: String, unread: u64, }, /// One line of claude's `--output-format stream-json` stdout, parsed as /// a generic JSON value (so we don't have to track every claude-code /// event variant). The frontend pretty-prints by `type` field. Stream(serde_json::Value), /// Free-form note from the harness (e.g. "claude exited 0", /// "stream-json parse error: ..."). Useful when stream-json itself /// fails so the UI doesn't just go silent. /// /// Must be a struct variant (not `Note(String)`): internally-tagged /// enums can't flatten a tag onto a primitive newtype, and serde /// fails serialization at runtime — silently, because the SSE /// handler's `filter_map(... .ok()? ...)` swallows the error. From /// 2025-08 through 2026-05 every `Note` emission was a no-op + the /// sqlite history persisted them as the literal string `"null"`. /// The web UI's `note` renderer already reads `ev.text`, so the /// wire shape matches without a JS change. Note { text: String }, /// Turn finished. `ok=false` means claude exited non-zero or the /// harness hit a transport error. TurnEnd { ok: bool, note: Option }, /// Harness reachability flipped: `"online"` / /// `"needs_login_idle"` / `"needs_login_in_progress"`. The web UI /// drives the alive badge from this so the operator sees a login /// land (or get revoked) without polling. Session detail /// (`url`/`output`/`finished`) is still served by `/api/state` /// during the short-lived in-progress window — the client /// re-fetches only while that flow is active. StatusChanged { status: String }, /// `/api/model` switched the active claude model. The web UI /// updates the chip + the per-turn stats sink will key off this /// to mark the boundary in its log. ModelChanged { model: String }, /// Final-turn `usage` block landed (input + output + cache /// counters). Powers the context-window badge + accumulates into /// the per-turn stats sink. TokenUsageChanged { usage: TokenUsage }, /// Harness's `TurnState` transitioned (idle / thinking / /// compacting). `since_unix` matches `Bus::state_snapshot().1` /// so the client's elapsed-time ticker keeps progressing across /// SSE reconnects without drift. TurnStateChanged { state: TurnState, since_unix: i64, }, } /// sqlite-backed event log. Wraps a `Connection` behind a `Mutex` so the /// `Bus` (which clones cheaply) shares one writer. struct EventStore { conn: Mutex, } impl EventStore { fn open(path: &Path) -> rusqlite::Result { if let Some(parent) = path.parent() { let _ = std::fs::create_dir_all(parent); } let conn = Connection::open(path)?; conn.execute_batch(SCHEMA)?; Ok(Self { conn: Mutex::new(conn), }) } fn append(&self, event: &LiveEvent) -> rusqlite::Result<()> { let ts = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .ok() .and_then(|d| i64::try_from(d.as_secs()).ok()) .unwrap_or(0); let kind = match event { LiveEvent::TurnStart { .. } => "turn_start", LiveEvent::Stream(_) => "stream", LiveEvent::Note { .. } => "note", LiveEvent::TurnEnd { .. } => "turn_end", LiveEvent::StatusChanged { .. } => "status_changed", LiveEvent::ModelChanged { .. } => "model_changed", LiveEvent::TokenUsageChanged { .. } => "token_usage_changed", LiveEvent::TurnStateChanged { .. } => "turn_state_changed", }; let payload = serde_json::to_string(event).unwrap_or_else(|_| "null".into()); let conn = self.conn.lock().unwrap(); conn.execute( "INSERT INTO events (ts, kind, payload_json) VALUES (?1, ?2, ?3)", params![ts, kind, payload], )?; Ok(()) } fn recent(&self, limit: usize) -> rusqlite::Result> { let limit_i = i64::try_from(limit).unwrap_or(i64::MAX); let conn = self.conn.lock().unwrap(); let mut stmt = conn.prepare( "SELECT payload_json FROM events ORDER BY id DESC LIMIT ?1", )?; let rows = stmt.query_map(params![limit_i], |row| { let s: String = row.get(0)?; Ok(serde_json::from_str::(&s).ok()) })?; let mut out: Vec = rows.flatten().flatten().collect(); out.reverse(); Ok(out) } } /// Token usage emitted by claude in the final `result` stream-json event. /// All counts are in tokens. `None` fields mean the server didn't report them. #[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)] pub struct TokenUsage { pub input_tokens: u64, pub output_tokens: u64, pub cache_read_input_tokens: u64, pub cache_creation_input_tokens: u64, } impl TokenUsage { /// Total context consumed this turn (input + cache reads + cache writes). pub fn context_tokens(&self) -> u64 { self.input_tokens + self.cache_read_input_tokens + self.cache_creation_input_tokens } /// Parse usage from a stream-json event. Returns `Some` only for the /// terminal `result` event (which is the only one that carries `usage`); /// every other event maps to `None`. Missing numeric fields default to 0 /// so partial server payloads don't drop the whole snapshot. pub fn from_stream_event(v: &serde_json::Value) -> Option { if v.get("type").and_then(|t| t.as_str()) != Some("result") { return None; } let u = v.get("usage")?; let field = |k: &str| u.get(k).and_then(serde_json::Value::as_u64).unwrap_or(0); Some(Self { input_tokens: field("input_tokens"), output_tokens: field("output_tokens"), cache_read_input_tokens: field("cache_read_input_tokens"), cache_creation_input_tokens: field("cache_creation_input_tokens"), }) } } /// Authoritative turn-loop state. The harness owns it; the web UI /// reads via `/api/state` and renders. Lives alongside the bus /// because everyone who has a `Bus` already has the right handle to /// poke the state on transitions. #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum TurnState { /// Inbox is empty / waiting on `Recv`. Idle, /// `claude --print` is running for a turn. Thinking, /// Operator-triggered `/compact` is running on the persistent /// session. Compacting, } /// Default claude model when nothing's been set at runtime. The /// operator can switch via `/model ` in the web terminal; the /// chosen model lives in `Bus::model` for the rest of the harness /// process's life (resets on restart, by design — operator overrides /// shouldn't survive accidentally). pub const DEFAULT_MODEL: &str = "haiku"; #[derive(Clone)] pub struct Bus { tx: Arc>, /// Monotonic per-process counter stamped onto every `BusEvent`. /// Persisted nowhere — a harness restart resets seq to 0; clients /// always treat reconnect as "fresh state, fresh stream of seqs." /// Historical events served from sqlite carry no seq (they predate /// the live channel the seq is meant to dedupe against). event_seq: Arc, /// Persistent event log. `None` only if opening the sqlite db failed /// at construction — we keep going so the harness doesn't die on a /// missing state dir mount in dev / test scenarios. store: Option>, /// Current turn-loop state + since-when (unix seconds). state: Arc>, /// Model name passed to `claude --model`. Default `haiku`; the /// operator can override at runtime via `POST /api/model`. model: Arc>, /// Last token usage reported by claude (from the `result` stream-json /// event). `None` until the first turn with usage data completes. /// Updated on every turn; survives across turns within one harness /// process lifetime (resets on container restart, which is fine — /// it's a live indicator, not a cumulative counter). last_usage: Arc>>, /// One-shot: next `run_claude` call drops `--continue`, starting /// a fresh claude session. Set by `POST /api/new-session` from /// the per-agent web UI; consumed (cleared back to false) by the /// next turn. Subsequent turns resume normal `--continue` /// behavior. Atomic so the consumer can take-and-clear without a /// lock. skip_continue_once: Arc, /// Per-turn tool-call counter. Reset by the bin loop between /// turns via `take_tool_calls`. Populated by `observe_stream` as /// the stdout pump parses each stream-json line. Powers the /// `tool_call_count` + `tool_call_breakdown_json` columns on the /// per-turn stats sink. tool_calls: Arc>>, } impl Bus { /// Open the events db (path from `events_db_path()`). On failure, fall back /// to a no-store bus — the harness still works, just without persistent history. #[must_use] pub fn new() -> Self { let path = events_db_path(); let store = match EventStore::open(&path) { Ok(s) => Some(Arc::new(s)), Err(e) => { tracing::warn!(error = ?e, path = %path.display(), "events db open failed; running without history"); None } }; let (tx, _) = broadcast::channel(CHANNEL_CAPACITY); let initial_model = load_model().unwrap_or_else(|| DEFAULT_MODEL.to_owned()); Self { tx: Arc::new(tx), event_seq: Arc::new(AtomicU64::new(0)), store, state: Arc::new(Mutex::new((TurnState::Idle, now_unix()))), model: Arc::new(Mutex::new(initial_model)), last_usage: Arc::new(Mutex::new(None)), skip_continue_once: Arc::new(AtomicBool::new(false)), tool_calls: Arc::new(Mutex::new(std::collections::HashMap::new())), } } /// Current high-water seq. Snapshot endpoints read this before /// gathering state so the resulting (snapshot.seq, snapshot) pair /// satisfies: any live event with seq > snapshot.seq is post-snapshot /// (not yet reflected). Clients dedupe buffered SSE traffic against /// this value. #[must_use] pub fn current_seq(&self) -> u64 { self.event_seq.load(Ordering::SeqCst) } fn next_seq(&self) -> u64 { self.event_seq.fetch_add(1, Ordering::SeqCst) + 1 } /// Arm the one-shot: the next claude invocation will run without /// `--continue`, dropping any prior session context. Idempotent /// — calling twice in a row before the next turn still consumes /// to a single fresh-start. pub fn request_new_session(&self) { self.skip_continue_once.store(true, Ordering::SeqCst); } /// Take + clear the one-shot. Returns true iff the caller should /// run claude without `--continue` for this turn. pub fn take_skip_continue(&self) -> bool { self.skip_continue_once.swap(false, Ordering::SeqCst) } /// Currently-selected claude model name. Read on every turn so a /// `/model ` flip takes effect on the next turn. #[must_use] pub fn model(&self) -> String { self.model.lock().unwrap().clone() } /// Switch the model for future turns. The current turn (if any) /// keeps the model it was already running. Persisted to the agent's /// state dir (`hyperhive-model`) so the override survives harness /// restart and container rebuild (gone on `--purge`, matching /// every other piece of agent state). pub fn set_model(&self, name: impl Into) { let value: String = name.into(); self.model.lock().unwrap().clone_from(&value); if let Err(e) = persist_model(&value) { tracing::warn!(error = ?e, "model: persist failed"); } self.emit(LiveEvent::ModelChanged { model: value }); } /// Record the latest token usage from a completed turn. pub fn record_usage(&self, usage: TokenUsage) { *self.last_usage.lock().unwrap() = Some(usage); self.emit(LiveEvent::TokenUsageChanged { usage }); } /// Walk a stream-json value for `tool_use` blocks and bump the /// per-turn counter for each one we find. Called by the stdout /// pump on every parsed line. Cheap when the line isn't an /// assistant message — the field-check short-circuits. pub fn observe_stream(&self, v: &serde_json::Value) { if v.get("type").and_then(|t| t.as_str()) != Some("assistant") { return; } let Some(content) = v .get("message") .and_then(|m| m.get("content")) .and_then(|c| c.as_array()) else { return; }; let mut counts = self.tool_calls.lock().unwrap(); for block in content { if block.get("type").and_then(|t| t.as_str()) != Some("tool_use") { continue; } let name = block .get("name") .and_then(|n| n.as_str()) .unwrap_or("") .to_owned(); *counts.entry(name).or_insert(0) += 1; } } /// Snapshot + clear the per-turn tool-call counter. The harness /// calls this between turns to fold the breakdown into a /// `turn_stats` row, then start the next turn with an empty map. #[must_use] pub fn take_tool_calls(&self) -> std::collections::HashMap { std::mem::take(&mut *self.tool_calls.lock().unwrap()) } /// Last known token usage, or `None` if no turn has completed yet. #[must_use] pub fn last_usage(&self) -> Option { *self.last_usage.lock().unwrap() } /// Update the harness's authoritative turn-loop state. Records /// the transition time so `state_snapshot` can return a since-age. pub fn set_state(&self, next: TurnState) { let since; { let mut guard = self.state.lock().unwrap(); if guard.0 == next { return; } *guard = (next, now_unix()); since = guard.1; } self.emit(LiveEvent::TurnStateChanged { state: next, since_unix: since, }); } /// Broadcast a status flip (online / needs_login_*). Called by /// the bin entry points + `turn::wait_for_login` + the /// `post_login_*` handlers — every site that mutates the /// `Arc>` should also call this so the web UI /// drops its periodic /api/state poll while a turn loop is /// running. pub fn emit_status(&self, status: impl Into) { self.emit(LiveEvent::StatusChanged { status: status.into(), }); } /// Current state + since-when (unix seconds). Snapshot copy, no lock held. #[must_use] pub fn state_snapshot(&self) -> (TurnState, i64) { *self.state.lock().unwrap() } pub fn emit(&self, event: LiveEvent) { if let Some(store) = &self.store && let Err(e) = store.append(&event) { tracing::warn!(error = ?e, "events: append failed"); } let envelope = BusEvent { seq: self.next_seq(), event, }; // Lagged subscribers drop events — fine; the UI is a tail, not a log. let _ = self.tx.send(envelope); } pub fn subscribe(&self) -> broadcast::Receiver { self.tx.subscribe() } /// Most recent events, oldest first, capped at `HISTORY_CAPACITY`. /// Drives the terminal pre-fill when the operator opens the agent /// page; without a store (db open failed) this is empty. #[must_use] pub fn history(&self) -> Vec { let Some(store) = &self.store else { return Vec::new(); }; store.recent(HISTORY_CAPACITY).unwrap_or_default() } } impl Default for Bus { fn default() -> Self { Self::new() } }