From 77b89bf2c6784cfb44973861f26a6cea26ebbc03 Mon Sep 17 00:00:00 2001 From: damocles Date: Tue, 19 May 2026 00:40:31 +0200 Subject: [PATCH] =?UTF-8?q?broker:=20recv=5Fbatch(max)=20=E2=80=94=20drain?= =?UTF-8?q?=20a=20bursty=20inbox=20in=20one=20round-trip?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CLAUDE.md | 23 +++++ TODO.md | 9 -- hive-ag3nt/src/bin/hive-ag3nt.rs | 4 +- hive-ag3nt/src/bin/hive-m1nd.rs | 4 +- hive-ag3nt/src/mcp.rs | 86 +++++++++++++++++++ hive-c0re/src/agent_server.rs | 28 ++++++ hive-c0re/src/broker.rs | 143 +++++++++++++++++++++++++++++++ hive-c0re/src/manager_server.rs | 24 ++++++ hive-sh4re/src/lib.rs | 44 ++++++++++ 9 files changed, 354 insertions(+), 11 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 4a7b47d..a7bf6fb 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -183,6 +183,29 @@ read them à la carte. In-flight or recent context that hasn't earned a section yet. Prune freely. +- **Just landed:** inbox batching via `recv_batch(max)`. New + MCP tool on both agent + manager surfaces pops up to `max` + pending messages in a single round-trip (server-side cap + 32, `max = 0` short-circuits). Same delivery + ack + bookkeeping as `recv`: every popped row is marked + `delivered_at = NOW`, tracked on the per-recipient + `unacked_ids` list (so the next `AckTurn` closes them out), + and tagged `redelivered: true` if it was resurfaced by + `RequeueInflight`. Wake prompt's pending-inbox hint + rewritten to recommend `recv_batch(max: N)` instead of N + consecutive `recv`s. Wire: new + `AgentRequest::RecvBatch { max }` / + `AgentResponse::Batch { messages: Vec }` + (+ manager mirror) and a new `DeliveredMessage` struct in + `hive-sh4re` shared by both. `format_recv_batch` on the + mcp side renders the popped list with per-message + redelivery banners + `---` separators; empty batch + collapses to "(empty)" like single `recv`. 4 new broker + tests cover FIFO + cap, empty-when-idle, zero-max + short-circuit, and redelivered-flag propagation. Closes + the "inbox batching hint" item from the ergonomics + wishlist — the hint that was already in `format_wake_prompt` + is now actionable in one tool call instead of N. - **Just landed:** lease-style message delivery / no-drop on turn fail. The `messages` table gained an `acked_at` column (idempotent ALTER + backfill = `delivered_at` so diff --git a/TODO.md b/TODO.md index 44b929f..5d2549b 100644 --- a/TODO.md +++ b/TODO.md @@ -35,15 +35,6 @@ Filed by damocles, who actually lives in this thing. Loosely ranked by how often the friction bites in normal use. -- **Inbox batching hint in the wake prompt** — when the harness pops a - message and there are N more waiting, the wake prompt should say so - (e.g. `"(+3 more queued; consider draining before acting)"`) so claude - knows to call `recv()` again in the same turn instead of doing the - expensive Read/Edit dance once per message over N turns. The data's - already in the broker (`Broker::pending_count(agent)`); just thread it - into the prompt builder in `hive-ag3nt::turn.rs`. Even better: add a - one-shot `recv_batch(max: u32)` MCP tool that returns up to `max` - pending messages in a single round-trip. - **Optional `in_reply_to: ` on send** — pure wire addition; no behavioural change. The dashboard could render conversation threads (already wants this for the agent-to-agent question UI in the diff --git a/hive-ag3nt/src/bin/hive-ag3nt.rs b/hive-ag3nt/src/bin/hive-ag3nt.rs index 84dfeac..e3b8973 100644 --- a/hive-ag3nt/src/bin/hive-ag3nt.rs +++ b/hive-ag3nt/src/bin/hive-ag3nt.rs @@ -265,6 +265,7 @@ async fn serve( Ok( AgentResponse::Ok | AgentResponse::Status { .. } + | AgentResponse::Batch { .. } | AgentResponse::Recent { .. } | AgentResponse::QuestionQueued { .. } | AgentResponse::LooseEnds { .. } @@ -301,7 +302,8 @@ fn format_wake_prompt(from: &str, body: &str, unread: u64, redelivered: bool) -> String::new() } else { format!( - "\n\n({unread} more message(s) pending in your inbox — drain via `mcp__hyperhive__recv` if relevant.)" + "\n\n({unread} more message(s) pending in your inbox — call `mcp__hyperhive__recv_batch` \ + with `max: {unread}` to drain them all in one round-trip before acting.)" ) }; format!("{banner}Incoming message from `{from}`:\n---\n{body}\n---{pending}") diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index 0022d6a..4108fe5 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -221,6 +221,7 @@ async fn serve( Ok( ManagerResponse::Ok | ManagerResponse::Status { .. } + | ManagerResponse::Batch { .. } | ManagerResponse::QuestionQueued { .. } | ManagerResponse::Recent { .. } | ManagerResponse::Logs { .. } @@ -256,7 +257,8 @@ fn format_wake_prompt(from: &str, body: &str, unread: u64, redelivered: bool) -> String::new() } else { format!( - "\n\n({unread} more message(s) pending in your inbox — drain via `mcp__hyperhive__recv` if relevant.)" + "\n\n({unread} more message(s) pending in your inbox — call `mcp__hyperhive__recv_batch` \ + with `max: {unread}` to drain them all in one round-trip before acting.)" ) }; format!("{banner}Incoming message from `{from}`:\n---\n{body}\n---{pending}") diff --git a/hive-ag3nt/src/mcp.rs b/hive-ag3nt/src/mcp.rs index d6c4ea2..a26a1cf 100644 --- a/hive-ag3nt/src/mcp.rs +++ b/hive-ag3nt/src/mcp.rs @@ -46,6 +46,10 @@ pub enum SocketReply { id: i64, redelivered: bool, }, + /// Result of `recv_batch`: zero or more messages popped in one + /// round-trip. Empty vec is the equivalent of `Empty` for a + /// single `recv` — the format helper collapses it to "(empty)". + Batch(Vec), Empty, Status(u64), QuestionQueued(i64), @@ -77,6 +81,7 @@ impl From for SocketReply { redelivered, }, hive_sh4re::AgentResponse::Empty => Self::Empty, + hive_sh4re::AgentResponse::Batch { messages } => Self::Batch(messages), hive_sh4re::AgentResponse::Status { unread } => Self::Status(unread), hive_sh4re::AgentResponse::Recent { rows } => Self::Recent(rows), hive_sh4re::AgentResponse::QuestionQueued { id } => Self::QuestionQueued(id), @@ -114,6 +119,7 @@ impl From for SocketReply { redelivered, }, hive_sh4re::ManagerResponse::Empty => Self::Empty, + hive_sh4re::ManagerResponse::Batch { messages } => Self::Batch(messages), hive_sh4re::ManagerResponse::Status { unread } => Self::Status(unread), hive_sh4re::ManagerResponse::QuestionQueued { id } => Self::QuestionQueued(id), hive_sh4re::ManagerResponse::Recent { rows } => Self::Recent(rows), @@ -179,6 +185,35 @@ pub fn format_recv(resp: Result) -> String { pub const REDELIVERY_HINT: &str = "[redelivered after harness restart — may already be handled]\n"; +/// Format helper for `recv_batch`: renders zero or more popped +/// messages as a single string. Empty batch collapses to "(empty)" +/// so claude doesn't go hunting for content. Each message is rendered +/// in the same `from: \n\n` shape as `format_recv` — +/// per-message redelivery banner included — separated by a thin rule +/// so the model can tell where one body ends and the next begins. +pub fn format_recv_batch(resp: Result) -> String { + use std::fmt::Write as _; + let messages = match resp { + Ok(SocketReply::Batch(m)) => m, + Ok(SocketReply::Err(m)) => return format!("recv_batch failed: {m}"), + Ok(other) => return format!("recv_batch unexpected response: {other:?}"), + Err(e) => return format!("recv_batch transport error: {e:#}"), + }; + if messages.is_empty() { + return "(empty)".to_owned(); + } + let n = messages.len(); + let mut out = format!("popped {n} message(s):\n\n"); + for (i, m) in messages.iter().enumerate() { + if i > 0 { + out.push_str("\n---\n\n"); + } + let banner = if m.redelivered { REDELIVERY_HINT } else { "" }; + let _ = write!(out, "{banner}from: {}\n\n{}", m.from, m.body); + } + out +} + /// Format helper for `get_loose_ends`: renders a short bulleted list /// of pending approvals + questions + reminders. Empty list collapses /// to a clear marker so claude doesn't go hunting for a payload that @@ -338,6 +373,17 @@ pub struct RecvArgs { pub wait_seconds: Option, } +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct RecvBatchArgs { + /// Maximum number of messages to pop in this round-trip. Capped + /// at 32 server-side; values above the cap clamp silently. + /// Returns whatever is currently pending (possibly zero) without + /// long-polling — call when you've been told the inbox is busy + /// (e.g. the wake prompt mentioned N pending) and want to drain + /// them in one tool call instead of N separate `recv` calls. + pub max: u32, +} + /// MCP tool args for `remind`. Exactly one of `delay_seconds` or /// `at_unix_timestamp` must be set; both / neither is a tool-side error. /// Hides the tagged `ReminderTiming` enum behind a flatter schema so the @@ -511,6 +557,26 @@ impl AgentServer { .await } + #[tool( + description = "Pop up to `max` messages from this agent's inbox in a single round-trip \ + (no long-poll — returns whatever's pending immediately, possibly zero). Use this \ + when the wake prompt tells you the inbox has more messages queued, or any time you \ + know you'll be draining several at once: one tool call beats N consecutive `recv`s. \ + Per-message redelivery banners + ack bookkeeping work the same as `recv`. `max` \ + caps at 32 server-side; pass whatever you reasonably expect to handle this turn." + )] + async fn recv_batch(&self, Parameters(args): Parameters) -> String { + let log = format!("{args:?}"); + let max = args.max; + run_tool_envelope("recv_batch", log, async move { + let (resp, retries) = self + .dispatch(hive_sh4re::AgentRequest::RecvBatch { max }) + .await; + annotate_retries(format_recv_batch(resp), retries) + }) + .await + } + #[tool( description = "List loose ends pending against this agent: unanswered questions \ where you are the asker (waiting on someone) or the target (someone's waiting on \ @@ -819,6 +885,26 @@ impl ManagerServer { .await } + #[tool( + description = "Pop up to `max` messages from the manager inbox in a single round-trip \ + (no long-poll — returns whatever's pending immediately, possibly zero). Use this \ + when the wake prompt tells you the inbox has more messages queued, or any time you \ + know you'll be draining several at once: one tool call beats N consecutive `recv`s. \ + Per-message redelivery banners + ack bookkeeping work the same as `recv`. `max` \ + caps at 32 server-side; pass whatever you reasonably expect to handle this turn." + )] + async fn recv_batch(&self, Parameters(args): Parameters) -> String { + let log = format!("{args:?}"); + let max = args.max; + run_tool_envelope("recv_batch", log, async move { + let (resp, retries) = self + .dispatch(hive_sh4re::ManagerRequest::RecvBatch { max }) + .await; + annotate_retries(format_recv_batch(resp), retries) + }) + .await + } + #[tool( description = "Queue a Spawn approval for a brand-new sub-agent. The operator \ approves on the dashboard before the container is actually created." diff --git a/hive-c0re/src/agent_server.rs b/hive-c0re/src/agent_server.rs index 5da185a..3138f75 100644 --- a/hive-c0re/src/agent_server.rs +++ b/hive-c0re/src/agent_server.rs @@ -87,6 +87,15 @@ async fn serve(stream: UnixStream, agent: String, coord: Arc) -> Re /// positive `wait_seconds`. const RECV_LONG_POLL_MAX: std::time::Duration = std::time::Duration::from_secs(180); +/// Server-side hard cap on `RecvBatch.max`. Bounds the size of a +/// single round-trip so a confused caller can't drain the entire +/// inbox in one go and blow past wire-buffer sizes; everything above +/// the cap silently clamps. 32 is comfortably above the burst sizes +/// we've seen in practice (post-rebuild rescue, multi-agent reply +/// storms) and well under the per-message `MESSAGE_MAX_BYTES` * N +/// envelope budget. +const RECV_BATCH_MAX: u32 = 32; + fn recv_timeout(wait_seconds: Option) -> std::time::Duration { match wait_seconds { Some(s) => std::time::Duration::from_secs(s).min(RECV_LONG_POLL_MAX), @@ -114,6 +123,25 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc) -> message: format!("{e:#}"), }, }, + AgentRequest::RecvBatch { max } => { + let cap = (*max).min(RECV_BATCH_MAX) as usize; + match broker.recv_batch(agent, cap) { + Ok(deliveries) => AgentResponse::Batch { + messages: deliveries + .into_iter() + .map(|d| hive_sh4re::DeliveredMessage { + from: d.message.from, + body: d.message.body, + id: d.id, + redelivered: d.redelivered, + }) + .collect(), + }, + Err(e) => AgentResponse::Err { + message: format!("{e:#}"), + }, + } + } AgentRequest::Status => match broker.count_pending(agent) { Ok(unread) => AgentResponse::Status { unread }, Err(e) => AgentResponse::Err { diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs index 4cf66c3..7e739ed 100644 --- a/hive-c0re/src/broker.rs +++ b/hive-c0re/src/broker.rs @@ -362,6 +362,85 @@ impl Broker { })) } + /// Pop up to `max` pending messages for `recipient` in one + /// round-trip. Same per-row semantics as `recv`: every popped row + /// is marked `delivered_at = NOW`, pushed onto the per-recipient + /// `unacked_ids` list (so the next `ack_turn` closes them out), + /// and tagged with `redelivered = true` if it was resurfaced by + /// the most recent `requeue_inflight`. Emits one + /// `MessageEvent::Delivered` per popped row so the dashboard + /// forwarder stream stays consistent with the single-row path. + /// + /// `max == 0` short-circuits to an empty vec (no DB hit); any + /// positive value caps the batch at `max`. FIFO order matches + /// `recv`. + pub fn recv_batch(&self, recipient: &str, max: usize) -> Result> { + if max == 0 { + return Ok(Vec::new()); + } + // Same lock order as `recv` / `ack_turn` / `requeue_inflight`. + let mut inflight = self.inflight.lock().unwrap(); + let conn = self.conn.lock().unwrap(); + let max_i = i64::try_from(max).unwrap_or(i64::MAX); + let mut stmt = conn.prepare( + "SELECT id, sender, recipient, body + FROM messages + WHERE recipient = ?1 AND delivered_at IS NULL + ORDER BY id ASC + LIMIT ?2", + )?; + let rows: Vec<(i64, String, String, String)> = stmt + .query_map(params![recipient, max_i], |row| { + Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)) + })? + .collect::>()?; + drop(stmt); + if rows.is_empty() { + return Ok(Vec::new()); + } + // Stamp all popped rows in a single UPDATE — under the broker + // mutex, well within sqlite's 999-param default. + let now = now_unix(); + let ids: Vec = rows.iter().map(|(id, _, _, _)| *id).collect(); + let placeholders = std::iter::repeat_n("?", ids.len()) + .collect::>() + .join(","); + let sql = format!("UPDATE messages SET delivered_at = ? WHERE id IN ({placeholders})"); + let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(ids.len() + 1); + params_vec.push(&now); + for id in &ids { + params_vec.push(id); + } + conn.execute(&sql, params_vec.as_slice())?; + drop(conn); + // Bookkeeping + assemble the Delivery list. Per-row + // `requeued_ids` lookup runs once per pop, same as `recv`. + let slot = inflight.entry(recipient.to_owned()).or_default(); + let mut deliveries = Vec::with_capacity(rows.len()); + for (id, from, to, body) in rows { + slot.unacked_ids.push(id); + let redelivered = slot.requeued_ids.remove(&id); + deliveries.push(Delivery { + id, + redelivered, + message: Message { from, to, body }, + }); + } + drop(inflight); + // Mirror the per-row Delivered emit `recv` does so the + // dashboard forwarder sees one event per message regardless of + // which surface the harness used. + for d in &deliveries { + let _ = self.events.send(MessageEvent::Delivered { + from: d.message.from.clone(), + to: d.message.to.clone(), + body: d.message.body.clone(), + at: now, + }); + } + Ok(deliveries) + } + /// Drain the per-recipient unacked-id list and mark every row /// `acked_at = NOW`. Fired by the harness after `TurnOutcome::Ok`. /// Returns the number of rows acked (zero is normal — claude @@ -867,6 +946,70 @@ mod tests { ); } + /// Happy path for `recv_batch`: pops in FIFO order, respects + /// `max`, leaves the rest pending for the next call. + #[test] + fn recv_batch_pops_fifo_capped_at_max() { + let h = open_broker(); + let broker = &h.broker; + for i in 0..5 { + broker.send(&msg("a", "b", &format!("m{i}"))).unwrap(); + } + let batch = broker.recv_batch("b", 3).unwrap(); + let bodies: Vec<_> = batch.iter().map(|d| d.message.body.as_str()).collect(); + assert_eq!(bodies, vec!["m0", "m1", "m2"]); + // Remaining two stay pending; a second batch drains them. + let next = broker.recv_batch("b", 10).unwrap(); + let bodies: Vec<_> = next.iter().map(|d| d.message.body.as_str()).collect(); + assert_eq!(bodies, vec!["m3", "m4"]); + // ack_turn closes out all five popped rows in one go. + assert_eq!(broker.ack_turn("b").unwrap(), 5); + } + + /// `recv_batch` with no pending traffic returns an empty vec + /// (the "(empty)" path), not an error. + #[test] + fn recv_batch_returns_empty_when_idle() { + let h = open_broker(); + let batch = h.broker.recv_batch("ghost", 5).unwrap(); + assert!(batch.is_empty()); + } + + /// `max = 0` short-circuits without touching the DB (covered by + /// asserting we don't accidentally pop a pending row). + #[test] + fn recv_batch_zero_max_pops_nothing() { + let h = open_broker(); + let broker = &h.broker; + broker.send(&msg("a", "b", "stay")).unwrap(); + assert!(broker.recv_batch("b", 0).unwrap().is_empty()); + // The pending row is still in flight for the next real recv. + let d = broker.recv("b").unwrap().expect("still pending"); + assert_eq!(d.message.body, "stay"); + } + + /// `recv_batch` tags requeued rows with `redelivered: true` and + /// drains them from the per-recipient `requeued_ids` set so a + /// fresh follow-up recv after the batch doesn't double-tag. + #[test] + fn recv_batch_propagates_redelivered_flag() { + let h = open_broker(); + let broker = &h.broker; + broker.send(&msg("a", "b", "one")).unwrap(); + broker.send(&msg("a", "b", "two")).unwrap(); + broker.recv("b").unwrap().expect("popped 1"); + broker.recv("b").unwrap().expect("popped 2"); + broker.requeue_inflight("b").unwrap(); + let batch = broker.recv_batch("b", 5).unwrap(); + assert_eq!(batch.len(), 2); + assert!(batch.iter().all(|d| d.redelivered)); + // Fresh send after the batch is NOT tagged redelivered. + broker.send(&msg("a", "b", "three")).unwrap(); + let d = broker.recv("b").unwrap().expect("re-pop 3"); + assert_eq!(d.message.body, "three"); + assert!(!d.redelivered); + } + /// Per-recipient isolation: `requeue_inflight("a")` doesn't touch /// b's inflight rows. #[test] diff --git a/hive-c0re/src/manager_server.rs b/hive-c0re/src/manager_server.rs index 3cf7d30..69b074e 100644 --- a/hive-c0re/src/manager_server.rs +++ b/hive-c0re/src/manager_server.rs @@ -75,6 +75,11 @@ async fn serve(stream: UnixStream, coord: Arc) -> Result<()> { /// seconds (clamped at MAX). const MANAGER_RECV_LONG_POLL_MAX: std::time::Duration = std::time::Duration::from_secs(180); +/// Same shape + rationale as `agent_server::RECV_BATCH_MAX`. Kept +/// numerically aligned across surfaces so a tool description that +/// quotes the cap stays accurate either way. +const MANAGER_RECV_BATCH_MAX: u32 = 32; + fn manager_recv_timeout(wait_seconds: Option) -> std::time::Duration { match wait_seconds { Some(s) => std::time::Duration::from_secs(s).min(MANAGER_RECV_LONG_POLL_MAX), @@ -149,6 +154,25 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc) -> ManagerResp message: format!("{e:#}"), }, }, + ManagerRequest::RecvBatch { max } => { + let cap = (*max).min(MANAGER_RECV_BATCH_MAX) as usize; + match coord.broker.recv_batch(MANAGER_AGENT, cap) { + Ok(deliveries) => ManagerResponse::Batch { + messages: deliveries + .into_iter() + .map(|d| hive_sh4re::DeliveredMessage { + from: d.message.from, + body: d.message.body, + id: d.id, + redelivered: d.redelivered, + }) + .collect(), + }, + Err(e) => ManagerResponse::Err { + message: format!("{e:#}"), + }, + } + } ManagerRequest::RequestSpawn { name, description } => { tracing::info!(%name, "manager: request_spawn"); match coord.approvals.submit_kind( diff --git a/hive-sh4re/src/lib.rs b/hive-sh4re/src/lib.rs index 4cd729c..0203879 100644 --- a/hive-sh4re/src/lib.rs +++ b/hive-sh4re/src/lib.rs @@ -174,6 +174,28 @@ pub struct InboxRow { pub at: i64, } +/// One delivered message in a `RecvBatch` response. Same fields as +/// `AgentResponse::Message` / `ManagerResponse::Message` without the +/// variant wrapper — a batch returns a `Vec` so the +/// harness can iterate without unpicking N separate top-level frames. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeliveredMessage { + pub from: String, + pub body: String, + /// Broker row id, mirrored from the `Delivery` struct. Opaque to + /// claude but tracked by the harness so the broker's in-memory + /// unacked list can be drained on `AckTurn`. Marked `default` for + /// wire backwards-compat — pre-feature peers parse to 0. + #[serde(default)] + pub id: i64, + /// `true` when this row was previously popped, never acked, and + /// resurfaced by `RequeueInflight`. The format helper prepends the + /// "may already be handled" hint to the rendered body so claude + /// sees the warning per-message in the batch. + #[serde(default)] + pub redelivered: bool, +} + /// Reminder timing: either relative (wait N seconds) or absolute (at unix /// timestamp). #[derive(Debug, Clone, Serialize, Deserialize)] @@ -271,6 +293,15 @@ pub enum AgentRequest { #[serde(default)] wait_seconds: Option, }, + /// Pop up to `max` pending messages in one round-trip. No + /// long-poll — returns whatever's currently queued (possibly + /// zero) immediately. Same delivery + ack bookkeeping as `Recv`: + /// every popped row is marked `delivered_at = NOW`, tracked in + /// the broker's per-recipient `unacked_ids` list (so the next + /// `AckTurn` closes them out), and tagged `redelivered = true` if + /// it was resurfaced by `RequeueInflight`. Used by the harness to + /// drain a bursty inbox without N socket round-trips. + RecvBatch { max: u32 }, /// Non-mutating: how many pending messages are addressed to me? /// Used by the harness to render a status line after each tool call. Status, @@ -408,6 +439,10 @@ pub enum AgentResponse { }, /// `Recv` found nothing pending. Empty, + /// `RecvBatch` result. `messages` is in FIFO order and may be + /// empty (treated like `Empty` for `Recv`); never longer than the + /// `max` the caller passed. + Batch { messages: Vec }, /// `Status` result: how many pending messages are in this agent's inbox. Status { unread: u64 }, /// `Recent` result: newest-first inbox rows. @@ -583,6 +618,10 @@ pub enum ManagerRequest { #[serde(default)] wait_seconds: Option, }, + /// Mirror of `AgentRequest::RecvBatch` on the manager surface — + /// pop up to `max` pending messages in one round-trip, no + /// long-poll. Same ack + redelivery bookkeeping. + RecvBatch { max: u32 }, /// Non-mutating: pending message count, used to render a status line /// after each MCP tool call (mirrors `AgentRequest::Status`). Status, @@ -734,6 +773,11 @@ pub enum ManagerResponse { redelivered: bool, }, Empty, + /// Mirror of `AgentResponse::Batch` on the manager surface. + /// `messages` is in FIFO order and may be empty. + Batch { + messages: Vec, + }, Status { unread: u64, },