reminder: atomic delivery transaction + per-tick batch cap

This commit is contained in:
damocles 2026-05-17 02:51:05 +02:00
parent e45d161cb8
commit b86c0a2217
2 changed files with 67 additions and 33 deletions

View file

@ -40,6 +40,12 @@ CREATE INDEX IF NOT EXISTS idx_reminders_due
/// may drop events past this; we send a `lagged` notice in their stream. /// may drop events past this; we send a `lagged` notice in their stream.
const EVENT_CHANNEL: usize = 256; const EVENT_CHANNEL: usize = 256;
/// Row shape returned by [`Broker::get_due_reminders`]:
/// `(agent, reminder_id, message, file_path)`. Type alias keeps
/// `clippy::type_complexity` quiet and makes the scheduler call site
/// self-documenting.
pub type DueReminder = (String, i64, String, Option<String>);
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "snake_case", tag = "kind")] #[serde(rename_all = "snake_case", tag = "kind")]
pub enum MessageEvent { pub enum MessageEvent {
@ -245,16 +251,20 @@ impl Broker {
Ok(id) Ok(id)
} }
/// Get all reminders for an agent that are due now or in the past. /// Get up to `limit` due reminders across all agents in a single query.
/// Returns (id, message, file_path) tuples. /// Returns `(agent, id, message, file_path)` tuples. Pass a small limit
/// Get all due reminders across all agents in a single query. /// (e.g. 100) so a burst of overdue reminders doesn't flood the broker
/// Returns a vec of (agent, id, message, file_path) tuples. /// in one cycle — leftovers stay due and get picked up on the next tick.
pub fn get_all_due_reminders(&self) -> Result<Vec<(String, i64, String, Option<String>)>> { pub fn get_due_reminders(&self, limit: u64) -> Result<Vec<DueReminder>> {
let conn = self.conn.lock().unwrap(); let conn = self.conn.lock().unwrap();
let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX);
let mut stmt = conn.prepare( let mut stmt = conn.prepare(
"SELECT agent, id, message, file_path FROM reminders WHERE due_at <= ?1 AND sent_at IS NULL ORDER BY agent, due_at ASC" "SELECT agent, id, message, file_path FROM reminders \
WHERE due_at <= ?1 AND sent_at IS NULL \
ORDER BY agent, due_at ASC \
LIMIT ?2",
)?; )?;
let rows = stmt.query_map(params![now_unix()], |row| { let rows = stmt.query_map(params![now_unix(), limit_i], |row| {
Ok(( Ok((
row.get::<_, String>(0)?, row.get::<_, String>(0)?,
row.get::<_, i64>(1)?, row.get::<_, i64>(1)?,
@ -263,16 +273,40 @@ impl Broker {
)) ))
})?; })?;
rows.collect::<rusqlite::Result<Vec<_>>>() rows.collect::<rusqlite::Result<Vec<_>>>()
.context("query all due reminders") .context("query due reminders")
} }
/// Mark a reminder as sent (delivered). /// Atomic reminder delivery: insert the inbox message AND mark the
pub fn mark_reminder_sent(&self, id: i64) -> Result<()> { /// reminder as sent in a single sqlite transaction. Prevents the
let conn = self.conn.lock().unwrap(); /// orphan-reminder duplicate-delivery class of bugs that two separate
conn.execute( /// calls (send + `mark_reminder_sent`) could produce if the second one
"UPDATE reminders SET sent_at = ?1 WHERE id = ?2", /// failed transiently — the next scheduler tick would see the reminder
params![now_unix(), id], /// still due and redeliver. Either both writes commit or neither does;
/// re-running on failure is safe.
///
/// Emits a `Sent` event on the broadcast channel after the transaction
/// commits (so subscribers see the inbox message but never see a
/// "phantom" send for a transaction that rolled back).
pub fn deliver_reminder(&self, id: i64, agent: &str, message: &str) -> Result<()> {
let now = now_unix();
let mut conn = self.conn.lock().unwrap();
let tx = conn.transaction()?;
tx.execute(
"INSERT INTO messages (sender, recipient, body, sent_at) VALUES (?1, ?2, ?3, ?4)",
params!["reminder", agent, message, now],
)?; )?;
tx.execute(
"UPDATE reminders SET sent_at = ?1 WHERE id = ?2",
params![now, id],
)?;
tx.commit()?;
drop(conn);
let _ = self.events.send(MessageEvent::Sent {
from: "reminder".to_owned(),
to: agent.to_owned(),
body: message.to_owned(),
at: now,
});
Ok(()) Ok(())
} }
} }

View file

@ -86,6 +86,12 @@ enum Cmd {
Deny { id: i64 }, Deny { id: i64 },
} }
/// Per-tick cap on reminders the scheduler delivers. Anything over this
/// stays due in the table and gets picked up on the next 5s tick — keeps
/// a 10k-deep backlog from flooding the broker (or hogging its mutex) in
/// one shot.
const REMINDER_BATCH_LIMIT: u64 = 100;
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
tracing_subscriber::fmt() tracing_subscriber::fmt()
@ -166,36 +172,30 @@ async fn main() -> Result<()> {
// operator-initiated transient state. // operator-initiated transient state.
crash_watch::spawn(coord.clone()); crash_watch::spawn(coord.clone());
// Reminder scheduler: checks for due reminders every 5 seconds, // Reminder scheduler: checks for due reminders every 5 seconds,
// delivers them as inbox messages from "reminder". // delivers them atomically (insert inbox + mark sent in one
// sqlite transaction so a transient failure on the second step
// can never produce a duplicate next tick). Per-cycle batch
// limit caps the burst — leftover reminders stay due and get
// picked up on the next tick instead of monopolising the broker
// mutex.
let reminder_coord = coord.clone(); let reminder_coord = coord.clone();
tokio::spawn(async move { tokio::spawn(async move {
use hive_sh4re::Message;
loop { loop {
// Query all due reminders in a single DB call match reminder_coord
match reminder_coord.broker.get_all_due_reminders() { .broker
.get_due_reminders(REMINDER_BATCH_LIMIT)
{
Ok(reminders) => { Ok(reminders) => {
for (agent, id, message, _file_path) in reminders { for (agent, id, message, _file_path) in reminders {
// Deliver as inbox message from "reminder" if let Err(e) =
if let Err(e) = reminder_coord.broker.send(&Message { reminder_coord.broker.deliver_reminder(id, &agent, &message)
from: "reminder".to_owned(), {
to: agent.clone(),
body: message.clone(),
}) {
tracing::warn!( tracing::warn!(
reminder_id = id, reminder_id = id,
%agent, %agent,
error = ?e, error = ?e,
"failed to deliver reminder" "failed to deliver reminder"
); );
continue;
}
// Mark as sent
if let Err(e) = reminder_coord.broker.mark_reminder_sent(id) {
tracing::warn!(
reminder_id = id,
error = ?e,
"failed to mark reminder sent"
);
} }
} }
} }