diff --git a/Cargo.lock b/Cargo.lock index 6ec8564..eba3981 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -453,6 +453,7 @@ dependencies = [ "clap", "hive-sh4re", "rmcp", + "rusqlite", "schemars", "serde", "serde_json", diff --git a/TODO.md b/TODO.md index 2775bed..c4bb48d 100644 --- a/TODO.md +++ b/TODO.md @@ -29,14 +29,6 @@ Pick anything from here when relevant. Cross-cutting design notes live in - **Per-agent UI substance.** Show last N inbox messages, last turn timing, link back to dashboard. -- **Delivered events history persistence.** The `events::Bus` ring - buffer (500 events, in-memory) backfills the terminal on page load - but dies on harness restart, and only ever holds the most recent - turn or two. Persist to sqlite (`events(agent, id, ts, kind, - payload_json)`) so the operator can scroll back through prior - turns, and so `/events/history` survives restart. Cap rows per - agent or auto-vacuum on age, same trade-off as the bounded broker - entry below. - **State badge: compacting + napping states.** Idle/thinking already ship (driven from SSE turn_start/turn_end). Add `compacting 📦` and `napping 😴` once the `/compact` trigger and `nap` tool exist — diff --git a/hive-ag3nt/Cargo.toml b/hive-ag3nt/Cargo.toml index 19586b8..f1a253e 100644 --- a/hive-ag3nt/Cargo.toml +++ b/hive-ag3nt/Cargo.toml @@ -12,6 +12,7 @@ axum.workspace = true clap.workspace = true hive-sh4re.workspace = true rmcp.workspace = true +rusqlite.workspace = true schemars.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/hive-ag3nt/src/bin/hive-ag3nt.rs b/hive-ag3nt/src/bin/hive-ag3nt.rs index bc5af54..f291cd5 100644 --- a/hive-ag3nt/src/bin/hive-ag3nt.rs +++ b/hive-ag3nt/src/bin/hive-ag3nt.rs @@ -58,6 +58,7 @@ async fn main() -> Result<()> { let login_state = Arc::new(Mutex::new(initial)); let ui_state = login_state.clone(); let bus = Bus::new(); + spawn_events_vacuum(bus.clone()); let ui_bus = bus.clone(); let ui_socket = cli.socket.clone(); tokio::spawn(async move { @@ -153,6 +154,23 @@ async fn serve( } } +/// Vacuum events older than 7 days, cap to 2000 most-recent rows. +/// Runs immediately, then hourly. +fn spawn_events_vacuum(bus: Bus) { + tokio::spawn(async move { + let interval_secs = 3600u64; + let keep_secs: i64 = 7 * 24 * 3600; + let keep_rows = 2000; + loop { + let n = bus.vacuum(keep_secs, keep_rows); + if n > 0 { + tracing::info!(removed = n, "events vacuum"); + } + tokio::time::sleep(Duration::from_secs(interval_secs)).await; + } + }); +} + /// Per-turn user prompt. The role/tools/etc. is in the system prompt /// (`prompts/agent.md` → `claude --system-prompt-file`); this is just the /// wake signal claude reacts to. `unread` is the count of *other* diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index d3cbd8d..256c4ab 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -61,6 +61,7 @@ async fn main() -> Result<()> { let login_state = Arc::new(Mutex::new(initial)); let ui_state = login_state.clone(); let bus = Bus::new(); + spawn_events_vacuum(bus.clone()); let ui_bus = bus.clone(); let ui_socket = cli.socket.clone(); tokio::spawn(async move { @@ -89,6 +90,22 @@ async fn main() -> Result<()> { } } +/// Vacuum events older than 7 days, cap to 2000 most-recent rows. +fn spawn_events_vacuum(bus: Bus) { + tokio::spawn(async move { + let interval_secs = 3600u64; + let keep_secs: i64 = 7 * 24 * 3600; + let keep_rows = 2000; + loop { + let n = bus.vacuum(keep_secs, keep_rows); + if n > 0 { + tracing::info!(removed = n, "events vacuum"); + } + tokio::time::sleep(Duration::from_secs(interval_secs)).await; + } + }); +} + async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> { tracing::info!(socket = %socket.display(), "hive-m1nd serve"); let mcp_config = turn::write_mcp_config(socket).await?; diff --git a/hive-ag3nt/src/events.rs b/hive-ag3nt/src/events.rs index fb95223..54297d3 100644 --- a/hive-ag3nt/src/events.rs +++ b/hive-ag3nt/src/events.rs @@ -8,17 +8,31 @@ //! future events; the dashboard JS deals with the cold-start case by //! showing "connecting…" until the first event arrives. -use std::collections::VecDeque; +use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; +use rusqlite::{Connection, params}; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast; const CHANNEL_CAPACITY: usize = 256; -/// Max `LiveEvent`s the `Bus` keeps in its ring buffer. The web UI fetches -/// this on page load to backfill the terminal so the operator sees the -/// last turn(s) without having to wait for the next one. -const HISTORY_CAPACITY: usize = 500; +/// Max `LiveEvent`s the `Bus` returns from `history()` and keeps in +/// sqlite. Older rows are vacuumed on a periodic sweep. +const HISTORY_CAPACITY: usize = 2000; +/// Default sqlite db path. Lives under `/state/` so it survives +/// destroy/recreate but goes away on purge. Overridable via the +/// `HYPERHIVE_EVENTS_DB` env var (used in tests and one-shot tools). +const DEFAULT_EVENTS_DB: &str = "/state/hyperhive-events.sqlite"; + +const SCHEMA: &str = " +CREATE TABLE IF NOT EXISTS events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts INTEGER NOT NULL, + kind TEXT NOT NULL, + payload_json TEXT NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_events_ts ON events (ts); +"; /// One row of the agent's live stream. Serialised to JSON for SSE delivery. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -45,29 +59,122 @@ pub enum LiveEvent { TurnEnd { ok: bool, note: Option }, } +/// sqlite-backed event log. Wraps a `Connection` behind a `Mutex` so the +/// `Bus` (which clones cheaply) shares one writer. +struct EventStore { + conn: Mutex, +} + +impl EventStore { + fn open(path: &Path) -> rusqlite::Result { + if let Some(parent) = path.parent() { + let _ = std::fs::create_dir_all(parent); + } + let conn = Connection::open(path)?; + conn.execute_batch(SCHEMA)?; + Ok(Self { + conn: Mutex::new(conn), + }) + } + + fn append(&self, event: &LiveEvent) -> rusqlite::Result<()> { + let ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .ok() + .and_then(|d| i64::try_from(d.as_secs()).ok()) + .unwrap_or(0); + let kind = match event { + LiveEvent::TurnStart { .. } => "turn_start", + LiveEvent::Stream(_) => "stream", + LiveEvent::Note(_) => "note", + LiveEvent::TurnEnd { .. } => "turn_end", + }; + let payload = serde_json::to_string(event).unwrap_or_else(|_| "null".into()); + let conn = self.conn.lock().unwrap(); + conn.execute( + "INSERT INTO events (ts, kind, payload_json) VALUES (?1, ?2, ?3)", + params![ts, kind, payload], + )?; + Ok(()) + } + + fn recent(&self, limit: usize) -> rusqlite::Result> { + let limit_i = i64::try_from(limit).unwrap_or(i64::MAX); + let conn = self.conn.lock().unwrap(); + let mut stmt = conn.prepare( + "SELECT payload_json FROM events + ORDER BY id DESC + LIMIT ?1", + )?; + let rows = stmt.query_map(params![limit_i], |row| { + let s: String = row.get(0)?; + Ok(serde_json::from_str::(&s).ok()) + })?; + let mut out: Vec = rows.flatten().flatten().collect(); + out.reverse(); + Ok(out) + } + + /// Drop rows older than `older_than_secs` AND any rows beyond + /// `keep_rows` newest. Two-stage so a quiet agent keeps a useful + /// tail and a chatty one is bounded. + fn vacuum(&self, older_than_secs: i64, keep_rows: usize) -> rusqlite::Result { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .ok() + .and_then(|d| i64::try_from(d.as_secs()).ok()) + .unwrap_or(0); + let cutoff = now - older_than_secs; + let conn = self.conn.lock().unwrap(); + let by_age = conn.execute("DELETE FROM events WHERE ts < ?1", params![cutoff])?; + let keep_i = i64::try_from(keep_rows).unwrap_or(i64::MAX); + let by_count = conn.execute( + "DELETE FROM events + WHERE id NOT IN ( + SELECT id FROM events ORDER BY id DESC LIMIT ?1 + )", + params![keep_i], + )?; + Ok(u64::try_from(by_age + by_count).unwrap_or(0)) + } +} + #[derive(Clone)] pub struct Bus { tx: Arc>, - history: Arc>>, + /// Persistent event log. `None` only if opening the sqlite db failed + /// at construction — we keep going so the harness doesn't die on a + /// missing `/state/` mount in dev / test scenarios. + store: Option>, } impl Bus { + /// Open the default events db (`/state/hyperhive-events.sqlite`, or + /// `HYPERHIVE_EVENTS_DB`). On failure, fall back to a no-store bus — + /// the harness still works, just without persistent history. #[must_use] pub fn new() -> Self { + let path = std::env::var_os("HYPERHIVE_EVENTS_DB") + .map_or_else(|| PathBuf::from(DEFAULT_EVENTS_DB), PathBuf::from); + let store = match EventStore::open(&path) { + Ok(s) => Some(Arc::new(s)), + Err(e) => { + tracing::warn!(error = ?e, path = %path.display(), "events db open failed; running without history"); + None + } + }; let (tx, _) = broadcast::channel(CHANNEL_CAPACITY); Self { tx: Arc::new(tx), - history: Arc::new(Mutex::new(VecDeque::with_capacity(HISTORY_CAPACITY))), + store, } } pub fn emit(&self, event: LiveEvent) { + if let Some(store) = &self.store + && let Err(e) = store.append(&event) { - let mut h = self.history.lock().unwrap(); - if h.len() == HISTORY_CAPACITY { - h.pop_front(); - } - h.push_back(event.clone()); + tracing::warn!(error = ?e, "events: append failed"); } // Lagged subscribers drop events — fine; the UI is a tail, not a log. let _ = self.tx.send(event); @@ -77,11 +184,22 @@ impl Bus { self.tx.subscribe() } - /// Snapshot of the in-memory event ring buffer, oldest first. Drives the - /// terminal pre-fill when the operator opens the agent page. + /// Most recent events, oldest first, capped at `HISTORY_CAPACITY`. + /// Drives the terminal pre-fill when the operator opens the agent + /// page; without a store (db open failed) this is empty. #[must_use] pub fn history(&self) -> Vec { - self.history.lock().unwrap().iter().cloned().collect() + let Some(store) = &self.store else { + return Vec::new(); + }; + store.recent(HISTORY_CAPACITY).unwrap_or_default() + } + + /// Drop events older than `older_than_secs` and keep only the + /// newest `keep_rows`. Called periodically by the harness. + pub fn vacuum(&self, older_than_secs: i64, keep_rows: usize) -> u64 { + let Some(store) = &self.store else { return 0 }; + store.vacuum(older_than_secs, keep_rows).unwrap_or(0) } }