dashboard: msgflow uses shared terminal + backfill via /messages/history

This commit is contained in:
müde 2026-05-17 11:56:29 +02:00
parent f27108aecf
commit 8c186d4fb7
5 changed files with 116 additions and 72 deletions

View file

@ -955,17 +955,19 @@
refreshState();
NOTIF.bind();
// ─── message flow SSE ───────────────────────────────────────────────────
// ─── message flow: shared terminal pane ────────────────────────────────
// Scroll, pill, backfill + SSE plumbing live in hive-fr0nt::TERMINAL_JS
// (window.HiveTerminal). What stays here is the broker-message
// renderer + the page-local side effects (banner pulse, inbox refresh
// on operator-bound traffic, OS notifications).
(() => {
const flow = $('msgflow');
if (!flow) return;
if (!flow || !window.HiveTerminal) return;
flow.innerHTML = '';
const es = new EventSource('/messages/stream');
const MAX_ROWS = 200;
const tsFmt = (n) => new Date(n * 1000).toISOString().slice(11, 19);
// Animate the banner whenever a broker event lands. Each event nudges
// the shimmer window; if traffic stops, the class falls off after the
// grace timer.
// Pulse the page banner whenever a broker event lands. Each event
// nudges the shimmer window; if traffic stops, the class falls off
// after the grace timer.
const banner = document.querySelector('.banner');
let bannerOffTimer = null;
function pulseBanner() {
@ -974,40 +976,38 @@
if (bannerOffTimer) clearTimeout(bannerOffTimer);
bannerOffTimer = setTimeout(() => banner.classList.remove('active'), 4000);
}
es.onmessage = (e) => {
let m;
try { m = JSON.parse(e.data); } catch { return; }
pulseBanner();
// Live-update the inbox when claude sends to operator + ping
// the OS notification center.
if (m.kind === 'sent' && m.to === 'operator') {
refreshState();
NOTIF.show(
'◆ ' + m.from + ' → operator',
String(m.body || '').slice(0, 200),
// Unique-per-arrival tag so a burst stacks instead of
// overwriting itself in the OS notification center.
'hyperhive:msg:' + m.at + ':' + Math.random().toString(36).slice(2, 6),
);
}
const row = document.createElement('div');
row.className = 'msgrow ' + m.kind;
const kind = m.kind === 'sent' ? '→' : '✓';
row.innerHTML =
'<span class="msg-ts">' + tsFmt(m.at) + '</span>' +
'<span class="msg-arrow">' + kind + '</span>' +
'<span class="msg-from">' + esc(m.from) + '</span>' +
function renderMsg(ev, api, glyph) {
const el = api.row('msgrow ' + ev.kind, '');
el.innerHTML =
'<span class="msg-ts">' + tsFmt(ev.at) + '</span>' +
'<span class="msg-arrow">' + glyph + '</span>' +
'<span class="msg-from">' + esc(ev.from) + '</span>' +
'<span class="msg-sep">→</span>' +
'<span class="msg-to">' + esc(m.to) + '</span>' +
'<span class="msg-body">' + esc(m.body) + '</span>';
flow.insertBefore(row, flow.firstChild);
while (flow.childNodes.length > MAX_ROWS) flow.removeChild(flow.lastChild);
};
es.onerror = () => {
flow.insertBefore(Object.assign(document.createElement('div'), {
className: 'msgrow meta', textContent: '[connection lost — retrying]',
}), flow.firstChild);
};
'<span class="msg-to">' + esc(ev.to) + '</span>' +
'<span class="msg-body">' + esc(ev.body) + '</span>';
}
HiveTerminal.create({
logEl: flow,
historyUrl: '/messages/history',
streamUrl: '/messages/stream',
renderers: {
sent: (ev, api) => renderMsg(ev, api, '→'),
delivered: (ev, api) => renderMsg(ev, api, '✓'),
},
onLiveEvent: (ev) => {
pulseBanner();
if (ev.kind === 'sent' && ev.to === 'operator') {
refreshState();
NOTIF.show(
'◆ ' + ev.from + ' → operator',
String(ev.body || '').slice(0, 200),
// Unique-per-arrival tag so a burst stacks instead of
// overwriting itself in the OS notification center.
'hyperhive:msg:' + ev.at + ':' + Math.random().toString(36).slice(2, 6),
);
}
},
});
})();
// ─── compose: @-mention with sticky recipient ───────────────────────────

View file

@ -537,43 +537,28 @@ summary:hover { color: var(--purple); }
.inbox .msg-from { color: var(--amber); }
.inbox .msg-sep { color: var(--muted); }
.inbox .msg-body { color: var(--fg); white-space: pre-wrap; word-break: break-word; }
.msgflow {
background: rgba(24, 24, 37, 0.78);
-webkit-backdrop-filter: blur(8px) saturate(120%);
backdrop-filter: blur(8px) saturate(120%);
border: 1px solid var(--border);
padding: 0.8em;
font-size: 0.85em;
line-height: 1.5;
max-height: 32em;
overflow-y: auto;
}
.msgflow .msgrow {
animation: row-fade-in 220ms ease-out both;
}
@keyframes row-fade-in {
from { opacity: 0; transform: translateY(4px); }
to { opacity: 1; transform: translateY(0); }
}
.msgrow { display: grid; grid-template-columns: auto auto auto auto auto 1fr; gap: 0.6em; align-items: baseline; padding: 0.1em 0; }
.msgrow.sent .msg-arrow { color: var(--cyan); }
.msgrow.delivered .msg-arrow { color: var(--green); }
/* `#msgflow` is a shared `.live` pane inside `.terminal-wrap` (see
hive-fr0nt::TERMINAL_CSS). The msgrow / msg-* rules below are
dashboard-specific: each broker event becomes a grid of timestamp +
arrow + from/sep/to + body inside the `.row` shell. */
.live .msgrow { display: grid; grid-template-columns: auto auto auto auto auto 1fr; gap: 0.6em; align-items: baseline; padding: 0.1em 0; }
.live .msgrow.sent .msg-arrow { color: var(--cyan); }
.live .msgrow.delivered .msg-arrow { color: var(--green); }
.msg-ts { color: var(--muted); font-size: 0.85em; }
.msg-arrow { font-weight: bold; }
.msg-from { color: var(--amber); }
.msg-sep { color: var(--muted); }
.msg-to { color: var(--pink); }
.msg-body { color: var(--fg); white-space: pre-wrap; word-break: break-word; }
/* Compose box sits inside `.terminal-wrap`, below the `.live` log. The
dashed separator mirrors the agent terminal's prompt divider. */
.op-compose {
position: relative;
display: flex;
align-items: flex-start;
gap: 0.6em;
margin-top: 0.4em;
padding: 0.55em 0.8em;
background: rgba(24, 24, 37, 0.85);
border: 1px solid var(--border);
border-top: none;
border-top: 1px dashed var(--purple-dim);
}
.op-compose-prompt {
color: var(--purple);

View file

@ -61,13 +61,15 @@
<h2>◆ MESS4GE FL0W ◆</h2>
<div class="divider">══════════════════════════════════════════════════════════════</div>
<p class="meta">live tail — newest at the top. tap on every <code>send</code> / <code>recv</code> through the broker. compose below: <code>@name</code> picks the recipient (sticky until you @ someone else); <code>tab</code> completes.</p>
<div id="msgflow" class="msgflow"><span class="meta">connecting…</span></div>
<div id="op-compose" class="op-compose">
<span id="op-compose-prompt" class="op-compose-prompt">@—&gt;</span>
<textarea id="op-compose-input" class="op-compose-input"
placeholder="@agent message… (enter sends, shift+enter newline, tab completes @-mention)"
rows="1" autocomplete="off"></textarea>
<div id="op-compose-suggest" class="op-compose-suggest" hidden></div>
<div class="terminal-wrap">
<div id="msgflow" class="live"><div class="meta">connecting…</div></div>
<div id="op-compose" class="op-compose">
<span id="op-compose-prompt" class="op-compose-prompt">@—&gt;</span>
<textarea id="op-compose-input" class="op-compose-input"
placeholder="@agent message… (enter sends, shift+enter newline, tab completes @-mention)"
rows="1" autocomplete="off"></textarea>
<div id="op-compose-suggest" class="op-compose-suggest" hidden></div>
</div>
</div>
<footer>
@ -75,6 +77,7 @@
<p>▲△▲ <a href="https://git.berlin.ccc.de/vinzenz/hyperhive">hyperhive</a> ▲△▲ hive-c0re on this host ▲△▲</p>
</footer>
<script src="/static/hive-fr0nt.js" defer></script>
<script src="/static/app.js" defer></script>
</body>
</html>

View file

@ -129,6 +129,36 @@ impl Broker {
.map_err(Into::into)
}
/// Latest `limit` messages across every recipient, newest-first.
/// Backs the dashboard's message-flow backfill so a reload doesn't
/// blank the operator's view of recent traffic. Returns each row as
/// a [`MessageEvent::Sent`] so the dashboard's live renderer (which
/// already speaks `MessageEvent`) can replay history through the
/// same code path. We don't synthesise `Delivered` events here —
/// the recv-side acks live in a different table column and would
/// double-render on backfill; the live stream picks them up
/// immediately on the first new `recv`.
pub fn recent_all(&self, limit: u64) -> Result<Vec<MessageEvent>> {
let conn = self.conn.lock().unwrap();
let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX);
let mut stmt = conn.prepare(
"SELECT sender, recipient, body, sent_at
FROM messages
ORDER BY id DESC
LIMIT ?1",
)?;
let rows = stmt.query_map(params![limit_i], |row| {
Ok(MessageEvent::Sent {
from: row.get(0)?,
to: row.get(1)?,
body: row.get(2)?,
at: row.get(3)?,
})
})?;
rows.collect::<rusqlite::Result<Vec<_>>>()
.map_err(Into::into)
}
/// Number of undelivered messages addressed to `recipient`. Non-mutating
/// — used by the harness to surface "N unread" in tool-result status
/// lines without popping the queue.

View file

@ -59,6 +59,8 @@ pub async fn serve(port: u16, coord: Arc<Coordinator>) -> Result<()> {
.route("/op-send", post(post_op_send))
.route("/meta-update", post(post_meta_update))
.route("/messages/stream", get(messages_stream))
.route("/messages/history", get(messages_history))
.route("/static/hive-fr0nt.js", get(serve_shared_js))
.with_state(AppState { coord });
let addr = SocketAddr::from(([0, 0, 0, 0], port));
let listener = bind_with_retry(addr).await?;
@ -133,6 +135,13 @@ async fn serve_app_js() -> impl IntoResponse {
)
}
async fn serve_shared_js() -> impl IntoResponse {
(
[("content-type", "application/javascript")],
hive_fr0nt::TERMINAL_JS,
)
}
#[derive(Serialize)]
struct StateSnapshot {
hostname: String,
@ -699,6 +708,23 @@ fn dir_size_bytes(root: &Path) -> u64 {
total
}
async fn messages_history(State(state): State<AppState>) -> Response {
// Backfill source for the dashboard message-flow terminal. Returns
// up to ~200 historical broker messages as `MessageEvent::Sent` JSON
// — same shape as the live `/messages/stream`, so the renderer
// doesn't branch on history vs. live.
const HISTORY_LIMIT: u64 = 200;
match state.coord.broker.recent_all(HISTORY_LIMIT) {
Ok(mut events) => {
// recent_all returns newest-first; reverse so the replay
// builds chronologically (matches the agent /events/history).
events.reverse();
axum::Json(events).into_response()
}
Err(e) => error_response(&format!("messages/history failed: {e:#}")),
}
}
async fn messages_stream(
State(state): State<AppState>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {