//! Per-container HTTP UI. SPA shape: `GET /` returns a static shell; //! `GET /static/*` serves CSS + JS; `GET /api/state` returns the page //! state as JSON; the JS app renders. Live events stream on //! `/events/stream`. Action POSTs (`/send`, `/login/*`) return either a //! 303 Redirect (for browsers that submit the form normally) or just //! 200 OK — the JS app re-fetches `/api/state` afterwards. use std::convert::Infallible; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use anyhow::{Context, Result}; use axum::{ Form, Router, extract::State, http::StatusCode, response::{ IntoResponse, Redirect, Response, sse::{Event, KeepAlive, Sse}, }, routing::{get, post}, }; use serde::{Deserialize, Serialize}; use tokio_stream::{Stream, StreamExt, wrappers::BroadcastStream}; use crate::client; use crate::events::Bus; use crate::login::LoginState; use crate::login_session::{LoginSession, drop_if_finished}; /// Live login state for the web UI. The harness updates this in place as it /// transitions between `NeedsLogin` and `Online`; the UI reads on each /// render. pub type LoginStateCell = Arc>; #[derive(Clone)] struct AppState { label: String, login: LoginStateCell, session: Arc>>>, bus: Bus, socket: PathBuf, flavor: Flavor, } /// Which wire protocol the per-agent UI's `/send` handler should speak. /// Sub-agent → `AgentRequest::OperatorMsg`; manager → `ManagerRequest::OperatorMsg`. #[derive(Debug, Clone, Copy)] pub enum Flavor { Agent, Manager, } pub async fn serve( label: String, port: u16, login: LoginStateCell, bus: Bus, socket: PathBuf, flavor: Flavor, ) -> Result<()> { let state = AppState { label, login, session: Arc::new(Mutex::new(None)), bus, socket, flavor, }; let app = Router::new() .route("/", get(serve_index)) .route("/static/agent.css", get(serve_css)) .route("/static/app.js", get(serve_app_js)) .route("/api/state", get(api_state)) .route("/events/stream", get(events_stream)) .route("/events/history", get(events_history)) .route("/send", post(post_send)) .route("/login/start", post(post_login_start)) .route("/login/code", post(post_login_code)) .route("/login/cancel", post(post_login_cancel)) .route("/api/cancel", post(post_cancel_turn)) .route("/api/compact", post(post_compact)) .route("/api/model", post(post_set_model)) .with_state(state); let addr = SocketAddr::from(([0, 0, 0, 0], port)); let listener = bind_with_retry(addr, "web UI").await?; tracing::info!(%port, "web UI listening"); axum::serve(listener, app).await?; Ok(()) } // --------------------------------------------------------------------------- // Static assets + state snapshot // --------------------------------------------------------------------------- /// Bind a TCP listener with `SO_REUSEADDR` set, retrying on /// `AddrInUse` for up to ~20s. nspawn restarts can race the previous /// harness's socket release; `SO_REUSEADDR` lets us reclaim a port /// still in `TIME_WAIT` from a clean previous exit, and the retry /// covers the case where the previous process is genuinely still /// alive (systemd restart-delay overlap). async fn bind_with_retry(addr: SocketAddr, label: &str) -> Result { let mut delay_ms = 250u64; let mut attempts = 0u32; loop { match try_bind(addr) { Ok(l) => return Ok(l), Err(e) if e.kind() == std::io::ErrorKind::AddrInUse && attempts < 12 => { tracing::warn!( %addr, attempt = attempts + 1, "{label}: AddrInUse, retrying in {delay_ms}ms" ); tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; attempts += 1; delay_ms = (delay_ms * 2).min(2000); } Err(e) => { return Err(e).with_context(|| format!("bind {label} on {addr}")); } } } } fn try_bind(addr: SocketAddr) -> std::io::Result { let sock = match addr { SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?, SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?, }; sock.set_reuseaddr(true)?; sock.bind(addr)?; sock.listen(1024) } async fn serve_index() -> impl IntoResponse { ( [("content-type", "text/html; charset=utf-8")], include_str!("../assets/index.html"), ) } async fn serve_css() -> impl IntoResponse { ( [("content-type", "text/css")], include_str!("../assets/agent.css"), ) } async fn serve_app_js() -> impl IntoResponse { ( [("content-type", "application/javascript")], include_str!("../assets/app.js"), ) } #[derive(Serialize)] struct StateSnapshot { label: String, dashboard_port: u16, /// `"online"` | `"needs_login_idle"` | `"needs_login_in_progress"`. status: &'static str, /// Present when `status == "needs_login_in_progress"`. session: Option, /// Last N messages addressed to this agent, newest-first. Pulled /// from the broker via the per-agent socket on each render. /// Empty on transport failure. inbox: Vec, /// Authoritative turn-loop state from the harness and the unix /// timestamp the state was entered. The JS computes the age /// client-side off this rather than tracking it from SSE events. turn_state: crate::events::TurnState, turn_state_since: i64, /// Currently-active claude model name. Reflected on the page so /// the operator can see what they just switched to (and what's /// in flight). Mutable at runtime via `POST /api/model`. model: String, } #[derive(Serialize)] struct SessionView { /// First `https://…` claude emitted on stdout, if any. url: Option, /// Accumulated stdout + stderr. output: String, finished: bool, exit_note: Option, } async fn api_state(State(state): State) -> axum::Json { drop_if_finished(&state.session); let login = *state.login.lock().unwrap(); let session_snapshot = state.session.lock().unwrap().clone(); let (status, session_view) = match (login, session_snapshot) { (LoginState::Online, _) => ("online", None), (LoginState::NeedsLogin, None) => ("needs_login_idle", None), (LoginState::NeedsLogin, Some(s)) => ( "needs_login_in_progress", Some(SessionView { url: s.url(), output: s.output(), finished: s.finished(), exit_note: s.exit_note(), }), ), }; let dashboard_port = std::env::var("HIVE_DASHBOARD_PORT") .ok() .and_then(|s| s.parse::().ok()) .unwrap_or(7000); let inbox = recent_inbox(&state.socket, state.flavor).await; let (turn_state, turn_state_since) = state.bus.state_snapshot(); let model = state.bus.model(); axum::Json(StateSnapshot { label: state.label.clone(), dashboard_port, status, session: session_view, inbox, turn_state, turn_state_since, model, }) } /// Best-effort: pull the last 30 messages addressed to us via the /// per-agent / manager socket. Empty list on any transport / decode /// failure — the inbox section is decorative, not authoritative. async fn recent_inbox(socket: &std::path::Path, flavor: Flavor) -> Vec { const LIMIT: u64 = 30; match flavor { Flavor::Agent => { match client::request::<_, hive_sh4re::AgentResponse>( socket, &hive_sh4re::AgentRequest::Recent { limit: LIMIT }, ) .await { Ok(hive_sh4re::AgentResponse::Recent { rows }) => rows, _ => Vec::new(), } } Flavor::Manager => { match client::request::<_, hive_sh4re::ManagerResponse>( socket, &hive_sh4re::ManagerRequest::Recent { limit: LIMIT }, ) .await { Ok(hive_sh4re::ManagerResponse::Recent { rows }) => rows, _ => Vec::new(), } } } } // --------------------------------------------------------------------------- // Action handlers // --------------------------------------------------------------------------- #[derive(Deserialize)] struct SendForm { body: String, } async fn post_send(State(state): State, Form(form): Form) -> Response { let body = form.body.trim().to_owned(); if body.is_empty() { return error_response("send: `body` required"); } let result = match state.flavor { Flavor::Agent => match client::request::<_, hive_sh4re::AgentResponse>( &state.socket, &hive_sh4re::AgentRequest::OperatorMsg { body }, ) .await { Ok(hive_sh4re::AgentResponse::Ok) => Ok(()), Ok(hive_sh4re::AgentResponse::Err { message }) => Err(message), Ok(other) => Err(format!("unexpected response: {other:?}")), Err(e) => Err(format!("transport: {e:#}")), }, Flavor::Manager => match client::request::<_, hive_sh4re::ManagerResponse>( &state.socket, &hive_sh4re::ManagerRequest::OperatorMsg { body }, ) .await { Ok(hive_sh4re::ManagerResponse::Ok) => Ok(()), Ok(hive_sh4re::ManagerResponse::Err { message }) => Err(message), Ok(other) => Err(format!("unexpected response: {other:?}")), Err(e) => Err(format!("transport: {e:#}")), }, }; match result { Ok(()) => Redirect::to("/").into_response(), Err(e) => error_response(&format!("send failed: {e}")), } } async fn events_history( State(state): State, ) -> axum::Json> { axum::Json(state.bus.history()) } async fn events_stream( State(state): State, ) -> Sse>> { tracing::info!("sse: client subscribed"); let rx = state.bus.subscribe(); // Drop a "hello" note into the bus so every new subscriber sees at // least one event immediately and can clear the connecting placeholder. state.bus.emit(crate::events::LiveEvent::Note( "live stream attached".into(), )); let stream = BroadcastStream::new(rx).filter_map(|res| { let ev = res.ok()?; let json = serde_json::to_string(&ev).ok()?; Some(Ok(Event::default().data(json))) }); Sse::new(stream).keep_alive(KeepAlive::default()) } async fn post_login_start(State(state): State) -> Response { drop_if_finished(&state.session); { let guard = state.session.lock().unwrap(); if guard.is_some() { return Redirect::to("/").into_response(); } } match LoginSession::start() { Ok(session) => { *state.session.lock().unwrap() = Some(Arc::new(session)); Redirect::to("/").into_response() } Err(e) => error_response(&format!("login start failed: {e:#}")), } } #[derive(Deserialize)] struct CodeForm { code: String, } async fn post_login_code(State(state): State, Form(form): Form) -> Response { let session = state.session.lock().unwrap().clone(); let Some(session) = session else { return error_response("no login session running"); }; if let Err(e) = session.submit_code(&form.code).await { return error_response(&format!("submit code failed: {e:#}")); } Redirect::to("/").into_response() } async fn post_login_cancel(State(state): State) -> Response { let session = state.session.lock().unwrap().take(); if let Some(session) = session { session.close_stdin().await; session.kill(); } Redirect::to("/").into_response() } /// Operator-initiated session compaction. Spawns `turn::compact_session` /// in the background — the HTTP handler returns immediately so the /// async-form spinner can clear. Output (claude's compaction stream, /// the "/compact done" note) lands in the live event panel like any /// other turn. If a regular turn is in flight, claude's own session /// lock will reject this one and we surface the error as a Note. #[derive(Deserialize)] struct ModelForm { model: String, } /// Switch the model for future turns. The current turn (if any) /// keeps its model; `/model ` applies starting with the next /// `recv` cycle. Empty / whitespace-only inputs are rejected. No /// claude-side validation — we just hand the string through to /// `claude --model `; an unknown model surfaces as a turn /// failure in the live panel and the operator can revert. async fn post_set_model(State(state): State, Form(form): Form) -> Response { let name = form.model.trim(); if name.is_empty() { return error_response("model: name required"); } state.bus.set_model(name); state.bus.emit(crate::events::LiveEvent::Note(format!( "operator: /model — claude model set to '{name}' for future turns" ))); tracing::info!(%name, "operator set model"); Redirect::to("/").into_response() } async fn post_compact(State(state): State) -> Response { let bus = state.bus.clone(); let socket = state.socket.clone(); tokio::spawn(async move { bus.emit(crate::events::LiveEvent::Note( "operator: /compact — running on persistent session".into(), )); let settings = match crate::turn::write_settings(&socket).await { Ok(p) => p, Err(e) => { bus.emit(crate::events::LiveEvent::Note(format!( "/compact failed: settings write — {e:#}" ))); return; } }; bus.set_state(crate::events::TurnState::Compacting); let r = crate::turn::compact_session(&settings, &bus).await; bus.set_state(crate::events::TurnState::Idle); if let Err(e) = r { bus.emit(crate::events::LiveEvent::Note(format!( "/compact failed: {e:#}" ))); } }); Redirect::to("/").into_response() } /// Cancel the in-flight claude turn. Coarse-grained: shells out /// `pkill -INT claude` since there's at most one claude per container. /// SIGINT (not SIGTERM) so claude flushes anything in-flight and emits a /// final result row. Emits a Note so the operator sees the cancel /// landed; the actual state transition back to `idle` happens when /// `run_claude` wakes up and the harness emits `TurnEnd`. async fn post_cancel_turn(State(state): State) -> Response { let out = tokio::process::Command::new("pkill") .args(["-INT", "claude"]) .output() .await; let note = match out { Ok(o) if o.status.success() => "operator: /cancel — sent SIGINT to claude".to_owned(), Ok(o) if o.status.code() == Some(1) => { "operator: /cancel — no claude process to interrupt".to_owned() } Ok(o) => format!( "operator: /cancel — pkill exited {} stderr={}", o.status, String::from_utf8_lossy(&o.stderr).trim() ), Err(e) => format!("operator: /cancel — pkill failed: {e}"), }; state.bus.emit(crate::events::LiveEvent::Note(note)); Redirect::to("/").into_response() } fn error_response(message: &str) -> Response { // Plain text — JS app surfaces in `alert()`, HTML wrapping would just // be noise. (StatusCode::INTERNAL_SERVER_ERROR, message.to_owned()).into_response() }