diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs index c908558..b9d6a20 100644 --- a/hive-c0re/src/broker.rs +++ b/hive-c0re/src/broker.rs @@ -40,6 +40,12 @@ CREATE INDEX IF NOT EXISTS idx_reminders_due /// may drop events past this; we send a `lagged` notice in their stream. const EVENT_CHANNEL: usize = 256; +/// Row shape returned by [`Broker::get_due_reminders`]: +/// `(agent, reminder_id, message, file_path)`. Type alias keeps +/// `clippy::type_complexity` quiet and makes the scheduler call site +/// self-documenting. +pub type DueReminder = (String, i64, String, Option); + #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "snake_case", tag = "kind")] pub enum MessageEvent { @@ -245,16 +251,20 @@ impl Broker { Ok(id) } - /// Get all reminders for an agent that are due now or in the past. - /// Returns (id, message, file_path) tuples. - /// Get all due reminders across all agents in a single query. - /// Returns a vec of (agent, id, message, file_path) tuples. - pub fn get_all_due_reminders(&self) -> Result)>> { + /// Get up to `limit` due reminders across all agents in a single query. + /// Returns `(agent, id, message, file_path)` tuples. Pass a small limit + /// (e.g. 100) so a burst of overdue reminders doesn't flood the broker + /// in one cycle — leftovers stay due and get picked up on the next tick. + pub fn get_due_reminders(&self, limit: u64) -> Result> { let conn = self.conn.lock().unwrap(); + let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX); let mut stmt = conn.prepare( - "SELECT agent, id, message, file_path FROM reminders WHERE due_at <= ?1 AND sent_at IS NULL ORDER BY agent, due_at ASC" + "SELECT agent, id, message, file_path FROM reminders \ + WHERE due_at <= ?1 AND sent_at IS NULL \ + ORDER BY agent, due_at ASC \ + LIMIT ?2", )?; - let rows = stmt.query_map(params![now_unix()], |row| { + let rows = stmt.query_map(params![now_unix(), limit_i], |row| { Ok(( row.get::<_, String>(0)?, row.get::<_, i64>(1)?, @@ -263,16 +273,40 @@ impl Broker { )) })?; rows.collect::>>() - .context("query all due reminders") + .context("query due reminders") } - /// Mark a reminder as sent (delivered). - pub fn mark_reminder_sent(&self, id: i64) -> Result<()> { - let conn = self.conn.lock().unwrap(); - conn.execute( - "UPDATE reminders SET sent_at = ?1 WHERE id = ?2", - params![now_unix(), id], + /// Atomic reminder delivery: insert the inbox message AND mark the + /// reminder as sent in a single sqlite transaction. Prevents the + /// orphan-reminder duplicate-delivery class of bugs that two separate + /// calls (send + `mark_reminder_sent`) could produce if the second one + /// failed transiently — the next scheduler tick would see the reminder + /// still due and redeliver. Either both writes commit or neither does; + /// re-running on failure is safe. + /// + /// Emits a `Sent` event on the broadcast channel after the transaction + /// commits (so subscribers see the inbox message but never see a + /// "phantom" send for a transaction that rolled back). + pub fn deliver_reminder(&self, id: i64, agent: &str, message: &str) -> Result<()> { + let now = now_unix(); + let mut conn = self.conn.lock().unwrap(); + let tx = conn.transaction()?; + tx.execute( + "INSERT INTO messages (sender, recipient, body, sent_at) VALUES (?1, ?2, ?3, ?4)", + params!["reminder", agent, message, now], )?; + tx.execute( + "UPDATE reminders SET sent_at = ?1 WHERE id = ?2", + params![now, id], + )?; + tx.commit()?; + drop(conn); + let _ = self.events.send(MessageEvent::Sent { + from: "reminder".to_owned(), + to: agent.to_owned(), + body: message.to_owned(), + at: now, + }); Ok(()) } } diff --git a/hive-c0re/src/main.rs b/hive-c0re/src/main.rs index 3ebbabe..ea84f8c 100644 --- a/hive-c0re/src/main.rs +++ b/hive-c0re/src/main.rs @@ -86,6 +86,12 @@ enum Cmd { Deny { id: i64 }, } +/// Per-tick cap on reminders the scheduler delivers. Anything over this +/// stays due in the table and gets picked up on the next 5s tick — keeps +/// a 10k-deep backlog from flooding the broker (or hogging its mutex) in +/// one shot. +const REMINDER_BATCH_LIMIT: u64 = 100; + #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt() @@ -166,36 +172,30 @@ async fn main() -> Result<()> { // operator-initiated transient state. crash_watch::spawn(coord.clone()); // Reminder scheduler: checks for due reminders every 5 seconds, - // delivers them as inbox messages from "reminder". + // delivers them atomically (insert inbox + mark sent in one + // sqlite transaction so a transient failure on the second step + // can never produce a duplicate next tick). Per-cycle batch + // limit caps the burst — leftover reminders stay due and get + // picked up on the next tick instead of monopolising the broker + // mutex. let reminder_coord = coord.clone(); tokio::spawn(async move { - use hive_sh4re::Message; loop { - // Query all due reminders in a single DB call - match reminder_coord.broker.get_all_due_reminders() { + match reminder_coord + .broker + .get_due_reminders(REMINDER_BATCH_LIMIT) + { Ok(reminders) => { for (agent, id, message, _file_path) in reminders { - // Deliver as inbox message from "reminder" - if let Err(e) = reminder_coord.broker.send(&Message { - from: "reminder".to_owned(), - to: agent.clone(), - body: message.clone(), - }) { + if let Err(e) = + reminder_coord.broker.deliver_reminder(id, &agent, &message) + { tracing::warn!( reminder_id = id, %agent, error = ?e, "failed to deliver reminder" ); - continue; - } - // Mark as sent - if let Err(e) = reminder_coord.broker.mark_reminder_sent(id) { - tracing::warn!( - reminder_id = id, - error = ?e, - "failed to mark reminder sent" - ); } } }