broker: lease-style delivery — ack_turn + requeue_inflight close the no-drop loop

This commit is contained in:
damocles 2026-05-18 22:01:48 +02:00
parent 69a3ca7469
commit 690cb5ab5b
8 changed files with 684 additions and 35 deletions

View file

@ -1,6 +1,7 @@
//! Sqlite-backed message broker. Survives `hive-c0re` restart, and taps every
//! send/recv onto a broadcast channel so the dashboard can stream it.
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
@ -46,6 +47,18 @@ const EVENT_CHANNEL: usize = 256;
/// self-documenting.
pub type DueReminder = (String, i64, String, Option<String>);
/// A single message hand-off from broker to recipient, as returned by
/// `Broker::recv`. Carries the broker's row id and the redelivery flag
/// (so the harness can prepend the "may already be handled" hint to
/// the wake prompt). The `Message` itself is identical to a pristine
/// `Send` payload.
///
/// Note: `Broker::ack_turn` is keyed by recipient, not by `id` — the
/// broker remembers which ids it handed out. How the harness uses `id`
/// beyond that is not visible from this module.
#[derive(Debug, Clone)]
pub struct Delivery {
/// Row id of this message in the broker's `messages` table.
pub id: i64,
/// `true` when this id had been resurfaced by `requeue_inflight`
/// before this pop, i.e. the recipient may already have seen it.
pub redelivered: bool,
/// The payload itself: sender, recipient and body.
pub message: Message,
}
/// Row shape for [`Broker::list_pending_reminders`], shipped on the
/// dashboard `/api/reminders` response.
#[derive(Debug, Clone, Serialize)]
@ -99,9 +112,33 @@ pub enum MessageEvent {
},
}
/// Per-recipient in-memory bookkeeping for the deliver-then-ack
/// flow. Source of truth is the DB columns `delivered_at` +
/// `acked_at`; the in-memory state here is purely an optimisation
/// (avoids scanning the messages table on `AckTurn`) plus the
/// redelivery-hint marker. `requeue_inflight` resets both fields for
/// its recipient before repopulating `requeued_ids`.
#[derive(Default)]
struct RecipientInflight {
/// Message ids the broker has handed to this recipient since the
/// last `AckTurn`, appended by `recv` in pop order. `ack_turn`
/// consumes these ids and sets `acked_at` on the matching rows.
unacked_ids: Vec<i64>,
/// Message ids resurfaced by the most recent `requeue_inflight`
/// call. The next `recv` pop of any id in this set tags the
/// response with `redelivered: true` so the harness can prepend
/// the "may already be handled" hint to the wake prompt;
/// successful pops drain the id from the set.
requeued_ids: HashSet<i64>,
}
/// Sqlite-backed message broker handle.
///
/// Lock order: methods that need both mutexes take `inflight` FIRST
/// and `conn` second (`recv`, `ack_turn`, `requeue_inflight`), so they
/// cannot deadlock against each other.
pub struct Broker {
/// The sqlite connection; all DB access is serialised on this mutex.
conn: Mutex<Connection>,
/// Broadcast tap for message traffic; `recv` publishes a
/// `MessageEvent::Delivered` here after each successful pop.
events: broadcast::Sender<MessageEvent>,
/// Per-recipient deliver/ack tracking. Lost on hive-c0re restart
/// (harmless — the harness fires `RequeueInflight` on its own
/// boot, which rebuilds the `requeued_ids` set from the DB and
/// clears any stale `unacked_ids`).
inflight: Mutex<HashMap<String, RecipientInflight>>,
}
impl Broker {
@ -113,11 +150,13 @@ impl Broker {
let conn =
Connection::open(path).with_context(|| format!("open broker db {}", path.display()))?;
conn.execute_batch(SCHEMA).context("apply broker schema")?;
ensure_message_columns(&conn).context("migrate messages columns")?;
ensure_reminder_columns(&conn).context("migrate reminders columns")?;
let (events, _) = broadcast::channel(EVENT_CHANNEL);
Ok(Self {
conn: Mutex::new(conn),
events,
inflight: Mutex::new(HashMap::new()),
})
}
@ -229,10 +268,10 @@ impl Broker {
&self,
recipient: &str,
timeout: std::time::Duration,
) -> Result<Option<Message>> {
) -> Result<Option<Delivery>> {
let mut rx = self.subscribe();
if let Some(m) = self.recv(recipient)? {
return Ok(Some(m));
if let Some(d) = self.recv(recipient)? {
return Ok(Some(d));
}
let deadline = tokio::time::Instant::now() + timeout;
loop {
@ -246,8 +285,8 @@ impl Broker {
// pop (in case we missed our notification while behind).
Ok(Err(_)) => return self.recv(recipient),
Ok(Ok(MessageEvent::Sent { to, .. })) if to == recipient => {
if let Some(m) = self.recv(recipient)? {
return Ok(Some(m));
if let Some(d) = self.recv(recipient)? {
return Ok(Some(d));
}
// Lost a race (concurrent recv elsewhere). Keep waiting.
}
@ -256,22 +295,31 @@ impl Broker {
}
}
/// Delete delivered messages older than `older_than_secs`. Undelivered
/// rows are always kept regardless of age — those are still in flight
/// Delete fully-acked messages older than `older_than_secs`.
/// Unacked rows (delivered but not yet acknowledged by a clean
/// turn-end, plus undelivered rows) are always kept regardless of
/// age — the former because they're recoverable via
/// `requeue_inflight`, the latter because they're still in flight
/// from the broker's POV. Returns the number of rows removed.
pub fn vacuum_delivered(&self, older_than_secs: i64) -> Result<u64> {
let cutoff = now_unix() - older_than_secs;
let conn = self.conn.lock().unwrap();
let n = conn.execute(
"DELETE FROM messages
WHERE delivered_at IS NOT NULL
AND delivered_at < ?1",
WHERE acked_at IS NOT NULL
AND acked_at < ?1",
params![cutoff],
)?;
Ok(u64::try_from(n).unwrap_or(0))
}
pub fn recv(&self, recipient: &str) -> Result<Option<Message>> {
pub fn recv(&self, recipient: &str) -> Result<Option<Delivery>> {
// Lock order: inflight FIRST, then conn. `requeue_inflight` +
// `ack_turn` follow the same order so we never deadlock; the
// requeue path also needs both locks held together so a pop
// can't sneak in between its DB update + in-memory populate
// and miss the `redelivered` flag.
let mut inflight = self.inflight.lock().unwrap();
let conn = self.conn.lock().unwrap();
let row: Option<(i64, String, String, String)> = conn
.query_row(
@ -291,14 +339,113 @@ impl Broker {
"UPDATE messages SET delivered_at = ?1 WHERE id = ?2",
params![now_unix(), id],
)?;
// Track the id so the next `ack_turn(recipient)` can sweep it,
// and check whether it was resurfaced by a recent
// `requeue_inflight` (in which case the wake prompt gets the
// "may already be handled" hint). Both ops are O(1) per pop;
// the hash-set lookup runs at most once per delivery.
let slot = inflight.entry(recipient.to_owned()).or_default();
slot.unacked_ids.push(id);
let redelivered = slot.requeued_ids.remove(&id);
drop(conn);
drop(inflight);
let _ = self.events.send(MessageEvent::Delivered {
from: from.clone(),
to: to.clone(),
body: body.clone(),
at: now_unix(),
});
Ok(Some(Message { from, to, body }))
Ok(Some(Delivery {
id,
redelivered,
message: Message { from, to, body },
}))
}
/// Mark every message handed to `recipient` since the last ack as
/// `acked_at = NOW` and forget the ids in memory. Fired by the harness
/// after `TurnOutcome::Ok`.
///
/// Returns the number of rows acked (zero is normal — claude may have
/// not called recv during the turn). Tolerant of ids that no longer
/// exist in the DB (vacuumed, manually deleted) — `UPDATE … WHERE id
/// IN (…)` simply matches zero rows for them.
///
/// The ids are written in batches so the bound-parameter count stays
/// under sqlite's limit no matter how chatty the turn was.
///
/// # Errors
/// Returns the sqlite error if an `UPDATE` fails. Ids whose batch was
/// not written stay tracked in memory, so the next `ack_turn` retries
/// them instead of silently leaving the rows unacked until the next
/// `requeue_inflight`.
pub fn ack_turn(&self, recipient: &str) -> Result<u64> {
    // Ids bound per UPDATE. Together with the `acked_at` parameter
    // this stays well under sqlite's historical default limit of 999
    // bound variables per statement.
    const ACK_BATCH: usize = 500;

    // Same lock order as `recv` and `requeue_inflight`: inflight
    // first, then conn.
    let mut inflight = self.inflight.lock().unwrap();
    let Some(slot) = inflight.get_mut(recipient) else {
        return Ok(0);
    };
    if slot.unacked_ids.is_empty() {
        return Ok(0);
    }
    let now = now_unix();
    let conn = self.conn.lock().unwrap();

    let mut flushed = 0usize; // ids whose UPDATE has succeeded
    let mut rows = 0usize; // rows sqlite reported as changed
    let mut outcome: rusqlite::Result<()> = Ok(());
    for batch in slot.unacked_ids.chunks(ACK_BATCH) {
        let placeholders = vec!["?"; batch.len()].join(",");
        let sql = format!("UPDATE messages SET acked_at = ? WHERE id IN ({placeholders})");
        let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(batch.len() + 1);
        params_vec.push(&now);
        params_vec.extend(batch.iter().map(|id| id as &dyn rusqlite::ToSql));
        match conn.execute(&sql, params_vec.as_slice()) {
            Ok(n) => {
                rows += n;
                flushed += batch.len();
            }
            Err(err) => {
                outcome = Err(err);
                break;
            }
        }
    }
    // Forget only what actually reached the DB; anything after a
    // failed batch stays queued for the next ack.
    slot.unacked_ids.drain(..flushed);
    outcome.context("mark delivered messages acked")?;
    Ok(u64::try_from(rows).unwrap_or(0))
}
/// Resurface every message the broker previously handed to this
/// recipient that never got `acked_at` set. Used by the harness at
/// boot to recover from the crashed-mid-turn / OOM-killed /
/// container-restarted cases. Three steps, all under both locks:
///
/// 1. Collect the id of every row where `recipient = me`,
///    `delivered_at IS NOT NULL`, `acked_at IS NULL`.
/// 2. Reset `delivered_at = NULL` on those rows so the next `Recv`
///    pops them again.
/// 3. Replace the recipient's in-memory state: stale `unacked_ids`
///    from the previous harness session are dropped (the new session
///    repopulates them from fresh pops) and `requeued_ids` becomes
///    exactly the ids from step 1, so the next pop of each is tagged
///    `redelivered: true`.
///
/// Returns the number of rows requeued. Safe to call when there's
/// nothing in flight (returns 0). Safe to call multiple times
/// (a second call finds no rows because they are back in the pending
/// state — note it then also empties `requeued_ids`).
///
/// # Errors
/// Propagates sqlite errors; in that case the in-memory state is left
/// untouched.
pub fn requeue_inflight(&self, recipient: &str) -> Result<u64> {
    // Hold inflight + conn together so a concurrent `recv` can't
    // pop a just-requeued row between our DB update and our
    // in-memory populate and miss the redelivered tag.
    let mut inflight = self.inflight.lock().unwrap();
    let conn = self.conn.lock().unwrap();

    let mut stmt = conn.prepare(
        "SELECT id FROM messages
         WHERE recipient = ?1
           AND delivered_at IS NOT NULL
           AND acked_at IS NULL",
    )?;
    let ids: Vec<i64> = stmt
        .query_map(params![recipient], |row| row.get(0))?
        .collect::<rusqlite::Result<_>>()?;
    drop(stmt);

    if !ids.is_empty() {
        // Reset by the same predicate instead of an `IN (…)` list:
        // the backlog size is unbounded, and one bound parameter per
        // row would hit sqlite's variable limit exactly when recovery
        // matters most. We hold the connection mutex, so the rows
        // matched here are the ones selected above.
        conn.execute(
            "UPDATE messages SET delivered_at = NULL
             WHERE recipient = ?1
               AND delivered_at IS NOT NULL
               AND acked_at IS NULL",
            params![recipient],
        )?;
    }

    let slot = inflight.entry(recipient.to_owned()).or_default();
    slot.unacked_ids.clear();
    slot.requeued_ids.clear();
    slot.requeued_ids.extend(ids.iter().copied());
    Ok(u64::try_from(ids.len()).unwrap_or(0))
}
/// Store a new reminder. Returns the reminder id.
@ -502,6 +649,30 @@ impl Broker {
}
}
/// Idempotent messages-table migrations. Adds `acked_at` and
/// back-fills it for every already-delivered row, so the
/// pre-migration sessions count as "fully handled" and won't be
/// resurfaced by the first `requeue_inflight` after upgrade.
///
/// The column add and the backfill run in ONE transaction. The probe
/// below only checks whether the column exists, so if the two steps
/// could be separated by a crash, the backfill would be skipped on
/// every later boot and all historic delivered rows would look
/// unacked.
///
/// # Errors
/// Returns the sqlite error (with context) if the probe or the
/// migration fails; a failed migration is rolled back.
fn ensure_message_columns(conn: &Connection) -> Result<()> {
    let has: bool = conn
        .prepare("SELECT 1 FROM pragma_table_info('messages') WHERE name = 'acked_at'")?
        .exists([])?;
    if has {
        return Ok(());
    }
    // Backfill: treat every existing delivered row as acked. The
    // session it was delivered to is gone, so requeue would just
    // surface phantom traffic to whatever harness reads next.
    let migrated = conn.execute_batch(
        "BEGIN IMMEDIATE;
         ALTER TABLE messages ADD COLUMN acked_at INTEGER;
         UPDATE messages SET acked_at = delivered_at
          WHERE delivered_at IS NOT NULL AND acked_at IS NULL;
         COMMIT;",
    );
    if migrated.is_err() {
        // Best effort: leave no half-open transaction behind. This
        // itself errors (ignored) when BEGIN never succeeded.
        let _ = conn.execute_batch("ROLLBACK;");
    }
    migrated.context("add + backfill messages.acked_at column")
}
/// Idempotent reminder-table migrations. `ALTER TABLE ADD COLUMN`
/// has no `IF NOT EXISTS` form in sqlite, so we probe
/// `pragma_table_info` per column. New deploys (table created by
@ -538,3 +709,179 @@ fn now_unix() -> i64 {
.and_then(|d| i64::try_from(d.as_secs()).ok())
.unwrap_or(0)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Process-wide sequence number: combined with the pid it gives
    /// every test its own sqlite file even when the test threads run
    /// in parallel, without needing a `tempfile` dependency.
    static TEST_COUNTER: AtomicU64 = AtomicU64::new(0);

    /// A broker together with the scratch db file backing it; the file
    /// is removed (best effort) when the fixture is dropped.
    struct TmpBroker {
        path: std::path::PathBuf,
        pub broker: Broker,
    }

    impl Drop for TmpBroker {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Open a broker on a fresh, uniquely named db in the temp dir.
    fn open_broker() -> TmpBroker {
        let seq = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
        let file_name = format!("hive-broker-test-{}-{}.sqlite", std::process::id(), seq);
        let path = std::env::temp_dir().join(file_name);
        let _ = std::fs::remove_file(&path);
        TmpBroker {
            broker: Broker::open(&path).expect("open broker"),
            path,
        }
    }

    /// Build an owned `Message` from string slices.
    fn msg(from: &str, to: &str, body: &str) -> Message {
        let (from, to, body) = (from.to_owned(), to.to_owned(), body.to_owned());
        Message { from, to, body }
    }

    /// Happy path: send, recv, then `ack_turn` acks the one row. A
    /// repeat ack is a no-op and a further recv finds nothing pending
    /// (the acked row stays in the table until vacuumed).
    #[test]
    fn ack_turn_marks_delivered_rows_acked() {
        let tmp = open_broker();
        tmp.broker.send(&msg("a", "b", "hi")).unwrap();

        let delivery = tmp.broker.recv("b").unwrap().expect("popped");
        assert_eq!(delivery.message.body, "hi");
        assert!(!delivery.redelivered);

        let first_ack = tmp.broker.ack_turn("b").unwrap();
        assert_eq!(first_ack, 1);
        // The unacked list was drained, so acking again does nothing.
        let second_ack = tmp.broker.ack_turn("b").unwrap();
        assert_eq!(second_ack, 0);
        // Delivered + acked: nothing left to pop.
        let leftover = tmp.broker.recv("b").unwrap();
        assert!(leftover.is_none());
    }

    /// Crash recovery: a popped-but-never-acked row is resurfaced by
    /// `requeue_inflight`, the re-pop carries the redelivered flag,
    /// and `ack_turn` then closes it out.
    #[test]
    fn requeue_inflight_resurfaces_unacked_with_redelivered_flag() {
        let tmp = open_broker();
        tmp.broker.send(&msg("a", "b", "hi")).unwrap();

        let original = tmp.broker.recv("b").unwrap().expect("popped");
        assert!(!original.redelivered);

        // No ack_turn here — that is the simulated harness crash. The
        // freshly booted harness requeues what was in flight.
        let requeued = tmp.broker.requeue_inflight("b").unwrap();
        assert_eq!(requeued, 1);

        let replay = tmp.broker.recv("b").unwrap().expect("popped again");
        assert_eq!(replay.message.body, "hi");
        assert!(
            replay.redelivered,
            "second pop should be tagged redelivered"
        );
        assert_eq!(tmp.broker.ack_turn("b").unwrap(), 1);
    }

    /// Idempotency: once a row has been requeued it is pending again
    /// (`delivered_at IS NULL`), so a second requeue matches nothing.
    #[test]
    fn requeue_inflight_is_idempotent() {
        let tmp = open_broker();
        tmp.broker.send(&msg("a", "b", "hi")).unwrap();
        tmp.broker.recv("b").unwrap().expect("popped");

        let first = tmp.broker.requeue_inflight("b").unwrap();
        assert_eq!(first, 1);
        // The row no longer matches the inflight filter.
        let second = tmp.broker.requeue_inflight("b").unwrap();
        assert_eq!(second, 0);
    }

    /// Several messages popped in sequence are all covered by one
    /// `ack_turn`.
    #[test]
    fn ack_turn_handles_batch() {
        let tmp = open_broker();
        for body in ["one", "two", "three"] {
            tmp.broker.send(&msg("a", "b", body)).unwrap();
        }
        for label in ["popped 1", "popped 2", "popped 3"] {
            tmp.broker.recv("b").unwrap().expect(label);
        }
        assert_eq!(tmp.broker.ack_turn("b").unwrap(), 3);
        assert!(tmp.broker.recv("b").unwrap().is_none());
    }

    /// Vacuum follows the `acked_at` semantics: a delivered row that
    /// was never acked survives regardless of age (the requeue path
    /// needs it); once acked it can be removed.
    #[test]
    fn vacuum_preserves_unacked_inflight_rows() {
        let tmp = open_broker();
        tmp.broker.send(&msg("a", "b", "stuck")).unwrap();
        tmp.broker.recv("b").unwrap().expect("popped");

        // Negative age puts the cutoff in the future, i.e. the widest
        // possible window — unacked rows must still be skipped.
        let window = -i64::from(u8::MAX);
        let before_ack = tmp.broker.vacuum_delivered(window).unwrap();
        assert_eq!(before_ack, 0, "unacked inflight row must survive vacuum");

        // Acked rows are fair game.
        tmp.broker.ack_turn("b").unwrap();
        let after_ack = tmp.broker.vacuum_delivered(window).unwrap();
        assert_eq!(after_ack, 1, "acked row is now vacuumable");
    }

    /// Ordering: requeued rows keep their original ids and therefore
    /// their FIFO position; messages sent after the requeue come out
    /// behind them and are not flagged.
    #[test]
    fn requeued_rows_come_back_in_original_order() {
        let tmp = open_broker();
        tmp.broker.send(&msg("a", "b", "first")).unwrap();
        tmp.broker.send(&msg("a", "b", "second")).unwrap();

        // Pop both without acking either, then requeue.
        for label in ["popped 1", "popped 2"] {
            tmp.broker.recv("b").unwrap().expect(label);
        }
        tmp.broker.requeue_inflight("b").unwrap();

        // A brand new message enqueued AFTER the requeue.
        tmp.broker.send(&msg("a", "b", "third")).unwrap();

        for (label, body) in [("re-pop 1", "first"), ("re-pop 2", "second")] {
            let replay = tmp.broker.recv("b").unwrap().expect(label);
            assert_eq!(replay.message.body, body);
            assert!(replay.redelivered);
        }

        let fresh = tmp.broker.recv("b").unwrap().expect("re-pop 3");
        assert_eq!(fresh.message.body, "third");
        assert!(
            !fresh.redelivered,
            "fresh-send-after-requeue must NOT be tagged redelivered"
        );
    }

    /// Isolation: requeueing one recipient leaves another recipient's
    /// inflight rows alone.
    #[test]
    fn requeue_inflight_is_per_recipient() {
        let tmp = open_broker();
        tmp.broker.send(&msg("x", "alice", "for alice")).unwrap();
        tmp.broker.send(&msg("x", "bob", "for bob")).unwrap();
        tmp.broker.recv("alice").unwrap().expect("popped alice");
        tmp.broker.recv("bob").unwrap().expect("popped bob");

        // Only alice is requeued; bob's row stays inflight.
        let requeued = tmp.broker.requeue_inflight("alice").unwrap();
        assert_eq!(requeued, 1);
        let replay = tmp.broker.recv("alice").unwrap().expect("re-pop alice");
        assert!(replay.redelivered);

        // Bob's row is still marked delivered, so nothing is pending.
        let bob_pending = tmp.broker.recv("bob").unwrap();
        assert!(bob_pending.is_none());
    }
}