recv: fold batch drain into recv(max) — one tool, uniform list response

This commit is contained in:
damocles 2026-05-19 01:07:30 +02:00
parent 77b89bf2c6
commit 5d27ae3048
8 changed files with 271 additions and 417 deletions

View file

@ -87,13 +87,13 @@ async fn serve(stream: UnixStream, agent: String, coord: Arc<Coordinator>) -> Re
/// positive `wait_seconds`.
const RECV_LONG_POLL_MAX: std::time::Duration = std::time::Duration::from_secs(180);
/// Server-side hard cap on `RecvBatch.max`. Bounds the size of a
/// single round-trip so a confused caller can't drain the entire
/// inbox in one go and blow past wire-buffer sizes; everything above
/// the cap silently clamps. 32 is comfortably above the burst sizes
/// we've seen in practice (post-rebuild rescue, multi-agent reply
/// storms) and well under the per-message `MESSAGE_MAX_BYTES` * N
/// envelope budget.
/// Server-side hard cap on `Recv.max`. Bounds the size of a single
/// round-trip so a confused caller can't drain the entire inbox in
/// one go and blow past wire-buffer sizes; everything above the cap
/// silently clamps. 32 is comfortably above the burst sizes we've
/// seen in practice (post-rebuild rescue, multi-agent reply storms)
/// and well under the per-message `MESSAGE_MAX_BYTES` * N envelope
/// budget.
const RECV_BATCH_MAX: u32 = 32;
fn recv_timeout(wait_seconds: Option<u64>) -> std::time::Duration {
@ -108,25 +108,13 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc<Coordinator>) ->
let broker = &coord.broker;
match req {
AgentRequest::Send { to, body } => handle_send(coord, agent, to, body),
AgentRequest::Recv { wait_seconds } => match broker
.recv_blocking(agent, recv_timeout(*wait_seconds))
.await
{
Ok(Some(d)) => AgentResponse::Message {
from: d.message.from,
body: d.message.body,
id: d.id,
redelivered: d.redelivered,
},
Ok(None) => AgentResponse::Empty,
Err(e) => AgentResponse::Err {
message: format!("{e:#}"),
},
},
AgentRequest::RecvBatch { max } => {
let cap = (*max).min(RECV_BATCH_MAX) as usize;
match broker.recv_batch(agent, cap) {
Ok(deliveries) => AgentResponse::Batch {
AgentRequest::Recv { wait_seconds, max } => {
let cap = max.unwrap_or(1).min(RECV_BATCH_MAX) as usize;
match broker
.recv_blocking_batch(agent, recv_timeout(*wait_seconds), cap)
.await
{
Ok(deliveries) => AgentResponse::Messages {
messages: deliveries
.into_iter()
.map(|d| hive_sh4re::DeliveredMessage {

View file

@ -90,11 +90,11 @@ pub struct PendingReminder {
/// time the row is due.
pub const MAX_REMINDER_ATTEMPTS: u32 = 5;
/// Intra-process broker event. `recv_blocking` listens on the same
/// channel as the dashboard forwarder; the forwarder re-emits each
/// event as a `DashboardEvent` with a freshly-stamped seq from the
/// Coordinator. The broker itself doesn't stamp seqs — that's a wire
/// concern, not a storage concern.
/// Intra-process broker event. `recv_blocking_batch` listens on the
/// same channel as the dashboard forwarder; the forwarder re-emits
/// each event as a `DashboardEvent` with a freshly-stamped seq from
/// the Coordinator. The broker itself doesn't stamp seqs — that's a
/// wire concern, not a storage concern.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "snake_case", tag = "kind")]
pub enum MessageEvent {
@ -124,9 +124,9 @@ struct RecipientInflight {
/// single `UPDATE … WHERE id IN (…)` to set `acked_at`.
unacked_ids: Vec<i64>,
/// Message ids resurfaced by the most recent `requeue_inflight`
/// call. The next `recv` pop of any id in this set tags the
/// response with `redelivered: true` so the harness can prepend
/// the "may already be handled" hint to the wake prompt;
/// call. The next `recv_batch` pop of any id in this set tags
/// the response with `redelivered: true` so the harness can
/// prepend the "may already be handled" hint to the wake prompt;
/// successful pops drain the id from the set.
requeued_ids: HashSet<i64>,
}
@ -249,44 +249,54 @@ impl Broker {
Ok(u64::try_from(n.max(0)).unwrap_or(0))
}
/// Long-poll variant of `recv`: returns immediately if there's a
/// pending message; otherwise waits up to `timeout` for the broker to
/// emit a `Sent { to: recipient }` event, then retries the pop. Lets
/// agents react to new mail without polling their socket on a fixed
/// interval.
/// Long-poll variant of `recv_batch`: returns immediately if any
/// row is pending (popping up to `max`); otherwise waits up to
/// `timeout` for the broker to emit a `Sent { to: recipient }`
/// event and re-tries the pop. Lets agents react to new mail
/// without polling their socket on a fixed interval AND lets a
/// single round-trip drain a burst of messages.
///
/// **Subscribe-before-check order matters.** If we polled the sqlite
/// row first and only then called `subscribe()`, a concurrent `send`
/// landing in that window would commit + broadcast its event *before*
/// our receiver existed — and we'd then sit on the long-poll until
/// the timeout (or another, unrelated send) fired. That looked
/// externally like "the agent processed one wake then went deaf
/// until the operator poked it again". Subscribing first guarantees
/// any post-subscribe send notifies us; the redundant `recv()`
/// catches the message either way.
pub async fn recv_blocking(
/// **Subscribe-before-check order matters.** If we polled the
/// sqlite row first and only then called `subscribe()`, a
/// concurrent `send` landing in that window would commit +
/// broadcast its event *before* our receiver existed — and we'd
/// then sit on the long-poll until the timeout (or another,
/// unrelated send) fired. That looked externally like "the agent
/// processed one wake then went deaf until the operator poked it
/// again". Subscribing first guarantees any post-subscribe send
/// notifies us; the redundant `recv_batch()` catches the message
/// either way.
///
/// `max == 0` returns an empty vec without subscribing or waiting.
pub async fn recv_blocking_batch(
&self,
recipient: &str,
timeout: std::time::Duration,
) -> Result<Option<Delivery>> {
max: usize,
) -> Result<Vec<Delivery>> {
if max == 0 {
return Ok(Vec::new());
}
let mut rx = self.subscribe();
if let Some(d) = self.recv(recipient)? {
return Ok(Some(d));
let batch = self.recv_batch(recipient, max)?;
if !batch.is_empty() {
return Ok(batch);
}
let deadline = tokio::time::Instant::now() + timeout;
loop {
let Some(remaining) = deadline.checked_duration_since(tokio::time::Instant::now())
else {
return Ok(None);
return Ok(Vec::new());
};
match tokio::time::timeout(remaining, rx.recv()).await {
Err(_) => return Ok(None),
Err(_) => return Ok(Vec::new()),
// Channel lagged or closed — fall back to a single direct
// pop (in case we missed our notification while behind).
Ok(Err(_)) => return self.recv(recipient),
Ok(Err(_)) => return self.recv_batch(recipient, max),
Ok(Ok(MessageEvent::Sent { to, .. })) if to == recipient => {
if let Some(d) = self.recv(recipient)? {
return Ok(Some(d));
let batch = self.recv_batch(recipient, max)?;
if !batch.is_empty() {
return Ok(batch);
}
// Lost a race (concurrent recv elsewhere). Keep waiting.
}
@ -313,67 +323,22 @@ impl Broker {
Ok(u64::try_from(n).unwrap_or(0))
}
pub fn recv(&self, recipient: &str) -> Result<Option<Delivery>> {
// Lock order: inflight FIRST, then conn. `requeue_inflight` +
// `ack_turn` follow the same order so we never deadlock; the
// requeue path also needs both locks held together so a pop
// can't sneak in between its DB update + in-memory populate
// and miss the `redelivered` flag.
let mut inflight = self.inflight.lock().unwrap();
let conn = self.conn.lock().unwrap();
let row: Option<(i64, String, String, String)> = conn
.query_row(
"SELECT id, sender, recipient, body
FROM messages
WHERE recipient = ?1 AND delivered_at IS NULL
ORDER BY id ASC
LIMIT 1",
params![recipient],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
)
.optional()?;
let Some((id, from, to, body)) = row else {
return Ok(None);
};
conn.execute(
"UPDATE messages SET delivered_at = ?1 WHERE id = ?2",
params![now_unix(), id],
)?;
// Track the id so the next `ack_turn(recipient)` can sweep it,
// and check whether it was resurfaced by a recent
// `requeue_inflight` (in which case the wake prompt gets the
// "may already be handled" hint). Both ops are O(1) per pop;
// the hash-set lookup runs at most once per delivery.
let slot = inflight.entry(recipient.to_owned()).or_default();
slot.unacked_ids.push(id);
let redelivered = slot.requeued_ids.remove(&id);
drop(conn);
drop(inflight);
let _ = self.events.send(MessageEvent::Delivered {
from: from.clone(),
to: to.clone(),
body: body.clone(),
at: now_unix(),
});
Ok(Some(Delivery {
id,
redelivered,
message: Message { from, to, body },
}))
}
/// Pop up to `max` pending messages for `recipient` in one
/// round-trip. Same per-row semantics as `recv`: every popped row
/// is marked `delivered_at = NOW`, pushed onto the per-recipient
/// `unacked_ids` list (so the next `ack_turn` closes them out),
/// and tagged with `redelivered = true` if it was resurfaced by
/// the most recent `requeue_inflight`. Emits one
/// `MessageEvent::Delivered` per popped row so the dashboard
/// forwarder stream stays consistent with the single-row path.
/// round-trip. Every popped row is marked `delivered_at = NOW`,
/// pushed onto the per-recipient `unacked_ids` list (so the next
/// `ack_turn` closes them out), and tagged with
/// `redelivered = true` if it was resurfaced by the most recent
/// `requeue_inflight`. Emits one `MessageEvent::Delivered` per
/// popped row so the dashboard forwarder stream sees one event
/// per message regardless of batch size.
///
/// `max == 0` short-circuits to an empty vec (no DB hit); any
/// positive value caps the batch at `max`. FIFO order matches
/// `recv`.
/// positive value caps the batch at `max`. FIFO ordering.
///
/// Lock order: `inflight` FIRST, then `conn`. `requeue_inflight`
/// and `ack_turn` follow the same order so a concurrent pop can't
/// race the requeue's DB update vs in-memory populate and miss
/// the redelivered tag.
pub fn recv_batch(&self, recipient: &str, max: usize) -> Result<Vec<Delivery>> {
if max == 0 {
return Ok(Vec::new());
@ -827,6 +792,14 @@ mod tests {
}
}
/// Convenience wrapper for tests that want single-pop semantics
/// — the broker only exposes `recv_batch` publicly now, so
/// every test that used to call `recv` goes through here.
fn pop_one(broker: &Broker, recipient: &str) -> Option<Delivery> {
let mut batch = broker.recv_batch(recipient, 1).unwrap();
batch.pop()
}
/// Happy path: send → recv → `ack_turn` drains the in-memory list
/// and marks the row `acked_at IS NOT NULL`. A second recv finds
/// nothing pending (the row stays in the table for vacuum).
@ -835,14 +808,14 @@ mod tests {
let h = open_broker();
let broker = &h.broker;
broker.send(&msg("a", "b", "hi")).unwrap();
let d = broker.recv("b").unwrap().expect("popped");
let d = pop_one(broker, "b").expect("popped");
assert_eq!(d.message.body, "hi");
assert!(!d.redelivered);
assert_eq!(broker.ack_turn("b").unwrap(), 1);
// ack_turn drained the unacked list; calling again is a no-op.
assert_eq!(broker.ack_turn("b").unwrap(), 0);
// Recv finds nothing — the row is now delivered + acked.
assert!(broker.recv("b").unwrap().is_none());
assert!(pop_one(broker, "b").is_none());
}
/// Crash-recovery: send → recv → (no ack) → `requeue_inflight`
@ -853,12 +826,12 @@ mod tests {
let h = open_broker();
let broker = &h.broker;
broker.send(&msg("a", "b", "hi")).unwrap();
let d1 = broker.recv("b").unwrap().expect("popped");
let d1 = pop_one(broker, "b").expect("popped");
assert!(!d1.redelivered);
// Simulate harness crash: never call ack_turn. Now boot the
// new harness — requeue_inflight resurfaces the row.
assert_eq!(broker.requeue_inflight("b").unwrap(), 1);
let d2 = broker.recv("b").unwrap().expect("popped again");
let d2 = pop_one(broker, "b").expect("popped again");
assert_eq!(d2.message.body, "hi");
assert!(
d2.redelivered,
@ -876,7 +849,7 @@ mod tests {
let h = open_broker();
let broker = &h.broker;
broker.send(&msg("a", "b", "hi")).unwrap();
broker.recv("b").unwrap().expect("popped");
pop_one(broker, "b").expect("popped");
assert_eq!(broker.requeue_inflight("b").unwrap(), 1);
// Second call: the row is pending (delivered_at IS NULL) so
// nothing matches the inflight filter.
@ -892,11 +865,11 @@ mod tests {
broker.send(&msg("a", "b", "one")).unwrap();
broker.send(&msg("a", "b", "two")).unwrap();
broker.send(&msg("a", "b", "three")).unwrap();
broker.recv("b").unwrap().expect("popped 1");
broker.recv("b").unwrap().expect("popped 2");
broker.recv("b").unwrap().expect("popped 3");
pop_one(broker, "b").expect("popped 1");
pop_one(broker, "b").expect("popped 2");
pop_one(broker, "b").expect("popped 3");
assert_eq!(broker.ack_turn("b").unwrap(), 3);
assert!(broker.recv("b").unwrap().is_none());
assert!(pop_one(broker, "b").is_none());
}
/// Vacuum filter respects the new `acked_at` semantics — a
@ -907,7 +880,7 @@ mod tests {
let h = open_broker();
let broker = &h.broker;
broker.send(&msg("a", "b", "stuck")).unwrap();
broker.recv("b").unwrap().expect("popped");
pop_one(broker, "b").expect("popped");
// Wide window — should still skip unacked rows.
let removed = broker.vacuum_delivered(-i64::from(u8::MAX)).unwrap();
assert_eq!(removed, 0, "unacked inflight row must survive vacuum");
@ -927,18 +900,18 @@ mod tests {
broker.send(&msg("a", "b", "first")).unwrap();
broker.send(&msg("a", "b", "second")).unwrap();
// Pop both, ack neither.
broker.recv("b").unwrap().expect("popped 1");
broker.recv("b").unwrap().expect("popped 2");
pop_one(broker, "b").expect("popped 1");
pop_one(broker, "b").expect("popped 2");
broker.requeue_inflight("b").unwrap();
// Now add a brand new message AFTER the requeue.
broker.send(&msg("a", "b", "third")).unwrap();
let d1 = broker.recv("b").unwrap().expect("re-pop 1");
let d1 = pop_one(broker, "b").expect("re-pop 1");
assert_eq!(d1.message.body, "first");
assert!(d1.redelivered);
let d2 = broker.recv("b").unwrap().expect("re-pop 2");
let d2 = pop_one(broker, "b").expect("re-pop 2");
assert_eq!(d2.message.body, "second");
assert!(d2.redelivered);
let d3 = broker.recv("b").unwrap().expect("re-pop 3");
let d3 = pop_one(broker, "b").expect("re-pop 3");
assert_eq!(d3.message.body, "third");
assert!(
!d3.redelivered,
@ -984,7 +957,7 @@ mod tests {
broker.send(&msg("a", "b", "stay")).unwrap();
assert!(broker.recv_batch("b", 0).unwrap().is_empty());
// The pending row is still in flight for the next real recv.
let d = broker.recv("b").unwrap().expect("still pending");
let d = pop_one(broker, "b").expect("still pending");
assert_eq!(d.message.body, "stay");
}
@ -997,15 +970,15 @@ mod tests {
let broker = &h.broker;
broker.send(&msg("a", "b", "one")).unwrap();
broker.send(&msg("a", "b", "two")).unwrap();
broker.recv("b").unwrap().expect("popped 1");
broker.recv("b").unwrap().expect("popped 2");
pop_one(broker, "b").expect("popped 1");
pop_one(broker, "b").expect("popped 2");
broker.requeue_inflight("b").unwrap();
let batch = broker.recv_batch("b", 5).unwrap();
assert_eq!(batch.len(), 2);
assert!(batch.iter().all(|d| d.redelivered));
// Fresh send after the batch is NOT tagged redelivered.
broker.send(&msg("a", "b", "three")).unwrap();
let d = broker.recv("b").unwrap().expect("re-pop 3");
let d = pop_one(broker, "b").expect("re-pop 3");
assert_eq!(d.message.body, "three");
assert!(!d.redelivered);
}
@ -1018,13 +991,13 @@ mod tests {
let broker = &h.broker;
broker.send(&msg("x", "alice", "for alice")).unwrap();
broker.send(&msg("x", "bob", "for bob")).unwrap();
broker.recv("alice").unwrap().expect("popped alice");
broker.recv("bob").unwrap().expect("popped bob");
pop_one(broker, "alice").expect("popped alice");
pop_one(broker, "bob").expect("popped bob");
// Requeue only alice. Bob's row stays inflight.
assert_eq!(broker.requeue_inflight("alice").unwrap(), 1);
let d = broker.recv("alice").unwrap().expect("re-pop alice");
let d = pop_one(broker, "alice").expect("re-pop alice");
assert!(d.redelivered);
// Bob has nothing pending (his row is still delivered, not requeued).
assert!(broker.recv("bob").unwrap().is_none());
assert!(pop_one(broker, "bob").is_none());
}
}

View file

@ -138,26 +138,14 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc<Coordinator>) -> ManagerResp
message: format!("{e:#}"),
},
},
ManagerRequest::Recv { wait_seconds } => match coord
.broker
.recv_blocking(MANAGER_AGENT, manager_recv_timeout(*wait_seconds))
.await
{
Ok(Some(d)) => ManagerResponse::Message {
from: d.message.from,
body: d.message.body,
id: d.id,
redelivered: d.redelivered,
},
Ok(None) => ManagerResponse::Empty,
Err(e) => ManagerResponse::Err {
message: format!("{e:#}"),
},
},
ManagerRequest::RecvBatch { max } => {
let cap = (*max).min(MANAGER_RECV_BATCH_MAX) as usize;
match coord.broker.recv_batch(MANAGER_AGENT, cap) {
Ok(deliveries) => ManagerResponse::Batch {
ManagerRequest::Recv { wait_seconds, max } => {
let cap = max.unwrap_or(1).min(MANAGER_RECV_BATCH_MAX) as usize;
match coord
.broker
.recv_blocking_batch(MANAGER_AGENT, manager_recv_timeout(*wait_seconds), cap)
.await
{
Ok(deliveries) => ManagerResponse::Messages {
messages: deliveries
.into_iter()
.map(|d| hive_sh4re::DeliveredMessage {