From 6d52f67292ae1246a4a9d5e78d431174531d62ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?m=C3=BCde?= Date: Fri, 15 May 2026 19:40:38 +0200 Subject: [PATCH] broker: hourly vacuum of delivered messages older than 30 days undelivered rows are always kept regardless of age (still in flight). sweep runs immediately on serve start then every hour. logs row count when non-zero. keep_secs is hard-coded for now (30 days); can be config-driven later if a host wants to retain more / less for audit. --- TODO.md | 2 -- hive-c0re/src/broker.rs | 15 +++++++++++++++ hive-c0re/src/main.rs | 16 ++++++++++++++++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/TODO.md b/TODO.md index 59d8538..2775bed 100644 --- a/TODO.md +++ b/TODO.md @@ -114,8 +114,6 @@ Pick anything from here when relevant. Cross-cutting design notes live in ## Lifecycle / reliability -- **Bounded broker.** Cap rows per recipient or auto-vacuum delivered - messages older than a threshold. sqlite is growing unbounded. - **Container crash events.** Watch `container@*.service` via D-Bus, push `HelperEvent::ContainerCrash` to the manager's inbox so the manager can react (restart, escalate, etc.). diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs index 232b15c..bd6b45f 100644 --- a/hive-c0re/src/broker.rs +++ b/hive-c0re/src/broker.rs @@ -171,6 +171,21 @@ impl Broker { } } + /// Delete delivered messages older than `older_than_secs`. Undelivered + /// rows are always kept regardless of age — those are still in flight + /// from the broker's POV. Returns the number of rows removed. + pub fn vacuum_delivered(&self, older_than_secs: i64) -> Result { + let cutoff = now_unix() - older_than_secs; + let conn = self.conn.lock().unwrap(); + let n = conn.execute( + "DELETE FROM messages + WHERE delivered_at IS NOT NULL + AND delivered_at < ?1", + params![cutoff], + )?; + Ok(u64::try_from(n).unwrap_or(0)) + } + pub fn recv(&self, recipient: &str) -> Result> { let conn = self.conn.lock().unwrap(); let row: Option<(i64, String, String, String)> = conn diff --git a/hive-c0re/src/main.rs b/hive-c0re/src/main.rs index 6561703..e3fb11d 100644 --- a/hive-c0re/src/main.rs +++ b/hive-c0re/src/main.rs @@ -110,6 +110,22 @@ async fn main() -> Result<()> { tracing::warn!(error = ?e, "auto-update task failed"); } }); + // Periodic broker vacuum: drop delivered messages older than + // 30 days. Undelivered messages are always kept (still in + // flight). Runs hourly; first sweep happens immediately. + let vacuum_coord = coord.clone(); + tokio::spawn(async move { + let interval_secs = 3600u64; + let keep_secs: i64 = 30 * 24 * 3600; + loop { + match vacuum_coord.broker.vacuum_delivered(keep_secs) { + Ok(0) => {} + Ok(n) => tracing::info!(removed = n, "broker vacuum"), + Err(e) => tracing::warn!(error = ?e, "broker vacuum failed"), + } + tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await; + } + }); let dash_coord = coord.clone(); tokio::spawn(async move { if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await {