events.sqlite vacuum moves host-side
Retention is a host concern — agents have no business doing their own cleanup, and a misbehaving harness could skip it. Drop spawn_events_vacuum from both hive-ag3nt and hive-m1nd, and drop the matching Bus::vacuum + EventStore::vacuum methods. The new hive_c0re::events_vacuum module sweeps every existing agents/<name>/state/hyperhive-events.sqlite on the same hourly cadence as the broker vacuum, using the same two-stage delete (rows older than 7 days, then trim to the 2000 newest). It is called from main alongside the broker vacuum. Also: a server-side state badge entry was added to TODO.md (today's badge is derived client-side from SSE, which is fine for idle/thinking, but a state machine that grows compacting/napping wants authoritative status from the harness).
This commit is contained in:
parent
897e7c07ae
commit
89ccc5e6c5
6 changed files with 89 additions and 63 deletions
17
TODO.md
17
TODO.md
|
|
@ -34,6 +34,23 @@ Pick anything from here when relevant. Cross-cutting design notes live in
|
||||||
`napping 😴` once the `/compact` trigger and `nap` tool exist —
|
`napping 😴` once the `/compact` trigger and `nap` tool exist —
|
||||||
both need a harness signal (an explicit `LiveEvent::StateChange`
|
both need a harness signal (an explicit `LiveEvent::StateChange`
|
||||||
variant or piggyback on Note).
|
variant or piggyback on Note).
|
||||||
|
- **Server-side state badge.** Today the badge is computed client-side
|
||||||
|
from `turn_start`/`turn_end` events. On page reload mid-turn the
|
||||||
|
history replay re-derives it, but with a `compacting` / `napping`
|
||||||
|
state coming and a non-trivial state machine it's better to track
|
||||||
|
authoritative state in the harness and expose it via
|
||||||
|
`GET /api/state` (`status: "thinking" | "idle" | "compacting" |
|
||||||
|
"napping"`). JS just renders. Drops the
|
||||||
|
derive-from-events-and-pray code path.
|
||||||
|
- **Terminal: inline diffs for Write/Edit.** Today a `Write` /
|
||||||
|
`Edit` tool-use row just shows the file path. Render the actual
|
||||||
|
change inline in the terminal: for `Edit`, a small `+`/`-`
|
||||||
|
per-line diff between `input.old_string` and `input.new_string`;
|
||||||
|
for `Write`, the first few lines of `input.content` (it's all
|
||||||
|
"+"). Keep collapsed by default (`<details>` like the existing
|
||||||
|
tool_result rollups), expand to full diff on click. Color via
|
||||||
|
the same `.diff-add` / `.diff-del` classes the dashboard
|
||||||
|
approval diff already uses.
|
||||||
- **Terminal: `/model` slash command.** Operator-typeable model
|
- **Terminal: `/model` slash command.** Operator-typeable model
|
||||||
override from the terminal. Depends on the model-override work
|
override from the terminal. Depends on the model-override work
|
||||||
above; once an override mechanism exists, wire a `/model <name>`
|
above; once an override mechanism exists, wire a `/model <name>`
|
||||||
|
|
|
||||||
|
|
@ -58,7 +58,6 @@ async fn main() -> Result<()> {
|
||||||
let login_state = Arc::new(Mutex::new(initial));
|
let login_state = Arc::new(Mutex::new(initial));
|
||||||
let ui_state = login_state.clone();
|
let ui_state = login_state.clone();
|
||||||
let bus = Bus::new();
|
let bus = Bus::new();
|
||||||
spawn_events_vacuum(bus.clone());
|
|
||||||
let ui_bus = bus.clone();
|
let ui_bus = bus.clone();
|
||||||
let ui_socket = cli.socket.clone();
|
let ui_socket = cli.socket.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
|
@ -154,23 +153,6 @@ async fn serve(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Vacuum events older than 7 days, cap to 2000 most-recent rows.
|
|
||||||
/// Runs immediately, then hourly.
|
|
||||||
fn spawn_events_vacuum(bus: Bus) {
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let interval_secs = 3600u64;
|
|
||||||
let keep_secs: i64 = 7 * 24 * 3600;
|
|
||||||
let keep_rows = 2000;
|
|
||||||
loop {
|
|
||||||
let n = bus.vacuum(keep_secs, keep_rows);
|
|
||||||
if n > 0 {
|
|
||||||
tracing::info!(removed = n, "events vacuum");
|
|
||||||
}
|
|
||||||
tokio::time::sleep(Duration::from_secs(interval_secs)).await;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Per-turn user prompt. The role/tools/etc. is in the system prompt
|
/// Per-turn user prompt. The role/tools/etc. is in the system prompt
|
||||||
/// (`prompts/agent.md` → `claude --system-prompt-file`); this is just the
|
/// (`prompts/agent.md` → `claude --system-prompt-file`); this is just the
|
||||||
/// wake signal claude reacts to. `unread` is the count of *other*
|
/// wake signal claude reacts to. `unread` is the count of *other*
|
||||||
|
|
|
||||||
|
|
@ -61,7 +61,6 @@ async fn main() -> Result<()> {
|
||||||
let login_state = Arc::new(Mutex::new(initial));
|
let login_state = Arc::new(Mutex::new(initial));
|
||||||
let ui_state = login_state.clone();
|
let ui_state = login_state.clone();
|
||||||
let bus = Bus::new();
|
let bus = Bus::new();
|
||||||
spawn_events_vacuum(bus.clone());
|
|
||||||
let ui_bus = bus.clone();
|
let ui_bus = bus.clone();
|
||||||
let ui_socket = cli.socket.clone();
|
let ui_socket = cli.socket.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
|
@ -90,22 +89,6 @@ async fn main() -> Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Vacuum events older than 7 days, cap to 2000 most-recent rows.
|
|
||||||
fn spawn_events_vacuum(bus: Bus) {
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let interval_secs = 3600u64;
|
|
||||||
let keep_secs: i64 = 7 * 24 * 3600;
|
|
||||||
let keep_rows = 2000;
|
|
||||||
loop {
|
|
||||||
let n = bus.vacuum(keep_secs, keep_rows);
|
|
||||||
if n > 0 {
|
|
||||||
tracing::info!(removed = n, "events vacuum");
|
|
||||||
}
|
|
||||||
tokio::time::sleep(Duration::from_secs(interval_secs)).await;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> {
|
async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> {
|
||||||
tracing::info!(socket = %socket.display(), "hive-m1nd serve");
|
tracing::info!(socket = %socket.display(), "hive-m1nd serve");
|
||||||
let mcp_config = turn::write_mcp_config(socket).await?;
|
let mcp_config = turn::write_mcp_config(socket).await?;
|
||||||
|
|
|
||||||
|
|
@ -115,28 +115,6 @@ impl EventStore {
|
||||||
Ok(out)
|
Ok(out)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Drop rows older than `older_than_secs` AND any rows beyond
|
|
||||||
/// `keep_rows` newest. Two-stage so a quiet agent keeps a useful
|
|
||||||
/// tail and a chatty one is bounded.
|
|
||||||
fn vacuum(&self, older_than_secs: i64, keep_rows: usize) -> rusqlite::Result<u64> {
|
|
||||||
let now = std::time::SystemTime::now()
|
|
||||||
.duration_since(std::time::UNIX_EPOCH)
|
|
||||||
.ok()
|
|
||||||
.and_then(|d| i64::try_from(d.as_secs()).ok())
|
|
||||||
.unwrap_or(0);
|
|
||||||
let cutoff = now - older_than_secs;
|
|
||||||
let conn = self.conn.lock().unwrap();
|
|
||||||
let by_age = conn.execute("DELETE FROM events WHERE ts < ?1", params![cutoff])?;
|
|
||||||
let keep_i = i64::try_from(keep_rows).unwrap_or(i64::MAX);
|
|
||||||
let by_count = conn.execute(
|
|
||||||
"DELETE FROM events
|
|
||||||
WHERE id NOT IN (
|
|
||||||
SELECT id FROM events ORDER BY id DESC LIMIT ?1
|
|
||||||
)",
|
|
||||||
params![keep_i],
|
|
||||||
)?;
|
|
||||||
Ok(u64::try_from(by_age + by_count).unwrap_or(0))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
|
@ -195,12 +173,6 @@ impl Bus {
|
||||||
store.recent(HISTORY_CAPACITY).unwrap_or_default()
|
store.recent(HISTORY_CAPACITY).unwrap_or_default()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Drop events older than `older_than_secs` and keep only the
|
|
||||||
/// newest `keep_rows`. Called periodically by the harness.
|
|
||||||
pub fn vacuum(&self, older_than_secs: i64, keep_rows: usize) -> u64 {
|
|
||||||
let Some(store) = &self.store else { return 0 };
|
|
||||||
store.vacuum(older_than_secs, keep_rows).unwrap_or(0)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Bus {
|
impl Default for Bus {
|
||||||
|
|
|
||||||
68
hive-c0re/src/events_vacuum.rs
Normal file
68
hive-c0re/src/events_vacuum.rs
Normal file
|
|
@ -0,0 +1,68 @@
|
||||||
|
//! Host-side vacuum of every per-agent events.sqlite. The harness
|
||||||
|
//! writes to `/state/hyperhive-events.sqlite` (bind-mounted from
|
||||||
|
//! `/var/lib/hyperhive/agents/<name>/state/`); we open the same file
|
||||||
|
//! from the host every hour and apply the same two-stage delete
|
||||||
|
//! (drop rows older than `keep_secs`, then trim to `keep_rows`
|
||||||
|
//! newest). Keeping retention on the host means agents don't need any
|
||||||
|
//! cleanup wiring of their own, and a misbehaving harness can't
|
||||||
|
//! disable its own vacuum.
|
||||||
|
|
||||||
|
use std::path::Path;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
|
use rusqlite::{Connection, Result, params};
|
||||||
|
|
||||||
|
use crate::coordinator::Coordinator;
|
||||||
|
|
||||||
|
const VACUUM_INTERVAL: Duration = Duration::from_secs(3600);
|
||||||
|
const KEEP_SECS: i64 = 7 * 24 * 3600;
|
||||||
|
const KEEP_ROWS: i64 = 2000;
|
||||||
|
|
||||||
|
/// Background loop: sweep every existing agent state dir hourly, run
|
||||||
|
/// the vacuum SQL against its events.sqlite if present. Errors are
|
||||||
|
/// logged but don't tear the loop down.
|
||||||
|
pub fn spawn(coord: Arc<Coordinator>) {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
sweep_once();
|
||||||
|
// touching coord keeps the type wired in case future sweeps
|
||||||
|
// need approvals/etc.; the ref is otherwise unused today.
|
||||||
|
let _ = &coord;
|
||||||
|
tokio::time::sleep(VACUUM_INTERVAL).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sweep_once() {
|
||||||
|
for name in Coordinator::kept_state_names() {
|
||||||
|
let path = Coordinator::agent_notes_dir(&name).join("hyperhive-events.sqlite");
|
||||||
|
if !path.exists() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
match vacuum_file(&path) {
|
||||||
|
Ok(0) => {}
|
||||||
|
Ok(n) => tracing::info!(agent = %name, removed = n, "events vacuum"),
|
||||||
|
Err(e) => tracing::warn!(agent = %name, error = ?e, "events vacuum failed"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn vacuum_file(path: &Path) -> Result<u64> {
|
||||||
|
let conn = Connection::open(path)?;
|
||||||
|
let now = SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.ok()
|
||||||
|
.and_then(|d| i64::try_from(d.as_secs()).ok())
|
||||||
|
.unwrap_or(0);
|
||||||
|
let cutoff = now - KEEP_SECS;
|
||||||
|
let by_age = conn.execute("DELETE FROM events WHERE ts < ?1", params![cutoff])?;
|
||||||
|
let by_count = conn.execute(
|
||||||
|
"DELETE FROM events
|
||||||
|
WHERE id NOT IN (
|
||||||
|
SELECT id FROM events ORDER BY id DESC LIMIT ?1
|
||||||
|
)",
|
||||||
|
params![KEEP_ROWS],
|
||||||
|
)?;
|
||||||
|
Ok(u64::try_from(by_age + by_count).unwrap_or(0))
|
||||||
|
}
|
||||||
|
|
@ -14,6 +14,7 @@ mod client;
|
||||||
mod coordinator;
|
mod coordinator;
|
||||||
mod dashboard;
|
mod dashboard;
|
||||||
mod lifecycle;
|
mod lifecycle;
|
||||||
|
mod events_vacuum;
|
||||||
mod manager_server;
|
mod manager_server;
|
||||||
mod operator_questions;
|
mod operator_questions;
|
||||||
mod server;
|
mod server;
|
||||||
|
|
@ -126,6 +127,9 @@ async fn main() -> Result<()> {
|
||||||
tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
|
tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
// Per-agent events.sqlite vacuum: host-side so the harness
|
||||||
|
// doesn't need any retention wiring of its own.
|
||||||
|
events_vacuum::spawn(coord.clone());
|
||||||
let dash_coord = coord.clone();
|
let dash_coord = coord.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await {
|
if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue