From 39d8359c10f3967bf4b047d975ca4a70b0d67f25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?m=C3=BCde?= Date: Sun, 17 May 2026 22:49:55 +0200 Subject: [PATCH] agent ui: event-driven status / model / token_usage / turn_state MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit new LiveEvent variants on the per-agent bus — status_changed / model_changed / token_usage_changed / turn_state_changed — replace the per-agent web UI's /api/state polling for the badge row. emit sites: - Bus::set_model → model_changed - Bus::record_usage → token_usage_changed - Bus::set_state → turn_state_changed - turn::wait_for_login → status_changed("online") on creds detect - post_login_start / post_login_cancel → status_changed("needs_login_*") per-agent endpoints (post_set_model / post_compact / post_new_session / post_cancel_turn / post_login_*) now all return 200; client drops the post-submit refetch except on login transitions, which still need /api/state to render the OAuth form + session stream. client adds dispatch on the four new event kinds, threads `currentLabel` through so the composer re-enables on a live status flip, and no longer fires refreshState() from turn_end or postModel — the events carry the same signal faster. closes the per-agent half of the dashboard event-channel refactor; TODO entry dropped. --- TODO.md | 1 - hive-ag3nt/assets/app.js | 43 ++++++++++++++++++++--- hive-ag3nt/src/bin/hive-ag3nt.rs | 2 +- hive-ag3nt/src/bin/hive-m1nd.rs | 2 +- hive-ag3nt/src/events.rs | 60 +++++++++++++++++++++++++++++--- hive-ag3nt/src/turn.rs | 8 ++++- hive-ag3nt/src/web_ui.rs | 26 +++++++++----- 7 files changed, 120 insertions(+), 22 deletions(-) diff --git a/TODO.md b/TODO.md index 40515d7..880bf0d 100644 --- a/TODO.md +++ b/TODO.md @@ -23,7 +23,6 @@ - **Reminder delivery-error surface**: `reminder_scheduler::tick` logs failed deliveries but doesn't persist. 
Add `last_error TEXT, attempt_count INTEGER` columns + a banner on the dashboard row + a "retry" affordance. Needs a sqlite migration (idempotent ALTER TABLE). - **Per-agent reminder status / query interface**: surface pending vs. delivered counts per agent (manager + each sub-agent) as a small chip on the container row. -- **Phase 6 follow-ups** — dashboard side is fully event-driven (Phase 6 leftovers landed); the per-agent web UI's lifecycle endpoints (`/api/{cancel,compact,model,new-session}`, `/login/*`) still 303-redirect-and-poll. Convert them to 200 + `data-no-refresh` so the per-agent page stops refetching `/api/state` on every operator click — `LiveEvent::Note` already covers cancel/compact/model/new-session, login state needs its own `NeedsLogin` / `LoggedIn` events on the per-agent bus. - **Tombstones + meta_inputs events**: not yet event-derived. PURG3 + meta-update still trigger a post-submit `/api/state` refetch on the dashboard. Add `TombstoneAdded`/`TombstoneRemoved` + `MetaInputsChanged` so those forms can drop their refetch too and the cold-load is the only `/api/state` fetch in normal operation. ## Security diff --git a/hive-ag3nt/assets/app.js b/hive-ag3nt/assets/app.js index 375025e..d93f8cf 100644 --- a/hive-ag3nt/assets/app.js +++ b/hive-ag3nt/assets/app.js @@ -168,6 +168,10 @@ // dispatcher to print local-only rows ('help', errors) and to clear // the terminal on `/clear`. let termAPI = null; + // Label captured from the first /api/state cold load — used by the + // bus-driven `status_changed` handler so it can re-enable the + // composer without waiting for the next snapshot fetch. + let currentLabel = ''; const SLASH_COMMANDS = [ { name: '/help', desc: 'list slash commands' }, @@ -192,9 +196,9 @@ const text = await resp.text().catch(() => ''); termAPI.row('turn-end-fail', '✗ /model failed: ' + resp.status + (text ? 
' — ' + text : '')); - } else { - refreshState(); } + // No refreshState — the harness emits `model_changed` on the + // SSE bus and the chip handler picks it up live. } catch (err) { if (termAPI) termAPI.row('turn-end-fail', '✗ /model failed: ' + err); } @@ -523,6 +527,7 @@ if (!resp.ok) throw new Error('http ' + resp.status); const s = await resp.json(); if (!headerSet) { setHeader(s.label, s.dashboard_port); headerSet = true; } + currentLabel = s.label; renderTermInput(s.label, s.status === 'online'); renderInbox(s.inbox || []); // Authoritative state comes from the harness via /api/state. @@ -725,8 +730,6 @@ openTurnsFromHistory = Math.max(0, openTurnsFromHistory - 1); } else { setBannerActive(false); setState('idle'); - // Login may have just landed (or session re-enters Online). - refreshState(); } const cls = ev.ok ? 'turn-end-ok' : 'turn-end-fail'; api.row(cls, @@ -738,6 +741,38 @@ const v = Object.assign({}, ev); delete v.kind; renderStream(v, api); }, + // Bus-driven state/badges. `status_changed` may also need a + // /api/state refresh to render the login `#status` block + // (which carries the OAuth URL + form), so we kick the + // existing refresh path on that transition. Online → only + // the badge updates; no /api/state fetch needed. + status_changed(ev, api) { + if (api.fromHistory) return; + renderAliveBadge(ev.status); + renderTermInput(currentLabel, ev.status === 'online'); + // Login-flow transitions need the #status block rebuilt + // (it carries the OAuth URL + form). The existing + // refreshState path also re-arms the in-progress poll for + // session output streaming. Online → only the badge moves; + // no /api/state fetch is necessary. + if (ev.status !== 'online' && ev.status !== lastStatus) { + refreshState(); + } else if (ev.status === 'online' && lastStatus !== 'online') { + // Status block stays as-is or shows the previous + // login UI; clear it so the operator sees a clean + // online state without a separate refetch. 
+ const root = $('status'); + if (root) root.innerHTML = ''; + lastStatus = 'online'; + } + }, + model_changed(ev, api) { if (!api.fromHistory) renderModelChip(ev.model); }, + token_usage_changed(ev, api) { + if (!api.fromHistory) renderTokenUsage(ev.usage); + }, + turn_state_changed(ev, api) { + if (!api.fromHistory) setStateAbs(ev.state, ev.since_unix); + }, }, onBackfillDone() { // If the last replayed turn never closed, the banner shimmer + diff --git a/hive-ag3nt/src/bin/hive-ag3nt.rs b/hive-ag3nt/src/bin/hive-ag3nt.rs index ec17fc0..4645c1b 100644 --- a/hive-ag3nt/src/bin/hive-ag3nt.rs +++ b/hive-ag3nt/src/bin/hive-ag3nt.rs @@ -101,7 +101,7 @@ async fn main() -> Result<()> { // Partial-run mode: keep the harness alive (so the web UI // stays bound) but don't drive the turn loop. Poll the // claude dir; once a session lands we enter `serve`. - turn::wait_for_login(&claude_dir, login_state.clone(), poll_ms).await; + turn::wait_for_login(&claude_dir, login_state.clone(), &bus, poll_ms).await; serve( &cli.socket, Duration::from_millis(poll_ms), diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index 705920c..489258b 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -86,7 +86,7 @@ async fn main() -> Result<()> { .await } LoginState::NeedsLogin => { - turn::wait_for_login(&claude_dir, login_state, poll_ms).await; + turn::wait_for_login(&claude_dir, login_state, &bus, poll_ms).await; serve( &cli.socket, Duration::from_millis(poll_ms), diff --git a/hive-ag3nt/src/events.rs b/hive-ag3nt/src/events.rs index 8af3e04..a99f49c 100644 --- a/hive-ag3nt/src/events.rs +++ b/hive-ag3nt/src/events.rs @@ -118,6 +118,30 @@ pub enum LiveEvent { /// Turn finished. `ok=false` means claude exited non-zero or the /// harness hit a transport error. TurnEnd { ok: bool, note: Option }, + /// Harness reachability flipped: `"online"` / + /// `"needs_login_idle"` / `"needs_login_in_progress"`. 
The web UI + /// drives the alive badge from this so the operator sees a login + /// land (or get revoked) without polling. Session detail + /// (`url`/`output`/`finished`) is still served by `/api/state` + /// during the short-lived in-progress window — the client + /// re-fetches only while that flow is active. + StatusChanged { status: String }, + /// `/api/model` switched the active claude model. The web UI + /// updates the chip + the per-turn stats sink will key off this + /// to mark the boundary in its log. + ModelChanged { model: String }, + /// Final-turn `usage` block landed (input + output + cache + /// counters). Powers the context-window badge + accumulates into + /// the per-turn stats sink. + TokenUsageChanged { usage: TokenUsage }, + /// Harness's `TurnState` transitioned (idle / thinking / + /// compacting). `since_unix` matches `Bus::state_snapshot().1` + /// so the client's elapsed-time ticker keeps progressing across + /// SSE reconnects without drift. + TurnStateChanged { + state: TurnState, + since_unix: i64, + }, } /// sqlite-backed event log. Wraps a `Connection` behind a `Mutex` so the @@ -149,6 +173,10 @@ impl EventStore { LiveEvent::Stream(_) => "stream", LiveEvent::Note { .. } => "note", LiveEvent::TurnEnd { .. } => "turn_end", + LiveEvent::StatusChanged { .. } => "status_changed", + LiveEvent::ModelChanged { .. } => "model_changed", + LiveEvent::TokenUsageChanged { .. } => "token_usage_changed", + LiveEvent::TurnStateChanged { .. } => "turn_state_changed", }; let payload = serde_json::to_string(event).unwrap_or_else(|_| "null".into()); let conn = self.conn.lock().unwrap(); @@ -216,7 +244,7 @@ impl TokenUsage { /// reads via `/api/state` and renders. Lives alongside the bus /// because everyone who has a `Bus` already has the right handle to /// poke the state on transitions. 
-#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum TurnState { /// Inbox is empty / waiting on `Recv`. @@ -340,11 +368,13 @@ impl Bus { if let Err(e) = persist_model(&value) { tracing::warn!(error = ?e, "model: persist failed"); } + self.emit(LiveEvent::ModelChanged { model: value }); } /// Record the latest token usage from a completed turn. pub fn record_usage(&self, usage: TokenUsage) { *self.last_usage.lock().unwrap() = Some(usage); + self.emit(LiveEvent::TokenUsageChanged { usage }); } /// Last known token usage, or `None` if no turn has completed yet. @@ -356,11 +386,31 @@ impl Bus { /// Update the harness's authoritative turn-loop state. Records /// the transition time so `state_snapshot` can return a since-age. pub fn set_state(&self, next: TurnState) { - let mut guard = self.state.lock().unwrap(); - if guard.0 == next { - return; + let since; + { + let mut guard = self.state.lock().unwrap(); + if guard.0 == next { + return; + } + *guard = (next, now_unix()); + since = guard.1; } - *guard = (next, now_unix()); + self.emit(LiveEvent::TurnStateChanged { + state: next, + since_unix: since, + }); + } + + /// Broadcast a status flip (online / needs_login_*). Called by + /// the bin entry points + `turn::wait_for_login` + the + /// `post_login_*` handlers — every site that mutates the + /// `Arc<Mutex<LoginState>>` should also call this so the web UI + /// drops its periodic /api/state poll while a turn loop is + /// running. + pub fn emit_status(&self, status: impl Into<String>) { + self.emit(LiveEvent::StatusChanged { + status: status.into(), + }); } /// Current state + since-when (unix seconds). Snapshot copy, no lock held. 
diff --git a/hive-ag3nt/src/turn.rs b/hive-ag3nt/src/turn.rs index a07dc75..a2f73ca 100644 --- a/hive-ag3nt/src/turn.rs +++ b/hive-ag3nt/src/turn.rs @@ -169,7 +169,12 @@ pub fn emit_turn_end(bus: &Bus, outcome: &TurnOutcome) { /// Block until the bound `~/.claude/` dir contains a session, polling /// `claude_dir` on a `poll_ms` interval (min 2s). Flips `state` to /// `Online` when login lands; caller resumes its serve loop. -pub async fn wait_for_login(claude_dir: &Path, state: Arc<Mutex<LoginState>>, poll_ms: u64) { +pub async fn wait_for_login( + claude_dir: &Path, + state: Arc<Mutex<LoginState>>, + bus: &Bus, + poll_ms: u64, +) { tracing::warn!( claude_dir = %claude_dir.display(), "no claude session — staying in partial-run mode (web UI only)" @@ -180,6 +185,7 @@ pub async fn wait_for_login(claude_dir: &Path, state: Arc<Mutex<LoginState>>, po if login::has_session(claude_dir) { tracing::info!("claude session detected — entering turn loop"); *state.lock().unwrap() = LoginState::Online; + bus.emit_status("online"); return; } } diff --git a/hive-ag3nt/src/web_ui.rs b/hive-ag3nt/src/web_ui.rs index 44243ab..22411de 100644 --- a/hive-ag3nt/src/web_ui.rs +++ b/hive-ag3nt/src/web_ui.rs @@ -16,7 +16,7 @@ use axum::{ extract::State, http::StatusCode, response::{ - IntoResponse, Redirect, Response, + IntoResponse, Response, sse::{Event, KeepAlive, Sse}, }, routing::{get, post}, @@ -388,13 +388,19 @@ async fn post_login_start(State(state): State) -> Response { { let guard = state.session.lock().unwrap(); if guard.is_some() { - return Redirect::to("/").into_response(); + return (axum::http::StatusCode::OK, "ok").into_response(); } } match LoginSession::start() { Ok(session) => { *state.session.lock().unwrap() = Some(Arc::new(session)); - Redirect::to("/").into_response() + // Flip status from needs_login_idle → needs_login_in_progress + // so the web UI's badge + polling kick in (polling is still + // the right tool for the streaming session output during + // the login flow itself; events drop the poll for + // *everything 
else*). + state.bus.emit_status("needs_login_in_progress"); + (axum::http::StatusCode::OK, "ok").into_response() } Err(e) => error_response(&format!("login start failed: {e:#}")), } @@ -413,7 +419,7 @@ async fn post_login_code(State(state): State, Form(form): Form) -> Response { @@ -422,7 +428,9 @@ async fn post_login_cancel(State(state): State) -> Response { session.close_stdin().await; session.kill(); } - Redirect::to("/").into_response() + // Back to needs_login_idle (LoginState unchanged, session gone). + state.bus.emit_status("needs_login_idle"); + (axum::http::StatusCode::OK, "ok").into_response() } /// Operator-initiated session compaction. Spawns `turn::compact_session` @@ -452,7 +460,7 @@ async fn post_set_model(State(state): State, Form(form): Form) -> Response { @@ -483,7 +491,7 @@ async fn post_compact(State(state): State) -> Response { }); } }); - Redirect::to("/").into_response() + (axum::http::StatusCode::OK, "ok").into_response() } /// Cancel the in-flight claude turn. Coarse-grained: shells out @@ -504,7 +512,7 @@ async fn post_new_session(State(state): State) -> Response { state.bus.emit(crate::events::LiveEvent::Note { text: "operator: new session armed — next turn runs without --continue".into(), }); - Redirect::to("/").into_response() + (axum::http::StatusCode::OK, "ok").into_response() } async fn post_cancel_turn(State(state): State) -> Response { @@ -525,7 +533,7 @@ async fn post_cancel_turn(State(state): State) -> Response { Err(e) => format!("operator: /cancel — pkill failed: {e}"), }; state.bus.emit(crate::events::LiveEvent::Note { text: note }); - Redirect::to("/").into_response() + (axum::http::StatusCode::OK, "ok").into_response() } fn error_response(message: &str) -> Response {