hyperhive/hive-ag3nt/src/web_ui.rs
müde 538e0446d7 agent page: inbox view of last 30 messages addressed to this agent
new wire request AgentRequest::Recent { limit } / ManagerRequest::Recent
(plus matching responses with Vec<InboxRow>). InboxRow moved to
hive-sh4re so it lives on both surfaces without an internal-to-wire
conversion. host-side dispatch in agent_server / manager_server
calls broker.recent_for(name, limit).

per-agent web_ui /api/state grew an inbox: Vec<InboxRow> populated
via the same per-agent socket (best-effort; transport failure
returns empty). frontend renders as a collapsible <details> section
between the state row and the terminal — fmt timestamp / from /
body in a tight grid, capped at 16em scrollable. only visible when
there are rows.
2026-05-15 20:32:19 +02:00

377 lines
13 KiB
Rust

//! Per-container HTTP UI. SPA shape: `GET /` returns a static shell;
//! `GET /static/*` serves CSS + JS; `GET /api/state` returns the page
//! state as JSON; the JS app renders. Live events stream on
//! `/events/stream`. Action POSTs (`/send`, `/login/*`) return either a
//! 303 Redirect (for browsers that submit the form normally) or just
//! 200 OK — the JS app re-fetches `/api/state` afterwards.
use std::convert::Infallible;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use anyhow::{Context, Result};
use axum::{
Form, Router,
extract::State,
http::StatusCode,
response::{
IntoResponse, Redirect, Response,
sse::{Event, KeepAlive, Sse},
},
routing::{get, post},
};
use serde::{Deserialize, Serialize};
use tokio_stream::{Stream, StreamExt, wrappers::BroadcastStream};
use crate::client;
use crate::events::Bus;
use crate::login::LoginState;
use crate::login_session::{LoginSession, drop_if_finished};
/// Live login state for the web UI. The harness updates this in place as it
/// transitions between `NeedsLogin` and `Online`; the UI reads on each
/// render.
pub type LoginStateCell = Arc<Mutex<LoginState>>;
#[derive(Clone)]
struct AppState {
label: String,
login: LoginStateCell,
session: Arc<Mutex<Option<Arc<LoginSession>>>>,
bus: Bus,
socket: PathBuf,
flavor: Flavor,
}
/// Which wire protocol the per-agent UI's `/send` handler should speak.
/// Sub-agent → `AgentRequest::OperatorMsg`; manager → `ManagerRequest::OperatorMsg`.
#[derive(Debug, Clone, Copy)]
pub enum Flavor {
Agent,
Manager,
}
pub async fn serve(
label: String,
port: u16,
login: LoginStateCell,
bus: Bus,
socket: PathBuf,
flavor: Flavor,
) -> Result<()> {
let state = AppState {
label,
login,
session: Arc::new(Mutex::new(None)),
bus,
socket,
flavor,
};
let app = Router::new()
.route("/", get(serve_index))
.route("/static/agent.css", get(serve_css))
.route("/static/app.js", get(serve_app_js))
.route("/api/state", get(api_state))
.route("/events/stream", get(events_stream))
.route("/events/history", get(events_history))
.route("/send", post(post_send))
.route("/login/start", post(post_login_start))
.route("/login/code", post(post_login_code))
.route("/login/cancel", post(post_login_cancel))
.route("/api/cancel", post(post_cancel_turn))
.route("/api/compact", post(post_compact))
.with_state(state);
let addr = SocketAddr::from(([0, 0, 0, 0], port));
let listener = tokio::net::TcpListener::bind(addr)
.await
.with_context(|| format!("bind web UI on port {port}"))?;
tracing::info!(%port, "web UI listening");
axum::serve(listener, app).await?;
Ok(())
}
// ---------------------------------------------------------------------------
// Static assets + state snapshot
// ---------------------------------------------------------------------------
async fn serve_index() -> impl IntoResponse {
(
[("content-type", "text/html; charset=utf-8")],
include_str!("../assets/index.html"),
)
}
async fn serve_css() -> impl IntoResponse {
(
[("content-type", "text/css")],
include_str!("../assets/agent.css"),
)
}
async fn serve_app_js() -> impl IntoResponse {
(
[("content-type", "application/javascript")],
include_str!("../assets/app.js"),
)
}
#[derive(Serialize)]
struct StateSnapshot {
label: String,
dashboard_port: u16,
/// `"online"` | `"needs_login_idle"` | `"needs_login_in_progress"`.
status: &'static str,
/// Present when `status == "needs_login_in_progress"`.
session: Option<SessionView>,
/// Last N messages addressed to this agent, newest-first. Pulled
/// from the broker via the per-agent socket on each render.
/// Empty on transport failure.
inbox: Vec<hive_sh4re::InboxRow>,
}
#[derive(Serialize)]
struct SessionView {
/// First `https://…` claude emitted on stdout, if any.
url: Option<String>,
/// Accumulated stdout + stderr.
output: String,
finished: bool,
exit_note: Option<String>,
}
async fn api_state(State(state): State<AppState>) -> axum::Json<StateSnapshot> {
drop_if_finished(&state.session);
let login = *state.login.lock().unwrap();
let session_snapshot = state.session.lock().unwrap().clone();
let (status, session_view) = match (login, session_snapshot) {
(LoginState::Online, _) => ("online", None),
(LoginState::NeedsLogin, None) => ("needs_login_idle", None),
(LoginState::NeedsLogin, Some(s)) => (
"needs_login_in_progress",
Some(SessionView {
url: s.url(),
output: s.output(),
finished: s.finished(),
exit_note: s.exit_note(),
}),
),
};
let dashboard_port = std::env::var("HIVE_DASHBOARD_PORT")
.ok()
.and_then(|s| s.parse::<u16>().ok())
.unwrap_or(7000);
let inbox = recent_inbox(&state.socket, state.flavor).await;
axum::Json(StateSnapshot {
label: state.label.clone(),
dashboard_port,
status,
session: session_view,
inbox,
})
}
/// Best-effort: pull the last 30 messages addressed to us via the
/// per-agent / manager socket. Empty list on any transport / decode
/// failure — the inbox section is decorative, not authoritative.
async fn recent_inbox(socket: &std::path::Path, flavor: Flavor) -> Vec<hive_sh4re::InboxRow> {
const LIMIT: u64 = 30;
match flavor {
Flavor::Agent => {
match client::request::<_, hive_sh4re::AgentResponse>(
socket,
&hive_sh4re::AgentRequest::Recent { limit: LIMIT },
)
.await
{
Ok(hive_sh4re::AgentResponse::Recent { rows }) => rows,
_ => Vec::new(),
}
}
Flavor::Manager => {
match client::request::<_, hive_sh4re::ManagerResponse>(
socket,
&hive_sh4re::ManagerRequest::Recent { limit: LIMIT },
)
.await
{
Ok(hive_sh4re::ManagerResponse::Recent { rows }) => rows,
_ => Vec::new(),
}
}
}
}
// ---------------------------------------------------------------------------
// Action handlers
// ---------------------------------------------------------------------------
#[derive(Deserialize)]
struct SendForm {
body: String,
}
async fn post_send(State(state): State<AppState>, Form(form): Form<SendForm>) -> Response {
let body = form.body.trim().to_owned();
if body.is_empty() {
return error_response("send: `body` required");
}
let result = match state.flavor {
Flavor::Agent => match client::request::<_, hive_sh4re::AgentResponse>(
&state.socket,
&hive_sh4re::AgentRequest::OperatorMsg { body },
)
.await
{
Ok(hive_sh4re::AgentResponse::Ok) => Ok(()),
Ok(hive_sh4re::AgentResponse::Err { message }) => Err(message),
Ok(other) => Err(format!("unexpected response: {other:?}")),
Err(e) => Err(format!("transport: {e:#}")),
},
Flavor::Manager => match client::request::<_, hive_sh4re::ManagerResponse>(
&state.socket,
&hive_sh4re::ManagerRequest::OperatorMsg { body },
)
.await
{
Ok(hive_sh4re::ManagerResponse::Ok) => Ok(()),
Ok(hive_sh4re::ManagerResponse::Err { message }) => Err(message),
Ok(other) => Err(format!("unexpected response: {other:?}")),
Err(e) => Err(format!("transport: {e:#}")),
},
};
match result {
Ok(()) => Redirect::to("/").into_response(),
Err(e) => error_response(&format!("send failed: {e}")),
}
}
async fn events_history(
State(state): State<AppState>,
) -> axum::Json<Vec<crate::events::LiveEvent>> {
axum::Json(state.bus.history())
}
async fn events_stream(
State(state): State<AppState>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
tracing::info!("sse: client subscribed");
let rx = state.bus.subscribe();
// Drop a "hello" note into the bus so every new subscriber sees at
// least one event immediately and can clear the connecting placeholder.
state.bus.emit(crate::events::LiveEvent::Note(
"live stream attached".into(),
));
let stream = BroadcastStream::new(rx).filter_map(|res| {
let ev = res.ok()?;
let json = serde_json::to_string(&ev).ok()?;
Some(Ok(Event::default().data(json)))
});
Sse::new(stream).keep_alive(KeepAlive::default())
}
async fn post_login_start(State(state): State<AppState>) -> Response {
drop_if_finished(&state.session);
{
let guard = state.session.lock().unwrap();
if guard.is_some() {
return Redirect::to("/").into_response();
}
}
match LoginSession::start() {
Ok(session) => {
*state.session.lock().unwrap() = Some(Arc::new(session));
Redirect::to("/").into_response()
}
Err(e) => error_response(&format!("login start failed: {e:#}")),
}
}
#[derive(Deserialize)]
struct CodeForm {
code: String,
}
async fn post_login_code(State(state): State<AppState>, Form(form): Form<CodeForm>) -> Response {
let session = state.session.lock().unwrap().clone();
let Some(session) = session else {
return error_response("no login session running");
};
if let Err(e) = session.submit_code(&form.code).await {
return error_response(&format!("submit code failed: {e:#}"));
}
Redirect::to("/").into_response()
}
async fn post_login_cancel(State(state): State<AppState>) -> Response {
let session = state.session.lock().unwrap().take();
if let Some(session) = session {
session.close_stdin().await;
session.kill();
}
Redirect::to("/").into_response()
}
/// Operator-initiated session compaction. Spawns `turn::compact_session`
/// in the background — the HTTP handler returns immediately so the
/// async-form spinner can clear. Output (claude's compaction stream,
/// the "/compact done" note) lands in the live event panel like any
/// other turn. If a regular turn is in flight, claude's own session
/// lock will reject this one and we surface the error as a Note.
async fn post_compact(State(state): State<AppState>) -> Response {
let bus = state.bus.clone();
let socket = state.socket.clone();
tokio::spawn(async move {
bus.emit(crate::events::LiveEvent::Note(
"operator: /compact — running on persistent session".into(),
));
let settings = match crate::turn::write_settings(&socket).await {
Ok(p) => p,
Err(e) => {
bus.emit(crate::events::LiveEvent::Note(format!(
"/compact failed: settings write — {e:#}"
)));
return;
}
};
if let Err(e) = crate::turn::compact_session(&settings, &bus).await {
bus.emit(crate::events::LiveEvent::Note(format!(
"/compact failed: {e:#}"
)));
}
});
Redirect::to("/").into_response()
}
/// Cancel the in-flight claude turn. Coarse-grained: shells out
/// `pkill -INT claude` since there's at most one claude per container.
/// SIGINT (not SIGTERM) so claude flushes anything in-flight and emits a
/// final result row. Emits a Note so the operator sees the cancel
/// landed; the actual state transition back to `idle` happens when
/// `run_claude` wakes up and the harness emits `TurnEnd`.
async fn post_cancel_turn(State(state): State<AppState>) -> Response {
let out = tokio::process::Command::new("pkill")
.args(["-INT", "claude"])
.output()
.await;
let note = match out {
Ok(o) if o.status.success() => "operator: /cancel — sent SIGINT to claude".to_owned(),
Ok(o) if o.status.code() == Some(1) => {
"operator: /cancel — no claude process to interrupt".to_owned()
}
Ok(o) => format!(
"operator: /cancel — pkill exited {} stderr={}",
o.status,
String::from_utf8_lossy(&o.stderr).trim()
),
Err(e) => format!("operator: /cancel — pkill failed: {e}"),
};
state.bus.emit(crate::events::LiveEvent::Note(note));
Redirect::to("/").into_response()
}
fn error_response(message: &str) -> Response {
// Plain text — JS app surfaces in `alert()`, HTML wrapping would just
// be noise.
(StatusCode::INTERNAL_SERVER_ERROR, message.to_owned()).into_response()
}