From a47879291445336012e402035a66dccf8977b65f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?m=C3=BCde?= Date: Sun, 17 May 2026 12:39:48 +0200 Subject: [PATCH] dashboard events: unified coord channel + /dashboard/{stream,history}; broker forwards --- hive-c0re/assets/app.js | 8 ++-- hive-c0re/src/broker.rs | 38 ++------------- hive-c0re/src/coordinator.rs | 51 ++++++++++++++++++++ hive-c0re/src/dashboard.rs | 80 ++++++++++++++++++++----------- hive-c0re/src/dashboard_events.rs | 47 ++++++++++++++++++ hive-c0re/src/main.rs | 47 ++++++++++++++++++ 6 files changed, 205 insertions(+), 66 deletions(-) create mode 100644 hive-c0re/src/dashboard_events.rs diff --git a/hive-c0re/assets/app.js b/hive-c0re/assets/app.js index bafe380..02a0f71 100644 --- a/hive-c0re/assets/app.js +++ b/hive-c0re/assets/app.js @@ -1,6 +1,6 @@ // Dashboard SPA. Renders containers + approvals from `/api/state`, wires // up async-form submission (URL-encoded POST + spinner + state refresh), -// and tails the broker over `/messages/stream` SSE. +// and tails the unified dashboard event channel over `/dashboard/stream`. (() => { // ─── helpers ──────────────────────────────────────────────────────────── @@ -600,7 +600,7 @@ // ─── operator inbox (derived from the broker message stream) ─────────── // No longer shipped on `/api/state.operator_inbox`. The dashboard // terminal's HiveTerminal feeds this via `onAnyEvent` — backfill from - // `/messages/history` populates on load, live SSE keeps it current. + // `/dashboard/history` populates on load, live SSE keeps it current. // Newest-first to match the previous behaviour. const INBOX_LIMIT = 50; const operatorInbox = []; @@ -1009,8 +1009,8 @@ } HiveTerminal.create({ logEl: flow, - historyUrl: '/messages/history', - streamUrl: '/messages/stream', + historyUrl: '/dashboard/history', + streamUrl: '/dashboard/stream', renderers: { sent: (ev, api) => renderMsg(ev, api, '→'), delivered: (ev, api) => renderMsg(ev, api, '✓'), diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs index 892acc5..c25d2c1 100644 --- a/hive-c0re/src/broker.rs +++ b/hive-c0re/src/broker.rs @@ -3,7 +3,6 @@ use std::path::Path; use std::sync::Mutex; -use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{SystemTime, UNIX_EPOCH}; use anyhow::{Context, Result}; @@ -47,18 +46,21 @@ const EVENT_CHANNEL: usize = 256; /// self-documenting. pub type DueReminder = (String, i64, String, Option); +/// Intra-process broker event. `recv_blocking` listens on the same +/// channel as the dashboard forwarder; the forwarder re-emits each +/// event as a `DashboardEvent` with a freshly-stamped seq from the +/// Coordinator. The broker itself doesn't stamp seqs — that's a wire +/// concern, not a storage concern. #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "snake_case", tag = "kind")] pub enum MessageEvent { Sent { - seq: u64, from: String, to: String, body: String, at: i64, }, Delivered { - seq: u64, from: String, to: String, body: String, @@ -69,13 +71,6 @@ pub enum MessageEvent { pub struct Broker { conn: Mutex, events: broadcast::Sender, - /// Monotonic per-process counter stamped onto every emitted - /// `MessageEvent`. Persisted nowhere — clients always treat a hive-c0re - /// restart as "everything is new" (fresh snapshot, fresh stream of - /// seqs starting at 1). Historical rows replayed via `recent_all` - /// carry `seq = 0` since they predate the live stream the seq is - /// meant to dedupe against. - event_seq: AtomicU64, } impl Broker { @@ -91,7 +86,6 @@ impl Broker { Ok(Self { conn: Mutex::new(conn), events, - event_seq: AtomicU64::new(0), }) } @@ -99,20 +93,6 @@ impl Broker { self.events.subscribe() } - /// Current high-water seq. Snapshot endpoints read this *before* - /// gathering state so the resulting (snapshot.seq, snapshot) pair - /// satisfies: any live event with seq > snapshot.seq is post-snapshot - /// (not yet reflected); any with seq <= snapshot.seq either pre-dates - /// the snapshot or was already captured by it. Clients dedupe their - /// buffered SSE traffic against this value. - pub fn current_seq(&self) -> u64 { - self.event_seq.load(Ordering::SeqCst) - } - - fn next_seq(&self) -> u64 { - self.event_seq.fetch_add(1, Ordering::SeqCst) + 1 - } - pub fn send(&self, message: &Message) -> Result<()> { let conn = self.conn.lock().unwrap(); conn.execute( @@ -121,7 +101,6 @@ impl Broker { )?; drop(conn); let _ = self.events.send(MessageEvent::Sent { - seq: self.next_seq(), from: message.from.clone(), to: message.to.clone(), body: message.body.clone(), @@ -175,11 +154,6 @@ impl Broker { )?; let rows = stmt.query_map(params![limit_i], |row| { Ok(MessageEvent::Sent { - // Historical events: seq=0 (never compared against live - // seqs). Live dedupe windows close against - // history_seq = broker.current_seq() captured at fetch - // time, not against per-row seqs. - seq: 0, from: row.get(0)?, to: row.get(1)?, body: row.get(2)?, @@ -287,7 +261,6 @@ impl Broker { )?; drop(conn); let _ = self.events.send(MessageEvent::Delivered { - seq: self.next_seq(), from: from.clone(), to: to.clone(), body: body.clone(), @@ -364,7 +337,6 @@ impl Broker { tx.commit()?; drop(conn); let _ = self.events.send(MessageEvent::Sent { - seq: self.next_seq(), from: "reminder".to_owned(), to: agent.to_owned(), body: message.to_owned(), diff --git a/hive-c0re/src/coordinator.rs b/hive-c0re/src/coordinator.rs index bd8fd07..4df990e 100644 --- a/hive-c0re/src/coordinator.rs +++ b/hive-c0re/src/coordinator.rs @@ -4,15 +4,23 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use anyhow::{Context, Result}; +use tokio::sync::broadcast; use crate::agent_server::{self, AgentSocket}; use crate::approvals::Approvals; use crate::broker::Broker; +use crate::dashboard_events::DashboardEvent; use crate::operator_questions::OperatorQuestions; +/// Capacity of the dashboard event channel. Slow browser subscribers +/// (idle tab, throttled connection) drop frames past this — that's +/// fine, the seq dedupe makes a reconnect resync safe. +const DASHBOARD_CHANNEL: usize = 256; + const AGENT_RUNTIME_ROOT: &str = "/run/hyperhive/agents"; const MANAGER_RUNTIME_ROOT: &str = "/run/hyperhive/manager"; /// Manager-editable per-agent config repos. Bind-mounted RW into the manager @@ -47,6 +55,15 @@ pub struct Coordinator { /// Read by the dashboard to render a spinner; cleared when the action /// resolves (success or failure). transient: Mutex>, + /// Unified wire-facing event channel feeding the dashboard SSE + /// stream. Carries broker messages (mirrored from `broker.subscribe` + /// by the forwarder task in `main.rs`) and dashboard-only mutation + /// events (approval added/resolved, question added/answered, etc.). + /// Snapshot endpoints capture `event_seq` before reading state so + /// the client can dedupe its buffered live traffic against the + /// snapshot. + dashboard_events: broadcast::Sender, + event_seq: AtomicU64, } /// Per-agent in-progress state that the dashboard surfaces between approve @@ -98,6 +115,7 @@ impl Coordinator { let broker = Broker::open(db_path).context("open broker")?; let approvals = Approvals::open(db_path).context("open approvals")?; let questions = OperatorQuestions::open(db_path).context("open operator_questions")?; + let (dashboard_events, _) = broadcast::channel(DASHBOARD_CHANNEL); Ok(Self { broker: Arc::new(broker), approvals: Arc::new(approvals), @@ -107,9 +125,42 @@ impl Coordinator { operator_pronouns, agents: Mutex::new(HashMap::new()), transient: Mutex::new(HashMap::new()), + dashboard_events, + event_seq: AtomicU64::new(0), }) } + /// Subscribe to the unified dashboard event channel. Used by the + /// `/dashboard/stream` SSE handler and by the broker-to-dashboard + /// forwarder task. + pub fn dashboard_subscribe(&self) -> broadcast::Receiver { + self.dashboard_events.subscribe() + } + + /// Stamp the next sequence number. Each emission of a + /// `DashboardEvent` should fill its `seq` with `next_seq()` so the + /// frame the wire carries is the one the client uses to dedupe. + pub fn next_seq(&self) -> u64 { + self.event_seq.fetch_add(1, Ordering::SeqCst) + 1 + } + + /// Current high-water seq. Snapshot endpoints read this *before* + /// gathering state so the (snapshot.seq, snapshot) pair satisfies: + /// any frame with `seq > snapshot.seq` is post-snapshot. The seq + /// captured here may grow during snapshot construction — clients + /// may double-apply such events, which renderers must tolerate. + pub fn current_seq(&self) -> u64 { + self.event_seq.load(Ordering::SeqCst) + } + + /// Broadcast a freshly-built `DashboardEvent` (caller fills `seq` + /// via `next_seq()`). Returns silently when there are no + /// subscribers — the dashboard channel is best-effort presentation + /// plumbing, not a delivery guarantee. + pub fn emit_dashboard_event(&self, event: DashboardEvent) { + let _ = self.dashboard_events.send(event); + } + pub fn register_agent(self: &Arc, name: &str) -> Result { // Idempotent: drop any existing listener so re-registration (e.g. on rebuild, // or after a hive-c0re restart cleared /run/hyperhive) gets a fresh socket. diff --git a/hive-c0re/src/dashboard.rs b/hive-c0re/src/dashboard.rs index d6f06e0..90880b5 100644 --- a/hive-c0re/src/dashboard.rs +++ b/hive-c0re/src/dashboard.rs @@ -57,8 +57,8 @@ pub async fn serve(port: u16, coord: Arc) -> Result<()> { .route("/request-spawn", post(post_request_spawn)) .route("/op-send", post(post_op_send)) .route("/meta-update", post(post_meta_update)) - .route("/messages/stream", get(messages_stream)) - .route("/messages/history", get(messages_history)) + .route("/dashboard/stream", get(dashboard_stream)) + .route("/dashboard/history", get(dashboard_history)) .route("/static/hive-fr0nt.js", get(serve_shared_js)) .with_state(AppState { coord }); let addr = SocketAddr::from(([0, 0, 0, 0], port)); @@ -73,7 +73,7 @@ pub async fn serve(port: u16, coord: Arc) -> Result<()> { // (static) shell; `GET /static/*` serves the CSS + JS app; `GET /api/state` // returns the current snapshot as JSON. The JS app fetches state on load, // re-fetches after every async-form submit, and listens on -// `/messages/stream` for broker traffic. +// `/dashboard/stream` for the unified live event channel. // --------------------------------------------------------------------------- /// `SO_REUSEADDR` bind with retry. Mirrors the per-agent variant — @@ -293,13 +293,13 @@ async fn api_state(headers: HeaderMap, State(state): State) -> axum::J .unwrap_or("localhost"); let hostname = host.split(':').next().unwrap_or(host).to_owned(); - // Capture the broker seq *before* any read so the dedupe contract - // is "events with seq > snapshot.seq are post-snapshot, never - // missed." A broker event landing during snapshot construction may - // be doubly applied (snapshot caught the write + client also - // applies the SSE event) — that's a renderer's problem to make - // idempotent, not ours to avoid here. - let seq = state.coord.broker.current_seq(); + // Capture the unified dashboard-channel seq *before* any read so the + // dedupe contract is "events with seq > snapshot.seq are + // post-snapshot, never missed." An event landing during snapshot + // construction may be doubly applied (snapshot caught the write + + // client also applies the SSE frame) — that's a renderer's problem + // to make idempotent, not ours to avoid here. + let seq = state.coord.current_seq(); let raw_containers = log_default("nixos-container list", lifecycle::list().await); let current_rev = crate::auto_update::current_flake_rev(&state.coord.hyperhive_flake); @@ -720,36 +720,58 @@ fn dir_size_bytes(root: &Path) -> u64 { total } -async fn messages_history(State(state): State) -> Response { - // Backfill source for the dashboard message-flow terminal. Returns - // up to ~200 historical broker messages as `MessageEvent::Sent` JSON - // wrapped in `{ seq, events }`. The seq is the broker's high water - // mark at fetch time; clients use it to dedupe their buffered live - // SSE traffic (drop anything with `seq <= history_seq`) so a message +async fn dashboard_history(State(state): State) -> Response { + // Backfill source for the dashboard terminal. Returns up to ~200 + // historical broker messages (no other event kinds are persisted) + // converted to `DashboardEvent::Sent` JSON so the client can replay + // through the same dispatch path as live frames. Wrapped in + // `{ seq, events }`: the seq is the dashboard channel's high-water + // mark at fetch time. Clients use it to dedupe their buffered live + // SSE traffic (drop anything with `seq <= history_seq`) so a frame // that lands between SSE-subscribe and history-fetch isn't shown - // twice and isn't lost. + // twice and isn't lost. Historical rows carry `seq = 0`; the + // boundary seq is what closes the dedupe window. const HISTORY_LIMIT: u64 = 200; - // Capture seq *before* the query so the dedupe contract is - // "drop buffered events you've already seen in history" — never - // "lose an event that fired between the read and the timestamp." - let seq = state.coord.broker.current_seq(); + let seq = state.coord.current_seq(); match state.coord.broker.recent_all(HISTORY_LIMIT) { - Ok(mut events) => { - // recent_all returns newest-first; reverse so the replay - // builds chronologically (matches the agent /events/history). - events.reverse(); + Ok(mut messages) => { + messages.reverse(); + let events: Vec = messages + .into_iter() + .map(|m| match m { + crate::broker::MessageEvent::Sent { from, to, body, at } => { + crate::dashboard_events::DashboardEvent::Sent { + seq: 0, + from, + to, + body, + at, + } + } + crate::broker::MessageEvent::Delivered { from, to, body, at } => { + crate::dashboard_events::DashboardEvent::Delivered { + seq: 0, + from, + to, + body, + at, + } + } + }) + .collect(); axum::Json(serde_json::json!({ "seq": seq, "events": events })).into_response() } - Err(e) => error_response(&format!("messages/history failed: {e:#}")), + Err(e) => error_response(&format!("dashboard/history failed: {e:#}")), } } -async fn messages_stream( +async fn dashboard_stream( State(state): State, ) -> Sse>> { - let rx = state.coord.broker.subscribe(); + let rx = state.coord.dashboard_subscribe(); let stream = BroadcastStream::new(rx).filter_map(|res| { - // Drop lagged events. Browsers reconnect; nothing to do here. + // Drop lagged frames. Browsers reconnect; the seq dedupe on + // reconnect skips any frame already reflected in the snapshot. let event = res.ok()?; let json = serde_json::to_string(&event).ok()?; Some(Ok(Event::default().data(json))) diff --git a/hive-c0re/src/dashboard_events.rs b/hive-c0re/src/dashboard_events.rs new file mode 100644 index 0000000..fc8305f --- /dev/null +++ b/hive-c0re/src/dashboard_events.rs @@ -0,0 +1,47 @@ +//! Unified dashboard event channel. +//! +//! Anything the browser wants to react to in near-real-time flows through +//! `Coordinator.dashboard_events`. Each event is stamped with a monotonic +//! per-process `seq` so the client can dedupe its buffered live traffic +//! against snapshot/history responses (drop frames with +//! `seq <= snapshot.seq`). +//! +//! Why one channel instead of one-per-domain: browsers cap concurrent +//! SSE connections per origin (~6 in chrome) and dispatch-by-kind on the +//! client is a one-liner. Splits get reserved for high-volume sub-streams +//! that most consumers don't care about (none yet). +//! +//! Message-broker traffic (`Sent` / `Delivered`) lives on this channel +//! too. A background forwarder task in `main.rs` subscribes to the broker +//! and re-emits each `MessageEvent` as a `DashboardEvent::Sent` / +//! `DashboardEvent::Delivered` with a freshly-stamped seq. Keeping the +//! broker's intra-process channel separate avoids coupling the broker +//! (used by `recv_blocking` inside the harness loop) to dashboard +//! presentation concerns. +//! +//! New mutation kinds (approval added/resolved, question added/answered, +//! transient changed, etc.) land here as additional variants. The client +//! dispatches by `kind` and updates the relevant section. + +use serde::Serialize; + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "snake_case", tag = "kind")] +pub enum DashboardEvent { + /// Broker `Sent` event mirrored onto the dashboard channel. + Sent { + seq: u64, + from: String, + to: String, + body: String, + at: i64, + }, + /// Broker `Delivered` event mirrored onto the dashboard channel. + Delivered { + seq: u64, + from: String, + to: String, + body: String, + at: i64, + }, +} diff --git a/hive-c0re/src/main.rs b/hive-c0re/src/main.rs index 489dbac..97c5adf 100644 --- a/hive-c0re/src/main.rs +++ b/hive-c0re/src/main.rs @@ -14,6 +14,7 @@ mod client; mod coordinator; mod crash_watch; mod dashboard; +mod dashboard_events; mod events_vacuum; mod forge; mod lifecycle; @@ -170,6 +171,12 @@ async fn main() -> Result<()> { // Reminder scheduler: drains due reminders + handles // file_path payload persistence. See reminder_scheduler.rs. reminder_scheduler::spawn(coord.clone()); + // Forward every broker event onto the unified dashboard + // channel with a freshly-stamped seq, so the dashboard SSE + // sees broker messages + future mutation events on one + // stream with one monotonic seq. The broker's intra-process + // channel (used by `recv_blocking`) stays untouched. + spawn_broker_to_dashboard_forwarder(coord.clone()); let dash_coord = coord.clone(); tokio::spawn(async move { if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await { @@ -202,6 +209,46 @@ async fn main() -> Result<()> { } } +/// Re-emit every broker `MessageEvent` onto the dashboard channel as +/// a `DashboardEvent::Sent` / `Delivered` with a freshly-stamped seq. +/// Background task; runs for the life of the process. On a lagged +/// broker subscription we just keep going — the dashboard channel is +/// best-effort presentation plumbing, the broker keeps its own sqlite +/// log for replay. +fn spawn_broker_to_dashboard_forwarder(coord: Arc) { + use broker::MessageEvent; + use dashboard_events::DashboardEvent; + let mut rx = coord.broker.subscribe(); + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(MessageEvent::Sent { from, to, body, at }) => { + coord.emit_dashboard_event(DashboardEvent::Sent { + seq: coord.next_seq(), + from, + to, + body, + at, + }); + } + Ok(MessageEvent::Delivered { from, to, body, at }) => { + coord.emit_dashboard_event(DashboardEvent::Delivered { + seq: coord.next_seq(), + from, + to, + body, + at, + }); + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!(skipped = n, "broker-to-dashboard forwarder lagged"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + }); +} + fn render(resp: HostResponse) -> Result<()> { println!("{}", serde_json::to_string_pretty(&resp)?); if !resp.ok {