diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs index 54c4002..d5a8442 100644 --- a/hive-c0re/src/broker.rs +++ b/hive-c0re/src/broker.rs @@ -717,27 +717,74 @@ impl Broker { /// Emits a `Sent` event on the broadcast channel after the transaction /// commits (so subscribers see the inbox message but never see a /// "phantom" send for a transaction that rolled back). - pub fn deliver_reminder(&self, id: i64, agent: &str, message: &str) -> Result<()> { + /// Deliver a batch of reminders in a single transaction, reducing + /// lock contention on the shared sqlite connection under high + /// reminder volume. Returns per-item results so the scheduler can + /// record individual failures without aborting successful ones. + /// + /// Items where the INSERT+UPDATE succeeds get a `MessageEvent::Sent` + /// emitted after the transaction commits. Items that fail are + /// returned as `Err` in the output vec (index-aligned with input). + pub fn deliver_reminders_batch( + &self, + items: &[(i64, String, String)], // (reminder_id, agent, body) + ) -> Vec> { + if items.is_empty() { + return Vec::new(); + } let now = now_unix(); let mut conn = self.conn.lock().unwrap(); - let tx = conn.transaction()?; - tx.execute( - "INSERT INTO messages (sender, recipient, body, sent_at) VALUES (?1, ?2, ?3, ?4)", - params!["reminder", agent, message, now], - )?; - tx.execute( - "UPDATE reminders SET sent_at = ?1 WHERE id = ?2", - params![now, id], - )?; - tx.commit()?; + // Build one transaction for all deliveries so we hold the lock + // once rather than N times. On a batch-level error (e.g. DB + // corruption), fall back to returning per-item errors so the + // scheduler records the failure cleanly. + let tx = match conn.transaction() { + Ok(t) => t, + Err(e) => { + let err_str = format!("{e:#}"); + return items + .iter() + .map(|_| Err(anyhow::anyhow!("{}", err_str.clone()))) + .collect(); + } + }; + let mut results: Vec> = Vec::with_capacity(items.len()); + for (id, agent, body) in items { + let r = (|| -> Result<()> { + tx.execute( + "INSERT INTO messages (sender, recipient, body, sent_at) \ + VALUES (?1, ?2, ?3, ?4)", + params!["reminder", agent, body, now], + )?; + tx.execute( + "UPDATE reminders SET sent_at = ?1 WHERE id = ?2", + params![now, id], + )?; + Ok(()) + })(); + results.push(r); + } + if let Err(e) = tx.commit() { + let err_str = format!("{e:#}"); + return items + .iter() + .map(|_| Err(anyhow::anyhow!("{}", err_str.clone()))) + .collect(); + } drop(conn); - let _ = self.events.send(MessageEvent::Sent { - from: "reminder".to_owned(), - to: agent.to_owned(), - body: message.to_owned(), - at: now, - }); - Ok(()) + // Emit per-row Sent events (only for rows that succeeded). + for ((id, agent, body), result) in items.iter().zip(results.iter()) { + if result.is_ok() { + let _ = self.events.send(MessageEvent::Sent { + from: "reminder".to_owned(), + to: agent.clone(), + body: body.clone(), + at: now, + }); + tracing::debug!(reminder_id = id, %agent, "reminder delivered"); + } + } + results } } diff --git a/hive-c0re/src/reminder_scheduler.rs b/hive-c0re/src/reminder_scheduler.rs index 95c53ff..6cc64db 100644 --- a/hive-c0re/src/reminder_scheduler.rs +++ b/hive-c0re/src/reminder_scheduler.rs @@ -27,8 +27,8 @@ //! small and the bulky payload can be read out of band. //! //! Atomicity of the inbox INSERT + `reminders.sent_at` UPDATE is handled -//! inside `Broker::deliver_reminder`; this module only computes the -//! body string before calling it. +//! inside `Broker::deliver_reminders_batch`; this module only computes the +//! body strings before calling it. use std::io::Write; use std::os::unix::fs::OpenOptionsExt; @@ -75,9 +75,23 @@ fn tick(coord: &Arc) { return; } }; - for (agent, id, message, file_path) in due { - let body = prepare_body(&agent, &message, file_path.as_deref()); - if let Err(e) = coord.broker.deliver_reminder(id, &agent, &body) { + if due.is_empty() { + return; + } + // Resolve body strings (file-path writes / inline) before entering + // the batch transaction so the DB lock is held as briefly as possible. + let items: Vec<(i64, String, String)> = due + .iter() + .map(|(agent, id, message, file_path)| { + let body = prepare_body(agent, message, file_path.as_deref()); + (*id, agent.clone(), body) + }) + .collect(); + // Single-transaction batch: one DB lock acquisition for N reminders + // instead of N sequential lock/unlock cycles. + let results = coord.broker.deliver_reminders_batch(&items); + for ((id, agent, _body), result) in items.iter().zip(results.iter()) { + if let Err(e) = result { let reason = format!("{e:#}"); tracing::warn!( reminder_id = id, @@ -85,11 +99,8 @@ fn tick(coord: &Arc) { error = %reason, "failed to deliver reminder" ); - // Persist the failure so the dashboard can surface it + - // bump attempt_count. After MAX_REMINDER_ATTEMPTS the - // row drops out of `get_due_reminders` and waits for - // operator retry / cancel. - if let Err(persist_err) = coord.broker.record_reminder_failure(id, &reason) { + // Persist the failure so the dashboard can surface it. + if let Err(persist_err) = coord.broker.record_reminder_failure(*id, &reason) { tracing::warn!( reminder_id = id, error = ?persist_err,