events: persist to sqlite, survive harness restart

hive_ag3nt::events::Bus replaces its in-memory VecDeque with a sqlite-
backed store at /state/hyperhive-events.sqlite (overridable via
HYPERHIVE_EVENTS_DB). emit() inserts a row; history() reads back the
most recent 2000 events. survives harness restart now — operator reload
mid-investigation no longer wipes the trail.

vacuum runs hourly (immediate first sweep): drop rows older than 7
days, then trim to 2000 newest. two-stage so a quiet agent keeps a
useful tail and a chatty one stays bounded. wired into both
hive-ag3nt and hive-m1nd via spawn_events_vacuum.

if the db open fails (e.g. no /state mount in dev), Bus runs in
no-store mode — events still broadcast, just nothing persisted.
This commit is contained in:
müde 2026-05-15 19:42:57 +02:00
parent 6d52f67292
commit de09503b59
6 changed files with 170 additions and 23 deletions

1
Cargo.lock generated
View file

@ -453,6 +453,7 @@ dependencies = [
"clap",
"hive-sh4re",
"rmcp",
"rusqlite",
"schemars",
"serde",
"serde_json",

View file

@ -29,14 +29,6 @@ Pick anything from here when relevant. Cross-cutting design notes live in
- **Per-agent UI substance.** Show last N inbox messages, last turn timing,
link back to dashboard.
- **Delivered events history persistence.** The `events::Bus` ring
buffer (500 events, in-memory) backfills the terminal on page load
but dies on harness restart, and only ever holds the most recent
turn or two. Persist to sqlite (`events(agent, id, ts, kind,
payload_json)`) so the operator can scroll back through prior
turns, and so `/events/history` survives restart. Cap rows per
agent or auto-vacuum on age, same trade-off as the bounded broker
entry below.
- **State badge: compacting + napping states.** Idle/thinking already
ship (driven from SSE turn_start/turn_end). Add `compacting 📦` and
`napping 😴` once the `/compact` trigger and `nap` tool exist —

View file

@ -12,6 +12,7 @@ axum.workspace = true
clap.workspace = true
hive-sh4re.workspace = true
rmcp.workspace = true
rusqlite.workspace = true
schemars.workspace = true
serde.workspace = true
serde_json.workspace = true

View file

@ -58,6 +58,7 @@ async fn main() -> Result<()> {
let login_state = Arc::new(Mutex::new(initial));
let ui_state = login_state.clone();
let bus = Bus::new();
spawn_events_vacuum(bus.clone());
let ui_bus = bus.clone();
let ui_socket = cli.socket.clone();
tokio::spawn(async move {
@ -153,6 +154,23 @@ async fn serve(
}
}
/// Vacuum events older than 7 days, cap to 2000 most-recent rows.
/// Runs immediately, then hourly.
fn spawn_events_vacuum(bus: Bus) {
tokio::spawn(async move {
let interval_secs = 3600u64;
let keep_secs: i64 = 7 * 24 * 3600;
let keep_rows = 2000;
loop {
let n = bus.vacuum(keep_secs, keep_rows);
if n > 0 {
tracing::info!(removed = n, "events vacuum");
}
tokio::time::sleep(Duration::from_secs(interval_secs)).await;
}
});
}
/// Per-turn user prompt. The role/tools/etc. is in the system prompt
/// (`prompts/agent.md` → `claude --system-prompt-file`); this is just the
/// wake signal claude reacts to. `unread` is the count of *other*

View file

@ -61,6 +61,7 @@ async fn main() -> Result<()> {
let login_state = Arc::new(Mutex::new(initial));
let ui_state = login_state.clone();
let bus = Bus::new();
spawn_events_vacuum(bus.clone());
let ui_bus = bus.clone();
let ui_socket = cli.socket.clone();
tokio::spawn(async move {
@ -89,6 +90,22 @@ async fn main() -> Result<()> {
}
}
/// Vacuum events older than 7 days, cap to 2000 most-recent rows.
fn spawn_events_vacuum(bus: Bus) {
tokio::spawn(async move {
let interval_secs = 3600u64;
let keep_secs: i64 = 7 * 24 * 3600;
let keep_rows = 2000;
loop {
let n = bus.vacuum(keep_secs, keep_rows);
if n > 0 {
tracing::info!(removed = n, "events vacuum");
}
tokio::time::sleep(Duration::from_secs(interval_secs)).await;
}
});
}
async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> {
tracing::info!(socket = %socket.display(), "hive-m1nd serve");
let mcp_config = turn::write_mcp_config(socket).await?;

View file

@ -8,17 +8,31 @@
//! future events; the dashboard JS deals with the cold-start case by
//! showing "connecting…" until the first event arrives.
use std::collections::VecDeque;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use rusqlite::{Connection, params};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
const CHANNEL_CAPACITY: usize = 256;
/// Max `LiveEvent`s the `Bus` keeps in its ring buffer. The web UI fetches
/// this on page load to backfill the terminal so the operator sees the
/// last turn(s) without having to wait for the next one.
const HISTORY_CAPACITY: usize = 500;
/// Max `LiveEvent`s the `Bus` returns from `history()` and keeps in
/// sqlite. Older rows are vacuumed on a periodic sweep.
const HISTORY_CAPACITY: usize = 2000;
/// Default sqlite db path. Lives under `/state/` so it survives
/// destroy/recreate but goes away on purge. Overridable via the
/// `HYPERHIVE_EVENTS_DB` env var (used in tests and one-shot tools).
const DEFAULT_EVENTS_DB: &str = "/state/hyperhive-events.sqlite";
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
kind TEXT NOT NULL,
payload_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_ts ON events (ts);
";
/// One row of the agent's live stream. Serialised to JSON for SSE delivery.
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -45,29 +59,122 @@ pub enum LiveEvent {
TurnEnd { ok: bool, note: Option<String> },
}
/// sqlite-backed event log. Wraps a `Connection` behind a `Mutex` so the
/// `Bus` (which clones cheaply) shares one writer.
struct EventStore {
conn: Mutex<Connection>,
}
impl EventStore {
fn open(path: &Path) -> rusqlite::Result<Self> {
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let conn = Connection::open(path)?;
conn.execute_batch(SCHEMA)?;
Ok(Self {
conn: Mutex::new(conn),
})
}
fn append(&self, event: &LiveEvent) -> rusqlite::Result<()> {
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.ok()
.and_then(|d| i64::try_from(d.as_secs()).ok())
.unwrap_or(0);
let kind = match event {
LiveEvent::TurnStart { .. } => "turn_start",
LiveEvent::Stream(_) => "stream",
LiveEvent::Note(_) => "note",
LiveEvent::TurnEnd { .. } => "turn_end",
};
let payload = serde_json::to_string(event).unwrap_or_else(|_| "null".into());
let conn = self.conn.lock().unwrap();
conn.execute(
"INSERT INTO events (ts, kind, payload_json) VALUES (?1, ?2, ?3)",
params![ts, kind, payload],
)?;
Ok(())
}
fn recent(&self, limit: usize) -> rusqlite::Result<Vec<LiveEvent>> {
let limit_i = i64::try_from(limit).unwrap_or(i64::MAX);
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT payload_json FROM events
ORDER BY id DESC
LIMIT ?1",
)?;
let rows = stmt.query_map(params![limit_i], |row| {
let s: String = row.get(0)?;
Ok(serde_json::from_str::<LiveEvent>(&s).ok())
})?;
let mut out: Vec<LiveEvent> = rows.flatten().flatten().collect();
out.reverse();
Ok(out)
}
/// Drop rows older than `older_than_secs` AND any rows beyond
/// `keep_rows` newest. Two-stage so a quiet agent keeps a useful
/// tail and a chatty one is bounded.
fn vacuum(&self, older_than_secs: i64, keep_rows: usize) -> rusqlite::Result<u64> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.ok()
.and_then(|d| i64::try_from(d.as_secs()).ok())
.unwrap_or(0);
let cutoff = now - older_than_secs;
let conn = self.conn.lock().unwrap();
let by_age = conn.execute("DELETE FROM events WHERE ts < ?1", params![cutoff])?;
let keep_i = i64::try_from(keep_rows).unwrap_or(i64::MAX);
let by_count = conn.execute(
"DELETE FROM events
WHERE id NOT IN (
SELECT id FROM events ORDER BY id DESC LIMIT ?1
)",
params![keep_i],
)?;
Ok(u64::try_from(by_age + by_count).unwrap_or(0))
}
}
#[derive(Clone)]
pub struct Bus {
tx: Arc<broadcast::Sender<LiveEvent>>,
history: Arc<Mutex<VecDeque<LiveEvent>>>,
/// Persistent event log. `None` only if opening the sqlite db failed
/// at construction — we keep going so the harness doesn't die on a
/// missing `/state/` mount in dev / test scenarios.
store: Option<Arc<EventStore>>,
}
impl Bus {
/// Open the default events db (`/state/hyperhive-events.sqlite`, or
/// `HYPERHIVE_EVENTS_DB`). On failure, fall back to a no-store bus —
/// the harness still works, just without persistent history.
#[must_use]
pub fn new() -> Self {
let path = std::env::var_os("HYPERHIVE_EVENTS_DB")
.map_or_else(|| PathBuf::from(DEFAULT_EVENTS_DB), PathBuf::from);
let store = match EventStore::open(&path) {
Ok(s) => Some(Arc::new(s)),
Err(e) => {
tracing::warn!(error = ?e, path = %path.display(), "events db open failed; running without history");
None
}
};
let (tx, _) = broadcast::channel(CHANNEL_CAPACITY);
Self {
tx: Arc::new(tx),
history: Arc::new(Mutex::new(VecDeque::with_capacity(HISTORY_CAPACITY))),
store,
}
}
pub fn emit(&self, event: LiveEvent) {
if let Some(store) = &self.store
&& let Err(e) = store.append(&event)
{
let mut h = self.history.lock().unwrap();
if h.len() == HISTORY_CAPACITY {
h.pop_front();
}
h.push_back(event.clone());
tracing::warn!(error = ?e, "events: append failed");
}
// Lagged subscribers drop events — fine; the UI is a tail, not a log.
let _ = self.tx.send(event);
@ -77,11 +184,22 @@ impl Bus {
self.tx.subscribe()
}
/// Snapshot of the in-memory event ring buffer, oldest first. Drives the
/// terminal pre-fill when the operator opens the agent page.
/// Most recent events, oldest first, capped at `HISTORY_CAPACITY`.
/// Drives the terminal pre-fill when the operator opens the agent
/// page; without a store (db open failed) this is empty.
#[must_use]
pub fn history(&self) -> Vec<LiveEvent> {
self.history.lock().unwrap().iter().cloned().collect()
let Some(store) = &self.store else {
return Vec::new();
};
store.recent(HISTORY_CAPACITY).unwrap_or_default()
}
/// Drop events older than `older_than_secs` and keep only the
/// newest `keep_rows`. Called periodically by the harness.
pub fn vacuum(&self, older_than_secs: i64, keep_rows: usize) -> u64 {
let Some(store) = &self.store else { return 0 };
store.vacuum(older_than_secs, keep_rows).unwrap_or(0)
}
}