//! Manager harness. Talks to the manager socket (bind-mounted from the host //! at `/run/hive/mcp.sock` inside the `hm1nd` container). Two surfaces: //! `serve` (long-lived turn loop) and `mcp` (stdio MCP server claude spawns). use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use std::time::Duration; use hive_ag3nt::web_ui::TurnLock; use anyhow::Result; use clap::{Parser, Subcommand}; use hive_ag3nt::events::{Bus, LiveEvent, TurnState}; use hive_ag3nt::login::{self, LoginState}; use hive_ag3nt::turn_stats::{TurnStatRow, TurnStats}; use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, plugins, turn, web_ui}; use hive_sh4re::{HelperEvent, ManagerRequest, ManagerResponse, SYSTEM_SENDER}; #[derive(Parser)] #[command(name = "hive-m1nd", about = "hyperhive manager harness")] struct Cli { /// Path to the manager MCP socket (bind-mounted from the host). #[arg(long, global = true, default_value = DEFAULT_SOCKET)] socket: PathBuf, #[command(subcommand)] cmd: Cmd, } #[derive(Subcommand)] enum Cmd { /// Long-lived loop polling the manager inbox. Serve { #[arg(long, default_value_t = 1000)] poll_ms: u64, }, /// Run the manager MCP server on stdio. Spawned by claude via /// `--mcp-config`; same shape as `hive-ag3nt mcp` but with the /// manager tool surface (`request_spawn`, `kill`, `start`, `restart`, /// `request_apply_commit`, `ask`, `answer`, `remind`). 
Mcp, } #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), ) .init(); let cli = Cli::parse(); match cli.cmd { Cmd::Serve { poll_ms } => { let port = std::env::var("HIVE_PORT") .ok() .and_then(|s| s.parse::().ok()) .unwrap_or(DEFAULT_WEB_PORT); let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hm1nd".into()); let claude_dir = login::default_dir(); let initial = LoginState::from_dir(&claude_dir); tracing::info!(state = ?initial, claude_dir = %claude_dir.display(), "hm1nd boot"); let login_state = Arc::new(Mutex::new(initial)); let bus = Bus::new(); let stats = TurnStats::open_default(); if let Some(s) = &stats { let (ctx, cost) = s.last_usage(); if ctx.is_some() || cost.is_some() { bus.seed_usage(ctx, cost); } } let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Manager).await?; let turn_lock: TurnLock = Arc::new(tokio::sync::Mutex::new(())); plugins::install_configured(&cli.socket, None).await; tokio::spawn(web_ui::serve( label, port, login_state.clone(), bus.clone(), cli.socket.clone(), files.clone(), turn_lock.clone(), )); match initial { LoginState::Online => { serve( &cli.socket, Duration::from_millis(poll_ms), bus, stats, &files, turn_lock, ) .await } LoginState::NeedsLogin => { turn::wait_for_login(&claude_dir, login_state, &bus, poll_ms).await; serve( &cli.socket, Duration::from_millis(poll_ms), bus, stats, &files, turn_lock, ) .await } } } Cmd::Mcp => mcp::serve_manager_stdio(cli.socket).await, } } async fn serve( socket: &Path, interval: Duration, bus: Bus, stats: Option, files: &turn::TurnFiles, turn_lock: TurnLock, ) -> Result<()> { tracing::info!(socket = %socket.display(), "hive-m1nd serve"); // Same boot-time recovery as hive-ag3nt — see that loop for the // rationale. Manager-flavour socket so we requeue only manager // inflight rows. 
requeue_inflight(socket).await; loop { let recv: Result = // Explicit long-poll: see hive-ag3nt's serve loop for the // rationale — recv now defaults to peek when wait_seconds // is None. client::request( socket, &ManagerRequest::Recv { wait_seconds: Some(180), }, ) .await; match recv { Ok(ManagerResponse::Message { from, body, id: _, redelivered, }) => { if from == SYSTEM_SENDER { // Helper events (ApprovalResolved / Spawned / Rebuilt / // Killed / Destroyed) — these are FYI for the manager; // we surface them in the live view and forward them as // a normal claude turn so the manager can react (e.g. // greet a newly-spawned agent, retry a failed rebuild). let parsed = serde_json::from_str::(&body).ok(); if let Some(event) = parsed { tracing::info!(?event, "helper event"); } else { tracing::info!(%from, %body, "system message"); } bus.emit(LiveEvent::Note { text: format!("[system] {body}"), }); // Fall through: drive a turn with the event in the wake // prompt body so claude sees it. Sender stays "system" // so the wake prompt can label it as such. } tracing::info!(%from, %body, %redelivered, "manager inbox"); let unread = inbox_unread(socket).await; bus.emit(LiveEvent::TurnStart { from: from.clone(), body: body.clone(), unread, }); let prompt = format_wake_prompt(&from, &body, unread, redelivered); bus.set_state(TurnState::Thinking); let started_at = now_unix(); let started_instant = std::time::Instant::now(); let model_at_start = bus.model(); let outcome = { let _guard = turn_lock.lock().await; turn::drive_turn(&prompt, files, &bus).await }; turn::emit_turn_end(&bus, &outcome); bus.set_state(TurnState::Idle); // Ack only on a clean turn-end; Failed leaves the // popped ids in-flight for the next boot's requeue. // Mirrors hive-ag3nt; see that loop for full rationale. 
if matches!(outcome, turn::TurnOutcome::Ok) { ack_turn(socket).await; } if let Some(s) = &stats { let ended_at = now_unix(); let duration_ms = i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX); let (open_threads, open_reminders) = fetch_manager_post_turn_counts(socket).await; let row = build_row( started_at, ended_at, duration_ms, model_at_start, from.clone(), &outcome, &bus, open_threads, open_reminders, ); s.record(&row); } // Check for messages that arrived during the turn so we // surface "draining" in the logs. The loop will already // re-iterate from here — no explicit continue needed. let pending = inbox_unread(socket).await; if pending > 0 { tracing::info!(%pending, "pending messages after turn; fetching next"); } } Ok(ManagerResponse::Empty) => { // Idle: sleep briefly before next long-poll attempt. tokio::time::sleep(interval).await; } Ok( ManagerResponse::Ok | ManagerResponse::Status { .. } | ManagerResponse::QuestionQueued { .. } | ManagerResponse::Recent { .. } | ManagerResponse::Logs { .. } | ManagerResponse::LooseEnds { .. } | ManagerResponse::PendingRemindersCount { .. } | ManagerResponse::Whoami { .. }, ) => { tracing::warn!("recv produced unexpected response kind"); } Ok(ManagerResponse::Err { message }) => { tracing::warn!(%message, "recv error"); } Err(e) => { tracing::warn!(error = ?e, "recv failed; retrying"); } } } } /// Per-turn user prompt. The role/tools/etc. is in the system prompt /// (`prompts/manager.md` → `claude --system-prompt-file`); this is just /// the wake signal. `unread` is the inbox depth after this message was /// popped. `redelivered` adds a "may already be handled" banner above /// the wake body when the broker resurfaced this row (see hive-ag3nt's /// `format_wake_prompt` for the full story). 
fn format_wake_prompt(from: &str, body: &str, unread: u64, redelivered: bool) -> String {
    // The redelivery hint, when present, goes in front of everything else.
    let banner = redelivered
        .then_some(hive_ag3nt::mcp::REDELIVERY_HINT)
        .unwrap_or_default();
    let wake = format!("{banner}Incoming message from `{from}`:\n---\n{body}\n---");

    // Nothing else queued: the wake text is the entire prompt.
    if unread == 0 {
        return wake;
    }

    // Otherwise append a note about the messages still waiting in the inbox.
    format!(
        "{wake}\n\n({unread} more message(s) pending in your inbox — drain via `mcp__hyperhive__recv` if relevant.)"
    )
}

/// Best-effort acknowledgement: reports to the broker that every message
/// popped during the turn has been handled. Failures are logged as
/// warnings and never propagated. Manager-surface counterpart of
/// `hive-ag3nt::ack_turn`.
async fn ack_turn(socket: &Path) {
    let reply = client::request::<_, ManagerResponse>(socket, &ManagerRequest::AckTurn).await;

    // Deal with transport failures first, then inspect the broker's answer.
    let response = match reply {
        Ok(response) => response,
        Err(e) => {
            tracing::warn!(error = ?e, "ack_turn transport error");
            return;
        }
    };

    match response {
        ManagerResponse::Ok => {}
        ManagerResponse::Err { message } => {
            tracing::warn!(%message, "ack_turn rejected by broker");
        }
        other => {
            tracing::warn!(?other, "ack_turn unexpected response");
        }
    }
}

/// Boot-time recovery: ask the broker to resurface any inflight (popped
/// but not acked) messages so the next `Recv` re-delivers them with
/// the redelivery banner. Mirror of `hive-ag3nt::requeue_inflight`.
async fn requeue_inflight(socket: &Path) {
    // Best-effort: every failure mode is logged as a warning and otherwise
    // ignored, so the caller can carry on regardless.
    match client::request::<_, ManagerResponse>(socket, &ManagerRequest::RequeueInflight).await {
        Ok(ManagerResponse::Ok) => {}
        Ok(ManagerResponse::Err { message }) => {
            tracing::warn!(%message, "requeue_inflight rejected by broker");
        }
        Ok(other) => {
            tracing::warn!(?other, "requeue_inflight unexpected response");
        }
        Err(e) => tracing::warn!(error = ?e, "requeue_inflight transport error"),
    }
}

/// Number of unread messages the broker reports for the manager inbox.
///
/// Returns `0` when the request fails or the broker answers with anything
/// other than a `Status` response, so callers cannot tell "empty" apart
/// from "unknown".
async fn inbox_unread(socket: &Path) -> u64 {
    match client::request::<_, ManagerResponse>(socket, &ManagerRequest::Status).await {
        Ok(ManagerResponse::Status { unread }) => unread,
        _ => 0,
    }
}

/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns `0` if the system clock is set before the epoch or the second
/// count does not fit in an `i64`.
fn now_unix() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .ok()
        .and_then(|d| i64::try_from(d.as_secs()).ok())
        .unwrap_or(0)
}

/// Manager-flavour equivalent of the agent helper. Mirror shape, just
/// uses ManagerRequest/ManagerResponse instead of the agent variants.
///
/// Returns `(open_threads, open_reminders)`: the number of loose ends and
/// the pending-reminder count reported by the broker. Each element is
/// `None` when its request fails or gets an unexpected response; the two
/// lookups are independent, so one can be `None` while the other is set.
async fn fetch_manager_post_turn_counts(socket: &Path) -> (Option<u64>, Option<u64>) {
    let threads = match client::request::<_, ManagerResponse>(
        socket,
        &ManagerRequest::GetLooseEnds,
    )
    .await
    {
        // `len()` is a usize; the conversion yields `None` if it cannot be
        // represented as u64.
        Ok(ManagerResponse::LooseEnds { loose_ends }) => u64::try_from(loose_ends.len()).ok(),
        _ => None,
    };
    let reminders = match client::request::<_, ManagerResponse>(
        socket,
        &ManagerRequest::CountPendingReminders,
    )
    .await
    {
        // NOTE(review): assumes `count` is a `u64`, matching the thread
        // count above — confirm against `ManagerResponse`.
        Ok(ManagerResponse::PendingRemindersCount { count }) => Some(count),
        _ => None,
    };
    (threads, reminders)
}

/// Manager flavour of the agent's `build_row` helper. Duplicated rather
/// than shared to keep each bin self-contained at this size.
#[allow(clippy::too_many_arguments)] fn build_row( started_at: i64, ended_at: i64, duration_ms: i64, model: String, wake_from: String, outcome: &turn::TurnOutcome, bus: &Bus, open_threads_count: Option, open_reminders_count: Option, ) -> TurnStatRow { let cost = bus.last_cost_usage().unwrap_or_default(); let ctx = bus.last_ctx_usage().unwrap_or(cost); let tool_calls = bus.take_tool_calls(); let tool_call_count: u64 = tool_calls.values().copied().sum(); let tool_call_breakdown_json = if tool_calls.is_empty() { None } else { serde_json::to_string(&tool_calls).ok() }; let (result_kind, note) = match outcome { turn::TurnOutcome::Ok => ("ok", None), turn::TurnOutcome::PromptTooLong => ("prompt_too_long", None), turn::TurnOutcome::Failed(e) => ("failed", Some(format!("{e:#}"))), }; TurnStatRow { started_at, ended_at, duration_ms, model, wake_from, input_tokens: cost.input_tokens, output_tokens: cost.output_tokens, cache_read_input_tokens: cost.cache_read_input_tokens, cache_creation_input_tokens: cost.cache_creation_input_tokens, last_input_tokens: ctx.input_tokens, last_output_tokens: ctx.output_tokens, last_cache_read_input_tokens: ctx.cache_read_input_tokens, last_cache_creation_input_tokens: ctx.cache_creation_input_tokens, tool_call_count, tool_call_breakdown_json, open_threads_count, open_reminders_count, result_kind, note, } }