From 690cb5ab5b9d71e73e753377d6e692ab875a0d29 Mon Sep 17 00:00:00 2001 From: damocles Date: Mon, 18 May 2026 22:01:48 +0200 Subject: [PATCH] =?UTF-8?q?broker:=20lease-style=20delivery=20=E2=80=94=20?= =?UTF-8?q?ack=5Fturn=20+=20requeue=5Finflight=20close=20the=20no-drop=20l?= =?UTF-8?q?oop?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CLAUDE.md | 37 ++++ hive-ag3nt/src/bin/hive-ag3nt.rs | 79 ++++++- hive-ag3nt/src/bin/hive-m1nd.rs | 66 +++++- hive-ag3nt/src/mcp.rs | 61 ++++- hive-c0re/src/agent_server.rs | 26 ++- hive-c0re/src/broker.rs | 369 ++++++++++++++++++++++++++++++- hive-c0re/src/manager_server.rs | 25 ++- hive-sh4re/src/lib.rs | 56 ++++- 8 files changed, 684 insertions(+), 35 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index d60dfe1..4a7b47d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -183,6 +183,43 @@ read them à la carte. In-flight or recent context that hasn't earned a section yet. Prune freely. +- **Just landed:** lease-style message delivery / no-drop + on turn fail. The `messages` table gained an `acked_at` + column (idempotent ALTER + backfill = `delivered_at` so + pre-migration delivered rows count as already-acked). + `Broker::recv` now returns `Delivery { id, redelivered, + message }` — the harness gets the row id back so + `AckTurn` can sweep every popped id at turn-end-OK. Two + new wire arms on both agent + manager surfaces: + `AckTurn` (drains the broker's per-recipient in-memory + `unacked_ids` list and stamps the rows `acked_at = NOW`) + and `RequeueInflight` (one-shot at harness boot: resets + `delivered_at = NULL` on every still-inflight row + + remembers each id so the next `Recv` carries + `redelivered: true`). Both bin loops call + `requeue_inflight` once before entering serve, and + `ack_turn` after every `TurnOutcome::Ok` (Failed + + PromptTooLong intentionally skip the ack so the popped + rows stay in-flight for the next boot's requeue). + `format_recv` + `format_wake_prompt` on both bins + surface a `[redelivered after harness restart — may + already be handled]` banner so claude knows the + side-effects of any previous handling may already have + happened. Lock order: `inflight` mutex first then + `conn` mutex in all three methods (`recv` / `ack_turn` + / `requeue_inflight`) so a concurrent pop can't race + the requeue's DB update vs in-memory populate and + miss the redelivered tag. `vacuum_delivered` filter + flipped from `delivered_at < cutoff` to `acked_at IS + NOT NULL AND acked_at < cutoff` so unacked-but- + delivered rows survive vacuum (they're recoverable via + `requeue_inflight`). 7 new tests in `broker::tests` + cover happy path, crash recovery, idempotency, per- + recipient isolation, batch ack, vacuum preservation, + and FIFO ordering on requeue. Closes the "post-rebuild + system-message missed wake" bug class entirely (any + turn that wakes from a `delivered_at NOT NULL, + acked_at NULL` row resurfaces on next boot). - **Just landed:** ctx + cost badges split. The per-agent page now shows TWO chips — `ctx · N` (last inference's prompt size = actual context window utilisation, parsed diff --git a/hive-ag3nt/src/bin/hive-ag3nt.rs b/hive-ag3nt/src/bin/hive-ag3nt.rs index a9b9b44..927b48d 100644 --- a/hive-ag3nt/src/bin/hive-ag3nt.rs +++ b/hive-ag3nt/src/bin/hive-ag3nt.rs @@ -160,6 +160,15 @@ async fn serve( ) -> Result<()> { tracing::info!(socket = %socket.display(), "hive-ag3nt serve"); let _ = state; // reserved for future state transitions (turn-loop -> needs-login) + // Boot-time recovery: ask the broker to resurface anything we + // popped in a previous harness session but never acked + // (crashed mid-turn / OOM / container restart). The broker + // resets `delivered_at = NULL` on those rows and remembers + // their ids so the next `Recv` tags them `redelivered: true`; + // we then prepend a "may already be handled" hint to the wake + // prompt. Single shot before entering the serve loop; idempotent + // when there's nothing inflight. + requeue_inflight(socket).await; loop { let recv: Result = // Explicit long-poll: the new agent_server semantics treat @@ -174,8 +183,13 @@ async fn serve( ) .await; match recv { - Ok(AgentResponse::Message { from, body }) => { - tracing::info!(%from, %body, "inbox"); + Ok(AgentResponse::Message { + from, + body, + id: _, + redelivered, + }) => { + tracing::info!(%from, %body, %redelivered, "inbox"); let unread = inbox_unread(socket).await; bus.emit(LiveEvent::TurnStart { from: from.clone(), @@ -186,13 +200,24 @@ async fn serve( let started_at = now_unix(); let started_instant = std::time::Instant::now(); let model_at_start = bus.model(); - let prompt = format_wake_prompt(&from, &body, unread); + let prompt = format_wake_prompt(&from, &body, unread, redelivered); let outcome = { let _guard = turn_lock.lock().await; turn::drive_turn(&prompt, files, &bus).await }; turn::emit_turn_end(&bus, &outcome); bus.set_state(TurnState::Idle); + // Ack only on a clean turn-end. `Failed` leaves every + // message popped during the turn in the unacked list; + // next harness boot's `RequeueInflight` will reset + // `delivered_at = NULL` and tag them `redelivered`. + // `PromptTooLong` is absorbed inside `drive_turn` via + // compaction so it shouldn't reach here, but if it + // does we also skip the ack (safer to redeliver than + // to lose the message). + if matches!(outcome, turn::TurnOutcome::Ok) { + ack_turn(socket).await; + } // Failures are unhandled by definition — PromptTooLong is // absorbed inside drive_turn via compaction, so anything // that reaches Failed here is a real crash. Notify the @@ -261,7 +286,16 @@ async fn serve( /// (`prompts/agent.md` → `claude --system-prompt-file`); this is just the /// wake signal claude reacts to. `unread` is the count of *other* /// messages in the inbox right after this one was popped. -fn format_wake_prompt(from: &str, body: &str, unread: u64) -> String { +/// `redelivered` flags messages that were popped in a prior harness +/// session, never acked, and resurfaced after a restart — a banner +/// at the top of the wake prompt warns that any side-effects of +/// previous handling may already have happened. +fn format_wake_prompt(from: &str, body: &str, unread: u64, redelivered: bool) -> String { + let banner = if redelivered { + hive_ag3nt::mcp::REDELIVERY_HINT + } else { + "" + }; let pending = if unread == 0 { String::new() } else { @@ -269,7 +303,42 @@ fn format_wake_prompt(from: &str, body: &str, unread: u64) -> String { "\n\n({unread} more message(s) pending in your inbox — drain via `mcp__hyperhive__recv` if relevant.)" ) }; - format!("Incoming message from `{from}`:\n---\n{body}\n---{pending}") + format!("{banner}Incoming message from `{from}`:\n---\n{body}\n---{pending}") +} + +/// Best-effort: tell the broker every message we popped during the +/// turn is now fully handled (turn-end-OK). Swallows transport +/// errors — the worst case is a redundant requeue on next boot. +async fn ack_turn(socket: &Path) { + match client::request::<_, AgentResponse>(socket, &AgentRequest::AckTurn).await { + Ok(AgentResponse::Ok) => {} + Ok(AgentResponse::Err { message }) => { + tracing::warn!(%message, "ack_turn rejected by broker"); + } + Ok(other) => { + tracing::warn!(?other, "ack_turn unexpected response"); + } + Err(e) => tracing::warn!(error = ?e, "ack_turn transport error"), + } +} + +/// Boot-time recovery: ask the broker to resurface anything we +/// popped in a previous harness session but never acked. The broker +/// resets `delivered_at = NULL` on those rows and remembers their +/// ids so the next `Recv` carries `redelivered: true`. Swallows +/// transport errors — they degrade to "no recovery this boot", +/// which is no worse than the pre-feature behaviour (silent drop). +async fn requeue_inflight(socket: &Path) { + match client::request::<_, AgentResponse>(socket, &AgentRequest::RequeueInflight).await { + Ok(AgentResponse::Ok) => {} + Ok(AgentResponse::Err { message }) => { + tracing::warn!(%message, "requeue_inflight rejected by broker"); + } + Ok(other) => { + tracing::warn!(?other, "requeue_inflight unexpected response"); + } + Err(e) => tracing::warn!(error = ?e, "requeue_inflight transport error"), + } } /// Best-effort: tell the manager that this agent's last turn crashed diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index 911876c..535a99d 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -121,6 +121,10 @@ async fn serve( turn_lock: TurnLock, ) -> Result<()> { tracing::info!(socket = %socket.display(), "hive-m1nd serve"); + // Same boot-time recovery as hive-ag3nt — see that loop for the + // rationale. Manager-flavour socket so we requeue only manager + // inflight rows. + requeue_inflight(socket).await; loop { let recv: Result = // Explicit long-poll: see hive-ag3nt's serve loop for the @@ -134,7 +138,12 @@ async fn serve( ) .await; match recv { - Ok(ManagerResponse::Message { from, body }) => { + Ok(ManagerResponse::Message { + from, + body, + id: _, + redelivered, + }) => { if from == SYSTEM_SENDER { // Helper events (ApprovalResolved / Spawned / Rebuilt / // Killed / Destroyed) — these are FYI for the manager; @@ -154,14 +163,14 @@ async fn serve( // prompt body so claude sees it. Sender stays "system" // so the wake prompt can label it as such. } - tracing::info!(%from, %body, "manager inbox"); + tracing::info!(%from, %body, %redelivered, "manager inbox"); let unread = inbox_unread(socket).await; bus.emit(LiveEvent::TurnStart { from: from.clone(), body: body.clone(), unread, }); - let prompt = format_wake_prompt(&from, &body, unread); + let prompt = format_wake_prompt(&from, &body, unread, redelivered); bus.set_state(TurnState::Thinking); let started_at = now_unix(); let started_instant = std::time::Instant::now(); @@ -172,6 +181,12 @@ async fn serve( }; turn::emit_turn_end(&bus, &outcome); bus.set_state(TurnState::Idle); + // Ack only on a clean turn-end; Failed leaves the + // popped ids in-flight for the next boot's requeue. + // Mirrors hive-ag3nt; see that loop for full rationale. + if matches!(outcome, turn::TurnOutcome::Ok) { + ack_turn(socket).await; + } if let Some(s) = &stats { let ended_at = now_unix(); let duration_ms = @@ -228,8 +243,15 @@ async fn serve( /// Per-turn user prompt. The role/tools/etc. is in the system prompt /// (`prompts/manager.md` → `claude --system-prompt-file`); this is just /// the wake signal. `unread` is the inbox depth after this message was -/// popped. -fn format_wake_prompt(from: &str, body: &str, unread: u64) -> String { +/// popped. `redelivered` adds a "may already be handled" banner above +/// the wake body when the broker resurfaced this row (see hive-ag3nt's +/// `format_wake_prompt` for the full story). +fn format_wake_prompt(from: &str, body: &str, unread: u64, redelivered: bool) -> String { + let banner = if redelivered { + hive_ag3nt::mcp::REDELIVERY_HINT + } else { + "" + }; let pending = if unread == 0 { String::new() } else { @@ -237,7 +259,39 @@ fn format_wake_prompt(from: &str, body: &str, unread: u64) -> String { "\n\n({unread} more message(s) pending in your inbox — drain via `mcp__hyperhive__recv` if relevant.)" ) }; - format!("Incoming message from `{from}`:\n---\n{body}\n---{pending}") + format!("{banner}Incoming message from `{from}`:\n---\n{body}\n---{pending}") +} + +/// Best-effort: tell the broker every message popped during the turn +/// is now handled. Mirror of `hive-ag3nt::ack_turn` on the manager +/// surface. +async fn ack_turn(socket: &Path) { + match client::request::<_, ManagerResponse>(socket, &ManagerRequest::AckTurn).await { + Ok(ManagerResponse::Ok) => {} + Ok(ManagerResponse::Err { message }) => { + tracing::warn!(%message, "ack_turn rejected by broker"); + } + Ok(other) => { + tracing::warn!(?other, "ack_turn unexpected response"); + } + Err(e) => tracing::warn!(error = ?e, "ack_turn transport error"), + } +} + +/// Boot-time recovery: ask the broker to resurface any inflight (popped +/// but not acked) messages so the next `Recv` re-delivers them with +/// the redelivery banner. Mirror of `hive-ag3nt::requeue_inflight`. +async fn requeue_inflight(socket: &Path) { + match client::request::<_, ManagerResponse>(socket, &ManagerRequest::RequeueInflight).await { + Ok(ManagerResponse::Ok) => {} + Ok(ManagerResponse::Err { message }) => { + tracing::warn!(%message, "requeue_inflight rejected by broker"); + } + Ok(other) => { + tracing::warn!(?other, "requeue_inflight unexpected response"); + } + Err(e) => tracing::warn!(error = ?e, "requeue_inflight transport error"), + } } async fn inbox_unread(socket: &Path) -> u64 { diff --git a/hive-ag3nt/src/mcp.rs b/hive-ag3nt/src/mcp.rs index ace175a..7e6bc0a 100644 --- a/hive-ag3nt/src/mcp.rs +++ b/hive-ag3nt/src/mcp.rs @@ -34,7 +34,18 @@ use crate::client; pub enum SocketReply { Ok, Err(String), - Message { from: String, body: String }, + /// `id` is the broker's row id — not surfaced to claude but + /// useful for harness-side bookkeeping (not used in this module + /// today; the bin loops drive ack via `AckTurn` instead of + /// per-id). `redelivered` triggers the "may already be handled" + /// hint in `format_recv` so claude sees it when draining the + /// inbox in-turn. + Message { + from: String, + body: String, + id: i64, + redelivered: bool, + }, Empty, Status(u64), QuestionQueued(i64), @@ -54,7 +65,17 @@ impl From for SocketReply { match r { hive_sh4re::AgentResponse::Ok => Self::Ok, hive_sh4re::AgentResponse::Err { message } => Self::Err(message), - hive_sh4re::AgentResponse::Message { from, body } => Self::Message { from, body }, + hive_sh4re::AgentResponse::Message { + from, + body, + id, + redelivered, + } => Self::Message { + from, + body, + id, + redelivered, + }, hive_sh4re::AgentResponse::Empty => Self::Empty, hive_sh4re::AgentResponse::Status { unread } => Self::Status(unread), hive_sh4re::AgentResponse::Recent { rows } => Self::Recent(rows), @@ -81,7 +102,17 @@ impl From for SocketReply { match r { hive_sh4re::ManagerResponse::Ok => Self::Ok, hive_sh4re::ManagerResponse::Err { message } => Self::Err(message), - hive_sh4re::ManagerResponse::Message { from, body } => Self::Message { from, body }, + hive_sh4re::ManagerResponse::Message { + from, + body, + id, + redelivered, + } => Self::Message { + from, + body, + id, + redelivered, + }, hive_sh4re::ManagerResponse::Empty => Self::Empty, hive_sh4re::ManagerResponse::Status { unread } => Self::Status(unread), hive_sh4re::ManagerResponse::QuestionQueued { id } => Self::QuestionQueued(id), @@ -117,10 +148,22 @@ pub fn format_ack(resp: Result, tool: &str, ok_msg: } /// Format helper for `recv` tools: `Message` → from + body block; -/// `Empty` → marker; anything else surfaces as an error. +/// `Empty` → marker; anything else surfaces as an error. When the +/// broker tags the row as `redelivered` (popped before, never acked, +/// resurfaced after a harness restart) a short banner is prepended +/// so claude knows the side-effects of any previous handling may +/// already have happened. pub fn format_recv(resp: Result) -> String { match resp { - Ok(SocketReply::Message { from, body }) => format!("from: {from}\n\n{body}"), + Ok(SocketReply::Message { + from, + body, + redelivered, + .. + }) => { + let banner = if redelivered { REDELIVERY_HINT } else { "" }; + format!("{banner}from: {from}\n\n{body}") + } Ok(SocketReply::Empty) => "(empty)".into(), Ok(SocketReply::Err(m)) => format!("recv failed: {m}"), Ok(other) => format!("recv unexpected response: {other:?}"), @@ -128,6 +171,14 @@ pub fn format_recv(resp: Result) -> String { } } +/// Header prepended to message bodies that were popped by a prior +/// harness session, never acked (turn crash / OOM / restart), and +/// resurfaced by `RequeueInflight` on this session's boot. Same +/// string surfaces in the wake prompt (see the bin loops) and the +/// in-turn `recv` tool result so claude sees the warning either way. +pub const REDELIVERY_HINT: &str = + "[redelivered after harness restart — may already be handled]\n"; + /// Format helper for `get_loose_ends`: renders a short bulleted list /// of pending approvals + questions + reminders. Empty list collapses /// to a clear marker so claude doesn't go hunting for a payload that diff --git a/hive-c0re/src/agent_server.rs b/hive-c0re/src/agent_server.rs index 8615f16..f4c0148 100644 --- a/hive-c0re/src/agent_server.rs +++ b/hive-c0re/src/agent_server.rs @@ -94,6 +94,7 @@ fn recv_timeout(wait_seconds: Option) -> std::time::Duration { } } +#[allow(clippy::too_many_lines)] async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc) -> AgentResponse { let broker = &coord.broker; match req { @@ -102,9 +103,11 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc) -> .recv_blocking(agent, recv_timeout(*wait_seconds)) .await { - Ok(Some(msg)) => AgentResponse::Message { - from: msg.from, - body: msg.body, + Ok(Some(d)) => AgentResponse::Message { + from: d.message.from, + body: d.message.body, + id: d.id, + redelivered: d.redelivered, }, Ok(None) => AgentResponse::Empty, Err(e) => AgentResponse::Err { @@ -200,6 +203,23 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc) -> |message| AgentResponse::Err { message }, |()| AgentResponse::Ok, ), + AgentRequest::AckTurn => match broker.ack_turn(agent) { + Ok(_n) => AgentResponse::Ok, + Err(e) => AgentResponse::Err { + message: format!("{e:#}"), + }, + }, + AgentRequest::RequeueInflight => match broker.requeue_inflight(agent) { + Ok(n) => { + if n > 0 { + tracing::info!(%agent, requeued = %n, "requeued in-flight messages"); + } + AgentResponse::Ok + } + Err(e) => AgentResponse::Err { + message: format!("{e:#}"), + }, + }, } } diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs index e29c21f..af36e4c 100644 --- a/hive-c0re/src/broker.rs +++ b/hive-c0re/src/broker.rs @@ -1,6 +1,7 @@ //! Sqlite-backed message broker. Survives `hive-c0re` restart, and taps every //! send/recv onto a broadcast channel so the dashboard can stream it. +use std::collections::{HashMap, HashSet}; use std::path::Path; use std::sync::Mutex; use std::time::{SystemTime, UNIX_EPOCH}; @@ -46,6 +47,18 @@ const EVENT_CHANNEL: usize = 256; /// self-documenting. pub type DueReminder = (String, i64, String, Option); +/// A single message hand-off from broker to recipient. Carries the +/// broker's row id (so the harness can drive `ack_turn` later) and +/// the redelivery flag (so the harness can prepend the +/// "may already be handled" hint to the wake prompt). The +/// `Message` itself is identical to a pristine `Send` payload. +#[derive(Debug, Clone)] +pub struct Delivery { + pub id: i64, + pub redelivered: bool, + pub message: Message, +} + /// Row shape for [`Broker::list_pending_reminders`], shipped on the /// dashboard `/api/reminders` response. #[derive(Debug, Clone, Serialize)] @@ -99,9 +112,33 @@ pub enum MessageEvent { }, } +/// Per-recipient in-memory bookkeeping for the deliver-then-ack +/// flow. Source of truth is the DB columns `delivered_at` + +/// `acked_at`; the in-memory state here is purely an optimisation +/// (avoids scanning the messages table on `AckTurn`) plus the +/// redelivery-hint marker. +#[derive(Default)] +struct RecipientInflight { + /// Message ids the broker has handed to this recipient since the + /// last `AckTurn`. Drained on `ack_turn`, which then runs a + /// single `UPDATE … WHERE id IN (…)` to set `acked_at`. + unacked_ids: Vec, + /// Message ids resurfaced by the most recent `requeue_inflight` + /// call. The next `recv` pop of any id in this set tags the + /// response with `redelivered: true` so the harness can prepend + /// the "may already be handled" hint to the wake prompt; + /// successful pops drain the id from the set. + requeued_ids: HashSet, +} + pub struct Broker { conn: Mutex, events: broadcast::Sender, + /// Per-recipient deliver/ack tracking. Lost on hive-c0re restart + /// (harmless — the harness fires `RequeueInflight` on its own + /// boot, which rebuilds the `requeued_ids` set from the DB and + /// clears any stale `unacked_ids`). + inflight: Mutex>, } impl Broker { @@ -113,11 +150,13 @@ impl Broker { let conn = Connection::open(path).with_context(|| format!("open broker db {}", path.display()))?; conn.execute_batch(SCHEMA).context("apply broker schema")?; + ensure_message_columns(&conn).context("migrate messages columns")?; ensure_reminder_columns(&conn).context("migrate reminders columns")?; let (events, _) = broadcast::channel(EVENT_CHANNEL); Ok(Self { conn: Mutex::new(conn), events, + inflight: Mutex::new(HashMap::new()), }) } @@ -229,10 +268,10 @@ impl Broker { &self, recipient: &str, timeout: std::time::Duration, - ) -> Result> { + ) -> Result> { let mut rx = self.subscribe(); - if let Some(m) = self.recv(recipient)? { - return Ok(Some(m)); + if let Some(d) = self.recv(recipient)? { + return Ok(Some(d)); } let deadline = tokio::time::Instant::now() + timeout; loop { @@ -246,8 +285,8 @@ impl Broker { // pop (in case we missed our notification while behind). Ok(Err(_)) => return self.recv(recipient), Ok(Ok(MessageEvent::Sent { to, .. })) if to == recipient => { - if let Some(m) = self.recv(recipient)? { - return Ok(Some(m)); + if let Some(d) = self.recv(recipient)? { + return Ok(Some(d)); } // Lost a race (concurrent recv elsewhere). Keep waiting. } @@ -256,22 +295,31 @@ impl Broker { } } - /// Delete delivered messages older than `older_than_secs`. Undelivered - /// rows are always kept regardless of age — those are still in flight + /// Delete fully-acked messages older than `older_than_secs`. + /// Unacked rows (delivered but not yet acknowledged by a clean + /// turn-end, plus undelivered rows) are always kept regardless of + /// age — the former because they're recoverable via + /// `requeue_inflight`, the latter because they're still in flight /// from the broker's POV. Returns the number of rows removed. pub fn vacuum_delivered(&self, older_than_secs: i64) -> Result { let cutoff = now_unix() - older_than_secs; let conn = self.conn.lock().unwrap(); let n = conn.execute( "DELETE FROM messages - WHERE delivered_at IS NOT NULL - AND delivered_at < ?1", + WHERE acked_at IS NOT NULL + AND acked_at < ?1", params![cutoff], )?; Ok(u64::try_from(n).unwrap_or(0)) } - pub fn recv(&self, recipient: &str) -> Result> { + pub fn recv(&self, recipient: &str) -> Result> { + // Lock order: inflight FIRST, then conn. `requeue_inflight` + + // `ack_turn` follow the same order so we never deadlock; the + // requeue path also needs both locks held together so a pop + // can't sneak in between its DB update + in-memory populate + // and miss the `redelivered` flag. + let mut inflight = self.inflight.lock().unwrap(); let conn = self.conn.lock().unwrap(); let row: Option<(i64, String, String, String)> = conn .query_row( @@ -291,14 +339,113 @@ impl Broker { "UPDATE messages SET delivered_at = ?1 WHERE id = ?2", params![now_unix(), id], )?; + // Track the id so the next `ack_turn(recipient)` can sweep it, + // and check whether it was resurfaced by a recent + // `requeue_inflight` (in which case the wake prompt gets the + // "may already be handled" hint). Both ops are O(1) per pop; + // the hash-set lookup runs at most once per delivery. + let slot = inflight.entry(recipient.to_owned()).or_default(); + slot.unacked_ids.push(id); + let redelivered = slot.requeued_ids.remove(&id); drop(conn); + drop(inflight); let _ = self.events.send(MessageEvent::Delivered { from: from.clone(), to: to.clone(), body: body.clone(), at: now_unix(), }); - Ok(Some(Message { from, to, body })) + Ok(Some(Delivery { + id, + redelivered, + message: Message { from, to, body }, + })) + } + + /// Drain the per-recipient unacked-id list and mark every row + /// `acked_at = NOW`. Fired by the harness after `TurnOutcome::Ok`. + /// Returns the number of rows acked (zero is normal — claude + /// may have not called recv during the turn). Tolerant of ids + /// that no longer exist in the DB (vacuumed, manually deleted) + /// — `UPDATE … WHERE id IN (…)` simply matches zero rows. + pub fn ack_turn(&self, recipient: &str) -> Result { + // Same lock order as `recv` and `requeue_inflight`. + let mut inflight = self.inflight.lock().unwrap(); + let ids: Vec = inflight + .get_mut(recipient) + .map(|s| std::mem::take(&mut s.unacked_ids)) + .unwrap_or_default(); + if ids.is_empty() { + return Ok(0); + } + let now = now_unix(); + let conn = self.conn.lock().unwrap(); + // Bind every id explicitly. Caps in the hundreds in the worst + // case (a single very chatty turn); well under sqlite's 999 + // default param limit and we're already serialising on the + // broker mutex. + let placeholders = std::iter::repeat_n("?", ids.len()) + .collect::>() + .join(","); + let sql = format!("UPDATE messages SET acked_at = ? WHERE id IN ({placeholders})"); + let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(ids.len() + 1); + params_vec.push(&now); + for id in &ids { + params_vec.push(id); + } + let n = conn.execute(&sql, params_vec.as_slice())?; + Ok(u64::try_from(n).unwrap_or(0)) + } + + /// Resurface every message the broker previously handed to this + /// recipient that never got `acked_at` set. Used by the harness at + /// boot to recover from the crashed-mid-turn / OOM-killed / + /// container-restarted cases. Three steps: + /// + /// 1. Clear any stale in-memory state for this recipient (the + /// previous harness session's `unacked_ids` are irrelevant — + /// the new session will repopulate from fresh pops). + /// 2. Find every row where `recipient = me`, `delivered_at IS NOT + /// NULL`, `acked_at IS NULL`. Reset `delivered_at = NULL` so + /// the next `Recv` pops them again. + /// 3. Remember each id in the per-recipient `requeued_ids` set so + /// the next pop tags the response with `redelivered: true`. + /// + /// Returns the number of rows requeued. Safe to call when there's + /// nothing in flight (returns 0). Safe to call multiple times + /// (idempotent — the second call finds nothing because the rows + /// are now back in the pending state). + pub fn requeue_inflight(&self, recipient: &str) -> Result { + // Hold inflight + conn together so a concurrent `recv` can't + // pop a just-requeued row between our DB update and our + // in-memory populate and miss the redelivered tag. + let mut inflight = self.inflight.lock().unwrap(); + let conn = self.conn.lock().unwrap(); + let mut stmt = conn.prepare( + "SELECT id FROM messages + WHERE recipient = ?1 + AND delivered_at IS NOT NULL + AND acked_at IS NULL", + )?; + let ids: Vec = stmt + .query_map(params![recipient], |row| row.get(0))? + .collect::>()?; + drop(stmt); + if !ids.is_empty() { + let placeholders = std::iter::repeat_n("?", ids.len()) + .collect::>() + .join(","); + let sql = + format!("UPDATE messages SET delivered_at = NULL WHERE id IN ({placeholders})"); + let params_vec: Vec<&dyn rusqlite::ToSql> = + ids.iter().map(|id| id as &dyn rusqlite::ToSql).collect(); + conn.execute(&sql, params_vec.as_slice())?; + } + let slot = inflight.entry(recipient.to_owned()).or_default(); + slot.unacked_ids.clear(); + slot.requeued_ids.clear(); + slot.requeued_ids.extend(ids.iter().copied()); + Ok(u64::try_from(ids.len()).unwrap_or(0)) } /// Store a new reminder. Returns the reminder id. @@ -502,6 +649,30 @@ impl Broker { } } +/// Idempotent messages-table migrations. Adds `acked_at` and +/// back-fills it for every already-delivered row, so the +/// pre-migration sessions count as "fully handled" and won't be +/// resurfaced by the first `requeue_inflight` after upgrade. +fn ensure_message_columns(conn: &Connection) -> Result<()> { + let has: bool = conn + .prepare("SELECT 1 FROM pragma_table_info('messages') WHERE name = 'acked_at'")? + .exists([])?; + if !has { + conn.execute_batch("ALTER TABLE messages ADD COLUMN acked_at INTEGER;") + .context("add messages.acked_at column")?; + // Backfill: treat every existing delivered row as acked. The + // session it was delivered to is gone, so requeue would just + // surface phantom traffic to whatever harness reads next. + conn.execute( + "UPDATE messages SET acked_at = delivered_at \ + WHERE delivered_at IS NOT NULL AND acked_at IS NULL", + [], + ) + .context("backfill messages.acked_at from delivered_at")?; + } + Ok(()) +} + /// Idempotent reminder-table migrations. `ALTER TABLE ADD COLUMN` /// has no `IF NOT EXISTS` form in sqlite, so we probe /// `pragma_table_info` per column. New deploys (table created by @@ -538,3 +709,179 @@ fn now_unix() -> i64 { .and_then(|d| i64::try_from(d.as_secs()).ok()) .unwrap_or(0) } + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicU64, Ordering}; + + /// Per-process counter so each test gets a unique sqlite path even + /// when threads run concurrently. Avoids pulling in a `tempfile` + /// dep just for this one module. + static TEST_COUNTER: AtomicU64 = AtomicU64::new(0); + + struct TmpBroker { + path: std::path::PathBuf, + pub broker: Broker, + } + + impl Drop for TmpBroker { + fn drop(&mut self) { + let _ = std::fs::remove_file(&self.path); + } + } + + fn open_broker() -> TmpBroker { + let n = TEST_COUNTER.fetch_add(1, Ordering::Relaxed); + let pid = std::process::id(); + let path = std::env::temp_dir().join(format!("hive-broker-test-{pid}-{n}.sqlite")); + let _ = std::fs::remove_file(&path); + let broker = Broker::open(&path).expect("open broker"); + TmpBroker { path, broker } + } + + fn msg(from: &str, to: &str, body: &str) -> Message { + Message { + from: from.to_owned(), + to: to.to_owned(), + body: body.to_owned(), + } + } + + /// Happy path: send → recv → `ack_turn` drains the in-memory list + /// and marks the row `acked_at IS NOT NULL`. A second recv finds + /// nothing pending (the row stays in the table for vacuum). + #[test] + fn ack_turn_marks_delivered_rows_acked() { + let h = open_broker(); + let broker = &h.broker; + broker.send(&msg("a", "b", "hi")).unwrap(); + let d = broker.recv("b").unwrap().expect("popped"); + assert_eq!(d.message.body, "hi"); + assert!(!d.redelivered); + assert_eq!(broker.ack_turn("b").unwrap(), 1); + // ack_turn drained the unacked list; calling again is a no-op. + assert_eq!(broker.ack_turn("b").unwrap(), 0); + // Recv finds nothing — the row is now delivered + acked. + assert!(broker.recv("b").unwrap().is_none()); + } + + /// Crash-recovery: send → recv → (no ack) → `requeue_inflight` + /// resets `delivered_at` + tags the next pop as redelivered. After + /// that `ack_turn` closes it out cleanly. + #[test] + fn requeue_inflight_resurfaces_unacked_with_redelivered_flag() { + let h = open_broker(); + let broker = &h.broker; + broker.send(&msg("a", "b", "hi")).unwrap(); + let d1 = broker.recv("b").unwrap().expect("popped"); + assert!(!d1.redelivered); + // Simulate harness crash: never call ack_turn. Now boot the + // new harness — requeue_inflight resurfaces the row. + assert_eq!(broker.requeue_inflight("b").unwrap(), 1); + let d2 = broker.recv("b").unwrap().expect("popped again"); + assert_eq!(d2.message.body, "hi"); + assert!( + d2.redelivered, + "second pop should be tagged redelivered" + ); + assert_eq!(broker.ack_turn("b").unwrap(), 1); + } + + /// Idempotency: a second `requeue_inflight` on the same recipient + /// finds nothing because the prior call already reset + /// `delivered_at` (the row is back in the pending state, not + /// inflight). + #[test] + fn requeue_inflight_is_idempotent() { + let h = open_broker(); + let broker = &h.broker; + broker.send(&msg("a", "b", "hi")).unwrap(); + broker.recv("b").unwrap().expect("popped"); + assert_eq!(broker.requeue_inflight("b").unwrap(), 1); + // Second call: the row is pending (delivered_at IS NULL) so + // nothing matches the inflight filter. + assert_eq!(broker.requeue_inflight("b").unwrap(), 0); + } + + /// Multiple messages, partial drain: pop two, `ack_turn` covers + /// both even though one was popped before the other. + #[test] + fn ack_turn_handles_batch() { + let h = open_broker(); + let broker = &h.broker; + broker.send(&msg("a", "b", "one")).unwrap(); + broker.send(&msg("a", "b", "two")).unwrap(); + broker.send(&msg("a", "b", "three")).unwrap(); + broker.recv("b").unwrap().expect("popped 1"); + broker.recv("b").unwrap().expect("popped 2"); + broker.recv("b").unwrap().expect("popped 3"); + assert_eq!(broker.ack_turn("b").unwrap(), 3); + assert!(broker.recv("b").unwrap().is_none()); + } + + /// Vacuum filter respects the new `acked_at` semantics — a + /// delivered-but-not-acked row is NOT vacuumed regardless of + /// age (the requeue path needs it). + #[test] + fn vacuum_preserves_unacked_inflight_rows() { + let h = open_broker(); + let broker = &h.broker; + broker.send(&msg("a", "b", "stuck")).unwrap(); + broker.recv("b").unwrap().expect("popped"); + // Wide window — should still skip unacked rows. + let removed = broker.vacuum_delivered(-i64::from(u8::MAX)).unwrap(); + assert_eq!(removed, 0, "unacked inflight row must survive vacuum"); + // After ack_turn the row is fair game. + broker.ack_turn("b").unwrap(); + let removed = broker.vacuum_delivered(-i64::from(u8::MAX)).unwrap(); + assert_eq!(removed, 1, "acked row is now vacuumable"); + } + + /// Recv ordering: requeued rows go back into FIFO position + /// (they keep their original id). New sends added after the + /// requeue arrive after them. + #[test] + fn requeued_rows_come_back_in_original_order() { + let h = open_broker(); + let broker = &h.broker; + broker.send(&msg("a", "b", "first")).unwrap(); + broker.send(&msg("a", "b", "second")).unwrap(); + // Pop both, ack neither. + broker.recv("b").unwrap().expect("popped 1"); + broker.recv("b").unwrap().expect("popped 2"); + broker.requeue_inflight("b").unwrap(); + // Now add a brand new message AFTER the requeue. + broker.send(&msg("a", "b", "third")).unwrap(); + let d1 = broker.recv("b").unwrap().expect("re-pop 1"); + assert_eq!(d1.message.body, "first"); + assert!(d1.redelivered); + let d2 = broker.recv("b").unwrap().expect("re-pop 2"); + assert_eq!(d2.message.body, "second"); + assert!(d2.redelivered); + let d3 = broker.recv("b").unwrap().expect("re-pop 3"); + assert_eq!(d3.message.body, "third"); + assert!( + !d3.redelivered, + "fresh-send-after-requeue must NOT be tagged redelivered" + ); + } + + /// Per-recipient isolation: `requeue_inflight("a")` doesn't touch + /// b's inflight rows. + #[test] + fn requeue_inflight_is_per_recipient() { + let h = open_broker(); + let broker = &h.broker; + broker.send(&msg("x", "alice", "for alice")).unwrap(); + broker.send(&msg("x", "bob", "for bob")).unwrap(); + broker.recv("alice").unwrap().expect("popped alice"); + broker.recv("bob").unwrap().expect("popped bob"); + // Requeue only alice. Bob's row stays inflight. + assert_eq!(broker.requeue_inflight("alice").unwrap(), 1); + let d = broker.recv("alice").unwrap().expect("re-pop alice"); + assert!(d.redelivered); + // Bob has nothing pending (his row is still delivered, not requeued). + assert!(broker.recv("bob").unwrap().is_none()); + } +} diff --git a/hive-c0re/src/manager_server.rs b/hive-c0re/src/manager_server.rs index c6e7aa5..99fae61 100644 --- a/hive-c0re/src/manager_server.rs +++ b/hive-c0re/src/manager_server.rs @@ -138,9 +138,11 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc) -> ManagerResp .recv_blocking(MANAGER_AGENT, manager_recv_timeout(*wait_seconds)) .await { - Ok(Some(msg)) => ManagerResponse::Message { - from: msg.from, - body: msg.body, + Ok(Some(d)) => ManagerResponse::Message { + from: d.message.from, + body: d.message.body, + id: d.id, + redelivered: d.redelivered, }, Ok(None) => ManagerResponse::Empty, Err(e) => ManagerResponse::Err { @@ -358,6 +360,23 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc) -> ManagerResp |message| ManagerResponse::Err { message }, |()| ManagerResponse::Ok, ), + ManagerRequest::AckTurn => match coord.broker.ack_turn(MANAGER_AGENT) { + Ok(_n) => ManagerResponse::Ok, + Err(e) => ManagerResponse::Err { + message: format!("{e:#}"), + }, + }, + ManagerRequest::RequeueInflight => match coord.broker.requeue_inflight(MANAGER_AGENT) { + Ok(n) => { + if n > 0 { + tracing::info!(agent = %MANAGER_AGENT, requeued = %n, "requeued in-flight messages"); + } + ManagerResponse::Ok + } + Err(e) => ManagerResponse::Err { + message: format!("{e:#}"), + }, + }, } } diff --git a/hive-sh4re/src/lib.rs b/hive-sh4re/src/lib.rs index 47c84fb..4cd729c 100644 --- a/hive-sh4re/src/lib.rs +++ b/hive-sh4re/src/lib.rs @@ -358,6 +358,28 @@ pub enum AgentRequest { /// row. The manager surface uses the same wire variant but /// accepts any id. CancelLooseEnd { kind: CancelLooseEndKind, id: i64 }, + /// Mark every message popped by this agent since the last `AckTurn` + /// as fully handled. Fired by the harness after `TurnOutcome::Ok` + /// — claude doesn't see this surface, it's harness↔broker only. + /// On `TurnOutcome::Failed` the harness intentionally skips this + /// call, so the unacked rows stay in-flight in the DB and get + /// requeued by the next `RequeueInflight` on harness boot. Tracks + /// the popped-id list in-memory on the broker side; no payload + /// needed (the broker knows which ids it handed to this + /// recipient). + AckTurn, + /// Requeue every message the broker handed to this agent that + /// never got acked. Fired by the harness exactly once at boot, + /// before entering the serve loop — catches the + /// crashed-mid-turn / OOM-killed / container-restarted cases + /// where a previous harness session popped messages but never + /// drove them to a clean turn-end. Resets `delivered_at` on each + /// row back to NULL (so the next `Recv` pops it) and remembers + /// the id in a per-recipient in-memory set so the next `Recv` + /// can tag the message with `redelivered: true` (the harness + /// then prepends a "may already be handled" hint to the wake + /// prompt). Idempotent + cheap when there's nothing in flight. + RequeueInflight, } /// Responses on a per-agent socket. @@ -368,8 +390,22 @@ pub enum AgentResponse { Ok, /// Either `Send` failed or `Recv` errored. Err { message: String }, - /// `Recv` produced a message. - Message { from: String, body: String }, + /// `Recv` produced a message. `id` is the broker's row id — opaque + /// to claude (the MCP surface strips it before handing the body + /// to the model) but tracked by the harness so the broker's + /// in-memory unacked list can be drained on `AckTurn`. When + /// `redelivered = true` this row was popped earlier, never + /// acked (turn crash / OOM / restart), and resurfaced by + /// `RequeueInflight` — the harness prepends a "may already be + /// handled" hint to the wake prompt so claude can DTRT. + Message { + from: String, + body: String, + #[serde(default)] + id: i64, + #[serde(default)] + redelivered: bool, + }, /// `Recv` found nothing pending. Empty, /// `Status` result: how many pending messages are in this agent's inbox. @@ -668,6 +704,13 @@ pub enum ManagerRequest { /// can cancel any row (no owner check) — same dispatch as /// `AgentRequest::CancelLooseEnd` but with privileged auth. CancelLooseEnd { kind: CancelLooseEndKind, id: i64 }, + /// Mirror of `AgentRequest::AckTurn` on the manager surface — fired + /// by the manager harness after `TurnOutcome::Ok` to close out + /// every message popped during the turn. + AckTurn, + /// Mirror of `AgentRequest::RequeueInflight` on the manager + /// surface — fired exactly once on manager harness boot. + RequeueInflight, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -677,9 +720,18 @@ pub enum ManagerResponse { Err { message: String, }, + /// Same delivery shape as `AgentResponse::Message` — `id` + + /// `redelivered` carry the broker's row id and the + /// "previously popped, not acked" flag through the manager + /// surface so the manager harness drives the same + /// requeue-with-hint flow as a sub-agent. Message { from: String, body: String, + #[serde(default)] + id: i64, + #[serde(default)] + redelivered: bool, }, Empty, Status {