Add fetch_reminder_stats() helper to query ReminderRollup from broker, and update api_stats endpoint to include reminder stats in snapshot. Reminder activity metrics (scheduled, delivered, pending) are now available to the stats page UI for display.
671 lines
25 KiB
Rust
671 lines
25 KiB
Rust
//! Per-container HTTP UI. SPA shape: `GET /` returns a static shell;
|
|
//! `GET /static/*` serves CSS + JS; `GET /api/state` returns the page
|
|
//! state as JSON; the JS app renders. Live events stream on
|
|
//! `/events/stream`. Action POSTs (`/send`, `/login/*`) return either a
|
|
//! 303 Redirect (for browsers that submit the form normally) or just
|
|
//! 200 OK — the JS app re-fetches `/api/state` afterwards.
|
|
|
|
use std::convert::Infallible;
|
|
use std::net::SocketAddr;
|
|
use std::path::PathBuf;
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use anyhow::{Context, Result};
|
|
use axum::{
|
|
Form, Router,
|
|
extract::State,
|
|
http::StatusCode,
|
|
response::{
|
|
IntoResponse, Response,
|
|
sse::{Event, KeepAlive, Sse},
|
|
},
|
|
routing::{get, post},
|
|
};
|
|
use serde::{Deserialize, Serialize};
|
|
use tokio_stream::{Stream, StreamExt, wrappers::BroadcastStream};
|
|
|
|
use crate::client;
|
|
use crate::events::Bus;
|
|
use crate::login::LoginState;
|
|
use crate::login_session::{LoginSession, drop_if_finished};
|
|
use crate::mcp;
|
|
use crate::turn::TurnFiles;
|
|
|
|
/// Live login state for the web UI. The harness updates this in place as it
|
|
/// transitions between `NeedsLogin` and `Online`; the UI reads on each
|
|
/// render.
|
|
pub type LoginStateCell = Arc<Mutex<LoginState>>;
|
|
|
|
/// Shared turn lock. The serve loop acquires this (as an async mutex) for the
|
|
/// duration of every `drive_turn` call. The `/api/compact` handler tries
|
|
/// `try_lock()` and rejects immediately if a turn is in flight, preventing
|
|
/// concurrent access to the claude session.
|
|
pub type TurnLock = Arc<tokio::sync::Mutex<()>>;
|
|
|
|
#[derive(Clone)]
|
|
struct AppState {
|
|
label: String,
|
|
login: LoginStateCell,
|
|
session: Arc<Mutex<Option<Arc<LoginSession>>>>,
|
|
bus: Bus,
|
|
socket: PathBuf,
|
|
/// Same `TurnFiles` the harness's turn loop uses. Shared so
|
|
/// `/api/compact` re-uses the exact MCP config / system prompt /
|
|
/// settings claude saw on the last regular turn — keeps the
|
|
/// session shape identical across compact + normal turns.
|
|
files: TurnFiles,
|
|
/// Prevents `/api/compact` from racing with an in-flight normal turn.
|
|
turn_lock: TurnLock,
|
|
}
|
|
|
|
impl AppState {
|
|
fn flavor(&self) -> Flavor {
|
|
self.files.flavor
|
|
}
|
|
}
|
|
|
|
/// Which wire protocol the per-agent UI's `/send` handler should speak.
|
|
/// Sub-agent → `AgentRequest::OperatorMsg`; manager →
|
|
/// `ManagerRequest::OperatorMsg`. Reuses the MCP-side enum so a
|
|
/// single value drives both the send protocol and (in
|
|
/// `post_compact`) the allowed-tools surface claude sees.
|
|
pub type Flavor = mcp::Flavor;
|
|
|
|
pub async fn serve(
|
|
label: String,
|
|
port: u16,
|
|
login: LoginStateCell,
|
|
bus: Bus,
|
|
socket: PathBuf,
|
|
files: TurnFiles,
|
|
turn_lock: TurnLock,
|
|
) -> Result<()> {
|
|
let state = AppState {
|
|
label,
|
|
login,
|
|
session: Arc::new(Mutex::new(None)),
|
|
bus,
|
|
socket,
|
|
files,
|
|
turn_lock,
|
|
};
|
|
let app = Router::new()
|
|
.route("/", get(serve_index))
|
|
.route("/static/agent.css", get(serve_css))
|
|
.route("/static/app.js", get(serve_app_js))
|
|
.route("/static/hive-fr0nt.js", get(serve_shared_js))
|
|
.route("/static/marked.js", get(serve_marked_js))
|
|
.route("/api/state", get(api_state))
|
|
.route("/events/stream", get(events_stream))
|
|
.route("/events/history", get(events_history))
|
|
.route("/send", post(post_send))
|
|
.route("/login/start", post(post_login_start))
|
|
.route("/login/code", post(post_login_code))
|
|
.route("/login/cancel", post(post_login_cancel))
|
|
.route("/api/cancel", post(post_cancel_turn))
|
|
.route("/api/compact", post(post_compact))
|
|
.route("/api/model", post(post_set_model))
|
|
.route("/api/new-session", post(post_new_session))
|
|
.route("/api/loose-ends", get(api_loose_ends))
|
|
.route("/stats", get(serve_stats))
|
|
.route("/static/stats.js", get(serve_stats_js))
|
|
.route("/api/stats", get(api_stats))
|
|
.with_state(state);
|
|
let addr = SocketAddr::from(([0, 0, 0, 0], port));
|
|
let listener = bind_with_retry(addr, "web UI").await?;
|
|
tracing::info!(%port, "web UI listening");
|
|
axum::serve(listener, app).await?;
|
|
Ok(())
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Static assets + state snapshot
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Bind a TCP listener with `SO_REUSEADDR` set, retrying on
|
|
/// `AddrInUse` for up to ~20s. nspawn restarts can race the previous
|
|
/// harness's socket release; `SO_REUSEADDR` lets us reclaim a port
|
|
/// still in `TIME_WAIT` from a clean previous exit, and the retry
|
|
/// covers the case where the previous process is genuinely still
|
|
/// alive (systemd restart-delay overlap).
|
|
async fn bind_with_retry(addr: SocketAddr, label: &str) -> Result<tokio::net::TcpListener> {
|
|
let mut delay_ms = 250u64;
|
|
let mut attempts = 0u32;
|
|
loop {
|
|
match try_bind(addr) {
|
|
Ok(l) => return Ok(l),
|
|
Err(e) if e.kind() == std::io::ErrorKind::AddrInUse && attempts < 12 => {
|
|
tracing::warn!(
|
|
%addr, attempt = attempts + 1,
|
|
"{label}: AddrInUse, retrying in {delay_ms}ms"
|
|
);
|
|
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
|
|
attempts += 1;
|
|
delay_ms = (delay_ms * 2).min(2000);
|
|
}
|
|
Err(e) => {
|
|
return Err(e).with_context(|| format!("bind {label} on {addr}"));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// One bind attempt: create a socket of the right address family, set
/// `SO_REUSEADDR`, bind it to `addr` and start listening with a backlog
/// of 1024. Any step's I/O error is passed straight back to the caller.
fn try_bind(addr: SocketAddr) -> std::io::Result<tokio::net::TcpListener> {
    let socket = if addr.is_ipv4() {
        tokio::net::TcpSocket::new_v4()?
    } else {
        tokio::net::TcpSocket::new_v6()?
    };
    socket.set_reuseaddr(true)?;
    socket.bind(addr)?;
    socket.listen(1024)
}
|
|
|
|
async fn serve_index() -> impl IntoResponse {
|
|
(
|
|
[("content-type", "text/html; charset=utf-8")],
|
|
include_str!("../assets/index.html"),
|
|
)
|
|
}
|
|
|
|
async fn serve_css() -> impl IntoResponse {
|
|
// Prepend the shared palette/typography so per-page styles only need
|
|
// to declare what's actually page-specific. One HTTP request, no
|
|
// per-asset cache to invalidate.
|
|
let body = format!(
|
|
"{}\n{}\n{}",
|
|
hive_fr0nt::BASE_CSS,
|
|
hive_fr0nt::TERMINAL_CSS,
|
|
include_str!("../assets/agent.css"),
|
|
);
|
|
([("content-type", "text/css")], body)
|
|
}
|
|
|
|
async fn serve_app_js() -> impl IntoResponse {
|
|
(
|
|
[("content-type", "application/javascript")],
|
|
include_str!("../assets/app.js"),
|
|
)
|
|
}
|
|
|
|
async fn serve_shared_js() -> impl IntoResponse {
|
|
(
|
|
[("content-type", "application/javascript")],
|
|
hive_fr0nt::TERMINAL_JS,
|
|
)
|
|
}
|
|
|
|
async fn serve_marked_js() -> impl IntoResponse {
|
|
(
|
|
[("content-type", "application/javascript")],
|
|
hive_fr0nt::MARKED_JS,
|
|
)
|
|
}
|
|
|
|
async fn serve_stats() -> impl IntoResponse {
|
|
(
|
|
[("content-type", "text/html; charset=utf-8")],
|
|
include_str!("../assets/stats.html"),
|
|
)
|
|
}
|
|
|
|
async fn serve_stats_js() -> impl IntoResponse {
|
|
(
|
|
[("content-type", "application/javascript")],
|
|
include_str!("../assets/stats.js"),
|
|
)
|
|
}
|
|
|
|
#[derive(Deserialize)]
|
|
struct StatsQuery {
|
|
window: Option<String>,
|
|
}
|
|
|
|
async fn api_stats(
|
|
State(state): State<AppState>,
|
|
axum::extract::Query(q): axum::extract::Query<StatsQuery>,
|
|
) -> axum::Json<crate::stats::Snapshot> {
|
|
let window = crate::stats::Window::parse(q.window.as_deref().unwrap_or("24h"));
|
|
let mut snapshot = crate::stats::snapshot_default(window);
|
|
// Fetch reminder stats from the broker. The window spans are:
|
|
// 24h = 86400s, 7d = 604800s, 30d = 2592000s.
|
|
let window_secs = match window {
|
|
crate::stats::Window::Day => 24 * 3600,
|
|
crate::stats::Window::Week => 7 * 24 * 3600,
|
|
crate::stats::Window::Month => 30 * 24 * 3600,
|
|
};
|
|
snapshot.reminder_stats = fetch_reminder_stats(&state.socket, state.flavor(), window_secs as u64).await;
|
|
axum::Json(snapshot)
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
struct StateSnapshot {
|
|
/// Bus seq at the moment this snapshot was assembled. Clients dedupe
|
|
/// their buffered SSE traffic against this value: events with
|
|
/// `seq <= snapshot.seq` are already reflected (or pre-date the
|
|
/// snapshot); `seq > snapshot.seq` is post-snapshot. Reset to 0 on
|
|
/// harness restart — clients treat reconnect as a fresh world.
|
|
seq: u64,
|
|
label: String,
|
|
dashboard_port: u16,
|
|
/// `"online"` | `"needs_login_idle"` | `"needs_login_in_progress"`.
|
|
status: &'static str,
|
|
/// Present when `status == "needs_login_in_progress"`.
|
|
session: Option<SessionView>,
|
|
/// Last N messages addressed to this agent, newest-first. Pulled
|
|
/// from the broker via the per-agent socket on each render.
|
|
/// Empty on transport failure.
|
|
inbox: Vec<hive_sh4re::InboxRow>,
|
|
/// Authoritative turn-loop state from the harness and the unix
|
|
/// timestamp the state was entered. The JS computes the age
|
|
/// client-side off this rather than tracking it from SSE events.
|
|
turn_state: crate::events::TurnState,
|
|
turn_state_since: i64,
|
|
/// Currently-active claude model name. Reflected on the page so
|
|
/// the operator can see what they just switched to (and what's
|
|
/// in flight). Mutable at runtime via `POST /api/model`.
|
|
model: String,
|
|
/// Last-inference token usage from the most recent completed
|
|
/// turn — represents the current context-window size at turn-end.
|
|
/// `null` until the first turn finishes.
|
|
ctx_usage: Option<crate::events::TokenUsage>,
|
|
/// Cumulative token usage across the most recent turn's inferences
|
|
/// (cost signal). `null` until the first turn finishes.
|
|
cost_usage: Option<crate::events::TokenUsage>,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
struct SessionView {
|
|
/// First `https://…` claude emitted on stdout, if any.
|
|
url: Option<String>,
|
|
/// Accumulated stdout + stderr.
|
|
output: String,
|
|
finished: bool,
|
|
exit_note: Option<String>,
|
|
}
|
|
|
|
/// Proxy this agent's loose-ends list via the per-agent socket. The
|
|
/// web UI surfaces the result as a collapsible section in the page
|
|
/// so the operator can see at a glance what's pending against the
|
|
/// agent (questions asked by it, peer questions targeting it,
|
|
/// reminders it scheduled, approvals for the manager). Same data
|
|
/// the `mcp__hyperhive__get_loose_ends` tool sees from inside the
|
|
/// container.
|
|
async fn api_loose_ends(State(state): State<AppState>) -> Response {
|
|
let loose_ends: Vec<hive_sh4re::LooseEnd> = match state.flavor() {
|
|
Flavor::Agent => {
|
|
match client::request::<_, hive_sh4re::AgentResponse>(
|
|
&state.socket,
|
|
&hive_sh4re::AgentRequest::GetLooseEnds,
|
|
)
|
|
.await
|
|
{
|
|
Ok(hive_sh4re::AgentResponse::LooseEnds { loose_ends }) => loose_ends,
|
|
Ok(hive_sh4re::AgentResponse::Err { message }) => {
|
|
return error_response(&format!("get_loose_ends: {message}"));
|
|
}
|
|
Ok(other) => return error_response(&format!("unexpected response: {other:?}")),
|
|
Err(e) => return error_response(&format!("transport: {e:#}")),
|
|
}
|
|
}
|
|
Flavor::Manager => {
|
|
match client::request::<_, hive_sh4re::ManagerResponse>(
|
|
&state.socket,
|
|
&hive_sh4re::ManagerRequest::GetLooseEnds,
|
|
)
|
|
.await
|
|
{
|
|
Ok(hive_sh4re::ManagerResponse::LooseEnds { loose_ends }) => loose_ends,
|
|
Ok(hive_sh4re::ManagerResponse::Err { message }) => {
|
|
return error_response(&format!("get_loose_ends: {message}"));
|
|
}
|
|
Ok(other) => return error_response(&format!("unexpected response: {other:?}")),
|
|
Err(e) => return error_response(&format!("transport: {e:#}")),
|
|
}
|
|
}
|
|
};
|
|
axum::Json(serde_json::json!({ "loose_ends": loose_ends })).into_response()
|
|
}
|
|
|
|
async fn api_state(State(state): State<AppState>) -> axum::Json<StateSnapshot> {
|
|
// Capture seq *before* any reads so the dedupe contract is
|
|
// "events with seq > snapshot.seq are post-snapshot, never missed."
|
|
let seq = state.bus.current_seq();
|
|
drop_if_finished(&state.session);
|
|
let login = *state.login.lock().unwrap();
|
|
let session_snapshot = state.session.lock().unwrap().clone();
|
|
let (status, session_view) = match (login, session_snapshot) {
|
|
(LoginState::Online, _) => ("online", None),
|
|
(LoginState::NeedsLogin, None) => ("needs_login_idle", None),
|
|
(LoginState::NeedsLogin, Some(s)) => (
|
|
"needs_login_in_progress",
|
|
Some(SessionView {
|
|
url: s.url(),
|
|
output: s.output(),
|
|
finished: s.finished(),
|
|
exit_note: s.exit_note(),
|
|
}),
|
|
),
|
|
};
|
|
let dashboard_port = std::env::var("HIVE_DASHBOARD_PORT")
|
|
.ok()
|
|
.and_then(|s| s.parse::<u16>().ok())
|
|
.unwrap_or(7000);
|
|
let inbox = recent_inbox(&state.socket, state.flavor()).await;
|
|
let (turn_state, turn_state_since) = state.bus.state_snapshot();
|
|
let model = state.bus.model();
|
|
let ctx_usage = state.bus.last_ctx_usage();
|
|
let cost_usage = state.bus.last_cost_usage();
|
|
axum::Json(StateSnapshot {
|
|
seq,
|
|
label: state.label.clone(),
|
|
dashboard_port,
|
|
status,
|
|
session: session_view,
|
|
inbox,
|
|
turn_state,
|
|
turn_state_since,
|
|
model,
|
|
ctx_usage,
|
|
cost_usage,
|
|
})
|
|
}
|
|
|
|
/// Best-effort: pull the last 30 messages addressed to us via the
|
|
/// per-agent / manager socket. Empty list on any transport / decode
|
|
/// failure — the inbox section is decorative, not authoritative.
|
|
async fn recent_inbox(socket: &std::path::Path, flavor: Flavor) -> Vec<hive_sh4re::InboxRow> {
|
|
const LIMIT: u64 = 30;
|
|
match flavor {
|
|
Flavor::Agent => {
|
|
match client::request::<_, hive_sh4re::AgentResponse>(
|
|
socket,
|
|
&hive_sh4re::AgentRequest::Recent { limit: LIMIT },
|
|
)
|
|
.await
|
|
{
|
|
Ok(hive_sh4re::AgentResponse::Recent { rows }) => rows,
|
|
_ => Vec::new(),
|
|
}
|
|
}
|
|
Flavor::Manager => {
|
|
match client::request::<_, hive_sh4re::ManagerResponse>(
|
|
socket,
|
|
&hive_sh4re::ManagerRequest::Recent { limit: LIMIT },
|
|
)
|
|
.await
|
|
{
|
|
Ok(hive_sh4re::ManagerResponse::Recent { rows }) => rows,
|
|
_ => Vec::new(),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Fetch reminder activity stats from the broker via the per-agent /
|
|
/// manager socket. Returns None on any transport / decode failure — the
|
|
/// stats are decorative, not authoritative.
|
|
async fn fetch_reminder_stats(socket: &std::path::Path, flavor: Flavor, window_secs: u64) -> Option<hive_sh4re::ReminderStats> {
|
|
match flavor {
|
|
Flavor::Agent => {
|
|
match client::request::<_, hive_sh4re::AgentResponse>(
|
|
socket,
|
|
&hive_sh4re::AgentRequest::ReminderRollup {
|
|
since_secs: window_secs,
|
|
},
|
|
)
|
|
.await
|
|
{
|
|
Ok(hive_sh4re::AgentResponse::ReminderRollup(stats)) => Some(stats),
|
|
_ => None,
|
|
}
|
|
}
|
|
Flavor::Manager => {
|
|
match client::request::<_, hive_sh4re::ManagerResponse>(
|
|
socket,
|
|
&hive_sh4re::ManagerRequest::ReminderRollup {
|
|
since_secs: window_secs,
|
|
},
|
|
)
|
|
.await
|
|
{
|
|
Ok(hive_sh4re::ManagerResponse::ReminderRollup(stats)) => Some(stats),
|
|
_ => None,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Action handlers
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[derive(Deserialize)]
|
|
struct SendForm {
|
|
body: String,
|
|
}
|
|
|
|
async fn post_send(State(state): State<AppState>, Form(form): Form<SendForm>) -> Response {
|
|
let body = form.body.trim().to_owned();
|
|
if body.is_empty() {
|
|
return error_response("send: `body` required");
|
|
}
|
|
let result = match state.flavor() {
|
|
Flavor::Agent => match client::request::<_, hive_sh4re::AgentResponse>(
|
|
&state.socket,
|
|
&hive_sh4re::AgentRequest::OperatorMsg { body },
|
|
)
|
|
.await
|
|
{
|
|
Ok(hive_sh4re::AgentResponse::Ok) => Ok(()),
|
|
Ok(hive_sh4re::AgentResponse::Err { message }) => Err(message),
|
|
Ok(other) => Err(format!("unexpected response: {other:?}")),
|
|
Err(e) => Err(format!("transport: {e:#}")),
|
|
},
|
|
Flavor::Manager => match client::request::<_, hive_sh4re::ManagerResponse>(
|
|
&state.socket,
|
|
&hive_sh4re::ManagerRequest::OperatorMsg { body },
|
|
)
|
|
.await
|
|
{
|
|
Ok(hive_sh4re::ManagerResponse::Ok) => Ok(()),
|
|
Ok(hive_sh4re::ManagerResponse::Err { message }) => Err(message),
|
|
Ok(other) => Err(format!("unexpected response: {other:?}")),
|
|
Err(e) => Err(format!("transport: {e:#}")),
|
|
},
|
|
};
|
|
match result {
|
|
// 200 instead of 303 → the client doesn't refetch /api/state.
|
|
// The operator message becomes a broker `Sent` (already shown
|
|
// server-side in the dashboard); on the agent side, the
|
|
// resulting `TurnStart` SSE event drives the terminal + the
|
|
// inbox row gets consumed by the time `TurnEnd` fires the
|
|
// existing turn-end refresh.
|
|
Ok(()) => (axum::http::StatusCode::OK, "ok").into_response(),
|
|
Err(e) => error_response(&format!("send failed: {e}")),
|
|
}
|
|
}
|
|
|
|
/// `GET /events/history` — the bus's event history together with the
/// current seq, as `{ "seq": ..., "events": [...] }`.
async fn events_history(State(state): State<AppState>) -> axum::Json<serde_json::Value> {
    // The seq must be taken *before* the history read: that way dedupe
    // means "drop buffered events already present in history" and never
    // "lose an event that fired between the read and the timestamp."
    // Historical rows carry no per-row seq; only the high-water mark
    // matters for the dedupe window.
    let seq = state.bus.current_seq();
    let events = state.bus.history();

    let payload = serde_json::json!({ "seq": seq, "events": events });
    axum::Json(payload)
}
|
|
|
|
async fn events_stream(
|
|
State(state): State<AppState>,
|
|
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
|
|
tracing::info!("sse: client subscribed");
|
|
let rx = state.bus.subscribe();
|
|
// Drop a "hello" note into the bus so every new subscriber sees at
|
|
// least one event immediately and can clear the connecting placeholder.
|
|
state.bus.emit(crate::events::LiveEvent::Note {
|
|
text: "live stream attached".into(),
|
|
});
|
|
let stream = BroadcastStream::new(rx).filter_map(|res| {
|
|
let ev = res.ok()?;
|
|
let json = serde_json::to_string(&ev).ok()?;
|
|
Some(Ok(Event::default().data(json)))
|
|
});
|
|
Sse::new(stream).keep_alive(KeepAlive::default())
|
|
}
|
|
|
|
async fn post_login_start(State(state): State<AppState>) -> Response {
|
|
drop_if_finished(&state.session);
|
|
{
|
|
let guard = state.session.lock().unwrap();
|
|
if guard.is_some() {
|
|
return (axum::http::StatusCode::OK, "ok").into_response();
|
|
}
|
|
}
|
|
match LoginSession::start() {
|
|
Ok(session) => {
|
|
*state.session.lock().unwrap() = Some(Arc::new(session));
|
|
// Flip status from needs_login_idle → needs_login_in_progress
|
|
// so the web UI's badge + polling kick in (polling is still
|
|
// the right tool for the streaming session output during
|
|
// the login flow itself; events drop the poll for
|
|
// *everything else*).
|
|
state.bus.emit_status("needs_login_in_progress");
|
|
(axum::http::StatusCode::OK, "ok").into_response()
|
|
}
|
|
Err(e) => error_response(&format!("login start failed: {e:#}")),
|
|
}
|
|
}
|
|
|
|
#[derive(Deserialize)]
|
|
struct CodeForm {
|
|
code: String,
|
|
}
|
|
|
|
async fn post_login_code(State(state): State<AppState>, Form(form): Form<CodeForm>) -> Response {
|
|
let session = state.session.lock().unwrap().clone();
|
|
let Some(session) = session else {
|
|
return error_response("no login session running");
|
|
};
|
|
if let Err(e) = session.submit_code(&form.code).await {
|
|
return error_response(&format!("submit code failed: {e:#}"));
|
|
}
|
|
(axum::http::StatusCode::OK, "ok").into_response()
|
|
}
|
|
|
|
async fn post_login_cancel(State(state): State<AppState>) -> Response {
|
|
let session = state.session.lock().unwrap().take();
|
|
if let Some(session) = session {
|
|
session.close_stdin().await;
|
|
session.kill();
|
|
}
|
|
// Back to needs_login_idle (LoginState unchanged, session gone).
|
|
state.bus.emit_status("needs_login_idle");
|
|
(axum::http::StatusCode::OK, "ok").into_response()
|
|
}
|
|
|
|
/// Operator-initiated session compaction. Spawns `turn::compact_session`
|
|
/// in the background — the HTTP handler returns immediately so the
|
|
/// async-form spinner can clear. Output (claude's compaction stream,
|
|
/// the "/compact done" note) lands in the live event panel like any
|
|
/// other turn. If a regular turn is in flight, claude's own session
|
|
/// lock will reject this one and we surface the error as a Note.
|
|
#[derive(Deserialize)]
|
|
struct ModelForm {
|
|
model: String,
|
|
}
|
|
|
|
/// Switch the model for future turns. The current turn (if any)
|
|
/// keeps its model; `/model <name>` applies starting with the next
|
|
/// `recv` cycle. Empty / whitespace-only inputs are rejected. No
|
|
/// claude-side validation — we just hand the string through to
|
|
/// `claude --model <name>`; an unknown model surfaces as a turn
|
|
/// failure in the live panel and the operator can revert.
|
|
async fn post_set_model(State(state): State<AppState>, Form(form): Form<ModelForm>) -> Response {
|
|
let name = form.model.trim();
|
|
if name.is_empty() {
|
|
return error_response("model: name required");
|
|
}
|
|
state.bus.set_model(name);
|
|
state.bus.emit(crate::events::LiveEvent::Note {
|
|
text: format!("operator: /model — claude model set to '{name}' for future turns"),
|
|
});
|
|
tracing::info!(%name, "operator set model");
|
|
(axum::http::StatusCode::OK, "ok").into_response()
|
|
}
|
|
|
|
async fn post_compact(State(state): State<AppState>) -> Response {
|
|
// Clone the Arc before locking so the guard's lifetime is tied to the
|
|
// clone (which we can move into the spawn) rather than to `state`.
|
|
let lock = state.turn_lock.clone();
|
|
// Reject immediately if a normal turn is in flight — concurrent access
|
|
// to the claude session is unsafe and produces garbled output.
|
|
let Ok(guard) = lock.try_lock_owned() else {
|
|
return error_response("turn in flight — wait for it to finish before compacting");
|
|
};
|
|
let bus = state.bus.clone();
|
|
let files = state.files.clone();
|
|
tokio::spawn(async move {
|
|
let _guard = guard; // keep lock alive for the duration of compaction
|
|
bus.emit(crate::events::LiveEvent::Note {
|
|
text: "operator: /compact — running on persistent session".into(),
|
|
});
|
|
bus.set_state(crate::events::TurnState::Compacting);
|
|
let r = crate::turn::compact_session(&files, &bus).await;
|
|
bus.set_state(crate::events::TurnState::Idle);
|
|
if let Err(e) = r {
|
|
bus.emit(crate::events::LiveEvent::Note {
|
|
text: format!("/compact failed: {e:#}"),
|
|
});
|
|
}
|
|
});
|
|
(axum::http::StatusCode::OK, "ok").into_response()
|
|
}
|
|
|
|
/// Cancel the in-flight claude turn. Coarse-grained: shells out
|
|
/// `pkill -INT claude` since there's at most one claude per container.
|
|
/// SIGINT (not SIGTERM) so claude flushes anything in-flight and emits a
|
|
/// final result row. Emits a Note so the operator sees the cancel
|
|
/// landed; the actual state transition back to `idle` happens when
|
|
/// `run_claude` wakes up and the harness emits `TurnEnd`.
|
|
/// Arm a one-shot: the next claude turn drops `--continue`, starting a
|
|
/// fresh session. Subsequent turns resume normal `--continue`
|
|
/// behavior. Idempotent before the next turn fires — calling twice
|
|
/// still results in a single fresh start. Useful when the
|
|
/// session-resume context is poisoned (claude went off the rails,
|
|
/// hit an unrecoverable refusal, etc.) and a full reset is cheaper
|
|
/// than asking claude to forget mid-stream.
|
|
async fn post_new_session(State(state): State<AppState>) -> Response {
|
|
state.bus.request_new_session();
|
|
state.bus.emit(crate::events::LiveEvent::Note {
|
|
text: "operator: new session armed — next turn runs without --continue".into(),
|
|
});
|
|
(axum::http::StatusCode::OK, "ok").into_response()
|
|
}
|
|
|
|
async fn post_cancel_turn(State(state): State<AppState>) -> Response {
|
|
let out = tokio::process::Command::new("pkill")
|
|
.args(["-INT", "claude"])
|
|
.output()
|
|
.await;
|
|
let note = match out {
|
|
Ok(o) if o.status.success() => "operator: /cancel — sent SIGINT to claude".to_owned(),
|
|
Ok(o) if o.status.code() == Some(1) => {
|
|
"operator: /cancel — no claude process to interrupt".to_owned()
|
|
}
|
|
Ok(o) => format!(
|
|
"operator: /cancel — pkill exited {} stderr={}",
|
|
o.status,
|
|
String::from_utf8_lossy(&o.stderr).trim()
|
|
),
|
|
Err(e) => format!("operator: /cancel — pkill failed: {e}"),
|
|
};
|
|
state.bus.emit(crate::events::LiveEvent::Note { text: note });
|
|
(axum::http::StatusCode::OK, "ok").into_response()
|
|
}
|
|
|
|
fn error_response(message: &str) -> Response {
|
|
// Plain text — JS app surfaces in `alert()`, HTML wrapping would just
|
|
// be noise.
|
|
(StatusCode::INTERNAL_SERVER_ERROR, message.to_owned()).into_response()
|
|
}
|