diff --git a/TODO.md b/TODO.md
index 59d8538..2775bed 100644
--- a/TODO.md
+++ b/TODO.md
@@ -114,8 +114,6 @@ Pick anything from here when relevant. Cross-cutting design notes live in
 
 ## Lifecycle / reliability
 
-- **Bounded broker.** Cap rows per recipient or auto-vacuum delivered
-  messages older than a threshold. sqlite is growing unbounded.
 - **Container crash events.** Watch `container@*.service` via D-Bus,
   push `HelperEvent::ContainerCrash` to the manager's inbox so the
   manager can react (restart, escalate, etc.).
diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs
index 232b15c..bd6b45f 100644
--- a/hive-c0re/src/broker.rs
+++ b/hive-c0re/src/broker.rs
@@ -171,6 +171,29 @@ impl Broker {
         }
     }
 
+    /// Delete delivered messages older than `older_than_secs`.
+    ///
+    /// Undelivered rows are always kept regardless of age — those are still
+    /// in flight from the broker's POV. Deleting rows lets sqlite reuse the
+    /// freed pages, but the database file only shrinks if auto-vacuum is
+    /// enabled or a `VACUUM` is run.
+    ///
+    /// Returns the number of rows removed.
+    pub fn vacuum_delivered(&self, older_than_secs: i64) -> Result<u64> {
+        // Saturate instead of overflowing on an extreme retention value.
+        let cutoff = now_unix().saturating_sub(older_than_secs);
+        let conn = self.conn.lock().unwrap();
+        let n = conn.execute(
+            "DELETE FROM messages
+             WHERE delivered_at IS NOT NULL
+               AND delivered_at < ?1",
+            params![cutoff],
+        )?;
+        // A failed usize -> u64 conversion would mean a huge count, so
+        // saturate rather than report that nothing was removed.
+        Ok(u64::try_from(n).unwrap_or(u64::MAX))
+    }
+
     pub fn recv(&self, recipient: &str) -> Result> {
         let conn = self.conn.lock().unwrap();
         let row: Option<(i64, String, String, String)> = conn
diff --git a/hive-c0re/src/main.rs b/hive-c0re/src/main.rs
index 6561703..e3fb11d 100644
--- a/hive-c0re/src/main.rs
+++ b/hive-c0re/src/main.rs
@@ -110,6 +110,32 @@ async fn main() -> Result<()> {
                     tracing::warn!(error = ?e, "auto-update task failed");
                 }
             });
+            // Periodic broker vacuum: drop delivered messages older than
+            // 30 days. Undelivered messages are always kept (still in
+            // flight). Runs hourly; first sweep happens immediately.
+            let vacuum_coord = coord.clone();
+            tokio::spawn(async move {
+                const SWEEP_INTERVAL: std::time::Duration =
+                    std::time::Duration::from_secs(3600);
+                const KEEP_SECS: i64 = 30 * 24 * 3600;
+                loop {
+                    // The sweep takes the broker mutex and runs a blocking
+                    // sqlite DELETE, so keep it off the async worker threads.
+                    let sweep_coord = vacuum_coord.clone();
+                    let swept = tokio::task::spawn_blocking(move || {
+                        sweep_coord.broker.vacuum_delivered(KEEP_SECS)
+                    })
+                    .await;
+                    match swept {
+                        Ok(Ok(0)) => {}
+                        Ok(Ok(n)) => tracing::info!(removed = n, "broker vacuum"),
+                        Ok(Err(e)) => tracing::warn!(error = ?e, "broker vacuum failed"),
+                        // A panicked or cancelled sweep must not end the periodic loop.
+                        Err(e) => tracing::warn!(error = ?e, "broker vacuum task did not complete"),
+                    }
+                    tokio::time::sleep(SWEEP_INTERVAL).await;
+                }
+            });
             let dash_coord = coord.clone();
             tokio::spawn(async move {
                 if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await {