events: LiveEvent::Note becomes struct variant so serde can actually serialize it
This commit is contained in:
parent
aa24080f7b
commit
b60774a66c
5 changed files with 46 additions and 29 deletions
2
TODO.md
2
TODO.md
|
|
@ -34,4 +34,4 @@
|
||||||
|
|
||||||
- ~~**Pending message wake-up**~~ ✓ fixed (e423d57) — subscribe-before-check race in `broker.recv_blocking` meant a send landing between the initial `recv()` and `subscribe()` was missed; agent then sat on the 180s long-poll until another, unrelated message woke it. Now subscribe first.
|
- ~~**Pending message wake-up**~~ ✓ fixed (e423d57) — subscribe-before-check race in `broker.recv_blocking` meant a send landing between the initial `recv()` and `subscribe()` was missed; agent then sat on the 180s long-poll until another, unrelated message woke it. Now subscribe first.
|
||||||
- **Post-rebuild system-message missed wake**: at 09:13:14 the dashboard showed `system → damocles container rebuilt` as ✓ delivered, but the agent harness never ran a turn for it (no claude invocation, no operator-visible activity). A subsequent `recv()` from inside the agent returned `(empty)`, confirming the message was popped + marked delivered server-side — yet drove no turn. Most likely cause: the agent_server `serve_agent_stdio` task is up and answering MCP/socket calls, but the `hive-ag3nt::serve` long-poll loop that drives `drive_turn` either died silently during rebuild or never restarted. Investigate: (a) does hive-ag3nt's serve loop survive `nixos-container update` cleanly, or does its tokio runtime get torn down mid-loop? (b) is there an early-exit path on a transient socket error during rebuild that drops the serve task without notifying the manager? (c) compare timeline with manager's own post-rebuild wake to see if this is rebuilt-agents-only or universal. Could be related to the `recv_blocking` fix in `e423d57` if the rebuild restarts the broker mid-subscribe.
|
- **Post-rebuild system-message missed wake**: at 09:13:14 the dashboard showed `system → damocles container rebuilt` as ✓ delivered, but the agent harness never ran a turn for it (no claude invocation, no operator-visible activity). A subsequent `recv()` from inside the agent returned `(empty)`, confirming the message was popped + marked delivered server-side — yet drove no turn. Most likely cause: the agent_server `serve_agent_stdio` task is up and answering MCP/socket calls, but the `hive-ag3nt::serve` long-poll loop that drives `drive_turn` either died silently during rebuild or never restarted. Investigate: (a) does hive-ag3nt's serve loop survive `nixos-container update` cleanly, or does its tokio runtime get torn down mid-loop? (b) is there an early-exit path on a transient socket error during rebuild that drops the serve task without notifying the manager? (c) compare timeline with manager's own post-rebuild wake to see if this is rebuilt-agents-only or universal. Could be related to the `recv_blocking` fix in `e423d57` if the rebuild restarts the broker mid-subscribe.
|
||||||
- **`LiveEvent::Note(String)` never reaches the browser**: the enum is `#[serde(tag = "kind", rename_all = "snake_case")]` with `Note(String)` as a newtype variant — `serde_json::to_string` errors at runtime with `cannot serialize tagged newtype variant containing a string`. The SSE handler's `filter_map(... .ok()? ...)` silently drops the event; the sqlite history persists it as the literal string `"null"`. Every `bus.emit(LiveEvent::Note(...))` call site has been a no-op since the variant was added, and the JS terminal's `note` renderer is dead code. Fix: convert to a struct variant `Note { text: String }` (matches what the JS already reads via `ev.text`) and verify the existing call sites still type-check. While there, audit the sqlite-stored `"null"` rows so history-replay doesn't trip on them.
|
- ~~**`LiveEvent::Note(String)` never reaches the browser**~~ ✓ fixed — converted to struct variant `Note { text: String }`; wire shape `{"kind":"note","text":"..."}` matches what the JS already reads via `ev.text`. Historical sqlite rows persisted as the literal string `"null"` (from when serialization silently failed) get filtered out by the `rows.flatten().flatten()` pipeline in `EventStore::recent`, so replay tolerates them.
|
||||||
|
|
|
||||||
|
|
@ -136,7 +136,9 @@ async fn serve(
|
||||||
} else {
|
} else {
|
||||||
tracing::info!(%from, %body, "system message");
|
tracing::info!(%from, %body, "system message");
|
||||||
}
|
}
|
||||||
bus.emit(LiveEvent::Note(format!("[system] {body}")));
|
bus.emit(LiveEvent::Note {
|
||||||
|
text: format!("[system] {body}"),
|
||||||
|
});
|
||||||
// Fall through: drive a turn with the event in the wake
|
// Fall through: drive a turn with the event in the wake
|
||||||
// prompt body so claude sees it. Sender stays "system"
|
// prompt body so claude sees it. Sender stays "system"
|
||||||
// so the wake prompt can label it as such.
|
// so the wake prompt can label it as such.
|
||||||
|
|
|
||||||
|
|
@ -105,7 +105,16 @@ pub enum LiveEvent {
|
||||||
/// Free-form note from the harness (e.g. "claude exited 0",
|
/// Free-form note from the harness (e.g. "claude exited 0",
|
||||||
/// "stream-json parse error: ..."). Useful when stream-json itself
|
/// "stream-json parse error: ..."). Useful when stream-json itself
|
||||||
/// fails so the UI doesn't just go silent.
|
/// fails so the UI doesn't just go silent.
|
||||||
Note(String),
|
///
|
||||||
|
/// Must be a struct variant (not `Note(String)`): internally-tagged
|
||||||
|
/// enums can't flatten a tag onto a primitive newtype, and serde
|
||||||
|
/// fails serialization at runtime — silently, because the SSE
|
||||||
|
/// handler's `filter_map(... .ok()? ...)` swallows the error. From
|
||||||
|
/// 2025-08 through 2026-05 every `Note` emission was a no-op + the
|
||||||
|
/// sqlite history persisted them as the literal string `"null"`.
|
||||||
|
/// The web UI's `note` renderer already reads `ev.text`, so the
|
||||||
|
/// wire shape matches without a JS change.
|
||||||
|
Note { text: String },
|
||||||
/// Turn finished. `ok=false` means claude exited non-zero or the
|
/// Turn finished. `ok=false` means claude exited non-zero or the
|
||||||
/// harness hit a transport error.
|
/// harness hit a transport error.
|
||||||
TurnEnd { ok: bool, note: Option<String> },
|
TurnEnd { ok: bool, note: Option<String> },
|
||||||
|
|
@ -138,7 +147,7 @@ impl EventStore {
|
||||||
let kind = match event {
|
let kind = match event {
|
||||||
LiveEvent::TurnStart { .. } => "turn_start",
|
LiveEvent::TurnStart { .. } => "turn_start",
|
||||||
LiveEvent::Stream(_) => "stream",
|
LiveEvent::Stream(_) => "stream",
|
||||||
LiveEvent::Note(_) => "note",
|
LiveEvent::Note { .. } => "note",
|
||||||
LiveEvent::TurnEnd { .. } => "turn_end",
|
LiveEvent::TurnEnd { .. } => "turn_end",
|
||||||
};
|
};
|
||||||
let payload = serde_json::to_string(event).unwrap_or_else(|_| "null".into());
|
let payload = serde_json::to_string(event).unwrap_or_else(|_| "null".into());
|
||||||
|
|
|
||||||
|
|
@ -206,11 +206,13 @@ pub async fn run_turn(prompt: &str, files: &TurnFiles, bus: &Bus) -> TurnOutcome
|
||||||
/// compact state matches a normal turn's. Only the prompt over stdin
|
/// compact state matches a normal turn's. Only the prompt over stdin
|
||||||
/// differs (`/compact` vs the wake-up payload).
|
/// differs (`/compact` vs the wake-up payload).
|
||||||
pub async fn compact_session(files: &TurnFiles, bus: &Bus) -> Result<()> {
|
pub async fn compact_session(files: &TurnFiles, bus: &Bus) -> Result<()> {
|
||||||
bus.emit(LiveEvent::Note(
|
bus.emit(LiveEvent::Note {
|
||||||
"context overflow — running /compact on the persistent session".into(),
|
text: "context overflow — running /compact on the persistent session".into(),
|
||||||
));
|
});
|
||||||
let _ = run_claude("/compact", files, bus).await?;
|
let _ = run_claude("/compact", files, bus).await?;
|
||||||
bus.emit(LiveEvent::Note("/compact done".into()));
|
bus.emit(LiveEvent::Note {
|
||||||
|
text: "/compact done".into(),
|
||||||
|
});
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -218,9 +220,9 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<bool>
|
||||||
let model = bus.model();
|
let model = bus.model();
|
||||||
let resume = !bus.take_skip_continue();
|
let resume = !bus.take_skip_continue();
|
||||||
if !resume {
|
if !resume {
|
||||||
bus.emit(LiveEvent::Note(
|
bus.emit(LiveEvent::Note {
|
||||||
"fresh session (--continue suppressed for this turn)".into(),
|
text: "fresh session (--continue suppressed for this turn)".into(),
|
||||||
));
|
});
|
||||||
}
|
}
|
||||||
let mut cmd = Command::new("claude");
|
let mut cmd = Command::new("claude");
|
||||||
// Spawn inside the agent's state dir so relative paths in tool calls
|
// Spawn inside the agent's state dir so relative paths in tool calls
|
||||||
|
|
@ -282,7 +284,9 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<bool>
|
||||||
}
|
}
|
||||||
bus_out.emit(LiveEvent::Stream(v));
|
bus_out.emit(LiveEvent::Stream(v));
|
||||||
}
|
}
|
||||||
Err(_) => bus_out.emit(LiveEvent::Note(format!("(non-json) {line}"))),
|
Err(_) => bus_out.emit(LiveEvent::Note {
|
||||||
|
text: format!("(non-json) {line}"),
|
||||||
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
@ -304,7 +308,9 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<bool>
|
||||||
// renders; the tracing line is what `journalctl -M <c> -b`
|
// renders; the tracing line is what `journalctl -M <c> -b`
|
||||||
// surfaces when claude exits non-zero.
|
// surfaces when claude exits non-zero.
|
||||||
tracing::warn!(line = %line, "claude stderr");
|
tracing::warn!(line = %line, "claude stderr");
|
||||||
bus_err.emit(LiveEvent::Note(format!("stderr: {line}")));
|
bus_err.emit(LiveEvent::Note {
|
||||||
|
text: format!("stderr: {line}"),
|
||||||
|
});
|
||||||
let mut t = tail_clone.lock().unwrap();
|
let mut t = tail_clone.lock().unwrap();
|
||||||
if t.len() >= STDERR_TAIL_LINES {
|
if t.len() >= STDERR_TAIL_LINES {
|
||||||
t.pop_front();
|
t.pop_front();
|
||||||
|
|
|
||||||
|
|
@ -372,9 +372,9 @@ async fn events_stream(
|
||||||
let rx = state.bus.subscribe();
|
let rx = state.bus.subscribe();
|
||||||
// Drop a "hello" note into the bus so every new subscriber sees at
|
// Drop a "hello" note into the bus so every new subscriber sees at
|
||||||
// least one event immediately and can clear the connecting placeholder.
|
// least one event immediately and can clear the connecting placeholder.
|
||||||
state.bus.emit(crate::events::LiveEvent::Note(
|
state.bus.emit(crate::events::LiveEvent::Note {
|
||||||
"live stream attached".into(),
|
text: "live stream attached".into(),
|
||||||
));
|
});
|
||||||
let stream = BroadcastStream::new(rx).filter_map(|res| {
|
let stream = BroadcastStream::new(rx).filter_map(|res| {
|
||||||
let ev = res.ok()?;
|
let ev = res.ok()?;
|
||||||
let json = serde_json::to_string(&ev).ok()?;
|
let json = serde_json::to_string(&ev).ok()?;
|
||||||
|
|
@ -448,9 +448,9 @@ async fn post_set_model(State(state): State<AppState>, Form(form): Form<ModelFor
|
||||||
return error_response("model: name required");
|
return error_response("model: name required");
|
||||||
}
|
}
|
||||||
state.bus.set_model(name);
|
state.bus.set_model(name);
|
||||||
state.bus.emit(crate::events::LiveEvent::Note(format!(
|
state.bus.emit(crate::events::LiveEvent::Note {
|
||||||
"operator: /model — claude model set to '{name}' for future turns"
|
text: format!("operator: /model — claude model set to '{name}' for future turns"),
|
||||||
)));
|
});
|
||||||
tracing::info!(%name, "operator set model");
|
tracing::info!(%name, "operator set model");
|
||||||
Redirect::to("/").into_response()
|
Redirect::to("/").into_response()
|
||||||
}
|
}
|
||||||
|
|
@ -471,16 +471,16 @@ async fn post_compact(State(state): State<AppState>) -> Response {
|
||||||
let files = state.files.clone();
|
let files = state.files.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let _guard = guard; // keep lock alive for the duration of compaction
|
let _guard = guard; // keep lock alive for the duration of compaction
|
||||||
bus.emit(crate::events::LiveEvent::Note(
|
bus.emit(crate::events::LiveEvent::Note {
|
||||||
"operator: /compact — running on persistent session".into(),
|
text: "operator: /compact — running on persistent session".into(),
|
||||||
));
|
});
|
||||||
bus.set_state(crate::events::TurnState::Compacting);
|
bus.set_state(crate::events::TurnState::Compacting);
|
||||||
let r = crate::turn::compact_session(&files, &bus).await;
|
let r = crate::turn::compact_session(&files, &bus).await;
|
||||||
bus.set_state(crate::events::TurnState::Idle);
|
bus.set_state(crate::events::TurnState::Idle);
|
||||||
if let Err(e) = r {
|
if let Err(e) = r {
|
||||||
bus.emit(crate::events::LiveEvent::Note(format!(
|
bus.emit(crate::events::LiveEvent::Note {
|
||||||
"/compact failed: {e:#}"
|
text: format!("/compact failed: {e:#}"),
|
||||||
)));
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
Redirect::to("/").into_response()
|
Redirect::to("/").into_response()
|
||||||
|
|
@ -501,9 +501,9 @@ async fn post_compact(State(state): State<AppState>) -> Response {
|
||||||
/// than asking claude to forget mid-stream.
|
/// than asking claude to forget mid-stream.
|
||||||
async fn post_new_session(State(state): State<AppState>) -> Response {
|
async fn post_new_session(State(state): State<AppState>) -> Response {
|
||||||
state.bus.request_new_session();
|
state.bus.request_new_session();
|
||||||
state.bus.emit(crate::events::LiveEvent::Note(
|
state.bus.emit(crate::events::LiveEvent::Note {
|
||||||
"operator: new session armed — next turn runs without --continue".into(),
|
text: "operator: new session armed — next turn runs without --continue".into(),
|
||||||
));
|
});
|
||||||
Redirect::to("/").into_response()
|
Redirect::to("/").into_response()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -524,7 +524,7 @@ async fn post_cancel_turn(State(state): State<AppState>) -> Response {
|
||||||
),
|
),
|
||||||
Err(e) => format!("operator: /cancel — pkill failed: {e}"),
|
Err(e) => format!("operator: /cancel — pkill failed: {e}"),
|
||||||
};
|
};
|
||||||
state.bus.emit(crate::events::LiveEvent::Note(note));
|
state.bus.emit(crate::events::LiveEvent::Note { text: note });
|
||||||
Redirect::to("/").into_response()
|
Redirect::to("/").into_response()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue