//! Live event stream for the per-agent web UI. The harness emits one //! `LiveEvent` per interesting thing that happens during a turn — wake-up //! (the popped inbox message), every line claude prints on stdout //! (parsed from `--output-format stream-json`), and the turn-end summary. //! The web UI subscribes via SSE and renders rows live. //! //! Channel type is `tokio::sync::broadcast`. New subscribers see only //! future events; the dashboard JS deals with the cold-start case by //! showing "connecting…" until the first event arrives. use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use rusqlite::{Connection, params}; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast; const CHANNEL_CAPACITY: usize = 256; /// Max `LiveEvent`s the `Bus` returns from `history()` and keeps in /// sqlite. Older rows are vacuumed on a periodic sweep. const HISTORY_CAPACITY: usize = 2000; /// Default sqlite db path. Lives under `/state/` so it survives /// destroy/recreate but goes away on purge. Overridable via the /// `HYPERHIVE_EVENTS_DB` env var (used in tests and one-shot tools). const DEFAULT_EVENTS_DB: &str = "/state/hyperhive-events.sqlite"; const SCHEMA: &str = " CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY AUTOINCREMENT, ts INTEGER NOT NULL, kind TEXT NOT NULL, payload_json TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_events_ts ON events (ts); "; /// One row of the agent's live stream. Serialised to JSON for SSE delivery. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "kind", rename_all = "snake_case")] pub enum LiveEvent { /// Harness popped a wake-up message and is about to invoke claude. /// `unread` is the count of *other* messages still in the inbox at /// that moment — surfaced as a badge in the live panel header. TurnStart { from: String, body: String, unread: u64, }, /// One line of claude's `--output-format stream-json` stdout, parsed as /// a generic JSON value (so we don't have to track every claude-code /// event variant). The frontend pretty-prints by `type` field. Stream(serde_json::Value), /// Free-form note from the harness (e.g. "claude exited 0", /// "stream-json parse error: ..."). Useful when stream-json itself /// fails so the UI doesn't just go silent. Note(String), /// Turn finished. `ok=false` means claude exited non-zero or the /// harness hit a transport error. TurnEnd { ok: bool, note: Option }, } /// sqlite-backed event log. Wraps a `Connection` behind a `Mutex` so the /// `Bus` (which clones cheaply) shares one writer. struct EventStore { conn: Mutex, } impl EventStore { fn open(path: &Path) -> rusqlite::Result { if let Some(parent) = path.parent() { let _ = std::fs::create_dir_all(parent); } let conn = Connection::open(path)?; conn.execute_batch(SCHEMA)?; Ok(Self { conn: Mutex::new(conn), }) } fn append(&self, event: &LiveEvent) -> rusqlite::Result<()> { let ts = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .ok() .and_then(|d| i64::try_from(d.as_secs()).ok()) .unwrap_or(0); let kind = match event { LiveEvent::TurnStart { .. } => "turn_start", LiveEvent::Stream(_) => "stream", LiveEvent::Note(_) => "note", LiveEvent::TurnEnd { .. } => "turn_end", }; let payload = serde_json::to_string(event).unwrap_or_else(|_| "null".into()); let conn = self.conn.lock().unwrap(); conn.execute( "INSERT INTO events (ts, kind, payload_json) VALUES (?1, ?2, ?3)", params![ts, kind, payload], )?; Ok(()) } fn recent(&self, limit: usize) -> rusqlite::Result> { let limit_i = i64::try_from(limit).unwrap_or(i64::MAX); let conn = self.conn.lock().unwrap(); let mut stmt = conn.prepare( "SELECT payload_json FROM events ORDER BY id DESC LIMIT ?1", )?; let rows = stmt.query_map(params![limit_i], |row| { let s: String = row.get(0)?; Ok(serde_json::from_str::(&s).ok()) })?; let mut out: Vec = rows.flatten().flatten().collect(); out.reverse(); Ok(out) } } #[derive(Clone)] pub struct Bus { tx: Arc>, /// Persistent event log. `None` only if opening the sqlite db failed /// at construction — we keep going so the harness doesn't die on a /// missing `/state/` mount in dev / test scenarios. store: Option>, } impl Bus { /// Open the default events db (`/state/hyperhive-events.sqlite`, or /// `HYPERHIVE_EVENTS_DB`). On failure, fall back to a no-store bus — /// the harness still works, just without persistent history. #[must_use] pub fn new() -> Self { let path = std::env::var_os("HYPERHIVE_EVENTS_DB") .map_or_else(|| PathBuf::from(DEFAULT_EVENTS_DB), PathBuf::from); let store = match EventStore::open(&path) { Ok(s) => Some(Arc::new(s)), Err(e) => { tracing::warn!(error = ?e, path = %path.display(), "events db open failed; running without history"); None } }; let (tx, _) = broadcast::channel(CHANNEL_CAPACITY); Self { tx: Arc::new(tx), store, } } pub fn emit(&self, event: LiveEvent) { if let Some(store) = &self.store && let Err(e) = store.append(&event) { tracing::warn!(error = ?e, "events: append failed"); } // Lagged subscribers drop events — fine; the UI is a tail, not a log. let _ = self.tx.send(event); } pub fn subscribe(&self) -> broadcast::Receiver { self.tx.subscribe() } /// Most recent events, oldest first, capped at `HISTORY_CAPACITY`. /// Drives the terminal pre-fill when the operator opens the agent /// page; without a store (db open failed) this is empty. #[must_use] pub fn history(&self) -> Vec { let Some(store) = &self.store else { return Vec::new(); }; store.recent(HISTORY_CAPACITY).unwrap_or_default() } } impl Default for Bus { fn default() -> Self { Self::new() } }