use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use std::time::Duration; use anyhow::{Result, bail}; use clap::{Parser, Subcommand}; use hive_ag3nt::events::{Bus, LiveEvent}; use hive_ag3nt::login::{self, LoginState}; use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, turn, web_ui}; use hive_sh4re::{AgentRequest, AgentResponse}; #[derive(Parser)] #[command(name = "hive-ag3nt", about = "hyperhive sub-agent harness")] struct Cli { /// Path to the per-agent MCP socket (bind-mounted from the host). #[arg(long, global = true, default_value = DEFAULT_SOCKET)] socket: PathBuf, #[command(subcommand)] cmd: Cmd, } #[derive(Subcommand)] enum Cmd { /// Run the long-lived harness loop. Polls inbox; replies via `claude --print` /// when available, falling back to a simple echo otherwise. Serve { /// Inbox poll interval in milliseconds. #[arg(long, default_value_t = 1000)] poll_ms: u64, }, /// Send a message to another agent. Send { to: String, body: String }, /// Pop one message from the inbox. Recv, /// Run the agent's MCP server on stdio. Spawned by `claude` via /// `--mcp-config`; tools dispatch through `/run/hive/mcp.sock` back into /// the hyperhive broker. 
Mcp, } #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), ) .init(); let cli = Cli::parse(); match cli.cmd { Cmd::Serve { poll_ms } => { let port = std::env::var("HIVE_PORT") .ok() .and_then(|s| s.parse::().ok()) .unwrap_or(DEFAULT_WEB_PORT); let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hive-ag3nt".into()); let claude_dir = PathBuf::from(login::DEFAULT_CLAUDE_DIR); let initial = LoginState::from_dir(&claude_dir); tracing::info!(state = ?initial, claude_dir = %claude_dir.display(), "harness boot"); let login_state = Arc::new(Mutex::new(initial)); let ui_state = login_state.clone(); let bus = Bus::new(); let ui_bus = bus.clone(); let ui_socket = cli.socket.clone(); tokio::spawn(async move { if let Err(e) = web_ui::serve( label, port, ui_state, ui_bus, ui_socket, web_ui::Flavor::Agent, ) .await { tracing::error!(error = ?e, "web ui failed"); } }); match initial { LoginState::Online => { serve( &cli.socket, Duration::from_millis(poll_ms), login_state, bus, ) .await } LoginState::NeedsLogin => { // Partial-run mode: keep the harness alive (so the web UI // stays bound) but don't drive the turn loop. Poll the // claude dir; once a session lands we enter `serve`. 
turn::wait_for_login(&claude_dir, login_state.clone(), poll_ms).await; serve( &cli.socket, Duration::from_millis(poll_ms), login_state, bus, ) .await } } } Cmd::Send { to, body } => { let resp: AgentResponse = client::request(&cli.socket, &AgentRequest::Send { to, body }).await?; render(&resp)?; check(&resp) } Cmd::Recv => { let resp: AgentResponse = client::request(&cli.socket, &AgentRequest::Recv).await?; render(&resp)?; check(&resp) } Cmd::Mcp => mcp::serve_agent_stdio(cli.socket).await, } } async fn serve( socket: &Path, interval: Duration, state: Arc>, bus: Bus, ) -> Result<()> { tracing::info!(socket = %socket.display(), "hive-ag3nt serve"); let _ = state; // reserved for future state transitions (turn-loop -> needs-login) let mcp_config = turn::write_mcp_config(socket).await?; let settings = turn::write_settings(socket).await?; let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hive-ag3nt".into()); let system_prompt = turn::write_system_prompt(socket, &label, mcp::Flavor::Agent).await?; loop { let recv: Result = client::request(socket, &AgentRequest::Recv).await; match recv { Ok(AgentResponse::Message { from, body }) => { tracing::info!(%from, %body, "inbox"); bus.emit(LiveEvent::TurnStart { from: from.clone(), body: body.clone(), }); let prompt = format_wake_prompt(&from, &body); let outcome = turn::drive_turn( &prompt, &mcp_config, &system_prompt, &settings, &bus, mcp::Flavor::Agent, ) .await; turn::emit_turn_end(&bus, &outcome); } Ok(AgentResponse::Empty) => {} Ok(AgentResponse::Ok | AgentResponse::Status { .. }) => { tracing::warn!("recv produced unexpected response kind"); } Ok(AgentResponse::Err { message }) => { tracing::warn!(%message, "recv error"); } Err(e) => { tracing::warn!(error = ?e, "recv failed; retrying"); } } tokio::time::sleep(interval).await; } } /// Per-turn user prompt. The role/tools/etc. is in the system prompt /// (`prompts/agent.md` → `claude --system-prompt-file`); this is just the /// wake signal claude reacts to. 
fn format_wake_prompt(from: &str, body: &str) -> String {
    // The sender is wrapped in backticks and the message body is fenced
    // between two `---` rules.
    format!("Incoming message from `{from}`:\n---\n{body}\n---")
}

/// Prints `resp` to stdout as pretty-printed JSON.
///
/// # Errors
/// Returns the serialization error if `resp` cannot be encoded as JSON.
fn render(resp: &AgentResponse) -> Result<()> {
    let pretty = serde_json::to_string_pretty(resp)?;
    println!("{pretty}");
    Ok(())
}

/// Converts an error response into a failed `Result`.
///
/// # Errors
/// Fails with the response's `message` when `resp` is `AgentResponse::Err`;
/// every other response kind yields `Ok(())`.
fn check(resp: &AgentResponse) -> Result<()> {
    match resp {
        AgentResponse::Err { message } => bail!("{message}"),
        _ => Ok(()),
    }
}