62 lines
2.3 KiB
Rust
62 lines
2.3 KiB
Rust
//! Host-side vacuum of every per-agent events.sqlite. The harness
//! writes to `/state/hyperhive-events.sqlite` (bind-mounted from
//! `/var/lib/hyperhive/agents/<name>/state/`); once an hour the host
//! opens the same file and deletes rows older than `KEEP_SECS` (seven
//! days). "Vacuum" here means only that `DELETE`: SQLite's `VACUUM` is
//! not run, so the file itself is not shrunk.
//!
//! Retention is age-only, with no row cap, so a chatty turn doesn't lose
//! history sooner than a quiet one. Disk pressure during a sustained burst
//! is a cheaper problem than a missing event when the operator is
//! debugging a regression. Keeping retention on the host means agents
//! don't need any cleanup wiring of their own, and a misbehaving harness
//! can't disable its own vacuum.
|
|
|
|
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use rusqlite::{Connection, OpenFlags, Result, params};

use crate::coordinator::Coordinator;
|
|
|
|
/// Delay between the end of one sweep and the start of the next: one hour.
const VACUUM_INTERVAL: Duration = Duration::from_secs(60 * 60);
/// Retention window in seconds (seven days). Rows whose `ts` falls before
/// `now - KEEP_SECS` are deleted by the sweep.
const KEEP_SECS: i64 = 7 * 86_400;
|
|
|
|
/// Background loop: sweep every existing agent state dir hourly, run
|
|
/// the vacuum SQL against its events.sqlite if present. Errors are
|
|
/// logged but don't tear the loop down.
|
|
pub fn spawn(coord: Arc<Coordinator>) {
|
|
tokio::spawn(async move {
|
|
loop {
|
|
sweep_once();
|
|
// touching coord keeps the type wired in case future sweeps
|
|
// need approvals/etc.; the ref is otherwise unused today.
|
|
let _ = &coord;
|
|
tokio::time::sleep(VACUUM_INTERVAL).await;
|
|
}
|
|
});
|
|
}
|
|
|
|
fn sweep_once() {
|
|
for name in Coordinator::kept_state_names() {
|
|
let path = Coordinator::agent_notes_dir(&name).join("hyperhive-events.sqlite");
|
|
if !path.exists() {
|
|
continue;
|
|
}
|
|
match vacuum_file(&path) {
|
|
Ok(0) => {}
|
|
Ok(n) => tracing::info!(agent = %name, removed = n, "events vacuum"),
|
|
Err(e) => tracing::warn!(agent = %name, error = ?e, "events vacuum failed"),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn vacuum_file(path: &Path) -> Result<u64> {
|
|
let conn = Connection::open(path)?;
|
|
let now = SystemTime::now()
|
|
.duration_since(UNIX_EPOCH)
|
|
.ok()
|
|
.and_then(|d| i64::try_from(d.as_secs()).ok())
|
|
.unwrap_or(0);
|
|
let cutoff = now - KEEP_SECS;
|
|
let removed = conn.execute("DELETE FROM events WHERE ts < ?1", params![cutoff])?;
|
|
Ok(u64::try_from(removed).unwrap_or(0))
|
|
}
|