//! Manager harness. Talks to the manager socket (bind-mounted from the host //! at `/run/hive/mcp.sock` inside the `hm1nd` container). Two surfaces: //! `serve` (long-lived turn loop) and `mcp` (stdio MCP server claude spawns). use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use std::time::Duration; use anyhow::Result; use clap::{Parser, Subcommand}; use hive_ag3nt::events::{Bus, LiveEvent}; use hive_ag3nt::login::{self, LoginState}; use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, turn, web_ui}; use hive_sh4re::{HelperEvent, ManagerRequest, ManagerResponse, SYSTEM_SENDER}; #[derive(Parser)] #[command(name = "hive-m1nd", about = "hyperhive manager harness")] struct Cli { /// Path to the manager MCP socket (bind-mounted from the host). #[arg(long, global = true, default_value = DEFAULT_SOCKET)] socket: PathBuf, #[command(subcommand)] cmd: Cmd, } #[derive(Subcommand)] enum Cmd { /// Long-lived loop polling the manager inbox. Serve { #[arg(long, default_value_t = 1000)] poll_ms: u64, }, /// Run the manager MCP server on stdio. Spawned by claude via /// `--mcp-config`; same shape as `hive-ag3nt mcp` but with the /// manager tool surface (`request_spawn`, `kill`, `start`, `restart`, /// `request_apply_commit`, `ask_operator`). 
Mcp, } #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), ) .init(); let cli = Cli::parse(); match cli.cmd { Cmd::Serve { poll_ms } => { let port = std::env::var("HIVE_PORT") .ok() .and_then(|s| s.parse::().ok()) .unwrap_or(DEFAULT_WEB_PORT); let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hm1nd".into()); let claude_dir = PathBuf::from(login::DEFAULT_CLAUDE_DIR); let initial = LoginState::from_dir(&claude_dir); tracing::info!(state = ?initial, claude_dir = %claude_dir.display(), "hm1nd boot"); let login_state = Arc::new(Mutex::new(initial)); let ui_state = login_state.clone(); let bus = Bus::new(); let ui_bus = bus.clone(); let ui_socket = cli.socket.clone(); tokio::spawn(async move { if let Err(e) = web_ui::serve( label, port, ui_state, ui_bus, ui_socket, web_ui::Flavor::Manager, ) .await { tracing::error!(error = ?e, "web ui failed"); } }); match initial { LoginState::Online => serve(&cli.socket, Duration::from_millis(poll_ms), bus).await, LoginState::NeedsLogin => { turn::wait_for_login(&claude_dir, login_state, poll_ms).await; serve(&cli.socket, Duration::from_millis(poll_ms), bus).await } } } Cmd::Mcp => mcp::serve_manager_stdio(cli.socket).await, } } async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> { tracing::info!(socket = %socket.display(), "hive-m1nd serve"); let mcp_config = turn::write_mcp_config(socket).await?; let settings = turn::write_settings(socket).await?; let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hm1nd".into()); let system_prompt = turn::write_system_prompt(socket, &label, mcp::Flavor::Manager).await?; loop { let recv: Result = client::request(socket, &ManagerRequest::Recv).await; match recv { Ok(ManagerResponse::Message { from, body }) => { if from == SYSTEM_SENDER { // Helper events (ApprovalResolved / Spawned / Rebuilt / // Killed / 
Destroyed) — these are FYI for the manager; // we surface them in the live view and forward them as // a normal claude turn so the manager can react (e.g. // greet a newly-spawned agent, retry a failed rebuild). let parsed = serde_json::from_str::(&body).ok(); if let Some(event) = parsed { tracing::info!(?event, "helper event"); } else { tracing::info!(%from, %body, "system message"); } bus.emit(LiveEvent::Note(format!("[system] {body}"))); // Fall through: drive a turn with the event in the wake // prompt body so claude sees it. Sender stays "system" // so the wake prompt can label it as such. } tracing::info!(%from, %body, "manager inbox"); let unread = inbox_unread(socket).await; bus.emit(LiveEvent::TurnStart { from: from.clone(), body: body.clone(), unread, }); let prompt = format_wake_prompt(&from, &body, unread); let outcome = turn::drive_turn( &prompt, &mcp_config, &system_prompt, &settings, &bus, mcp::Flavor::Manager, ) .await; turn::emit_turn_end(&bus, &outcome); } Ok(ManagerResponse::Empty) => {} Ok( ManagerResponse::Ok | ManagerResponse::Status { .. } | ManagerResponse::QuestionQueued { .. }, ) => { tracing::warn!("recv produced unexpected response kind"); } Ok(ManagerResponse::Err { message }) => { tracing::warn!(%message, "recv error"); } Err(e) => { tracing::warn!(error = ?e, "recv failed; retrying"); } } tokio::time::sleep(interval).await; } } /// Per-turn user prompt. The role/tools/etc. is in the system prompt /// (`prompts/manager.md` → `claude --system-prompt-file`); this is just /// the wake signal. `unread` is the inbox depth after this message was /// popped. 
fn format_wake_prompt(from: &str, body: &str, unread: u64) -> String {
    // Fixed part: the sender, then the message body framed by `---` rules.
    let mut prompt = format!("Incoming message from `{from}`:\n---\n{body}\n---");
    // Mention the backlog only when something is still waiting.
    if unread > 0 {
        prompt.push_str(&format!(
            "\n\n({unread} more message(s) pending in your inbox — drain via `mcp__hyperhive__recv` if relevant.)"
        ));
    }
    prompt
}

/// Sends a `Status` request over `socket` and returns the `unread` count
/// from the reply.
///
/// Best-effort: a failed request, or any response other than `Status`,
/// yields `0`.
async fn inbox_unread(socket: &Path) -> u64 {
    let reply = client::request::<_, ManagerResponse>(socket, &ManagerRequest::Status).await;
    if let Ok(ManagerResponse::Status { unread }) = reply {
        unread
    } else {
        0
    }
}