//! Sqlite-backed message broker. Survives `hive-c0re` restart, and taps every //! send/recv onto a broadcast channel so the dashboard can stream it. use std::path::Path; use std::sync::Mutex; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{SystemTime, UNIX_EPOCH}; use anyhow::{Context, Result}; use hive_sh4re::{InboxRow, Message}; use rusqlite::{Connection, OptionalExtension, params}; use serde::Serialize; use tokio::sync::broadcast; const SCHEMA: &str = r" CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, sender TEXT NOT NULL, recipient TEXT NOT NULL, body TEXT NOT NULL, sent_at INTEGER NOT NULL, delivered_at INTEGER ); CREATE INDEX IF NOT EXISTS idx_messages_undelivered ON messages (recipient, id) WHERE delivered_at IS NULL; CREATE TABLE IF NOT EXISTS reminders ( id INTEGER PRIMARY KEY AUTOINCREMENT, agent TEXT NOT NULL, message TEXT NOT NULL, file_path TEXT, due_at INTEGER NOT NULL, created_at INTEGER NOT NULL, sent_at INTEGER ); CREATE INDEX IF NOT EXISTS idx_reminders_due ON reminders (agent, due_at) WHERE sent_at IS NULL; "; /// Capacity of the live event channel. Slow subscribers (e.g. an idle browser) /// may drop events past this; we send a `lagged` notice in their stream. const EVENT_CHANNEL: usize = 256; /// Row shape returned by [`Broker::get_due_reminders`]: /// `(agent, reminder_id, message, file_path)`. Type alias keeps /// `clippy::type_complexity` quiet and makes the scheduler call site /// self-documenting. pub type DueReminder = (String, i64, String, Option); #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "snake_case", tag = "kind")] pub enum MessageEvent { Sent { seq: u64, from: String, to: String, body: String, at: i64, }, Delivered { seq: u64, from: String, to: String, body: String, at: i64, }, } pub struct Broker { conn: Mutex, events: broadcast::Sender, /// Monotonic per-process counter stamped onto every emitted /// `MessageEvent`. Persisted nowhere — clients always treat a hive-c0re /// restart as "everything is new" (fresh snapshot, fresh stream of /// seqs starting at 1). Historical rows replayed via `recent_all` /// carry `seq = 0` since they predate the live stream the seq is /// meant to dedupe against. event_seq: AtomicU64, } impl Broker { pub fn open(path: &Path) -> Result { if let Some(parent) = path.parent() { std::fs::create_dir_all(parent) .with_context(|| format!("create db parent {}", parent.display()))?; } let conn = Connection::open(path).with_context(|| format!("open broker db {}", path.display()))?; conn.execute_batch(SCHEMA).context("apply broker schema")?; let (events, _) = broadcast::channel(EVENT_CHANNEL); Ok(Self { conn: Mutex::new(conn), events, event_seq: AtomicU64::new(0), }) } pub fn subscribe(&self) -> broadcast::Receiver { self.events.subscribe() } /// Current high-water seq. Snapshot endpoints read this *before* /// gathering state so the resulting (snapshot.seq, snapshot) pair /// satisfies: any live event with seq > snapshot.seq is post-snapshot /// (not yet reflected); any with seq <= snapshot.seq either pre-dates /// the snapshot or was already captured by it. Clients dedupe their /// buffered SSE traffic against this value. pub fn current_seq(&self) -> u64 { self.event_seq.load(Ordering::SeqCst) } fn next_seq(&self) -> u64 { self.event_seq.fetch_add(1, Ordering::SeqCst) + 1 } pub fn send(&self, message: &Message) -> Result<()> { let conn = self.conn.lock().unwrap(); conn.execute( "INSERT INTO messages (sender, recipient, body, sent_at) VALUES (?1, ?2, ?3, ?4)", params![message.from, message.to, message.body, now_unix()], )?; drop(conn); let _ = self.events.send(MessageEvent::Sent { seq: self.next_seq(), from: message.from.clone(), to: message.to.clone(), body: message.body.clone(), at: now_unix(), }); Ok(()) } /// Latest `limit` messages addressed to `recipient`, newest-first. /// Includes delivered + undelivered alike — used for the operator /// inbox view on the dashboard. Caller decides what to show. pub fn recent_for(&self, recipient: &str, limit: u64) -> Result> { let conn = self.conn.lock().unwrap(); let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX); let mut stmt = conn.prepare( "SELECT id, sender, body, sent_at FROM messages WHERE recipient = ?1 ORDER BY id DESC LIMIT ?2", )?; let rows = stmt.query_map(params![recipient, limit_i], |row| { Ok(InboxRow { id: row.get(0)?, from: row.get(1)?, body: row.get(2)?, at: row.get(3)?, }) })?; rows.collect::>>() .map_err(Into::into) } /// Latest `limit` messages across every recipient, newest-first. /// Backs the dashboard's message-flow backfill so a reload doesn't /// blank the operator's view of recent traffic. Returns each row as /// a [`MessageEvent::Sent`] so the dashboard's live renderer (which /// already speaks `MessageEvent`) can replay history through the /// same code path. We don't synthesise `Delivered` events here — /// the recv-side acks live in a different table column and would /// double-render on backfill; the live stream picks them up /// immediately on the first new `recv`. pub fn recent_all(&self, limit: u64) -> Result> { let conn = self.conn.lock().unwrap(); let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX); let mut stmt = conn.prepare( "SELECT sender, recipient, body, sent_at FROM messages ORDER BY id DESC LIMIT ?1", )?; let rows = stmt.query_map(params![limit_i], |row| { Ok(MessageEvent::Sent { // Historical events: seq=0 (never compared against live // seqs). Live dedupe windows close against // history_seq = broker.current_seq() captured at fetch // time, not against per-row seqs. seq: 0, from: row.get(0)?, to: row.get(1)?, body: row.get(2)?, at: row.get(3)?, }) })?; rows.collect::>>() .map_err(Into::into) } /// Number of undelivered messages addressed to `recipient`. Non-mutating /// — used by the harness to surface "N unread" in tool-result status /// lines without popping the queue. pub fn count_pending(&self, recipient: &str) -> Result { let conn = self.conn.lock().unwrap(); let n: i64 = conn.query_row( "SELECT COUNT(*) FROM messages WHERE recipient = ?1 AND delivered_at IS NULL", params![recipient], |row| row.get(0), )?; Ok(u64::try_from(n.max(0)).unwrap_or(0)) } /// Long-poll variant of `recv`: returns immediately if there's a /// pending message; otherwise waits up to `timeout` for the broker to /// emit a `Sent { to: recipient }` event, then retries the pop. Lets /// agents react to new mail without polling their socket on a fixed /// interval. /// /// **Subscribe-before-check order matters.** If we polled the sqlite /// row first and only then called `subscribe()`, a concurrent `send` /// landing in that window would commit + broadcast its event *before* /// our receiver existed — and we'd then sit on the long-poll until /// the timeout (or another, unrelated send) fired. That looked /// externally like "the agent processed one wake then went deaf /// until the operator poked it again". Subscribing first guarantees /// any post-subscribe send notifies us; the redundant `recv()` /// catches the message either way. pub async fn recv_blocking( &self, recipient: &str, timeout: std::time::Duration, ) -> Result> { let mut rx = self.subscribe(); if let Some(m) = self.recv(recipient)? { return Ok(Some(m)); } let deadline = tokio::time::Instant::now() + timeout; loop { let Some(remaining) = deadline.checked_duration_since(tokio::time::Instant::now()) else { return Ok(None); }; match tokio::time::timeout(remaining, rx.recv()).await { Err(_) => return Ok(None), // Channel lagged or closed — fall back to a single direct // pop (in case we missed our notification while behind). Ok(Err(_)) => return self.recv(recipient), Ok(Ok(MessageEvent::Sent { to, .. })) if to == recipient => { if let Some(m) = self.recv(recipient)? { return Ok(Some(m)); } // Lost a race (concurrent recv elsewhere). Keep waiting. } Ok(Ok(_)) => {} } } } /// Delete delivered messages older than `older_than_secs`. Undelivered /// rows are always kept regardless of age — those are still in flight /// from the broker's POV. Returns the number of rows removed. pub fn vacuum_delivered(&self, older_than_secs: i64) -> Result { let cutoff = now_unix() - older_than_secs; let conn = self.conn.lock().unwrap(); let n = conn.execute( "DELETE FROM messages WHERE delivered_at IS NOT NULL AND delivered_at < ?1", params![cutoff], )?; Ok(u64::try_from(n).unwrap_or(0)) } pub fn recv(&self, recipient: &str) -> Result> { let conn = self.conn.lock().unwrap(); let row: Option<(i64, String, String, String)> = conn .query_row( "SELECT id, sender, recipient, body FROM messages WHERE recipient = ?1 AND delivered_at IS NULL ORDER BY id ASC LIMIT 1", params![recipient], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)), ) .optional()?; let Some((id, from, to, body)) = row else { return Ok(None); }; conn.execute( "UPDATE messages SET delivered_at = ?1 WHERE id = ?2", params![now_unix(), id], )?; drop(conn); let _ = self.events.send(MessageEvent::Delivered { seq: self.next_seq(), from: from.clone(), to: to.clone(), body: body.clone(), at: now_unix(), }); Ok(Some(Message { from, to, body })) } /// Store a new reminder. Returns the reminder id. pub fn store_reminder( &self, agent: &str, message: &str, file_path: Option<&str>, due_at: i64, ) -> Result { let conn = self.conn.lock().unwrap(); conn.execute( "INSERT INTO reminders (agent, message, file_path, due_at, created_at) VALUES (?1, ?2, ?3, ?4, ?5)", params![agent, message, file_path, due_at, now_unix()], )?; let id = conn.last_insert_rowid(); Ok(id) } /// Get up to `limit` due reminders across all agents in a single query. /// Returns `(agent, id, message, file_path)` tuples. Pass a small limit /// (e.g. 100) so a burst of overdue reminders doesn't flood the broker /// in one cycle — leftovers stay due and get picked up on the next tick. pub fn get_due_reminders(&self, limit: u64) -> Result> { let conn = self.conn.lock().unwrap(); let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX); let mut stmt = conn.prepare( "SELECT agent, id, message, file_path FROM reminders \ WHERE due_at <= ?1 AND sent_at IS NULL \ ORDER BY agent, due_at ASC \ LIMIT ?2", )?; let rows = stmt.query_map(params![now_unix(), limit_i], |row| { Ok(( row.get::<_, String>(0)?, row.get::<_, i64>(1)?, row.get::<_, String>(2)?, row.get::<_, Option>(3)?, )) })?; rows.collect::>>() .context("query due reminders") } /// Atomic reminder delivery: insert the inbox message AND mark the /// reminder as sent in a single sqlite transaction. Prevents the /// orphan-reminder duplicate-delivery class of bugs that two separate /// calls (send + `mark_reminder_sent`) could produce if the second one /// failed transiently — the next scheduler tick would see the reminder /// still due and redeliver. Either both writes commit or neither does; /// re-running on failure is safe. /// /// Emits a `Sent` event on the broadcast channel after the transaction /// commits (so subscribers see the inbox message but never see a /// "phantom" send for a transaction that rolled back). pub fn deliver_reminder(&self, id: i64, agent: &str, message: &str) -> Result<()> { let now = now_unix(); let mut conn = self.conn.lock().unwrap(); let tx = conn.transaction()?; tx.execute( "INSERT INTO messages (sender, recipient, body, sent_at) VALUES (?1, ?2, ?3, ?4)", params!["reminder", agent, message, now], )?; tx.execute( "UPDATE reminders SET sent_at = ?1 WHERE id = ?2", params![now, id], )?; tx.commit()?; drop(conn); let _ = self.events.send(MessageEvent::Sent { seq: self.next_seq(), from: "reminder".to_owned(), to: agent.to_owned(), body: message.to_owned(), at: now, }); Ok(()) } } fn now_unix() -> i64 { SystemTime::now() .duration_since(UNIX_EPOCH) .ok() .and_then(|d| i64::try_from(d.as_secs()).ok()) .unwrap_or(0) }