From 09787659abd235c5ecfeb008db674ff57a5252e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?m=C3=BCde?= Date: Fri, 15 May 2026 15:13:26 +0200 Subject: [PATCH] manager: same agent loop, ManagerServer MCP surface --- CLAUDE.md | 12 +- hive-ag3nt/src/bin/hive-ag3nt.rs | 99 +----------- hive-ag3nt/src/bin/hive-m1nd.rs | 90 +++++++++-- hive-ag3nt/src/lib.rs | 1 + hive-ag3nt/src/mcp.rs | 266 ++++++++++++++++++++++++++----- hive-ag3nt/src/turn.rs | 96 +++++++++++ 6 files changed, 422 insertions(+), 142 deletions(-) create mode 100644 hive-ag3nt/src/turn.rs diff --git a/CLAUDE.md b/CLAUDE.md index 90ab962..08c4967 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -226,8 +226,16 @@ system for command patterns (`Bash(git *)`-style). When that lands, the `builtin_tools_arg` shape will probably change to a setting / hooks combo per claude-code's permissions plumbing. -Manager will get its own subcommand later with `request_spawn`, `kill`, -`request_apply_commit` added to the TOOLS list. +The manager (`hive-m1nd`) runs the same loop with a `ManagerServer` MCP +flavor: +- `mcp__hyperhive__send`, `recv` — agent surface. +- `mcp__hyperhive__request_spawn(name)` — queue Spawn approval. +- `mcp__hyperhive__kill(name)` — graceful stop of a sub-agent. +- `mcp__hyperhive__request_apply_commit(agent, commit_ref)` — submit a + config change for any agent (`hm1nd` for self-modification). + +The shared per-turn plumbing lives in `hive_ag3nt::turn::{write_mcp_config, +run_turn}` so both binaries can't drift apart. ## Manager (hm1nd) is hive-c0re-managed diff --git a/hive-ag3nt/src/bin/hive-ag3nt.rs b/hive-ag3nt/src/bin/hive-ag3nt.rs index a4c03b5..b2d8f4c 100644 --- a/hive-ag3nt/src/bin/hive-ag3nt.rs +++ b/hive-ag3nt/src/bin/hive-ag3nt.rs @@ -1,5 +1,4 @@ use std::path::{Path, PathBuf}; -use std::process::Stdio; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -7,10 +6,8 @@ use anyhow::{Result, bail}; use clap::{Parser, Subcommand}; use hive_ag3nt::events::{Bus, LiveEvent}; use hive_ag3nt::login::{self, LoginState}; -use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, web_ui}; +use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, turn, web_ui}; use hive_sh4re::{AgentRequest, AgentResponse}; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::process::Command; #[derive(Parser)] #[command(name = "hive-ag3nt", about = "hyperhive sub-agent harness")] @@ -97,7 +94,7 @@ async fn main() -> Result<()> { render(&resp)?; check(&resp) } - Cmd::Mcp => mcp::serve_stdio(cli.socket).await, + Cmd::Mcp => mcp::serve_agent_stdio(cli.socket).await, } } @@ -133,7 +130,7 @@ async fn serve( ) -> Result<()> { tracing::info!(socket = %socket.display(), "hive-ag3nt serve"); let _ = state; // reserved for future state transitions (turn-loop -> needs-login) - let mcp_config = write_mcp_config(socket).await?; + let mcp_config = turn::write_mcp_config(socket).await?; let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hive-ag3nt".into()); loop { let recv: Result = client::request(socket, &AgentRequest::Recv).await; @@ -145,7 +142,8 @@ async fn serve( body: body.clone(), }); let prompt = format_wake_prompt(&label, &from, &body); - let outcome = run_turn(&prompt, &mcp_config, &bus).await; + let outcome = + turn::run_turn(&prompt, &mcp_config, &bus, mcp::Flavor::Agent).await; match outcome { Ok(()) => { bus.emit(LiveEvent::TurnEnd { @@ -202,93 +200,6 @@ fn format_wake_prompt(label: &str, from: &str, body: &str) -> String { ) } -/// Spawn `claude` for one turn and stream its `stream-json` stdout into -/// the live event bus. `--verbose` is required by claude-code when pairing -/// `--print` with `--output-format stream-json`. Each stdout line is one -/// JSON event; we broadcast the parsed value (or a `Note` fallback on -/// parse error so the UI doesn't silently lose information). The tool -/// whitelist is the same as before: omit WebFetch/WebSearch/Task; allow -/// the hyperhive MCP surface auto-approved. Bash pattern allow-list is on -/// the backlog (CLAUDE.md). -async fn run_turn(prompt: &str, mcp_config: &Path, bus: &Bus) -> Result<()> { - // Don't pass the prompt as a positional arg: `--allowedTools ` - // and `--tools ` are variadic in claude-code, and the - // trailing positional gets swallowed into one of them — claude then - // errors with "Input must be provided either through stdin or as a - // prompt argument when using --print". Pipe via stdin instead. - let mut child = Command::new("claude") - .arg("--print") - .arg("--verbose") - .arg("--output-format") - .arg("stream-json") - .arg("--model") - .arg("haiku") - .arg("--mcp-config") - .arg(mcp_config) - .arg("--tools") - .arg(mcp::builtin_tools_arg()) - .arg("--allowedTools") - .arg(mcp::allowed_tools_arg()) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; - - if let Some(mut stdin) = child.stdin.take() { - stdin.write_all(prompt.as_bytes()).await?; - stdin.shutdown().await.ok(); - drop(stdin); // signal EOF to claude - } - let stdout = child.stdout.take().expect("piped stdout"); - let stderr = child.stderr.take().expect("piped stderr"); - - let bus_out = bus.clone(); - let bus_err = bus.clone(); - let pump_stdout = tokio::spawn(async move { - let mut reader = BufReader::new(stdout).lines(); - while let Ok(Some(line)) = reader.next_line().await { - match serde_json::from_str::(&line) { - Ok(v) => bus_out.emit(LiveEvent::Stream(v)), - Err(_) => bus_out.emit(LiveEvent::Note(format!("(non-json) {line}"))), - } - } - }); - let pump_stderr = tokio::spawn(async move { - let mut reader = BufReader::new(stderr).lines(); - while let Ok(Some(line)) = reader.next_line().await { - bus_err.emit(LiveEvent::Note(format!("stderr: {line}"))); - } - }); - - let status = child.wait().await?; - let _ = pump_stdout.await; - let _ = pump_stderr.await; - if !status.success() { - bail!("claude exited {status}"); - } - Ok(()) -} - -/// Drop the per-agent MCP config on disk so the turn loop can hand its path -/// to `claude --mcp-config`. Lives under `/run/hive/` (the bind-mounted -/// per-agent runtime dir) so it's ephemeral and isolated per container. -/// Returns the config path. -async fn write_mcp_config(socket: &Path) -> Result { - let parent = socket.parent().unwrap_or_else(|| Path::new("/run/hive")); - tokio::fs::create_dir_all(parent).await.ok(); - let path = parent.join("claude-mcp-config.json"); - // `/proc/self/exe` resolves to the running hive-ag3nt binary's nix store - // path, which the spawned child can re-invoke as the MCP server. Avoids - // needing claude-code's $PATH to contain hive-ag3nt. - let exe = std::env::current_exe() - .ok() - .map_or_else(|| "hive-ag3nt".into(), |p| p.display().to_string()); - let body = mcp::render_claude_config(&exe, socket); - tokio::fs::write(&path, body).await?; - tracing::info!(path = %path.display(), "wrote claude MCP config"); - Ok(path) -} - fn render(resp: &AgentResponse) -> Result<()> { println!("{}", serde_json::to_string_pretty(resp)?); Ok(()) diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index 7244099..9928f28 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -9,9 +9,9 @@ use std::time::Duration; use anyhow::{Result, bail}; use clap::{Parser, Subcommand}; -use hive_ag3nt::events::Bus; +use hive_ag3nt::events::{Bus, LiveEvent}; use hive_ag3nt::login::{self, LoginState}; -use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, web_ui}; +use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, turn, web_ui}; use hive_sh4re::{HelperEvent, ManagerRequest, ManagerResponse, SYSTEM_SENDER}; #[derive(Parser)] @@ -43,6 +43,11 @@ enum Cmd { Kill { name: String }, /// Submit a config commit on the agent's config repo for user approval. RequestApplyCommit { agent: String, commit_ref: String }, + /// Run the manager MCP server on stdio. Spawned by claude via + /// `--mcp-config`; same shape as `hive-ag3nt mcp` but with the + /// manager tool surface (`request_spawn`, `kill`, + /// `request_apply_commit`). + Mcp, } #[tokio::main] @@ -74,15 +79,16 @@ async fn main() -> Result<()> { tracing::error!(error = ?e, "web ui failed"); } }); - let _ = bus; // manager turn loop not wired to events yet match initial { - LoginState::Online => serve(&cli.socket, Duration::from_millis(poll_ms)).await, + LoginState::Online => { + serve(&cli.socket, Duration::from_millis(poll_ms), bus).await + } LoginState::NeedsLogin => { tracing::warn!( claude_dir = %claude_dir.display(), "manager has no claude session — staying in partial-run mode" ); - needs_login_loop(&cli.socket, &claude_dir, login_state, poll_ms).await + needs_login_loop(&cli.socket, &claude_dir, login_state, poll_ms, bus).await } } } @@ -99,6 +105,7 @@ async fn main() -> Result<()> { ) .await } + Cmd::Mcp => mcp::serve_manager_stdio(cli.socket).await, } } @@ -118,6 +125,7 @@ async fn needs_login_loop( claude_dir: &Path, state: Arc>, poll_ms: u64, + bus: Bus, ) -> Result<()> { let probe = Duration::from_millis(poll_ms.max(2000)); loop { @@ -125,25 +133,55 @@ async fn needs_login_loop( if login::has_session(claude_dir) { tracing::info!("manager claude session detected — entering inbox loop"); *state.lock().unwrap() = LoginState::Online; - return serve(socket, Duration::from_millis(poll_ms)).await; + return serve(socket, Duration::from_millis(poll_ms), bus).await; } } } -async fn serve(socket: &Path, interval: Duration) -> Result<()> { +async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> { tracing::info!(socket = %socket.display(), "hive-m1nd serve"); + let mcp_config = turn::write_mcp_config(socket).await?; + let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hm1nd".into()); loop { let recv: Result = client::request(socket, &ManagerRequest::Recv).await; match recv { Ok(ManagerResponse::Message { from, body }) => { if from == SYSTEM_SENDER { - if let Ok(event) = serde_json::from_str::(&body) { + // Helper events (ApprovalResolved, etc.) — log + surface + // in live view but don't burn a claude turn on them. + let parsed = serde_json::from_str::(&body).ok(); + if let Some(event) = parsed { tracing::info!(?event, "helper event"); } else { tracing::info!(%from, %body, "system message"); } - } else { - tracing::info!(%from, %body, "manager inbox"); + bus.emit(LiveEvent::Note(format!("[system] {body}"))); + continue; + } + tracing::info!(%from, %body, "manager inbox"); + bus.emit(LiveEvent::TurnStart { + from: from.clone(), + body: body.clone(), + }); + let prompt = format_wake_prompt(&label, &from, &body); + let outcome = + turn::run_turn(&prompt, &mcp_config, &bus, mcp::Flavor::Manager).await; + match outcome { + Ok(()) => { + bus.emit(LiveEvent::TurnEnd { + ok: true, + note: None, + }); + tracing::info!("manager turn finished"); + } + Err(e) => { + let note = format!("{e:#}"); + bus.emit(LiveEvent::TurnEnd { + ok: false, + note: Some(note.clone()), + }); + tracing::warn!(error = %note, "manager turn failed"); + } } } Ok(ManagerResponse::Empty) => {} @@ -160,3 +198,35 @@ async fn serve(socket: &Path, interval: Duration) -> Result<()> { tokio::time::sleep(interval).await; } } + +/// Manager-flavored wake prompt. Mentions the privileged tools the sub-agent +/// prompt doesn't have access to, and points the manager at its own +/// editable config repo for self-modification. +fn format_wake_prompt(label: &str, from: &str, body: &str) -> String { + format!( + "You are the hyperhive manager `{label}` in a multi-agent system. You \ + coordinate sub-agents and relay between them and the operator.\n\ + \n\ + Incoming message from `{from}`:\n\ + ---\n\ + {body}\n\ + ---\n\ + \n\ + Tools (hyperhive surface):\n\ + - `mcp__hyperhive__recv()` — drain one more message from your inbox.\n\ + - `mcp__hyperhive__send(to, body)` — message an agent (by name), \ + another peer, or the operator (`operator` surfaces in the dashboard).\n\ + - `mcp__hyperhive__request_spawn(name)` — queue a brand-new sub-agent \ + for operator approval (≤9 char name).\n\ + - `mcp__hyperhive__kill(name)` — graceful stop on a sub-agent.\n\ + - `mcp__hyperhive__request_apply_commit(agent, commit_ref)` — submit \ + a config change for any agent (`hm1nd` for self) for operator \ + approval.\n\ + \n\ + Your own editable config lives at `/agents/hm1nd/config/agent.nix`; \ + every sub-agent's lives at `/agents//config/agent.nix`. Use \ + file/git tools to edit + commit, then `request_apply_commit`.\n\ + \n\ + Handle the inbox, then stop. Don't narrate intent — act." + ) +} diff --git a/hive-ag3nt/src/lib.rs b/hive-ag3nt/src/lib.rs index 9689db0..234939b 100644 --- a/hive-ag3nt/src/lib.rs +++ b/hive-ag3nt/src/lib.rs @@ -6,6 +6,7 @@ pub mod events; pub mod login; pub mod login_session; pub mod mcp; +pub mod turn; pub mod web_ui; /// Default socket path inside the container — bind-mounted by `hive-c0re`. diff --git a/hive-ag3nt/src/mcp.rs b/hive-ag3nt/src/mcp.rs index 26acb29..a91cb8f 100644 --- a/hive-ag3nt/src/mcp.rs +++ b/hive-ag3nt/src/mcp.rs @@ -16,6 +16,7 @@ //! Both go through the same `run_tool_envelope` helper so logging + status //! line stay uniform. +use std::future::Future; use std::path::PathBuf; use anyhow::Result; @@ -28,6 +29,32 @@ use rmcp::{ use crate::client; +/// Common envelope around every MCP tool handler: pre-log → run → append +/// a status line → post-log. Free function so both `AgentServer` and +/// `ManagerServer` use the same shape; the per-server `status_line` +/// closure is what differs (different `Status` wire types). +pub async fn run_tool_envelope( + tool: &'static str, + args: String, + status: S, + body: F, +) -> String +where + F: Future, + S: Future, +{ + tracing::info!(tool, %args, "tool: request"); + let result = body.await; + let status_text = status.await; + let full = if status_text.is_empty() { + result + } else { + format!("{result}\n\n[status] {status_text}") + }; + tracing::info!(tool, result = %full, "tool: result"); + full +} + #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] pub struct SendArgs { /// Logical agent name to deliver the message to (e.g. `"manager"`, @@ -53,32 +80,6 @@ impl AgentServer { Self { socket } } - /// Wrap every tool handler in the same envelope: - /// 1. Log the request (tool name + args via `Debug`). - /// 2. Run the tool's actual logic. - /// 3. Append a status line (inbox state) to the result so claude always - /// has a current "how many unread messages" hint without an extra - /// tool call. - /// 4. Log the result body. - /// - /// New tools just call `self.run_tool("name", &args, async { ... })` - /// and get the same shape for free. - async fn run_tool(&self, tool: &'static str, args: String, body: F) -> String - where - F: std::future::Future, - { - tracing::info!(tool, %args, "tool: request"); - let result = body.await; - let status = self.status_line().await; - let full = if status.is_empty() { - result - } else { - format!("{result}\n\n[status] {status}") - }; - tracing::info!(tool, result = %full, "tool: result"); - full - } - /// Non-mutating peek used in the status line. Falls back to a vague /// note rather than failing the whole tool call when the socket /// hiccups. @@ -107,7 +108,7 @@ impl AgentServer { async fn send(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); let to = args.to.clone(); - self.run_tool("send", log, async move { + run_tool_envelope("send", log, self.status_line(), async move { let req = hive_sh4re::AgentRequest::Send { to: args.to, body: args.body, @@ -128,7 +129,7 @@ impl AgentServer { )] async fn recv(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); - self.run_tool("recv", log, async move { + run_tool_envelope("recv", log, self.status_line(), async move { let req = hive_sh4re::AgentRequest::Recv; match client::request::<_, hive_sh4re::AgentResponse>(&self.socket, &req).await { Ok(hive_sh4re::AgentResponse::Message { from, body }) => { @@ -151,14 +152,194 @@ impl AgentServer { )] impl ServerHandler for AgentServer {} -/// Run the MCP server over stdio. Returns when the client disconnects. -pub async fn serve_stdio(socket: PathBuf) -> Result<()> { +/// Run the agent MCP server over stdio. Returns when the client disconnects. +pub async fn serve_agent_stdio(socket: PathBuf) -> Result<()> { let server = AgentServer::new(socket); let service = server.serve(stdio()).await?; service.waiting().await?; Ok(()) } +/// Run the manager MCP server over stdio. Same idea, different tool surface. +pub async fn serve_manager_stdio(socket: PathBuf) -> Result<()> { + let server = ManagerServer::new(socket); + let service = server.serve(stdio()).await?; + service.waiting().await?; + Ok(()) +} + +// ----------------------------------------------------------------------------- +// Manager tool surface +// ----------------------------------------------------------------------------- + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct RequestSpawnArgs { + /// New sub-agent name (≤9 chars). Queues a Spawn approval; the + /// operator approves on the dashboard before the container is created. + pub name: String, +} + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct KillArgs { + /// Sub-agent name (without the `h-` container prefix). + pub name: String, +} + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct RequestApplyCommitArgs { + /// Agent whose config repo the commit lives in (use `"hm1nd"` for the + /// manager's own config). + pub agent: String, + /// Git sha (full or short) pointing at the proposed `agent.nix`. + pub commit_ref: String, +} + +#[derive(Debug, Clone)] +pub struct ManagerServer { + socket: PathBuf, +} + +impl ManagerServer { + #[must_use] + pub fn new(socket: PathBuf) -> Self { + Self { socket } + } + + async fn status_line(&self) -> String { + match client::request::<_, hive_sh4re::ManagerResponse>( + &self.socket, + &hive_sh4re::ManagerRequest::Status, + ) + .await + { + Ok(hive_sh4re::ManagerResponse::Status { unread }) => { + format!("{unread} unread message(s) in inbox") + } + Ok(other) => format!("status: unexpected response {other:?}"), + Err(e) => format!("status: transport error: {e:#}"), + } + } +} + +#[tool_router] +impl ManagerServer { + #[tool(description = "Send a message to a sub-agent (by logical name), to another agent, \ + or to the operator (recipient `operator`, surfaces in the dashboard).")] + async fn send(&self, Parameters(args): Parameters) -> String { + let log = format!("{args:?}"); + let to = args.to.clone(); + run_tool_envelope("send", log, self.status_line(), async move { + let req = hive_sh4re::ManagerRequest::Send { + to: args.to, + body: args.body, + }; + match client::request::<_, hive_sh4re::ManagerResponse>(&self.socket, &req).await { + Ok(hive_sh4re::ManagerResponse::Ok) => format!("sent to {to}"), + Ok(hive_sh4re::ManagerResponse::Err { message }) => { + format!("send failed: {message}") + } + Ok(other) => format!("send unexpected response: {other:?}"), + Err(e) => format!("send transport error: {e:#}"), + } + }) + .await + } + + #[tool(description = "Pop one message from the manager inbox. Returns sender + body, or \ + empty.")] + async fn recv(&self, Parameters(args): Parameters) -> String { + let log = format!("{args:?}"); + run_tool_envelope("recv", log, self.status_line(), async move { + let req = hive_sh4re::ManagerRequest::Recv; + match client::request::<_, hive_sh4re::ManagerResponse>(&self.socket, &req).await { + Ok(hive_sh4re::ManagerResponse::Message { from, body }) => { + format!("from: {from}\n\n{body}") + } + Ok(hive_sh4re::ManagerResponse::Empty) => "(empty)".into(), + Ok(hive_sh4re::ManagerResponse::Err { message }) => format!("recv failed: {message}"), + Ok(other) => format!("recv unexpected response: {other:?}"), + Err(e) => format!("recv transport error: {e:#}"), + } + }) + .await + } + + #[tool(description = "Queue a Spawn approval for a brand-new sub-agent. The operator \ + approves on the dashboard before the container is actually created.")] + async fn request_spawn(&self, Parameters(args): Parameters) -> String { + let log = format!("{args:?}"); + let name = args.name.clone(); + run_tool_envelope("request_spawn", log, self.status_line(), async move { + let req = hive_sh4re::ManagerRequest::RequestSpawn { name: args.name }; + match client::request::<_, hive_sh4re::ManagerResponse>(&self.socket, &req).await { + Ok(hive_sh4re::ManagerResponse::Ok) => format!("spawn approval queued for {name}"), + Ok(hive_sh4re::ManagerResponse::Err { message }) => { + format!("request_spawn failed: {message}") + } + Ok(other) => format!("request_spawn unexpected response: {other:?}"), + Err(e) => format!("request_spawn transport error: {e:#}"), + } + }) + .await + } + + #[tool(description = "Stop a sub-agent container (graceful). The state dir is kept; \ + recreating reuses prior config + Claude credentials.")] + async fn kill(&self, Parameters(args): Parameters) -> String { + let log = format!("{args:?}"); + let name = args.name.clone(); + run_tool_envelope("kill", log, self.status_line(), async move { + let req = hive_sh4re::ManagerRequest::Kill { name: args.name }; + match client::request::<_, hive_sh4re::ManagerResponse>(&self.socket, &req).await { + Ok(hive_sh4re::ManagerResponse::Ok) => format!("killed {name}"), + Ok(hive_sh4re::ManagerResponse::Err { message }) => format!("kill failed: {message}"), + Ok(other) => format!("kill unexpected response: {other:?}"), + Err(e) => format!("kill transport error: {e:#}"), + } + }) + .await + } + + #[tool(description = "Submit a config change for operator approval. Pass the agent name \ + (e.g. `alice` or `hm1nd` for the manager's own config) and a commit sha in that \ + agent's proposed config repo. On approval hive-c0re rebuilds the container.")] + async fn request_apply_commit( + &self, + Parameters(args): Parameters, + ) -> String { + let log = format!("{args:?}"); + let agent = args.agent.clone(); + let commit_ref = args.commit_ref.clone(); + run_tool_envelope("request_apply_commit", log, self.status_line(), async move { + let req = hive_sh4re::ManagerRequest::RequestApplyCommit { + agent: args.agent, + commit_ref: args.commit_ref, + }; + match client::request::<_, hive_sh4re::ManagerResponse>(&self.socket, &req).await { + Ok(hive_sh4re::ManagerResponse::Ok) => { + format!("apply approval queued for {agent} @ {commit_ref}") + } + Ok(hive_sh4re::ManagerResponse::Err { message }) => { + format!("request_apply_commit failed: {message}") + } + Ok(other) => format!("request_apply_commit unexpected response: {other:?}"), + Err(e) => format!("request_apply_commit transport error: {e:#}"), + } + }) + .await + } +} + +#[tool_handler( + instructions = "You are the hyperhive manager (hm1nd). You coordinate sub-agents and \ + relay between them and the operator. Use `send` to talk to agents/operator, `recv` \ + to drain your inbox. Privileged: `request_spawn` (new agent, gated on operator \ + approval), `kill` (graceful stop), `request_apply_commit` (config change for \ + any agent including yourself). The manager's own config lives at \ + `/agents/hm1nd/config/agent.nix`." +)] +impl ServerHandler for ManagerServer {} + /// Name of the hyperhive MCP server inside claude's view. Claude prefixes /// tools as `mcp____` (e.g. `mcp__hyperhive__send`). pub const SERVER_NAME: &str = "hyperhive"; @@ -180,12 +361,25 @@ pub const ALLOWED_BUILTIN_TOOLS: &[&str] = &[ "Write", ]; +/// Which MCP tool surface to advertise via `--allowedTools`. The agent +/// list is the strict subset of the manager list, so we just thread the +/// flavor through. +#[derive(Debug, Clone, Copy)] +pub enum Flavor { + Agent, + Manager, +} + /// MCP tools claude is allowed to call without prompting. Mirrors the -/// hyperhive surface so a new tool added below propagates to claude's -/// allow-list automatically. +/// hyperhive surface so a new tool added in the corresponding `#[tool_router]` +/// impl needs to be listed here too. #[must_use] -pub fn allowed_mcp_tools() -> Vec { - ["send", "recv"] +pub fn allowed_mcp_tools(flavor: Flavor) -> Vec { + let names: &[&str] = match flavor { + Flavor::Agent => &["send", "recv"], + Flavor::Manager => &["send", "recv", "request_spawn", "kill", "request_apply_commit"], + }; + names .iter() .map(|t| format!("mcp__{SERVER_NAME}__{t}")) .collect() @@ -194,9 +388,9 @@ pub fn allowed_mcp_tools() -> Vec { /// Combined allow-list passed to `--allowedTools` (auto-approve) — covers /// both the built-ins and the MCP surface. #[must_use] -pub fn allowed_tools_arg() -> String { +pub fn allowed_tools_arg(flavor: Flavor) -> String { let mut all: Vec = ALLOWED_BUILTIN_TOOLS.iter().map(|s| (*s).to_owned()).collect(); - all.extend(allowed_mcp_tools()); + all.extend(allowed_mcp_tools(flavor)); all.join(",") } diff --git a/hive-ag3nt/src/turn.rs b/hive-ag3nt/src/turn.rs new file mode 100644 index 0000000..8943fd6 --- /dev/null +++ b/hive-ag3nt/src/turn.rs @@ -0,0 +1,96 @@ +//! Per-turn claude invocation shared by `hive-ag3nt` and `hive-m1nd`. The +//! two binaries differ only in their MCP `Flavor` (agent surface vs. +//! manager surface) and their wake-prompt wording; the spawn shape, +//! arg-vector, stdin plumbing, and stream-json pumping are identical. + +use std::path::{Path, PathBuf}; +use std::process::Stdio; + +use anyhow::{Result, bail}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::process::Command; + +use crate::events::{Bus, LiveEvent}; +use crate::mcp; + +/// Drop the MCP config blob claude reads from `--mcp-config `. +/// `socket` is the hyperhive per-container socket (forwarded to the child +/// as `--socket `); `binary_subcommand` is e.g. `"mcp"` for sub-agents +/// or `"mcp"` for the manager (both binaries name their MCP subcommand the +/// same — the differentiator is which binary `/proc/self/exe` resolves to). +pub async fn write_mcp_config(socket: &Path) -> Result { + let parent = socket.parent().unwrap_or_else(|| Path::new("/run/hive")); + tokio::fs::create_dir_all(parent).await.ok(); + let path = parent.join("claude-mcp-config.json"); + let exe = std::env::current_exe() + .ok() + .map_or_else(|| "hive-ag3nt".into(), |p| p.display().to_string()); + let body = mcp::render_claude_config(&exe, socket); + tokio::fs::write(&path, body).await?; + tracing::info!(path = %path.display(), "wrote claude MCP config"); + Ok(path) +} + +/// Spawn `claude` for one turn and pump `stream-json` stdout into the +/// live event bus. Prompt goes over stdin (variadic +/// `--allowedTools`/`--tools` would otherwise eat a trailing positional +/// prompt). On non-zero exit returns an error; the caller emits the +/// `TurnEnd` event. +pub async fn run_turn( + prompt: &str, + mcp_config: &Path, + bus: &Bus, + flavor: mcp::Flavor, +) -> Result<()> { + let mut child = Command::new("claude") + .arg("--print") + .arg("--verbose") + .arg("--output-format") + .arg("stream-json") + .arg("--model") + .arg("haiku") + .arg("--mcp-config") + .arg(mcp_config) + .arg("--tools") + .arg(mcp::builtin_tools_arg()) + .arg("--allowedTools") + .arg(mcp::allowed_tools_arg(flavor)) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + if let Some(mut stdin) = child.stdin.take() { + stdin.write_all(prompt.as_bytes()).await?; + stdin.shutdown().await.ok(); + drop(stdin); + } + let stdout = child.stdout.take().expect("piped stdout"); + let stderr = child.stderr.take().expect("piped stderr"); + + let bus_out = bus.clone(); + let bus_err = bus.clone(); + let pump_stdout = tokio::spawn(async move { + let mut reader = BufReader::new(stdout).lines(); + while let Ok(Some(line)) = reader.next_line().await { + match serde_json::from_str::(&line) { + Ok(v) => bus_out.emit(LiveEvent::Stream(v)), + Err(_) => bus_out.emit(LiveEvent::Note(format!("(non-json) {line}"))), + } + } + }); + let pump_stderr = tokio::spawn(async move { + let mut reader = BufReader::new(stderr).lines(); + while let Ok(Some(line)) = reader.next_line().await { + bus_err.emit(LiveEvent::Note(format!("stderr: {line}"))); + } + }); + + let status = child.wait().await?; + let _ = pump_stdout.await; + let _ = pump_stderr.await; + if !status.success() { + bail!("claude exited {status}"); + } + Ok(()) +}