broker: hourly vacuum of delivered messages older than 30 days

undelivered rows are always kept regardless of age (still in flight).
sweep runs immediately on serve start then every hour. logs row count
when non-zero. keep_secs is hard-coded for now (30 days); can be
config-driven later if a host wants to retain more / less for audit.
This commit is contained in:
müde 2026-05-15 19:40:38 +02:00
parent a9ed33d94f
commit 6d52f67292
3 changed files with 31 additions and 2 deletions

View file

@ -171,6 +171,21 @@ impl Broker {
}
}
/// Delete delivered messages older than `older_than_secs`. Undelivered
/// rows are always kept regardless of age — those are still in flight
/// from the broker's POV. Returns the number of rows removed.
pub fn vacuum_delivered(&self, older_than_secs: i64) -> Result<u64> {
let cutoff = now_unix() - older_than_secs;
let conn = self.conn.lock().unwrap();
let n = conn.execute(
"DELETE FROM messages
WHERE delivered_at IS NOT NULL
AND delivered_at < ?1",
params![cutoff],
)?;
Ok(u64::try_from(n).unwrap_or(0))
}
pub fn recv(&self, recipient: &str) -> Result<Option<Message>> {
let conn = self.conn.lock().unwrap();
let row: Option<(i64, String, String, String)> = conn

View file

@ -110,6 +110,22 @@ async fn main() -> Result<()> {
tracing::warn!(error = ?e, "auto-update task failed");
}
});
// Periodic broker vacuum: drop delivered messages older than
// 30 days. Undelivered messages are always kept (still in
// flight). Runs hourly; first sweep happens immediately.
let vacuum_coord = coord.clone();
tokio::spawn(async move {
let interval_secs = 3600u64;
let keep_secs: i64 = 30 * 24 * 3600;
loop {
match vacuum_coord.broker.vacuum_delivered(keep_secs) {
Ok(0) => {}
Ok(n) => tracing::info!(removed = n, "broker vacuum"),
Err(e) => tracing::warn!(error = ?e, "broker vacuum failed"),
}
tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
}
});
let dash_coord = coord.clone();
tokio::spawn(async move {
if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await {