//! Live event stream for the per-agent web UI. The harness emits one //! `LiveEvent` per interesting thing that happens during a turn — wake-up //! (the popped inbox message), every line claude prints on stdout //! (parsed from `--output-format stream-json`), and the turn-end summary. //! The web UI subscribes via SSE and renders rows live. //! //! Channel type is `tokio::sync::broadcast`. New subscribers see only //! future events; the dashboard JS deals with the cold-start case by //! showing "connecting…" until the first event arrives. use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use rusqlite::{Connection, params}; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast; const CHANNEL_CAPACITY: usize = 256; /// Max `LiveEvent`s the `Bus` returns from `history()` and keeps in /// sqlite. Older rows are vacuumed on a periodic sweep. const HISTORY_CAPACITY: usize = 2000; /// Default sqlite db path. Lives under `/state/` so it survives /// destroy/recreate but goes away on purge. Overridable via the /// `HYPERHIVE_EVENTS_DB` env var (used in tests and one-shot tools). const DEFAULT_EVENTS_DB: &str = "/state/hyperhive-events.sqlite"; /// Persisted model name file. Same lifecycle as the events db — /// survives destroy/recreate, gone on purge. Empty / missing file /// falls back to `DEFAULT_MODEL`. const DEFAULT_MODEL_FILE: &str = "/state/hyperhive-model"; /// Path to the persisted model file. Overridable via /// `HYPERHIVE_MODEL_FILE` for dev / tests. 
fn model_file_path() -> PathBuf {
    // The env var wins when set; otherwise use the compiled-in default.
    std::env::var_os("HYPERHIVE_MODEL_FILE")
        .map_or_else(|| PathBuf::from(DEFAULT_MODEL_FILE), PathBuf::from)
}

/// Read the persisted model name, if there is one.
///
/// Returns `None` when the model file cannot be read or contains only
/// whitespace; otherwise returns the trimmed contents.
fn load_model() -> Option<String> {
    let s = std::fs::read_to_string(model_file_path()).ok()?;
    let name = s.trim();
    if name.is_empty() {
        None
    } else {
        Some(name.to_owned())
    }
}

/// Write `name` followed by a newline to the model file.
///
/// # Errors
///
/// Returns the error from `std::fs::write`.
fn persist_model(name: &str) -> std::io::Result<()> {
    let path = model_file_path();
    if let Some(parent) = path.parent() {
        // Best-effort: if the directory cannot be created, the write below
        // fails and reports the error to the caller.
        let _ = std::fs::create_dir_all(parent);
    }
    std::fs::write(path, format!("{name}\n"))
}

/// Current wall-clock time as whole seconds since the unix epoch.
///
/// Falls back to `0` when the clock reads earlier than the epoch or the
/// second count does not fit in an `i64`.
fn now_unix() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .ok()
        .and_then(|d| i64::try_from(d.as_secs()).ok())
        .unwrap_or(0)
}

/// DDL applied on every open; both statements are `IF NOT EXISTS`, so
/// re-running it against an existing db is harmless.
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS events (
    id           INTEGER PRIMARY KEY AUTOINCREMENT,
    ts           INTEGER NOT NULL,
    kind         TEXT    NOT NULL,
    payload_json TEXT    NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_ts ON events (ts);
";

/// One row of the agent's live stream. Serialised to JSON for SSE delivery.
// NOTE(review): serde's internally tagged representation (`tag = "kind"`)
// only supports newtype variants that wrap a struct or map. `Note(String)`,
// and any `Stream` value that is not a JSON object, look like they would
// fail at serialisation time — confirm these variants round-trip.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum LiveEvent {
    /// Harness popped a wake-up message and is about to invoke claude.
    /// `unread` is the count of *other* messages still in the inbox at
    /// that moment — surfaced as a badge in the live panel header.
    TurnStart {
        from: String,
        body: String,
        unread: u64,
    },
    /// One line of claude's `--output-format stream-json` stdout, parsed as
    /// a generic JSON value (so we don't have to track every claude-code
    /// event variant). The frontend pretty-prints by `type` field.
    Stream(serde_json::Value),
    /// Free-form note from the harness (e.g. "claude exited 0",
    /// "stream-json parse error: ..."). Useful when stream-json itself
    /// fails so the UI doesn't just go silent.
    Note(String),
    /// Turn finished. `ok=false` means claude exited non-zero or the
    /// harness hit a transport error.
    TurnEnd { ok: bool, note: Option<String> },
}

/// sqlite-backed event log.
Wraps a `Connection` behind a `Mutex` so the /// `Bus` (which clones cheaply) shares one writer. struct EventStore { conn: Mutex, } impl EventStore { fn open(path: &Path) -> rusqlite::Result { if let Some(parent) = path.parent() { let _ = std::fs::create_dir_all(parent); } let conn = Connection::open(path)?; conn.execute_batch(SCHEMA)?; Ok(Self { conn: Mutex::new(conn), }) } fn append(&self, event: &LiveEvent) -> rusqlite::Result<()> { let ts = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .ok() .and_then(|d| i64::try_from(d.as_secs()).ok()) .unwrap_or(0); let kind = match event { LiveEvent::TurnStart { .. } => "turn_start", LiveEvent::Stream(_) => "stream", LiveEvent::Note(_) => "note", LiveEvent::TurnEnd { .. } => "turn_end", }; let payload = serde_json::to_string(event).unwrap_or_else(|_| "null".into()); let conn = self.conn.lock().unwrap(); conn.execute( "INSERT INTO events (ts, kind, payload_json) VALUES (?1, ?2, ?3)", params![ts, kind, payload], )?; Ok(()) } fn recent(&self, limit: usize) -> rusqlite::Result> { let limit_i = i64::try_from(limit).unwrap_or(i64::MAX); let conn = self.conn.lock().unwrap(); let mut stmt = conn.prepare( "SELECT payload_json FROM events ORDER BY id DESC LIMIT ?1", )?; let rows = stmt.query_map(params![limit_i], |row| { let s: String = row.get(0)?; Ok(serde_json::from_str::(&s).ok()) })?; let mut out: Vec = rows.flatten().flatten().collect(); out.reverse(); Ok(out) } } /// Authoritative turn-loop state. The harness owns it; the web UI /// reads via `/api/state` and renders. Lives alongside the bus /// because everyone who has a `Bus` already has the right handle to /// poke the state on transitions. #[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum TurnState { /// Inbox is empty / waiting on `Recv`. Idle, /// `claude --print` is running for a turn. Thinking, /// Operator-triggered `/compact` is running on the persistent /// session. 
Compacting, } /// Default claude model when nothing's been set at runtime. The /// operator can switch via `/model ` in the web terminal; the /// chosen model lives in `Bus::model` for the rest of the harness /// process's life (resets on restart, by design — operator overrides /// shouldn't survive accidentally). pub const DEFAULT_MODEL: &str = "haiku"; #[derive(Clone)] pub struct Bus { tx: Arc>, /// Persistent event log. `None` only if opening the sqlite db failed /// at construction — we keep going so the harness doesn't die on a /// missing `/state/` mount in dev / test scenarios. store: Option>, /// Current turn-loop state + since-when (unix seconds). state: Arc>, /// Model name passed to `claude --model`. Default `haiku`; the /// operator can override at runtime via `POST /api/model`. model: Arc>, } impl Bus { /// Open the default events db (`/state/hyperhive-events.sqlite`, or /// `HYPERHIVE_EVENTS_DB`). On failure, fall back to a no-store bus — /// the harness still works, just without persistent history. #[must_use] pub fn new() -> Self { let path = std::env::var_os("HYPERHIVE_EVENTS_DB") .map_or_else(|| PathBuf::from(DEFAULT_EVENTS_DB), PathBuf::from); let store = match EventStore::open(&path) { Ok(s) => Some(Arc::new(s)), Err(e) => { tracing::warn!(error = ?e, path = %path.display(), "events db open failed; running without history"); None } }; let (tx, _) = broadcast::channel(CHANNEL_CAPACITY); let initial_model = load_model().unwrap_or_else(|| DEFAULT_MODEL.to_owned()); Self { tx: Arc::new(tx), store, state: Arc::new(Mutex::new((TurnState::Idle, now_unix()))), model: Arc::new(Mutex::new(initial_model)), } } /// Currently-selected claude model name. Read on every turn so a /// `/model ` flip takes effect on the next turn. #[must_use] pub fn model(&self) -> String { self.model.lock().unwrap().clone() } /// Switch the model for future turns. The current turn (if any) /// keeps the model it was already running. 
Persisted to /// `/state/hyperhive-model` so the override survives harness /// restart and container rebuild (gone on `--purge`, matching /// every other piece of agent state). pub fn set_model(&self, name: impl Into) { let value: String = name.into(); self.model.lock().unwrap().clone_from(&value); if let Err(e) = persist_model(&value) { tracing::warn!(error = ?e, "model: persist failed"); } } /// Update the harness's authoritative turn-loop state. Records /// the transition time so `state_snapshot` can return a since-age. pub fn set_state(&self, next: TurnState) { let mut guard = self.state.lock().unwrap(); if guard.0 == next { return; } *guard = (next, now_unix()); } /// Current state + since-when (unix seconds). Snapshot copy, no lock held. #[must_use] pub fn state_snapshot(&self) -> (TurnState, i64) { *self.state.lock().unwrap() } pub fn emit(&self, event: LiveEvent) { if let Some(store) = &self.store && let Err(e) = store.append(&event) { tracing::warn!(error = ?e, "events: append failed"); } // Lagged subscribers drop events — fine; the UI is a tail, not a log. let _ = self.tx.send(event); } pub fn subscribe(&self) -> broadcast::Receiver { self.tx.subscribe() } /// Most recent events, oldest first, capped at `HISTORY_CAPACITY`. /// Drives the terminal pre-fill when the operator opens the agent /// page; without a store (db open failed) this is empty. #[must_use] pub fn history(&self) -> Vec { let Some(store) = &self.store else { return Vec::new(); }; store.recent(HISTORY_CAPACITY).unwrap_or_default() } } impl Default for Bus { fn default() -> Self { Self::new() } }