//! Sqlite-backed message broker. Survives `hive-c0re` restart, and taps every //! send/recv onto a broadcast channel so the dashboard can stream it. use std::collections::{HashMap, HashSet}; use std::path::Path; use std::sync::Mutex; use std::time::{SystemTime, UNIX_EPOCH}; use anyhow::{Context, Result}; use hive_sh4re::{InboxRow, Message}; use rusqlite::{Connection, OptionalExtension, params}; use serde::Serialize; use tokio::sync::broadcast; const SCHEMA: &str = r" CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, sender TEXT NOT NULL, recipient TEXT NOT NULL, body TEXT NOT NULL, sent_at INTEGER NOT NULL, delivered_at INTEGER ); CREATE INDEX IF NOT EXISTS idx_messages_undelivered ON messages (recipient, id) WHERE delivered_at IS NULL; CREATE TABLE IF NOT EXISTS reminders ( id INTEGER PRIMARY KEY AUTOINCREMENT, agent TEXT NOT NULL, message TEXT NOT NULL, file_path TEXT, due_at INTEGER NOT NULL, created_at INTEGER NOT NULL, sent_at INTEGER ); CREATE INDEX IF NOT EXISTS idx_reminders_due ON reminders (agent, due_at) WHERE sent_at IS NULL; "; /// Capacity of the live event channel. Slow subscribers (e.g. an idle browser) /// may drop events past this; we send a `lagged` notice in their stream. const EVENT_CHANNEL: usize = 256; /// Row shape returned by [`Broker::get_due_reminders`]: /// `(agent, reminder_id, message, file_path)`. Type alias keeps /// `clippy::type_complexity` quiet and makes the scheduler call site /// self-documenting. pub type DueReminder = (String, i64, String, Option); /// A single message hand-off from broker to recipient. Carries the /// broker's row id (so the harness can drive `ack_turn` later) and /// the redelivery flag (so the harness can prepend the /// "may already be handled" hint to the wake prompt). The /// `Message` itself is identical to a pristine `Send` payload. #[derive(Debug, Clone)] pub struct Delivery { pub id: i64, pub redelivered: bool, pub message: Message, } /// Row shape for [`Broker::list_pending_reminders`], shipped on the /// dashboard `/api/reminders` response. #[derive(Debug, Clone, Serialize)] pub struct PendingReminder { pub id: i64, pub agent: String, pub message: String, #[serde(skip_serializing_if = "Option::is_none")] pub file_path: Option, pub due_at: i64, pub created_at: i64, /// Most recent delivery failure for this row, if any. Cleared /// to NULL on operator retry. Surfaced inline in the dashboard /// so a stuck reminder doesn't just silently retry forever. #[serde(skip_serializing_if = "Option::is_none")] pub last_error: Option, /// Number of failed delivery attempts since the row was /// created or last retried. After `MAX_REMINDER_ATTEMPTS` the /// scheduler stops trying (the row stays in `pending` with the /// error so the operator can decide between retry + cancel). #[serde(default)] pub attempt_count: u32, } /// Stop retrying a row after this many consecutive failures. The /// scheduler quits scheduling it until an operator explicitly /// retries (which resets the counter) or cancels (which deletes /// the row). Below the cap the existing 5s tick re-attempts each /// time the row is due. pub const MAX_REMINDER_ATTEMPTS: u32 = 5; /// Intra-process broker event. `recv_blocking` listens on the same /// channel as the dashboard forwarder; the forwarder re-emits each /// event as a `DashboardEvent` with a freshly-stamped seq from the /// Coordinator. The broker itself doesn't stamp seqs — that's a wire /// concern, not a storage concern. #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "snake_case", tag = "kind")] pub enum MessageEvent { Sent { from: String, to: String, body: String, at: i64, }, Delivered { from: String, to: String, body: String, at: i64, }, } /// Per-recipient in-memory bookkeeping for the deliver-then-ack /// flow. Source of truth is the DB columns `delivered_at` + /// `acked_at`; the in-memory state here is purely an optimisation /// (avoids scanning the messages table on `AckTurn`) plus the /// redelivery-hint marker. #[derive(Default)] struct RecipientInflight { /// Message ids the broker has handed to this recipient since the /// last `AckTurn`. Drained on `ack_turn`, which then runs a /// single `UPDATE … WHERE id IN (…)` to set `acked_at`. unacked_ids: Vec, /// Message ids resurfaced by the most recent `requeue_inflight` /// call. The next `recv` pop of any id in this set tags the /// response with `redelivered: true` so the harness can prepend /// the "may already be handled" hint to the wake prompt; /// successful pops drain the id from the set. requeued_ids: HashSet, } pub struct Broker { conn: Mutex, events: broadcast::Sender, /// Per-recipient deliver/ack tracking. Lost on hive-c0re restart /// (harmless — the harness fires `RequeueInflight` on its own /// boot, which rebuilds the `requeued_ids` set from the DB and /// clears any stale `unacked_ids`). inflight: Mutex>, } impl Broker { pub fn open(path: &Path) -> Result { if let Some(parent) = path.parent() { std::fs::create_dir_all(parent) .with_context(|| format!("create db parent {}", parent.display()))?; } let conn = Connection::open(path).with_context(|| format!("open broker db {}", path.display()))?; conn.execute_batch(SCHEMA).context("apply broker schema")?; ensure_message_columns(&conn).context("migrate messages columns")?; ensure_reminder_columns(&conn).context("migrate reminders columns")?; let (events, _) = broadcast::channel(EVENT_CHANNEL); Ok(Self { conn: Mutex::new(conn), events, inflight: Mutex::new(HashMap::new()), }) } pub fn subscribe(&self) -> broadcast::Receiver { self.events.subscribe() } pub fn send(&self, message: &Message) -> Result<()> { let conn = self.conn.lock().unwrap(); conn.execute( "INSERT INTO messages (sender, recipient, body, sent_at) VALUES (?1, ?2, ?3, ?4)", params![message.from, message.to, message.body, now_unix()], )?; drop(conn); let _ = self.events.send(MessageEvent::Sent { from: message.from.clone(), to: message.to.clone(), body: message.body.clone(), at: now_unix(), }); Ok(()) } /// Latest `limit` messages addressed to `recipient`, newest-first. /// Includes delivered + undelivered alike — used for the operator /// inbox view on the dashboard. Caller decides what to show. pub fn recent_for(&self, recipient: &str, limit: u64) -> Result> { let conn = self.conn.lock().unwrap(); let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX); let mut stmt = conn.prepare( "SELECT id, sender, body, sent_at FROM messages WHERE recipient = ?1 ORDER BY id DESC LIMIT ?2", )?; let rows = stmt.query_map(params![recipient, limit_i], |row| { Ok(InboxRow { id: row.get(0)?, from: row.get(1)?, body: row.get(2)?, at: row.get(3)?, }) })?; rows.collect::>>() .map_err(Into::into) } /// Latest `limit` messages across every recipient, newest-first. /// Backs the dashboard's message-flow backfill so a reload doesn't /// blank the operator's view of recent traffic. Returns each row as /// a [`MessageEvent::Sent`] so the dashboard's live renderer (which /// already speaks `MessageEvent`) can replay history through the /// same code path. We don't synthesise `Delivered` events here — /// the recv-side acks live in a different table column and would /// double-render on backfill; the live stream picks them up /// immediately on the first new `recv`. pub fn recent_all(&self, limit: u64) -> Result> { let conn = self.conn.lock().unwrap(); let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX); let mut stmt = conn.prepare( "SELECT sender, recipient, body, sent_at FROM messages ORDER BY id DESC LIMIT ?1", )?; let rows = stmt.query_map(params![limit_i], |row| { Ok(MessageEvent::Sent { from: row.get(0)?, to: row.get(1)?, body: row.get(2)?, at: row.get(3)?, }) })?; rows.collect::>>() .map_err(Into::into) } /// Number of undelivered messages addressed to `recipient`. Non-mutating /// — used by the harness to surface "N unread" in tool-result status /// lines without popping the queue. pub fn count_pending(&self, recipient: &str) -> Result { let conn = self.conn.lock().unwrap(); let n: i64 = conn.query_row( "SELECT COUNT(*) FROM messages WHERE recipient = ?1 AND delivered_at IS NULL", params![recipient], |row| row.get(0), )?; Ok(u64::try_from(n.max(0)).unwrap_or(0)) } /// Long-poll variant of `recv`: returns immediately if there's a /// pending message; otherwise waits up to `timeout` for the broker to /// emit a `Sent { to: recipient }` event, then retries the pop. Lets /// agents react to new mail without polling their socket on a fixed /// interval. /// /// **Subscribe-before-check order matters.** If we polled the sqlite /// row first and only then called `subscribe()`, a concurrent `send` /// landing in that window would commit + broadcast its event *before* /// our receiver existed — and we'd then sit on the long-poll until /// the timeout (or another, unrelated send) fired. That looked /// externally like "the agent processed one wake then went deaf /// until the operator poked it again". Subscribing first guarantees /// any post-subscribe send notifies us; the redundant `recv()` /// catches the message either way. pub async fn recv_blocking( &self, recipient: &str, timeout: std::time::Duration, ) -> Result> { let mut rx = self.subscribe(); if let Some(d) = self.recv(recipient)? { return Ok(Some(d)); } let deadline = tokio::time::Instant::now() + timeout; loop { let Some(remaining) = deadline.checked_duration_since(tokio::time::Instant::now()) else { return Ok(None); }; match tokio::time::timeout(remaining, rx.recv()).await { Err(_) => return Ok(None), // Channel lagged or closed — fall back to a single direct // pop (in case we missed our notification while behind). Ok(Err(_)) => return self.recv(recipient), Ok(Ok(MessageEvent::Sent { to, .. })) if to == recipient => { if let Some(d) = self.recv(recipient)? { return Ok(Some(d)); } // Lost a race (concurrent recv elsewhere). Keep waiting. } Ok(Ok(_)) => {} } } } /// Delete fully-acked messages older than `older_than_secs`. /// Unacked rows (delivered but not yet acknowledged by a clean /// turn-end, plus undelivered rows) are always kept regardless of /// age — the former because they're recoverable via /// `requeue_inflight`, the latter because they're still in flight /// from the broker's POV. Returns the number of rows removed. pub fn vacuum_delivered(&self, older_than_secs: i64) -> Result { let cutoff = now_unix() - older_than_secs; let conn = self.conn.lock().unwrap(); let n = conn.execute( "DELETE FROM messages WHERE acked_at IS NOT NULL AND acked_at < ?1", params![cutoff], )?; Ok(u64::try_from(n).unwrap_or(0)) } pub fn recv(&self, recipient: &str) -> Result> { // Lock order: inflight FIRST, then conn. `requeue_inflight` + // `ack_turn` follow the same order so we never deadlock; the // requeue path also needs both locks held together so a pop // can't sneak in between its DB update + in-memory populate // and miss the `redelivered` flag. let mut inflight = self.inflight.lock().unwrap(); let conn = self.conn.lock().unwrap(); let row: Option<(i64, String, String, String)> = conn .query_row( "SELECT id, sender, recipient, body FROM messages WHERE recipient = ?1 AND delivered_at IS NULL ORDER BY id ASC LIMIT 1", params![recipient], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)), ) .optional()?; let Some((id, from, to, body)) = row else { return Ok(None); }; conn.execute( "UPDATE messages SET delivered_at = ?1 WHERE id = ?2", params![now_unix(), id], )?; // Track the id so the next `ack_turn(recipient)` can sweep it, // and check whether it was resurfaced by a recent // `requeue_inflight` (in which case the wake prompt gets the // "may already be handled" hint). Both ops are O(1) per pop; // the hash-set lookup runs at most once per delivery. let slot = inflight.entry(recipient.to_owned()).or_default(); slot.unacked_ids.push(id); let redelivered = slot.requeued_ids.remove(&id); drop(conn); drop(inflight); let _ = self.events.send(MessageEvent::Delivered { from: from.clone(), to: to.clone(), body: body.clone(), at: now_unix(), }); Ok(Some(Delivery { id, redelivered, message: Message { from, to, body }, })) } /// Pop up to `max` pending messages for `recipient` in one /// round-trip. Same per-row semantics as `recv`: every popped row /// is marked `delivered_at = NOW`, pushed onto the per-recipient /// `unacked_ids` list (so the next `ack_turn` closes them out), /// and tagged with `redelivered = true` if it was resurfaced by /// the most recent `requeue_inflight`. Emits one /// `MessageEvent::Delivered` per popped row so the dashboard /// forwarder stream stays consistent with the single-row path. /// /// `max == 0` short-circuits to an empty vec (no DB hit); any /// positive value caps the batch at `max`. FIFO order matches /// `recv`. pub fn recv_batch(&self, recipient: &str, max: usize) -> Result> { if max == 0 { return Ok(Vec::new()); } // Same lock order as `recv` / `ack_turn` / `requeue_inflight`. let mut inflight = self.inflight.lock().unwrap(); let conn = self.conn.lock().unwrap(); let max_i = i64::try_from(max).unwrap_or(i64::MAX); let mut stmt = conn.prepare( "SELECT id, sender, recipient, body FROM messages WHERE recipient = ?1 AND delivered_at IS NULL ORDER BY id ASC LIMIT ?2", )?; let rows: Vec<(i64, String, String, String)> = stmt .query_map(params![recipient, max_i], |row| { Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)) })? .collect::>()?; drop(stmt); if rows.is_empty() { return Ok(Vec::new()); } // Stamp all popped rows in a single UPDATE — under the broker // mutex, well within sqlite's 999-param default. let now = now_unix(); let ids: Vec = rows.iter().map(|(id, _, _, _)| *id).collect(); let placeholders = std::iter::repeat_n("?", ids.len()) .collect::>() .join(","); let sql = format!("UPDATE messages SET delivered_at = ? WHERE id IN ({placeholders})"); let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(ids.len() + 1); params_vec.push(&now); for id in &ids { params_vec.push(id); } conn.execute(&sql, params_vec.as_slice())?; drop(conn); // Bookkeeping + assemble the Delivery list. Per-row // `requeued_ids` lookup runs once per pop, same as `recv`. let slot = inflight.entry(recipient.to_owned()).or_default(); let mut deliveries = Vec::with_capacity(rows.len()); for (id, from, to, body) in rows { slot.unacked_ids.push(id); let redelivered = slot.requeued_ids.remove(&id); deliveries.push(Delivery { id, redelivered, message: Message { from, to, body }, }); } drop(inflight); // Mirror the per-row Delivered emit `recv` does so the // dashboard forwarder sees one event per message regardless of // which surface the harness used. for d in &deliveries { let _ = self.events.send(MessageEvent::Delivered { from: d.message.from.clone(), to: d.message.to.clone(), body: d.message.body.clone(), at: now, }); } Ok(deliveries) } /// Drain the per-recipient unacked-id list and mark every row /// `acked_at = NOW`. Fired by the harness after `TurnOutcome::Ok`. /// Returns the number of rows acked (zero is normal — claude /// may have not called recv during the turn). Tolerant of ids /// that no longer exist in the DB (vacuumed, manually deleted) /// — `UPDATE … WHERE id IN (…)` simply matches zero rows. pub fn ack_turn(&self, recipient: &str) -> Result { // Same lock order as `recv` and `requeue_inflight`. let mut inflight = self.inflight.lock().unwrap(); let ids: Vec = inflight .get_mut(recipient) .map(|s| std::mem::take(&mut s.unacked_ids)) .unwrap_or_default(); if ids.is_empty() { return Ok(0); } let now = now_unix(); let conn = self.conn.lock().unwrap(); // Bind every id explicitly. Caps in the hundreds in the worst // case (a single very chatty turn); well under sqlite's 999 // default param limit and we're already serialising on the // broker mutex. let placeholders = std::iter::repeat_n("?", ids.len()) .collect::>() .join(","); let sql = format!("UPDATE messages SET acked_at = ? WHERE id IN ({placeholders})"); let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(ids.len() + 1); params_vec.push(&now); for id in &ids { params_vec.push(id); } let n = conn.execute(&sql, params_vec.as_slice())?; Ok(u64::try_from(n).unwrap_or(0)) } /// Resurface every message the broker previously handed to this /// recipient that never got `acked_at` set. Used by the harness at /// boot to recover from the crashed-mid-turn / OOM-killed / /// container-restarted cases. Three steps: /// /// 1. Clear any stale in-memory state for this recipient (the /// previous harness session's `unacked_ids` are irrelevant — /// the new session will repopulate from fresh pops). /// 2. Find every row where `recipient = me`, `delivered_at IS NOT /// NULL`, `acked_at IS NULL`. Reset `delivered_at = NULL` so /// the next `Recv` pops them again. /// 3. Remember each id in the per-recipient `requeued_ids` set so /// the next pop tags the response with `redelivered: true`. /// /// Returns the number of rows requeued. Safe to call when there's /// nothing in flight (returns 0). Safe to call multiple times /// (idempotent — the second call finds nothing because the rows /// are now back in the pending state). pub fn requeue_inflight(&self, recipient: &str) -> Result { // Hold inflight + conn together so a concurrent `recv` can't // pop a just-requeued row between our DB update and our // in-memory populate and miss the redelivered tag. let mut inflight = self.inflight.lock().unwrap(); let conn = self.conn.lock().unwrap(); let mut stmt = conn.prepare( "SELECT id FROM messages WHERE recipient = ?1 AND delivered_at IS NOT NULL AND acked_at IS NULL", )?; let ids: Vec = stmt .query_map(params![recipient], |row| row.get(0))? .collect::>()?; drop(stmt); if !ids.is_empty() { let placeholders = std::iter::repeat_n("?", ids.len()) .collect::>() .join(","); let sql = format!("UPDATE messages SET delivered_at = NULL WHERE id IN ({placeholders})"); let params_vec: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|id| id as &dyn rusqlite::ToSql).collect(); conn.execute(&sql, params_vec.as_slice())?; } let slot = inflight.entry(recipient.to_owned()).or_default(); slot.unacked_ids.clear(); slot.requeued_ids.clear(); slot.requeued_ids.extend(ids.iter().copied()); Ok(u64::try_from(ids.len()).unwrap_or(0)) } /// Store a new reminder. Returns the reminder id. pub fn store_reminder( &self, agent: &str, message: &str, file_path: Option<&str>, due_at: i64, ) -> Result { let conn = self.conn.lock().unwrap(); conn.execute( "INSERT INTO reminders (agent, message, file_path, due_at, created_at) VALUES (?1, ?2, ?3, ?4, ?5)", params![agent, message, file_path, due_at, now_unix()], )?; let id = conn.last_insert_rowid(); Ok(id) } /// Every reminder still pending delivery, newest-first. Used by the /// dashboard's reminders pane so the operator can see what's queued /// + cancel rows that are no longer wanted. pub fn list_pending_reminders(&self) -> Result> { let conn = self.conn.lock().unwrap(); let mut stmt = conn.prepare( "SELECT id, agent, message, file_path, due_at, created_at, \ last_error, attempt_count \ FROM reminders \ WHERE sent_at IS NULL \ ORDER BY due_at ASC", )?; let rows = stmt.query_map([], |row| { let attempts: i64 = row.get(7)?; Ok(PendingReminder { id: row.get(0)?, agent: row.get(1)?, message: row.get(2)?, file_path: row.get(3)?, due_at: row.get(4)?, created_at: row.get(5)?, last_error: row.get(6)?, attempt_count: u32::try_from(attempts).unwrap_or(0), }) })?; rows.collect::>>() .context("list pending reminders") } /// Mark a delivery attempt as failed: bump `attempt_count` and /// stash the error string. Called by `reminder_scheduler::tick` /// when `deliver_reminder` returns Err. Soft-cap behaviour /// lives in `get_due_reminders` (rows over the cap drop out /// of the due-list and stop being attempted until retry). pub fn record_reminder_failure(&self, id: i64, reason: &str) -> Result<()> { let conn = self.conn.lock().unwrap(); conn.execute( "UPDATE reminders \ SET attempt_count = attempt_count + 1, last_error = ?1 \ WHERE id = ?2 AND sent_at IS NULL", params![reason, id], )?; Ok(()) } /// Clear the failure state on a pending reminder so the /// scheduler picks it up again. No-op when the row is already /// fresh (`attempt_count == 0`). Returns the number of rows /// affected so callers can distinguish "retried" from "no /// such pending reminder" (already delivered, or wrong id). pub fn reset_reminder_failure(&self, id: i64) -> Result { let conn = self.conn.lock().unwrap(); let n = conn.execute( "UPDATE reminders \ SET attempt_count = 0, last_error = NULL \ WHERE id = ?1 AND sent_at IS NULL", params![id], )?; Ok(n) } /// Count this agent's still-pending (un-delivered) reminders. /// Used by the per-turn stats sink for a cheap "what was queued /// at turn-end" snapshot. pub fn count_pending_reminders_for(&self, agent: &str) -> Result { let conn = self.conn.lock().unwrap(); let n: i64 = conn.query_row( "SELECT COUNT(*) FROM reminders WHERE agent = ?1 AND sent_at IS NULL", params![agent], |row| row.get(0), )?; Ok(u64::try_from(n).unwrap_or(0)) } /// Delete a reminder by id. Returns the number of rows removed (0 /// when the id never existed or was already delivered). Hard /// delete rather than soft so the row doesn't linger and confuse a /// re-creation under the same id. pub fn cancel_reminder(&self, id: i64) -> Result { let conn = self.conn.lock().unwrap(); let n = conn.execute( "DELETE FROM reminders WHERE id = ?1 AND sent_at IS NULL", params![id], )?; Ok(n) } /// Cancel a pending reminder on behalf of `canceller`. Returns /// the owner agent name on success (handy for logging). Auth /// rules mirror `OperatorQuestions::cancel`: owner, operator, or /// manager. pub fn cancel_reminder_as(&self, id: i64, canceller: &str) -> Result { let conn = self.conn.lock().unwrap(); let owner: Option = conn .query_row( "SELECT agent FROM reminders WHERE id = ?1 AND sent_at IS NULL", params![id], |row| row.get(0), ) .optional()?; let Some(owner) = owner else { anyhow::bail!("reminder {id} not pending (already delivered or unknown)"); }; let authorised = canceller == owner || canceller == hive_sh4re::OPERATOR_RECIPIENT || canceller == hive_sh4re::MANAGER_AGENT; if !authorised { anyhow::bail!( "reminder {id}: '{canceller}' not allowed to cancel (owner = '{owner}')" ); } let n = conn.execute( "DELETE FROM reminders WHERE id = ?1 AND sent_at IS NULL", params![id], )?; if n == 0 { anyhow::bail!("reminder {id} vanished between auth check and delete"); } Ok(owner) } /// Get up to `limit` due reminders across all agents in a single query. /// Returns `(agent, id, message, file_path)` tuples. Pass a small limit /// (e.g. 100) so a burst of overdue reminders doesn't flood the broker /// in one cycle — leftovers stay due and get picked up on the next tick. pub fn get_due_reminders(&self, limit: u64) -> Result> { let conn = self.conn.lock().unwrap(); let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX); let max_attempts = i64::from(MAX_REMINDER_ATTEMPTS); // attempt_count >= cap = give up; row stays pending so the // operator sees + can retry/cancel via the dashboard. let mut stmt = conn.prepare( "SELECT agent, id, message, file_path FROM reminders \ WHERE due_at <= ?1 AND sent_at IS NULL AND attempt_count < ?3 \ ORDER BY agent, due_at ASC \ LIMIT ?2", )?; let rows = stmt.query_map(params![now_unix(), limit_i, max_attempts], |row| { Ok(( row.get::<_, String>(0)?, row.get::<_, i64>(1)?, row.get::<_, String>(2)?, row.get::<_, Option>(3)?, )) })?; rows.collect::>>() .context("query due reminders") } /// Atomic reminder delivery: insert the inbox message AND mark the /// reminder as sent in a single sqlite transaction. Prevents the /// orphan-reminder duplicate-delivery class of bugs that two separate /// calls (send + `mark_reminder_sent`) could produce if the second one /// failed transiently — the next scheduler tick would see the reminder /// still due and redeliver. Either both writes commit or neither does; /// re-running on failure is safe. /// /// Emits a `Sent` event on the broadcast channel after the transaction /// commits (so subscribers see the inbox message but never see a /// "phantom" send for a transaction that rolled back). pub fn deliver_reminder(&self, id: i64, agent: &str, message: &str) -> Result<()> { let now = now_unix(); let mut conn = self.conn.lock().unwrap(); let tx = conn.transaction()?; tx.execute( "INSERT INTO messages (sender, recipient, body, sent_at) VALUES (?1, ?2, ?3, ?4)", params!["reminder", agent, message, now], )?; tx.execute( "UPDATE reminders SET sent_at = ?1 WHERE id = ?2", params![now, id], )?; tx.commit()?; drop(conn); let _ = self.events.send(MessageEvent::Sent { from: "reminder".to_owned(), to: agent.to_owned(), body: message.to_owned(), at: now, }); Ok(()) } } /// Idempotent messages-table migrations. Adds `acked_at` and /// back-fills it for every already-delivered row, so the /// pre-migration sessions count as "fully handled" and won't be /// resurfaced by the first `requeue_inflight` after upgrade. fn ensure_message_columns(conn: &Connection) -> Result<()> { let has: bool = conn .prepare("SELECT 1 FROM pragma_table_info('messages') WHERE name = 'acked_at'")? .exists([])?; if !has { conn.execute_batch("ALTER TABLE messages ADD COLUMN acked_at INTEGER;") .context("add messages.acked_at column")?; // Backfill: treat every existing delivered row as acked. The // session it was delivered to is gone, so requeue would just // surface phantom traffic to whatever harness reads next. conn.execute( "UPDATE messages SET acked_at = delivered_at \ WHERE delivered_at IS NOT NULL AND acked_at IS NULL", [], ) .context("backfill messages.acked_at from delivered_at")?; } Ok(()) } /// Idempotent reminder-table migrations. `ALTER TABLE ADD COLUMN` /// has no `IF NOT EXISTS` form in sqlite, so we probe /// `pragma_table_info` per column. New deploys (table created by /// SCHEMA in this commit cycle) skip the ALTER; pre-existing /// broker.sqlite files get the columns added on next boot. fn ensure_reminder_columns(conn: &Connection) -> Result<()> { for (name, sql) in [ ( "attempt_count", "ALTER TABLE reminders ADD COLUMN attempt_count INTEGER NOT NULL DEFAULT 0;", ), ( "last_error", "ALTER TABLE reminders ADD COLUMN last_error TEXT;", ), ] { let has: bool = conn .prepare(&format!( "SELECT 1 FROM pragma_table_info('reminders') WHERE name = '{name}'" ))? .exists([])?; if !has { conn.execute_batch(sql) .with_context(|| format!("add reminders.{name} column"))?; } } Ok(()) } fn now_unix() -> i64 { SystemTime::now() .duration_since(UNIX_EPOCH) .ok() .and_then(|d| i64::try_from(d.as_secs()).ok()) .unwrap_or(0) } #[cfg(test)] mod tests { use super::*; use std::sync::atomic::{AtomicU64, Ordering}; /// Per-process counter so each test gets a unique sqlite path even /// when threads run concurrently. Avoids pulling in a `tempfile` /// dep just for this one module. static TEST_COUNTER: AtomicU64 = AtomicU64::new(0); struct TmpBroker { path: std::path::PathBuf, pub broker: Broker, } impl Drop for TmpBroker { fn drop(&mut self) { let _ = std::fs::remove_file(&self.path); } } fn open_broker() -> TmpBroker { let n = TEST_COUNTER.fetch_add(1, Ordering::Relaxed); let pid = std::process::id(); let path = std::env::temp_dir().join(format!("hive-broker-test-{pid}-{n}.sqlite")); let _ = std::fs::remove_file(&path); let broker = Broker::open(&path).expect("open broker"); TmpBroker { path, broker } } fn msg(from: &str, to: &str, body: &str) -> Message { Message { from: from.to_owned(), to: to.to_owned(), body: body.to_owned(), } } /// Happy path: send → recv → `ack_turn` drains the in-memory list /// and marks the row `acked_at IS NOT NULL`. A second recv finds /// nothing pending (the row stays in the table for vacuum). #[test] fn ack_turn_marks_delivered_rows_acked() { let h = open_broker(); let broker = &h.broker; broker.send(&msg("a", "b", "hi")).unwrap(); let d = broker.recv("b").unwrap().expect("popped"); assert_eq!(d.message.body, "hi"); assert!(!d.redelivered); assert_eq!(broker.ack_turn("b").unwrap(), 1); // ack_turn drained the unacked list; calling again is a no-op. assert_eq!(broker.ack_turn("b").unwrap(), 0); // Recv finds nothing — the row is now delivered + acked. assert!(broker.recv("b").unwrap().is_none()); } /// Crash-recovery: send → recv → (no ack) → `requeue_inflight` /// resets `delivered_at` + tags the next pop as redelivered. After /// that `ack_turn` closes it out cleanly. #[test] fn requeue_inflight_resurfaces_unacked_with_redelivered_flag() { let h = open_broker(); let broker = &h.broker; broker.send(&msg("a", "b", "hi")).unwrap(); let d1 = broker.recv("b").unwrap().expect("popped"); assert!(!d1.redelivered); // Simulate harness crash: never call ack_turn. Now boot the // new harness — requeue_inflight resurfaces the row. assert_eq!(broker.requeue_inflight("b").unwrap(), 1); let d2 = broker.recv("b").unwrap().expect("popped again"); assert_eq!(d2.message.body, "hi"); assert!( d2.redelivered, "second pop should be tagged redelivered" ); assert_eq!(broker.ack_turn("b").unwrap(), 1); } /// Idempotency: a second `requeue_inflight` on the same recipient /// finds nothing because the prior call already reset /// `delivered_at` (the row is back in the pending state, not /// inflight). #[test] fn requeue_inflight_is_idempotent() { let h = open_broker(); let broker = &h.broker; broker.send(&msg("a", "b", "hi")).unwrap(); broker.recv("b").unwrap().expect("popped"); assert_eq!(broker.requeue_inflight("b").unwrap(), 1); // Second call: the row is pending (delivered_at IS NULL) so // nothing matches the inflight filter. assert_eq!(broker.requeue_inflight("b").unwrap(), 0); } /// Multiple messages, partial drain: pop two, `ack_turn` covers /// both even though one was popped before the other. #[test] fn ack_turn_handles_batch() { let h = open_broker(); let broker = &h.broker; broker.send(&msg("a", "b", "one")).unwrap(); broker.send(&msg("a", "b", "two")).unwrap(); broker.send(&msg("a", "b", "three")).unwrap(); broker.recv("b").unwrap().expect("popped 1"); broker.recv("b").unwrap().expect("popped 2"); broker.recv("b").unwrap().expect("popped 3"); assert_eq!(broker.ack_turn("b").unwrap(), 3); assert!(broker.recv("b").unwrap().is_none()); } /// Vacuum filter respects the new `acked_at` semantics — a /// delivered-but-not-acked row is NOT vacuumed regardless of /// age (the requeue path needs it). #[test] fn vacuum_preserves_unacked_inflight_rows() { let h = open_broker(); let broker = &h.broker; broker.send(&msg("a", "b", "stuck")).unwrap(); broker.recv("b").unwrap().expect("popped"); // Wide window — should still skip unacked rows. let removed = broker.vacuum_delivered(-i64::from(u8::MAX)).unwrap(); assert_eq!(removed, 0, "unacked inflight row must survive vacuum"); // After ack_turn the row is fair game. broker.ack_turn("b").unwrap(); let removed = broker.vacuum_delivered(-i64::from(u8::MAX)).unwrap(); assert_eq!(removed, 1, "acked row is now vacuumable"); } /// Recv ordering: requeued rows go back into FIFO position /// (they keep their original id). New sends added after the /// requeue arrive after them. #[test] fn requeued_rows_come_back_in_original_order() { let h = open_broker(); let broker = &h.broker; broker.send(&msg("a", "b", "first")).unwrap(); broker.send(&msg("a", "b", "second")).unwrap(); // Pop both, ack neither. broker.recv("b").unwrap().expect("popped 1"); broker.recv("b").unwrap().expect("popped 2"); broker.requeue_inflight("b").unwrap(); // Now add a brand new message AFTER the requeue. broker.send(&msg("a", "b", "third")).unwrap(); let d1 = broker.recv("b").unwrap().expect("re-pop 1"); assert_eq!(d1.message.body, "first"); assert!(d1.redelivered); let d2 = broker.recv("b").unwrap().expect("re-pop 2"); assert_eq!(d2.message.body, "second"); assert!(d2.redelivered); let d3 = broker.recv("b").unwrap().expect("re-pop 3"); assert_eq!(d3.message.body, "third"); assert!( !d3.redelivered, "fresh-send-after-requeue must NOT be tagged redelivered" ); } /// Happy path for `recv_batch`: pops in FIFO order, respects /// `max`, leaves the rest pending for the next call. #[test] fn recv_batch_pops_fifo_capped_at_max() { let h = open_broker(); let broker = &h.broker; for i in 0..5 { broker.send(&msg("a", "b", &format!("m{i}"))).unwrap(); } let batch = broker.recv_batch("b", 3).unwrap(); let bodies: Vec<_> = batch.iter().map(|d| d.message.body.as_str()).collect(); assert_eq!(bodies, vec!["m0", "m1", "m2"]); // Remaining two stay pending; a second batch drains them. let next = broker.recv_batch("b", 10).unwrap(); let bodies: Vec<_> = next.iter().map(|d| d.message.body.as_str()).collect(); assert_eq!(bodies, vec!["m3", "m4"]); // ack_turn closes out all five popped rows in one go. assert_eq!(broker.ack_turn("b").unwrap(), 5); } /// `recv_batch` with no pending traffic returns an empty vec /// (the "(empty)" path), not an error. #[test] fn recv_batch_returns_empty_when_idle() { let h = open_broker(); let batch = h.broker.recv_batch("ghost", 5).unwrap(); assert!(batch.is_empty()); } /// `max = 0` short-circuits without touching the DB (covered by /// asserting we don't accidentally pop a pending row). #[test] fn recv_batch_zero_max_pops_nothing() { let h = open_broker(); let broker = &h.broker; broker.send(&msg("a", "b", "stay")).unwrap(); assert!(broker.recv_batch("b", 0).unwrap().is_empty()); // The pending row is still in flight for the next real recv. let d = broker.recv("b").unwrap().expect("still pending"); assert_eq!(d.message.body, "stay"); } /// `recv_batch` tags requeued rows with `redelivered: true` and /// drains them from the per-recipient `requeued_ids` set so a /// fresh follow-up recv after the batch doesn't double-tag. #[test] fn recv_batch_propagates_redelivered_flag() { let h = open_broker(); let broker = &h.broker; broker.send(&msg("a", "b", "one")).unwrap(); broker.send(&msg("a", "b", "two")).unwrap(); broker.recv("b").unwrap().expect("popped 1"); broker.recv("b").unwrap().expect("popped 2"); broker.requeue_inflight("b").unwrap(); let batch = broker.recv_batch("b", 5).unwrap(); assert_eq!(batch.len(), 2); assert!(batch.iter().all(|d| d.redelivered)); // Fresh send after the batch is NOT tagged redelivered. broker.send(&msg("a", "b", "three")).unwrap(); let d = broker.recv("b").unwrap().expect("re-pop 3"); assert_eq!(d.message.body, "three"); assert!(!d.redelivered); } /// Per-recipient isolation: `requeue_inflight("a")` doesn't touch /// b's inflight rows. #[test] fn requeue_inflight_is_per_recipient() { let h = open_broker(); let broker = &h.broker; broker.send(&msg("x", "alice", "for alice")).unwrap(); broker.send(&msg("x", "bob", "for bob")).unwrap(); broker.recv("alice").unwrap().expect("popped alice"); broker.recv("bob").unwrap().expect("popped bob"); // Requeue only alice. Bob's row stays inflight. assert_eq!(broker.requeue_inflight("alice").unwrap(), 1); let d = broker.recv("alice").unwrap().expect("re-pop alice"); assert!(d.redelivered); // Bob has nothing pending (his row is still delivered, not requeued). assert!(broker.recv("bob").unwrap().is_none()); } }