sse: seq plumbing + subscribe-first dedupe dance

This commit is contained in:
müde 2026-05-17 12:26:00 +02:00
parent 8c186d4fb7
commit 1340a654e7
5 changed files with 197 additions and 37 deletions

View file

@ -3,6 +3,7 @@
use std::path::Path;
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use anyhow::{Context, Result};
@ -50,12 +51,14 @@ pub type DueReminder = (String, i64, String, Option<String>);
#[serde(rename_all = "snake_case", tag = "kind")]
pub enum MessageEvent {
Sent {
seq: u64,
from: String,
to: String,
body: String,
at: i64,
},
Delivered {
seq: u64,
from: String,
to: String,
body: String,
@ -66,6 +69,13 @@ pub enum MessageEvent {
pub struct Broker {
conn: Mutex<Connection>,
events: broadcast::Sender<MessageEvent>,
/// Monotonic per-process counter stamped onto every emitted
/// `MessageEvent`. Persisted nowhere — clients always treat a hive-c0re
/// restart as "everything is new" (fresh snapshot, fresh stream of
/// seqs starting at 1). Historical rows replayed via `recent_all`
/// carry `seq = 0` since they predate the live stream the seq is
/// meant to dedupe against.
event_seq: AtomicU64,
}
impl Broker {
@ -81,6 +91,7 @@ impl Broker {
Ok(Self {
conn: Mutex::new(conn),
events,
event_seq: AtomicU64::new(0),
})
}
@ -88,6 +99,20 @@ impl Broker {
self.events.subscribe()
}
/// Current high-water seq. Snapshot endpoints read this *before*
/// gathering state so the resulting (snapshot.seq, snapshot) pair
/// satisfies: any live event with seq > snapshot.seq is post-snapshot
/// (not yet reflected); any with seq <= snapshot.seq either pre-dates
/// the snapshot or was already captured by it. Clients dedupe their
/// buffered SSE traffic against this value.
pub fn current_seq(&self) -> u64 {
self.event_seq.load(Ordering::SeqCst)
}
fn next_seq(&self) -> u64 {
self.event_seq.fetch_add(1, Ordering::SeqCst) + 1
}
pub fn send(&self, message: &Message) -> Result<()> {
let conn = self.conn.lock().unwrap();
conn.execute(
@ -96,6 +121,7 @@ impl Broker {
)?;
drop(conn);
let _ = self.events.send(MessageEvent::Sent {
seq: self.next_seq(),
from: message.from.clone(),
to: message.to.clone(),
body: message.body.clone(),
@ -149,6 +175,11 @@ impl Broker {
)?;
let rows = stmt.query_map(params![limit_i], |row| {
Ok(MessageEvent::Sent {
// Historical events: seq=0 (never compared against live
// seqs). Live dedupe windows close against
// history_seq = broker.current_seq() captured at fetch
// time, not against per-row seqs.
seq: 0,
from: row.get(0)?,
to: row.get(1)?,
body: row.get(2)?,
@ -256,6 +287,7 @@ impl Broker {
)?;
drop(conn);
let _ = self.events.send(MessageEvent::Delivered {
seq: self.next_seq(),
from: from.clone(),
to: to.clone(),
body: body.clone(),
@ -332,6 +364,7 @@ impl Broker {
tx.commit()?;
drop(conn);
let _ = self.events.send(MessageEvent::Sent {
seq: self.next_seq(),
from: "reminder".to_owned(),
to: agent.to_owned(),
body: message.to_owned(),