broker: batch reminder delivery in single db transaction
This commit is contained in:
parent
3c672ed6b2
commit
931d4b26e7
2 changed files with 86 additions and 28 deletions
|
|
@ -717,27 +717,74 @@ impl Broker {
|
||||||
/// Emits a `Sent` event on the broadcast channel after the transaction
|
/// Emits a `Sent` event on the broadcast channel after the transaction
|
||||||
/// commits (so subscribers see the inbox message but never see a
|
/// commits (so subscribers see the inbox message but never see a
|
||||||
/// "phantom" send for a transaction that rolled back).
|
/// "phantom" send for a transaction that rolled back).
|
||||||
pub fn deliver_reminder(&self, id: i64, agent: &str, message: &str) -> Result<()> {
|
/// Deliver a batch of reminders in a single transaction, reducing
|
||||||
|
/// lock contention on the shared sqlite connection under high
|
||||||
|
/// reminder volume. Returns per-item results so the scheduler can
|
||||||
|
/// record individual failures without aborting successful ones.
|
||||||
|
///
|
||||||
|
/// Items where the INSERT+UPDATE succeeds get a `MessageEvent::Sent`
|
||||||
|
/// emitted after the transaction commits. Items that fail are
|
||||||
|
/// returned as `Err` in the output vec (index-aligned with input).
|
||||||
|
pub fn deliver_reminders_batch(
|
||||||
|
&self,
|
||||||
|
items: &[(i64, String, String)], // (reminder_id, agent, body)
|
||||||
|
) -> Vec<Result<()>> {
|
||||||
|
if items.is_empty() {
|
||||||
|
return Vec::new();
|
||||||
|
}
|
||||||
let now = now_unix();
|
let now = now_unix();
|
||||||
let mut conn = self.conn.lock().unwrap();
|
let mut conn = self.conn.lock().unwrap();
|
||||||
let tx = conn.transaction()?;
|
// Build one transaction for all deliveries so we hold the lock
|
||||||
tx.execute(
|
// once rather than N times. On a batch-level error (e.g. DB
|
||||||
"INSERT INTO messages (sender, recipient, body, sent_at) VALUES (?1, ?2, ?3, ?4)",
|
// corruption), fall back to returning per-item errors so the
|
||||||
params!["reminder", agent, message, now],
|
// scheduler records the failure cleanly.
|
||||||
)?;
|
let tx = match conn.transaction() {
|
||||||
tx.execute(
|
Ok(t) => t,
|
||||||
"UPDATE reminders SET sent_at = ?1 WHERE id = ?2",
|
Err(e) => {
|
||||||
params![now, id],
|
let err_str = format!("{e:#}");
|
||||||
)?;
|
return items
|
||||||
tx.commit()?;
|
.iter()
|
||||||
|
.map(|_| Err(anyhow::anyhow!("{}", err_str.clone())))
|
||||||
|
.collect();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let mut results: Vec<Result<()>> = Vec::with_capacity(items.len());
|
||||||
|
for (id, agent, body) in items {
|
||||||
|
let r = (|| -> Result<()> {
|
||||||
|
tx.execute(
|
||||||
|
"INSERT INTO messages (sender, recipient, body, sent_at) \
|
||||||
|
VALUES (?1, ?2, ?3, ?4)",
|
||||||
|
params!["reminder", agent, body, now],
|
||||||
|
)?;
|
||||||
|
tx.execute(
|
||||||
|
"UPDATE reminders SET sent_at = ?1 WHERE id = ?2",
|
||||||
|
params![now, id],
|
||||||
|
)?;
|
||||||
|
Ok(())
|
||||||
|
})();
|
||||||
|
results.push(r);
|
||||||
|
}
|
||||||
|
if let Err(e) = tx.commit() {
|
||||||
|
let err_str = format!("{e:#}");
|
||||||
|
return items
|
||||||
|
.iter()
|
||||||
|
.map(|_| Err(anyhow::anyhow!("{}", err_str.clone())))
|
||||||
|
.collect();
|
||||||
|
}
|
||||||
drop(conn);
|
drop(conn);
|
||||||
let _ = self.events.send(MessageEvent::Sent {
|
// Emit per-row Sent events (only for rows that succeeded).
|
||||||
from: "reminder".to_owned(),
|
for ((id, agent, body), result) in items.iter().zip(results.iter()) {
|
||||||
to: agent.to_owned(),
|
if result.is_ok() {
|
||||||
body: message.to_owned(),
|
let _ = self.events.send(MessageEvent::Sent {
|
||||||
at: now,
|
from: "reminder".to_owned(),
|
||||||
});
|
to: agent.clone(),
|
||||||
Ok(())
|
body: body.clone(),
|
||||||
|
at: now,
|
||||||
|
});
|
||||||
|
tracing::debug!(reminder_id = id, %agent, "reminder delivered");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
results
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -27,8 +27,8 @@
|
||||||
//! small and the bulky payload can be read out of band.
|
//! small and the bulky payload can be read out of band.
|
||||||
//!
|
//!
|
||||||
//! Atomicity of the inbox INSERT + `reminders.sent_at` UPDATE is handled
|
//! Atomicity of the inbox INSERT + `reminders.sent_at` UPDATE is handled
|
||||||
//! inside `Broker::deliver_reminder`; this module only computes the
|
//! inside `Broker::deliver_reminders_batch`; this module only computes the
|
||||||
//! body string before calling it.
|
//! body strings before calling it.
|
||||||
|
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::os::unix::fs::OpenOptionsExt;
|
use std::os::unix::fs::OpenOptionsExt;
|
||||||
|
|
@ -75,9 +75,23 @@ fn tick(coord: &Arc<Coordinator>) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
for (agent, id, message, file_path) in due {
|
if due.is_empty() {
|
||||||
let body = prepare_body(&agent, &message, file_path.as_deref());
|
return;
|
||||||
if let Err(e) = coord.broker.deliver_reminder(id, &agent, &body) {
|
}
|
||||||
|
// Resolve body strings (file-path writes / inline) before entering
|
||||||
|
// the batch transaction so the DB lock is held as briefly as possible.
|
||||||
|
let items: Vec<(i64, String, String)> = due
|
||||||
|
.iter()
|
||||||
|
.map(|(agent, id, message, file_path)| {
|
||||||
|
let body = prepare_body(agent, message, file_path.as_deref());
|
||||||
|
(*id, agent.clone(), body)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
// Single-transaction batch: one DB lock acquisition for N reminders
|
||||||
|
// instead of N sequential lock/unlock cycles.
|
||||||
|
let results = coord.broker.deliver_reminders_batch(&items);
|
||||||
|
for ((id, agent, _body), result) in items.iter().zip(results.iter()) {
|
||||||
|
if let Err(e) = result {
|
||||||
let reason = format!("{e:#}");
|
let reason = format!("{e:#}");
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
reminder_id = id,
|
reminder_id = id,
|
||||||
|
|
@ -85,11 +99,8 @@ fn tick(coord: &Arc<Coordinator>) {
|
||||||
error = %reason,
|
error = %reason,
|
||||||
"failed to deliver reminder"
|
"failed to deliver reminder"
|
||||||
);
|
);
|
||||||
// Persist the failure so the dashboard can surface it +
|
// Persist the failure so the dashboard can surface it.
|
||||||
// bump attempt_count. After MAX_REMINDER_ATTEMPTS the
|
if let Err(persist_err) = coord.broker.record_reminder_failure(*id, &reason) {
|
||||||
// row drops out of `get_due_reminders` and waits for
|
|
||||||
// operator retry / cancel.
|
|
||||||
if let Err(persist_err) = coord.broker.record_reminder_failure(id, &reason) {
|
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
reminder_id = id,
|
reminder_id = id,
|
||||||
error = ?persist_err,
|
error = ?persist_err,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue