agent ui: live event panel via SSE + stream-json

This commit is contained in:
müde 2026-05-15 15:01:26 +02:00
parent 3c9d42b2a7
commit 9eab28a716
8 changed files with 277 additions and 33 deletions

View file

@ -3,6 +3,7 @@
//! each instance must bind a distinct port. `HIVE_PORT` is set per agent by
//! `hive-c0re`'s generated per-agent flake (deterministic from agent name).
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
@ -10,11 +11,16 @@ use anyhow::{Context, Result};
use axum::{
Form, Router,
extract::State,
response::{Html, IntoResponse, Redirect, Response},
response::{
Html, IntoResponse, Redirect, Response,
sse::{Event, KeepAlive, Sse},
},
routing::{get, post},
};
use serde::Deserialize;
use tokio_stream::{Stream, StreamExt, wrappers::BroadcastStream};
use crate::events::Bus;
use crate::login::LoginState;
use crate::login_session::{LoginSession, drop_if_finished};
@ -28,16 +34,19 @@ struct AppState {
label: String,
login: LoginStateCell,
session: Arc<Mutex<Option<Arc<LoginSession>>>>,
bus: Bus,
}
pub async fn serve(label: String, port: u16, login: LoginStateCell) -> Result<()> {
pub async fn serve(label: String, port: u16, login: LoginStateCell, bus: Bus) -> Result<()> {
let state = AppState {
label,
login,
session: Arc::new(Mutex::new(None)),
bus,
};
let app = Router::new()
.route("/", get(index))
.route("/events/stream", get(events_stream))
.route("/login/start", post(post_login_start))
.route("/login/code", post(post_login_code))
.route("/login/cancel", post(post_login_cancel))
@ -67,9 +76,87 @@ async fn index(State(state): State<AppState>) -> Html<String> {
}
fn render_online() -> String {
"<p class=\"status-online\">▓█▓▒░ harness alive — turn loop running ▓█▓▒░</p>\n<p class=\"meta\">phase 6a placeholder — turn-loop status / inbox / xterm.js coming in 6b+</p>".into()
format!(
"<p class=\"status-online\">▓█▓▒░ harness alive — turn loop running ▓█▓▒░</p>\n{LIVE_PANEL}",
)
}
/// Live event tail rendered into every `/` response when the agent is online.
/// JS opens an `EventSource` on `/events/stream` and appends rows; no full-page
/// reload, so the login flow and other forms aren't clobbered.
const LIVE_PANEL: &str = r#"
<h3>live</h3>
<pre id="live" class="diff"><span class="meta">connecting</span></pre>
<script>
(function() {
const log = document.getElementById('live');
function appendRow(text, cls) {
log.innerHTML = '';
const row = document.createElement('span');
if (cls) row.className = cls;
row.textContent = text + '\n';
log.appendChild(row);
}
function appendLine(text, cls) {
if (log.firstChild && log.firstChild.className === 'meta') log.innerHTML = '';
const row = document.createElement('span');
if (cls) row.className = cls;
row.textContent = text + '\n';
log.appendChild(row);
log.scrollTop = log.scrollHeight;
}
function fmt(ev) {
if (ev.kind === 'turn_start') return ' TURN ' + ev.from + ': ' + ev.body;
if (ev.kind === 'turn_end') return ' TURN END ' + (ev.ok ? 'ok' : 'fail') + (ev.note ? ' ' + ev.note : '');
if (ev.kind === 'note') return '· ' + ev.text;
if (ev.kind === 'stream') {
// serde internal tagging flattens the inner json next to `kind`,
// so the original stream-json event sits under `ev` minus `kind`.
const v = Object.assign({}, ev); delete v.kind;
if (v.type === 'system' && v.subtype === 'init') return '[init] tools=' + (v.tools||[]).length;
if (v.type === 'assistant' && v.message && v.message.content) {
const parts = v.message.content.map(c => {
if (c.type === 'text') return c.text;
if (c.type === 'tool_use') return ' ' + c.name + '(' + JSON.stringify(c.input) + ')';
return c.type;
});
return parts.join('\n');
}
if (v.type === 'user' && v.message && v.message.content) {
const parts = v.message.content.map(c => {
if (c.type === 'tool_result') {
const txt = Array.isArray(c.content) ? c.content.map(p => p.text || '').join(' ') : (c.content || '');
return ' ' + (txt.length > 200 ? txt.slice(0,200) + '…' : txt);
}
return c.type;
});
return parts.join('\n');
}
if (v.type === 'result') return '[done] ' + (v.subtype || '') + (v.is_error ? ' error' : '');
return JSON.stringify(v);
}
return JSON.stringify(ev);
}
function cls(ev) {
if (ev.kind === 'turn_start') return 'turnstart';
if (ev.kind === 'turn_end') return ev.ok ? 'turnok' : 'turnfail';
if (ev.kind === 'note') return 'meta';
return '';
}
const es = new EventSource('/events/stream');
es.onmessage = function(e) {
try {
const ev = JSON.parse(e.data);
appendLine(fmt(ev), cls(ev));
} catch (err) {
appendLine('[parse err] ' + e.data, 'meta');
}
};
es.onerror = function() { appendLine('[disconnected retrying]', 'meta'); };
})();
</script>
"#;
fn render_needs_login_idle() -> String {
"<p class=\"status-needs-login\">▓█▓▒░ NEEDS L0G1N ▓█▓▒░</p>\n<p>No Claude session in <code>~/.claude/</code>. The harness is up but the turn loop is paused until you log in.</p>\n<form method=\"POST\" action=\"/login/start\">\n <button type=\"submit\" class=\"btn btn-login\">◆ ST4RT L0G1N</button>\n</form>\n<p class=\"meta\">Spawns <code>claude auth login</code> over plain stdio pipes. The OAuth URL will appear here when claude emits it; paste the resulting code back into the form below.</p>".into()
}
@ -104,6 +191,18 @@ fn render_login_in_progress(session: &Arc<LoginSession>) -> String {
)
}
async fn events_stream(
State(state): State<AppState>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let rx = state.bus.subscribe();
let stream = BroadcastStream::new(rx).filter_map(|res| {
let ev = res.ok()?;
let json = serde_json::to_string(&ev).ok()?;
Some(Ok(Event::default().data(json)))
});
Sse::new(stream).keep_alive(KeepAlive::default())
}
async fn post_login_start(State(state): State<AppState>) -> Response {
drop_if_finished(&state.session);
{
@ -244,5 +343,10 @@ const STYLE: &str = r#"
word-break: break-all;
max-height: 30em;
}
#live { max-height: 24em; overflow-y: auto; }
#live span { display: block; }
#live .turnstart { color: var(--amber); }
#live .turnok { color: var(--green); }
#live .turnfail { color: #ff6b6b; }
</style>
"#;