diff --git a/CLAUDE.md b/CLAUDE.md index ba7fca0..90ab962 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -182,6 +182,24 @@ and the MCP tools. Claude drives any further `recv`/`send` itself — harness no longer relays claude's stdout as a reply. Stdout is logged for debugging; the side effects (sends via MCP) are what matter. +**Live view.** Each agent runs a `hive_ag3nt::events::Bus` (a +`tokio::sync::broadcast` wrapper). The harness emits: +- `TurnStart { from, body }` when a wake-up message is popped. +- `Stream(value)` for every line claude prints on stdout (parsed + stream-json; flattened under `{kind: "stream", type: ...}` via serde + internal tagging). +- `Note(text)` for stderr lines and non-JSON stdout (so nothing's lost). +- `TurnEnd { ok, note }` when claude exits. + +The web UI subscribes via `/events/stream` (SSE) and a small JS panel on +`/` appends rows. No full-page reload — the login form (and anything else +the operator is typing into) stays put. + +claude is invoked with `--print --verbose --output-format stream-json` so +tool calls + assistant text + tool results all land as structured events. +The harness no longer reads claude's text stdout into a reply; claude +calls `mcp__hyperhive__send` itself. + **Tool envelope.** Every MCP tool handler in `hive_ag3nt::mcp::AgentServer` wraps its logic in `run_tool(name, args_debug, async { ... })`. The envelope guarantees: diff --git a/Cargo.lock b/Cargo.lock index 4286126..6ec8564 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -457,6 +457,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "tokio-stream", "tracing", "tracing-subscriber", ] diff --git a/hive-ag3nt/Cargo.toml b/hive-ag3nt/Cargo.toml index ae1af3e..19586b8 100644 --- a/hive-ag3nt/Cargo.toml +++ b/hive-ag3nt/Cargo.toml @@ -16,6 +16,7 @@ schemars.workspace = true serde.workspace = true serde_json.workspace = true tokio.workspace = true +tokio-stream.workspace = true tracing.workspace = true tracing-subscriber.workspace = true diff --git a/hive-ag3nt/src/bin/hive-ag3nt.rs b/hive-ag3nt/src/bin/hive-ag3nt.rs index ee2671c..3273253 100644 --- a/hive-ag3nt/src/bin/hive-ag3nt.rs +++ b/hive-ag3nt/src/bin/hive-ag3nt.rs @@ -1,12 +1,15 @@ use std::path::{Path, PathBuf}; +use std::process::Stdio; use std::sync::{Arc, Mutex}; use std::time::Duration; use anyhow::{Result, bail}; use clap::{Parser, Subcommand}; +use hive_ag3nt::events::{Bus, LiveEvent}; use hive_ag3nt::login::{self, LoginState}; use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, web_ui}; use hive_sh4re::{AgentRequest, AgentResponse}; +use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; #[derive(Parser)] @@ -61,14 +64,16 @@ async fn main() -> Result<()> { tracing::info!(state = ?initial, claude_dir = %claude_dir.display(), "harness boot"); let login_state = Arc::new(Mutex::new(initial)); let ui_state = login_state.clone(); + let bus = Bus::new(); + let ui_bus = bus.clone(); tokio::spawn(async move { - if let Err(e) = web_ui::serve(label, port, ui_state).await { + if let Err(e) = web_ui::serve(label, port, ui_state, ui_bus).await { tracing::error!(error = ?e, "web ui failed"); } }); match initial { LoginState::Online => { - serve(&cli.socket, Duration::from_millis(poll_ms), login_state).await + serve(&cli.socket, Duration::from_millis(poll_ms), login_state, bus).await } LoginState::NeedsLogin => { // Partial-run mode: keep the harness alive (so the web UI @@ -77,7 +82,7 @@ async fn main() -> Result<()> { // from the dashboard PTY path in step 4 or via // `root-login` + `claude auth login` in the meantime) // transitions us into the turn loop without a restart. - needs_login_loop(&cli.socket, &claude_dir, login_state, poll_ms).await + needs_login_loop(&cli.socket, &claude_dir, login_state, poll_ms, bus).await } } } @@ -103,6 +108,7 @@ async fn needs_login_loop( claude_dir: &Path, state: Arc>, poll_ms: u64, + bus: Bus, ) -> Result<()> { tracing::warn!( claude_dir = %claude_dir.display(), @@ -114,12 +120,17 @@ async fn needs_login_loop( if login::has_session(claude_dir) { tracing::info!("claude session detected — entering turn loop"); *state.lock().unwrap() = LoginState::Online; - return serve(socket, Duration::from_millis(poll_ms), state).await; + return serve(socket, Duration::from_millis(poll_ms), state, bus).await; } } } -async fn serve(socket: &Path, interval: Duration, state: Arc>) -> Result<()> { +async fn serve( + socket: &Path, + interval: Duration, + state: Arc>, + bus: Bus, +) -> Result<()> { tracing::info!(socket = %socket.display(), "hive-ag3nt serve"); let _ = state; // reserved for future state transitions (turn-loop -> needs-login) let mcp_config = write_mcp_config(socket).await?; @@ -129,10 +140,28 @@ async fn serve(socket: &Path, interval: Duration, state: Arc>) match recv { Ok(AgentResponse::Message { from, body }) => { tracing::info!(%from, %body, "inbox"); + bus.emit(LiveEvent::TurnStart { + from: from.clone(), + body: body.clone(), + }); let prompt = format_wake_prompt(&label, &from, &body); - match invoke_claude(&prompt, &mcp_config).await { - Ok(out) => tracing::info!(stdout = %out.trim(), "claude turn finished"), - Err(e) => tracing::warn!(error = %format!("{e:#}"), "claude turn failed"), + let outcome = run_turn(&prompt, &mcp_config, &bus).await; + match outcome { + Ok(()) => { + bus.emit(LiveEvent::TurnEnd { + ok: true, + note: None, + }); + tracing::info!("claude turn finished"); + } + Err(e) => { + let note = format!("{e:#}"); + bus.emit(LiveEvent::TurnEnd { + ok: false, + note: Some(note.clone()), + }); + tracing::warn!(error = %note, "claude turn failed"); + } } } Ok(AgentResponse::Empty) => {} @@ -173,15 +202,20 @@ fn format_wake_prompt(label: &str, from: &str, body: &str) -> String { ) } -async fn invoke_claude(prompt: &str, mcp_config: &Path) -> Result { - // Whitelist model: `--tools` restricts which built-ins exist in the - // session (omitting WebFetch/WebSearch/Task means claude literally - // can't invoke them); `--allowedTools` auto-approves the same set - // plus the hyperhive MCP surface so there's no permission prompt - // mid-turn. A finer-grained allow-list system for Bash command - // patterns is on the backlog (PLAN.md polish). - let out = Command::new("claude") +/// Spawn `claude` for one turn and stream its `stream-json` stdout into +/// the live event bus. `--verbose` is required by claude-code when pairing +/// `--print` with `--output-format stream-json`. Each stdout line is one +/// JSON event; we broadcast the parsed value (or a `Note` fallback on +/// parse error so the UI doesn't silently lose information). The tool +/// whitelist is the same as before: omit WebFetch/WebSearch/Task; allow +/// the hyperhive MCP surface auto-approved. Bash pattern allow-list is on +/// the backlog (CLAUDE.md). +async fn run_turn(prompt: &str, mcp_config: &Path, bus: &Bus) -> Result<()> { + let mut child = Command::new("claude") .arg("--print") + .arg("--verbose") + .arg("--output-format") + .arg("stream-json") .arg("--model") .arg("haiku") .arg("--mcp-config") @@ -191,20 +225,38 @@ async fn invoke_claude(prompt: &str, mcp_config: &Path) -> Result { .arg("--allowedTools") .arg(mcp::allowed_tools_arg()) .arg(prompt) - .output() - .await?; - if !out.status.success() { - bail!( - "claude exited {}: {}", - out.status, - String::from_utf8_lossy(&out.stderr).trim() - ); + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + let stdout = child.stdout.take().expect("piped stdout"); + let stderr = child.stderr.take().expect("piped stderr"); + + let bus_out = bus.clone(); + let bus_err = bus.clone(); + let pump_stdout = tokio::spawn(async move { + let mut reader = BufReader::new(stdout).lines(); + while let Ok(Some(line)) = reader.next_line().await { + match serde_json::from_str::(&line) { + Ok(v) => bus_out.emit(LiveEvent::Stream(v)), + Err(_) => bus_out.emit(LiveEvent::Note(format!("(non-json) {line}"))), + } + } + }); + let pump_stderr = tokio::spawn(async move { + let mut reader = BufReader::new(stderr).lines(); + while let Ok(Some(line)) = reader.next_line().await { + bus_err.emit(LiveEvent::Note(format!("stderr: {line}"))); + } + }); + + let status = child.wait().await?; + let _ = pump_stdout.await; + let _ = pump_stderr.await; + if !status.success() { + bail!("claude exited {status}"); } - let text = String::from_utf8_lossy(&out.stdout).trim().to_owned(); - if text.is_empty() { - bail!("claude produced empty output"); - } - Ok(text) + Ok(()) } /// Drop the per-agent MCP config on disk so the turn loop can hand its path diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index e3dd7aa..0afb5fc 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -9,6 +9,7 @@ use std::time::Duration; use anyhow::{Result, bail}; use clap::{Parser, Subcommand}; +use hive_ag3nt::events::Bus; use hive_ag3nt::login::{self, LoginState}; use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, web_ui}; use hive_sh4re::{HelperEvent, ManagerRequest, ManagerResponse, SYSTEM_SENDER}; @@ -66,11 +67,14 @@ async fn main() -> Result<()> { tracing::info!(state = ?initial, claude_dir = %claude_dir.display(), "hm1nd boot"); let login_state = Arc::new(Mutex::new(initial)); let ui_state = login_state.clone(); + let bus = Bus::new(); + let ui_bus = bus.clone(); tokio::spawn(async move { - if let Err(e) = web_ui::serve(label, port, ui_state).await { + if let Err(e) = web_ui::serve(label, port, ui_state, ui_bus).await { tracing::error!(error = ?e, "web ui failed"); } }); + let _ = bus; // manager turn loop not wired to events yet match initial { LoginState::Online => serve(&cli.socket, Duration::from_millis(poll_ms)).await, LoginState::NeedsLogin => { diff --git a/hive-ag3nt/src/events.rs b/hive-ag3nt/src/events.rs new file mode 100644 index 0000000..1449af2 --- /dev/null +++ b/hive-ag3nt/src/events.rs @@ -0,0 +1,63 @@ +//! Live event stream for the per-agent web UI. The harness emits one +//! `LiveEvent` per interesting thing that happens during a turn — wake-up +//! (the popped inbox message), every line claude prints on stdout +//! (parsed from `--output-format stream-json`), and the turn-end summary. +//! The web UI subscribes via SSE and renders rows live. +//! +//! Channel type is `tokio::sync::broadcast`. New subscribers see only +//! future events; the dashboard JS deals with the cold-start case by +//! showing "connecting…" until the first event arrives. + +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast; + +const CHANNEL_CAPACITY: usize = 256; + +/// One row of the agent's live stream. Serialised to JSON for SSE delivery. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum LiveEvent { + /// Harness popped a wake-up message and is about to invoke claude. + TurnStart { from: String, body: String }, + /// One line of claude's `--output-format stream-json` stdout, parsed as + /// a generic JSON value (so we don't have to track every claude-code + /// event variant). The frontend pretty-prints by `type` field. + Stream(serde_json::Value), + /// Free-form note from the harness (e.g. "claude exited 0", + /// "stream-json parse error: ..."). Useful when stream-json itself + /// fails so the UI doesn't just go silent. + Note(String), + /// Turn finished. `ok=false` means claude exited non-zero or the + /// harness hit a transport error. + TurnEnd { ok: bool, note: Option }, +} + +#[derive(Clone)] +pub struct Bus { + tx: Arc>, +} + +impl Bus { + #[must_use] + pub fn new() -> Self { + let (tx, _) = broadcast::channel(CHANNEL_CAPACITY); + Self { tx: Arc::new(tx) } + } + + pub fn emit(&self, event: LiveEvent) { + // Lagged subscribers drop events — fine; the UI is a tail, not a log. + let _ = self.tx.send(event); + } + + pub fn subscribe(&self) -> broadcast::Receiver { + self.tx.subscribe() + } +} + +impl Default for Bus { + fn default() -> Self { + Self::new() + } +} diff --git a/hive-ag3nt/src/lib.rs b/hive-ag3nt/src/lib.rs index 4b7a62e..9689db0 100644 --- a/hive-ag3nt/src/lib.rs +++ b/hive-ag3nt/src/lib.rs @@ -2,6 +2,7 @@ //! `hive-m1nd` (manager) binaries. pub mod client; +pub mod events; pub mod login; pub mod login_session; pub mod mcp; diff --git a/hive-ag3nt/src/web_ui.rs b/hive-ag3nt/src/web_ui.rs index 98548bc..bca27a4 100644 --- a/hive-ag3nt/src/web_ui.rs +++ b/hive-ag3nt/src/web_ui.rs @@ -3,6 +3,7 @@ //! each instance must bind a distinct port. `HIVE_PORT` is set per agent by //! `hive-c0re`'s generated per-agent flake (deterministic from agent name). +use std::convert::Infallible; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; @@ -10,11 +11,16 @@ use anyhow::{Context, Result}; use axum::{ Form, Router, extract::State, - response::{Html, IntoResponse, Redirect, Response}, + response::{ + Html, IntoResponse, Redirect, Response, + sse::{Event, KeepAlive, Sse}, + }, routing::{get, post}, }; use serde::Deserialize; +use tokio_stream::{Stream, StreamExt, wrappers::BroadcastStream}; +use crate::events::Bus; use crate::login::LoginState; use crate::login_session::{LoginSession, drop_if_finished}; @@ -28,16 +34,19 @@ struct AppState { label: String, login: LoginStateCell, session: Arc>>>, + bus: Bus, } -pub async fn serve(label: String, port: u16, login: LoginStateCell) -> Result<()> { +pub async fn serve(label: String, port: u16, login: LoginStateCell, bus: Bus) -> Result<()> { let state = AppState { label, login, session: Arc::new(Mutex::new(None)), + bus, }; let app = Router::new() .route("/", get(index)) + .route("/events/stream", get(events_stream)) .route("/login/start", post(post_login_start)) .route("/login/code", post(post_login_code)) .route("/login/cancel", post(post_login_cancel)) @@ -67,9 +76,87 @@ async fn index(State(state): State) -> Html { } fn render_online() -> String { - "

▓█▓▒░ harness alive — turn loop running ▓█▓▒░

\n

phase 6a placeholder — turn-loop status / inbox / xterm.js coming in 6b+

".into() + format!( + "

▓█▓▒░ harness alive — turn loop running ▓█▓▒░

\n{LIVE_PANEL}", + ) } +/// Live event tail rendered into every `/` response when the agent is online. +/// JS opens an `EventSource` on `/events/stream` and appends rows; no full-page +/// reload, so the login flow and other forms aren't clobbered. +const LIVE_PANEL: &str = r#" +

live

+
connecting…
+ +"#; + fn render_needs_login_idle() -> String { "

▓█▓▒░ NEEDS L0G1N ▓█▓▒░

\n

No Claude session in ~/.claude/. The harness is up but the turn loop is paused until you log in.

\n
\n \n
\n

Spawns claude auth login over plain stdio pipes. The OAuth URL will appear here when claude emits it; paste the resulting code back into the form below.

".into() } @@ -104,6 +191,18 @@ fn render_login_in_progress(session: &Arc) -> String { ) } +async fn events_stream( + State(state): State, +) -> Sse>> { + let rx = state.bus.subscribe(); + let stream = BroadcastStream::new(rx).filter_map(|res| { + let ev = res.ok()?; + let json = serde_json::to_string(&ev).ok()?; + Some(Ok(Event::default().data(json))) + }); + Sse::new(stream).keep_alive(KeepAlive::default()) +} + async fn post_login_start(State(state): State) -> Response { drop_if_finished(&state.session); { @@ -244,5 +343,10 @@ const STYLE: &str = r#" word-break: break-all; max-height: 30em; } + #live { max-height: 24em; overflow-y: auto; } + #live span { display: block; } + #live .turnstart { color: var(--amber); } + #live .turnok { color: var(--green); } + #live .turnfail { color: #ff6b6b; } "#;