use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use std::time::Duration; use anyhow::Result; use clap::{Parser, Subcommand}; use hive_ag3nt::events::{Bus, LiveEvent, TurnState}; use hive_ag3nt::login::{self, LoginState}; use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, plugins, turn, web_ui}; use hive_sh4re::{AgentRequest, AgentResponse}; #[derive(Parser)] #[command(name = "hive-ag3nt", about = "hyperhive sub-agent harness")] struct Cli { /// Path to the per-agent MCP socket (bind-mounted from the host). #[arg(long, global = true, default_value = DEFAULT_SOCKET)] socket: PathBuf, #[command(subcommand)] cmd: Cmd, } #[derive(Subcommand)] enum Cmd { /// Run the long-lived harness loop. Polls inbox; replies via `claude --print` /// when available, falling back to a simple echo otherwise. Serve { /// Inbox poll interval in milliseconds. #[arg(long, default_value_t = 1000)] poll_ms: u64, }, /// Run the agent's MCP server on stdio. Spawned by `claude` via /// `--mcp-config`; tools dispatch through `/run/hive/mcp.sock` back into /// the hyperhive broker. Mcp, /// Inject a wake-up event into this agent's inbox so the next turn /// fires with the given body. Intended for extra MCP servers / /// helpers running inside the container (matrix bridge, scraper, /// webhook listener) that need to nudge claude on external events. /// `from` is the sender label that appears in the wake prompt /// (claude sees "from: matrix" etc.). Wake { #[arg(long)] from: String, /// Body of the wake message. Pass `-` to read from stdin. #[arg(long)] body: String, }, } #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), ) .init(); let cli = Cli::parse(); match cli.cmd { Cmd::Serve { poll_ms } => { let port = std::env::var("HIVE_PORT") .ok() .and_then(|s| s.parse::().ok()) .unwrap_or(DEFAULT_WEB_PORT); let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hive-ag3nt".into()); let claude_dir = login::default_dir(); let initial = LoginState::from_dir(&claude_dir); tracing::info!(state = ?initial, claude_dir = %claude_dir.display(), "harness boot"); let login_state = Arc::new(Mutex::new(initial)); let bus = Bus::new(); let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Agent).await?; plugins::install_configured().await; tokio::spawn(web_ui::serve( label, port, login_state.clone(), bus.clone(), cli.socket.clone(), files.clone(), )); match initial { LoginState::Online => { serve( &cli.socket, Duration::from_millis(poll_ms), login_state, bus, &files, ) .await } LoginState::NeedsLogin => { // Partial-run mode: keep the harness alive (so the web UI // stays bound) but don't drive the turn loop. Poll the // claude dir; once a session lands we enter `serve`. turn::wait_for_login(&claude_dir, login_state.clone(), poll_ms).await; serve( &cli.socket, Duration::from_millis(poll_ms), login_state, bus, &files, ) .await } } } Cmd::Mcp => mcp::serve_agent_stdio(cli.socket).await, Cmd::Wake { from, body } => { // Read body from stdin if caller passed `-`. Same convention // many CLI tools use; keeps multi-line / shell-quoting // friction out of the body content. let body = if body == "-" { let mut buf = String::new(); std::io::Read::read_to_string(&mut std::io::stdin(), &mut buf)?; buf } else { body }; let resp: AgentResponse = client::request(&cli.socket, &AgentRequest::Wake { from, body }).await?; match resp { AgentResponse::Ok => Ok(()), AgentResponse::Err { message } => anyhow::bail!("wake: {message}"), other => anyhow::bail!("wake: unexpected response {other:?}"), } } } } async fn serve( socket: &Path, interval: Duration, state: Arc>, bus: Bus, files: &turn::TurnFiles, ) -> Result<()> { tracing::info!(socket = %socket.display(), "hive-ag3nt serve"); let _ = state; // reserved for future state transitions (turn-loop -> needs-login) loop { let recv: Result = // Explicit long-poll: the new agent_server semantics treat // `None` as "peek, don't wait", which would tight-loop on // sleep(interval). The harness wants to park until a // message arrives, so opt into the full 180s cap. client::request( socket, &AgentRequest::Recv { wait_seconds: Some(180), }, ) .await; match recv { Ok(AgentResponse::Message { from, body }) => { tracing::info!(%from, %body, "inbox"); let unread = inbox_unread(socket).await; bus.emit(LiveEvent::TurnStart { from: from.clone(), body: body.clone(), unread, }); bus.set_state(TurnState::Thinking); let prompt = format_wake_prompt(&from, &body, unread); let outcome = turn::drive_turn(&prompt, files, &bus).await; turn::emit_turn_end(&bus, &outcome); bus.set_state(TurnState::Idle); // After turn completes, check if there are pending messages waiting. // If so, immediately process them instead of blocking on recv(). // This ensures messages queued during the turn are processed ASAP. let pending = inbox_unread(socket).await; if pending > 0 { tracing::info!(%pending, "pending messages after turn; fetching next"); continue; // Loop back to recv() immediately instead of sleeping } } Ok(AgentResponse::Empty) => { // Idle: brief sleep before next poll to avoid busy-looping // on consecutive Empty responses. The recv() call already // waits up to 180s for messages, so this is just for // responsiveness if recv() times out. tokio::time::sleep(interval).await; } Ok(AgentResponse::Ok | AgentResponse::Status { .. } | AgentResponse::Recent { .. } | AgentResponse::QuestionQueued { .. }) => { tracing::warn!("recv produced unexpected response kind"); } Ok(AgentResponse::Err { message }) => { tracing::warn!(%message, "recv error"); } Err(e) => { tracing::warn!(error = ?e, "recv failed; retrying"); } } } } /// Per-turn user prompt. The role/tools/etc. is in the system prompt /// (`prompts/agent.md` → `claude --system-prompt-file`); this is just the /// wake signal claude reacts to. `unread` is the count of *other* /// messages in the inbox right after this one was popped. fn format_wake_prompt(from: &str, body: &str, unread: u64) -> String { let pending = if unread == 0 { String::new() } else { format!( "\n\n({unread} more message(s) pending in your inbox — drain via `mcp__hyperhive__recv` if relevant.)" ) }; format!("Incoming message from `{from}`:\n---\n{body}\n---{pending}") } /// Best-effort: ask our own per-agent socket how many messages are still /// pending after the wake-up Recv. Returns 0 if anything goes wrong. async fn inbox_unread(socket: &Path) -> u64 { match client::request::<_, AgentResponse>(socket, &AgentRequest::Status).await { Ok(AgentResponse::Status { unread }) => unread, _ => 0, } }