diff --git a/hive-c0re/src/agent_server.rs b/hive-c0re/src/agent_server.rs index df1f599..e030f81 100644 --- a/hive-c0re/src/agent_server.rs +++ b/hive-c0re/src/agent_server.rs @@ -65,7 +65,7 @@ async fn serve(stream: UnixStream, agent: String, broker: Arc) -> Result return Ok(()); } let resp = match serde_json::from_str::(line.trim()) { - Ok(req) => dispatch(&req, &agent, &broker), + Ok(req) => dispatch(&req, &agent, &broker).await, Err(e) => AgentResponse::Err { message: format!("parse error: {e}"), }, @@ -77,7 +77,11 @@ async fn serve(stream: UnixStream, agent: String, broker: Arc) -> Result } } -fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResponse { +/// How long the long-poll `Recv` holds a connection open waiting for new +/// mail. Set well below typical TCP/proxy idle limits. +const RECV_LONG_POLL: std::time::Duration = std::time::Duration::from_secs(30); + +async fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResponse { match req { AgentRequest::Send { to, body } => { match broker.send(&Message { @@ -91,7 +95,7 @@ fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResponse { }, } } - AgentRequest::Recv => match broker.recv(agent) { + AgentRequest::Recv => match broker.recv_blocking(agent, RECV_LONG_POLL).await { Ok(Some(msg)) => AgentResponse::Message { from: msg.from, body: msg.body, diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs index eb8a2ec..4ffc402 100644 --- a/hive-c0re/src/broker.rs +++ b/hive-c0re/src/broker.rs @@ -100,6 +100,42 @@ impl Broker { Ok(u64::try_from(n.max(0)).unwrap_or(0)) } + /// Long-poll variant of `recv`: returns immediately if there's a + /// pending message; otherwise waits up to `timeout` for the broker to + /// emit a `Sent { to: recipient }` event, then retries the pop. Lets + /// agents react to new mail without polling their socket on a fixed + /// interval. + pub async fn recv_blocking( + &self, + recipient: &str, + timeout: std::time::Duration, + ) -> Result> { + if let Some(m) = self.recv(recipient)? { + return Ok(Some(m)); + } + let mut rx = self.subscribe(); + let deadline = tokio::time::Instant::now() + timeout; + loop { + let Some(remaining) = deadline.checked_duration_since(tokio::time::Instant::now()) + else { + return Ok(None); + }; + match tokio::time::timeout(remaining, rx.recv()).await { + Err(_) => return Ok(None), + // Channel lagged or closed — fall back to a single direct + // pop (in case we missed our notification while behind). + Ok(Err(_)) => return self.recv(recipient), + Ok(Ok(MessageEvent::Sent { to, .. })) if to == recipient => { + if let Some(m) = self.recv(recipient)? { + return Ok(Some(m)); + } + // Lost a race (concurrent recv elsewhere). Keep waiting. + } + Ok(Ok(_)) => {} + } + } + } + pub fn recv(&self, recipient: &str) -> Result> { let conn = self.conn.lock().unwrap(); let row: Option<(i64, String, String, String)> = conn diff --git a/hive-c0re/src/manager_server.rs b/hive-c0re/src/manager_server.rs index 38c6005..ce21bfd 100644 --- a/hive-c0re/src/manager_server.rs +++ b/hive-c0re/src/manager_server.rs @@ -69,6 +69,8 @@ async fn serve(stream: UnixStream, coord: Arc) -> Result<()> { } } +const MANAGER_RECV_LONG_POLL: std::time::Duration = std::time::Duration::from_secs(30); + async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse { match req { ManagerRequest::Send { to, body } => match coord.broker.send(&Message { @@ -97,7 +99,11 @@ async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse message: format!("{e:#}"), }, }, - ManagerRequest::Recv => match coord.broker.recv(MANAGER_AGENT) { + ManagerRequest::Recv => match coord + .broker + .recv_blocking(MANAGER_AGENT, MANAGER_RECV_LONG_POLL) + .await + { Ok(Some(msg)) => ManagerResponse::Message { from: msg.from, body: msg.body,