Broker schema gains attempt_count INTEGER + last_error TEXT
columns via idempotent ALTER TABLE migration (pragma-probed so
fresh + existing dbs converge). reminder_scheduler::tick calls
record_reminder_failure on every deliver_reminder error,
bumping the counter + stashing the message. get_due_reminders
filters out rows where attempt_count >= MAX_REMINDER_ATTEMPTS
(5) so the scheduler stops retrying a stuck row until the
operator intervenes.
new POST /retry-reminder/{id} → reset_reminder_failure clears
the counters; next 5s tick re-attempts. cancel-reminder
unchanged (hard-delete).
dashboard renders failed rows with a red left rule, the error
text inline, and a ⚠ N failed badge. ↻ R3TRY button appears
when attempt_count > 0 — sits next to ✗ C4NC3L in a small
actions row below the body.
506 lines
19 KiB
Rust
506 lines
19 KiB
Rust
//! Sqlite-backed message broker. Survives `hive-c0re` restart, and taps every
|
|
//! send/recv onto a broadcast channel so the dashboard can stream it.
|
|
|
|
use std::path::Path;
|
|
use std::sync::Mutex;
|
|
use std::time::{SystemTime, UNIX_EPOCH};
|
|
|
|
use anyhow::{Context, Result};
|
|
use hive_sh4re::{InboxRow, Message};
|
|
use rusqlite::{Connection, OptionalExtension, params};
|
|
use serde::Serialize;
|
|
use tokio::sync::broadcast;
|
|
|
|
const SCHEMA: &str = r"
|
|
CREATE TABLE IF NOT EXISTS messages (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
sender TEXT NOT NULL,
|
|
recipient TEXT NOT NULL,
|
|
body TEXT NOT NULL,
|
|
sent_at INTEGER NOT NULL,
|
|
delivered_at INTEGER
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_messages_undelivered
|
|
ON messages (recipient, id) WHERE delivered_at IS NULL;
|
|
|
|
CREATE TABLE IF NOT EXISTS reminders (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
agent TEXT NOT NULL,
|
|
message TEXT NOT NULL,
|
|
file_path TEXT,
|
|
due_at INTEGER NOT NULL,
|
|
created_at INTEGER NOT NULL,
|
|
sent_at INTEGER
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_reminders_due
|
|
ON reminders (agent, due_at) WHERE sent_at IS NULL;
|
|
";
|
|
|
|
/// Capacity of the live event channel. Slow subscribers (e.g. an idle browser)
|
|
/// may drop events past this; we send a `lagged` notice in their stream.
|
|
const EVENT_CHANNEL: usize = 256;
|
|
|
|
/// Row shape returned by [`Broker::get_due_reminders`]:
|
|
/// `(agent, reminder_id, message, file_path)`. Type alias keeps
|
|
/// `clippy::type_complexity` quiet and makes the scheduler call site
|
|
/// self-documenting.
|
|
pub type DueReminder = (String, i64, String, Option<String>);
|
|
|
|
/// Row shape for [`Broker::list_pending_reminders`], shipped on the
|
|
/// dashboard `/api/reminders` response.
|
|
#[derive(Debug, Clone, Serialize)]
|
|
pub struct PendingReminder {
|
|
pub id: i64,
|
|
pub agent: String,
|
|
pub message: String,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub file_path: Option<String>,
|
|
pub due_at: i64,
|
|
pub created_at: i64,
|
|
/// Most recent delivery failure for this row, if any. Cleared
|
|
/// to NULL on operator retry. Surfaced inline in the dashboard
|
|
/// so a stuck reminder doesn't just silently retry forever.
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub last_error: Option<String>,
|
|
/// Number of failed delivery attempts since the row was
|
|
/// created or last retried. After `MAX_REMINDER_ATTEMPTS` the
|
|
/// scheduler stops trying (the row stays in `pending` with the
|
|
/// error so the operator can decide between retry + cancel).
|
|
#[serde(default)]
|
|
pub attempt_count: u32,
|
|
}
|
|
|
|
/// Stop retrying a row after this many consecutive failures. The
|
|
/// scheduler quits scheduling it until an operator explicitly
|
|
/// retries (which resets the counter) or cancels (which deletes
|
|
/// the row). Below the cap the existing 5s tick re-attempts each
|
|
/// time the row is due.
|
|
pub const MAX_REMINDER_ATTEMPTS: u32 = 5;
|
|
|
|
/// Intra-process broker event. `recv_blocking` listens on the same
|
|
/// channel as the dashboard forwarder; the forwarder re-emits each
|
|
/// event as a `DashboardEvent` with a freshly-stamped seq from the
|
|
/// Coordinator. The broker itself doesn't stamp seqs — that's a wire
|
|
/// concern, not a storage concern.
|
|
#[derive(Debug, Clone, Serialize)]
|
|
#[serde(rename_all = "snake_case", tag = "kind")]
|
|
pub enum MessageEvent {
|
|
Sent {
|
|
from: String,
|
|
to: String,
|
|
body: String,
|
|
at: i64,
|
|
},
|
|
Delivered {
|
|
from: String,
|
|
to: String,
|
|
body: String,
|
|
at: i64,
|
|
},
|
|
}
|
|
|
|
pub struct Broker {
|
|
conn: Mutex<Connection>,
|
|
events: broadcast::Sender<MessageEvent>,
|
|
}
|
|
|
|
impl Broker {
|
|
pub fn open(path: &Path) -> Result<Self> {
|
|
if let Some(parent) = path.parent() {
|
|
std::fs::create_dir_all(parent)
|
|
.with_context(|| format!("create db parent {}", parent.display()))?;
|
|
}
|
|
let conn =
|
|
Connection::open(path).with_context(|| format!("open broker db {}", path.display()))?;
|
|
conn.execute_batch(SCHEMA).context("apply broker schema")?;
|
|
ensure_reminder_columns(&conn).context("migrate reminders columns")?;
|
|
let (events, _) = broadcast::channel(EVENT_CHANNEL);
|
|
Ok(Self {
|
|
conn: Mutex::new(conn),
|
|
events,
|
|
})
|
|
}
|
|
|
|
pub fn subscribe(&self) -> broadcast::Receiver<MessageEvent> {
|
|
self.events.subscribe()
|
|
}
|
|
|
|
pub fn send(&self, message: &Message) -> Result<()> {
|
|
let conn = self.conn.lock().unwrap();
|
|
conn.execute(
|
|
"INSERT INTO messages (sender, recipient, body, sent_at) VALUES (?1, ?2, ?3, ?4)",
|
|
params![message.from, message.to, message.body, now_unix()],
|
|
)?;
|
|
drop(conn);
|
|
let _ = self.events.send(MessageEvent::Sent {
|
|
from: message.from.clone(),
|
|
to: message.to.clone(),
|
|
body: message.body.clone(),
|
|
at: now_unix(),
|
|
});
|
|
Ok(())
|
|
}
|
|
|
|
/// Latest `limit` messages addressed to `recipient`, newest-first.
|
|
/// Includes delivered + undelivered alike — used for the operator
|
|
/// inbox view on the dashboard. Caller decides what to show.
|
|
pub fn recent_for(&self, recipient: &str, limit: u64) -> Result<Vec<InboxRow>> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX);
|
|
let mut stmt = conn.prepare(
|
|
"SELECT id, sender, body, sent_at
|
|
FROM messages
|
|
WHERE recipient = ?1
|
|
ORDER BY id DESC
|
|
LIMIT ?2",
|
|
)?;
|
|
let rows = stmt.query_map(params![recipient, limit_i], |row| {
|
|
Ok(InboxRow {
|
|
id: row.get(0)?,
|
|
from: row.get(1)?,
|
|
body: row.get(2)?,
|
|
at: row.get(3)?,
|
|
})
|
|
})?;
|
|
rows.collect::<rusqlite::Result<Vec<_>>>()
|
|
.map_err(Into::into)
|
|
}
|
|
|
|
/// Latest `limit` messages across every recipient, newest-first.
|
|
/// Backs the dashboard's message-flow backfill so a reload doesn't
|
|
/// blank the operator's view of recent traffic. Returns each row as
|
|
/// a [`MessageEvent::Sent`] so the dashboard's live renderer (which
|
|
/// already speaks `MessageEvent`) can replay history through the
|
|
/// same code path. We don't synthesise `Delivered` events here —
|
|
/// the recv-side acks live in a different table column and would
|
|
/// double-render on backfill; the live stream picks them up
|
|
/// immediately on the first new `recv`.
|
|
pub fn recent_all(&self, limit: u64) -> Result<Vec<MessageEvent>> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX);
|
|
let mut stmt = conn.prepare(
|
|
"SELECT sender, recipient, body, sent_at
|
|
FROM messages
|
|
ORDER BY id DESC
|
|
LIMIT ?1",
|
|
)?;
|
|
let rows = stmt.query_map(params![limit_i], |row| {
|
|
Ok(MessageEvent::Sent {
|
|
from: row.get(0)?,
|
|
to: row.get(1)?,
|
|
body: row.get(2)?,
|
|
at: row.get(3)?,
|
|
})
|
|
})?;
|
|
rows.collect::<rusqlite::Result<Vec<_>>>()
|
|
.map_err(Into::into)
|
|
}
|
|
|
|
/// Number of undelivered messages addressed to `recipient`. Non-mutating
|
|
/// — used by the harness to surface "N unread" in tool-result status
|
|
/// lines without popping the queue.
|
|
pub fn count_pending(&self, recipient: &str) -> Result<u64> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let n: i64 = conn.query_row(
|
|
"SELECT COUNT(*) FROM messages
|
|
WHERE recipient = ?1 AND delivered_at IS NULL",
|
|
params![recipient],
|
|
|row| row.get(0),
|
|
)?;
|
|
Ok(u64::try_from(n.max(0)).unwrap_or(0))
|
|
}
|
|
|
|
/// Long-poll variant of `recv`: returns immediately if there's a
|
|
/// pending message; otherwise waits up to `timeout` for the broker to
|
|
/// emit a `Sent { to: recipient }` event, then retries the pop. Lets
|
|
/// agents react to new mail without polling their socket on a fixed
|
|
/// interval.
|
|
///
|
|
/// **Subscribe-before-check order matters.** If we polled the sqlite
|
|
/// row first and only then called `subscribe()`, a concurrent `send`
|
|
/// landing in that window would commit + broadcast its event *before*
|
|
/// our receiver existed — and we'd then sit on the long-poll until
|
|
/// the timeout (or another, unrelated send) fired. That looked
|
|
/// externally like "the agent processed one wake then went deaf
|
|
/// until the operator poked it again". Subscribing first guarantees
|
|
/// any post-subscribe send notifies us; the redundant `recv()`
|
|
/// catches the message either way.
|
|
pub async fn recv_blocking(
|
|
&self,
|
|
recipient: &str,
|
|
timeout: std::time::Duration,
|
|
) -> Result<Option<Message>> {
|
|
let mut rx = self.subscribe();
|
|
if let Some(m) = self.recv(recipient)? {
|
|
return Ok(Some(m));
|
|
}
|
|
let deadline = tokio::time::Instant::now() + timeout;
|
|
loop {
|
|
let Some(remaining) = deadline.checked_duration_since(tokio::time::Instant::now())
|
|
else {
|
|
return Ok(None);
|
|
};
|
|
match tokio::time::timeout(remaining, rx.recv()).await {
|
|
Err(_) => return Ok(None),
|
|
// Channel lagged or closed — fall back to a single direct
|
|
// pop (in case we missed our notification while behind).
|
|
Ok(Err(_)) => return self.recv(recipient),
|
|
Ok(Ok(MessageEvent::Sent { to, .. })) if to == recipient => {
|
|
if let Some(m) = self.recv(recipient)? {
|
|
return Ok(Some(m));
|
|
}
|
|
// Lost a race (concurrent recv elsewhere). Keep waiting.
|
|
}
|
|
Ok(Ok(_)) => {}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Delete delivered messages older than `older_than_secs`. Undelivered
|
|
/// rows are always kept regardless of age — those are still in flight
|
|
/// from the broker's POV. Returns the number of rows removed.
|
|
pub fn vacuum_delivered(&self, older_than_secs: i64) -> Result<u64> {
|
|
let cutoff = now_unix() - older_than_secs;
|
|
let conn = self.conn.lock().unwrap();
|
|
let n = conn.execute(
|
|
"DELETE FROM messages
|
|
WHERE delivered_at IS NOT NULL
|
|
AND delivered_at < ?1",
|
|
params![cutoff],
|
|
)?;
|
|
Ok(u64::try_from(n).unwrap_or(0))
|
|
}
|
|
|
|
pub fn recv(&self, recipient: &str) -> Result<Option<Message>> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let row: Option<(i64, String, String, String)> = conn
|
|
.query_row(
|
|
"SELECT id, sender, recipient, body
|
|
FROM messages
|
|
WHERE recipient = ?1 AND delivered_at IS NULL
|
|
ORDER BY id ASC
|
|
LIMIT 1",
|
|
params![recipient],
|
|
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
|
|
)
|
|
.optional()?;
|
|
let Some((id, from, to, body)) = row else {
|
|
return Ok(None);
|
|
};
|
|
conn.execute(
|
|
"UPDATE messages SET delivered_at = ?1 WHERE id = ?2",
|
|
params![now_unix(), id],
|
|
)?;
|
|
drop(conn);
|
|
let _ = self.events.send(MessageEvent::Delivered {
|
|
from: from.clone(),
|
|
to: to.clone(),
|
|
body: body.clone(),
|
|
at: now_unix(),
|
|
});
|
|
Ok(Some(Message { from, to, body }))
|
|
}
|
|
|
|
/// Store a new reminder. Returns the reminder id.
|
|
pub fn store_reminder(
|
|
&self,
|
|
agent: &str,
|
|
message: &str,
|
|
file_path: Option<&str>,
|
|
due_at: i64,
|
|
) -> Result<i64> {
|
|
let conn = self.conn.lock().unwrap();
|
|
conn.execute(
|
|
"INSERT INTO reminders (agent, message, file_path, due_at, created_at) VALUES (?1, ?2, ?3, ?4, ?5)",
|
|
params![agent, message, file_path, due_at, now_unix()],
|
|
)?;
|
|
let id = conn.last_insert_rowid();
|
|
Ok(id)
|
|
}
|
|
|
|
/// Every reminder still pending delivery, newest-first. Used by the
|
|
/// dashboard's reminders pane so the operator can see what's queued
|
|
/// + cancel rows that are no longer wanted.
|
|
pub fn list_pending_reminders(&self) -> Result<Vec<PendingReminder>> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let mut stmt = conn.prepare(
|
|
"SELECT id, agent, message, file_path, due_at, created_at, \
|
|
last_error, attempt_count \
|
|
FROM reminders \
|
|
WHERE sent_at IS NULL \
|
|
ORDER BY due_at ASC",
|
|
)?;
|
|
let rows = stmt.query_map([], |row| {
|
|
let attempts: i64 = row.get(7)?;
|
|
Ok(PendingReminder {
|
|
id: row.get(0)?,
|
|
agent: row.get(1)?,
|
|
message: row.get(2)?,
|
|
file_path: row.get(3)?,
|
|
due_at: row.get(4)?,
|
|
created_at: row.get(5)?,
|
|
last_error: row.get(6)?,
|
|
attempt_count: u32::try_from(attempts).unwrap_or(0),
|
|
})
|
|
})?;
|
|
rows.collect::<rusqlite::Result<Vec<_>>>()
|
|
.context("list pending reminders")
|
|
}
|
|
|
|
/// Mark a delivery attempt as failed: bump `attempt_count` and
|
|
/// stash the error string. Called by `reminder_scheduler::tick`
|
|
/// when `deliver_reminder` returns Err. Soft-cap behaviour
|
|
/// lives in `get_due_reminders` (rows over the cap drop out
|
|
/// of the due-list and stop being attempted until retry).
|
|
pub fn record_reminder_failure(&self, id: i64, reason: &str) -> Result<()> {
|
|
let conn = self.conn.lock().unwrap();
|
|
conn.execute(
|
|
"UPDATE reminders \
|
|
SET attempt_count = attempt_count + 1, last_error = ?1 \
|
|
WHERE id = ?2 AND sent_at IS NULL",
|
|
params![reason, id],
|
|
)?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Clear the failure state on a pending reminder so the
|
|
/// scheduler picks it up again. No-op when the row is already
|
|
/// fresh (attempt_count == 0). Returns the number of rows
|
|
/// affected so callers can distinguish "retried" from "no
|
|
/// such pending reminder" (already delivered, or wrong id).
|
|
pub fn reset_reminder_failure(&self, id: i64) -> Result<usize> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let n = conn.execute(
|
|
"UPDATE reminders \
|
|
SET attempt_count = 0, last_error = NULL \
|
|
WHERE id = ?1 AND sent_at IS NULL",
|
|
params![id],
|
|
)?;
|
|
Ok(n)
|
|
}
|
|
|
|
/// Count this agent's still-pending (un-delivered) reminders.
|
|
/// Used by the per-turn stats sink for a cheap "what was queued
|
|
/// at turn-end" snapshot.
|
|
pub fn count_pending_reminders_for(&self, agent: &str) -> Result<u64> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let n: i64 = conn.query_row(
|
|
"SELECT COUNT(*) FROM reminders WHERE agent = ?1 AND sent_at IS NULL",
|
|
params![agent],
|
|
|row| row.get(0),
|
|
)?;
|
|
Ok(u64::try_from(n).unwrap_or(0))
|
|
}
|
|
|
|
/// Delete a reminder by id. Returns the number of rows removed (0
|
|
/// when the id never existed or was already delivered). Hard
|
|
/// delete rather than soft so the row doesn't linger and confuse a
|
|
/// re-creation under the same id.
|
|
pub fn cancel_reminder(&self, id: i64) -> Result<usize> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let n = conn.execute(
|
|
"DELETE FROM reminders WHERE id = ?1 AND sent_at IS NULL",
|
|
params![id],
|
|
)?;
|
|
Ok(n)
|
|
}
|
|
|
|
/// Get up to `limit` due reminders across all agents in a single query.
|
|
/// Returns `(agent, id, message, file_path)` tuples. Pass a small limit
|
|
/// (e.g. 100) so a burst of overdue reminders doesn't flood the broker
|
|
/// in one cycle — leftovers stay due and get picked up on the next tick.
|
|
pub fn get_due_reminders(&self, limit: u64) -> Result<Vec<DueReminder>> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX);
|
|
let max_attempts = i64::from(MAX_REMINDER_ATTEMPTS);
|
|
// attempt_count >= cap = give up; row stays pending so the
|
|
// operator sees + can retry/cancel via the dashboard.
|
|
let mut stmt = conn.prepare(
|
|
"SELECT agent, id, message, file_path FROM reminders \
|
|
WHERE due_at <= ?1 AND sent_at IS NULL AND attempt_count < ?3 \
|
|
ORDER BY agent, due_at ASC \
|
|
LIMIT ?2",
|
|
)?;
|
|
let rows = stmt.query_map(params![now_unix(), limit_i, max_attempts], |row| {
|
|
Ok((
|
|
row.get::<_, String>(0)?,
|
|
row.get::<_, i64>(1)?,
|
|
row.get::<_, String>(2)?,
|
|
row.get::<_, Option<String>>(3)?,
|
|
))
|
|
})?;
|
|
rows.collect::<rusqlite::Result<Vec<_>>>()
|
|
.context("query due reminders")
|
|
}
|
|
|
|
/// Atomic reminder delivery: insert the inbox message AND mark the
|
|
/// reminder as sent in a single sqlite transaction. Prevents the
|
|
/// orphan-reminder duplicate-delivery class of bugs that two separate
|
|
/// calls (send + `mark_reminder_sent`) could produce if the second one
|
|
/// failed transiently — the next scheduler tick would see the reminder
|
|
/// still due and redeliver. Either both writes commit or neither does;
|
|
/// re-running on failure is safe.
|
|
///
|
|
/// Emits a `Sent` event on the broadcast channel after the transaction
|
|
/// commits (so subscribers see the inbox message but never see a
|
|
/// "phantom" send for a transaction that rolled back).
|
|
pub fn deliver_reminder(&self, id: i64, agent: &str, message: &str) -> Result<()> {
|
|
let now = now_unix();
|
|
let mut conn = self.conn.lock().unwrap();
|
|
let tx = conn.transaction()?;
|
|
tx.execute(
|
|
"INSERT INTO messages (sender, recipient, body, sent_at) VALUES (?1, ?2, ?3, ?4)",
|
|
params!["reminder", agent, message, now],
|
|
)?;
|
|
tx.execute(
|
|
"UPDATE reminders SET sent_at = ?1 WHERE id = ?2",
|
|
params![now, id],
|
|
)?;
|
|
tx.commit()?;
|
|
drop(conn);
|
|
let _ = self.events.send(MessageEvent::Sent {
|
|
from: "reminder".to_owned(),
|
|
to: agent.to_owned(),
|
|
body: message.to_owned(),
|
|
at: now,
|
|
});
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
/// Idempotent reminder-table migrations. `ALTER TABLE ADD COLUMN`
|
|
/// has no `IF NOT EXISTS` form in sqlite, so we probe
|
|
/// `pragma_table_info` per column. New deploys (table created by
|
|
/// SCHEMA in this commit cycle) skip the ALTER; pre-existing
|
|
/// broker.sqlite files get the columns added on next boot.
|
|
fn ensure_reminder_columns(conn: &Connection) -> Result<()> {
|
|
for (name, sql) in [
|
|
(
|
|
"attempt_count",
|
|
"ALTER TABLE reminders ADD COLUMN attempt_count INTEGER NOT NULL DEFAULT 0;",
|
|
),
|
|
(
|
|
"last_error",
|
|
"ALTER TABLE reminders ADD COLUMN last_error TEXT;",
|
|
),
|
|
] {
|
|
let has: bool = conn
|
|
.prepare(&format!(
|
|
"SELECT 1 FROM pragma_table_info('reminders') WHERE name = '{name}'"
|
|
))?
|
|
.exists([])?;
|
|
if !has {
|
|
conn.execute_batch(sql)
|
|
.with_context(|| format!("add reminders.{name} column"))?;
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn now_unix() -> i64 {
|
|
SystemTime::now()
|
|
.duration_since(UNIX_EPOCH)
|
|
.ok()
|
|
.and_then(|d| i64::try_from(d.as_secs()).ok())
|
|
.unwrap_or(0)
|
|
}
|