diff --git a/TODO.md b/TODO.md index 0decc12..cce021e 100644 --- a/TODO.md +++ b/TODO.md @@ -34,4 +34,4 @@ - ~~**Pending message wake-up**~~ ✓ fixed (e423d57) — subscribe-before-check race in `broker.recv_blocking` meant a send landing between the initial `recv()` and `subscribe()` was missed; agent then sat on the 180s long-poll until another, unrelated message woke it. Now subscribe first. - **Post-rebuild system-message missed wake**: at 09:13:14 the dashboard showed `system → damocles container rebuilt` as ✓ delivered, but the agent harness never ran a turn for it (no claude invocation, no operator-visible activity). A subsequent `recv()` from inside the agent returned `(empty)`, confirming the message was popped + marked delivered server-side — yet drove no turn. Most likely cause: the agent_server `serve_agent_stdio` task is up and answering MCP/socket calls, but the `hive-ag3nt::serve` long-poll loop that drives `drive_turn` either died silently during rebuild or never restarted. Investigate: (a) does hive-ag3nt's serve loop survive `nixos-container update` cleanly, or does its tokio runtime get torn down mid-loop? (b) is there an early-exit path on a transient socket error during rebuild that drops the serve task without notifying the manager? (c) compare timeline with manager's own post-rebuild wake to see if this is rebuilt-agents-only or universal. Could be related to the `recv_blocking` fix in `e423d57` if the rebuild restarts the broker mid-subscribe. -- **`LiveEvent::Note(String)` never reaches the browser**: the enum is `#[serde(tag = "kind", rename_all = "snake_case")]` with `Note(String)` as a newtype variant — `serde_json::to_string` errors at runtime with `cannot serialize tagged newtype variant containing a string`. The SSE handler's `filter_map(... .ok()? ...)` silently drops the event; the sqlite history persists it as the literal string `"null"`. 
Every `bus.emit(LiveEvent::Note(...))` call site has been a no-op since the variant was added, and the JS terminal's `note` renderer is dead code. Fix: convert to a struct variant `Note { text: String }` (matches what the JS already reads via `ev.text`) and verify the existing call sites still type-check. While there, audit the sqlite-stored `"null"` rows so history-replay doesn't trip on them. +- ~~**`LiveEvent::Note(String)` never reaches the browser**~~ ✓ fixed — converted to struct variant `Note { text: String }`; wire shape `{"kind":"note","text":"..."}` matches what the JS already reads via `ev.text`. Historical sqlite rows persisted as the literal string `"null"` (from when serialization silently failed) get filtered out by the `rows.flatten().flatten()` pipeline in `EventStore::recent`, so replay tolerates them. diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index d4bdd72..4e88813 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -136,7 +136,9 @@ async fn serve( } else { tracing::info!(%from, %body, "system message"); } - bus.emit(LiveEvent::Note(format!("[system] {body}"))); + bus.emit(LiveEvent::Note { + text: format!("[system] {body}"), + }); // Fall through: drive a turn with the event in the wake // prompt body so claude sees it. Sender stays "system" // so the wake prompt can label it as such. diff --git a/hive-ag3nt/src/events.rs b/hive-ag3nt/src/events.rs index e894483..8af3e04 100644 --- a/hive-ag3nt/src/events.rs +++ b/hive-ag3nt/src/events.rs @@ -105,7 +105,16 @@ pub enum LiveEvent { /// Free-form note from the harness (e.g. "claude exited 0", /// "stream-json parse error: ..."). Useful when stream-json itself /// fails so the UI doesn't just go silent. 
- Note(String), + /// + /// Must be a struct variant (not `Note(String)`): internally-tagged + /// enums can't flatten a tag onto a primitive newtype, and serde + /// fails serialization at runtime — silently, because the SSE + /// handler's `filter_map(... .ok()? ...)` swallows the error. From + /// 2025-08 through 2026-05 every `Note` emission was a no-op and the + /// sqlite history persisted them as the literal string `"null"`. + /// The web UI's `note` renderer already reads `ev.text`, so the + /// wire shape matches without a JS change. + Note { text: String }, /// Turn finished. `ok=false` means claude exited non-zero or the /// harness hit a transport error. TurnEnd { ok: bool, note: Option }, @@ -138,7 +147,7 @@ impl EventStore { let kind = match event { LiveEvent::TurnStart { .. } => "turn_start", LiveEvent::Stream(_) => "stream", - LiveEvent::Note(_) => "note", + LiveEvent::Note { .. } => "note", LiveEvent::TurnEnd { .. } => "turn_end", }; let payload = serde_json::to_string(event).unwrap_or_else(|_| "null".into()); diff --git a/hive-ag3nt/src/turn.rs b/hive-ag3nt/src/turn.rs index 1992317..a07dc75 100644 --- a/hive-ag3nt/src/turn.rs +++ b/hive-ag3nt/src/turn.rs @@ -206,11 +206,13 @@ pub async fn run_turn(prompt: &str, files: &TurnFiles, bus: &Bus) -> TurnOutcome /// compact state matches a normal turn's. Only the prompt over stdin /// differs (`/compact` vs the wake-up payload). 
pub async fn compact_session(files: &TurnFiles, bus: &Bus) -> Result<()> { - bus.emit(LiveEvent::Note( - "context overflow — running /compact on the persistent session".into(), - )); + bus.emit(LiveEvent::Note { + text: "context overflow — running /compact on the persistent session".into(), + }); let _ = run_claude("/compact", files, bus).await?; - bus.emit(LiveEvent::Note("/compact done".into())); + bus.emit(LiveEvent::Note { + text: "/compact done".into(), + }); Ok(()) } @@ -218,9 +220,9 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result let model = bus.model(); let resume = !bus.take_skip_continue(); if !resume { - bus.emit(LiveEvent::Note( - "fresh session (--continue suppressed for this turn)".into(), - )); + bus.emit(LiveEvent::Note { + text: "fresh session (--continue suppressed for this turn)".into(), + }); } let mut cmd = Command::new("claude"); // Spawn inside the agent's state dir so relative paths in tool calls @@ -282,7 +284,9 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result } bus_out.emit(LiveEvent::Stream(v)); } - Err(_) => bus_out.emit(LiveEvent::Note(format!("(non-json) {line}"))), + Err(_) => bus_out.emit(LiveEvent::Note { + text: format!("(non-json) {line}"), + }), } } }); @@ -304,7 +308,9 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result // renders; the tracing line is what `journalctl -M -b` // surfaces when claude exits non-zero. 
tracing::warn!(line = %line, "claude stderr"); - bus_err.emit(LiveEvent::Note(format!("stderr: {line}"))); + bus_err.emit(LiveEvent::Note { + text: format!("stderr: {line}"), + }); let mut t = tail_clone.lock().unwrap(); if t.len() >= STDERR_TAIL_LINES { t.pop_front(); diff --git a/hive-ag3nt/src/web_ui.rs b/hive-ag3nt/src/web_ui.rs index 69f22f3..44243ab 100644 --- a/hive-ag3nt/src/web_ui.rs +++ b/hive-ag3nt/src/web_ui.rs @@ -372,9 +372,9 @@ async fn events_stream( let rx = state.bus.subscribe(); // Drop a "hello" note into the bus so every new subscriber sees at // least one event immediately and can clear the connecting placeholder. - state.bus.emit(crate::events::LiveEvent::Note( - "live stream attached".into(), - )); + state.bus.emit(crate::events::LiveEvent::Note { + text: "live stream attached".into(), + }); let stream = BroadcastStream::new(rx).filter_map(|res| { let ev = res.ok()?; let json = serde_json::to_string(&ev).ok()?; @@ -448,9 +448,9 @@ async fn post_set_model(State(state): State, Form(form): Form) -> Response { let files = state.files.clone(); tokio::spawn(async move { let _guard = guard; // keep lock alive for the duration of compaction - bus.emit(crate::events::LiveEvent::Note( - "operator: /compact — running on persistent session".into(), - )); + bus.emit(crate::events::LiveEvent::Note { + text: "operator: /compact — running on persistent session".into(), + }); bus.set_state(crate::events::TurnState::Compacting); let r = crate::turn::compact_session(&files, &bus).await; bus.set_state(crate::events::TurnState::Idle); if let Err(e) = r { - bus.emit(crate::events::LiveEvent::Note(format!( - "/compact failed: {e:#}" - ))); + bus.emit(crate::events::LiveEvent::Note { + text: format!("/compact failed: {e:#}"), + }); } }); Redirect::to("/").into_response() @@ -501,9 +501,9 @@ async fn post_compact(State(state): State) -> Response { /// than asking claude to forget mid-stream. 
async fn post_new_session(State(state): State) -> Response { state.bus.request_new_session(); - state.bus.emit(crate::events::LiveEvent::Note( - "operator: new session armed — next turn runs without --continue".into(), - )); + state.bus.emit(crate::events::LiveEvent::Note { + text: "operator: new session armed — next turn runs without --continue".into(), + }); Redirect::to("/").into_response() } @@ -524,7 +524,7 @@ async fn post_cancel_turn(State(state): State) -> Response { ), Err(e) => format!("operator: /cancel — pkill failed: {e}"), }; - state.bus.emit(crate::events::LiveEvent::Note(note)); + state.bus.emit(crate::events::LiveEvent::Note { text: note }); Redirect::to("/").into_response() }