diff --git a/TODO.md b/TODO.md index d5ccad3..dbfada2 100644 --- a/TODO.md +++ b/TODO.md @@ -34,6 +34,23 @@ Pick anything from here when relevant. Cross-cutting design notes live in `napping 😴` once the `/compact` trigger and `nap` tool exist — both need a harness signal (an explicit `LiveEvent::StateChange` variant or piggyback on Note). +- **Server-side state badge.** Today the badge is computed client-side + from `turn_start`/`turn_end` events. On page reload mid-turn the + history replay re-derives it, but with a `compacting` / `napping` + state coming and a non-trivial state machine it's better to track + authoritative state in the harness and expose it via + `GET /api/state` (`status: "thinking" | "idle" | "compacting" | + "napping"`). JS just renders. Drops the + derive-from-events-and-pray code path. +- **Terminal: inline diffs for Write/Edit.** Today a `Write` / + `Edit` tool-use row just shows the file path. Render the actual + change inline in the terminal: for `Edit`, a small `+`/`-` + per-line diff between `input.old_string` and `input.new_string`; + for `Write`, the first few lines of `input.content` (it's all + "+"). Keep collapsed by default (`
` like the existing + tool_result rollups), expand to full diff on click. Color via + the same `.diff-add` / `.diff-del` classes the dashboard + approval diff already uses. - **Terminal: `/model` slash command.** Operator-typeable model override from the terminal. Depends on the model-override work above; once an override mechanism exists, wire a `/model ` diff --git a/hive-ag3nt/src/bin/hive-ag3nt.rs b/hive-ag3nt/src/bin/hive-ag3nt.rs index bddee64..e1bff36 100644 --- a/hive-ag3nt/src/bin/hive-ag3nt.rs +++ b/hive-ag3nt/src/bin/hive-ag3nt.rs @@ -58,7 +58,6 @@ async fn main() -> Result<()> { let login_state = Arc::new(Mutex::new(initial)); let ui_state = login_state.clone(); let bus = Bus::new(); - spawn_events_vacuum(bus.clone()); let ui_bus = bus.clone(); let ui_socket = cli.socket.clone(); tokio::spawn(async move { @@ -154,23 +153,6 @@ async fn serve( } } -/// Vacuum events older than 7 days, cap to 2000 most-recent rows. -/// Runs immediately, then hourly. -fn spawn_events_vacuum(bus: Bus) { - tokio::spawn(async move { - let interval_secs = 3600u64; - let keep_secs: i64 = 7 * 24 * 3600; - let keep_rows = 2000; - loop { - let n = bus.vacuum(keep_secs, keep_rows); - if n > 0 { - tracing::info!(removed = n, "events vacuum"); - } - tokio::time::sleep(Duration::from_secs(interval_secs)).await; - } - }); -} - /// Per-turn user prompt. The role/tools/etc. is in the system prompt /// (`prompts/agent.md` → `claude --system-prompt-file`); this is just the /// wake signal claude reacts to. `unread` is the count of *other* diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index 256c4ab..d3cbd8d 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -61,7 +61,6 @@ async fn main() -> Result<()> { let login_state = Arc::new(Mutex::new(initial)); let ui_state = login_state.clone(); let bus = Bus::new(); - spawn_events_vacuum(bus.clone()); let ui_bus = bus.clone(); let ui_socket = cli.socket.clone(); tokio::spawn(async move { @@ -90,22 +89,6 @@ async fn main() -> Result<()> { } } -/// Vacuum events older than 7 days, cap to 2000 most-recent rows. -fn spawn_events_vacuum(bus: Bus) { - tokio::spawn(async move { - let interval_secs = 3600u64; - let keep_secs: i64 = 7 * 24 * 3600; - let keep_rows = 2000; - loop { - let n = bus.vacuum(keep_secs, keep_rows); - if n > 0 { - tracing::info!(removed = n, "events vacuum"); - } - tokio::time::sleep(Duration::from_secs(interval_secs)).await; - } - }); -} - async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> { tracing::info!(socket = %socket.display(), "hive-m1nd serve"); let mcp_config = turn::write_mcp_config(socket).await?; diff --git a/hive-ag3nt/src/events.rs b/hive-ag3nt/src/events.rs index 54297d3..33efeb5 100644 --- a/hive-ag3nt/src/events.rs +++ b/hive-ag3nt/src/events.rs @@ -115,28 +115,6 @@ impl EventStore { Ok(out) } - /// Drop rows older than `older_than_secs` AND any rows beyond - /// `keep_rows` newest. Two-stage so a quiet agent keeps a useful - /// tail and a chatty one is bounded. - fn vacuum(&self, older_than_secs: i64, keep_rows: usize) -> rusqlite::Result { - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .ok() - .and_then(|d| i64::try_from(d.as_secs()).ok()) - .unwrap_or(0); - let cutoff = now - older_than_secs; - let conn = self.conn.lock().unwrap(); - let by_age = conn.execute("DELETE FROM events WHERE ts < ?1", params![cutoff])?; - let keep_i = i64::try_from(keep_rows).unwrap_or(i64::MAX); - let by_count = conn.execute( - "DELETE FROM events - WHERE id NOT IN ( - SELECT id FROM events ORDER BY id DESC LIMIT ?1 - )", - params![keep_i], - )?; - Ok(u64::try_from(by_age + by_count).unwrap_or(0)) - } } #[derive(Clone)] @@ -195,12 +173,6 @@ impl Bus { store.recent(HISTORY_CAPACITY).unwrap_or_default() } - /// Drop events older than `older_than_secs` and keep only the - /// newest `keep_rows`. Called periodically by the harness. - pub fn vacuum(&self, older_than_secs: i64, keep_rows: usize) -> u64 { - let Some(store) = &self.store else { return 0 }; - store.vacuum(older_than_secs, keep_rows).unwrap_or(0) - } } impl Default for Bus { diff --git a/hive-c0re/src/events_vacuum.rs b/hive-c0re/src/events_vacuum.rs new file mode 100644 index 0000000..cef6bd6 --- /dev/null +++ b/hive-c0re/src/events_vacuum.rs @@ -0,0 +1,68 @@ +//! Host-side vacuum of every per-agent events.sqlite. The harness +//! writes to `/state/hyperhive-events.sqlite` (bind-mounted from +//! `/var/lib/hyperhive/agents//state/`); we open the same file +//! from the host every hour and apply the same two-stage delete +//! (drop rows older than `keep_secs`, then trim to `keep_rows` +//! newest). Keeping retention on the host means agents don't need any +//! cleanup wiring of their own, and a misbehaving harness can't +//! disable its own vacuum. + +use std::path::Path; +use std::sync::Arc; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use rusqlite::{Connection, Result, params}; + +use crate::coordinator::Coordinator; + +const VACUUM_INTERVAL: Duration = Duration::from_secs(3600); +const KEEP_SECS: i64 = 7 * 24 * 3600; +const KEEP_ROWS: i64 = 2000; + +/// Background loop: sweep every existing agent state dir hourly, run +/// the vacuum SQL against its events.sqlite if present. Errors are +/// logged but don't tear the loop down. +pub fn spawn(coord: Arc) { + tokio::spawn(async move { + loop { + sweep_once(); + // touching coord keeps the type wired in case future sweeps + // need approvals/etc.; the ref is otherwise unused today. + let _ = &coord; + tokio::time::sleep(VACUUM_INTERVAL).await; + } + }); +} + +fn sweep_once() { + for name in Coordinator::kept_state_names() { + let path = Coordinator::agent_notes_dir(&name).join("hyperhive-events.sqlite"); + if !path.exists() { + continue; + } + match vacuum_file(&path) { + Ok(0) => {} + Ok(n) => tracing::info!(agent = %name, removed = n, "events vacuum"), + Err(e) => tracing::warn!(agent = %name, error = ?e, "events vacuum failed"), + } + } +} + +fn vacuum_file(path: &Path) -> Result { + let conn = Connection::open(path)?; + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .and_then(|d| i64::try_from(d.as_secs()).ok()) + .unwrap_or(0); + let cutoff = now - KEEP_SECS; + let by_age = conn.execute("DELETE FROM events WHERE ts < ?1", params![cutoff])?; + let by_count = conn.execute( + "DELETE FROM events + WHERE id NOT IN ( + SELECT id FROM events ORDER BY id DESC LIMIT ?1 + )", + params![KEEP_ROWS], + )?; + Ok(u64::try_from(by_age + by_count).unwrap_or(0)) +} diff --git a/hive-c0re/src/main.rs b/hive-c0re/src/main.rs index e3fb11d..cbdb45d 100644 --- a/hive-c0re/src/main.rs +++ b/hive-c0re/src/main.rs @@ -14,6 +14,7 @@ mod client; mod coordinator; mod dashboard; mod lifecycle; +mod events_vacuum; mod manager_server; mod operator_questions; mod server; @@ -126,6 +127,9 @@ async fn main() -> Result<()> { tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await; } }); + // Per-agent events.sqlite vacuum: host-side so the harness + // doesn't need any retention wiring of its own. + events_vacuum::spawn(coord.clone()); let dash_coord = coord.clone(); tokio::spawn(async move { if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await {