From 1340a654e72a38cd984c984581f5c862f373a855 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?m=C3=BCde?= Date: Sun, 17 May 2026 12:26:00 +0200 Subject: [PATCH] sse: seq plumbing + subscribe-first dedupe dance --- hive-ag3nt/src/events.rs | 45 +++++++++++++-- hive-ag3nt/src/web_ui.rs | 23 ++++++-- hive-c0re/src/broker.rs | 33 +++++++++++ hive-c0re/src/dashboard.rs | 30 +++++++++- hive-fr0nt/assets/terminal.js | 103 +++++++++++++++++++++++++--------- 5 files changed, 197 insertions(+), 37 deletions(-) diff --git a/hive-ag3nt/src/events.rs b/hive-ag3nt/src/events.rs index 0588c82..e894483 100644 --- a/hive-ag3nt/src/events.rs +++ b/hive-ag3nt/src/events.rs @@ -9,7 +9,7 @@ //! showing "connecting…" until the first event arrives. use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use rusqlite::{Connection, params}; @@ -74,6 +74,18 @@ CREATE TABLE IF NOT EXISTS events ( CREATE INDEX IF NOT EXISTS idx_events_ts ON events (ts); "; +/// Envelope carried over the broadcast channel: the `LiveEvent` itself +/// plus a monotonic per-process seq stamped by `Bus::emit`. SSE consumers +/// serialize this directly (seq becomes a sibling of the `kind` tag); +/// clients use seq to dedupe their buffered live traffic against the +/// snapshot/history responses (drop anything with `seq <= snapshot.seq`). +#[derive(Debug, Clone, Serialize)] +pub struct BusEvent { + pub seq: u64, + #[serde(flatten)] + pub event: LiveEvent, +} + /// One row of the agent's live stream. Serialised to JSON for SSE delivery. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "kind", rename_all = "snake_case")] @@ -216,7 +228,13 @@ pub const DEFAULT_MODEL: &str = "haiku"; #[derive(Clone)] pub struct Bus { - tx: Arc>, + tx: Arc>, + /// Monotonic per-process counter stamped onto every `BusEvent`. 
+ /// Persisted nowhere — a harness restart resets seq to 0; clients + /// always treat reconnect as "fresh state, fresh stream of seqs." + /// Historical events served from sqlite carry no seq (they predate + /// the live channel the seq is meant to dedupe against). + event_seq: Arc, /// Persistent event log. `None` only if opening the sqlite db failed /// at construction — we keep going so the harness doesn't die on a /// missing state dir mount in dev / test scenarios. @@ -258,6 +276,7 @@ impl Bus { let initial_model = load_model().unwrap_or_else(|| DEFAULT_MODEL.to_owned()); Self { tx: Arc::new(tx), + event_seq: Arc::new(AtomicU64::new(0)), store, state: Arc::new(Mutex::new((TurnState::Idle, now_unix()))), model: Arc::new(Mutex::new(initial_model)), @@ -266,6 +285,20 @@ impl Bus { } } + /// Current high-water seq. Snapshot endpoints read this before + /// gathering state so the resulting (snapshot.seq, snapshot) pair + /// satisfies: any live event with seq > snapshot.seq is post-snapshot + /// (not yet reflected). Clients dedupe buffered SSE traffic against + /// this value. + #[must_use] + pub fn current_seq(&self) -> u64 { + self.event_seq.load(Ordering::SeqCst) + } + + fn next_seq(&self) -> u64 { + self.event_seq.fetch_add(1, Ordering::SeqCst) + 1 + } + /// Arm the one-shot: the next claude invocation will run without /// `--continue`, dropping any prior session context. Idempotent /// — calling twice in a row before the next turn still consumes @@ -333,11 +366,15 @@ impl Bus { { tracing::warn!(error = ?e, "events: append failed"); } + let envelope = BusEvent { + seq: self.next_seq(), + event, + }; // Lagged subscribers drop events — fine; the UI is a tail, not a log. 
- let _ = self.tx.send(event); + let _ = self.tx.send(envelope); } - pub fn subscribe(&self) -> broadcast::Receiver { + pub fn subscribe(&self) -> broadcast::Receiver { self.tx.subscribe() } diff --git a/hive-ag3nt/src/web_ui.rs b/hive-ag3nt/src/web_ui.rs index 5eb13fc..941eb29 100644 --- a/hive-ag3nt/src/web_ui.rs +++ b/hive-ag3nt/src/web_ui.rs @@ -191,6 +191,12 @@ async fn serve_shared_js() -> impl IntoResponse { #[derive(Serialize)] struct StateSnapshot { + /// Bus seq at the moment this snapshot was assembled. Clients dedupe + /// their buffered SSE traffic against this value: events with + /// `seq <= snapshot.seq` are already reflected (or pre-date the + /// snapshot); `seq > snapshot.seq` is post-snapshot. Reset to 0 on + /// harness restart — clients treat reconnect as a fresh world. + seq: u64, label: String, dashboard_port: u16, /// `"online"` | `"needs_login_idle"` | `"needs_login_in_progress"`. @@ -226,6 +232,9 @@ struct SessionView { } async fn api_state(State(state): State) -> axum::Json { + // Capture seq *before* any reads so the dedupe contract is + // "events with seq > snapshot.seq are post-snapshot, never missed." + let seq = state.bus.current_seq(); drop_if_finished(&state.session); let login = *state.login.lock().unwrap(); let session_snapshot = state.session.lock().unwrap().clone(); @@ -251,6 +260,7 @@ async fn api_state(State(state): State) -> axum::Json { let model = state.bus.model(); let token_usage = state.bus.last_usage(); axum::Json(StateSnapshot { + seq, label: state.label.clone(), dashboard_port, status, @@ -338,10 +348,15 @@ async fn post_send(State(state): State, Form(form): Form) -> } } -async fn events_history( - State(state): State, -) -> axum::Json> { - axum::Json(state.bus.history()) +async fn events_history(State(state): State) -> axum::Json { + // Capture seq *before* the read so dedupe is "drop buffered events + // you've already seen in history", never "lose an event that fired + // between the read and the timestamp." 
Historical rows have no + // per-row seq; only the high-water mark matters for the dedupe + // window. + let seq = state.bus.current_seq(); + let events = state.bus.history(); + axum::Json(serde_json::json!({ "seq": seq, "events": events })) } async fn events_stream( diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs index 93931d3..892acc5 100644 --- a/hive-c0re/src/broker.rs +++ b/hive-c0re/src/broker.rs @@ -3,6 +3,7 @@ use std::path::Path; use std::sync::Mutex; +use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{SystemTime, UNIX_EPOCH}; use anyhow::{Context, Result}; @@ -50,12 +51,14 @@ pub type DueReminder = (String, i64, String, Option); #[serde(rename_all = "snake_case", tag = "kind")] pub enum MessageEvent { Sent { + seq: u64, from: String, to: String, body: String, at: i64, }, Delivered { + seq: u64, from: String, to: String, body: String, @@ -66,6 +69,13 @@ pub enum MessageEvent { pub struct Broker { conn: Mutex, events: broadcast::Sender, + /// Monotonic per-process counter stamped onto every emitted + /// `MessageEvent`. Persisted nowhere — clients always treat a hive-c0re + /// restart as "everything is new" (fresh snapshot, fresh stream of + /// seqs starting at 1). Historical rows replayed via `recent_all` + /// carry `seq = 0` since they predate the live stream the seq is + /// meant to dedupe against. + event_seq: AtomicU64, } impl Broker { @@ -81,6 +91,7 @@ impl Broker { Ok(Self { conn: Mutex::new(conn), events, + event_seq: AtomicU64::new(0), }) } @@ -88,6 +99,20 @@ impl Broker { self.events.subscribe() } + /// Current high-water seq. Snapshot endpoints read this *before* + /// gathering state so the resulting (snapshot.seq, snapshot) pair + /// satisfies: any live event with seq > snapshot.seq is post-snapshot + /// (not yet reflected); any with seq <= snapshot.seq either pre-dates + /// the snapshot or was already captured by it. Clients dedupe their + /// buffered SSE traffic against this value. 
+ pub fn current_seq(&self) -> u64 { + self.event_seq.load(Ordering::SeqCst) + } + + fn next_seq(&self) -> u64 { + self.event_seq.fetch_add(1, Ordering::SeqCst) + 1 + } + pub fn send(&self, message: &Message) -> Result<()> { let conn = self.conn.lock().unwrap(); conn.execute( @@ -96,6 +121,7 @@ impl Broker { )?; drop(conn); let _ = self.events.send(MessageEvent::Sent { + seq: self.next_seq(), from: message.from.clone(), to: message.to.clone(), body: message.body.clone(), @@ -149,6 +175,11 @@ impl Broker { )?; let rows = stmt.query_map(params![limit_i], |row| { Ok(MessageEvent::Sent { + // Historical events: seq=0 (never compared against live + // seqs). Live dedupe windows close against + // history_seq = broker.current_seq() captured at fetch + // time, not against per-row seqs. + seq: 0, from: row.get(0)?, to: row.get(1)?, body: row.get(2)?, @@ -256,6 +287,7 @@ impl Broker { )?; drop(conn); let _ = self.events.send(MessageEvent::Delivered { + seq: self.next_seq(), from: from.clone(), to: to.clone(), body: body.clone(), @@ -332,6 +364,7 @@ impl Broker { tx.commit()?; drop(conn); let _ = self.events.send(MessageEvent::Sent { + seq: self.next_seq(), from: "reminder".to_owned(), to: agent.to_owned(), body: message.to_owned(), diff --git a/hive-c0re/src/dashboard.rs b/hive-c0re/src/dashboard.rs index 6da925c..22ebcbd 100644 --- a/hive-c0re/src/dashboard.rs +++ b/hive-c0re/src/dashboard.rs @@ -144,6 +144,14 @@ async fn serve_shared_js() -> impl IntoResponse { #[derive(Serialize)] struct StateSnapshot { + /// Broker seq at the moment this snapshot was assembled. Clients + /// dedupe their buffered SSE traffic against this value: any + /// `MessageEvent` with `seq <= snapshot.seq` is already reflected in + /// the snapshot (or pre-dates it); anything with `seq > snapshot.seq` + /// is post-snapshot and should be applied. Set to 0 in the + /// pre-emit case (no events ever fired) — clients treat that as + /// "apply everything you've buffered". 
+ seq: u64, hostname: String, manager_port: u16, any_stale: bool, @@ -285,6 +293,14 @@ async fn api_state(headers: HeaderMap, State(state): State) -> axum::J .unwrap_or("localhost"); let hostname = host.split(':').next().unwrap_or(host).to_owned(); + // Capture the broker seq *before* any read so the dedupe contract + // is "events with seq > snapshot.seq are post-snapshot, never + // missed." A broker event landing during snapshot construction may + // be doubly applied (snapshot caught the write + client also + // applies the SSE event) — that's a renderer's problem to make + // idempotent, not ours to avoid here. + let seq = state.coord.broker.current_seq(); + let raw_containers = log_default("nixos-container list", lifecycle::list().await); let current_rev = crate::auto_update::current_flake_rev(&state.coord.hyperhive_flake); let transient_snapshot = state.coord.transient_snapshot(); @@ -319,6 +335,7 @@ async fn api_state(headers: HeaderMap, State(state): State) -> axum::J log_default("questions.recent_answered", state.coord.questions.recent_answered(20)); axum::Json(StateSnapshot { + seq, hostname, manager_port: MANAGER_PORT, any_stale, @@ -711,15 +728,22 @@ fn dir_size_bytes(root: &Path) -> u64 { async fn messages_history(State(state): State) -> Response { // Backfill source for the dashboard message-flow terminal. Returns // up to ~200 historical broker messages as `MessageEvent::Sent` JSON - // — same shape as the live `/messages/stream`, so the renderer - // doesn't branch on history vs. live. + // wrapped in `{ seq, events }`. The seq is the broker's high water + // mark at fetch time; clients use it to dedupe their buffered live + // SSE traffic (drop anything with `seq <= history_seq`) so a message + // that lands between SSE-subscribe and history-fetch isn't shown + // twice and isn't lost. 
const HISTORY_LIMIT: u64 = 200; + // Capture seq *before* the query so the dedupe contract is + // "drop buffered events you've already seen in history" — never + // "lose an event that fired between the read and the timestamp." + let seq = state.coord.broker.current_seq(); match state.coord.broker.recent_all(HISTORY_LIMIT) { Ok(mut events) => { // recent_all returns newest-first; reverse so the replay // builds chronologically (matches the agent /events/history). events.reverse(); - axum::Json(events).into_response() + axum::Json(serde_json::json!({ "seq": seq, "events": events })).into_response() } Err(e) => error_response(&format!("messages/history failed: {e:#}")), } diff --git a/hive-fr0nt/assets/terminal.js b/hive-fr0nt/assets/terminal.js index 625af28..11dc051 100644 --- a/hive-fr0nt/assets/terminal.js +++ b/hive-fr0nt/assets/terminal.js @@ -166,36 +166,35 @@ } } - async function backfill() { - if (!opts.historyUrl) { - if (opts.onBackfillDone) opts.onBackfillDone(0); - return; - } - try { - const resp = await fetch(opts.historyUrl); - if (!resp.ok) { - if (opts.onBackfillDone) opts.onBackfillDone(0); - return; - } - const events = await resp.json(); - currentNoAnim = true; - for (const ev of events) dispatch(ev, true); - currentNoAnim = false; - if (events.length) row('note', '─── live (older above) ───'); - else placeholder('(connected — waiting for events)'); - if (opts.onBackfillDone) opts.onBackfillDone(events.length); - } catch (err) { - console.warn('history backfill failed', err); - if (opts.onBackfillDone) opts.onBackfillDone(0); - } - } + // Subscribe → buffer → fetch history → dedupe → apply. + // + // Race fix: the SSE subscription opens before the history fetch starts. + // Live events that land before history resolves are buffered, not + // rendered. Once the history response (`{ seq, events }`) arrives we: + // 1. Replay `events` (fromHistory=true). + // 2.
Drop buffered events with `seq <= history.seq` — they're + // already reflected in the history rows above. + // 3. Apply remaining buffered events (fromHistory=false). + // 4. Switch to live mode: each new SSE event dispatches immediately. + // + // Without this dance an event that fires between history-fetch and + // SSE-subscribe goes missing; without seq dedupe the same event + // shows twice (once via history, once via live buffer). Both bugs + // were latent before. + // + // If `historyUrl` is unset we skip the dance: buffered events apply + // as live the moment the buffer flushes (no dedupe possible without + // a boundary seq). + function start() { + let live = false; + let buffered = []; - function subscribe() { const es = new EventSource(opts.streamUrl); es.onmessage = (e) => { let ev; try { ev = JSON.parse(e.data); } catch (err) { row('note', '[parse err] ' + e.data); return; } + if (!live) { buffered.push(ev); return; } dispatch(ev, false); if (opts.onLiveEvent) { try { opts.onLiveEvent(ev); } @@ -206,10 +205,62 @@ if (es.readyState === EventSource.CONNECTING) row('note', '[reconnecting…]'); else row('note', '[disconnected]'); }; - return es; + + function flushBuffered(boundarySeq) { + const drained = buffered; + buffered = []; + live = true; + for (const ev of drained) { + // ev.seq is set by the server on live frames; absent/0 means + // "no dedupe possible, apply." Historical replays via the + // history endpoint carry no seq either way. 
+ if (boundarySeq != null && typeof ev.seq === 'number' && ev.seq <= boundarySeq) { + continue; + } + dispatch(ev, false); + if (opts.onLiveEvent) { + try { opts.onLiveEvent(ev); } + catch (err) { console.error('onLiveEvent threw', err); } + } + } + } + + async function backfill() { + if (!opts.historyUrl) { + flushBuffered(null); + if (opts.onBackfillDone) opts.onBackfillDone(0); + return; + } + try { + const resp = await fetch(opts.historyUrl); + if (!resp.ok) { + flushBuffered(null); + if (opts.onBackfillDone) opts.onBackfillDone(0); + return; + } + const body = await resp.json(); + // Accept the envelope `{ seq, events }`. A bare array means + // the server hasn't been updated to include seq yet — treat + // it as "no dedupe possible." + const events = Array.isArray(body) ? body : (body.events || []); + const boundarySeq = Array.isArray(body) ? null : (body.seq ?? null); + currentNoAnim = true; + for (const ev of events) dispatch(ev, true); + currentNoAnim = false; + if (events.length) row('note', '─── live (older above) ───'); + else placeholder('(connected — waiting for events)'); + flushBuffered(boundarySeq); + if (opts.onBackfillDone) opts.onBackfillDone(events.length); + } catch (err) { + console.warn('history backfill failed', err); + flushBuffered(null); + if (opts.onBackfillDone) opts.onBackfillDone(0); + } + } + return backfill(); } - const ready = backfill().then(subscribe); + const ready = start(); return { row, details, detailsDiff, placeholder, ready }; }