From 5d27ae3048a23e9e108d6b11ff72569fc212bbfb Mon Sep 17 00:00:00 2001 From: damocles Date: Tue, 19 May 2026 01:07:30 +0200 Subject: [PATCH] =?UTF-8?q?recv:=20fold=20batch=20drain=20into=20recv(max)?= =?UTF-8?q?=20=E2=80=94=20one=20tool,=20uniform=20list=20response?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CLAUDE.md | 49 +++---- hive-ag3nt/src/bin/hive-ag3nt.rs | 27 ++-- hive-ag3nt/src/bin/hive-m1nd.rs | 24 ++-- hive-ag3nt/src/mcp.rs | 223 ++++++++++--------------------- hive-c0re/src/agent_server.rs | 40 ++---- hive-c0re/src/broker.rs | 197 ++++++++++++--------------- hive-c0re/src/manager_server.rs | 28 ++-- hive-sh4re/src/lib.rs | 100 ++++++-------- 8 files changed, 271 insertions(+), 417 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index a7bf6fb..49cf893 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -183,29 +183,32 @@ read them à la carte. In-flight or recent context that hasn't earned a section yet. Prune freely. -- **Just landed:** inbox batching via `recv_batch(max)`. New - MCP tool on both agent + manager surfaces pops up to `max` - pending messages in a single round-trip (server-side cap - 32, `max = 0` short-circuits). Same delivery + ack - bookkeeping as `recv`: every popped row is marked - `delivered_at = NOW`, tracked on the per-recipient - `unacked_ids` list (so the next `AckTurn` closes them out), - and tagged `redelivered: true` if it was resurfaced by - `RequeueInflight`. Wake prompt's pending-inbox hint - rewritten to recommend `recv_batch(max: N)` instead of N - consecutive `recv`s. Wire: new - `AgentRequest::RecvBatch { max }` / - `AgentResponse::Batch { messages: Vec }` - (+ manager mirror) and a new `DeliveredMessage` struct in - `hive-sh4re` shared by both. `format_recv_batch` on the - mcp side renders the popped list with per-message - redelivery banners + `---` separators; empty batch - collapses to "(empty)" like single `recv`. 4 new broker - tests cover FIFO + cap, empty-when-idle, zero-max - short-circuit, and redelivered-flag propagation. Closes - the "inbox batching hint" item from the ergonomics - wishlist — the hint that was already in `format_wake_prompt` - is now actionable in one tool call instead of N. +- **Just landed:** inbox batching unified into `recv(max?)`. + No separate `recv_batch` tool — the existing `recv` tool + grew an optional `max: u32` arg (default 1, server-side + cap 32) so a single round-trip drains up to N popped rows + with the same delivery + ack bookkeeping per row + (`delivered_at = NOW`, `unacked_ids` list, redelivered + tag from `requeue_inflight`). `wait_seconds` still applies + to the FIRST message; once one lands the call drains up + to `max` in total — long-poll + drain compose. Wake + prompt's pending-inbox hint points at `recv(max: N)`. + Wire shape: `AgentRequest::Recv { wait_seconds, max }` + (added `max`), `AgentResponse::Messages { messages: + Vec }` (collapsed the old + `Message` + `Empty` + `Batch` trio into one always-list + variant — empty vec = idle). `DeliveredMessage` is a flat + shared struct in `hive-sh4re`. `format_recv` renders + single = the historical `from: X\n\nbody` block, multi = + `popped N message(s)` header with `---` separators + + per-message redelivery banners; empty = "(empty)". Broker + primitive: dropped the singular `recv`, kept just + `recv_batch(recipient, max)` and `recv_blocking_batch` + (which long-polls then drains via `recv_batch`). 4 new + broker tests on top of the existing 7 (recv_batch_* + family). Closes the "inbox batching hint" item from the + ergonomics wishlist with one tool instead of two; lower + context bloat in claude's prompt. - **Just landed:** lease-style message delivery / no-drop on turn fail. The `messages` table gained an `acked_at` column (idempotent ALTER + backfill = `delivered_at` so diff --git a/hive-ag3nt/src/bin/hive-ag3nt.rs b/hive-ag3nt/src/bin/hive-ag3nt.rs index e3b8973..18c5e96 100644 --- a/hive-ag3nt/src/bin/hive-ag3nt.rs +++ b/hive-ag3nt/src/bin/hive-ag3nt.rs @@ -176,20 +176,23 @@ async fn serve( // `None` as "peek, don't wait", which would tight-loop on // sleep(interval). The harness wants to park until a // message arrives, so opt into the full 180s cap. + // `max: None` (= 1) — the serve loop drives one turn per + // wake; claude itself calls recv(max: N) in-turn to drain + // a burst when the wake prompt mentions pending. client::request( socket, &AgentRequest::Recv { wait_seconds: Some(180), + max: None, }, ) .await; match recv { - Ok(AgentResponse::Message { - from, - body, - id: _, - redelivered, - }) => { + Ok(AgentResponse::Messages { messages }) if !messages.is_empty() => { + let first = messages.into_iter().next().expect("checked non-empty"); + let from = first.from; + let body = first.body; + let redelivered = first.redelivered; tracing::info!(%from, %body, %redelivered, "inbox"); let unread = inbox_unread(socket).await; bus.emit(LiveEvent::TurnStart { @@ -255,17 +258,15 @@ async fn serve( tracing::info!(%pending, "pending messages after turn; fetching next"); } } - Ok(AgentResponse::Empty) => { - // Idle: brief sleep before next poll to avoid busy-looping - // on consecutive Empty responses. The recv() call already - // waits up to 180s for messages, so this is just for - // responsiveness if recv() times out. + Ok(AgentResponse::Messages { .. }) => { + // Idle: empty list = nothing pending. Brief sleep + // before next poll so a stretch of empty long-poll + // returns doesn't tight-loop. tokio::time::sleep(interval).await; } Ok( AgentResponse::Ok | AgentResponse::Status { .. } - | AgentResponse::Batch { .. } | AgentResponse::Recent { .. } | AgentResponse::QuestionQueued { .. } | AgentResponse::LooseEnds { .. } @@ -302,7 +303,7 @@ fn format_wake_prompt(from: &str, body: &str, unread: u64, redelivered: bool) -> String::new() } else { format!( - "\n\n({unread} more message(s) pending in your inbox — call `mcp__hyperhive__recv_batch` \ + "\n\n({unread} more message(s) pending in your inbox — call `mcp__hyperhive__recv` \ with `max: {unread}` to drain them all in one round-trip before acting.)" ) }; diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index 4108fe5..8037ee8 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -129,21 +129,23 @@ async fn serve( let recv: Result = // Explicit long-poll: see hive-ag3nt's serve loop for the // rationale — recv now defaults to peek when wait_seconds - // is None. + // is None. `max: None` (= 1) keeps the serve loop driving + // one turn per wake; claude calls recv(max: N) in-turn to + // drain a burst when the wake prompt mentions pending. client::request( socket, &ManagerRequest::Recv { wait_seconds: Some(180), + max: None, }, ) .await; match recv { - Ok(ManagerResponse::Message { - from, - body, - id: _, - redelivered, - }) => { + Ok(ManagerResponse::Messages { messages }) if !messages.is_empty() => { + let first = messages.into_iter().next().expect("checked non-empty"); + let from = first.from; + let body = first.body; + let redelivered = first.redelivered; if from == SYSTEM_SENDER { // Helper events (ApprovalResolved / Spawned / Rebuilt / // Killed / Destroyed) — these are FYI for the manager; @@ -214,14 +216,14 @@ async fn serve( tracing::info!(%pending, "pending messages after turn; fetching next"); } } - Ok(ManagerResponse::Empty) => { - // Idle: sleep briefly before next long-poll attempt. + Ok(ManagerResponse::Messages { .. }) => { + // Idle: empty list = nothing pending. Brief sleep + // before the next long-poll attempt. tokio::time::sleep(interval).await; } Ok( ManagerResponse::Ok | ManagerResponse::Status { .. } - | ManagerResponse::Batch { .. } | ManagerResponse::QuestionQueued { .. } | ManagerResponse::Recent { .. } | ManagerResponse::Logs { .. } @@ -257,7 +259,7 @@ fn format_wake_prompt(from: &str, body: &str, unread: u64, redelivered: bool) -> String::new() } else { format!( - "\n\n({unread} more message(s) pending in your inbox — call `mcp__hyperhive__recv_batch` \ + "\n\n({unread} more message(s) pending in your inbox — call `mcp__hyperhive__recv` \ with `max: {unread}` to drain them all in one round-trip before acting.)" ) }; diff --git a/hive-ag3nt/src/mcp.rs b/hive-ag3nt/src/mcp.rs index a26a1cf..fc01432 100644 --- a/hive-ag3nt/src/mcp.rs +++ b/hive-ag3nt/src/mcp.rs @@ -34,23 +34,14 @@ use crate::client; pub enum SocketReply { Ok, Err(String), - /// `id` is the broker's row id — not surfaced to claude but - /// useful for harness-side bookkeeping (not used in this module - /// today; the bin loops drive ack via `AckTurn` instead of - /// per-id). `redelivered` triggers the "may already be handled" - /// hint in `format_recv` so claude sees it when draining the - /// inbox in-turn. - Message { - from: String, - body: String, - id: i64, - redelivered: bool, - }, - /// Result of `recv_batch`: zero or more messages popped in one - /// round-trip. Empty vec is the equivalent of `Empty` for a - /// single `recv` — the format helper collapses it to "(empty)". - Batch(Vec), - Empty, + /// Unified `recv` result: zero or more messages popped in one + /// round-trip. Empty vec = "(empty)" path; single-message = the + /// standard wake body; multi = batch render with per-message + /// separators. Per-row `id` is opaque to claude (the bin loops + /// drive ack via `AckTurn`, not per-id); `redelivered` triggers + /// the "may already be handled" banner in `format_recv` for that + /// specific row. + Messages(Vec), Status(u64), QuestionQueued(i64), Recent(Vec), @@ -69,19 +60,7 @@ impl From for SocketReply { match r { hive_sh4re::AgentResponse::Ok => Self::Ok, hive_sh4re::AgentResponse::Err { message } => Self::Err(message), - hive_sh4re::AgentResponse::Message { - from, - body, - id, - redelivered, - } => Self::Message { - from, - body, - id, - redelivered, - }, - hive_sh4re::AgentResponse::Empty => Self::Empty, - hive_sh4re::AgentResponse::Batch { messages } => Self::Batch(messages), + hive_sh4re::AgentResponse::Messages { messages } => Self::Messages(messages), hive_sh4re::AgentResponse::Status { unread } => Self::Status(unread), hive_sh4re::AgentResponse::Recent { rows } => Self::Recent(rows), hive_sh4re::AgentResponse::QuestionQueued { id } => Self::QuestionQueued(id), @@ -107,19 +86,7 @@ impl From for SocketReply { match r { hive_sh4re::ManagerResponse::Ok => Self::Ok, hive_sh4re::ManagerResponse::Err { message } => Self::Err(message), - hive_sh4re::ManagerResponse::Message { - from, - body, - id, - redelivered, - } => Self::Message { - from, - body, - id, - redelivered, - }, - hive_sh4re::ManagerResponse::Empty => Self::Empty, - hive_sh4re::ManagerResponse::Batch { messages } => Self::Batch(messages), + hive_sh4re::ManagerResponse::Messages { messages } => Self::Messages(messages), hive_sh4re::ManagerResponse::Status { unread } => Self::Status(unread), hive_sh4re::ManagerResponse::QuestionQueued { id } => Self::QuestionQueued(id), hive_sh4re::ManagerResponse::Recent { rows } => Self::Recent(rows), @@ -153,55 +120,30 @@ pub fn format_ack(resp: Result, tool: &str, ok_msg: } } -/// Format helper for `recv` tools: `Message` → from + body block; -/// `Empty` → marker; anything else surfaces as an error. When the -/// broker tags the row as `redelivered` (popped before, never acked, -/// resurfaced after a harness restart) a short banner is prepended -/// so claude knows the side-effects of any previous handling may -/// already have happened. +/// Format helper for `recv`: renders zero, one, or many popped +/// messages. Empty list collapses to "(empty)" so claude doesn't go +/// hunting for content. A single message renders as the historical +/// `from: X\n\nbody` block (banner first if `redelivered`). A +/// multi-message batch renders with a `popped N message(s):` header +/// and `---` separators between bodies so the model can tell where +/// one ends and the next begins; per-message redelivery banners +/// included. pub fn format_recv(resp: Result) -> String { - match resp { - Ok(SocketReply::Message { - from, - body, - redelivered, - .. - }) => { - let banner = if redelivered { REDELIVERY_HINT } else { "" }; - format!("{banner}from: {from}\n\n{body}") - } - Ok(SocketReply::Empty) => "(empty)".into(), - Ok(SocketReply::Err(m)) => format!("recv failed: {m}"), - Ok(other) => format!("recv unexpected response: {other:?}"), - Err(e) => format!("recv transport error: {e:#}"), - } -} - -/// Header prepended to message bodies that were popped by a prior -/// harness session, never acked (turn crash / OOM / restart), and -/// resurfaced by `RequeueInflight` on this session's boot. Same -/// string surfaces in the wake prompt (see the bin loops) and the -/// in-turn `recv` tool result so claude sees the warning either way. -pub const REDELIVERY_HINT: &str = - "[redelivered after harness restart — may already be handled]\n"; - -/// Format helper for `recv_batch`: renders zero or more popped -/// messages as a single string. Empty batch collapses to "(empty)" -/// so claude doesn't go hunting for content. Each message is rendered -/// in the same `from: \n\n` shape as `format_recv` — -/// per-message redelivery banner included — separated by a thin rule -/// so the model can tell where one body ends and the next begins. -pub fn format_recv_batch(resp: Result) -> String { use std::fmt::Write as _; let messages = match resp { - Ok(SocketReply::Batch(m)) => m, - Ok(SocketReply::Err(m)) => return format!("recv_batch failed: {m}"), - Ok(other) => return format!("recv_batch unexpected response: {other:?}"), - Err(e) => return format!("recv_batch transport error: {e:#}"), + Ok(SocketReply::Messages(m)) => m, + Ok(SocketReply::Err(m)) => return format!("recv failed: {m}"), + Ok(other) => return format!("recv unexpected response: {other:?}"), + Err(e) => return format!("recv transport error: {e:#}"), }; if messages.is_empty() { return "(empty)".to_owned(); } + if messages.len() == 1 { + let m = &messages[0]; + let banner = if m.redelivered { REDELIVERY_HINT } else { "" }; + return format!("{banner}from: {}\n\n{}", m.from, m.body); + } let n = messages.len(); let mut out = format!("popped {n} message(s):\n\n"); for (i, m) in messages.iter().enumerate() { @@ -214,6 +156,14 @@ pub fn format_recv_batch(resp: Result) -> String { out } +/// Header prepended to message bodies that were popped by a prior +/// harness session, never acked (turn crash / OOM / restart), and +/// resurfaced by `RequeueInflight` on this session's boot. Same +/// string surfaces in the wake prompt (see the bin loops) and the +/// in-turn `recv` tool result so claude sees the warning either way. +pub const REDELIVERY_HINT: &str = + "[redelivered after harness restart — may already be handled]\n"; + /// Format helper for `get_loose_ends`: renders a short bulleted list /// of pending approvals + questions + reminders. Empty list collapses /// to a clear marker so claude doesn't go hunting for a payload that @@ -365,23 +315,22 @@ pub struct SendArgs { #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] pub struct RecvArgs { - /// How long to long-poll for a new message before returning the - /// empty marker. Capped at 60s server-side. Default (None) is - /// 30s. Useful when an agent wants to throttle wakes without - /// actually napping — pick a longer wait to coalesce bursts. + /// How long to long-poll for the FIRST message before returning + /// the empty marker. Capped at 60s server-side. Default (None) + /// is 30s. Useful when an agent wants to park its turn waiting + /// for any new work — pick a longer wait to coalesce bursts. #[serde(default)] pub wait_seconds: Option, -} - -#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -pub struct RecvBatchArgs { - /// Maximum number of messages to pop in this round-trip. Capped - /// at 32 server-side; values above the cap clamp silently. - /// Returns whatever is currently pending (possibly zero) without - /// long-polling — call when you've been told the inbox is busy - /// (e.g. the wake prompt mentioned N pending) and want to drain - /// them in one tool call instead of N separate `recv` calls. - pub max: u32, + /// Maximum number of messages to pop in this round-trip. Default + /// (None) is 1 (single-message behaviour — exactly what you want + /// when you're called to drive a turn off the first wake). Pass + /// a higher value (capped at 32 server-side) when you've been + /// told the inbox has more queued (the wake prompt mentions + /// pending count) and want to drain everything in one tool call. + /// Once the long-poll wakes up, the call drains up to `max` in + /// total before returning — no extra round-trip needed. + #[serde(default)] + pub max: Option, } /// MCP tool args for `remind`. Exactly one of `delay_seconds` or @@ -535,14 +484,21 @@ impl AgentServer { } #[tool( - description = "Pop one message from this agent's inbox. Returns the sender and body, \ - or an empty marker if nothing is waiting. Without `wait_seconds` (or with 0) the \ - call returns immediately — a cheap 'anything pending?' peek. Pass a positive \ + description = "Pop messages from this agent's inbox. Returns one or more messages, or \ + an empty marker if nothing is waiting. \n\n\ + **Single-message default**: with no args (or `max: 1`) you get the next message — \ + same behaviour the harness uses to drive a turn. Without `wait_seconds` (or with 0) \ + the call returns immediately — a cheap 'anything pending?' peek. Pass a positive \ `wait_seconds` (capped at 180) to park the turn waiting for new work — incoming \ messages wake you instantly, otherwise the call returns empty at the timeout. \ - That's strictly better than a fixed shell `sleep`. Typical pattern: when you have \ - nothing else useful to do, call `recv(wait_seconds: 180)` to park until \ - something arrives." + That's strictly better than a fixed shell `sleep`. \n\n\ + **Batch drain**: pass `max: N` (capped at 32) to drain up to N messages in one \ + round-trip. Use this when the wake prompt told you the inbox has more queued, or \ + any time you expect a burst — one tool call beats N consecutive single recvs. \ + `wait_seconds` still applies to the FIRST message; once one arrives the call drains \ + up to `max` in total. Empty result reported the same way regardless of `max`. \n\n\ + Typical pattern: when you have nothing else useful to do, call \ + `recv(wait_seconds: 180)` to park until something arrives." )] async fn recv(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); @@ -550,6 +506,7 @@ impl AgentServer { let (resp, retries) = self .dispatch(hive_sh4re::AgentRequest::Recv { wait_seconds: args.wait_seconds, + max: args.max, }) .await; annotate_retries(format_recv(resp), retries) @@ -557,26 +514,6 @@ impl AgentServer { .await } - #[tool( - description = "Pop up to `max` messages from this agent's inbox in a single round-trip \ - (no long-poll — returns whatever's pending immediately, possibly zero). Use this \ - when the wake prompt tells you the inbox has more messages queued, or any time you \ - know you'll be draining several at once: one tool call beats N consecutive `recv`s. \ - Per-message redelivery banners + ack bookkeeping work the same as `recv`. `max` \ - caps at 32 server-side; pass whatever you reasonably expect to handle this turn." - )] - async fn recv_batch(&self, Parameters(args): Parameters) -> String { - let log = format!("{args:?}"); - let max = args.max; - run_tool_envelope("recv_batch", log, async move { - let (resp, retries) = self - .dispatch(hive_sh4re::AgentRequest::RecvBatch { max }) - .await; - annotate_retries(format_recv_batch(resp), retries) - }) - .await - } - #[tool( description = "List loose ends pending against this agent: unanswered questions \ where you are the asker (waiting on someone) or the target (someone's waiting on \ @@ -866,11 +803,14 @@ impl ManagerServer { } #[tool( - description = "Pop one message from the manager inbox. Returns sender + body, or \ - empty. Without `wait_seconds` (or 0) returns immediately — a cheap inbox peek. \ - Pass a positive value (capped at 180) to park until either a message arrives \ - or the timeout fires; prefer a long wait (120 or 180) over ending a turn \ - early when you have nothing else to do." + description = "Pop messages from the manager inbox. Default returns one (sender + \ + body) or empty. Without `wait_seconds` (or 0) returns immediately — a cheap inbox \ + peek. Pass a positive value (capped at 180) to park until either a message arrives \ + or the timeout fires; prefer a long wait (120 or 180) over ending a turn early \ + when you have nothing else to do. \n\n\ + Pass `max: N` (capped at 32) to drain up to N messages in one round-trip — useful \ + when the wake prompt tells you the inbox has more queued. `wait_seconds` still \ + applies to the FIRST message; once one lands the call drains up to `max` in total." )] async fn recv(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); @@ -878,6 +818,7 @@ impl ManagerServer { let (resp, retries) = self .dispatch(hive_sh4re::ManagerRequest::Recv { wait_seconds: args.wait_seconds, + max: args.max, }) .await; annotate_retries(format_recv(resp), retries) @@ -885,26 +826,6 @@ impl ManagerServer { .await } - #[tool( - description = "Pop up to `max` messages from the manager inbox in a single round-trip \ - (no long-poll — returns whatever's pending immediately, possibly zero). Use this \ - when the wake prompt tells you the inbox has more messages queued, or any time you \ - know you'll be draining several at once: one tool call beats N consecutive `recv`s. \ - Per-message redelivery banners + ack bookkeeping work the same as `recv`. `max` \ - caps at 32 server-side; pass whatever you reasonably expect to handle this turn." - )] - async fn recv_batch(&self, Parameters(args): Parameters) -> String { - let log = format!("{args:?}"); - let max = args.max; - run_tool_envelope("recv_batch", log, async move { - let (resp, retries) = self - .dispatch(hive_sh4re::ManagerRequest::RecvBatch { max }) - .await; - annotate_retries(format_recv_batch(resp), retries) - }) - .await - } - #[tool( description = "Queue a Spawn approval for a brand-new sub-agent. The operator \ approves on the dashboard before the container is actually created." diff --git a/hive-c0re/src/agent_server.rs b/hive-c0re/src/agent_server.rs index 3138f75..c058122 100644 --- a/hive-c0re/src/agent_server.rs +++ b/hive-c0re/src/agent_server.rs @@ -87,13 +87,13 @@ async fn serve(stream: UnixStream, agent: String, coord: Arc) -> Re /// positive `wait_seconds`. const RECV_LONG_POLL_MAX: std::time::Duration = std::time::Duration::from_secs(180); -/// Server-side hard cap on `RecvBatch.max`. Bounds the size of a -/// single round-trip so a confused caller can't drain the entire -/// inbox in one go and blow past wire-buffer sizes; everything above -/// the cap silently clamps. 32 is comfortably above the burst sizes -/// we've seen in practice (post-rebuild rescue, multi-agent reply -/// storms) and well under the per-message `MESSAGE_MAX_BYTES` * N -/// envelope budget. +/// Server-side hard cap on `Recv.max`. Bounds the size of a single +/// round-trip so a confused caller can't drain the entire inbox in +/// one go and blow past wire-buffer sizes; everything above the cap +/// silently clamps. 32 is comfortably above the burst sizes we've +/// seen in practice (post-rebuild rescue, multi-agent reply storms) +/// and well under the per-message `MESSAGE_MAX_BYTES` * N envelope +/// budget. const RECV_BATCH_MAX: u32 = 32; fn recv_timeout(wait_seconds: Option) -> std::time::Duration { @@ -108,25 +108,13 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc) -> let broker = &coord.broker; match req { AgentRequest::Send { to, body } => handle_send(coord, agent, to, body), - AgentRequest::Recv { wait_seconds } => match broker - .recv_blocking(agent, recv_timeout(*wait_seconds)) - .await - { - Ok(Some(d)) => AgentResponse::Message { - from: d.message.from, - body: d.message.body, - id: d.id, - redelivered: d.redelivered, - }, - Ok(None) => AgentResponse::Empty, - Err(e) => AgentResponse::Err { - message: format!("{e:#}"), - }, - }, - AgentRequest::RecvBatch { max } => { - let cap = (*max).min(RECV_BATCH_MAX) as usize; - match broker.recv_batch(agent, cap) { - Ok(deliveries) => AgentResponse::Batch { + AgentRequest::Recv { wait_seconds, max } => { + let cap = max.unwrap_or(1).min(RECV_BATCH_MAX) as usize; + match broker + .recv_blocking_batch(agent, recv_timeout(*wait_seconds), cap) + .await + { + Ok(deliveries) => AgentResponse::Messages { messages: deliveries .into_iter() .map(|d| hive_sh4re::DeliveredMessage { diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs index 7e739ed..12fd7dd 100644 --- a/hive-c0re/src/broker.rs +++ b/hive-c0re/src/broker.rs @@ -90,11 +90,11 @@ pub struct PendingReminder { /// time the row is due. pub const MAX_REMINDER_ATTEMPTS: u32 = 5; -/// Intra-process broker event. `recv_blocking` listens on the same -/// channel as the dashboard forwarder; the forwarder re-emits each -/// event as a `DashboardEvent` with a freshly-stamped seq from the -/// Coordinator. The broker itself doesn't stamp seqs — that's a wire -/// concern, not a storage concern. +/// Intra-process broker event. `recv_blocking_batch` listens on the +/// same channel as the dashboard forwarder; the forwarder re-emits +/// each event as a `DashboardEvent` with a freshly-stamped seq from +/// the Coordinator. The broker itself doesn't stamp seqs — that's a +/// wire concern, not a storage concern. #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "snake_case", tag = "kind")] pub enum MessageEvent { @@ -124,9 +124,9 @@ struct RecipientInflight { /// single `UPDATE … WHERE id IN (…)` to set `acked_at`. unacked_ids: Vec, /// Message ids resurfaced by the most recent `requeue_inflight` - /// call. The next `recv` pop of any id in this set tags the - /// response with `redelivered: true` so the harness can prepend - /// the "may already be handled" hint to the wake prompt; + /// call. The next `recv_batch` pop of any id in this set tags + /// the response with `redelivered: true` so the harness can + /// prepend the "may already be handled" hint to the wake prompt; /// successful pops drain the id from the set. requeued_ids: HashSet, } @@ -249,44 +249,54 @@ impl Broker { Ok(u64::try_from(n.max(0)).unwrap_or(0)) } - /// Long-poll variant of `recv`: returns immediately if there's a - /// pending message; otherwise waits up to `timeout` for the broker to - /// emit a `Sent { to: recipient }` event, then retries the pop. Lets - /// agents react to new mail without polling their socket on a fixed - /// interval. + /// Long-poll variant of `recv_batch`: returns immediately if any + /// row is pending (popping up to `max`); otherwise waits up to + /// `timeout` for the broker to emit a `Sent { to: recipient }` + /// event and re-tries the pop. Lets agents react to new mail + /// without polling their socket on a fixed interval AND lets a + /// single round-trip drain a burst of messages. /// - /// **Subscribe-before-check order matters.** If we polled the sqlite - /// row first and only then called `subscribe()`, a concurrent `send` - /// landing in that window would commit + broadcast its event *before* - /// our receiver existed — and we'd then sit on the long-poll until - /// the timeout (or another, unrelated send) fired. That looked - /// externally like "the agent processed one wake then went deaf - /// until the operator poked it again". Subscribing first guarantees - /// any post-subscribe send notifies us; the redundant `recv()` - /// catches the message either way. - pub async fn recv_blocking( + /// **Subscribe-before-check order matters.** If we polled the + /// sqlite row first and only then called `subscribe()`, a + /// concurrent `send` landing in that window would commit + + /// broadcast its event *before* our receiver existed — and we'd + /// then sit on the long-poll until the timeout (or another, + /// unrelated send) fired. That looked externally like "the agent + /// processed one wake then went deaf until the operator poked it + /// again". Subscribing first guarantees any post-subscribe send + /// notifies us; the redundant `recv_batch()` catches the message + /// either way. + /// + /// `max == 0` returns an empty vec without subscribing or waiting. + pub async fn recv_blocking_batch( &self, recipient: &str, timeout: std::time::Duration, - ) -> Result> { + max: usize, + ) -> Result> { + if max == 0 { + return Ok(Vec::new()); + } let mut rx = self.subscribe(); - if let Some(d) = self.recv(recipient)? { - return Ok(Some(d)); + let batch = self.recv_batch(recipient, max)?; + if !batch.is_empty() { + return Ok(batch); } let deadline = tokio::time::Instant::now() + timeout; loop { let Some(remaining) = deadline.checked_duration_since(tokio::time::Instant::now()) else { - return Ok(None); + return Ok(Vec::new()); }; match tokio::time::timeout(remaining, rx.recv()).await { - Err(_) => return Ok(None), + Err(_) => return Ok(Vec::new()), // Channel lagged or closed — fall back to a single direct // pop (in case we missed our notification while behind). - Ok(Err(_)) => return self.recv(recipient), + Ok(Err(_)) => return self.recv_batch(recipient, max), Ok(Ok(MessageEvent::Sent { to, .. })) if to == recipient => { - if let Some(d) = self.recv(recipient)? { - return Ok(Some(d)); + let batch = self.recv_batch(recipient, max)?; + if !batch.is_empty() { + return Ok(batch); } // Lost a race (concurrent recv elsewhere). Keep waiting. } @@ -313,67 +323,22 @@ impl Broker { Ok(u64::try_from(n).unwrap_or(0)) } - pub fn recv(&self, recipient: &str) -> Result> { - // Lock order: inflight FIRST, then conn. `requeue_inflight` + - // `ack_turn` follow the same order so we never deadlock; the - // requeue path also needs both locks held together so a pop - // can't sneak in between its DB update + in-memory populate - // and miss the `redelivered` flag. - let mut inflight = self.inflight.lock().unwrap(); - let conn = self.conn.lock().unwrap(); - let row: Option<(i64, String, String, String)> = conn - .query_row( - "SELECT id, sender, recipient, body - FROM messages - WHERE recipient = ?1 AND delivered_at IS NULL - ORDER BY id ASC - LIMIT 1", - params![recipient], - |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)), - ) - .optional()?; - let Some((id, from, to, body)) = row else { - return Ok(None); - }; - conn.execute( - "UPDATE messages SET delivered_at = ?1 WHERE id = ?2", - params![now_unix(), id], - )?; - // Track the id so the next `ack_turn(recipient)` can sweep it, - // and check whether it was resurfaced by a recent - // `requeue_inflight` (in which case the wake prompt gets the - // "may already be handled" hint). Both ops are O(1) per pop; - // the hash-set lookup runs at most once per delivery. - let slot = inflight.entry(recipient.to_owned()).or_default(); - slot.unacked_ids.push(id); - let redelivered = slot.requeued_ids.remove(&id); - drop(conn); - drop(inflight); - let _ = self.events.send(MessageEvent::Delivered { - from: from.clone(), - to: to.clone(), - body: body.clone(), - at: now_unix(), - }); - Ok(Some(Delivery { - id, - redelivered, - message: Message { from, to, body }, - })) - } - /// Pop up to `max` pending messages for `recipient` in one - /// round-trip. Same per-row semantics as `recv`: every popped row - /// is marked `delivered_at = NOW`, pushed onto the per-recipient - /// `unacked_ids` list (so the next `ack_turn` closes them out), - /// and tagged with `redelivered = true` if it was resurfaced by - /// the most recent `requeue_inflight`. Emits one - /// `MessageEvent::Delivered` per popped row so the dashboard - /// forwarder stream stays consistent with the single-row path. + /// round-trip. Every popped row is marked `delivered_at = NOW`, + /// pushed onto the per-recipient `unacked_ids` list (so the next + /// `ack_turn` closes them out), and tagged with + /// `redelivered = true` if it was resurfaced by the most recent + /// `requeue_inflight`. Emits one `MessageEvent::Delivered` per + /// popped row so the dashboard forwarder stream sees one event + /// per message regardless of batch size. /// /// `max == 0` short-circuits to an empty vec (no DB hit); any - /// positive value caps the batch at `max`. FIFO order matches - /// `recv`. + /// positive value caps the batch at `max`. FIFO ordering. + /// + /// Lock order: `inflight` FIRST, then `conn`. `requeue_inflight` + /// and `ack_turn` follow the same order so a concurrent pop can't + /// race the requeue's DB update vs in-memory populate and miss + /// the redelivered tag. pub fn recv_batch(&self, recipient: &str, max: usize) -> Result> { if max == 0 { return Ok(Vec::new()); @@ -827,6 +792,14 @@ mod tests { } } + /// Convenience wrapper for tests that want single-pop semantics + /// — the broker only exposes `recv_batch` publicly now, so + /// every test that used to call `recv` goes through here. + fn pop_one(broker: &Broker, recipient: &str) -> Option { + let mut batch = broker.recv_batch(recipient, 1).unwrap(); + batch.pop() + } + /// Happy path: send → recv → `ack_turn` drains the in-memory list /// and marks the row `acked_at IS NOT NULL`. A second recv finds /// nothing pending (the row stays in the table for vacuum). @@ -835,14 +808,14 @@ mod tests { let h = open_broker(); let broker = &h.broker; broker.send(&msg("a", "b", "hi")).unwrap(); - let d = broker.recv("b").unwrap().expect("popped"); + let d = pop_one(broker, "b").expect("popped"); assert_eq!(d.message.body, "hi"); assert!(!d.redelivered); assert_eq!(broker.ack_turn("b").unwrap(), 1); // ack_turn drained the unacked list; calling again is a no-op. assert_eq!(broker.ack_turn("b").unwrap(), 0); // Recv finds nothing — the row is now delivered + acked. - assert!(broker.recv("b").unwrap().is_none()); + assert!(pop_one(broker, "b").is_none()); } /// Crash-recovery: send → recv → (no ack) → `requeue_inflight` @@ -853,12 +826,12 @@ mod tests { let h = open_broker(); let broker = &h.broker; broker.send(&msg("a", "b", "hi")).unwrap(); - let d1 = broker.recv("b").unwrap().expect("popped"); + let d1 = pop_one(broker, "b").expect("popped"); assert!(!d1.redelivered); // Simulate harness crash: never call ack_turn. Now boot the // new harness — requeue_inflight resurfaces the row. assert_eq!(broker.requeue_inflight("b").unwrap(), 1); - let d2 = broker.recv("b").unwrap().expect("popped again"); + let d2 = pop_one(broker, "b").expect("popped again"); assert_eq!(d2.message.body, "hi"); assert!( d2.redelivered, @@ -876,7 +849,7 @@ mod tests { let h = open_broker(); let broker = &h.broker; broker.send(&msg("a", "b", "hi")).unwrap(); - broker.recv("b").unwrap().expect("popped"); + pop_one(broker, "b").expect("popped"); assert_eq!(broker.requeue_inflight("b").unwrap(), 1); // Second call: the row is pending (delivered_at IS NULL) so // nothing matches the inflight filter. @@ -892,11 +865,11 @@ mod tests { broker.send(&msg("a", "b", "one")).unwrap(); broker.send(&msg("a", "b", "two")).unwrap(); broker.send(&msg("a", "b", "three")).unwrap(); - broker.recv("b").unwrap().expect("popped 1"); - broker.recv("b").unwrap().expect("popped 2"); - broker.recv("b").unwrap().expect("popped 3"); + pop_one(broker, "b").expect("popped 1"); + pop_one(broker, "b").expect("popped 2"); + pop_one(broker, "b").expect("popped 3"); assert_eq!(broker.ack_turn("b").unwrap(), 3); - assert!(broker.recv("b").unwrap().is_none()); + assert!(pop_one(broker, "b").is_none()); } /// Vacuum filter respects the new `acked_at` semantics — a @@ -907,7 +880,7 @@ mod tests { let h = open_broker(); let broker = &h.broker; broker.send(&msg("a", "b", "stuck")).unwrap(); - broker.recv("b").unwrap().expect("popped"); + pop_one(broker, "b").expect("popped"); // Wide window — should still skip unacked rows. let removed = broker.vacuum_delivered(-i64::from(u8::MAX)).unwrap(); assert_eq!(removed, 0, "unacked inflight row must survive vacuum"); @@ -927,18 +900,18 @@ mod tests { broker.send(&msg("a", "b", "first")).unwrap(); broker.send(&msg("a", "b", "second")).unwrap(); // Pop both, ack neither. - broker.recv("b").unwrap().expect("popped 1"); - broker.recv("b").unwrap().expect("popped 2"); + pop_one(broker, "b").expect("popped 1"); + pop_one(broker, "b").expect("popped 2"); broker.requeue_inflight("b").unwrap(); // Now add a brand new message AFTER the requeue. broker.send(&msg("a", "b", "third")).unwrap(); - let d1 = broker.recv("b").unwrap().expect("re-pop 1"); + let d1 = pop_one(broker, "b").expect("re-pop 1"); assert_eq!(d1.message.body, "first"); assert!(d1.redelivered); - let d2 = broker.recv("b").unwrap().expect("re-pop 2"); + let d2 = pop_one(broker, "b").expect("re-pop 2"); assert_eq!(d2.message.body, "second"); assert!(d2.redelivered); - let d3 = broker.recv("b").unwrap().expect("re-pop 3"); + let d3 = pop_one(broker, "b").expect("re-pop 3"); assert_eq!(d3.message.body, "third"); assert!( !d3.redelivered, @@ -984,7 +957,7 @@ mod tests { broker.send(&msg("a", "b", "stay")).unwrap(); assert!(broker.recv_batch("b", 0).unwrap().is_empty()); // The pending row is still in flight for the next real recv. - let d = broker.recv("b").unwrap().expect("still pending"); + let d = pop_one(broker, "b").expect("still pending"); assert_eq!(d.message.body, "stay"); } @@ -997,15 +970,15 @@ mod tests { let broker = &h.broker; broker.send(&msg("a", "b", "one")).unwrap(); broker.send(&msg("a", "b", "two")).unwrap(); - broker.recv("b").unwrap().expect("popped 1"); - broker.recv("b").unwrap().expect("popped 2"); + pop_one(broker, "b").expect("popped 1"); + pop_one(broker, "b").expect("popped 2"); broker.requeue_inflight("b").unwrap(); let batch = broker.recv_batch("b", 5).unwrap(); assert_eq!(batch.len(), 2); assert!(batch.iter().all(|d| d.redelivered)); // Fresh send after the batch is NOT tagged redelivered. broker.send(&msg("a", "b", "three")).unwrap(); - let d = broker.recv("b").unwrap().expect("re-pop 3"); + let d = pop_one(broker, "b").expect("re-pop 3"); assert_eq!(d.message.body, "three"); assert!(!d.redelivered); } @@ -1018,13 +991,13 @@ mod tests { let broker = &h.broker; broker.send(&msg("x", "alice", "for alice")).unwrap(); broker.send(&msg("x", "bob", "for bob")).unwrap(); - broker.recv("alice").unwrap().expect("popped alice"); - broker.recv("bob").unwrap().expect("popped bob"); + pop_one(broker, "alice").expect("popped alice"); + pop_one(broker, "bob").expect("popped bob"); // Requeue only alice. Bob's row stays inflight. assert_eq!(broker.requeue_inflight("alice").unwrap(), 1); - let d = broker.recv("alice").unwrap().expect("re-pop alice"); + let d = pop_one(broker, "alice").expect("re-pop alice"); assert!(d.redelivered); // Bob has nothing pending (his row is still delivered, not requeued). - assert!(broker.recv("bob").unwrap().is_none()); + assert!(pop_one(broker, "bob").is_none()); } } diff --git a/hive-c0re/src/manager_server.rs b/hive-c0re/src/manager_server.rs index 69b074e..5228c21 100644 --- a/hive-c0re/src/manager_server.rs +++ b/hive-c0re/src/manager_server.rs @@ -138,26 +138,14 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc) -> ManagerResp message: format!("{e:#}"), }, }, - ManagerRequest::Recv { wait_seconds } => match coord - .broker - .recv_blocking(MANAGER_AGENT, manager_recv_timeout(*wait_seconds)) - .await - { - Ok(Some(d)) => ManagerResponse::Message { - from: d.message.from, - body: d.message.body, - id: d.id, - redelivered: d.redelivered, - }, - Ok(None) => ManagerResponse::Empty, - Err(e) => ManagerResponse::Err { - message: format!("{e:#}"), - }, - }, - ManagerRequest::RecvBatch { max } => { - let cap = (*max).min(MANAGER_RECV_BATCH_MAX) as usize; - match coord.broker.recv_batch(MANAGER_AGENT, cap) { - Ok(deliveries) => ManagerResponse::Batch { + ManagerRequest::Recv { wait_seconds, max } => { + let cap = max.unwrap_or(1).min(MANAGER_RECV_BATCH_MAX) as usize; + match coord + .broker + .recv_blocking_batch(MANAGER_AGENT, manager_recv_timeout(*wait_seconds), cap) + .await + { + Ok(deliveries) => ManagerResponse::Messages { messages: deliveries .into_iter() .map(|d| hive_sh4re::DeliveredMessage { diff --git a/hive-sh4re/src/lib.rs b/hive-sh4re/src/lib.rs index 0203879..79aa9a3 100644 --- a/hive-sh4re/src/lib.rs +++ b/hive-sh4re/src/lib.rs @@ -174,10 +174,12 @@ pub struct InboxRow { pub at: i64, } -/// One delivered message in a `RecvBatch` response. Same fields as -/// `AgentResponse::Message` / `ManagerResponse::Message` without the -/// variant wrapper — a batch returns a `Vec` so the -/// harness can iterate without unpicking N separate top-level frames. +/// One delivered message in a `Recv` response. The unified +/// `Recv { max }` always returns a `Vec` — single +/// pop = a one-element vec, batch = up to `max` elements, idle = +/// empty. Each row carries the broker's id + redelivered flag so the +/// harness can drive `AckTurn` and surface the "may already be +/// handled" hint per-row. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DeliveredMessage { pub from: String, @@ -286,22 +288,25 @@ pub enum CancelLooseEndKind { pub enum AgentRequest { /// Send a message to another agent. Send { to: String, body: String }, - /// Pop one pending message from this agent's inbox. Long-polls - /// up to `wait_seconds` (capped at 60s server-side, default 30s - /// when None) before returning `Empty`. + /// Pop pending messages from this agent's inbox. Always returns + /// a list (`Messages { messages }`) — empty when nothing's + /// pending. `max` caps the batch size (default 1 = single-message + /// behaviour, server-side cap 32). `wait_seconds` long-polls for + /// the first message; once one arrives (or one is already + /// pending), the call drains up to `max` in total before + /// returning. Same delivery + ack bookkeeping per row as before: + /// `delivered_at = NOW`, tracked on the per-recipient + /// `unacked_ids` list (the next `AckTurn` closes them out), and + /// each row carries `redelivered = true` if `RequeueInflight` + /// resurfaced it. Recv { #[serde(default)] wait_seconds: Option, + /// Maximum number of messages to pop. None = 1 (single). + /// Server-side cap is 32; values above clamp silently. + #[serde(default)] + max: Option, }, - /// Pop up to `max` pending messages in one round-trip. No - /// long-poll — returns whatever's currently queued (possibly - /// zero) immediately. Same delivery + ack bookkeeping as `Recv`: - /// every popped row is marked `delivered_at = NOW`, tracked in - /// the broker's per-recipient `unacked_ids` list (so the next - /// `AckTurn` closes them out), and tagged `redelivered = true` if - /// it was resurfaced by `RequeueInflight`. Used by the harness to - /// drain a bursty inbox without N socket round-trips. - RecvBatch { max: u32 }, /// Non-mutating: how many pending messages are addressed to me? /// Used by the harness to render a status line after each tool call. Status, @@ -421,28 +426,13 @@ pub enum AgentResponse { Ok, /// Either `Send` failed or `Recv` errored. Err { message: String }, - /// `Recv` produced a message. `id` is the broker's row id — opaque - /// to claude (the MCP surface strips it before handing the body - /// to the model) but tracked by the harness so the broker's - /// in-memory unacked list can be drained on `AckTurn`. When - /// `redelivered = true` this row was popped earlier, never - /// acked (turn crash / OOM / restart), and resurfaced by - /// `RequeueInflight` — the harness prepends a "may already be - /// handled" hint to the wake prompt so claude can DTRT. - Message { - from: String, - body: String, - #[serde(default)] - id: i64, - #[serde(default)] - redelivered: bool, - }, - /// `Recv` found nothing pending. - Empty, - /// `RecvBatch` result. `messages` is in FIFO order and may be - /// empty (treated like `Empty` for `Recv`); never longer than the - /// `max` the caller passed. - Batch { messages: Vec }, + /// `Recv` result: zero or more messages, FIFO-ordered, never + /// longer than the `max` the caller passed. Empty vec = nothing + /// pending (the "(empty)" path for the formatter). Per-row `id` + + /// `redelivered` carry the broker's row id (opaque to claude; + /// tracked by the harness for `AckTurn`) and the "previously + /// popped, not acked" flag — see `DeliveredMessage` for details. + Messages { messages: Vec }, /// `Status` result: how many pending messages are in this agent's inbox. Status { unread: u64 }, /// `Recent` result: newest-first inbox rows. @@ -612,16 +602,16 @@ pub enum ManagerRequest { to: String, body: String, }, - /// Same shape as `AgentRequest::Recv` — caller-tunable long-poll - /// duration, capped at 60s server-side, default 30s when None. + /// Same shape as `AgentRequest::Recv` — caller-tunable + /// `wait_seconds` (capped at 60s server-side, default 30s when + /// None) for first-message long-poll, plus `max` (default 1, cap + /// 32) to drain up to N popped rows in one round-trip. Recv { #[serde(default)] wait_seconds: Option, + #[serde(default)] + max: Option, }, - /// Mirror of `AgentRequest::RecvBatch` on the manager surface — - /// pop up to `max` pending messages in one round-trip, no - /// long-poll. Same ack + redelivery bookkeeping. - RecvBatch { max: u32 }, /// Non-mutating: pending message count, used to render a status line /// after each MCP tool call (mirrors `AgentRequest::Status`). Status, @@ -759,23 +749,11 @@ pub enum ManagerResponse { Err { message: String, }, - /// Same delivery shape as `AgentResponse::Message` — `id` + - /// `redelivered` carry the broker's row id and the - /// "previously popped, not acked" flag through the manager - /// surface so the manager harness drives the same - /// requeue-with-hint flow as a sub-agent. - Message { - from: String, - body: String, - #[serde(default)] - id: i64, - #[serde(default)] - redelivered: bool, - }, - Empty, - /// Mirror of `AgentResponse::Batch` on the manager surface. - /// `messages` is in FIFO order and may be empty. - Batch { + /// Mirror of `AgentResponse::Messages` on the manager surface. + /// Always-list shape: 0..=max popped rows, FIFO-ordered. Carries + /// per-row `id` + `redelivered` so the manager harness drives the + /// same ack + requeue-with-hint flow as a sub-agent. + Messages { messages: Vec, }, Status {