//! Embedded MCP server. Claude Code (running inside the agent container) //! launches this as a stdio child via `--mcp-config`; tool calls land here //! and are translated to `AgentRequest::*` / `ManagerRequest::*` against //! hyperhive's own per-container unix socket at `/run/hive/mcp.sock`. //! //! Two protocols, two surfaces: //! - **hyperhive socket** at `/run/hive/mcp.sock` — JSON-line, our //! broker-routed protocol. Unaffected by this module. //! - **MCP stdio** owned by this module — what claude actually speaks. //! //! Two server flavors: //! - `AgentServer` — sub-agent tools (`send`, `recv`). //! - `ManagerServer` — agent tools + lifecycle (`request_spawn`, `kill`, //! `request_apply_commit`). //! //! Both go through the same `run_tool_envelope` helper so logging + status //! line stay uniform. use std::future::Future; use std::path::PathBuf; use anyhow::Result; use rmcp::{ ServerHandler, ServiceExt, handler::server::wrapper::Parameters, schemars, tool, tool_handler, tool_router, transport::stdio, }; use crate::client; /// Wire-protocol-agnostic view of a hyperhive socket response. Both /// `AgentResponse` and `ManagerResponse` convert into this so the tool /// formatters can be shared between `AgentServer` and `ManagerServer`. 
// NOTE(review): generic arguments look stripped from this listing (bare
// `Vec`, `From`, `Result` below). The tokens are reproduced exactly as
// shown; restore the type parameters from version control before building.
#[derive(Debug)]
pub enum SocketReply {
    /// Plain acknowledgement.
    Ok,
    /// Failure reported by the peer, carrying its `message` text.
    Err(String),
    /// One inbox message: sender name and body.
    Message { from: String, body: String },
    /// Nothing to return (rendered as the `(empty)` marker by `format_recv`).
    Empty,
    /// Carries the `unread` value of a `Status` response.
    Status(u64),
    /// Carries the `id` of a `QuestionQueued` response.
    QuestionQueued(i64),
    /// Carries the `rows` of a `Recent` response.
    Recent(Vec),
}

/// Maps every `AgentResponse` variant onto the same-named `SocketReply`
/// variant, moving the payload across unchanged.
impl From for SocketReply {
    fn from(r: hive_sh4re::AgentResponse) -> Self {
        match r {
            hive_sh4re::AgentResponse::Ok => Self::Ok,
            hive_sh4re::AgentResponse::Err { message } => Self::Err(message),
            hive_sh4re::AgentResponse::Message { from, body } => Self::Message { from, body },
            hive_sh4re::AgentResponse::Empty => Self::Empty,
            hive_sh4re::AgentResponse::Status { unread } => Self::Status(unread),
            hive_sh4re::AgentResponse::Recent { rows } => Self::Recent(rows),
            hive_sh4re::AgentResponse::QuestionQueued { id } => Self::QuestionQueued(id),
        }
    }
}

/// Maps every `ManagerResponse` variant onto the same-named `SocketReply`
/// variant, moving the payload across unchanged.
impl From for SocketReply {
    fn from(r: hive_sh4re::ManagerResponse) -> Self {
        match r {
            hive_sh4re::ManagerResponse::Ok => Self::Ok,
            hive_sh4re::ManagerResponse::Err { message } => Self::Err(message),
            hive_sh4re::ManagerResponse::Message { from, body } => Self::Message { from, body },
            hive_sh4re::ManagerResponse::Empty => Self::Empty,
            hive_sh4re::ManagerResponse::Status { unread } => Self::Status(unread),
            hive_sh4re::ManagerResponse::QuestionQueued { id } => Self::QuestionQueued(id),
            hive_sh4re::ManagerResponse::Recent { rows } => Self::Recent(rows),
        }
    }
}

/// Format helper for "send-like" tools (anything that expects an `Ok`).
/// `tool` and `ok_msg` only appear in the result string; they don't change
/// behavior.
///
/// `Ok` yields `ok_msg` verbatim; a peer `Err`, any other reply variant and
/// a transport error each yield a `{tool} …` diagnostic string instead.
pub fn format_ack(resp: Result, tool: &str, ok_msg: String) -> String {
    match resp {
        Ok(SocketReply::Ok) => ok_msg,
        Ok(SocketReply::Err(m)) => format!("{tool} failed: {m}"),
        // Any other variant is a protocol mismatch; show it via `Debug`.
        Ok(other) => format!("{tool} unexpected response: {other:?}"),
        // `{e:#}` is the alternate `Display` form of the transport error.
        Err(e) => format!("{tool} transport error: {e:#}"),
    }
}

/// Format helper for `recv` tools: `Message` → from + body block;
/// `Empty` → marker; anything else surfaces as an error.
pub fn format_recv(resp: Result) -> String { match resp { Ok(SocketReply::Message { from, body }) => format!("from: {from}\n\n{body}"), Ok(SocketReply::Empty) => "(empty)".into(), Ok(SocketReply::Err(m)) => format!("recv failed: {m}"), Ok(other) => format!("recv unexpected response: {other:?}"), Err(e) => format!("recv transport error: {e:#}"), } } /// Common envelope around every MCP tool handler: pre-log → run → /// post-log. The inbox-status hint used to be appended to every tool /// result; that lives in the wake prompt + UI header now, so tool /// results stay clean. pub async fn run_tool_envelope(tool: &'static str, args: String, body: F) -> String where F: Future, { tracing::info!(tool, %args, "tool: request"); let result = body.await; tracing::info!(tool, result = %result, "tool: result"); result } #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] pub struct SendArgs { /// Logical agent name to deliver the message to (e.g. `"manager"`, /// `"alice"`, or the literal `"operator"` for the dashboard's T4LK box). pub to: String, /// Message body. Plain text; the broker doesn't parse it. pub body: String, } #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] pub struct RecvArgs { /// How long to long-poll for a new message before returning the /// empty marker. Capped at 60s server-side. Default (None) is /// 30s. Useful when an agent wants to throttle wakes without /// actually napping — pick a longer wait to coalesce bursts. #[serde(default)] pub wait_seconds: Option, } /// Per-agent tool surface. Holds the socket path so each tool call doesn't /// re-derive it; the socket itself is the per-container `/run/hive/mcp.sock`. #[derive(Debug, Clone)] pub struct AgentServer { socket: PathBuf, } impl AgentServer { #[must_use] pub fn new(socket: PathBuf) -> Self { Self { socket } } } #[tool_router] impl AgentServer { #[tool( description = "Send a message to another hyperhive agent (or to the operator). 
\ Use this to talk to peers or to surface output for the human at the dashboard." )] async fn send(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); let to = args.to.clone(); if let Err(refusal) = check_send_allowed(&to) { return run_tool_envelope("send", log, async move { refusal }).await; } run_tool_envelope("send", log, async move { let resp = client::request::<_, hive_sh4re::AgentResponse>( &self.socket, &hive_sh4re::AgentRequest::Send { to: args.to, body: args.body, }, ) .await .map(SocketReply::from); format_ack(resp, "send", format!("sent to {to}")) }) .await } #[tool( description = "Surface a question to the operator on the dashboard. Returns immediately \ with a question id — do NOT wait inline. When the operator answers, a system message \ with event `operator_answered { id, question, answer }` lands in your inbox; handle it \ on a future turn. Use this when a decision needs human signal (ambiguous scope, \ permission to do something risky, choosing between options). `options` is advisory: \ pass a short fixed-choice list when applicable, otherwise leave empty for free text. \ Set `multi: true` to let the operator pick multiple options (checkboxes); the answer \ comes back as a comma-separated string. Set `ttl_seconds` to auto-cancel a \ no-longer-relevant question — on expiry the answer is `[expired]` and the same \ `operator_answered` event fires." 
)] async fn ask_operator(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); run_tool_envelope("ask_operator", log, async move { let resp = client::request::<_, hive_sh4re::AgentResponse>( &self.socket, &hive_sh4re::AgentRequest::AskOperator { question: args.question, options: args.options, multi: args.multi, ttl_seconds: args.ttl_seconds, }, ) .await .map(SocketReply::from); match resp { Ok(SocketReply::QuestionQueued(id)) => format!( "question queued (id={id}); operator's answer will arrive as a system \ `operator_answered` event in your inbox" ), Ok(SocketReply::Err(m)) => format!("ask_operator failed: {m}"), Ok(other) => format!("ask_operator unexpected response: {other:?}"), Err(e) => format!("ask_operator transport error: {e:#}"), } }) .await } #[tool( description = "Pop one message from this agent's inbox. Returns the sender and body, \ or an empty marker if nothing is waiting. Without `wait_seconds` (or with 0) the \ call returns immediately — a cheap 'anything pending?' peek. Pass a positive \ `wait_seconds` (capped at 180) to park the turn waiting for new work — incoming \ messages wake you instantly, otherwise the call returns empty at the timeout. \ That's strictly better than a fixed shell `sleep`. Typical pattern: when you have \ nothing else useful to do, call `recv(wait_seconds: 180)` to park until \ something arrives." )] async fn recv(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); run_tool_envelope("recv", log, async move { let resp = client::request::<_, hive_sh4re::AgentResponse>( &self.socket, &hive_sh4re::AgentRequest::Recv { wait_seconds: args.wait_seconds, }, ) .await .map(SocketReply::from); format_recv(resp) }) .await } } #[tool_handler( instructions = "You are a hyperhive agent. Use `send` to talk to peers (by their logical \ name) or to the operator (recipient `operator`). Use `recv` to drain your inbox one \ message at a time." 
)] impl ServerHandler for AgentServer {} /// Run the agent MCP server over stdio. Returns when the client disconnects. pub async fn serve_agent_stdio(socket: PathBuf) -> Result<()> { let server = AgentServer::new(socket); let service = server.serve(stdio()).await?; service.waiting().await?; Ok(()) } /// Run the manager MCP server over stdio. Same idea, different tool surface. pub async fn serve_manager_stdio(socket: PathBuf) -> Result<()> { let server = ManagerServer::new(socket); let service = server.serve(stdio()).await?; service.waiting().await?; Ok(()) } // ----------------------------------------------------------------------------- // Manager tool surface // ----------------------------------------------------------------------------- #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] pub struct RequestSpawnArgs { /// New sub-agent name (≤9 chars). Queues a Spawn approval; the /// operator approves on the dashboard before the container is created. pub name: String, } #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] pub struct KillArgs { /// Sub-agent name (without the `h-` container prefix). pub name: String, } #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] pub struct StartArgs { /// Sub-agent name (without the `h-` container prefix). pub name: String, } #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] pub struct RestartArgs { /// Sub-agent name (without the `h-` container prefix). pub name: String, } #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] pub struct UpdateArgs { /// Sub-agent name (without the `h-` container prefix). pub name: String, } #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] pub struct AskOperatorArgs { /// The question to surface on the dashboard. pub question: String, /// Optional fixed-choice answers. The dashboard always renders a /// free-text fallback ("Other…") so the operator is never trapped /// by an incomplete list. 
#[serde(default)] pub options: Vec, /// When true, options are rendered as checkboxes — operator can pick /// any subset. The answer comes back as a single string with /// selections joined by ", ". Ignored when `options` is empty. #[serde(default)] pub multi: bool, /// Optional auto-cancel after `ttl_seconds`. On expiry the question /// resolves with answer `[expired]` and the manager receives the /// usual `operator_answered` system event. `None` (default) = /// wait indefinitely. #[serde(default)] pub ttl_seconds: Option, } #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] pub struct RequestApplyCommitArgs { /// Agent whose config repo the commit lives in (use `"hm1nd"` for the /// manager's own config). pub agent: String, /// Git sha (full or short) pointing at the proposed `agent.nix`. pub commit_ref: String, } #[derive(Debug, Clone)] pub struct ManagerServer { socket: PathBuf, } impl ManagerServer { #[must_use] pub fn new(socket: PathBuf) -> Self { Self { socket } } /// Helper: issue any `ManagerRequest`, convert the reply through /// `SocketReply`. Manager tools that just need an `Ok` ack share this. async fn dispatch( &self, req: hive_sh4re::ManagerRequest, ) -> Result { client::request::<_, hive_sh4re::ManagerResponse>(&self.socket, &req) .await .map(SocketReply::from) } } #[tool_router] impl ManagerServer { #[tool( description = "Send a message to a sub-agent (by logical name), to another agent, \ or to the operator (recipient `operator`, surfaces in the dashboard)." )] async fn send(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); let to = args.to.clone(); run_tool_envelope("send", log, async move { let resp = self .dispatch(hive_sh4re::ManagerRequest::Send { to: args.to, body: args.body, }) .await; format_ack(resp, "send", format!("sent to {to}")) }) .await } #[tool( description = "Pop one message from the manager inbox. Returns sender + body, or \ empty. 
Without `wait_seconds` (or 0) returns immediately — a cheap inbox peek. \ Pass a positive value (capped at 180) to park until either a message arrives \ or the timeout fires; prefer a long wait (120 or 180) over ending a turn \ early when you have nothing else to do." )] async fn recv(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); run_tool_envelope("recv", log, async move { let resp = self .dispatch(hive_sh4re::ManagerRequest::Recv { wait_seconds: args.wait_seconds, }) .await; format_recv(resp) }) .await } #[tool( description = "Queue a Spawn approval for a brand-new sub-agent. The operator \ approves on the dashboard before the container is actually created." )] async fn request_spawn(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); let name = args.name.clone(); run_tool_envelope("request_spawn", log, async move { let resp = self .dispatch(hive_sh4re::ManagerRequest::RequestSpawn { name: args.name }) .await; format_ack( resp, "request_spawn", format!("spawn approval queued for {name}"), ) }) .await } #[tool( description = "Stop a sub-agent container (graceful). The state dir is kept; \ recreating reuses prior config + Claude credentials. No approval required." )] async fn kill(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); let name = args.name.clone(); run_tool_envelope("kill", log, async move { let resp = self .dispatch(hive_sh4re::ManagerRequest::Kill { name: args.name }) .await; format_ack(resp, "kill", format!("killed {name}")) }) .await } #[tool( description = "Start a stopped sub-agent container. No approval required — \ lifecycle ops on existing containers are at the manager's discretion." 
)] async fn start(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); let name = args.name.clone(); run_tool_envelope("start", log, async move { let resp = self .dispatch(hive_sh4re::ManagerRequest::Start { name: args.name }) .await; format_ack(resp, "start", format!("started {name}")) }) .await } #[tool(description = "Restart a sub-agent container (stop + start). No approval required.")] async fn restart(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); let name = args.name.clone(); run_tool_envelope("restart", log, async move { let resp = self .dispatch(hive_sh4re::ManagerRequest::Restart { name: args.name }) .await; format_ack(resp, "restart", format!("restarted {name}")) }) .await } #[tool( description = "Rebuild a sub-agent: re-applies the current hyperhive flake + agent.nix \ and restarts the container. No approval required — idempotent. Use when you receive a \ `needs_update` system event for an agent." )] async fn update(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); let name = args.name.clone(); run_tool_envelope("update", log, async move { let resp = self .dispatch(hive_sh4re::ManagerRequest::Update { name: args.name }) .await; format_ack(resp, "update", format!("updated {name}")) }) .await } #[tool( description = "Surface a question to the operator on the dashboard. Returns immediately \ with a question id — do NOT wait inline. When the operator answers, a system message \ with event `operator_answered { id, question, answer }` lands in your inbox; handle it \ on a future turn. Use this when a decision needs human signal (ambiguous sub-agent \ request, policy call, scope clarification). `options` is advisory: pass a short \ fixed-choice list when applicable, otherwise leave empty for free text. Set \ `multi: true` to let the operator pick multiple options (checkboxes); the answer \ comes back as a comma-separated string. 
Set `ttl_seconds` to auto-cancel a \ no-longer-relevant question instead of blocking forever — on expiry the answer \ is `[expired]` and the same `operator_answered` event fires." )] async fn ask_operator(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); run_tool_envelope("ask_operator", log, async move { let resp = self .dispatch(hive_sh4re::ManagerRequest::AskOperator { question: args.question, options: args.options, multi: args.multi, ttl_seconds: args.ttl_seconds, }) .await; match resp { Ok(SocketReply::QuestionQueued(id)) => format!( "question queued (id={id}); operator's answer will arrive as a system \ `operator_answered` event in your inbox" ), Ok(SocketReply::Err(m)) => format!("ask_operator failed: {m}"), Ok(other) => format!("ask_operator unexpected response: {other:?}"), Err(e) => format!("ask_operator transport error: {e:#}"), } }) .await } #[tool( description = "Submit a config change for operator approval. Pass the agent name \ (e.g. `alice` or `hm1nd` for the manager's own config) and a commit sha in that \ agent's proposed config repo. On approval hive-c0re rebuilds the container." )] async fn request_apply_commit( &self, Parameters(args): Parameters, ) -> String { let log = format!("{args:?}"); let agent = args.agent.clone(); let commit_ref = args.commit_ref.clone(); run_tool_envelope("request_apply_commit", log, async move { let resp = self .dispatch(hive_sh4re::ManagerRequest::RequestApplyCommit { agent: args.agent, commit_ref: args.commit_ref, }) .await; format_ack( resp, "request_apply_commit", format!("apply approval queued for {agent} @ {commit_ref}"), ) }) .await } } #[tool_handler( instructions = "You are the hyperhive manager (hm1nd). You coordinate sub-agents and \ relay between them and the operator. Use `send` to talk to agents/operator, `recv` \ to drain your inbox. 
Privileged: `request_spawn` (new agent, gated on operator \ approval), `kill` (graceful stop), `request_apply_commit` (config change for \ any agent including yourself), `ask_operator` (block on a human answer via the \ dashboard). The manager's own config lives at \ `/agents/hm1nd/config/agent.nix`." )] impl ServerHandler for ManagerServer {} /// Name of the hyperhive MCP server inside claude's view. Claude prefixes /// tools as `mcp____` (e.g. `mcp__hyperhive__send`). pub const SERVER_NAME: &str = "hyperhive"; /// Built-in claude tools the turn loop enables via `--tools`. Anything not /// in this list literally doesn't exist in the session (claude won't even /// try to call it). Web egress (`WebFetch`/`WebSearch`) and nested agents /// (`Task`) are intentionally omitted for now; `Bash` is allowed pending a /// finer-grained allow-list system for shell command patterns. Edit later /// as our trust model evolves. pub const ALLOWED_BUILTIN_TOOLS: &[&str] = &["Bash", "Edit", "Glob", "Grep", "Read", "TodoWrite", "Write"]; /// Which MCP tool surface to advertise via `--allowedTools`. The agent /// list is the strict subset of the manager list, so we just thread the /// flavor through. #[derive(Debug, Clone, Copy)] pub enum Flavor { Agent, Manager, } /// MCP tools claude is allowed to call without prompting. Mirrors the /// hyperhive surface so a new tool added in the corresponding `#[tool_router]` /// impl needs to be listed here too. #[must_use] pub fn allowed_mcp_tools(flavor: Flavor) -> Vec { let names: &[&str] = match flavor { Flavor::Agent => &["send", "recv", "ask_operator"], Flavor::Manager => &[ "send", "recv", "request_spawn", "kill", "start", "restart", "update", "request_apply_commit", "ask_operator", ], }; let mut out: Vec = names .iter() .map(|t| format!("mcp__{SERVER_NAME}__{t}")) .collect(); // Extra MCP servers declared via `hyperhive.extraMcpServers` in // the agent's NixOS config. 
Each entry maps its `allowedTools` // pattern list to `mcp____` so claude can call // them without per-tool operator approval. `["*"]` (the default) // expands to `mcp____*` — every tool from that server. for (server, spec) in load_extra_mcp() { if server == SERVER_NAME { continue; } for pat in spec.allowed_tools { out.push(format!("mcp__{server}__{pat}")); } } out } /// Combined allow-list passed to `--allowedTools` (auto-approve) — covers /// both the built-ins and the MCP surface. #[must_use] pub fn allowed_tools_arg(flavor: Flavor) -> String { let mut all: Vec = ALLOWED_BUILTIN_TOOLS .iter() .map(|s| (*s).to_owned()) .collect(); all.extend(allowed_mcp_tools(flavor)); all.join(",") } /// Built-in tools list for `--tools` (which built-ins exist in this /// session). Same as `ALLOWED_BUILTIN_TOOLS` but joined comma-separated. #[must_use] pub fn builtin_tools_arg() -> String { ALLOWED_BUILTIN_TOOLS.join(",") } /// Where the NixOS module writes the per-agent extra-MCP spec (see /// `nix/templates/harness-base.nix`). Each entry becomes an additional /// `mcpServers.` block in the rendered claude config + a /// `mcp____` pattern in `--allowedTools`. const EXTRA_MCP_PATH: &str = "/etc/hyperhive/extra-mcp.json"; /// Where the NixOS module writes the per-agent send allow-list (see /// `nix/templates/harness-base.nix`). Empty list = unrestricted (the /// default). Non-empty list constrains `mcp__hyperhive__send`'s `to` /// field; the manager is always implicitly permitted regardless of /// the list contents. const SEND_ALLOW_PATH: &str = "/etc/hyperhive/send-allow.json"; /// Enforce the per-agent send allow-list. Returns `Ok` when the /// recipient is permitted (no list configured, manager always /// allowed, or `to` is in the list); returns `Err(refusal)` with a /// claude-readable string when blocked — the harness surfaces the /// refusal as the tool result so claude knows the message didn't /// land and can react (e.g. route via the manager instead). 
fn check_send_allowed(to: &str) -> Result<(), String> { if to == hive_sh4re::MANAGER_AGENT { // Always allow agents to talk to the manager — otherwise a // misconfigured allow-list could leave a sub-agent unable // to ask for help. return Ok(()); } let Ok(raw) = std::fs::read_to_string(SEND_ALLOW_PATH) else { return Ok(()); // file missing → no policy configured → unrestricted }; let allow: Vec = match serde_json::from_str(&raw) { Ok(v) => v, Err(e) => { tracing::warn!( path = SEND_ALLOW_PATH, error = ?e, "send allow-list parse failed; falling back to unrestricted", ); return Ok(()); } }; if allow.is_empty() { return Ok(()); // empty list = unrestricted (back-compat) } if allow.iter().any(|n| n == to) { return Ok(()); } Err(format!( "send refused: recipient '{to}' not in hyperhive.allowedRecipients \ (configured in agent.nix). Allowed: {allow:?}. The manager is \ always reachable — route through `send(to: \"manager\", …)` if \ you need to reach someone outside the allow-list." )) } #[derive(Debug, serde::Deserialize)] struct ExtraMcpServer { command: String, #[serde(default)] args: Vec, #[serde(default)] env: std::collections::BTreeMap, #[serde(default = "default_allowed_tools")] #[serde(rename = "allowedTools")] allowed_tools: Vec, } fn default_allowed_tools() -> Vec { vec!["*".to_owned()] } /// Read + parse the extra-MCP spec. Returns an empty map on missing / /// unparsable file (the agent has none configured, or the file is /// malformed — both cases degrade to "no extra servers"). fn load_extra_mcp() -> std::collections::BTreeMap { let Ok(raw) = std::fs::read_to_string(EXTRA_MCP_PATH) else { return std::collections::BTreeMap::new(); }; serde_json::from_str(&raw).unwrap_or_else(|e| { tracing::warn!( path = EXTRA_MCP_PATH, error = ?e, "extra-mcp spec parse failed; ignoring", ); std::collections::BTreeMap::new() }) } /// Render the MCP config blob claude reads from `--mcp-config `. 
/// `agent_binary` is the path (or PATH-resolvable name) of the `hive-ag3nt` /// executable; `socket` is the hyperhive per-agent socket bind-mounted into /// the container (forwarded to the child as `--socket `). Merges in /// any extra MCP servers declared via `hyperhive.extraMcpServers` in the /// agent's NixOS config. #[must_use] pub fn render_claude_config(agent_binary: &str, socket: &std::path::Path) -> String { let mut servers = serde_json::Map::new(); servers.insert( SERVER_NAME.to_owned(), serde_json::json!({ "command": agent_binary, "args": ["--socket", socket.display().to_string(), "mcp"], "env": {} }), ); for (name, spec) in load_extra_mcp() { if name == SERVER_NAME { tracing::warn!( "extra MCP server name `{SERVER_NAME}` collides with the built-in surface; ignoring", ); continue; } servers.insert( name, serde_json::json!({ "command": spec.command, "args": spec.args, "env": spec.env, }), ); } let config = serde_json::json!({ "mcpServers": servers }); serde_json::to_string_pretty(&config).unwrap_or_else(|_| "{}".into()) }