- MessageEvent and DashboardEvent Sent/Delivered now carry id and in_reply_to - broker.send() includes last_insert_rowid in the emitted event - recent_all() and recv_batch() include id and in_reply_to from the DB - deliver_reminders_batch() tracks per-row rowids within the transaction - dashboard message flow: reply rows are indented with a border-left and a clickable '↳ reply' tag that scroll-jumps + briefly highlights the parent - per-agent inbox: reply messages get a '↳ reply ·' prefix and indent Closes #26
1133 lines
46 KiB
Rust
1133 lines
46 KiB
Rust
//! Sqlite-backed message broker. Survives `hive-c0re` restart, and taps every
|
|
//! send/recv onto a broadcast channel so the dashboard can stream it.
|
|
|
|
use std::collections::{HashMap, HashSet};
|
|
use std::path::Path;
|
|
use std::sync::Mutex;
|
|
use std::time::{SystemTime, UNIX_EPOCH};
|
|
|
|
use anyhow::{Context, Result};
|
|
use hive_sh4re::{InboxRow, Message};
|
|
use rusqlite::{Connection, OptionalExtension, params};
|
|
use serde::Serialize;
|
|
use tokio::sync::broadcast;
|
|
|
|
const SCHEMA: &str = r"
|
|
CREATE TABLE IF NOT EXISTS messages (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
sender TEXT NOT NULL,
|
|
recipient TEXT NOT NULL,
|
|
body TEXT NOT NULL,
|
|
sent_at INTEGER NOT NULL,
|
|
delivered_at INTEGER,
|
|
in_reply_to INTEGER
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_messages_undelivered
|
|
ON messages (recipient, id) WHERE delivered_at IS NULL;
|
|
|
|
CREATE TABLE IF NOT EXISTS reminders (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
agent TEXT NOT NULL,
|
|
message TEXT NOT NULL,
|
|
file_path TEXT,
|
|
due_at INTEGER NOT NULL,
|
|
created_at INTEGER NOT NULL,
|
|
sent_at INTEGER
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_reminders_due
|
|
ON reminders (agent, due_at) WHERE sent_at IS NULL;
|
|
";
|
|
|
|
/// Capacity of the live event channel. Slow subscribers (e.g. an idle browser)
|
|
/// may drop events past this; we send a `lagged` notice in their stream.
|
|
const EVENT_CHANNEL: usize = 256;
|
|
|
|
/// Row shape returned by [`Broker::get_due_reminders`]:
|
|
/// `(agent, reminder_id, message, file_path)`. Type alias keeps
|
|
/// `clippy::type_complexity` quiet and makes the scheduler call site
|
|
/// self-documenting.
|
|
pub type DueReminder = (String, i64, String, Option<String>);
|
|
|
|
/// A single message hand-off from broker to recipient. Carries the
|
|
/// broker's row id (so the harness can drive `ack_turn` later) and
|
|
/// the redelivery flag (so the harness can prepend the
|
|
/// "may already be handled" hint to the wake prompt). The
|
|
/// `Message` itself is identical to a pristine `Send` payload.
|
|
#[derive(Debug, Clone)]
|
|
pub struct Delivery {
|
|
pub id: i64,
|
|
pub redelivered: bool,
|
|
pub message: Message,
|
|
}
|
|
|
|
/// Row shape for [`Broker::list_pending_reminders`], shipped on the
|
|
/// dashboard `/api/reminders` response.
|
|
#[derive(Debug, Clone, Serialize)]
|
|
pub struct PendingReminder {
|
|
pub id: i64,
|
|
pub agent: String,
|
|
pub message: String,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub file_path: Option<String>,
|
|
pub due_at: i64,
|
|
pub created_at: i64,
|
|
/// Most recent delivery failure for this row, if any. Cleared
|
|
/// to NULL on operator retry. Surfaced inline in the dashboard
|
|
/// so a stuck reminder doesn't just silently retry forever.
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub last_error: Option<String>,
|
|
/// Number of failed delivery attempts since the row was
|
|
/// created or last retried. After `MAX_REMINDER_ATTEMPTS` the
|
|
/// scheduler stops trying (the row stays in `pending` with the
|
|
/// error so the operator can decide between retry + cancel).
|
|
#[serde(default)]
|
|
pub attempt_count: u32,
|
|
}
|
|
|
|
/// Stop retrying a row after this many consecutive failures. The
|
|
/// scheduler quits scheduling it until an operator explicitly
|
|
/// retries (which resets the counter) or cancels (which deletes
|
|
/// the row). Below the cap the existing 5s tick re-attempts each
|
|
/// time the row is due.
|
|
pub const MAX_REMINDER_ATTEMPTS: u32 = 5;
|
|
|
|
/// Intra-process broker event. `recv_blocking_batch` listens on the
|
|
/// same channel as the dashboard forwarder; the forwarder re-emits
|
|
/// each event as a `DashboardEvent` with a freshly-stamped seq from
|
|
/// the Coordinator. The broker itself doesn't stamp seqs — that's a
|
|
/// wire concern, not a storage concern.
|
|
#[derive(Debug, Clone, Serialize)]
|
|
#[serde(rename_all = "snake_case", tag = "kind")]
|
|
pub enum MessageEvent {
|
|
Sent {
|
|
/// Broker row id — used by the dashboard to track thread parents.
|
|
id: i64,
|
|
from: String,
|
|
to: String,
|
|
body: String,
|
|
at: i64,
|
|
in_reply_to: Option<i64>,
|
|
},
|
|
Delivered {
|
|
/// Broker row id — used by the dashboard to track thread parents.
|
|
id: i64,
|
|
from: String,
|
|
to: String,
|
|
body: String,
|
|
at: i64,
|
|
in_reply_to: Option<i64>,
|
|
},
|
|
}
|
|
|
|
/// Per-recipient in-memory bookkeeping for the deliver-then-ack
|
|
/// flow. Source of truth is the DB columns `delivered_at` +
|
|
/// `acked_at`; the in-memory state here is purely an optimisation
|
|
/// (avoids scanning the messages table on `AckTurn`) plus the
|
|
/// redelivery-hint marker.
|
|
#[derive(Default)]
|
|
struct RecipientInflight {
|
|
/// Message ids the broker has handed to this recipient since the
|
|
/// last `AckTurn`. Drained on `ack_turn`, which then runs a
|
|
/// single `UPDATE … WHERE id IN (…)` to set `acked_at`.
|
|
unacked_ids: Vec<i64>,
|
|
/// Message ids resurfaced by the most recent `requeue_inflight`
|
|
/// call. The next `recv_batch` pop of any id in this set tags
|
|
/// the response with `redelivered: true` so the harness can
|
|
/// prepend the "may already be handled" hint to the wake prompt;
|
|
/// successful pops drain the id from the set.
|
|
requeued_ids: HashSet<i64>,
|
|
}
|
|
|
|
pub struct Broker {
|
|
conn: Mutex<Connection>,
|
|
events: broadcast::Sender<MessageEvent>,
|
|
/// Per-recipient deliver/ack tracking. Lost on hive-c0re restart
|
|
/// (harmless — the harness fires `RequeueInflight` on its own
|
|
/// boot, which rebuilds the `requeued_ids` set from the DB and
|
|
/// clears any stale `unacked_ids`).
|
|
inflight: Mutex<HashMap<String, RecipientInflight>>,
|
|
}
|
|
|
|
impl Broker {
|
|
pub fn open(path: &Path) -> Result<Self> {
|
|
if let Some(parent) = path.parent() {
|
|
std::fs::create_dir_all(parent)
|
|
.with_context(|| format!("create db parent {}", parent.display()))?;
|
|
}
|
|
let conn =
|
|
Connection::open(path).with_context(|| format!("open broker db {}", path.display()))?;
|
|
conn.execute_batch(SCHEMA).context("apply broker schema")?;
|
|
ensure_message_columns(&conn).context("migrate messages columns")?;
|
|
ensure_reminder_columns(&conn).context("migrate reminders columns")?;
|
|
let (events, _) = broadcast::channel(EVENT_CHANNEL);
|
|
Ok(Self {
|
|
conn: Mutex::new(conn),
|
|
events,
|
|
inflight: Mutex::new(HashMap::new()),
|
|
})
|
|
}
|
|
|
|
pub fn subscribe(&self) -> broadcast::Receiver<MessageEvent> {
|
|
self.events.subscribe()
|
|
}
|
|
|
|
pub fn send(&self, message: &Message) -> Result<()> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let now = now_unix();
|
|
conn.execute(
|
|
"INSERT INTO messages (sender, recipient, body, sent_at, in_reply_to) VALUES (?1, ?2, ?3, ?4, ?5)",
|
|
params![message.from, message.to, message.body, now, message.in_reply_to],
|
|
)?;
|
|
let row_id = conn.last_insert_rowid();
|
|
drop(conn);
|
|
let _ = self.events.send(MessageEvent::Sent {
|
|
id: row_id,
|
|
from: message.from.clone(),
|
|
to: message.to.clone(),
|
|
body: message.body.clone(),
|
|
at: now,
|
|
in_reply_to: message.in_reply_to,
|
|
});
|
|
Ok(())
|
|
}
|
|
|
|
/// Latest `limit` messages addressed to `recipient`, newest-first.
|
|
/// Includes delivered + undelivered alike — used for the operator
|
|
/// inbox view on the dashboard. Caller decides what to show.
|
|
pub fn recent_for(&self, recipient: &str, limit: u64) -> Result<Vec<InboxRow>> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX);
|
|
let mut stmt = conn.prepare(
|
|
"SELECT id, sender, body, sent_at, in_reply_to
|
|
FROM messages
|
|
WHERE recipient = ?1
|
|
ORDER BY id DESC
|
|
LIMIT ?2",
|
|
)?;
|
|
let rows = stmt.query_map(params![recipient, limit_i], |row| {
|
|
Ok(InboxRow {
|
|
id: row.get(0)?,
|
|
from: row.get(1)?,
|
|
body: row.get(2)?,
|
|
at: row.get(3)?,
|
|
in_reply_to: row.get(4)?,
|
|
})
|
|
})?;
|
|
rows.collect::<rusqlite::Result<Vec<_>>>()
|
|
.map_err(Into::into)
|
|
}
|
|
|
|
/// Latest `limit` messages across every recipient, newest-first.
|
|
/// Backs the dashboard's message-flow backfill so a reload doesn't
|
|
/// blank the operator's view of recent traffic. Returns each row as
|
|
/// a [`MessageEvent::Sent`] so the dashboard's live renderer (which
|
|
/// already speaks `MessageEvent`) can replay history through the
|
|
/// same code path. We don't synthesise `Delivered` events here —
|
|
/// the recv-side acks live in a different table column and would
|
|
/// double-render on backfill; the live stream picks them up
|
|
/// immediately on the first new `recv`.
|
|
pub fn recent_all(&self, limit: u64) -> Result<Vec<MessageEvent>> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX);
|
|
let mut stmt = conn.prepare(
|
|
"SELECT id, sender, recipient, body, sent_at, in_reply_to
|
|
FROM messages
|
|
ORDER BY id DESC
|
|
LIMIT ?1",
|
|
)?;
|
|
let rows = stmt.query_map(params![limit_i], |row| {
|
|
Ok(MessageEvent::Sent {
|
|
id: row.get(0)?,
|
|
from: row.get(1)?,
|
|
to: row.get(2)?,
|
|
body: row.get(3)?,
|
|
at: row.get(4)?,
|
|
in_reply_to: row.get(5)?,
|
|
})
|
|
})?;
|
|
rows.collect::<rusqlite::Result<Vec<_>>>()
|
|
.map_err(Into::into)
|
|
}
|
|
|
|
/// Number of undelivered messages addressed to `recipient`. Non-mutating
|
|
/// — used by the harness to surface "N unread" in tool-result status
|
|
/// lines without popping the queue.
|
|
pub fn count_pending(&self, recipient: &str) -> Result<u64> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let n: i64 = conn.query_row(
|
|
"SELECT COUNT(*) FROM messages
|
|
WHERE recipient = ?1 AND delivered_at IS NULL",
|
|
params![recipient],
|
|
|row| row.get(0),
|
|
)?;
|
|
Ok(u64::try_from(n.max(0)).unwrap_or(0))
|
|
}
|
|
|
|
/// Long-poll variant of `recv_batch`: returns immediately if any
|
|
/// row is pending (popping up to `max`); otherwise waits up to
|
|
/// `timeout` for the broker to emit a `Sent { to: recipient }`
|
|
/// event and re-tries the pop. Lets agents react to new mail
|
|
/// without polling their socket on a fixed interval AND lets a
|
|
/// single round-trip drain a burst of messages.
|
|
///
|
|
/// **Subscribe-before-check order matters.** If we polled the
|
|
/// sqlite row first and only then called `subscribe()`, a
|
|
/// concurrent `send` landing in that window would commit +
|
|
/// broadcast its event *before* our receiver existed — and we'd
|
|
/// then sit on the long-poll until the timeout (or another,
|
|
/// unrelated send) fired. That looked externally like "the agent
|
|
/// processed one wake then went deaf until the operator poked it
|
|
/// again". Subscribing first guarantees any post-subscribe send
|
|
/// notifies us; the redundant `recv_batch()` catches the message
|
|
/// either way.
|
|
///
|
|
/// `max == 0` returns an empty vec without subscribing or waiting.
|
|
pub async fn recv_blocking_batch(
|
|
&self,
|
|
recipient: &str,
|
|
timeout: std::time::Duration,
|
|
max: usize,
|
|
) -> Result<Vec<Delivery>> {
|
|
if max == 0 {
|
|
return Ok(Vec::new());
|
|
}
|
|
let mut rx = self.subscribe();
|
|
let batch = self.recv_batch(recipient, max)?;
|
|
if !batch.is_empty() {
|
|
return Ok(batch);
|
|
}
|
|
let deadline = tokio::time::Instant::now() + timeout;
|
|
loop {
|
|
let Some(remaining) = deadline.checked_duration_since(tokio::time::Instant::now())
|
|
else {
|
|
return Ok(Vec::new());
|
|
};
|
|
match tokio::time::timeout(remaining, rx.recv()).await {
|
|
Err(_) => return Ok(Vec::new()),
|
|
// Channel lagged or closed — fall back to a single direct
|
|
// pop (in case we missed our notification while behind).
|
|
Ok(Err(_)) => return self.recv_batch(recipient, max),
|
|
Ok(Ok(MessageEvent::Sent { to, .. })) if to == recipient => {
|
|
let batch = self.recv_batch(recipient, max)?;
|
|
if !batch.is_empty() {
|
|
return Ok(batch);
|
|
}
|
|
// Lost a race (concurrent recv elsewhere). Keep waiting.
|
|
}
|
|
Ok(Ok(_)) => {}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Delete fully-acked messages older than `older_than_secs`.
|
|
/// Unacked rows (delivered but not yet acknowledged by a clean
|
|
/// turn-end, plus undelivered rows) are always kept regardless of
|
|
/// age — the former because they're recoverable via
|
|
/// `requeue_inflight`, the latter because they're still in flight
|
|
/// from the broker's POV. Returns the number of rows removed.
|
|
pub fn vacuum_delivered(&self, older_than_secs: i64) -> Result<u64> {
|
|
let cutoff = now_unix() - older_than_secs;
|
|
let conn = self.conn.lock().unwrap();
|
|
let n = conn.execute(
|
|
"DELETE FROM messages
|
|
WHERE acked_at IS NOT NULL
|
|
AND acked_at < ?1",
|
|
params![cutoff],
|
|
)?;
|
|
Ok(u64::try_from(n).unwrap_or(0))
|
|
}
|
|
|
|
/// Pop up to `max` pending messages for `recipient` in one
|
|
/// round-trip. Every popped row is marked `delivered_at = NOW`,
|
|
/// pushed onto the per-recipient `unacked_ids` list (so the next
|
|
/// `ack_turn` closes them out), and tagged with
|
|
/// `redelivered = true` if it was resurfaced by the most recent
|
|
/// `requeue_inflight`. Emits one `MessageEvent::Delivered` per
|
|
/// popped row so the dashboard forwarder stream sees one event
|
|
/// per message regardless of batch size.
|
|
///
|
|
/// `max == 0` short-circuits to an empty vec (no DB hit); any
|
|
/// positive value caps the batch at `max`. FIFO ordering.
|
|
///
|
|
/// Lock order: `inflight` FIRST, then `conn`. `requeue_inflight`
|
|
/// and `ack_turn` follow the same order so a concurrent pop can't
|
|
/// race the requeue's DB update vs in-memory populate and miss
|
|
/// the redelivered tag.
|
|
pub fn recv_batch(&self, recipient: &str, max: usize) -> Result<Vec<Delivery>> {
|
|
if max == 0 {
|
|
return Ok(Vec::new());
|
|
}
|
|
// Same lock order as `recv` / `ack_turn` / `requeue_inflight`.
|
|
let mut inflight = self.inflight.lock().unwrap();
|
|
let conn = self.conn.lock().unwrap();
|
|
let max_i = i64::try_from(max).unwrap_or(i64::MAX);
|
|
let mut stmt = conn.prepare(
|
|
"SELECT id, sender, recipient, body, in_reply_to
|
|
FROM messages
|
|
WHERE recipient = ?1 AND delivered_at IS NULL
|
|
ORDER BY id ASC
|
|
LIMIT ?2",
|
|
)?;
|
|
let rows: Vec<(i64, String, String, String, Option<i64>)> = stmt
|
|
.query_map(params![recipient, max_i], |row| {
|
|
Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?))
|
|
})?
|
|
.collect::<rusqlite::Result<_>>()?;
|
|
drop(stmt);
|
|
if rows.is_empty() {
|
|
return Ok(Vec::new());
|
|
}
|
|
// Stamp all popped rows in a single UPDATE — under the broker
|
|
// mutex, well within sqlite's 999-param default.
|
|
let now = now_unix();
|
|
let ids: Vec<i64> = rows.iter().map(|(id, _, _, _, _)| *id).collect();
|
|
let placeholders = std::iter::repeat_n("?", ids.len())
|
|
.collect::<Vec<_>>()
|
|
.join(",");
|
|
let sql = format!("UPDATE messages SET delivered_at = ? WHERE id IN ({placeholders})");
|
|
let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(ids.len() + 1);
|
|
params_vec.push(&now);
|
|
for id in &ids {
|
|
params_vec.push(id);
|
|
}
|
|
conn.execute(&sql, params_vec.as_slice())?;
|
|
drop(conn);
|
|
// Bookkeeping + assemble the Delivery list. Per-row
|
|
// `requeued_ids` lookup runs once per pop, same as `recv`.
|
|
let slot = inflight.entry(recipient.to_owned()).or_default();
|
|
let mut deliveries = Vec::with_capacity(rows.len());
|
|
for (id, from, to, body, in_reply_to) in rows {
|
|
slot.unacked_ids.push(id);
|
|
let redelivered = slot.requeued_ids.remove(&id);
|
|
deliveries.push(Delivery {
|
|
id,
|
|
redelivered,
|
|
message: Message {
|
|
from,
|
|
to,
|
|
body,
|
|
in_reply_to,
|
|
},
|
|
});
|
|
}
|
|
drop(inflight);
|
|
// Mirror the per-row Delivered emit `recv` does so the
|
|
// dashboard forwarder sees one event per message regardless of
|
|
// which surface the harness used.
|
|
for d in &deliveries {
|
|
let _ = self.events.send(MessageEvent::Delivered {
|
|
id: d.id,
|
|
from: d.message.from.clone(),
|
|
to: d.message.to.clone(),
|
|
body: d.message.body.clone(),
|
|
at: now,
|
|
in_reply_to: d.message.in_reply_to,
|
|
});
|
|
}
|
|
Ok(deliveries)
|
|
}
|
|
|
|
/// Drain the per-recipient unacked-id list and mark every row
|
|
/// `acked_at = NOW`. Fired by the harness after `TurnOutcome::Ok`.
|
|
/// Returns the number of rows acked (zero is normal — claude
|
|
/// may have not called recv during the turn). Tolerant of ids
|
|
/// that no longer exist in the DB (vacuumed, manually deleted)
|
|
/// — `UPDATE … WHERE id IN (…)` simply matches zero rows.
|
|
pub fn ack_turn(&self, recipient: &str) -> Result<u64> {
|
|
// Same lock order as `recv` and `requeue_inflight`.
|
|
let mut inflight = self.inflight.lock().unwrap();
|
|
let ids: Vec<i64> = inflight
|
|
.get_mut(recipient)
|
|
.map(|s| std::mem::take(&mut s.unacked_ids))
|
|
.unwrap_or_default();
|
|
if ids.is_empty() {
|
|
return Ok(0);
|
|
}
|
|
let now = now_unix();
|
|
let conn = self.conn.lock().unwrap();
|
|
// Bind every id explicitly. Caps in the hundreds in the worst
|
|
// case (a single very chatty turn); well under sqlite's 999
|
|
// default param limit and we're already serialising on the
|
|
// broker mutex.
|
|
let placeholders = std::iter::repeat_n("?", ids.len())
|
|
.collect::<Vec<_>>()
|
|
.join(",");
|
|
let sql = format!("UPDATE messages SET acked_at = ? WHERE id IN ({placeholders})");
|
|
let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(ids.len() + 1);
|
|
params_vec.push(&now);
|
|
for id in &ids {
|
|
params_vec.push(id);
|
|
}
|
|
let n = conn.execute(&sql, params_vec.as_slice())?;
|
|
Ok(u64::try_from(n).unwrap_or(0))
|
|
}
|
|
|
|
/// Resurface every message the broker previously handed to this
|
|
/// recipient that never got `acked_at` set. Used by the harness at
|
|
/// boot to recover from the crashed-mid-turn / OOM-killed /
|
|
/// container-restarted cases. Three steps:
|
|
///
|
|
/// 1. Clear any stale in-memory state for this recipient (the
|
|
/// previous harness session's `unacked_ids` are irrelevant —
|
|
/// the new session will repopulate from fresh pops).
|
|
/// 2. Find every row where `recipient = me`, `delivered_at IS NOT
|
|
/// NULL`, `acked_at IS NULL`. Reset `delivered_at = NULL` so
|
|
/// the next `Recv` pops them again.
|
|
/// 3. Remember each id in the per-recipient `requeued_ids` set so
|
|
/// the next pop tags the response with `redelivered: true`.
|
|
///
|
|
/// Returns the number of rows requeued. Safe to call when there's
|
|
/// nothing in flight (returns 0). Safe to call multiple times
|
|
/// (idempotent — the second call finds nothing because the rows
|
|
/// are now back in the pending state).
|
|
pub fn requeue_inflight(&self, recipient: &str) -> Result<u64> {
|
|
// Hold inflight + conn together so a concurrent `recv` can't
|
|
// pop a just-requeued row between our DB update and our
|
|
// in-memory populate and miss the redelivered tag.
|
|
let mut inflight = self.inflight.lock().unwrap();
|
|
let conn = self.conn.lock().unwrap();
|
|
let mut stmt = conn.prepare(
|
|
"SELECT id FROM messages
|
|
WHERE recipient = ?1
|
|
AND delivered_at IS NOT NULL
|
|
AND acked_at IS NULL",
|
|
)?;
|
|
let ids: Vec<i64> = stmt
|
|
.query_map(params![recipient], |row| row.get(0))?
|
|
.collect::<rusqlite::Result<_>>()?;
|
|
drop(stmt);
|
|
if !ids.is_empty() {
|
|
let placeholders = std::iter::repeat_n("?", ids.len())
|
|
.collect::<Vec<_>>()
|
|
.join(",");
|
|
let sql =
|
|
format!("UPDATE messages SET delivered_at = NULL WHERE id IN ({placeholders})");
|
|
let params_vec: Vec<&dyn rusqlite::ToSql> =
|
|
ids.iter().map(|id| id as &dyn rusqlite::ToSql).collect();
|
|
conn.execute(&sql, params_vec.as_slice())?;
|
|
}
|
|
let slot = inflight.entry(recipient.to_owned()).or_default();
|
|
slot.unacked_ids.clear();
|
|
slot.requeued_ids.clear();
|
|
slot.requeued_ids.extend(ids.iter().copied());
|
|
Ok(u64::try_from(ids.len()).unwrap_or(0))
|
|
}
|
|
|
|
/// Store a new reminder. Returns the reminder id.
|
|
pub fn store_reminder(
|
|
&self,
|
|
agent: &str,
|
|
message: &str,
|
|
file_path: Option<&str>,
|
|
due_at: i64,
|
|
) -> Result<i64> {
|
|
let conn = self.conn.lock().unwrap();
|
|
conn.execute(
|
|
"INSERT INTO reminders (agent, message, file_path, due_at, created_at) VALUES (?1, ?2, ?3, ?4, ?5)",
|
|
params![agent, message, file_path, due_at, now_unix()],
|
|
)?;
|
|
let id = conn.last_insert_rowid();
|
|
Ok(id)
|
|
}
|
|
|
|
/// Every reminder still pending delivery, newest-first. Used by the
|
|
/// dashboard's reminders pane so the operator can see what's queued
|
|
/// + cancel rows that are no longer wanted.
|
|
pub fn list_pending_reminders(&self) -> Result<Vec<PendingReminder>> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let mut stmt = conn.prepare(
|
|
"SELECT id, agent, message, file_path, due_at, created_at, \
|
|
last_error, attempt_count \
|
|
FROM reminders \
|
|
WHERE sent_at IS NULL \
|
|
ORDER BY due_at ASC",
|
|
)?;
|
|
let rows = stmt.query_map([], |row| {
|
|
let attempts: i64 = row.get(7)?;
|
|
Ok(PendingReminder {
|
|
id: row.get(0)?,
|
|
agent: row.get(1)?,
|
|
message: row.get(2)?,
|
|
file_path: row.get(3)?,
|
|
due_at: row.get(4)?,
|
|
created_at: row.get(5)?,
|
|
last_error: row.get(6)?,
|
|
attempt_count: u32::try_from(attempts).unwrap_or(0),
|
|
})
|
|
})?;
|
|
rows.collect::<rusqlite::Result<Vec<_>>>()
|
|
.context("list pending reminders")
|
|
}
|
|
|
|
/// Mark a delivery attempt as failed: bump `attempt_count` and
|
|
/// stash the error string. Called by `reminder_scheduler::tick`
|
|
/// when `deliver_reminder` returns Err. Soft-cap behaviour
|
|
/// lives in `get_due_reminders` (rows over the cap drop out
|
|
/// of the due-list and stop being attempted until retry).
|
|
pub fn record_reminder_failure(&self, id: i64, reason: &str) -> Result<()> {
|
|
let conn = self.conn.lock().unwrap();
|
|
conn.execute(
|
|
"UPDATE reminders \
|
|
SET attempt_count = attempt_count + 1, last_error = ?1 \
|
|
WHERE id = ?2 AND sent_at IS NULL",
|
|
params![reason, id],
|
|
)?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Clear the failure state on a pending reminder so the
|
|
/// scheduler picks it up again. No-op when the row is already
|
|
/// fresh (`attempt_count == 0`). Returns the number of rows
|
|
/// affected so callers can distinguish "retried" from "no
|
|
/// such pending reminder" (already delivered, or wrong id).
|
|
pub fn reset_reminder_failure(&self, id: i64) -> Result<usize> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let n = conn.execute(
|
|
"UPDATE reminders \
|
|
SET attempt_count = 0, last_error = NULL \
|
|
WHERE id = ?1 AND sent_at IS NULL",
|
|
params![id],
|
|
)?;
|
|
Ok(n)
|
|
}
|
|
|
|
/// Count this agent's still-pending (un-delivered) reminders.
|
|
/// Used by the per-turn stats sink for a cheap "what was queued
|
|
/// at turn-end" snapshot.
|
|
pub fn count_pending_reminders_for(&self, agent: &str) -> Result<u64> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let n: i64 = conn.query_row(
|
|
"SELECT COUNT(*) FROM reminders WHERE agent = ?1 AND sent_at IS NULL",
|
|
params![agent],
|
|
|row| row.get(0),
|
|
)?;
|
|
Ok(u64::try_from(n).unwrap_or(0))
|
|
}
|
|
|
|
/// Reminder rollup stats for an agent over a time window. Returns
|
|
/// counts of scheduled, delivered, and pending reminders created
|
|
/// in the last `since_secs` seconds (0 = all reminders).
|
|
pub fn reminder_rollup_for(&self, agent: &str, since_secs: u64) -> Result<hive_sh4re::ReminderStats> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let cutoff_time = if since_secs > 0 {
|
|
let now = std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.ok()
|
|
.map(|d| d.as_secs() as i64)
|
|
.unwrap_or(0);
|
|
now - since_secs as i64
|
|
} else {
|
|
i64::MIN
|
|
};
|
|
let scheduled: i64 = conn.query_row(
|
|
"SELECT COUNT(*) FROM reminders WHERE agent = ?1 AND created_at >= ?2",
|
|
params![agent, cutoff_time],
|
|
|row| row.get(0),
|
|
)?;
|
|
let delivered: i64 = conn.query_row(
|
|
"SELECT COUNT(*) FROM reminders WHERE agent = ?1 AND created_at >= ?2 AND sent_at IS NOT NULL",
|
|
params![agent, cutoff_time],
|
|
|row| row.get(0),
|
|
)?;
|
|
let pending: i64 = conn.query_row(
|
|
"SELECT COUNT(*) FROM reminders WHERE agent = ?1 AND created_at >= ?2 AND sent_at IS NULL",
|
|
params![agent, cutoff_time],
|
|
|row| row.get(0),
|
|
)?;
|
|
Ok(hive_sh4re::ReminderStats {
|
|
scheduled: u64::try_from(scheduled).unwrap_or(0),
|
|
delivered: u64::try_from(delivered).unwrap_or(0),
|
|
pending: u64::try_from(pending).unwrap_or(0),
|
|
})
|
|
}
|
|
|
|
/// Delete a reminder by id. Returns the number of rows removed (0
|
|
/// when the id never existed or was already delivered). Hard
|
|
/// delete rather than soft so the row doesn't linger and confuse a
|
|
/// re-creation under the same id.
|
|
pub fn cancel_reminder(&self, id: i64) -> Result<usize> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let n = conn.execute(
|
|
"DELETE FROM reminders WHERE id = ?1 AND sent_at IS NULL",
|
|
params![id],
|
|
)?;
|
|
Ok(n)
|
|
}
|
|
|
|
/// Cancel a pending reminder on behalf of `canceller`. Returns
|
|
/// the owner agent name on success (handy for logging). Auth
|
|
/// rules mirror `OperatorQuestions::cancel`: owner, operator, or
|
|
/// manager.
|
|
pub fn cancel_reminder_as(&self, id: i64, canceller: &str) -> Result<String> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let owner: Option<String> = conn
|
|
.query_row(
|
|
"SELECT agent FROM reminders WHERE id = ?1 AND sent_at IS NULL",
|
|
params![id],
|
|
|row| row.get(0),
|
|
)
|
|
.optional()?;
|
|
let Some(owner) = owner else {
|
|
anyhow::bail!("reminder {id} not pending (already delivered or unknown)");
|
|
};
|
|
let authorised = canceller == owner
|
|
|| canceller == hive_sh4re::OPERATOR_RECIPIENT
|
|
|| canceller == hive_sh4re::MANAGER_AGENT;
|
|
if !authorised {
|
|
anyhow::bail!(
|
|
"reminder {id}: '{canceller}' not allowed to cancel (owner = '{owner}')"
|
|
);
|
|
}
|
|
let n = conn.execute(
|
|
"DELETE FROM reminders WHERE id = ?1 AND sent_at IS NULL",
|
|
params![id],
|
|
)?;
|
|
if n == 0 {
|
|
anyhow::bail!("reminder {id} vanished between auth check and delete");
|
|
}
|
|
Ok(owner)
|
|
}
|
|
|
|
/// Get up to `limit` due reminders across all agents in a single query.
|
|
/// Returns `(agent, id, message, file_path)` tuples. Pass a small limit
|
|
/// (e.g. 100) so a burst of overdue reminders doesn't flood the broker
|
|
/// in one cycle — leftovers stay due and get picked up on the next tick.
|
|
pub fn get_due_reminders(&self, limit: u64) -> Result<Vec<DueReminder>> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX);
|
|
let max_attempts = i64::from(MAX_REMINDER_ATTEMPTS);
|
|
// attempt_count >= cap = give up; row stays pending so the
|
|
// operator sees + can retry/cancel via the dashboard.
|
|
let mut stmt = conn.prepare(
|
|
"SELECT agent, id, message, file_path FROM reminders \
|
|
WHERE due_at <= ?1 AND sent_at IS NULL AND attempt_count < ?3 \
|
|
ORDER BY agent, due_at ASC \
|
|
LIMIT ?2",
|
|
)?;
|
|
let rows = stmt.query_map(params![now_unix(), limit_i, max_attempts], |row| {
|
|
Ok((
|
|
row.get::<_, String>(0)?,
|
|
row.get::<_, i64>(1)?,
|
|
row.get::<_, String>(2)?,
|
|
row.get::<_, Option<String>>(3)?,
|
|
))
|
|
})?;
|
|
rows.collect::<rusqlite::Result<Vec<_>>>()
|
|
.context("query due reminders")
|
|
}
|
|
|
|
/// Atomic reminder delivery: insert the inbox message AND mark the
|
|
/// reminder as sent in a single sqlite transaction. Prevents the
|
|
/// orphan-reminder duplicate-delivery class of bugs that two separate
|
|
/// calls (send + `mark_reminder_sent`) could produce if the second one
|
|
/// failed transiently — the next scheduler tick would see the reminder
|
|
/// still due and redeliver. Either both writes commit or neither does;
|
|
/// re-running on failure is safe.
|
|
///
|
|
/// Emits a `Sent` event on the broadcast channel after the transaction
|
|
/// commits (so subscribers see the inbox message but never see a
|
|
/// "phantom" send for a transaction that rolled back).
|
|
/// Deliver a batch of reminders in a single transaction, reducing
|
|
/// lock contention on the shared sqlite connection under high
|
|
/// reminder volume. Returns per-item results so the scheduler can
|
|
/// record individual failures without aborting successful ones.
|
|
///
|
|
/// Items where the INSERT+UPDATE succeeds get a `MessageEvent::Sent`
|
|
/// emitted after the transaction commits. Items that fail are
|
|
/// returned as `Err` in the output vec (index-aligned with input).
|
|
pub fn deliver_reminders_batch(
|
|
&self,
|
|
items: &[(i64, String, String)], // (reminder_id, agent, body)
|
|
) -> Vec<Result<()>> {
|
|
if items.is_empty() {
|
|
return Vec::new();
|
|
}
|
|
let now = now_unix();
|
|
let mut conn = self.conn.lock().unwrap();
|
|
// Build one transaction for all deliveries so we hold the lock
|
|
// once rather than N times. On a batch-level error (e.g. DB
|
|
// corruption), fall back to returning per-item errors so the
|
|
// scheduler records the failure cleanly.
|
|
let tx = match conn.transaction() {
|
|
Ok(t) => t,
|
|
Err(e) => {
|
|
let err_str = format!("{e:#}");
|
|
return items
|
|
.iter()
|
|
.map(|_| Err(anyhow::anyhow!("{}", err_str.clone())))
|
|
.collect();
|
|
}
|
|
};
|
|
let mut results: Vec<Result<()>> = Vec::with_capacity(items.len());
|
|
// Per-item broker row ids — collected inside the transaction so
|
|
// we can emit Sent events with the correct id after commit.
|
|
let mut msg_ids: Vec<i64> = Vec::with_capacity(items.len());
|
|
for (id, agent, body) in items {
|
|
let r = (|| -> Result<i64> {
|
|
tx.execute(
|
|
"INSERT INTO messages (sender, recipient, body, sent_at) \
|
|
VALUES (?1, ?2, ?3, ?4)",
|
|
params!["reminder", agent, body, now],
|
|
)?;
|
|
let msg_id = tx.last_insert_rowid();
|
|
tx.execute(
|
|
"UPDATE reminders SET sent_at = ?1 WHERE id = ?2",
|
|
params![now, id],
|
|
)?;
|
|
Ok(msg_id)
|
|
})();
|
|
match r {
|
|
Ok(msg_id) => {
|
|
msg_ids.push(msg_id);
|
|
results.push(Ok(()));
|
|
}
|
|
Err(e) => {
|
|
msg_ids.push(-1);
|
|
results.push(Err(e));
|
|
}
|
|
}
|
|
}
|
|
if let Err(e) = tx.commit() {
|
|
let err_str = format!("{e:#}");
|
|
return items
|
|
.iter()
|
|
.map(|_| Err(anyhow::anyhow!("{}", err_str.clone())))
|
|
.collect();
|
|
}
|
|
drop(conn);
|
|
// Emit per-row Sent events (only for rows that succeeded).
|
|
for (((id, agent, body), result), msg_id) in items.iter().zip(results.iter()).zip(msg_ids.iter()) {
|
|
if result.is_ok() {
|
|
let _ = self.events.send(MessageEvent::Sent {
|
|
id: *msg_id,
|
|
from: "reminder".to_owned(),
|
|
to: agent.clone(),
|
|
body: body.clone(),
|
|
at: now,
|
|
in_reply_to: None,
|
|
});
|
|
tracing::debug!(reminder_id = id, %agent, "reminder delivered");
|
|
}
|
|
}
|
|
results
|
|
}
|
|
}
|
|
|
|
/// Idempotent messages-table migrations. Adds `acked_at` and
|
|
/// back-fills it for every already-delivered row, so the
|
|
/// pre-migration sessions count as "fully handled" and won't be
|
|
/// resurfaced by the first `requeue_inflight` after upgrade.
|
|
fn ensure_message_columns(conn: &Connection) -> Result<()> {
|
|
let has_acked: bool = conn
|
|
.prepare("SELECT 1 FROM pragma_table_info('messages') WHERE name = 'acked_at'")?
|
|
.exists([])?;
|
|
if !has_acked {
|
|
conn.execute_batch("ALTER TABLE messages ADD COLUMN acked_at INTEGER;")
|
|
.context("add messages.acked_at column")?;
|
|
// Backfill: treat every existing delivered row as acked. The
|
|
// session it was delivered to is gone, so requeue would just
|
|
// surface phantom traffic to whatever harness reads next.
|
|
conn.execute(
|
|
"UPDATE messages SET acked_at = delivered_at \
|
|
WHERE delivered_at IS NOT NULL AND acked_at IS NULL",
|
|
[],
|
|
)
|
|
.context("backfill messages.acked_at from delivered_at")?;
|
|
}
|
|
let has_reply: bool = conn
|
|
.prepare("SELECT 1 FROM pragma_table_info('messages') WHERE name = 'in_reply_to'")?
|
|
.exists([])?;
|
|
if !has_reply {
|
|
conn.execute_batch("ALTER TABLE messages ADD COLUMN in_reply_to INTEGER;")
|
|
.context("add messages.in_reply_to column")?;
|
|
// No backfill needed — existing messages simply have NULL here,
|
|
// meaning "root of a new thread", which is correct.
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// Idempotent reminder-table migrations. `ALTER TABLE ADD COLUMN`
|
|
/// has no `IF NOT EXISTS` form in sqlite, so we probe
|
|
/// `pragma_table_info` per column. New deploys (table created by
|
|
/// SCHEMA in this commit cycle) skip the ALTER; pre-existing
|
|
/// broker.sqlite files get the columns added on next boot.
|
|
fn ensure_reminder_columns(conn: &Connection) -> Result<()> {
|
|
for (name, sql) in [
|
|
(
|
|
"attempt_count",
|
|
"ALTER TABLE reminders ADD COLUMN attempt_count INTEGER NOT NULL DEFAULT 0;",
|
|
),
|
|
(
|
|
"last_error",
|
|
"ALTER TABLE reminders ADD COLUMN last_error TEXT;",
|
|
),
|
|
] {
|
|
let has: bool = conn
|
|
.prepare(&format!(
|
|
"SELECT 1 FROM pragma_table_info('reminders') WHERE name = '{name}'"
|
|
))?
|
|
.exists([])?;
|
|
if !has {
|
|
conn.execute_batch(sql)
|
|
.with_context(|| format!("add reminders.{name} column"))?;
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn now_unix() -> i64 {
|
|
SystemTime::now()
|
|
.duration_since(UNIX_EPOCH)
|
|
.ok()
|
|
.and_then(|d| i64::try_from(d.as_secs()).ok())
|
|
.unwrap_or(0)
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use std::sync::atomic::{AtomicU64, Ordering};
|
|
|
|
/// Per-process counter so each test gets a unique sqlite path even
|
|
/// when threads run concurrently. Avoids pulling in a `tempfile`
|
|
/// dep just for this one module.
|
|
static TEST_COUNTER: AtomicU64 = AtomicU64::new(0);
|
|
|
|
struct TmpBroker {
|
|
path: std::path::PathBuf,
|
|
pub broker: Broker,
|
|
}
|
|
|
|
impl Drop for TmpBroker {
|
|
fn drop(&mut self) {
|
|
let _ = std::fs::remove_file(&self.path);
|
|
}
|
|
}
|
|
|
|
fn open_broker() -> TmpBroker {
|
|
let n = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
|
|
let pid = std::process::id();
|
|
let path = std::env::temp_dir().join(format!("hive-broker-test-{pid}-{n}.sqlite"));
|
|
let _ = std::fs::remove_file(&path);
|
|
let broker = Broker::open(&path).expect("open broker");
|
|
TmpBroker { path, broker }
|
|
}
|
|
|
|
fn msg(from: &str, to: &str, body: &str) -> Message {
|
|
Message {
|
|
from: from.to_owned(),
|
|
to: to.to_owned(),
|
|
body: body.to_owned(),
|
|
in_reply_to: None,
|
|
}
|
|
}
|
|
|
|
/// Convenience wrapper for tests that want single-pop semantics
|
|
/// — the broker only exposes `recv_batch` publicly now, so
|
|
/// every test that used to call `recv` goes through here.
|
|
fn pop_one(broker: &Broker, recipient: &str) -> Option<Delivery> {
|
|
let mut batch = broker.recv_batch(recipient, 1).unwrap();
|
|
batch.pop()
|
|
}
|
|
|
|
/// Happy path: send → recv → `ack_turn` drains the in-memory list
|
|
/// and marks the row `acked_at IS NOT NULL`. A second recv finds
|
|
/// nothing pending (the row stays in the table for vacuum).
|
|
#[test]
|
|
fn ack_turn_marks_delivered_rows_acked() {
|
|
let h = open_broker();
|
|
let broker = &h.broker;
|
|
broker.send(&msg("a", "b", "hi")).unwrap();
|
|
let d = pop_one(broker, "b").expect("popped");
|
|
assert_eq!(d.message.body, "hi");
|
|
assert!(!d.redelivered);
|
|
assert_eq!(broker.ack_turn("b").unwrap(), 1);
|
|
// ack_turn drained the unacked list; calling again is a no-op.
|
|
assert_eq!(broker.ack_turn("b").unwrap(), 0);
|
|
// Recv finds nothing — the row is now delivered + acked.
|
|
assert!(pop_one(broker, "b").is_none());
|
|
}
|
|
|
|
/// Crash-recovery: send → recv → (no ack) → `requeue_inflight`
|
|
/// resets `delivered_at` + tags the next pop as redelivered. After
|
|
/// that `ack_turn` closes it out cleanly.
|
|
#[test]
|
|
fn requeue_inflight_resurfaces_unacked_with_redelivered_flag() {
|
|
let h = open_broker();
|
|
let broker = &h.broker;
|
|
broker.send(&msg("a", "b", "hi")).unwrap();
|
|
let d1 = pop_one(broker, "b").expect("popped");
|
|
assert!(!d1.redelivered);
|
|
// Simulate harness crash: never call ack_turn. Now boot the
|
|
// new harness — requeue_inflight resurfaces the row.
|
|
assert_eq!(broker.requeue_inflight("b").unwrap(), 1);
|
|
let d2 = pop_one(broker, "b").expect("popped again");
|
|
assert_eq!(d2.message.body, "hi");
|
|
assert!(
|
|
d2.redelivered,
|
|
"second pop should be tagged redelivered"
|
|
);
|
|
assert_eq!(broker.ack_turn("b").unwrap(), 1);
|
|
}
|
|
|
|
/// Idempotency: a second `requeue_inflight` on the same recipient
|
|
/// finds nothing because the prior call already reset
|
|
/// `delivered_at` (the row is back in the pending state, not
|
|
/// inflight).
|
|
#[test]
|
|
fn requeue_inflight_is_idempotent() {
|
|
let h = open_broker();
|
|
let broker = &h.broker;
|
|
broker.send(&msg("a", "b", "hi")).unwrap();
|
|
pop_one(broker, "b").expect("popped");
|
|
assert_eq!(broker.requeue_inflight("b").unwrap(), 1);
|
|
// Second call: the row is pending (delivered_at IS NULL) so
|
|
// nothing matches the inflight filter.
|
|
assert_eq!(broker.requeue_inflight("b").unwrap(), 0);
|
|
}
|
|
|
|
/// Multiple messages, partial drain: pop two, `ack_turn` covers
|
|
/// both even though one was popped before the other.
|
|
#[test]
|
|
fn ack_turn_handles_batch() {
|
|
let h = open_broker();
|
|
let broker = &h.broker;
|
|
broker.send(&msg("a", "b", "one")).unwrap();
|
|
broker.send(&msg("a", "b", "two")).unwrap();
|
|
broker.send(&msg("a", "b", "three")).unwrap();
|
|
pop_one(broker, "b").expect("popped 1");
|
|
pop_one(broker, "b").expect("popped 2");
|
|
pop_one(broker, "b").expect("popped 3");
|
|
assert_eq!(broker.ack_turn("b").unwrap(), 3);
|
|
assert!(pop_one(broker, "b").is_none());
|
|
}
|
|
|
|
/// Vacuum filter respects the new `acked_at` semantics — a
|
|
/// delivered-but-not-acked row is NOT vacuumed regardless of
|
|
/// age (the requeue path needs it).
|
|
#[test]
|
|
fn vacuum_preserves_unacked_inflight_rows() {
|
|
let h = open_broker();
|
|
let broker = &h.broker;
|
|
broker.send(&msg("a", "b", "stuck")).unwrap();
|
|
pop_one(broker, "b").expect("popped");
|
|
// Wide window — should still skip unacked rows.
|
|
let removed = broker.vacuum_delivered(-i64::from(u8::MAX)).unwrap();
|
|
assert_eq!(removed, 0, "unacked inflight row must survive vacuum");
|
|
// After ack_turn the row is fair game.
|
|
broker.ack_turn("b").unwrap();
|
|
let removed = broker.vacuum_delivered(-i64::from(u8::MAX)).unwrap();
|
|
assert_eq!(removed, 1, "acked row is now vacuumable");
|
|
}
|
|
|
|
/// Recv ordering: requeued rows go back into FIFO position
|
|
/// (they keep their original id). New sends added after the
|
|
/// requeue arrive after them.
|
|
#[test]
|
|
fn requeued_rows_come_back_in_original_order() {
|
|
let h = open_broker();
|
|
let broker = &h.broker;
|
|
broker.send(&msg("a", "b", "first")).unwrap();
|
|
broker.send(&msg("a", "b", "second")).unwrap();
|
|
// Pop both, ack neither.
|
|
pop_one(broker, "b").expect("popped 1");
|
|
pop_one(broker, "b").expect("popped 2");
|
|
broker.requeue_inflight("b").unwrap();
|
|
// Now add a brand new message AFTER the requeue.
|
|
broker.send(&msg("a", "b", "third")).unwrap();
|
|
let d1 = pop_one(broker, "b").expect("re-pop 1");
|
|
assert_eq!(d1.message.body, "first");
|
|
assert!(d1.redelivered);
|
|
let d2 = pop_one(broker, "b").expect("re-pop 2");
|
|
assert_eq!(d2.message.body, "second");
|
|
assert!(d2.redelivered);
|
|
let d3 = pop_one(broker, "b").expect("re-pop 3");
|
|
assert_eq!(d3.message.body, "third");
|
|
assert!(
|
|
!d3.redelivered,
|
|
"fresh-send-after-requeue must NOT be tagged redelivered"
|
|
);
|
|
}
|
|
|
|
/// Happy path for `recv_batch`: pops in FIFO order, respects
|
|
/// `max`, leaves the rest pending for the next call.
|
|
#[test]
|
|
fn recv_batch_pops_fifo_capped_at_max() {
|
|
let h = open_broker();
|
|
let broker = &h.broker;
|
|
for i in 0..5 {
|
|
broker.send(&msg("a", "b", &format!("m{i}"))).unwrap();
|
|
}
|
|
let batch = broker.recv_batch("b", 3).unwrap();
|
|
let bodies: Vec<_> = batch.iter().map(|d| d.message.body.as_str()).collect();
|
|
assert_eq!(bodies, vec!["m0", "m1", "m2"]);
|
|
// Remaining two stay pending; a second batch drains them.
|
|
let next = broker.recv_batch("b", 10).unwrap();
|
|
let bodies: Vec<_> = next.iter().map(|d| d.message.body.as_str()).collect();
|
|
assert_eq!(bodies, vec!["m3", "m4"]);
|
|
// ack_turn closes out all five popped rows in one go.
|
|
assert_eq!(broker.ack_turn("b").unwrap(), 5);
|
|
}
|
|
|
|
/// `recv_batch` with no pending traffic returns an empty vec
|
|
/// (the "(empty)" path), not an error.
|
|
#[test]
|
|
fn recv_batch_returns_empty_when_idle() {
|
|
let h = open_broker();
|
|
let batch = h.broker.recv_batch("ghost", 5).unwrap();
|
|
assert!(batch.is_empty());
|
|
}
|
|
|
|
/// `max = 0` short-circuits without touching the DB (covered by
|
|
/// asserting we don't accidentally pop a pending row).
|
|
#[test]
|
|
fn recv_batch_zero_max_pops_nothing() {
|
|
let h = open_broker();
|
|
let broker = &h.broker;
|
|
broker.send(&msg("a", "b", "stay")).unwrap();
|
|
assert!(broker.recv_batch("b", 0).unwrap().is_empty());
|
|
// The pending row is still in flight for the next real recv.
|
|
let d = pop_one(broker, "b").expect("still pending");
|
|
assert_eq!(d.message.body, "stay");
|
|
}
|
|
|
|
/// `recv_batch` tags requeued rows with `redelivered: true` and
|
|
/// drains them from the per-recipient `requeued_ids` set so a
|
|
/// fresh follow-up recv after the batch doesn't double-tag.
|
|
#[test]
|
|
fn recv_batch_propagates_redelivered_flag() {
|
|
let h = open_broker();
|
|
let broker = &h.broker;
|
|
broker.send(&msg("a", "b", "one")).unwrap();
|
|
broker.send(&msg("a", "b", "two")).unwrap();
|
|
pop_one(broker, "b").expect("popped 1");
|
|
pop_one(broker, "b").expect("popped 2");
|
|
broker.requeue_inflight("b").unwrap();
|
|
let batch = broker.recv_batch("b", 5).unwrap();
|
|
assert_eq!(batch.len(), 2);
|
|
assert!(batch.iter().all(|d| d.redelivered));
|
|
// Fresh send after the batch is NOT tagged redelivered.
|
|
broker.send(&msg("a", "b", "three")).unwrap();
|
|
let d = pop_one(broker, "b").expect("re-pop 3");
|
|
assert_eq!(d.message.body, "three");
|
|
assert!(!d.redelivered);
|
|
}
|
|
|
|
/// Per-recipient isolation: `requeue_inflight("a")` doesn't touch
|
|
/// b's inflight rows.
|
|
#[test]
|
|
fn requeue_inflight_is_per_recipient() {
|
|
let h = open_broker();
|
|
let broker = &h.broker;
|
|
broker.send(&msg("x", "alice", "for alice")).unwrap();
|
|
broker.send(&msg("x", "bob", "for bob")).unwrap();
|
|
pop_one(broker, "alice").expect("popped alice");
|
|
pop_one(broker, "bob").expect("popped bob");
|
|
// Requeue only alice. Bob's row stays inflight.
|
|
assert_eq!(broker.requeue_inflight("alice").unwrap(), 1);
|
|
let d = pop_one(broker, "alice").expect("re-pop alice");
|
|
assert!(d.redelivered);
|
|
// Bob has nothing pending (his row is still delivered, not requeued).
|
|
assert!(pop_one(broker, "bob").is_none());
|
|
}
|
|
}
|