hyperhive/hive-ag3nt/src/web_ui.rs
iris f9d1e69a50 stats: add 1h, 4h, 3d time range windows
Adds Hour (5-min buckets), FourHour (15-min buckets), and ThreeDay
(hourly buckets) to the Window enum, plus the matching tab buttons
in stats.html. Simplifies web_ui.rs to use Window::span_secs()
instead of a duplicate match.

Closes #25
2026-05-20 20:23:09 +02:00

774 lines
29 KiB
Rust

//! Per-container HTTP UI. SPA shape: `GET /` returns a static shell;
//! `GET /static/*` serves CSS + JS; `GET /api/state` returns the page
//! state as JSON; the JS app renders. Live events stream on
//! `/events/stream` (recent history on `/events/history`). `/stats` and
//! `/screen` are separate pages for usage charts and the VNC viewer.
//! Action POSTs (`/send`, `/login/*`, `/api/*`) reply `200 ok` on success
//! or a plain-text `500` on failure; the JS app then re-fetches
//! `/api/state` or waits for the matching SSE event.
use std::convert::Infallible;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use anyhow::{Context, Result};
use axum::{
Form, Router,
extract::State,
http::StatusCode,
response::{
IntoResponse, Response,
sse::{Event, KeepAlive, Sse},
},
routing::{get, post},
};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_stream::{Stream, StreamExt, wrappers::BroadcastStream};
use crate::client;
use crate::events::Bus;
use crate::login::LoginState;
use crate::login_session::{LoginSession, drop_if_finished};
use crate::mcp;
use crate::turn::TurnFiles;
/// Live login state for the web UI.
///
/// The harness updates this in place as it transitions between
/// `LoginState::NeedsLogin` and `LoginState::Online`; `api_state` locks it
/// and copies the current value on every `/api/state` request.
pub type LoginStateCell = Arc<Mutex<LoginState>>;
/// Shared turn lock, held as an async (`tokio::sync`) mutex.
///
/// The serve loop acquires it for the duration of every `drive_turn`
/// call. The `/api/compact` handler calls `try_lock_owned()` and rejects
/// the request immediately if a turn is in flight, preventing concurrent
/// access to the claude session.
pub type TurnLock = Arc<tokio::sync::Mutex<()>>;
/// State shared by every axum handler of the per-agent UI.
///
/// `Clone` because axum hands each handler its own copy; the mutable
/// pieces (login state, login session, turn lock) sit behind `Arc`s, so
/// all clones observe the same values.
#[derive(Clone)]
struct AppState {
    /// Agent label, echoed back as `label` in `/api/state`.
    label: String,
    /// Login state maintained by the harness (see `LoginStateCell`).
    login: LoginStateCell,
    /// The in-progress `claude` login session started via
    /// `/login/start`; `None` when no session is running.
    session: Arc<Mutex<Option<Arc<LoginSession>>>>,
    /// Live event bus: SSE fan-out, turn state, model and token usage.
    bus: Bus,
    /// Unix socket used for broker requests (agent or manager protocol,
    /// chosen by `flavor()`).
    socket: PathBuf,
    /// Same `TurnFiles` the harness's turn loop uses. Shared so
    /// `/api/compact` re-uses the exact MCP config / system prompt /
    /// settings claude saw on the last regular turn — keeps the
    /// session shape identical across compact + normal turns.
    files: TurnFiles,
    /// Prevents `/api/compact` from racing with an in-flight normal turn.
    turn_lock: TurnLock,
    /// VNC port read from `/etc/hyperhive/gui.json` at startup.
    /// `None` when the file is absent (gui not enabled for this agent).
    gui_vnc_port: Option<u16>,
}
impl AppState {
fn flavor(&self) -> Flavor {
self.files.flavor
}
}
/// Which wire protocol the per-agent UI's handlers speak to the broker.
///
/// Sub-agent → `AgentRequest` / `AgentResponse` (e.g.
/// `AgentRequest::OperatorMsg` for `/send`); manager → `ManagerRequest` /
/// `ManagerResponse`. Reuses the MCP-side enum so a single value drives
/// both the send protocol and (in `post_compact`) the allowed-tools
/// surface claude sees.
pub type Flavor = mcp::Flavor;
pub async fn serve(
label: String,
port: u16,
login: LoginStateCell,
bus: Bus,
socket: PathBuf,
files: TurnFiles,
turn_lock: TurnLock,
) -> Result<()> {
let gui_vnc_port = read_gui_json();
let state = AppState {
label,
login,
session: Arc::new(Mutex::new(None)),
bus,
socket,
files,
turn_lock,
gui_vnc_port,
};
let app = Router::new()
.route("/", get(serve_index))
.route("/static/agent.css", get(serve_css))
.route("/static/app.js", get(serve_app_js))
.route("/static/hive-fr0nt.js", get(serve_shared_js))
.route("/static/marked.js", get(serve_marked_js))
.route("/api/state", get(api_state))
.route("/events/stream", get(events_stream))
.route("/events/history", get(events_history))
.route("/send", post(post_send))
.route("/login/start", post(post_login_start))
.route("/login/code", post(post_login_code))
.route("/login/cancel", post(post_login_cancel))
.route("/api/cancel", post(post_cancel_turn))
.route("/api/compact", post(post_compact))
.route("/api/model", post(post_set_model))
.route("/api/new-session", post(post_new_session))
.route("/api/loose-ends", get(api_loose_ends))
.route("/stats", get(serve_stats))
.route("/static/stats.js", get(serve_stats_js))
.route("/api/stats", get(api_stats))
.route("/screen", get(serve_screen))
.route("/screen/ws", get(screen_ws))
.with_state(state);
let addr = SocketAddr::from(([0, 0, 0, 0], port));
let listener = bind_with_retry(addr, "web UI").await?;
tracing::info!(%port, "web UI listening");
axum::serve(listener, app).await?;
Ok(())
}
// ---------------------------------------------------------------------------
// Static assets + state snapshot
// ---------------------------------------------------------------------------
/// Bind a TCP listener with `SO_REUSEADDR`, retrying while the port is
/// still taken.
///
/// nspawn restarts can race the previous harness's socket release.
/// `SO_REUSEADDR` (set in `try_bind`) reclaims a port left in `TIME_WAIT`
/// by a clean previous exit, and the retry loop covers a previous process
/// that is genuinely still alive (systemd restart-delay overlap).
///
/// The backoff starts at 250 ms, doubles, and is capped at 2 s. At most
/// 12 retries are made (~20 s in total). Any other error, or `AddrInUse`
/// after the last retry, is returned with `label`/`addr` context.
async fn bind_with_retry(addr: SocketAddr, label: &str) -> Result<tokio::net::TcpListener> {
    const MAX_RETRIES: u32 = 12;
    const MAX_DELAY_MS: u64 = 2000;
    let mut delay_ms: u64 = 250;
    let mut retries: u32 = 0;
    loop {
        let err = match try_bind(addr) {
            Ok(listener) => return Ok(listener),
            Err(err) => err,
        };
        let retryable = err.kind() == std::io::ErrorKind::AddrInUse && retries < MAX_RETRIES;
        if !retryable {
            return Err(err).with_context(|| format!("bind {label} on {addr}"));
        }
        tracing::warn!(
            %addr,
            attempt = retries + 1,
            "{label}: AddrInUse, retrying in {delay_ms}ms"
        );
        tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
        retries += 1;
        delay_ms = (delay_ms * 2).min(MAX_DELAY_MS);
    }
}
/// Single bind attempt: create a socket matching `addr`'s address family,
/// enable `SO_REUSEADDR`, bind it, and start listening with a backlog of
/// 1024.
fn try_bind(addr: SocketAddr) -> std::io::Result<tokio::net::TcpListener> {
    let socket = if addr.is_ipv4() {
        tokio::net::TcpSocket::new_v4()?
    } else {
        tokio::net::TcpSocket::new_v6()?
    };
    socket.set_reuseaddr(true)?;
    socket.bind(addr)?;
    socket.listen(1024)
}
/// `GET /`: the static SPA shell, compiled into the binary.
async fn serve_index() -> impl IntoResponse {
    let shell = include_str!("../assets/index.html");
    ([("content-type", "text/html; charset=utf-8")], shell)
}
/// `GET /static/agent.css`: the shared base and terminal styles followed
/// by the page-specific sheet, each separated by a newline.
async fn serve_css() -> impl IntoResponse {
    // Prepend the shared palette/typography so per-page styles only need
    // to declare what's actually page-specific. One HTTP request, no
    // per-asset cache to invalidate.
    let stylesheet = format!(
        "{base}\n{terminal}\n{page}",
        base = hive_fr0nt::BASE_CSS,
        terminal = hive_fr0nt::TERMINAL_CSS,
        page = include_str!("../assets/agent.css"),
    );
    ([("content-type", "text/css")], stylesheet)
}
/// `GET /static/app.js`: the SPA's main script, compiled into the binary.
async fn serve_app_js() -> impl IntoResponse {
    let script = include_str!("../assets/app.js");
    ([("content-type", "application/javascript")], script)
}
/// `GET /static/hive-fr0nt.js`: the shared terminal script from
/// `hive_fr0nt`.
async fn serve_shared_js() -> impl IntoResponse {
    let script = hive_fr0nt::TERMINAL_JS;
    ([("content-type", "application/javascript")], script)
}
/// `GET /static/marked.js`: the markdown renderer bundled in `hive_fr0nt`.
async fn serve_marked_js() -> impl IntoResponse {
    let script = hive_fr0nt::MARKED_JS;
    ([("content-type", "application/javascript")], script)
}
/// `GET /stats`: the stats page shell, compiled into the binary.
async fn serve_stats() -> impl IntoResponse {
    let page = include_str!("../assets/stats.html");
    ([("content-type", "text/html; charset=utf-8")], page)
}
/// `GET /static/stats.js`: the stats page script, compiled into the binary.
async fn serve_stats_js() -> impl IntoResponse {
    let script = include_str!("../assets/stats.js");
    ([("content-type", "application/javascript")], script)
}
/// `GET /screen`: the VNC viewer page. The page is served even when the
/// GUI is disabled; only `/screen/ws` answers 404 in that case.
async fn serve_screen() -> impl IntoResponse {
    let page = include_str!("../assets/screen.html");
    ([("content-type", "text/html; charset=utf-8")], page)
}
/// Read the `vnc_port` field of `/etc/hyperhive/gui.json`.
///
/// Returns `None`, meaning "GUI not enabled", if any of these hold:
/// - the file is missing or unreadable;
/// - it is not valid JSON;
/// - it has no unsigned-integer `vnc_port`;
/// - the value does not fit in a `u16`.
fn read_gui_json() -> Option<u16> {
    const GUI_JSON_PATH: &str = "/etc/hyperhive/gui.json";
    let raw = std::fs::read_to_string(GUI_JSON_PATH).ok()?;
    let parsed = serde_json::from_str::<serde_json::Value>(&raw).ok()?;
    let port = parsed.get("vnc_port")?.as_u64()?;
    u16::try_from(port).ok()
}
/// `GET /screen/ws`: upgrade to a WebSocket and relay it to the VNC server
/// on `127.0.0.1:<vnc_port>`.
///
/// Answers `404` when the GUI is not enabled for this agent, i.e. no
/// `vnc_port` was found at startup.
async fn screen_ws(
    ws: axum::extract::ws::WebSocketUpgrade,
    State(state): State<AppState>,
) -> Response {
    match state.gui_vnc_port {
        Some(vnc_port) => ws.on_upgrade(move |socket| relay_ws_vnc(socket, vnc_port)),
        None => (StatusCode::NOT_FOUND, "gui not enabled for this agent").into_response(),
    }
}
/// Pure byte pump between a WebSocket client and the VNC server on
/// `127.0.0.1:<vnc_port>`.
///
/// Data flow:
/// - Binary WS frames are written to the TCP stream.
/// - TCP reads are sent back as binary WS frames.
/// - RFB is never parsed, so any variant (plain, VeNCrypt) passes through.
/// - Frames other than binary and `Close` (ping/pong/text) are ignored.
///
/// If the VNC server cannot be reached, a warning is logged and the
/// WebSocket is dropped.
///
/// Each direction runs in its own task. When either task finishes (EOF,
/// I/O error, send failure, or a WS `Close`), the other is aborted, so
/// neither the VNC connection nor the WebSocket outlives the session.
async fn relay_ws_vnc(socket: axum::extract::ws::WebSocket, vnc_port: u16) {
    // Import futures traits locally so they don't conflict with
    // tokio_stream::StreamExt used at module scope.
    use axum::extract::ws::Message;
    use futures_util::{SinkExt, StreamExt as _};
    let addr = format!("127.0.0.1:{vnc_port}");
    let Ok(tcp) = tokio::net::TcpStream::connect(&addr).await else {
        tracing::warn!(%addr, "screen/ws: could not connect to VNC server");
        return;
    };
    let (mut tcp_rx, mut tcp_tx) = tcp.into_split();
    let (mut ws_tx, mut ws_rx) = socket.split();
    // WS → TCP
    let mut ws_to_tcp = tokio::spawn(async move {
        while let Some(Ok(msg)) = futures_util::StreamExt::next(&mut ws_rx).await {
            match msg {
                Message::Binary(data) => {
                    if tcp_tx.write_all(&data).await.is_err() {
                        break;
                    }
                }
                Message::Close(_) => break,
                _ => {} // ping/pong/text: ignore
            }
        }
    });
    // TCP → WS
    let mut tcp_to_ws = tokio::spawn(async move {
        let mut buf = vec![0u8; 8192];
        loop {
            match tcp_rx.read(&mut buf).await {
                Ok(0) | Err(_) => break,
                Ok(n) => {
                    if ws_tx
                        .send(Message::Binary(buf[..n].to_vec().into()))
                        .await
                        .is_err()
                    {
                        break;
                    }
                }
            }
        }
    });
    // Wait for either direction to finish.
    tokio::select! {
        _ = &mut ws_to_tcp => {}
        _ = &mut tcp_to_ws => {}
    }
    // Dropping a `JoinHandle` only detaches its task; the task keeps
    // running. Without an explicit abort:
    // - a closed browser tab would leave `tcp_to_ws` blocked on an idle
    //   VNC socket indefinitely;
    // - a VNC disconnect would leave `ws_to_tcp` waiting on the client.
    // Aborting a task that has already completed is a no-op.
    ws_to_tcp.abort();
    tcp_to_ws.abort();
}
/// Query string of `GET /api/stats`.
#[derive(Deserialize)]
struct StatsQuery {
    /// Time-range name handed to `stats::Window::parse`; `api_stats`
    /// substitutes `"24h"` when it is absent.
    window: Option<String>,
}
/// `GET /api/stats?window=<name>`: stats snapshot for the requested time
/// range (`"24h"` when `window` is omitted).
///
/// The window's span is also sent to the broker's reminder-rollup RPC, so
/// the broker filters its counts to the same time range as the chart data.
/// `reminder_stats` is set to whatever `fetch_reminder_stats` returns
/// (`None` on any broker failure).
async fn api_stats(
    State(state): State<AppState>,
    axum::extract::Query(q): axum::extract::Query<StatsQuery>,
) -> axum::Json<crate::stats::Snapshot> {
    let window_name = q.window.as_deref().unwrap_or("24h");
    let window = crate::stats::Window::parse(window_name);
    let mut snapshot = crate::stats::snapshot_default(window);
    let since_secs = window.span_secs() as u64;
    snapshot.reminder_stats =
        fetch_reminder_stats(&state.socket, state.flavor(), since_secs).await;
    axum::Json(snapshot)
}
/// JSON body of `GET /api/state`: everything the JS app needs to render
/// the page. Assembled by `api_state`.
#[derive(Serialize)]
struct StateSnapshot {
    /// Bus seq at the moment this snapshot was assembled. Clients dedupe
    /// their buffered SSE traffic against this value: events with
    /// `seq <= snapshot.seq` are already reflected (or pre-date the
    /// snapshot); `seq > snapshot.seq` is post-snapshot. Reset to 0 on
    /// harness restart — clients treat reconnect as a fresh world.
    seq: u64,
    /// Agent label from `AppState::label`.
    label: String,
    /// Dashboard port from `HIVE_DASHBOARD_PORT`, defaulting to 7000 when
    /// unset or not a valid `u16`.
    dashboard_port: u16,
    /// `"online"` | `"rate_limited"` | `"needs_login_idle"` | `"needs_login_in_progress"`.
    status: &'static str,
    /// Present when `status == "needs_login_in_progress"`.
    session: Option<SessionView>,
    /// Last 30 messages addressed to this agent, newest-first. Pulled
    /// from the broker via the per-agent socket on each render.
    /// Empty on transport failure.
    inbox: Vec<hive_sh4re::InboxRow>,
    /// Authoritative turn-loop state from the harness and the unix
    /// timestamp the state was entered. The JS computes the age
    /// client-side off this rather than tracking it from SSE events.
    turn_state: crate::events::TurnState,
    turn_state_since: i64,
    /// Currently-active claude model name. Reflected on the page so
    /// the operator can see what they just switched to (and what's
    /// in flight). Mutable at runtime via `POST /api/model`.
    model: String,
    /// Effective context-window token budget for the current model.
    /// Derived from `events::context_window_tokens(&model)` — respects
    /// per-model and global `HIVE_CONTEXT_WINDOW_TOKENS_*` overrides then
    /// falls back to model-family heuristic. Consumers (e.g. dashboard
    /// badge) use this to render the ctx-usage percentage.
    context_window_tokens: u64,
    /// Last-inference token usage from the most recent completed
    /// turn — represents the current context-window size at turn-end.
    /// `null` until the first turn finishes.
    ctx_usage: Option<crate::events::TokenUsage>,
    /// Cumulative token usage across the most recent turn's inferences
    /// (cost signal). `null` until the first turn finishes.
    cost_usage: Option<crate::events::TokenUsage>,
    /// Whether the weston VNC compositor is configured for this agent
    /// (i.e. `/etc/hyperhive/gui.json` was present at harness startup).
    /// When true, the UI may render a `🖥 screen` link to `/screen`.
    gui_enabled: bool,
}
/// Serializable view of the running `LoginSession`, embedded in
/// `StateSnapshot::session` while a login is in progress.
#[derive(Serialize)]
struct SessionView {
    /// First `https://…` claude emitted on stdout, if any.
    url: Option<String>,
    /// Accumulated stdout + stderr.
    output: String,
    /// Value of `LoginSession::finished()` at snapshot time.
    finished: bool,
    /// Value of `LoginSession::exit_note()` at snapshot time.
    exit_note: Option<String>,
}
/// `GET /api/loose-ends`: proxy this agent's loose-ends list via the
/// per-agent (or manager) socket.
///
/// The web UI shows the result as a collapsible section, so the operator
/// can see at a glance what is pending against the agent: questions it
/// asked, peer questions targeting it, reminders it scheduled, and
/// approvals for the manager. This is the same data the
/// `mcp__hyperhive__get_loose_ends` tool sees from inside the container.
///
/// Success: `{"loose_ends": [...]}`. A broker `Err`, an unexpected reply,
/// or a transport failure becomes a plain-text 500.
async fn api_loose_ends(State(state): State<AppState>) -> Response {
    let fetched: Result<Vec<hive_sh4re::LooseEnd>, String> = match state.flavor() {
        Flavor::Agent => {
            let reply = client::request::<_, hive_sh4re::AgentResponse>(
                &state.socket,
                &hive_sh4re::AgentRequest::GetLooseEnds,
            )
            .await;
            match reply {
                Ok(hive_sh4re::AgentResponse::LooseEnds { loose_ends }) => Ok(loose_ends),
                Ok(hive_sh4re::AgentResponse::Err { message }) => {
                    Err(format!("get_loose_ends: {message}"))
                }
                Ok(other) => Err(format!("unexpected response: {other:?}")),
                Err(e) => Err(format!("transport: {e:#}")),
            }
        }
        Flavor::Manager => {
            let reply = client::request::<_, hive_sh4re::ManagerResponse>(
                &state.socket,
                &hive_sh4re::ManagerRequest::GetLooseEnds,
            )
            .await;
            match reply {
                Ok(hive_sh4re::ManagerResponse::LooseEnds { loose_ends }) => Ok(loose_ends),
                Ok(hive_sh4re::ManagerResponse::Err { message }) => {
                    Err(format!("get_loose_ends: {message}"))
                }
                Ok(other) => Err(format!("unexpected response: {other:?}")),
                Err(e) => Err(format!("transport: {e:#}")),
            }
        }
    };
    match fetched {
        Ok(loose_ends) => {
            axum::Json(serde_json::json!({ "loose_ends": loose_ends })).into_response()
        }
        Err(message) => error_response(&message),
    }
}
/// `GET /api/state`: build the `StateSnapshot` the JS app renders from.
///
/// Inputs read:
/// - the bus: seq, rate-limit flag, turn state, model, token usage;
/// - the login cell and the login session slot;
/// - `HIVE_DASHBOARD_PORT`;
/// - the recent inbox, fetched from the broker.
///
/// There is no error path of its own (apart from panicking on a poisoned
/// lock): an inbox fetch failure just yields an empty `inbox`.
async fn api_state(State(state): State<AppState>) -> axum::Json<StateSnapshot> {
    // Capture seq *before* any reads so the dedupe contract is
    // "events with seq > snapshot.seq are post-snapshot, never missed."
    let seq = state.bus.current_seq();
    // NOTE(review): presumably clears a login session whose process has
    // exited so it is not reported below — confirm in `login_session`.
    drop_if_finished(&state.session);
    let login = *state.login.lock().unwrap();
    // Clone the `Option<Arc<_>>` out so the session lock is released at
    // the end of this statement.
    let session_snapshot = state.session.lock().unwrap().clone();
    // Rate-limited takes precedence over online; a session view is only
    // reported while a login is still needed.
    let (status, session_view) = match (login, session_snapshot) {
        (LoginState::Online, _) if state.bus.is_rate_limited() => ("rate_limited", None),
        (LoginState::Online, _) => ("online", None),
        (LoginState::NeedsLogin, None) => ("needs_login_idle", None),
        (LoginState::NeedsLogin, Some(s)) => (
            "needs_login_in_progress",
            Some(SessionView {
                url: s.url(),
                output: s.output(),
                finished: s.finished(),
                exit_note: s.exit_note(),
            }),
        ),
    };
    // Falls back to 7000 when the variable is unset or not a valid port.
    let dashboard_port = std::env::var("HIVE_DASHBOARD_PORT")
        .ok()
        .and_then(|s| s.parse::<u16>().ok())
        .unwrap_or(7000);
    let inbox = recent_inbox(&state.socket, state.flavor()).await;
    let (turn_state, turn_state_since) = state.bus.state_snapshot();
    let model = state.bus.model();
    let context_window_tokens = crate::events::context_window_tokens(&model);
    let ctx_usage = state.bus.last_ctx_usage();
    let cost_usage = state.bus.last_cost_usage();
    axum::Json(StateSnapshot {
        seq,
        label: state.label.clone(),
        dashboard_port,
        status,
        session: session_view,
        inbox,
        turn_state,
        turn_state_since,
        model,
        context_window_tokens,
        ctx_usage,
        cost_usage,
        gui_enabled: state.gui_vnc_port.is_some(),
    })
}
/// Best-effort fetch of the 30 most recent messages addressed to this
/// agent. Uses the agent or manager protocol on `socket` depending on
/// `flavor`.
///
/// Any transport/decode failure or unexpected reply yields an empty list.
/// The inbox section is decorative, not authoritative.
async fn recent_inbox(socket: &std::path::Path, flavor: Flavor) -> Vec<hive_sh4re::InboxRow> {
    const LIMIT: u64 = 30;
    let fetched = match flavor {
        Flavor::Agent => {
            let req = hive_sh4re::AgentRequest::Recent { limit: LIMIT };
            let reply = client::request::<_, hive_sh4re::AgentResponse>(socket, &req).await;
            if let Ok(hive_sh4re::AgentResponse::Recent { rows }) = reply {
                Some(rows)
            } else {
                None
            }
        }
        Flavor::Manager => {
            let req = hive_sh4re::ManagerRequest::Recent { limit: LIMIT };
            let reply = client::request::<_, hive_sh4re::ManagerResponse>(socket, &req).await;
            if let Ok(hive_sh4re::ManagerResponse::Recent { rows }) = reply {
                Some(rows)
            } else {
                None
            }
        }
    };
    fetched.unwrap_or_default()
}
/// Fetch reminder activity stats covering the last `window_secs` seconds
/// from the broker. Uses the agent or manager protocol on `socket`
/// depending on `flavor`.
///
/// Returns `None` on any transport/decode failure or unexpected reply.
/// The stats are decorative, not authoritative.
async fn fetch_reminder_stats(
    socket: &std::path::Path,
    flavor: Flavor,
    window_secs: u64,
) -> Option<hive_sh4re::ReminderStats> {
    match flavor {
        Flavor::Agent => {
            let req = hive_sh4re::AgentRequest::ReminderRollup {
                since_secs: window_secs,
            };
            let reply = client::request::<_, hive_sh4re::AgentResponse>(socket, &req).await;
            let Ok(hive_sh4re::AgentResponse::ReminderRollup(stats)) = reply else {
                return None;
            };
            Some(stats)
        }
        Flavor::Manager => {
            let req = hive_sh4re::ManagerRequest::ReminderRollup {
                since_secs: window_secs,
            };
            let reply = client::request::<_, hive_sh4re::ManagerResponse>(socket, &req).await;
            let Ok(hive_sh4re::ManagerResponse::ReminderRollup(stats)) = reply else {
                return None;
            };
            Some(stats)
        }
    }
}
// ---------------------------------------------------------------------------
// Action handlers
// ---------------------------------------------------------------------------
/// Form body of `POST /send`.
#[derive(Deserialize)]
struct SendForm {
    /// Operator message text. `post_send` trims it and rejects an empty
    /// result.
    body: String,
}
/// `POST /send`: forward an operator message to the broker as
/// `OperatorMsg`, using the agent or manager protocol per `flavor()`.
///
/// Replies `200 ok` on success. Any of the following becomes a plain-text
/// 500:
/// - an empty (after trimming) body;
/// - a broker `Err`;
/// - an unexpected reply;
/// - a transport failure.
async fn post_send(State(state): State<AppState>, Form(form): Form<SendForm>) -> Response {
    let trimmed = form.body.trim();
    if trimmed.is_empty() {
        return error_response("send: `body` required");
    }
    let body = trimmed.to_owned();
    let outcome: Result<(), String> = match state.flavor() {
        Flavor::Agent => {
            let reply = client::request::<_, hive_sh4re::AgentResponse>(
                &state.socket,
                &hive_sh4re::AgentRequest::OperatorMsg { body },
            )
            .await;
            match reply {
                Ok(hive_sh4re::AgentResponse::Ok) => Ok(()),
                Ok(hive_sh4re::AgentResponse::Err { message }) => Err(message),
                Ok(other) => Err(format!("unexpected response: {other:?}")),
                Err(e) => Err(format!("transport: {e:#}")),
            }
        }
        Flavor::Manager => {
            let reply = client::request::<_, hive_sh4re::ManagerResponse>(
                &state.socket,
                &hive_sh4re::ManagerRequest::OperatorMsg { body },
            )
            .await;
            match reply {
                Ok(hive_sh4re::ManagerResponse::Ok) => Ok(()),
                Ok(hive_sh4re::ManagerResponse::Err { message }) => Err(message),
                Ok(other) => Err(format!("unexpected response: {other:?}")),
                Err(e) => Err(format!("transport: {e:#}")),
            }
        }
    };
    if let Err(reason) = outcome {
        return error_response(&format!("send failed: {reason}"));
    }
    // 200 instead of 303, so the client doesn't refetch /api/state.
    // - The operator message becomes a broker `Sent`, already shown
    //   server-side in the dashboard.
    // - On the agent side, the resulting `TurnStart` SSE event drives the
    //   terminal.
    // - The inbox row is consumed by the time `TurnEnd` fires the existing
    //   turn-end refresh.
    (axum::http::StatusCode::OK, "ok").into_response()
}
/// `GET /events/history`: the bus's buffered history plus the seq
/// high-water mark, as `{"seq": …, "events": […]}`.
async fn events_history(State(state): State<AppState>) -> axum::Json<serde_json::Value> {
    // The seq is captured *before* reading history. Dedupe is therefore
    // "drop buffered events you've already seen in history", never "lose an
    // event that fired between the read and the timestamp". Historical rows
    // carry no per-row seq; only the high-water mark matters for the dedupe
    // window.
    let high_water = state.bus.current_seq();
    let backlog = state.bus.history();
    let payload = serde_json::json!({ "seq": high_water, "events": backlog });
    axum::Json(payload)
}
/// `GET /events/stream`: Server-Sent Events feed of the live bus, one JSON
/// event per SSE message.
///
/// Behaviour:
/// - The receiver is created *before* a "live stream attached" Note is
///   emitted. Every new subscriber therefore gets at least one event right
///   away and can clear its connecting placeholder.
/// - Receive errors and events that fail to serialize are skipped rather
///   than ending the stream.
/// - Keep-alives use axum's defaults.
async fn events_stream(
    State(state): State<AppState>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    tracing::info!("sse: client subscribed");
    let receiver = state.bus.subscribe();
    state.bus.emit(crate::events::LiveEvent::Note {
        text: "live stream attached".into(),
    });
    let frames = BroadcastStream::new(receiver).filter_map(|item| {
        item.ok()
            .and_then(|event| serde_json::to_string(&event).ok())
            .map(|payload| Ok(Event::default().data(payload)))
    });
    Sse::new(frames).keep_alive(KeepAlive::default())
}
/// `POST /login/start`: start a `claude` login session unless one is
/// already running. A session already cleared by `drop_if_finished` does
/// not count as running.
///
/// Replies `200 ok` both when a session is started and when one already
/// exists. Replies with a plain-text 500 if `LoginSession::start` fails.
/// On a fresh start it emits the `needs_login_in_progress` status.
async fn post_login_start(State(state): State<AppState>) -> Response {
    drop_if_finished(&state.session);
    {
        // Hold the slot lock across the check *and* the start. Otherwise
        // two concurrent requests could both see `None` and both start a
        // session, with the second overwriting (and possibly orphaning) the
        // first. `LoginSession::start` is synchronous, so no `.await` happens
        // while the std mutex is held.
        let mut slot = state.session.lock().unwrap();
        if slot.is_some() {
            return (axum::http::StatusCode::OK, "ok").into_response();
        }
        match LoginSession::start() {
            Ok(session) => *slot = Some(Arc::new(session)),
            Err(e) => return error_response(&format!("login start failed: {e:#}")),
        }
    }
    // Flip status from needs_login_idle → needs_login_in_progress so the
    // web UI's badge and polling kick in. Polling is still the right tool
    // for the streaming session output during the login flow itself;
    // events replace polling for *everything else*.
    state.bus.emit_status("needs_login_in_progress");
    (axum::http::StatusCode::OK, "ok").into_response()
}
/// Form body of `POST /login/code`.
#[derive(Deserialize)]
struct CodeForm {
    /// Code passed as-is to `LoginSession::submit_code`.
    code: String,
}
/// `POST /login/code`: hand the operator-supplied code to the running
/// login session.
///
/// Replies `200 ok` on success. Replies with a plain-text 500 when no
/// session is running or `submit_code` fails.
async fn post_login_code(State(state): State<AppState>, Form(form): Form<CodeForm>) -> Response {
    // Clone the Arc out so the std mutex is released before awaiting.
    let current = state.session.lock().unwrap().clone();
    match current {
        None => error_response("no login session running"),
        Some(session) => match session.submit_code(&form.code).await {
            Ok(_) => (axum::http::StatusCode::OK, "ok").into_response(),
            Err(e) => error_response(&format!("submit code failed: {e:#}")),
        },
    }
}
/// `POST /login/cancel`: stop any running login session and report the UI
/// status as `needs_login_idle`. The `LoginState` itself is left unchanged.
/// Always replies `200 ok`.
async fn post_login_cancel(State(state): State<AppState>) -> Response {
    // Move the session out under the lock. The temporary guard is dropped
    // at the end of this statement, before the `.await` below.
    let running = state.session.lock().unwrap().take();
    if let Some(login) = running {
        login.close_stdin().await;
        login.kill();
    }
    state.bus.emit_status("needs_login_idle");
    (axum::http::StatusCode::OK, "ok").into_response()
}
/// Form body of `POST /api/model`.
#[derive(Deserialize)]
struct ModelForm {
    /// Requested claude model name. `post_set_model` trims it and rejects
    /// an empty result.
    model: String,
}

/// Switch the model for future turns. The current turn (if any)
/// keeps its model; `/model <name>` applies starting with the next
/// `recv` cycle. Empty / whitespace-only inputs are rejected. No
/// claude-side validation — we just hand the string through to
/// `claude --model <name>`; an unknown model surfaces as a turn
/// failure in the live panel and the operator can revert.
async fn post_set_model(State(state): State<AppState>, Form(form): Form<ModelForm>) -> Response {
    let name = form.model.trim();
    if name.is_empty() {
        return error_response("model: name required");
    }
    state.bus.set_model(name);
    state.bus.emit(crate::events::LiveEvent::Note {
        text: format!("operator: /model — claude model set to '{name}' for future turns"),
    });
    tracing::info!(%name, "operator set model");
    (axum::http::StatusCode::OK, "ok").into_response()
}

/// Operator-initiated session compaction (`POST /api/compact`).
///
/// Tries to take the shared `TurnLock` with `try_lock_owned()`. If a
/// regular turn is in flight, the request is rejected immediately with a
/// plain-text 500 and nothing is spawned.
///
/// Otherwise `turn::compact_session` runs in a background task that owns
/// the lock guard until compaction ends:
/// - The handler returns `200 ok` right away, so the async-form spinner
///   can clear.
/// - Turn state is `Compacting` for the duration and `Idle` afterwards.
/// - A failure is reported as a Note in the live event panel.
async fn post_compact(State(state): State<AppState>) -> Response {
    // Clone the Arc before locking so the guard's lifetime is tied to the
    // clone (which we can move into the spawn) rather than to `state`.
    let lock = state.turn_lock.clone();
    // Reject immediately if a normal turn is in flight — concurrent access
    // to the claude session is unsafe and produces garbled output.
    let Ok(guard) = lock.try_lock_owned() else {
        return error_response("turn in flight — wait for it to finish before compacting");
    };
    let bus = state.bus.clone();
    let files = state.files.clone();
    tokio::spawn(async move {
        let _guard = guard; // keep lock alive for the duration of compaction
        bus.emit(crate::events::LiveEvent::Note {
            text: "operator: /compact — running on persistent session".into(),
        });
        bus.set_state(crate::events::TurnState::Compacting);
        let r = crate::turn::compact_session(&files, &bus).await;
        bus.set_state(crate::events::TurnState::Idle);
        if let Err(e) = r {
            bus.emit(crate::events::LiveEvent::Note {
                text: format!("/compact failed: {e:#}"),
            });
        }
    });
    (axum::http::StatusCode::OK, "ok").into_response()
}
/// Arm a one-shot: the next claude turn drops `--continue`, starting a
/// fresh session. Subsequent turns resume normal `--continue`
/// behavior. Idempotent before the next turn fires — calling twice
/// still results in a single fresh start. Useful when the
/// session-resume context is poisoned (claude went off the rails,
/// hit an unrecoverable refusal, etc.) and a full reset is cheaper
/// than asking claude to forget mid-stream.
async fn post_new_session(State(state): State<AppState>) -> Response {
    state.bus.request_new_session();
    state.bus.emit(crate::events::LiveEvent::Note {
        text: "operator: new session armed — next turn runs without --continue".into(),
    });
    (axum::http::StatusCode::OK, "ok").into_response()
}

/// Cancel the in-flight claude turn (`POST /api/cancel`).
///
/// Coarse-grained: shells out `pkill -INT claude`, since there's at most
/// one claude per container. SIGINT (not SIGTERM) so claude flushes
/// anything in-flight and emits a final result row.
///
/// The outcome is always reported as a Note (SIGINT sent, no process
/// matched via pkill exit code 1, other pkill failure), and the handler
/// always replies `200 ok`. The state transition back to `idle` happens
/// when `run_claude` wakes up and the harness emits `TurnEnd`.
async fn post_cancel_turn(State(state): State<AppState>) -> Response {
    let out = tokio::process::Command::new("pkill")
        .args(["-INT", "claude"])
        .output()
        .await;
    let note = match out {
        Ok(o) if o.status.success() => "operator: /cancel — sent SIGINT to claude".to_owned(),
        Ok(o) if o.status.code() == Some(1) => {
            "operator: /cancel — no claude process to interrupt".to_owned()
        }
        Ok(o) => format!(
            "operator: /cancel — pkill exited {} stderr={}",
            o.status,
            String::from_utf8_lossy(&o.stderr).trim()
        ),
        Err(e) => format!("operator: /cancel — pkill failed: {e}"),
    };
    state.bus.emit(crate::events::LiveEvent::Note { text: note });
    (axum::http::StatusCode::OK, "ok").into_response()
}
/// Plain-text `500 Internal Server Error` with `message` as the body.
///
/// Plain text because the JS app surfaces it in `alert()`; HTML wrapping
/// would just be noise.
fn error_response(message: &str) -> Response {
    let body = String::from(message);
    (StatusCode::INTERNAL_SERVER_ERROR, body).into_response()
}