agent ui: event-driven status / model / token_usage / turn_state

new LiveEvent variants on the per-agent bus —
status_changed / model_changed / token_usage_changed /
turn_state_changed — replace the per-agent web UI's
/api/state polling for the badge row.

emit sites:
- Bus::set_model → model_changed
- Bus::record_usage → token_usage_changed
- Bus::set_state → turn_state_changed
- turn::wait_for_login → status_changed("online") on creds detect
- post_login_start / post_login_cancel → status_changed("needs_login_*")

per-agent endpoints (post_set_model / post_compact / post_new_session
/ post_cancel_turn / post_login_*) now all return 200; client
drops the post-submit refetch except on login transitions, which
still need /api/state to render the OAuth form + session stream.

client adds dispatch on the four new event kinds, threads
`currentLabel` through so the composer re-enables on a live
status flip, and no longer fires refreshState() from turn_end or
postModel — the events carry the same signal faster.

closes the per-agent half of the dashboard event-channel
refactor; TODO entry dropped.
This commit is contained in:
müde 2026-05-17 22:49:55 +02:00
parent b444dac6e8
commit 39d8359c10
7 changed files with 120 additions and 22 deletions

View file

@ -23,7 +23,6 @@
- **Reminder delivery-error surface**: `reminder_scheduler::tick` logs failed deliveries but doesn't persist. Add `last_error TEXT, attempt_count INTEGER` columns + a banner on the dashboard row + a "retry" affordance. Needs a sqlite migration (idempotent ALTER TABLE). - **Reminder delivery-error surface**: `reminder_scheduler::tick` logs failed deliveries but doesn't persist. Add `last_error TEXT, attempt_count INTEGER` columns + a banner on the dashboard row + a "retry" affordance. Needs a sqlite migration (idempotent ALTER TABLE).
- **Per-agent reminder status / query interface**: surface pending vs. delivered counts per agent (manager + each sub-agent) as a small chip on the container row. - **Per-agent reminder status / query interface**: surface pending vs. delivered counts per agent (manager + each sub-agent) as a small chip on the container row.
- **Phase 6 follow-ups** — dashboard side is fully event-driven (Phase 6 leftovers landed); the per-agent web UI's lifecycle endpoints (`/api/{cancel,compact,model,new-session}`, `/login/*`) still 303-redirect-and-poll. Convert them to 200 + `data-no-refresh` so the per-agent page stops refetching `/api/state` on every operator click — `LiveEvent::Note` already covers cancel/compact/model/new-session, login state needs its own `NeedsLogin` / `LoggedIn` events on the per-agent bus.
- **Tombstones + meta_inputs events**: not yet event-derived. PURG3 + meta-update still trigger a post-submit `/api/state` refetch on the dashboard. Add `TombstoneAdded`/`TombstoneRemoved` + `MetaInputsChanged` so those forms can drop their refetch too and the cold-load is the only `/api/state` fetch in normal operation. - **Tombstones + meta_inputs events**: not yet event-derived. PURG3 + meta-update still trigger a post-submit `/api/state` refetch on the dashboard. Add `TombstoneAdded`/`TombstoneRemoved` + `MetaInputsChanged` so those forms can drop their refetch too and the cold-load is the only `/api/state` fetch in normal operation.
## Security ## Security

View file

@ -168,6 +168,10 @@
// dispatcher to print local-only rows ('help', errors) and to clear // dispatcher to print local-only rows ('help', errors) and to clear
// the terminal on `/clear`. // the terminal on `/clear`.
let termAPI = null; let termAPI = null;
// Label captured from the first /api/state cold load — used by the
// bus-driven `status_changed` handler so it can re-enable the
// composer without waiting for the next snapshot fetch.
let currentLabel = '';
const SLASH_COMMANDS = [ const SLASH_COMMANDS = [
{ name: '/help', desc: 'list slash commands' }, { name: '/help', desc: 'list slash commands' },
@ -192,9 +196,9 @@
const text = await resp.text().catch(() => ''); const text = await resp.text().catch(() => '');
termAPI.row('turn-end-fail', '✗ /model failed: ' + resp.status termAPI.row('turn-end-fail', '✗ /model failed: ' + resp.status
+ (text ? ' — ' + text : '')); + (text ? ' — ' + text : ''));
} else {
refreshState();
} }
// No refreshState — the harness emits `model_changed` on the
// SSE bus and the chip handler picks it up live.
} catch (err) { } catch (err) {
if (termAPI) termAPI.row('turn-end-fail', '✗ /model failed: ' + err); if (termAPI) termAPI.row('turn-end-fail', '✗ /model failed: ' + err);
} }
@ -523,6 +527,7 @@
if (!resp.ok) throw new Error('http ' + resp.status); if (!resp.ok) throw new Error('http ' + resp.status);
const s = await resp.json(); const s = await resp.json();
if (!headerSet) { setHeader(s.label, s.dashboard_port); headerSet = true; } if (!headerSet) { setHeader(s.label, s.dashboard_port); headerSet = true; }
currentLabel = s.label;
renderTermInput(s.label, s.status === 'online'); renderTermInput(s.label, s.status === 'online');
renderInbox(s.inbox || []); renderInbox(s.inbox || []);
// Authoritative state comes from the harness via /api/state. // Authoritative state comes from the harness via /api/state.
@ -725,8 +730,6 @@
openTurnsFromHistory = Math.max(0, openTurnsFromHistory - 1); openTurnsFromHistory = Math.max(0, openTurnsFromHistory - 1);
} else { } else {
setBannerActive(false); setState('idle'); setBannerActive(false); setState('idle');
// Login may have just landed (or session re-enters Online).
refreshState();
} }
const cls = ev.ok ? 'turn-end-ok' : 'turn-end-fail'; const cls = ev.ok ? 'turn-end-ok' : 'turn-end-fail';
api.row(cls, api.row(cls,
@ -738,6 +741,38 @@
const v = Object.assign({}, ev); delete v.kind; const v = Object.assign({}, ev); delete v.kind;
renderStream(v, api); renderStream(v, api);
}, },
// Bus-driven state/badges. `status_changed` may also need a
// /api/state refresh to render the login `#status` block
// (which carries the OAuth URL + form), so we kick the
// existing refresh path on that transition. Online → only
// the badge updates; no /api/state fetch needed.
status_changed(ev, api) {
if (api.fromHistory) return;
renderAliveBadge(ev.status);
renderTermInput(currentLabel, ev.status === 'online');
// Login-flow transitions need the #status block rebuilt
// (it carries the OAuth URL + form). The existing
// refreshState path also re-arms the in-progress poll for
// session output streaming. Online → only the badge moves;
// no /api/state fetch is necessary.
if (ev.status !== 'online' && ev.status !== lastStatus) {
refreshState();
} else if (ev.status === 'online' && lastStatus !== 'online') {
// Status block stays as-is or shows the previous
// login UI; clear it so the operator sees a clean
// online state without a separate refetch.
const root = $('status');
if (root) root.innerHTML = '';
lastStatus = 'online';
}
},
model_changed(ev, api) { if (!api.fromHistory) renderModelChip(ev.model); },
token_usage_changed(ev, api) {
if (!api.fromHistory) renderTokenUsage(ev.usage);
},
turn_state_changed(ev, api) {
if (!api.fromHistory) setStateAbs(ev.state, ev.since_unix);
},
}, },
onBackfillDone() { onBackfillDone() {
// If the last replayed turn never closed, the banner shimmer + // If the last replayed turn never closed, the banner shimmer +

View file

@ -101,7 +101,7 @@ async fn main() -> Result<()> {
// Partial-run mode: keep the harness alive (so the web UI // Partial-run mode: keep the harness alive (so the web UI
// stays bound) but don't drive the turn loop. Poll the // stays bound) but don't drive the turn loop. Poll the
// claude dir; once a session lands we enter `serve`. // claude dir; once a session lands we enter `serve`.
turn::wait_for_login(&claude_dir, login_state.clone(), poll_ms).await; turn::wait_for_login(&claude_dir, login_state.clone(), &bus, poll_ms).await;
serve( serve(
&cli.socket, &cli.socket,
Duration::from_millis(poll_ms), Duration::from_millis(poll_ms),

View file

@ -86,7 +86,7 @@ async fn main() -> Result<()> {
.await .await
} }
LoginState::NeedsLogin => { LoginState::NeedsLogin => {
turn::wait_for_login(&claude_dir, login_state, poll_ms).await; turn::wait_for_login(&claude_dir, login_state, &bus, poll_ms).await;
serve( serve(
&cli.socket, &cli.socket,
Duration::from_millis(poll_ms), Duration::from_millis(poll_ms),

View file

@ -118,6 +118,30 @@ pub enum LiveEvent {
/// Turn finished. `ok=false` means claude exited non-zero or the /// Turn finished. `ok=false` means claude exited non-zero or the
/// harness hit a transport error. /// harness hit a transport error.
TurnEnd { ok: bool, note: Option<String> }, TurnEnd { ok: bool, note: Option<String> },
/// Harness reachability flipped: `"online"` /
/// `"needs_login_idle"` / `"needs_login_in_progress"`. The web UI
/// drives the alive badge from this so the operator sees a login
/// land (or get revoked) without polling. Session detail
/// (`url`/`output`/`finished`) is still served by `/api/state`
/// during the short-lived in-progress window — the client
/// re-fetches only while that flow is active.
StatusChanged { status: String },
/// `/api/model` switched the active claude model. The web UI
/// updates the chip + the per-turn stats sink will key off this
/// to mark the boundary in its log.
ModelChanged { model: String },
/// Final-turn `usage` block landed (input + output + cache
/// counters). Powers the context-window badge + accumulates into
/// the per-turn stats sink.
TokenUsageChanged { usage: TokenUsage },
/// Harness's `TurnState` transitioned (idle / thinking /
/// compacting). `since_unix` matches `Bus::state_snapshot().1`
/// so the client's elapsed-time ticker keeps progressing across
/// SSE reconnects without drift.
TurnStateChanged {
state: TurnState,
since_unix: i64,
},
} }
/// sqlite-backed event log. Wraps a `Connection` behind a `Mutex` so the /// sqlite-backed event log. Wraps a `Connection` behind a `Mutex` so the
@ -149,6 +173,10 @@ impl EventStore {
LiveEvent::Stream(_) => "stream", LiveEvent::Stream(_) => "stream",
LiveEvent::Note { .. } => "note", LiveEvent::Note { .. } => "note",
LiveEvent::TurnEnd { .. } => "turn_end", LiveEvent::TurnEnd { .. } => "turn_end",
LiveEvent::StatusChanged { .. } => "status_changed",
LiveEvent::ModelChanged { .. } => "model_changed",
LiveEvent::TokenUsageChanged { .. } => "token_usage_changed",
LiveEvent::TurnStateChanged { .. } => "turn_state_changed",
}; };
let payload = serde_json::to_string(event).unwrap_or_else(|_| "null".into()); let payload = serde_json::to_string(event).unwrap_or_else(|_| "null".into());
let conn = self.conn.lock().unwrap(); let conn = self.conn.lock().unwrap();
@ -216,7 +244,7 @@ impl TokenUsage {
/// reads via `/api/state` and renders. Lives alongside the bus /// reads via `/api/state` and renders. Lives alongside the bus
/// because everyone who has a `Bus` already has the right handle to /// because everyone who has a `Bus` already has the right handle to
/// poke the state on transitions. /// poke the state on transitions.
#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)] #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
pub enum TurnState { pub enum TurnState {
/// Inbox is empty / waiting on `Recv`. /// Inbox is empty / waiting on `Recv`.
@ -340,11 +368,13 @@ impl Bus {
if let Err(e) = persist_model(&value) { if let Err(e) = persist_model(&value) {
tracing::warn!(error = ?e, "model: persist failed"); tracing::warn!(error = ?e, "model: persist failed");
} }
self.emit(LiveEvent::ModelChanged { model: value });
} }
/// Record the latest token usage from a completed turn. /// Record the latest token usage from a completed turn.
pub fn record_usage(&self, usage: TokenUsage) { pub fn record_usage(&self, usage: TokenUsage) {
*self.last_usage.lock().unwrap() = Some(usage); *self.last_usage.lock().unwrap() = Some(usage);
self.emit(LiveEvent::TokenUsageChanged { usage });
} }
/// Last known token usage, or `None` if no turn has completed yet. /// Last known token usage, or `None` if no turn has completed yet.
@ -356,11 +386,31 @@ impl Bus {
/// Update the harness's authoritative turn-loop state. Records /// Update the harness's authoritative turn-loop state. Records
/// the transition time so `state_snapshot` can return a since-age. /// the transition time so `state_snapshot` can return a since-age.
pub fn set_state(&self, next: TurnState) { pub fn set_state(&self, next: TurnState) {
let mut guard = self.state.lock().unwrap(); let since;
if guard.0 == next { {
return; let mut guard = self.state.lock().unwrap();
if guard.0 == next {
return;
}
*guard = (next, now_unix());
since = guard.1;
} }
*guard = (next, now_unix()); self.emit(LiveEvent::TurnStateChanged {
state: next,
since_unix: since,
});
}
/// Broadcast a status flip (online / needs_login_*). Called by
/// the bin entry points + `turn::wait_for_login` + the
/// `post_login_*` handlers — every site that mutates the
/// `Arc<Mutex<LoginState>>` should also call this so the web UI
/// drops its periodic /api/state poll while a turn loop is
/// running.
pub fn emit_status(&self, status: impl Into<String>) {
self.emit(LiveEvent::StatusChanged {
status: status.into(),
});
} }
/// Current state + since-when (unix seconds). Snapshot copy, no lock held. /// Current state + since-when (unix seconds). Snapshot copy, no lock held.

View file

@ -169,7 +169,12 @@ pub fn emit_turn_end(bus: &Bus, outcome: &TurnOutcome) {
/// Block until the bound `~/.claude/` dir contains a session, polling /// Block until the bound `~/.claude/` dir contains a session, polling
/// `claude_dir` on a `poll_ms` interval (min 2s). Flips `state` to /// `claude_dir` on a `poll_ms` interval (min 2s). Flips `state` to
/// `Online` when login lands; caller resumes its serve loop. /// `Online` when login lands; caller resumes its serve loop.
pub async fn wait_for_login(claude_dir: &Path, state: Arc<Mutex<LoginState>>, poll_ms: u64) { pub async fn wait_for_login(
claude_dir: &Path,
state: Arc<Mutex<LoginState>>,
bus: &Bus,
poll_ms: u64,
) {
tracing::warn!( tracing::warn!(
claude_dir = %claude_dir.display(), claude_dir = %claude_dir.display(),
"no claude session — staying in partial-run mode (web UI only)" "no claude session — staying in partial-run mode (web UI only)"
@ -180,6 +185,7 @@ pub async fn wait_for_login(claude_dir: &Path, state: Arc<Mutex<LoginState>>, po
if login::has_session(claude_dir) { if login::has_session(claude_dir) {
tracing::info!("claude session detected — entering turn loop"); tracing::info!("claude session detected — entering turn loop");
*state.lock().unwrap() = LoginState::Online; *state.lock().unwrap() = LoginState::Online;
bus.emit_status("online");
return; return;
} }
} }

View file

@ -16,7 +16,7 @@ use axum::{
extract::State, extract::State,
http::StatusCode, http::StatusCode,
response::{ response::{
IntoResponse, Redirect, Response, IntoResponse, Response,
sse::{Event, KeepAlive, Sse}, sse::{Event, KeepAlive, Sse},
}, },
routing::{get, post}, routing::{get, post},
@ -388,13 +388,19 @@ async fn post_login_start(State(state): State<AppState>) -> Response {
{ {
let guard = state.session.lock().unwrap(); let guard = state.session.lock().unwrap();
if guard.is_some() { if guard.is_some() {
return Redirect::to("/").into_response(); return (axum::http::StatusCode::OK, "ok").into_response();
} }
} }
match LoginSession::start() { match LoginSession::start() {
Ok(session) => { Ok(session) => {
*state.session.lock().unwrap() = Some(Arc::new(session)); *state.session.lock().unwrap() = Some(Arc::new(session));
Redirect::to("/").into_response() // Flip status from needs_login_idle → needs_login_in_progress
// so the web UI's badge + polling kick in (polling is still
// the right tool for the streaming session output during
// the login flow itself; events drop the poll for
// *everything else*).
state.bus.emit_status("needs_login_in_progress");
(axum::http::StatusCode::OK, "ok").into_response()
} }
Err(e) => error_response(&format!("login start failed: {e:#}")), Err(e) => error_response(&format!("login start failed: {e:#}")),
} }
@ -413,7 +419,7 @@ async fn post_login_code(State(state): State<AppState>, Form(form): Form<CodeFor
if let Err(e) = session.submit_code(&form.code).await { if let Err(e) = session.submit_code(&form.code).await {
return error_response(&format!("submit code failed: {e:#}")); return error_response(&format!("submit code failed: {e:#}"));
} }
Redirect::to("/").into_response() (axum::http::StatusCode::OK, "ok").into_response()
} }
async fn post_login_cancel(State(state): State<AppState>) -> Response { async fn post_login_cancel(State(state): State<AppState>) -> Response {
@ -422,7 +428,9 @@ async fn post_login_cancel(State(state): State<AppState>) -> Response {
session.close_stdin().await; session.close_stdin().await;
session.kill(); session.kill();
} }
Redirect::to("/").into_response() // Back to needs_login_idle (LoginState unchanged, session gone).
state.bus.emit_status("needs_login_idle");
(axum::http::StatusCode::OK, "ok").into_response()
} }
/// Operator-initiated session compaction. Spawns `turn::compact_session` /// Operator-initiated session compaction. Spawns `turn::compact_session`
@ -452,7 +460,7 @@ async fn post_set_model(State(state): State<AppState>, Form(form): Form<ModelFor
text: format!("operator: /model — claude model set to '{name}' for future turns"), text: format!("operator: /model — claude model set to '{name}' for future turns"),
}); });
tracing::info!(%name, "operator set model"); tracing::info!(%name, "operator set model");
Redirect::to("/").into_response() (axum::http::StatusCode::OK, "ok").into_response()
} }
async fn post_compact(State(state): State<AppState>) -> Response { async fn post_compact(State(state): State<AppState>) -> Response {
@ -483,7 +491,7 @@ async fn post_compact(State(state): State<AppState>) -> Response {
}); });
} }
}); });
Redirect::to("/").into_response() (axum::http::StatusCode::OK, "ok").into_response()
} }
/// Cancel the in-flight claude turn. Coarse-grained: shells out /// Cancel the in-flight claude turn. Coarse-grained: shells out
@ -504,7 +512,7 @@ async fn post_new_session(State(state): State<AppState>) -> Response {
state.bus.emit(crate::events::LiveEvent::Note { state.bus.emit(crate::events::LiveEvent::Note {
text: "operator: new session armed — next turn runs without --continue".into(), text: "operator: new session armed — next turn runs without --continue".into(),
}); });
Redirect::to("/").into_response() (axum::http::StatusCode::OK, "ok").into_response()
} }
async fn post_cancel_turn(State(state): State<AppState>) -> Response { async fn post_cancel_turn(State(state): State<AppState>) -> Response {
@ -525,7 +533,7 @@ async fn post_cancel_turn(State(state): State<AppState>) -> Response {
Err(e) => format!("operator: /cancel — pkill failed: {e}"), Err(e) => format!("operator: /cancel — pkill failed: {e}"),
}; };
state.bus.emit(crate::events::LiveEvent::Note { text: note }); state.bus.emit(crate::events::LiveEvent::Note { text: note });
Redirect::to("/").into_response() (axum::http::StatusCode::OK, "ok").into_response()
} }
fn error_response(message: &str) -> Response { fn error_response(message: &str) -> Response {