From 9133d9e1a3b7e58bcd8d7d3dae998051c6a49f96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?m=C3=BCde?= Date: Fri, 15 May 2026 00:13:34 +0200 Subject: [PATCH] Phase 7b: broker broadcast + dashboard SSE message-flow tail; pkgs.git in module --- Cargo.toml | 2 + hive-c0re/Cargo.toml | 1 + hive-c0re/src/broker.rs | 51 ++++++++++++++++++++++-- hive-c0re/src/dashboard.rs | 80 +++++++++++++++++++++++++++++++++++++- nix/modules/hive-c0re.nix | 5 ++- 5 files changed, 133 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a01086c..947b050 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,9 @@ tokio = { version = "1", features = [ "process", "rt-multi-thread", "signal", + "sync", "time", ] } +tokio-stream = { version = "0.1", features = ["sync"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/hive-c0re/Cargo.toml b/hive-c0re/Cargo.toml index f48ad29..f6b8a14 100644 --- a/hive-c0re/Cargo.toml +++ b/hive-c0re/Cargo.toml @@ -16,5 +16,6 @@ serde.workspace = true serde_json.workspace = true similar.workspace = true tokio.workspace = true +tokio-stream.workspace = true tracing.workspace = true tracing-subscriber.workspace = true diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs index 98719ee..dbff019 100644 --- a/hive-c0re/src/broker.rs +++ b/hive-c0re/src/broker.rs @@ -1,4 +1,5 @@ -//! Sqlite-backed message broker. Survives `hive-c0re` restart. +//! Sqlite-backed message broker. Survives `hive-c0re` restart, and taps every +//! send/recv onto a broadcast channel so the dashboard can stream it. use std::path::Path; use std::sync::Mutex; @@ -7,6 +8,8 @@ use std::time::{SystemTime, UNIX_EPOCH}; use anyhow::{Context, Result}; use hive_sh4re::Message; use rusqlite::{Connection, OptionalExtension, params}; +use serde::Serialize; +use tokio::sync::broadcast; const SCHEMA: &str = r" CREATE TABLE IF NOT EXISTS messages ( @@ -21,8 +24,30 @@ CREATE INDEX IF NOT EXISTS idx_messages_undelivered ON messages (recipient, id) WHERE delivered_at IS NULL; "; +/// Capacity of the live event channel. Slow subscribers (e.g. an idle browser) +/// may drop events past this; we send a `lagged` notice in their stream. +const EVENT_CHANNEL: usize = 256; + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "snake_case", tag = "kind")] +pub enum MessageEvent { + Sent { + from: String, + to: String, + body: String, + at: i64, + }, + Delivered { + from: String, + to: String, + body: String, + at: i64, + }, +} + pub struct Broker { conn: Mutex, + events: broadcast::Sender, } impl Broker { @@ -31,20 +56,33 @@ impl Broker { std::fs::create_dir_all(parent) .with_context(|| format!("create db parent {}", parent.display()))?; } - let conn = - Connection::open(path).with_context(|| format!("open broker db {}", path.display()))?; + let conn = Connection::open(path) + .with_context(|| format!("open broker db {}", path.display()))?; conn.execute_batch(SCHEMA).context("apply broker schema")?; + let (events, _) = broadcast::channel(EVENT_CHANNEL); Ok(Self { conn: Mutex::new(conn), + events, }) } + pub fn subscribe(&self) -> broadcast::Receiver { + self.events.subscribe() + } + pub fn send(&self, message: &Message) -> Result<()> { let conn = self.conn.lock().unwrap(); conn.execute( "INSERT INTO messages (sender, recipient, body, sent_at) VALUES (?1, ?2, ?3, ?4)", params![message.from, message.to, message.body, now_unix()], )?; + drop(conn); + let _ = self.events.send(MessageEvent::Sent { + from: message.from.clone(), + to: message.to.clone(), + body: message.body.clone(), + at: now_unix(), + }); Ok(()) } @@ -68,6 +106,13 @@ impl Broker { "UPDATE messages SET delivered_at = ?1 WHERE id = ?2", params![now_unix(), id], )?; + drop(conn); + let _ = self.events.send(MessageEvent::Delivered { + from: from.clone(), + to: to.clone(), + body: body.clone(), + at: now_unix(), + }); Ok(Some(Message { from, to, body })) } } diff --git a/hive-c0re/src/dashboard.rs b/hive-c0re/src/dashboard.rs index 535b981..5f24bfa 100644 --- a/hive-c0re/src/dashboard.rs +++ b/hive-c0re/src/dashboard.rs @@ -2,6 +2,7 @@ //! container's web UI), pending approvals (with unified diff vs the applied //! repo, plus approve/deny buttons), and the manager. +use std::convert::Infallible; use std::fmt::Write as _; use std::net::SocketAddr; use std::path::Path; @@ -12,11 +13,16 @@ use axum::{ Router, extract::{Path as AxumPath, State}, http::{HeaderMap, StatusCode}, - response::{Html, IntoResponse, Redirect, Response}, + response::{ + Html, IntoResponse, Redirect, Response, + sse::{Event, KeepAlive, Sse}, + }, routing::{get, post}, }; use hive_sh4re::Approval; use tokio::process::Command; +use tokio_stream::wrappers::BroadcastStream; +use tokio_stream::{Stream, StreamExt}; use crate::actions; use crate::coordinator::Coordinator; @@ -34,6 +40,7 @@ pub async fn serve(port: u16, coord: Arc) -> Result<()> { .route("/", get(index)) .route("/approve/{id}", post(post_approve)) .route("/deny/{id}", post(post_deny)) + .route("/messages/stream", get(messages_stream)) .with_state(AppState { coord }); let addr = SocketAddr::from(([0, 0, 0, 0], port)); let listener = tokio::net::TcpListener::bind(addr) @@ -56,11 +63,24 @@ async fn index(headers: HeaderMap, State(state): State) -> Html\n\n\n\nhyperhive // h1ve-c0re\n{STYLE}\n\n\n{BANNER}\n{containers}\n{approvals_html}\n{FOOTER}\n\n\n", + "\n\n\n\nhyperhive // h1ve-c0re\n{STYLE}\n\n\n{BANNER}\n{containers}\n{approvals_html}\n{MSG_FLOW}\n{FOOTER}\n{MSG_FLOW_JS}\n\n\n", containers = render_containers(&containers, &hostname), )) } +async fn messages_stream( + State(state): State, +) -> Sse>> { + let rx = state.coord.broker.subscribe(); + let stream = BroadcastStream::new(rx).filter_map(|res| { + // Drop lagged events. Browsers reconnect; nothing to do here. + let event = res.ok()?; + let json = serde_json::to_string(&event).ok()?; + Some(Ok(Event::default().data(json))) + }); + Sse::new(stream).keep_alive(KeepAlive::default()) +} + async fn post_approve( State(state): State, AxumPath(id): AxumPath, @@ -185,6 +205,44 @@ const BANNER: &str = r#""#; +const MSG_FLOW: &str = r#"

◆ MESS4GE FL0W ◆

+
══════════════════════════════════════════════════════════════
+

live tail — newest at the top. tap on every send / recv through the broker.

+
connecting…
"#; + +const MSG_FLOW_JS: &str = r#""#; + const FOOTER: &str = r#"