agents wake on send: broker.recv_blocking + 30s long-poll on Recv

This commit is contained in:
müde 2026-05-15 16:00:31 +02:00
parent f1fd787f17
commit dfbcf2b9d1
3 changed files with 50 additions and 4 deletions

View file

@ -100,6 +100,42 @@ impl Broker {
Ok(u64::try_from(n.max(0)).unwrap_or(0))
}
/// Long-poll variant of `recv`: returns immediately if there's a
/// pending message; otherwise waits up to `timeout` for the broker to
/// emit a `Sent { to: recipient }` event, then retries the pop. Lets
/// agents react to new mail without polling their socket on a fixed
/// interval.
pub async fn recv_blocking(
&self,
recipient: &str,
timeout: std::time::Duration,
) -> Result<Option<Message>> {
if let Some(m) = self.recv(recipient)? {
return Ok(Some(m));
}
let mut rx = self.subscribe();
let deadline = tokio::time::Instant::now() + timeout;
loop {
let Some(remaining) = deadline.checked_duration_since(tokio::time::Instant::now())
else {
return Ok(None);
};
match tokio::time::timeout(remaining, rx.recv()).await {
Err(_) => return Ok(None),
// Channel lagged or closed — fall back to a single direct
// pop (in case we missed our notification while behind).
Ok(Err(_)) => return self.recv(recipient),
Ok(Ok(MessageEvent::Sent { to, .. })) if to == recipient => {
if let Some(m) = self.recv(recipient)? {
return Ok(Some(m));
}
// Lost a race (concurrent recv elsewhere). Keep waiting.
}
Ok(Ok(_)) => {}
}
}
}
pub fn recv(&self, recipient: &str) -> Result<Option<Message>> {
let conn = self.conn.lock().unwrap();
let row: Option<(i64, String, String, String)> = conn