fix: subscribe-before-check in recv_blocking to avoid missed-wake race
This commit is contained in:
parent
4f56954422
commit
f78c6085b9
1 changed files with 11 additions and 1 deletions
|
|
@ -142,15 +142,25 @@ impl Broker {
|
||||||
/// emit a `Sent { to: recipient }` event, then retries the pop. Lets
|
/// emit a `Sent { to: recipient }` event, then retries the pop. Lets
|
||||||
/// agents react to new mail without polling their socket on a fixed
|
/// agents react to new mail without polling their socket on a fixed
|
||||||
/// interval.
|
/// interval.
|
||||||
|
///
|
||||||
|
/// **Subscribe-before-check order matters.** If we polled the sqlite
|
||||||
|
/// row first and only then called `subscribe()`, a concurrent `send`
|
||||||
|
/// landing in that window would commit + broadcast its event *before*
|
||||||
|
/// our receiver existed — and we'd then sit on the long-poll until
|
||||||
|
/// the timeout (or another, unrelated send) fired. That looked
|
||||||
|
/// externally like "the agent processed one wake then went deaf
|
||||||
|
/// until the operator poked it again". Subscribing first guarantees
|
||||||
|
/// any post-subscribe send notifies us; the redundant `recv()`
|
||||||
|
/// catches the message either way.
|
||||||
pub async fn recv_blocking(
|
pub async fn recv_blocking(
|
||||||
&self,
|
&self,
|
||||||
recipient: &str,
|
recipient: &str,
|
||||||
timeout: std::time::Duration,
|
timeout: std::time::Duration,
|
||||||
) -> Result<Option<Message>> {
|
) -> Result<Option<Message>> {
|
||||||
|
let mut rx = self.subscribe();
|
||||||
if let Some(m) = self.recv(recipient)? {
|
if let Some(m) = self.recv(recipient)? {
|
||||||
return Ok(Some(m));
|
return Ok(Some(m));
|
||||||
}
|
}
|
||||||
let mut rx = self.subscribe();
|
|
||||||
let deadline = tokio::time::Instant::now() + timeout;
|
let deadline = tokio::time::Instant::now() + timeout;
|
||||||
loop {
|
loop {
|
||||||
let Some(remaining) = deadline.checked_duration_since(tokio::time::Instant::now())
|
let Some(remaining) = deadline.checked_duration_since(tokio::time::Instant::now())
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue