dashboard events: unified coord channel + /dashboard/{stream,history}; broker forwards
This commit is contained in:
parent
d48cee7c2d
commit
a478792914
6 changed files with 205 additions and 66 deletions
|
|
@ -1,6 +1,6 @@
|
||||||
// Dashboard SPA. Renders containers + approvals from `/api/state`, wires
|
// Dashboard SPA. Renders containers + approvals from `/api/state`, wires
|
||||||
// up async-form submission (URL-encoded POST + spinner + state refresh),
|
// up async-form submission (URL-encoded POST + spinner + state refresh),
|
||||||
// and tails the broker over `/messages/stream` SSE.
|
// and tails the unified dashboard event channel over `/dashboard/stream`.
|
||||||
|
|
||||||
(() => {
|
(() => {
|
||||||
// ─── helpers ────────────────────────────────────────────────────────────
|
// ─── helpers ────────────────────────────────────────────────────────────
|
||||||
|
|
@ -600,7 +600,7 @@
|
||||||
// ─── operator inbox (derived from the broker message stream) ───────────
|
// ─── operator inbox (derived from the broker message stream) ───────────
|
||||||
// No longer shipped on `/api/state.operator_inbox`. The dashboard
|
// No longer shipped on `/api/state.operator_inbox`. The dashboard
|
||||||
// terminal's HiveTerminal feeds this via `onAnyEvent` — backfill from
|
// terminal's HiveTerminal feeds this via `onAnyEvent` — backfill from
|
||||||
// `/messages/history` populates on load, live SSE keeps it current.
|
// `/dashboard/history` populates on load, live SSE keeps it current.
|
||||||
// Newest-first to match the previous behaviour.
|
// Newest-first to match the previous behaviour.
|
||||||
const INBOX_LIMIT = 50;
|
const INBOX_LIMIT = 50;
|
||||||
const operatorInbox = [];
|
const operatorInbox = [];
|
||||||
|
|
@ -1009,8 +1009,8 @@
|
||||||
}
|
}
|
||||||
HiveTerminal.create({
|
HiveTerminal.create({
|
||||||
logEl: flow,
|
logEl: flow,
|
||||||
historyUrl: '/messages/history',
|
historyUrl: '/dashboard/history',
|
||||||
streamUrl: '/messages/stream',
|
streamUrl: '/dashboard/stream',
|
||||||
renderers: {
|
renderers: {
|
||||||
sent: (ev, api) => renderMsg(ev, api, '→'),
|
sent: (ev, api) => renderMsg(ev, api, '→'),
|
||||||
delivered: (ev, api) => renderMsg(ev, api, '✓'),
|
delivered: (ev, api) => renderMsg(ev, api, '✓'),
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,6 @@
|
||||||
|
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
|
||||||
use std::time::{SystemTime, UNIX_EPOCH};
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
|
|
@ -47,18 +46,21 @@ const EVENT_CHANNEL: usize = 256;
|
||||||
/// self-documenting.
|
/// self-documenting.
|
||||||
pub type DueReminder = (String, i64, String, Option<String>);
|
pub type DueReminder = (String, i64, String, Option<String>);
|
||||||
|
|
||||||
|
/// Intra-process broker event. `recv_blocking` listens on the same
|
||||||
|
/// channel as the dashboard forwarder; the forwarder re-emits each
|
||||||
|
/// event as a `DashboardEvent` with a freshly-stamped seq from the
|
||||||
|
/// Coordinator. The broker itself doesn't stamp seqs — that's a wire
|
||||||
|
/// concern, not a storage concern.
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
#[serde(rename_all = "snake_case", tag = "kind")]
|
#[serde(rename_all = "snake_case", tag = "kind")]
|
||||||
pub enum MessageEvent {
|
pub enum MessageEvent {
|
||||||
Sent {
|
Sent {
|
||||||
seq: u64,
|
|
||||||
from: String,
|
from: String,
|
||||||
to: String,
|
to: String,
|
||||||
body: String,
|
body: String,
|
||||||
at: i64,
|
at: i64,
|
||||||
},
|
},
|
||||||
Delivered {
|
Delivered {
|
||||||
seq: u64,
|
|
||||||
from: String,
|
from: String,
|
||||||
to: String,
|
to: String,
|
||||||
body: String,
|
body: String,
|
||||||
|
|
@ -69,13 +71,6 @@ pub enum MessageEvent {
|
||||||
pub struct Broker {
|
pub struct Broker {
|
||||||
conn: Mutex<Connection>,
|
conn: Mutex<Connection>,
|
||||||
events: broadcast::Sender<MessageEvent>,
|
events: broadcast::Sender<MessageEvent>,
|
||||||
/// Monotonic per-process counter stamped onto every emitted
|
|
||||||
/// `MessageEvent`. Persisted nowhere — clients always treat a hive-c0re
|
|
||||||
/// restart as "everything is new" (fresh snapshot, fresh stream of
|
|
||||||
/// seqs starting at 1). Historical rows replayed via `recent_all`
|
|
||||||
/// carry `seq = 0` since they predate the live stream the seq is
|
|
||||||
/// meant to dedupe against.
|
|
||||||
event_seq: AtomicU64,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Broker {
|
impl Broker {
|
||||||
|
|
@ -91,7 +86,6 @@ impl Broker {
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
conn: Mutex::new(conn),
|
conn: Mutex::new(conn),
|
||||||
events,
|
events,
|
||||||
event_seq: AtomicU64::new(0),
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -99,20 +93,6 @@ impl Broker {
|
||||||
self.events.subscribe()
|
self.events.subscribe()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Current high-water seq. Snapshot endpoints read this *before*
|
|
||||||
/// gathering state so the resulting (snapshot.seq, snapshot) pair
|
|
||||||
/// satisfies: any live event with seq > snapshot.seq is post-snapshot
|
|
||||||
/// (not yet reflected); any with seq <= snapshot.seq either pre-dates
|
|
||||||
/// the snapshot or was already captured by it. Clients dedupe their
|
|
||||||
/// buffered SSE traffic against this value.
|
|
||||||
pub fn current_seq(&self) -> u64 {
|
|
||||||
self.event_seq.load(Ordering::SeqCst)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn next_seq(&self) -> u64 {
|
|
||||||
self.event_seq.fetch_add(1, Ordering::SeqCst) + 1
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn send(&self, message: &Message) -> Result<()> {
|
pub fn send(&self, message: &Message) -> Result<()> {
|
||||||
let conn = self.conn.lock().unwrap();
|
let conn = self.conn.lock().unwrap();
|
||||||
conn.execute(
|
conn.execute(
|
||||||
|
|
@ -121,7 +101,6 @@ impl Broker {
|
||||||
)?;
|
)?;
|
||||||
drop(conn);
|
drop(conn);
|
||||||
let _ = self.events.send(MessageEvent::Sent {
|
let _ = self.events.send(MessageEvent::Sent {
|
||||||
seq: self.next_seq(),
|
|
||||||
from: message.from.clone(),
|
from: message.from.clone(),
|
||||||
to: message.to.clone(),
|
to: message.to.clone(),
|
||||||
body: message.body.clone(),
|
body: message.body.clone(),
|
||||||
|
|
@ -175,11 +154,6 @@ impl Broker {
|
||||||
)?;
|
)?;
|
||||||
let rows = stmt.query_map(params![limit_i], |row| {
|
let rows = stmt.query_map(params![limit_i], |row| {
|
||||||
Ok(MessageEvent::Sent {
|
Ok(MessageEvent::Sent {
|
||||||
// Historical events: seq=0 (never compared against live
|
|
||||||
// seqs). Live dedupe windows close against
|
|
||||||
// history_seq = broker.current_seq() captured at fetch
|
|
||||||
// time, not against per-row seqs.
|
|
||||||
seq: 0,
|
|
||||||
from: row.get(0)?,
|
from: row.get(0)?,
|
||||||
to: row.get(1)?,
|
to: row.get(1)?,
|
||||||
body: row.get(2)?,
|
body: row.get(2)?,
|
||||||
|
|
@ -287,7 +261,6 @@ impl Broker {
|
||||||
)?;
|
)?;
|
||||||
drop(conn);
|
drop(conn);
|
||||||
let _ = self.events.send(MessageEvent::Delivered {
|
let _ = self.events.send(MessageEvent::Delivered {
|
||||||
seq: self.next_seq(),
|
|
||||||
from: from.clone(),
|
from: from.clone(),
|
||||||
to: to.clone(),
|
to: to.clone(),
|
||||||
body: body.clone(),
|
body: body.clone(),
|
||||||
|
|
@ -364,7 +337,6 @@ impl Broker {
|
||||||
tx.commit()?;
|
tx.commit()?;
|
||||||
drop(conn);
|
drop(conn);
|
||||||
let _ = self.events.send(MessageEvent::Sent {
|
let _ = self.events.send(MessageEvent::Sent {
|
||||||
seq: self.next_seq(),
|
|
||||||
from: "reminder".to_owned(),
|
from: "reminder".to_owned(),
|
||||||
to: agent.to_owned(),
|
to: agent.to_owned(),
|
||||||
body: message.to_owned(),
|
body: message.to_owned(),
|
||||||
|
|
|
||||||
|
|
@ -4,15 +4,23 @@
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
use crate::agent_server::{self, AgentSocket};
|
use crate::agent_server::{self, AgentSocket};
|
||||||
use crate::approvals::Approvals;
|
use crate::approvals::Approvals;
|
||||||
use crate::broker::Broker;
|
use crate::broker::Broker;
|
||||||
|
use crate::dashboard_events::DashboardEvent;
|
||||||
use crate::operator_questions::OperatorQuestions;
|
use crate::operator_questions::OperatorQuestions;
|
||||||
|
|
||||||
|
/// Capacity of the dashboard event channel. Slow browser subscribers
|
||||||
|
/// (idle tab, throttled connection) drop frames past this — that's
|
||||||
|
/// fine, the seq dedupe makes a reconnect resync safe.
|
||||||
|
const DASHBOARD_CHANNEL: usize = 256;
|
||||||
|
|
||||||
const AGENT_RUNTIME_ROOT: &str = "/run/hyperhive/agents";
|
const AGENT_RUNTIME_ROOT: &str = "/run/hyperhive/agents";
|
||||||
const MANAGER_RUNTIME_ROOT: &str = "/run/hyperhive/manager";
|
const MANAGER_RUNTIME_ROOT: &str = "/run/hyperhive/manager";
|
||||||
/// Manager-editable per-agent config repos. Bind-mounted RW into the manager
|
/// Manager-editable per-agent config repos. Bind-mounted RW into the manager
|
||||||
|
|
@ -47,6 +55,15 @@ pub struct Coordinator {
|
||||||
/// Read by the dashboard to render a spinner; cleared when the action
|
/// Read by the dashboard to render a spinner; cleared when the action
|
||||||
/// resolves (success or failure).
|
/// resolves (success or failure).
|
||||||
transient: Mutex<HashMap<String, TransientState>>,
|
transient: Mutex<HashMap<String, TransientState>>,
|
||||||
|
/// Unified wire-facing event channel feeding the dashboard SSE
|
||||||
|
/// stream. Carries broker messages (mirrored from `broker.subscribe`
|
||||||
|
/// by the forwarder task in `main.rs`) and dashboard-only mutation
|
||||||
|
/// events (approval added/resolved, question added/answered, etc.).
|
||||||
|
/// Snapshot endpoints capture `event_seq` before reading state so
|
||||||
|
/// the client can dedupe its buffered live traffic against the
|
||||||
|
/// snapshot.
|
||||||
|
dashboard_events: broadcast::Sender<DashboardEvent>,
|
||||||
|
event_seq: AtomicU64,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Per-agent in-progress state that the dashboard surfaces between approve
|
/// Per-agent in-progress state that the dashboard surfaces between approve
|
||||||
|
|
@ -98,6 +115,7 @@ impl Coordinator {
|
||||||
let broker = Broker::open(db_path).context("open broker")?;
|
let broker = Broker::open(db_path).context("open broker")?;
|
||||||
let approvals = Approvals::open(db_path).context("open approvals")?;
|
let approvals = Approvals::open(db_path).context("open approvals")?;
|
||||||
let questions = OperatorQuestions::open(db_path).context("open operator_questions")?;
|
let questions = OperatorQuestions::open(db_path).context("open operator_questions")?;
|
||||||
|
let (dashboard_events, _) = broadcast::channel(DASHBOARD_CHANNEL);
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
broker: Arc::new(broker),
|
broker: Arc::new(broker),
|
||||||
approvals: Arc::new(approvals),
|
approvals: Arc::new(approvals),
|
||||||
|
|
@ -107,9 +125,42 @@ impl Coordinator {
|
||||||
operator_pronouns,
|
operator_pronouns,
|
||||||
agents: Mutex::new(HashMap::new()),
|
agents: Mutex::new(HashMap::new()),
|
||||||
transient: Mutex::new(HashMap::new()),
|
transient: Mutex::new(HashMap::new()),
|
||||||
|
dashboard_events,
|
||||||
|
event_seq: AtomicU64::new(0),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Subscribe to the unified dashboard event channel. Used by the
|
||||||
|
/// `/dashboard/stream` SSE handler and by the broker-to-dashboard
|
||||||
|
/// forwarder task.
|
||||||
|
pub fn dashboard_subscribe(&self) -> broadcast::Receiver<DashboardEvent> {
|
||||||
|
self.dashboard_events.subscribe()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stamp the next sequence number. Each emission of a
|
||||||
|
/// `DashboardEvent` should fill its `seq` with `next_seq()` so the
|
||||||
|
/// frame the wire carries is the one the client uses to dedupe.
|
||||||
|
pub fn next_seq(&self) -> u64 {
|
||||||
|
self.event_seq.fetch_add(1, Ordering::SeqCst) + 1
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Current high-water seq. Snapshot endpoints read this *before*
|
||||||
|
/// gathering state so the (snapshot.seq, snapshot) pair satisfies:
|
||||||
|
/// any frame with `seq > snapshot.seq` is post-snapshot. The seq
|
||||||
|
/// captured here may grow during snapshot construction — clients
|
||||||
|
/// may double-apply such events, which renderers must tolerate.
|
||||||
|
pub fn current_seq(&self) -> u64 {
|
||||||
|
self.event_seq.load(Ordering::SeqCst)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Broadcast a freshly-built `DashboardEvent` (caller fills `seq`
|
||||||
|
/// via `next_seq()`). Returns silently when there are no
|
||||||
|
/// subscribers — the dashboard channel is best-effort presentation
|
||||||
|
/// plumbing, not a delivery guarantee.
|
||||||
|
pub fn emit_dashboard_event(&self, event: DashboardEvent) {
|
||||||
|
let _ = self.dashboard_events.send(event);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn register_agent(self: &Arc<Self>, name: &str) -> Result<PathBuf> {
|
pub fn register_agent(self: &Arc<Self>, name: &str) -> Result<PathBuf> {
|
||||||
// Idempotent: drop any existing listener so re-registration (e.g. on rebuild,
|
// Idempotent: drop any existing listener so re-registration (e.g. on rebuild,
|
||||||
// or after a hive-c0re restart cleared /run/hyperhive) gets a fresh socket.
|
// or after a hive-c0re restart cleared /run/hyperhive) gets a fresh socket.
|
||||||
|
|
|
||||||
|
|
@ -57,8 +57,8 @@ pub async fn serve(port: u16, coord: Arc<Coordinator>) -> Result<()> {
|
||||||
.route("/request-spawn", post(post_request_spawn))
|
.route("/request-spawn", post(post_request_spawn))
|
||||||
.route("/op-send", post(post_op_send))
|
.route("/op-send", post(post_op_send))
|
||||||
.route("/meta-update", post(post_meta_update))
|
.route("/meta-update", post(post_meta_update))
|
||||||
.route("/messages/stream", get(messages_stream))
|
.route("/dashboard/stream", get(dashboard_stream))
|
||||||
.route("/messages/history", get(messages_history))
|
.route("/dashboard/history", get(dashboard_history))
|
||||||
.route("/static/hive-fr0nt.js", get(serve_shared_js))
|
.route("/static/hive-fr0nt.js", get(serve_shared_js))
|
||||||
.with_state(AppState { coord });
|
.with_state(AppState { coord });
|
||||||
let addr = SocketAddr::from(([0, 0, 0, 0], port));
|
let addr = SocketAddr::from(([0, 0, 0, 0], port));
|
||||||
|
|
@ -73,7 +73,7 @@ pub async fn serve(port: u16, coord: Arc<Coordinator>) -> Result<()> {
|
||||||
// (static) shell; `GET /static/*` serves the CSS + JS app; `GET /api/state`
|
// (static) shell; `GET /static/*` serves the CSS + JS app; `GET /api/state`
|
||||||
// returns the current snapshot as JSON. The JS app fetches state on load,
|
// returns the current snapshot as JSON. The JS app fetches state on load,
|
||||||
// re-fetches after every async-form submit, and listens on
|
// re-fetches after every async-form submit, and listens on
|
||||||
// `/messages/stream` for broker traffic.
|
// `/dashboard/stream` for the unified live event channel.
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
/// `SO_REUSEADDR` bind with retry. Mirrors the per-agent variant —
|
/// `SO_REUSEADDR` bind with retry. Mirrors the per-agent variant —
|
||||||
|
|
@ -293,13 +293,13 @@ async fn api_state(headers: HeaderMap, State(state): State<AppState>) -> axum::J
|
||||||
.unwrap_or("localhost");
|
.unwrap_or("localhost");
|
||||||
let hostname = host.split(':').next().unwrap_or(host).to_owned();
|
let hostname = host.split(':').next().unwrap_or(host).to_owned();
|
||||||
|
|
||||||
// Capture the broker seq *before* any read so the dedupe contract
|
// Capture the unified dashboard-channel seq *before* any read so the
|
||||||
// is "events with seq > snapshot.seq are post-snapshot, never
|
// dedupe contract is "events with seq > snapshot.seq are
|
||||||
// missed." A broker event landing during snapshot construction may
|
// post-snapshot, never missed." An event landing during snapshot
|
||||||
// be doubly applied (snapshot caught the write + client also
|
// construction may be doubly applied (snapshot caught the write +
|
||||||
// applies the SSE event) — that's a renderer's problem to make
|
// client also applies the SSE frame) — that's a renderer's problem
|
||||||
// idempotent, not ours to avoid here.
|
// to make idempotent, not ours to avoid here.
|
||||||
let seq = state.coord.broker.current_seq();
|
let seq = state.coord.current_seq();
|
||||||
|
|
||||||
let raw_containers = log_default("nixos-container list", lifecycle::list().await);
|
let raw_containers = log_default("nixos-container list", lifecycle::list().await);
|
||||||
let current_rev = crate::auto_update::current_flake_rev(&state.coord.hyperhive_flake);
|
let current_rev = crate::auto_update::current_flake_rev(&state.coord.hyperhive_flake);
|
||||||
|
|
@ -720,36 +720,58 @@ fn dir_size_bytes(root: &Path) -> u64 {
|
||||||
total
|
total
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn messages_history(State(state): State<AppState>) -> Response {
|
async fn dashboard_history(State(state): State<AppState>) -> Response {
|
||||||
// Backfill source for the dashboard message-flow terminal. Returns
|
// Backfill source for the dashboard terminal. Returns up to ~200
|
||||||
// up to ~200 historical broker messages as `MessageEvent::Sent` JSON
|
// historical broker messages (no other event kinds are persisted)
|
||||||
// wrapped in `{ seq, events }`. The seq is the broker's high water
|
// converted to `DashboardEvent::Sent` JSON so the client can replay
|
||||||
// mark at fetch time; clients use it to dedupe their buffered live
|
// through the same dispatch path as live frames. Wrapped in
|
||||||
// SSE traffic (drop anything with `seq <= history_seq`) so a message
|
// `{ seq, events }`: the seq is the dashboard channel's high-water
|
||||||
|
// mark at fetch time. Clients use it to dedupe their buffered live
|
||||||
|
// SSE traffic (drop anything with `seq <= history_seq`) so a frame
|
||||||
// that lands between SSE-subscribe and history-fetch isn't shown
|
// that lands between SSE-subscribe and history-fetch isn't shown
|
||||||
// twice and isn't lost.
|
// twice and isn't lost. Historical rows carry `seq = 0`; the
|
||||||
|
// boundary seq is what closes the dedupe window.
|
||||||
const HISTORY_LIMIT: u64 = 200;
|
const HISTORY_LIMIT: u64 = 200;
|
||||||
// Capture seq *before* the query so the dedupe contract is
|
let seq = state.coord.current_seq();
|
||||||
// "drop buffered events you've already seen in history" — never
|
|
||||||
// "lose an event that fired between the read and the timestamp."
|
|
||||||
let seq = state.coord.broker.current_seq();
|
|
||||||
match state.coord.broker.recent_all(HISTORY_LIMIT) {
|
match state.coord.broker.recent_all(HISTORY_LIMIT) {
|
||||||
Ok(mut events) => {
|
Ok(mut messages) => {
|
||||||
// recent_all returns newest-first; reverse so the replay
|
messages.reverse();
|
||||||
// builds chronologically (matches the agent /events/history).
|
let events: Vec<crate::dashboard_events::DashboardEvent> = messages
|
||||||
events.reverse();
|
.into_iter()
|
||||||
|
.map(|m| match m {
|
||||||
|
crate::broker::MessageEvent::Sent { from, to, body, at } => {
|
||||||
|
crate::dashboard_events::DashboardEvent::Sent {
|
||||||
|
seq: 0,
|
||||||
|
from,
|
||||||
|
to,
|
||||||
|
body,
|
||||||
|
at,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
crate::broker::MessageEvent::Delivered { from, to, body, at } => {
|
||||||
|
crate::dashboard_events::DashboardEvent::Delivered {
|
||||||
|
seq: 0,
|
||||||
|
from,
|
||||||
|
to,
|
||||||
|
body,
|
||||||
|
at,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
axum::Json(serde_json::json!({ "seq": seq, "events": events })).into_response()
|
axum::Json(serde_json::json!({ "seq": seq, "events": events })).into_response()
|
||||||
}
|
}
|
||||||
Err(e) => error_response(&format!("messages/history failed: {e:#}")),
|
Err(e) => error_response(&format!("dashboard/history failed: {e:#}")),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn messages_stream(
|
async fn dashboard_stream(
|
||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
|
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
|
||||||
let rx = state.coord.broker.subscribe();
|
let rx = state.coord.dashboard_subscribe();
|
||||||
let stream = BroadcastStream::new(rx).filter_map(|res| {
|
let stream = BroadcastStream::new(rx).filter_map(|res| {
|
||||||
// Drop lagged events. Browsers reconnect; nothing to do here.
|
// Drop lagged frames. Browsers reconnect; the seq dedupe on
|
||||||
|
// reconnect skips any frame already reflected in the snapshot.
|
||||||
let event = res.ok()?;
|
let event = res.ok()?;
|
||||||
let json = serde_json::to_string(&event).ok()?;
|
let json = serde_json::to_string(&event).ok()?;
|
||||||
Some(Ok(Event::default().data(json)))
|
Some(Ok(Event::default().data(json)))
|
||||||
|
|
|
||||||
47
hive-c0re/src/dashboard_events.rs
Normal file
47
hive-c0re/src/dashboard_events.rs
Normal file
|
|
@ -0,0 +1,47 @@
|
||||||
|
//! Unified dashboard event channel.
|
||||||
|
//!
|
||||||
|
//! Anything the browser wants to react to in near-real-time flows through
|
||||||
|
//! `Coordinator.dashboard_events`. Each event is stamped with a monotonic
|
||||||
|
//! per-process `seq` so the client can dedupe its buffered live traffic
|
||||||
|
//! against snapshot/history responses (drop frames with
|
||||||
|
//! `seq <= snapshot.seq`).
|
||||||
|
//!
|
||||||
|
//! Why one channel instead of one-per-domain: browsers cap concurrent
|
||||||
|
//! SSE connections per origin (~6 in chrome) and dispatch-by-kind on the
|
||||||
|
//! client is a one-liner. Splits get reserved for high-volume sub-streams
|
||||||
|
//! that most consumers don't care about (none yet).
|
||||||
|
//!
|
||||||
|
//! Message-broker traffic (`Sent` / `Delivered`) lives on this channel
|
||||||
|
//! too. A background forwarder task in `main.rs` subscribes to the broker
|
||||||
|
//! and re-emits each `MessageEvent` as a `DashboardEvent::Sent` /
|
||||||
|
//! `DashboardEvent::Delivered` with a freshly-stamped seq. Keeping the
|
||||||
|
//! broker's intra-process channel separate avoids coupling the broker
|
||||||
|
//! (used by `recv_blocking` inside the harness loop) to dashboard
|
||||||
|
//! presentation concerns.
|
||||||
|
//!
|
||||||
|
//! New mutation kinds (approval added/resolved, question added/answered,
|
||||||
|
//! transient changed, etc.) land here as additional variants. The client
|
||||||
|
//! dispatches by `kind` and updates the relevant section.
|
||||||
|
|
||||||
|
use serde::Serialize;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
#[serde(rename_all = "snake_case", tag = "kind")]
|
||||||
|
pub enum DashboardEvent {
|
||||||
|
/// Broker `Sent` event mirrored onto the dashboard channel.
|
||||||
|
Sent {
|
||||||
|
seq: u64,
|
||||||
|
from: String,
|
||||||
|
to: String,
|
||||||
|
body: String,
|
||||||
|
at: i64,
|
||||||
|
},
|
||||||
|
/// Broker `Delivered` event mirrored onto the dashboard channel.
|
||||||
|
Delivered {
|
||||||
|
seq: u64,
|
||||||
|
from: String,
|
||||||
|
to: String,
|
||||||
|
body: String,
|
||||||
|
at: i64,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
@ -14,6 +14,7 @@ mod client;
|
||||||
mod coordinator;
|
mod coordinator;
|
||||||
mod crash_watch;
|
mod crash_watch;
|
||||||
mod dashboard;
|
mod dashboard;
|
||||||
|
mod dashboard_events;
|
||||||
mod events_vacuum;
|
mod events_vacuum;
|
||||||
mod forge;
|
mod forge;
|
||||||
mod lifecycle;
|
mod lifecycle;
|
||||||
|
|
@ -170,6 +171,12 @@ async fn main() -> Result<()> {
|
||||||
// Reminder scheduler: drains due reminders + handles
|
// Reminder scheduler: drains due reminders + handles
|
||||||
// file_path payload persistence. See reminder_scheduler.rs.
|
// file_path payload persistence. See reminder_scheduler.rs.
|
||||||
reminder_scheduler::spawn(coord.clone());
|
reminder_scheduler::spawn(coord.clone());
|
||||||
|
// Forward every broker event onto the unified dashboard
|
||||||
|
// channel with a freshly-stamped seq, so the dashboard SSE
|
||||||
|
// sees broker messages + future mutation events on one
|
||||||
|
// stream with one monotonic seq. The broker's intra-process
|
||||||
|
// channel (used by `recv_blocking`) stays untouched.
|
||||||
|
spawn_broker_to_dashboard_forwarder(coord.clone());
|
||||||
let dash_coord = coord.clone();
|
let dash_coord = coord.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await {
|
if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await {
|
||||||
|
|
@ -202,6 +209,46 @@ async fn main() -> Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Re-emit every broker `MessageEvent` onto the dashboard channel as
|
||||||
|
/// a `DashboardEvent::Sent` / `Delivered` with a freshly-stamped seq.
|
||||||
|
/// Background task; runs for the life of the process. On a lagged
|
||||||
|
/// broker subscription we just keep going — the dashboard channel is
|
||||||
|
/// best-effort presentation plumbing, the broker keeps its own sqlite
|
||||||
|
/// log for replay.
|
||||||
|
fn spawn_broker_to_dashboard_forwarder(coord: Arc<Coordinator>) {
|
||||||
|
use broker::MessageEvent;
|
||||||
|
use dashboard_events::DashboardEvent;
|
||||||
|
let mut rx = coord.broker.subscribe();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
match rx.recv().await {
|
||||||
|
Ok(MessageEvent::Sent { from, to, body, at }) => {
|
||||||
|
coord.emit_dashboard_event(DashboardEvent::Sent {
|
||||||
|
seq: coord.next_seq(),
|
||||||
|
from,
|
||||||
|
to,
|
||||||
|
body,
|
||||||
|
at,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Ok(MessageEvent::Delivered { from, to, body, at }) => {
|
||||||
|
coord.emit_dashboard_event(DashboardEvent::Delivered {
|
||||||
|
seq: coord.next_seq(),
|
||||||
|
from,
|
||||||
|
to,
|
||||||
|
body,
|
||||||
|
at,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||||
|
tracing::warn!(skipped = n, "broker-to-dashboard forwarder lagged");
|
||||||
|
}
|
||||||
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
fn render(resp: HostResponse) -> Result<()> {
|
fn render(resp: HostResponse) -> Result<()> {
|
||||||
println!("{}", serde_json::to_string_pretty(&resp)?);
|
println!("{}", serde_json::to_string_pretty(&resp)?);
|
||||||
if !resp.ok {
|
if !resp.ok {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue