use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use std::time::Duration; use hive_ag3nt::web_ui::TurnLock; use anyhow::Result; use clap::{Parser, Subcommand}; use hive_ag3nt::events::{Bus, LiveEvent, TurnState}; use hive_ag3nt::login::{self, LoginState}; use hive_ag3nt::turn_stats::{TurnStatRow, TurnStats}; use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, plugins, turn, web_ui}; use hive_sh4re::{AgentRequest, AgentResponse}; #[derive(Parser)] #[command(name = "hive-ag3nt", about = "hyperhive sub-agent harness")] struct Cli { /// Path to the per-agent MCP socket (bind-mounted from the host). #[arg(long, global = true, default_value = DEFAULT_SOCKET)] socket: PathBuf, #[command(subcommand)] cmd: Cmd, } #[derive(Subcommand)] enum Cmd { /// Run the long-lived harness loop. Polls inbox; replies via `claude --print` /// when available, falling back to a simple echo otherwise. Serve { /// Inbox poll interval in milliseconds. #[arg(long, default_value_t = 1000)] poll_ms: u64, }, /// Run the agent's MCP server on stdio. Spawned by `claude` via /// `--mcp-config`; tools dispatch through `/run/hive/mcp.sock` back into /// the hyperhive broker. Mcp, /// Inject a wake-up event into this agent's inbox so the next turn /// fires with the given body. Intended for extra MCP servers / /// helpers running inside the container (matrix bridge, scraper, /// webhook listener) that need to nudge claude on external events. /// `from` is the sender label that appears in the wake prompt /// (claude sees "from: matrix" etc.). Wake { #[arg(long)] from: String, /// Body of the wake message. Pass `-` to read from stdin. 
#[arg(long)] body: String, }, } #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), ) .init(); let cli = Cli::parse(); match cli.cmd { Cmd::Serve { poll_ms } => { let port = std::env::var("HIVE_PORT") .ok() .and_then(|s| s.parse::().ok()) .unwrap_or(DEFAULT_WEB_PORT); let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hive-ag3nt".into()); let claude_dir = login::default_dir(); let initial = LoginState::from_dir(&claude_dir); tracing::info!(state = ?initial, claude_dir = %claude_dir.display(), "harness boot"); let login_state = Arc::new(Mutex::new(initial)); let bus = Bus::new(); let stats = TurnStats::open_default(); if let Some(s) = &stats { let (ctx, cost) = s.last_usage(); if ctx.is_some() || cost.is_some() { bus.seed_usage(ctx, cost); } } let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Agent).await?; let turn_lock: TurnLock = Arc::new(tokio::sync::Mutex::new(())); plugins::install_configured(&cli.socket, Some("manager")).await; tokio::spawn(web_ui::serve( label.clone(), port, login_state.clone(), bus.clone(), cli.socket.clone(), files.clone(), turn_lock.clone(), )); match initial { LoginState::Online => { serve( &cli.socket, Duration::from_millis(poll_ms), login_state, bus, stats, &files, turn_lock, &label, ) .await } LoginState::NeedsLogin => { // Partial-run mode: keep the harness alive (so the web UI // stays bound) but don't drive the turn loop. Poll the // claude dir; once a session lands we enter `serve`. turn::wait_for_login(&claude_dir, login_state.clone(), &bus, poll_ms).await; serve( &cli.socket, Duration::from_millis(poll_ms), login_state, bus, stats, &files, turn_lock, &label, ) .await } } } Cmd::Mcp => mcp::serve_agent_stdio(cli.socket).await, Cmd::Wake { from, body } => { // Read body from stdin if caller passed `-`. 
Same convention // many CLI tools use; keeps multi-line / shell-quoting // friction out of the body content. let body = if body == "-" { let mut buf = String::new(); std::io::Read::read_to_string(&mut std::io::stdin(), &mut buf)?; buf } else { body }; let resp: AgentResponse = client::request(&cli.socket, &AgentRequest::Wake { from, body }).await?; match resp { AgentResponse::Ok => Ok(()), AgentResponse::Err { message } => anyhow::bail!("wake: {message}"), other => anyhow::bail!("wake: unexpected response {other:?}"), } } } } #[allow(clippy::too_many_arguments, clippy::similar_names)] async fn serve( socket: &Path, interval: Duration, state: Arc>, bus: Bus, stats: Option, files: &turn::TurnFiles, turn_lock: TurnLock, label: &str, ) -> Result<()> { tracing::info!(socket = %socket.display(), "hive-ag3nt serve"); let _ = state; // reserved for future state transitions (turn-loop -> needs-login) // Boot-time recovery: ask the broker to resurface anything we // popped in a previous harness session but never acked // (crashed mid-turn / OOM / container restart). The broker // resets `delivered_at = NULL` on those rows and remembers // their ids so the next `Recv` tags them `redelivered: true`; // we then prepend a "may already be handled" hint to the wake // prompt. Single shot before entering the serve loop; idempotent // when there's nothing inflight. requeue_inflight(socket).await; loop { let recv: Result = // Explicit long-poll: the new agent_server semantics treat // `None` as "peek, don't wait", which would tight-loop on // sleep(interval). The harness wants to park until a // message arrives, so opt into the full 180s cap. // `max: None` (= 1) — the serve loop drives one turn per // wake; claude itself calls recv(max: N) in-turn to drain // a burst when the wake prompt mentions pending. 
client::request( socket, &AgentRequest::Recv { wait_seconds: Some(180), max: None, }, ) .await; match recv { Ok(AgentResponse::Messages { messages }) if !messages.is_empty() => { let first = messages.into_iter().next().expect("checked non-empty"); let from = first.from; let body = first.body; let redelivered = first.redelivered; tracing::info!(%from, %body, %redelivered, "inbox"); let unread = inbox_unread(socket).await; bus.emit(LiveEvent::TurnStart { from: from.clone(), body: body.clone(), unread, }); bus.set_state(TurnState::Thinking); let started_at = now_unix(); let started_instant = std::time::Instant::now(); let model_at_start = bus.model(); let prompt = format_wake_prompt(&from, &body, unread, redelivered); let outcome = { let _guard = turn_lock.lock().await; turn::drive_turn(&prompt, files, &bus).await }; turn::emit_turn_end(&bus, &outcome); bus.set_state(TurnState::Idle); // Ack only on a clean turn-end. `Failed` leaves every // message popped during the turn in the unacked list; // next harness boot's `RequeueInflight` will reset // `delivered_at = NULL` and tag them `redelivered`. // `PromptTooLong` is absorbed inside `drive_turn` via // compaction so it shouldn't reach here, but if it // does we also skip the ack (safer to redeliver than // to lose the message). if matches!(outcome, turn::TurnOutcome::Ok) { ack_turn(socket).await; } // Rate-limited: park until the quota resets, then requeue // the unacked message so it resurfaces in the same session. 
if matches!(outcome, turn::TurnOutcome::RateLimited) { let secs = turn::rate_limit_sleep_secs(); bus.emit_status("rate_limited"); bus.emit(LiveEvent::Note { text: format!( "API rate-limited — sleeping {secs}s before retry" ), }); tracing::warn!(sleep_secs = secs, "rate-limited; parking"); tokio::time::sleep(Duration::from_secs(secs)).await; requeue_inflight(socket).await; bus.emit_status("online"); } // Failures are unhandled by definition — PromptTooLong is // absorbed inside drive_turn via compaction, so anything // that reaches Failed here is a real crash. Notify the // manager so it can investigate / restart / page the // operator; best-effort, swallow the send error. if let turn::TurnOutcome::Failed(e) = &outcome { notify_manager_of_failure(socket, label, e).await; } if let Some(s) = &stats { let ended_at = now_unix(); let duration_ms = i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX); let (open_threads, open_reminders) = fetch_agent_post_turn_counts(socket).await; let row = build_row( started_at, ended_at, duration_ms, model_at_start, from.clone(), &outcome, &bus, open_threads, open_reminders, ); s.record(&row); } // After turn completes, log whether messages arrived during // the turn — the outer loop will iterate back to recv() on // its own (the Empty-arm sleep only fires when recv // actually returned Empty), so no explicit continue needed. let pending = inbox_unread(socket).await; if pending > 0 { tracing::info!(%pending, "pending messages after turn; fetching next"); } } Ok(AgentResponse::Messages { .. }) => { // Idle: empty list = nothing pending. Brief sleep // before next poll so a stretch of empty long-poll // returns doesn't tight-loop. tokio::time::sleep(interval).await; } Ok( AgentResponse::Ok | AgentResponse::Status { .. } | AgentResponse::Recent { .. } | AgentResponse::QuestionQueued { .. } | AgentResponse::LooseEnds { .. } | AgentResponse::PendingRemindersCount { .. } | AgentResponse::ReminderRollup { .. 
} | AgentResponse::Whoami { .. }, ) => { tracing::warn!("recv produced unexpected response kind"); } Ok(AgentResponse::Err { message }) => { tracing::warn!(%message, "recv error"); } Err(e) => { tracing::warn!(error = ?e, "recv failed; retrying"); } } } } /// Per-turn user prompt. The role/tools/etc. is in the system prompt /// (`prompts/agent.md` → `claude --system-prompt-file`); this is just the /// wake signal claude reacts to. `unread` is the count of *other* /// messages in the inbox right after this one was popped. /// `redelivered` flags messages that were popped in a prior harness /// session, never acked, and resurfaced after a restart — a banner /// at the top of the wake prompt warns that any side-effects of /// previous handling may already have happened. fn format_wake_prompt(from: &str, body: &str, unread: u64, redelivered: bool) -> String { let banner = if redelivered { hive_ag3nt::mcp::REDELIVERY_HINT } else { "" }; let pending = if unread == 0 { String::new() } else { format!( "\n\n({unread} more message(s) pending in your inbox — call `mcp__hyperhive__recv` \ with `max: {unread}` to drain them all in one round-trip before acting.)" ) }; format!("{banner}Incoming message from `{from}`:\n---\n{body}\n---{pending}") } /// Best-effort: tell the broker every message we popped during the /// turn is now fully handled (turn-end-OK). Swallows transport /// errors — the worst case is a redundant requeue on next boot. async fn ack_turn(socket: &Path) { match client::request::<_, AgentResponse>(socket, &AgentRequest::AckTurn).await { Ok(AgentResponse::Ok) => {} Ok(AgentResponse::Err { message }) => { tracing::warn!(%message, "ack_turn rejected by broker"); } Ok(other) => { tracing::warn!(?other, "ack_turn unexpected response"); } Err(e) => tracing::warn!(error = ?e, "ack_turn transport error"), } } /// Boot-time recovery: ask the broker to resurface anything we /// popped in a previous harness session but never acked. 
The broker /// resets `delivered_at = NULL` on those rows and remembers their /// ids so the next `Recv` carries `redelivered: true`. Swallows /// transport errors — they degrade to "no recovery this boot", /// which is no worse than the pre-feature behaviour (silent drop). async fn requeue_inflight(socket: &Path) { match client::request::<_, AgentResponse>(socket, &AgentRequest::RequeueInflight).await { Ok(AgentResponse::Ok) => {} Ok(AgentResponse::Err { message }) => { tracing::warn!(%message, "requeue_inflight rejected by broker"); } Ok(other) => { tracing::warn!(?other, "requeue_inflight unexpected response"); } Err(e) => tracing::warn!(error = ?e, "requeue_inflight transport error"), } } /// Best-effort: tell the manager that this agent's last turn crashed /// (claude exited non-zero, compaction didn't help, etc.). Routed /// through the normal send path so the manager's inbox surfaces it /// as a system-style event; `label` is included explicitly in the /// body so the manager can identify the failing agent without having /// to look at the `from` field (which is broker-stamped and may /// differ from what the operator sees in the dashboard). Swallows /// transport errors — we just logged the failure, the worst case is /// the manager learns about the crash from the dashboard instead of /// inbox. async fn notify_manager_of_failure(socket: &Path, label: &str, err: &anyhow::Error) { let body = format!("[system] agent `{label}` claude turn failed:\n{err:#}"); let res = client::request::<_, AgentResponse>( socket, &AgentRequest::Send { to: "manager".into(), body, }, ) .await; if let Err(e) = res { tracing::warn!(error = ?e, "failed to notify manager of turn failure"); } } /// Best-effort: ask our own per-agent socket how many messages are still /// pending after the wake-up Recv. Returns 0 if anything goes wrong. 
async fn inbox_unread(socket: &Path) -> u64 {
    match client::request::<_, AgentResponse>(socket, &AgentRequest::Status).await {
        Ok(AgentResponse::Status { unread }) => unread,
        // Transport error, broker error or any other reply: report "none".
        _ => 0,
    }
}

/// Current wall-clock time as whole seconds since the Unix epoch.
/// Returns 0 when the system clock reads before the epoch or the
/// second count does not fit in an `i64`.
fn now_unix() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .ok()
        .and_then(|d| i64::try_from(d.as_secs()).ok())
        .unwrap_or(0)
}

/// Best-effort: ask hive-c0re for this agent's open thread count + pending
/// reminder count, after the turn finishes. Either roundtrip can fail
/// (transport hiccup, race with hive-c0re restart) — in those cases we
/// just drop a `None` into the stats row rather than blocking the loop.
///
/// Returns `(open_threads, pending_reminders)`.
// NOTE(review): the element types were reconstructed. The thread count is
// `u64` by construction (`u64::try_from`); the reminder count is assumed
// to be `u64` as well — confirm against `AgentResponse::PendingRemindersCount`.
async fn fetch_agent_post_turn_counts(socket: &Path) -> (Option<u64>, Option<u64>) {
    let threads = match client::request::<_, AgentResponse>(
        socket,
        &AgentRequest::GetLooseEnds,
    )
    .await
    {
        Ok(AgentResponse::LooseEnds { loose_ends }) => u64::try_from(loose_ends.len()).ok(),
        _ => None,
    };
    let reminders = match client::request::<_, AgentResponse>(
        socket,
        &AgentRequest::CountPendingReminders,
    )
    .await
    {
        Ok(AgentResponse::PendingRemindersCount { count }) => Some(count),
        _ => None,
    };
    (threads, reminders)
}

/// Assemble a `TurnStatRow` from the harness's per-turn state. Shared
/// shape between the agent + manager bin loops (each lives in its own
/// crate root so this helper is duplicated; the savings of a shared
/// module aren't worth the cross-crate ceremony at this size).
#[allow(clippy::too_many_arguments)] fn build_row( started_at: i64, ended_at: i64, duration_ms: i64, model: String, wake_from: String, outcome: &turn::TurnOutcome, bus: &Bus, open_threads_count: Option, open_reminders_count: Option, ) -> TurnStatRow { let cost = bus.last_cost_usage().unwrap_or_default(); let ctx = bus.last_ctx_usage().unwrap_or(cost); let tool_calls = bus.take_tool_calls(); let tool_call_count: u64 = tool_calls.values().copied().sum(); let tool_call_breakdown_json = if tool_calls.is_empty() { None } else { serde_json::to_string(&tool_calls).ok() }; let (result_kind, note) = match outcome { turn::TurnOutcome::Ok => ("ok", None), turn::TurnOutcome::PromptTooLong => ("prompt_too_long", None), turn::TurnOutcome::RateLimited => ("rate_limited", None), turn::TurnOutcome::Failed(e) => ("failed", Some(format!("{e:#}"))), }; TurnStatRow { started_at, ended_at, duration_ms, model, wake_from, input_tokens: cost.input_tokens, output_tokens: cost.output_tokens, cache_read_input_tokens: cost.cache_read_input_tokens, cache_creation_input_tokens: cost.cache_creation_input_tokens, last_input_tokens: ctx.input_tokens, last_output_tokens: ctx.output_tokens, last_cache_read_input_tokens: ctx.cache_read_input_tokens, last_cache_creation_input_tokens: ctx.cache_creation_input_tokens, tool_call_count, tool_call_breakdown_json, open_threads_count, open_reminders_count, result_kind, note, } }