From f78c6085b924a1b104ca5e0f549c2600db0dfe1b Mon Sep 17 00:00:00 2001 From: damocles Date: Sun, 17 May 2026 02:42:06 +0200 Subject: [PATCH] fix: subscribe-before-check in recv_blocking to avoid missed-wake race --- hive-c0re/src/broker.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs index c1bae45..c908558 100644 --- a/hive-c0re/src/broker.rs +++ b/hive-c0re/src/broker.rs @@ -142,15 +142,25 @@ impl Broker { /// emit a `Sent { to: recipient }` event, then retries the pop. Lets /// agents react to new mail without polling their socket on a fixed /// interval. + /// + /// **Subscribe-before-check order matters.** If we polled the sqlite + /// row first and only then called `subscribe()`, a concurrent `send` + /// landing in that window would commit + broadcast its event *before* + /// our receiver existed — and we'd then sit on the long-poll until + /// the timeout (or another, unrelated send) fired. That looked + /// externally like "the agent processed one wake then went deaf + /// until the operator poked it again". Subscribing first guarantees + /// any post-subscribe send notifies us; the redundant `recv()` + /// catches the message either way. pub async fn recv_blocking( &self, recipient: &str, timeout: std::time::Duration, ) -> Result> { + let mut rx = self.subscribe(); if let Some(m) = self.recv(recipient)? { return Ok(Some(m)); } - let mut rx = self.subscribe(); let deadline = tokio::time::Instant::now() + timeout; loop { let Some(remaining) = deadline.checked_duration_since(tokio::time::Instant::now())