hyperhive/hive-ag3nt/src/mcp.rs

1221 lines
50 KiB
Rust

//! Embedded MCP server. Claude Code (running inside the agent container)
//! launches this as a stdio child via `--mcp-config`; tool calls land here
//! and are translated to `AgentRequest::*` / `ManagerRequest::*` against
//! hyperhive's own per-container unix socket at `/run/hive/mcp.sock`.
//!
//! Two protocols, two surfaces:
//! - **hyperhive socket** at `/run/hive/mcp.sock` — JSON-line, our
//! broker-routed protocol. Unaffected by this module.
//! - **MCP stdio** owned by this module — what claude actually speaks.
//!
//! Two server flavors:
//! - `AgentServer` — sub-agent tools (`send`, `recv`).
//! - `ManagerServer` — agent tools + lifecycle (`request_spawn`, `kill`,
//! `request_apply_commit`).
//!
//! Both go through the same `run_tool_envelope` helper so request/result
//! logging stays uniform.
use std::future::Future;
use std::path::PathBuf;
use anyhow::Result;
use rmcp::{
ServerHandler, ServiceExt, handler::server::wrapper::Parameters, schemars, tool, tool_handler,
tool_router, transport::stdio,
};
use crate::client;
/// Wire-protocol-agnostic view of a hyperhive socket response. Both
/// `AgentResponse` and `ManagerResponse` convert into this so the tool
/// formatters can be shared between `AgentServer` and `ManagerServer`.
///
/// Each variant mirrors the wire variant of the same name; the two `From`
/// impls in this file define the exact mapping.
#[derive(Debug)]
pub enum SocketReply {
/// Plain acknowledgement; `format_ack` turns it into the caller's success
/// message.
Ok,
/// Failure reported by the peer, carrying the wire `message` text.
Err(String),
/// One inbox message: sender name plus body (rendered by `format_recv`).
Message { from: String, body: String },
/// Nothing was waiting; `format_recv` renders this as `(empty)`.
Empty,
/// The `unread` value of a wire `Status` response.
Status(u64),
/// Id carried by a wire `QuestionQueued` response (what the `ask` tools
/// report back).
QuestionQueued(i64),
/// The `rows` of a wire `Recent` response.
Recent(Vec<hive_sh4re::InboxRow>),
/// Log text from `ManagerResponse::Logs`; the `AgentResponse` conversion
/// in this file never produces this variant.
Logs(String),
/// Pending approvals / questions, rendered by `format_open_threads`.
OpenThreads(Vec<hive_sh4re::OpenThread>),
/// The `count` value of a wire `PendingRemindersCount` response.
PendingRemindersCount(u64),
/// Identity block rendered by `format_whoami`.
Whoami {
name: String,
role: String,
/// `None` is rendered as `<unknown>` by `format_whoami`.
hyperhive_rev: Option<String>,
},
}
impl From<hive_sh4re::AgentResponse> for SocketReply {
fn from(r: hive_sh4re::AgentResponse) -> Self {
match r {
hive_sh4re::AgentResponse::Ok => Self::Ok,
hive_sh4re::AgentResponse::Err { message } => Self::Err(message),
hive_sh4re::AgentResponse::Message { from, body } => Self::Message { from, body },
hive_sh4re::AgentResponse::Empty => Self::Empty,
hive_sh4re::AgentResponse::Status { unread } => Self::Status(unread),
hive_sh4re::AgentResponse::Recent { rows } => Self::Recent(rows),
hive_sh4re::AgentResponse::QuestionQueued { id } => Self::QuestionQueued(id),
hive_sh4re::AgentResponse::OpenThreads { threads } => Self::OpenThreads(threads),
hive_sh4re::AgentResponse::PendingRemindersCount { count } => {
Self::PendingRemindersCount(count)
}
hive_sh4re::AgentResponse::Whoami {
name,
role,
hyperhive_rev,
} => Self::Whoami {
name,
role,
hyperhive_rev,
},
}
}
}
impl From<hive_sh4re::ManagerResponse> for SocketReply {
fn from(r: hive_sh4re::ManagerResponse) -> Self {
match r {
hive_sh4re::ManagerResponse::Ok => Self::Ok,
hive_sh4re::ManagerResponse::Err { message } => Self::Err(message),
hive_sh4re::ManagerResponse::Message { from, body } => Self::Message { from, body },
hive_sh4re::ManagerResponse::Empty => Self::Empty,
hive_sh4re::ManagerResponse::Status { unread } => Self::Status(unread),
hive_sh4re::ManagerResponse::QuestionQueued { id } => Self::QuestionQueued(id),
hive_sh4re::ManagerResponse::Recent { rows } => Self::Recent(rows),
hive_sh4re::ManagerResponse::Logs { content } => Self::Logs(content),
hive_sh4re::ManagerResponse::OpenThreads { threads } => Self::OpenThreads(threads),
hive_sh4re::ManagerResponse::PendingRemindersCount { count } => {
Self::PendingRemindersCount(count)
}
hive_sh4re::ManagerResponse::Whoami {
name,
role,
hyperhive_rev,
} => Self::Whoami {
name,
role,
hyperhive_rev,
},
}
}
}
/// Renders the result of a tool call whose only successful reply is
/// `SocketReply::Ok`.
///
/// * `resp` — the socket outcome: a reply, or the transport error.
/// * `tool` — tool name, used purely as a prefix in failure strings.
/// * `ok_msg` — returned verbatim when the reply is `Ok`.
///
/// Every other outcome is turned into a human-readable failure string; the
/// function never panics and never returns an error.
pub fn format_ack(resp: Result<SocketReply, anyhow::Error>, tool: &str, ok_msg: String) -> String {
    // Peel off the transport layer first so the reply match stays flat.
    let reply = match resp {
        Ok(reply) => reply,
        Err(e) => return format!("{tool} transport error: {e:#}"),
    };
    match reply {
        SocketReply::Ok => ok_msg,
        SocketReply::Err(m) => format!("{tool} failed: {m}"),
        other => format!("{tool} unexpected response: {other:?}"),
    }
}
/// Renders the result of a `recv` call.
///
/// A `Message` becomes a `from: <sender>` header, a blank line, then the
/// body; `Empty` becomes the `(empty)` marker. Peer errors, unexpected reply
/// variants and transport errors are each reported as a `recv …` failure
/// string.
pub fn format_recv(resp: Result<SocketReply, anyhow::Error>) -> String {
    // Peel off the transport layer first so the reply match stays flat.
    let reply = match resp {
        Ok(reply) => reply,
        Err(e) => return format!("recv transport error: {e:#}"),
    };
    match reply {
        SocketReply::Empty => "(empty)".into(),
        SocketReply::Message { from, body } => format!("from: {from}\n\n{body}"),
        SocketReply::Err(m) => format!("recv failed: {m}"),
        other => format!("recv unexpected response: {other:?}"),
    }
}
/// Format helper for `get_open_threads`: renders a short bulleted list
/// of pending approvals + questions. Empty list collapses to a clear
/// marker so claude doesn't go hunting for a payload that isn't there.
///
/// Output shape for a non-empty list:
///
/// ```text
/// N open thread(s):
/// - approval #<id> (<agent> @ <commit_ref>, <age>s old): <description>
/// - question #<id> (<asker> → <target>, <age>s old): <question>
/// ```
///
/// The `: <description>` suffix is omitted when an approval has no (or an
/// empty) description; a question without a target is shown as addressed to
/// `operator`. Peer errors, unexpected reply variants and transport errors
/// are each reported as a `get_open_threads …` failure string.
pub fn format_open_threads(resp: Result<SocketReply, anyhow::Error>) -> String {
    use std::fmt::Write as _;

    let threads = match resp {
        Ok(SocketReply::OpenThreads(t)) => t,
        Ok(SocketReply::Err(m)) => return format!("get_open_threads failed: {m}"),
        Ok(other) => return format!("get_open_threads unexpected response: {other:?}"),
        Err(e) => return format!("get_open_threads transport error: {e:#}"),
    };
    if threads.is_empty() {
        return "(no open threads)".to_owned();
    }

    let mut out = format!("{} open thread(s):\n", threads.len());
    for t in &threads {
        match t {
            hive_sh4re::OpenThread::Approval {
                id,
                agent,
                commit_ref,
                description,
                age_seconds,
            } => {
                // Separate the description from the parenthesised context the
                // same way the question bullet does (`): …`). Previously the
                // text was appended directly after `)` with no separator.
                let desc = description
                    .as_deref()
                    .filter(|d| !d.is_empty())
                    .map(|d| format!(": {d}"))
                    .unwrap_or_default();
                // Writing into a `String` cannot fail, so the result is ignored.
                let _ = writeln!(
                    out,
                    "- approval #{id} ({agent} @ {commit_ref}, {age_seconds}s old){desc}"
                );
            }
            hive_sh4re::OpenThread::Question {
                id,
                asker,
                target,
                question,
                age_seconds,
            } => {
                let to = target.as_deref().unwrap_or("operator");
                let _ = writeln!(
                    out,
                    "- question #{id} ({asker} → {to}, {age_seconds}s old): {question}"
                );
            }
        }
    }
    out
}
/// Renders the result of a `whoami` call as three `key: value` lines
/// (`name`, `role`, `hyperhive_rev`).
///
/// A missing `hyperhive_rev` is printed as the `<unknown>` placeholder rather
/// than being dropped. Peer errors, unexpected reply variants and transport
/// errors are each reported as a `whoami …` failure string.
pub fn format_whoami(resp: Result<SocketReply, anyhow::Error>) -> String {
    // Peel off the transport layer first so the reply match stays flat.
    let reply = match resp {
        Ok(reply) => reply,
        Err(e) => return format!("whoami transport error: {e:#}"),
    };
    match reply {
        SocketReply::Whoami {
            name,
            role,
            hyperhive_rev,
        } => {
            let rev = match &hyperhive_rev {
                Some(known) => known.as_str(),
                None => "<unknown>",
            };
            format!("name: {name}\nrole: {role}\nhyperhive_rev: {rev}")
        }
        SocketReply::Err(m) => format!("whoami failed: {m}"),
        other => format!("whoami unexpected response: {other:?}"),
    }
}
/// Shared wrapper for every MCP tool handler: log the request, await the
/// handler future, log its result, and hand the result back unchanged.
///
/// * `tool` — tool name recorded on both log events.
/// * `args` — pre-rendered argument string recorded on the request event.
/// * `body` — the future that produces the tool's result string.
///
/// Nothing is appended to the result here (the inbox-status hint that used
/// to be added lives in the wake prompt + UI header now), so tool results
/// stay clean.
pub async fn run_tool_envelope<F: Future<Output = String>>(
    tool: &'static str,
    args: String,
    body: F,
) -> String {
    tracing::info!(tool, %args, "tool: request");
    let outcome = body.await;
    tracing::info!(tool, result = %outcome, "tool: result");
    outcome
}
/// Append a short note to a tool result when the underlying socket call
/// took retries to land. Lets claude distinguish "my request was wrong"
/// from "c0re flickered and the harness rode it out" — without the
/// hint, a tool result that took 30s to come back looks identical to a
/// content failure and the model would burn a turn retrying it.
///
/// * `s` — the already-formatted tool result; returned untouched when
///   `retries` is 0.
/// * `retries` — number of connect retries the socket call needed.
///
/// NOTE(review): the note is appended regardless of what `s` says, so a
/// peer-level failure string that needed retries also gets the "did succeed"
/// wording. The text is left as-is here; confirm whether that is intended.
pub fn annotate_retries(mut s: String, retries: u32) -> String {
    use std::fmt::Write as _;

    if retries == 0 {
        return s;
    }
    let suffix = if retries == 1 { "retry" } else { "retries" };
    // Format straight into the existing buffer instead of building a
    // temporary `String`; writing into a `String` cannot fail.
    let _ = write!(
        s,
        "\n\n(note: hive socket connect needed {retries} {suffix} — c0re likely \
         restarted. Your request did succeed on the final attempt; no action needed.)"
    );
    s
}
// MCP tool args for `send`; used by both `AgentServer::send` and
// `ManagerServer::send`, which forward `to` / `body` unchanged in a `Send`
// request. Notes here are plain `//` comments on purpose: the struct derives
// `schemars::JsonSchema`, which picks up `///` text as schema descriptions.
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
pub struct SendArgs {
/// Logical agent name to deliver the message to (e.g. `"manager"`,
/// `"alice"`, or the literal `"operator"` for the dashboard's T4LK box).
pub to: String,
/// Message body. Plain text; the broker doesn't parse it.
pub body: String,
}
// MCP tool args for `recv`; used by both `AgentServer::recv` and
// `ManagerServer::recv`, which forward `wait_seconds` unchanged.
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
pub struct RecvArgs {
// NOTE(review): the field doc below (60s cap, 30s default) disagrees with
// both `recv` tool descriptions in this file, which say the wait is capped
// at 180 and that omitting it (or passing 0) returns immediately. The
// server-side behavior is not visible from this module — confirm which is
// current and align the two.
/// How long to long-poll for a new message before returning the
/// empty marker. Capped at 60s server-side. Default (None) is
/// 30s. Useful when an agent wants to throttle wakes without
/// actually napping — pick a longer wait to coalesce bursts.
#[serde(default)]
pub wait_seconds: Option<u64>,
}
// Used by both `AgentServer::remind` and `ManagerServer::remind`. serde
// accepts any combination of the two timing fields (both are defaulted
// `Option`s); the exactly-one rule is enforced inside those handlers, which
// then build the `ReminderTiming` value.
/// MCP tool args for `remind`. Exactly one of `delay_seconds` or
/// `at_unix_timestamp` must be set; both / neither is a tool-side error.
/// Hides the tagged `ReminderTiming` enum behind a flatter schema so the
/// model picks one field instead of building `{"timing_type": "in_seconds",
/// "seconds": 60}` shaped objects.
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
pub struct RemindArgs {
/// Body that lands in your inbox when the reminder fires (sender
/// will appear as `reminder`). Soft cap at 4 KiB inline — anything
/// larger gets auto-persisted to a file under
/// `/agents/<you>/state/reminders/auto-<ts>.md` and the inbox
/// message becomes a short pointer. Pass `file_path` if you want
/// to control the destination yourself.
pub message: String,
/// Fire `delay_seconds` from now (relative). Set this OR
/// `at_unix_timestamp`, not both.
#[serde(default)]
pub delay_seconds: Option<u64>,
/// Fire at this absolute unix timestamp (seconds since epoch). Set
/// this OR `delay_seconds`, not both.
#[serde(default)]
pub at_unix_timestamp: Option<i64>,
/// Optional path to a file the scheduler should reference instead of
/// inlining a long `message`. Use this for large payloads (research
/// notes, file lists, intermediate state). Path must be reachable from
/// the agent's container — typically under `/agents/<you>/state/`.
// Forwarded unchanged in the `Remind` request; this module does not
// inspect or validate the path.
#[serde(default)]
pub file_path: Option<String>,
}
/// Per-agent tool surface. Holds the socket path so each tool call doesn't
/// re-derive it; the socket itself is the per-container `/run/hive/mcp.sock`.
///
/// Every tool method sends an `AgentRequest` to this socket via `dispatch`.
#[derive(Debug, Clone)]
pub struct AgentServer {
/// Path of the hyperhive unix socket that each request is sent to.
socket: PathBuf,
}
impl AgentServer {
#[must_use]
pub fn new(socket: PathBuf) -> Self {
Self { socket }
}
/// Issue any `AgentRequest` through the retry-aware client and pull
/// the reply through `SocketReply`. Returns the retry count so tool
/// handlers can annotate their result (see `annotate_retries`).
async fn dispatch(
&self,
req: hive_sh4re::AgentRequest,
) -> (Result<SocketReply, anyhow::Error>, u32) {
match client::request_retried::<_, hive_sh4re::AgentResponse>(&self.socket, &req).await {
Ok((r, n)) => (Ok(SocketReply::from(r)), n),
Err(e) => (Err(e), 0),
}
}
}
#[tool_router]
impl AgentServer {
#[tool(
description = "Send a message to another hyperhive agent (or to the operator). \
Use this to talk to peers or to surface output for the human at the dashboard."
)]
async fn send(&self, Parameters(args): Parameters<SendArgs>) -> String {
let log = format!("{args:?}");
let to = args.to.clone();
if let Err(refusal) = check_send_allowed(&to) {
return run_tool_envelope("send", log, async move { refusal }).await;
}
run_tool_envelope("send", log, async move {
let (resp, retries) = self
.dispatch(hive_sh4re::AgentRequest::Send {
to: args.to,
body: args.body,
})
.await;
annotate_retries(format_ack(resp, "send", format!("sent to {to}")), retries)
})
.await
}
#[tool(
description = "Surface a structured question to either the operator OR a peer agent. \
Returns immediately with a question id — do NOT wait inline. When the recipient \
answers, a system message with event `question_answered { id, question, answer, \
answerer }` lands in your inbox; handle it on a future turn. \n\n\
Recipient: omit `to` (or set `to: \"operator\"`) for the human operator on the \
dashboard. Set `to: \"<agent-name>\"` to ask a peer agent — they receive a \
`question_asked { id, asker, question, options, multi }` event in their inbox \
and answer via `mcp__hyperhive__answer`. \n\n\
`options` is advisory: pass a short fixed-choice list when applicable, otherwise \
leave empty for free text. Set `multi: true` to let the answerer pick multiple \
options (checkboxes on the dashboard, hint to the agent otherwise) — answer comes \
back as a comma-separated string. Set `ttl_seconds` to auto-cancel a \
no-longer-relevant question — on expiry the answer is `[expired]` (with \
`answerer: \"ttl-watchdog\"`) and the same `question_answered` event fires."
)]
async fn ask(&self, Parameters(args): Parameters<AskArgs>) -> String {
let log = format!("{args:?}");
run_tool_envelope("ask", log, async move {
let (resp, retries) = self
.dispatch(hive_sh4re::AgentRequest::Ask {
question: args.question,
options: args.options,
multi: args.multi,
ttl_seconds: args.ttl_seconds,
to: args.to,
})
.await;
let s = match resp {
Ok(SocketReply::QuestionQueued(id)) => format!(
"question queued (id={id}); answer will arrive as a system \
`question_answered` event in your inbox"
),
Ok(SocketReply::Err(m)) => format!("ask failed: {m}"),
Ok(other) => format!("ask unexpected response: {other:?}"),
Err(e) => format!("ask transport error: {e:#}"),
};
annotate_retries(s, retries)
})
.await
}
#[tool(
description = "Answer a question that was routed to YOU via a `question_asked` system \
event in your inbox. Pass the `id` from that event and your `answer` string. The \
answer will surface in the asker's inbox as a `question_answered { id, question, \
answer, answerer: <your-name> }` event. \n\n\
Authorisation is strict — you can only answer questions where you are the declared \
target (i.e. the asker did `ask(to: \"<your-name>\", ...)`). Trying to answer an \
operator-targeted question or a question addressed to a different agent will fail."
)]
async fn answer(&self, Parameters(args): Parameters<AnswerArgs>) -> String {
let log = format!("{args:?}");
let id = args.id;
run_tool_envelope("answer", log, async move {
let (resp, retries) = self
.dispatch(hive_sh4re::AgentRequest::Answer {
id,
answer: args.answer,
})
.await;
annotate_retries(
format_ack(resp, "answer", format!("answered question {id}")),
retries,
)
})
.await
}
#[tool(
description = "Pop one message from this agent's inbox. Returns the sender and body, \
or an empty marker if nothing is waiting. Without `wait_seconds` (or with 0) the \
call returns immediately — a cheap 'anything pending?' peek. Pass a positive \
`wait_seconds` (capped at 180) to park the turn waiting for new work — incoming \
messages wake you instantly, otherwise the call returns empty at the timeout. \
That's strictly better than a fixed shell `sleep`. Typical pattern: when you have \
nothing else useful to do, call `recv(wait_seconds: 180)` to park until \
something arrives."
)]
async fn recv(&self, Parameters(args): Parameters<RecvArgs>) -> String {
let log = format!("{args:?}");
run_tool_envelope("recv", log, async move {
let (resp, retries) = self
.dispatch(hive_sh4re::AgentRequest::Recv {
wait_seconds: args.wait_seconds,
})
.await;
annotate_retries(format_recv(resp), retries)
})
.await
}
#[tool(
description = "List loose ends pending against this agent: unanswered questions \
where you are the asker (waiting on someone) or the target (someone's waiting on \
you), plus — for the manager only — pending approvals you submitted that the \
operator hasn't acted on yet. Cheap server-side sweep, no args. Useful at turn \
start to remember what you owe / what's owed to you without scrolling inbox \
history. Output is a short bulleted list with ids, ages in seconds, and the \
relevant context. Empty result is reported clearly."
)]
async fn get_open_threads(&self) -> String {
    // No arguments, so the logged args string is empty.
    let call = async move {
        let request = hive_sh4re::AgentRequest::GetOpenThreads;
        let (resp, retries) = self.dispatch(request).await;
        annotate_retries(format_open_threads(resp), retries)
    };
    run_tool_envelope("get_open_threads", String::new(), call).await
}
#[tool(
description = "Self-introspection: returns your own canonical agent name (the \
socket-identity name, NOT the prompt-substituted label), role (`agent`), and \
the current hyperhive rev hive-c0re is running against. No args. Useful when \
you want a trustworthy identity stamp for state files / commit messages / \
cross-agent attribution that won't drift across renames or session-continue \
boundaries where the system-prompt label could be stale."
)]
async fn whoami(&self) -> String {
    // No arguments, so the logged args string is empty.
    let call = async move {
        let request = hive_sh4re::AgentRequest::Whoami;
        let (resp, retries) = self.dispatch(request).await;
        annotate_retries(format_whoami(resp), retries)
    };
    run_tool_envelope("whoami", String::new(), call).await
}
#[tool(
description = "Schedule a reminder that lands in this agent's own inbox at a future \
time (sender will appear as `reminder`). Use for self-paced follow-ups: 'check task \
status in 60s', 'retry failed deploy at 14:00 UTC', 'nudge me when the operator's \
deploy window opens'. Set EXACTLY ONE of `delay_seconds` (fire N seconds from now) \
or `at_unix_timestamp` (fire at absolute epoch second). Body soft-caps at 4 KiB \
inline — anything larger gets auto-persisted to a file under your \
`/agents/<you>/state/reminders/` dir and the inbox message becomes a short pointer; \
pass `file_path` if you want to control the destination yourself. Returns \
immediately — the reminder lives in the broker until due."
)]
async fn remind(&self, Parameters(args): Parameters<RemindArgs>) -> String {
let log = format!("{args:?}");
run_tool_envelope("remind", log, async move {
let timing = match (args.delay_seconds, args.at_unix_timestamp) {
(Some(_), Some(_)) => {
return "remind failed: pass exactly one of `delay_seconds` or \
`at_unix_timestamp`, not both"
.to_string();
}
(None, None) => {
return "remind failed: pass exactly one of `delay_seconds` or \
`at_unix_timestamp`"
.to_string();
}
(Some(s), None) => hive_sh4re::ReminderTiming::InSeconds { seconds: s },
(None, Some(t)) => hive_sh4re::ReminderTiming::At { unix_timestamp: t },
};
let (resp, retries) = self
.dispatch(hive_sh4re::AgentRequest::Remind {
message: args.message,
timing,
file_path: args.file_path,
})
.await;
annotate_retries(format_ack(resp, "remind", "reminder scheduled".to_string()), retries)
})
.await
}
}
// `ServerHandler` impl for the sub-agent surface. The impl body is empty on
// purpose: the `#[tool_handler]` attribute is expected to supply the handler
// methods (presumably wired to the `#[tool_router]` impl of `AgentServer` —
// confirm against the rmcp docs). The `instructions` string is runtime text.
#[tool_handler(
instructions = "You are a hyperhive agent. Use `send` to talk to peers (by their logical \
name) or to the operator (recipient `operator`). Use `recv` to drain your inbox one \
message at a time. Use `remind` to schedule a future wake-up message for yourself."
)]
impl ServerHandler for AgentServer {}
/// Serves the sub-agent tool surface over stdio until the client disconnects.
///
/// `socket` is the hyperhive socket path handed to `AgentServer::new`.
///
/// # Errors
///
/// Propagates any error from starting the service or from waiting on it.
pub async fn serve_agent_stdio(socket: PathBuf) -> Result<()> {
    let running = AgentServer::new(socket).serve(stdio()).await?;
    running.waiting().await?;
    Ok(())
}
/// Serves the manager tool surface over stdio until the client disconnects.
/// Same flow as the agent flavour, with `ManagerServer` as the handler.
///
/// `socket` is the hyperhive socket path handed to `ManagerServer::new`.
///
/// # Errors
///
/// Propagates any error from starting the service or from waiting on it.
pub async fn serve_manager_stdio(socket: PathBuf) -> Result<()> {
    let running = ManagerServer::new(socket).serve(stdio()).await?;
    running.waiting().await?;
    Ok(())
}
// -----------------------------------------------------------------------------
// Manager tool surface
// -----------------------------------------------------------------------------
// MCP tool args for `ManagerServer::request_spawn`; both fields are forwarded
// unchanged in a `ManagerRequest::RequestSpawn`. The name-length limit
// mentioned below is not checked anywhere in this module.
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
pub struct RequestSpawnArgs {
/// New sub-agent name (≤9 chars). Queues a Spawn approval; the
/// operator approves on the dashboard before the container is created.
pub name: String,
/// Optional description shown on the dashboard approval card so the
/// operator knows what the new agent is for without a separate message.
#[serde(default)]
pub description: Option<String>,
}
// MCP tool args for `ManagerServer::kill`; `name` is forwarded unchanged as
// `ManagerRequest::Kill { name }`.
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
pub struct KillArgs {
/// Sub-agent name (without the `h-` container prefix).
pub name: String,
}
// MCP tool args for `ManagerServer::start`; `name` is forwarded unchanged as
// `ManagerRequest::Start { name }`.
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
pub struct StartArgs {
/// Sub-agent name (without the `h-` container prefix).
pub name: String,
}
// MCP tool args for `ManagerServer::restart`; `name` is forwarded unchanged
// as `ManagerRequest::Restart { name }`.
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
pub struct RestartArgs {
/// Sub-agent name (without the `h-` container prefix).
pub name: String,
}
// MCP tool args for `ManagerServer::update`; `name` is forwarded unchanged as
// `ManagerRequest::Update { name }`.
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
pub struct UpdateArgs {
/// Sub-agent name (without the `h-` container prefix).
pub name: String,
}
// MCP tool args for `ask`; used by both `AgentServer::ask` and
// `ManagerServer::ask`, which forward all five fields unchanged in an `Ask`
// request. Only `question` is required — every other field falls back to its
// serde default (empty list, `false`, `None`).
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
pub struct AskArgs {
/// The question to surface.
pub question: String,
/// Optional fixed-choice answers. The dashboard renders these as
/// chips alongside a free-text fallback ("Other…") so the operator
/// is never trapped by an incomplete list; peer-agent recipients
/// see the list in their inbox event and can return any string.
#[serde(default)]
pub options: Vec<String>,
/// When true, options are rendered as checkboxes — the answerer
/// can pick any subset. The answer comes back as a single string
/// with selections joined by ", ". Ignored when `options` is empty.
#[serde(default)]
pub multi: bool,
/// Optional auto-cancel after `ttl_seconds` (capped server-side at
/// 6 hours). On expiry the question resolves with answer
/// `[expired]` and the asker receives the usual
/// `question_answered` system event (with `answerer:
/// "ttl-watchdog"`). `None` (default) = wait indefinitely.
#[serde(default)]
pub ttl_seconds: Option<u64>,
/// Recipient. Omit (or pass `"operator"`) to ask the human
/// operator via the dashboard. Pass another agent's logical name
/// to ask that peer — they receive a `question_asked` event in
/// their inbox and answer via `mcp__hyperhive__answer`.
#[serde(default)]
pub to: Option<String>,
}
// MCP tool args for `answer`; used by both `AgentServer::answer` and
// `ManagerServer::answer`, which forward `id` / `answer` unchanged in an
// `Answer` request. No size check is applied in this module.
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
pub struct AnswerArgs {
/// Id of the question being answered — comes from the
/// `question_asked` event in your inbox.
pub id: i64,
/// Free-text answer body. Soft-capped at 4 KiB by the same
/// `MESSAGE_MAX_BYTES` limit as `send`; keep it short or write the
/// detail to a file and pass a path.
pub answer: String,
}
// MCP tool args for `ManagerServer::request_apply_commit`; all three fields
// are forwarded unchanged in a `ManagerRequest::RequestApplyCommit`. The
// commit ref is not resolved or validated in this module.
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
pub struct RequestApplyCommitArgs {
/// Agent whose config repo the commit lives in (use `"hm1nd"` for the
/// manager's own config).
pub agent: String,
/// Git sha (full or short) pointing at the proposed `agent.nix`.
pub commit_ref: String,
/// Optional description shown on the dashboard approval card so the
/// operator knows what the change does without opening the diff.
#[serde(default)]
pub description: Option<String>,
}
// MCP tool args for `ManagerServer::get_logs`. The handler clamps a supplied
// `lines` value to 500 before sending; an omitted value is forwarded as
// `None`, so the default of 50 is not applied in this module.
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
pub struct GetLogsArgs {
/// Logical name of the sub-agent container to fetch logs for.
pub agent: String,
/// How many journal lines to return (default: 50, max: 500).
#[serde(default)]
pub lines: Option<u32>,
}
/// Manager tool surface. Holds the hyperhive socket path; every tool method
/// sends a `ManagerRequest` to it via `dispatch`.
#[derive(Debug, Clone)]
pub struct ManagerServer {
/// Path of the hyperhive unix socket that each request is sent to.
socket: PathBuf,
}
impl ManagerServer {
#[must_use]
pub fn new(socket: PathBuf) -> Self {
Self { socket }
}
/// Helper: issue any `ManagerRequest` through the retry-aware
/// client, convert the reply through `SocketReply`, and return the
/// retry count alongside so the tool handler can `annotate_retries`
/// on the final string.
async fn dispatch(
&self,
req: hive_sh4re::ManagerRequest,
) -> (Result<SocketReply, anyhow::Error>, u32) {
match client::request_retried::<_, hive_sh4re::ManagerResponse>(&self.socket, &req).await {
Ok((r, n)) => (Ok(SocketReply::from(r)), n),
Err(e) => (Err(e), 0),
}
}
}
#[tool_router]
impl ManagerServer {
#[tool(
description = "Send a message to a sub-agent (by logical name), to another agent, \
or to the operator (recipient `operator`, surfaces in the dashboard)."
)]
async fn send(&self, Parameters(args): Parameters<SendArgs>) -> String {
let log = format!("{args:?}");
let to = args.to.clone();
run_tool_envelope("send", log, async move {
let (resp, retries) = self
.dispatch(hive_sh4re::ManagerRequest::Send {
to: args.to,
body: args.body,
})
.await;
annotate_retries(format_ack(resp, "send", format!("sent to {to}")), retries)
})
.await
}
#[tool(
description = "Pop one message from the manager inbox. Returns sender + body, or \
empty. Without `wait_seconds` (or 0) returns immediately — a cheap inbox peek. \
Pass a positive value (capped at 180) to park until either a message arrives \
or the timeout fires; prefer a long wait (120 or 180) over ending a turn \
early when you have nothing else to do."
)]
async fn recv(&self, Parameters(args): Parameters<RecvArgs>) -> String {
let log = format!("{args:?}");
run_tool_envelope("recv", log, async move {
let (resp, retries) = self
.dispatch(hive_sh4re::ManagerRequest::Recv {
wait_seconds: args.wait_seconds,
})
.await;
annotate_retries(format_recv(resp), retries)
})
.await
}
#[tool(
description = "Queue a Spawn approval for a brand-new sub-agent. The operator \
approves on the dashboard before the container is actually created."
)]
async fn request_spawn(&self, Parameters(args): Parameters<RequestSpawnArgs>) -> String {
let log = format!("{args:?}");
let name = args.name.clone();
run_tool_envelope("request_spawn", log, async move {
let (resp, retries) = self
.dispatch(hive_sh4re::ManagerRequest::RequestSpawn {
name: args.name,
description: args.description,
})
.await;
annotate_retries(
format_ack(
resp,
"request_spawn",
format!("spawn approval queued for {name}"),
),
retries,
)
})
.await
}
#[tool(
description = "Stop a sub-agent container (graceful). The state dir is kept; \
recreating reuses prior config + Claude credentials. No approval required."
)]
async fn kill(&self, Parameters(args): Parameters<KillArgs>) -> String {
let log = format!("{args:?}");
let name = args.name.clone();
run_tool_envelope("kill", log, async move {
let (resp, retries) = self
.dispatch(hive_sh4re::ManagerRequest::Kill { name: args.name })
.await;
annotate_retries(format_ack(resp, "kill", format!("killed {name}")), retries)
})
.await
}
#[tool(
description = "Start a stopped sub-agent container. No approval required — \
lifecycle ops on existing containers are at the manager's discretion."
)]
async fn start(&self, Parameters(args): Parameters<StartArgs>) -> String {
let log = format!("{args:?}");
let name = args.name.clone();
run_tool_envelope("start", log, async move {
let (resp, retries) = self
.dispatch(hive_sh4re::ManagerRequest::Start { name: args.name })
.await;
annotate_retries(
format_ack(resp, "start", format!("started {name}")),
retries,
)
})
.await
}
#[tool(description = "Restart a sub-agent container (stop + start). No approval required.")]
async fn restart(&self, Parameters(args): Parameters<RestartArgs>) -> String {
let log = format!("{args:?}");
let name = args.name.clone();
run_tool_envelope("restart", log, async move {
let (resp, retries) = self
.dispatch(hive_sh4re::ManagerRequest::Restart { name: args.name })
.await;
annotate_retries(
format_ack(resp, "restart", format!("restarted {name}")),
retries,
)
})
.await
}
#[tool(
description = "Rebuild a sub-agent: re-applies the current hyperhive flake + agent.nix \
and restarts the container. No approval required — idempotent. Use when you receive a \
`needs_update` system event for an agent."
)]
async fn update(&self, Parameters(args): Parameters<UpdateArgs>) -> String {
let log = format!("{args:?}");
let name = args.name.clone();
run_tool_envelope("update", log, async move {
let (resp, retries) = self
.dispatch(hive_sh4re::ManagerRequest::Update { name: args.name })
.await;
annotate_retries(
format_ack(resp, "update", format!("updated {name}")),
retries,
)
})
.await
}
#[tool(
description = "Surface a structured question to either the operator OR a sub-agent. \
Returns immediately with a question id — do NOT wait inline. When the recipient \
answers, a system message with event `question_answered { id, question, answer, \
answerer }` lands in your inbox; handle it on a future turn. \n\n\
Recipient: omit `to` (or set `to: \"operator\"`) for the human operator on the \
dashboard. Set `to: \"<agent-name>\"` to ask a sub-agent — they receive a \
`question_asked` event in their inbox and answer via their `mcp__hyperhive__answer` \
tool. Useful for delegating decisions / clarifications without losing the \
question id correlation. \n\n\
`options` is advisory: pass a short fixed-choice list when applicable, otherwise \
leave empty for free text. Set `multi: true` to render checkboxes; the answer \
comes back as a comma-separated string. Set `ttl_seconds` to auto-cancel — on \
expiry the answer is `[expired]` (with `answerer: \"ttl-watchdog\"`) and the same \
`question_answered` event fires."
)]
async fn ask(&self, Parameters(args): Parameters<AskArgs>) -> String {
let log = format!("{args:?}");
run_tool_envelope("ask", log, async move {
let (resp, retries) = self
.dispatch(hive_sh4re::ManagerRequest::Ask {
question: args.question,
options: args.options,
multi: args.multi,
ttl_seconds: args.ttl_seconds,
to: args.to,
})
.await;
let s = match resp {
Ok(SocketReply::QuestionQueued(id)) => format!(
"question queued (id={id}); answer will arrive as a system \
`question_answered` event in your inbox"
),
Ok(SocketReply::Err(m)) => format!("ask failed: {m}"),
Ok(other) => format!("ask unexpected response: {other:?}"),
Err(e) => format!("ask transport error: {e:#}"),
};
annotate_retries(s, retries)
})
.await
}
#[tool(
description = "Answer a question that was routed to the manager via a `question_asked` \
system event in the manager's inbox (i.e. a sub-agent did `ask(to: \"manager\", \
...)`). Pass the `id` from the event and your `answer`. The answer surfaces in the \
asker's inbox as a `question_answered` event."
)]
async fn answer(&self, Parameters(args): Parameters<AnswerArgs>) -> String {
let log = format!("{args:?}");
let id = args.id;
run_tool_envelope("answer", log, async move {
let (resp, retries) = self
.dispatch(hive_sh4re::ManagerRequest::Answer {
id,
answer: args.answer,
})
.await;
annotate_retries(
format_ack(resp, "answer", format!("answered question {id}")),
retries,
)
})
.await
}
#[tool(
description = "Submit a config change for operator approval. Pass the agent name \
(e.g. `alice` or `hm1nd` for the manager's own config) and a commit sha in that \
agent's proposed config repo. On approval hive-c0re rebuilds the container."
)]
async fn request_apply_commit(
&self,
Parameters(args): Parameters<RequestApplyCommitArgs>,
) -> String {
let log = format!("{args:?}");
let agent = args.agent.clone();
let commit_ref = args.commit_ref.clone();
run_tool_envelope("request_apply_commit", log, async move {
let (resp, retries) = self
.dispatch(hive_sh4re::ManagerRequest::RequestApplyCommit {
agent: args.agent,
commit_ref: args.commit_ref,
description: args.description,
})
.await;
annotate_retries(
format_ack(
resp,
"request_apply_commit",
format!("apply approval queued for {agent} @ {commit_ref}"),
),
retries,
)
})
.await
}
#[tool(
description = "Schedule a reminder that lands in the manager's own inbox at a future \
time (sender will appear as `reminder`). Use for self-paced manager follow-ups: \
'recheck pending approval in 10m', 'nudge alice if she hasn't replied by 14:00 \
UTC'. Set EXACTLY ONE of `delay_seconds` (fire N seconds from now) or \
`at_unix_timestamp` (fire at absolute epoch second). Body soft-caps at 4 KiB \
inline — anything larger gets auto-persisted to a file under `/state/reminders/` \
(the manager's own state mount) and the inbox message becomes a short pointer. \
Pass `file_path` if you want to control the destination yourself."
)]
async fn remind(&self, Parameters(args): Parameters<RemindArgs>) -> String {
let log = format!("{args:?}");
run_tool_envelope("remind", log, async move {
let timing = match (args.delay_seconds, args.at_unix_timestamp) {
(Some(_), Some(_)) => {
return "remind failed: pass exactly one of `delay_seconds` or \
`at_unix_timestamp`, not both"
.to_string();
}
(None, None) => {
return "remind failed: pass exactly one of `delay_seconds` or \
`at_unix_timestamp`"
.to_string();
}
(Some(s), None) => hive_sh4re::ReminderTiming::InSeconds { seconds: s },
(None, Some(t)) => hive_sh4re::ReminderTiming::At { unix_timestamp: t },
};
let (resp, retries) = self
.dispatch(hive_sh4re::ManagerRequest::Remind {
message: args.message,
timing,
file_path: args.file_path,
})
.await;
annotate_retries(format_ack(resp, "remind", "reminder scheduled".to_string()), retries)
})
.await
}
// Manager flavour of `get_open_threads`: takes no arguments, dispatches
// `ManagerRequest::GetOpenThreads` and formats the reply with
// `format_open_threads`, appending the retry annotation.
#[tool(
    description = "Hive-wide loose ends: EVERY pending approval + EVERY unanswered \
        question across the swarm. Use to scan for stalled coordination — questions \
        sub-agents asked each other that nobody's answering, approvals stuck waiting on \
        the operator, etc. No args. The sub-agent flavour of this tool only returns the \
        agent's own threads; the manager flavour is unfiltered."
)]
async fn get_open_threads(&self) -> String {
    let call = async move {
        let request = hive_sh4re::ManagerRequest::GetOpenThreads;
        let (resp, retries) = self.dispatch(request).await;
        let rendered = format_open_threads(resp);
        annotate_retries(rendered, retries)
    };
    // No arguments to log, hence the empty log payload.
    run_tool_envelope("get_open_threads", String::new(), call).await
}
// Manager flavour of `whoami`: takes no arguments, dispatches
// `ManagerRequest::Whoami` and formats the reply with `format_whoami`,
// appending the retry annotation.
#[tool(
    description = "Self-introspection for the manager: returns canonical name \
        (`manager`), role (`manager`), and the current hyperhive rev. Same shape as \
        the agent flavour; useful for cross-agent attribution / boot announcements / \
        state-file headers without trusting prompt substitution."
)]
async fn whoami(&self) -> String {
    let call = async move {
        let request = hive_sh4re::ManagerRequest::Whoami;
        let (resp, retries) = self.dispatch(request).await;
        let rendered = format_whoami(resp);
        annotate_retries(rendered, retries)
    };
    // No arguments to log, hence the empty log payload.
    run_tool_envelope("whoami", String::new(), call).await
}
#[tool(
description = "Fetch recent journal log lines for a sub-agent container. Useful \
for diagnosing MCP server registration failures, startup crashes, plugin install \
errors, or any harness issue you can't see from inside the container. `lines` \
defaults to 50 (max capped at 500 on the host side)."
)]
async fn get_logs(&self, Parameters(args): Parameters<GetLogsArgs>) -> String {
let log = format!("{args:?}");
let agent = args.agent.clone();
run_tool_envelope("get_logs", log, async move {
let lines = args.lines.map(|n| n.min(500));
let (resp, retries) = self
.dispatch(hive_sh4re::ManagerRequest::GetLogs {
agent: agent.clone(),
lines,
})
.await;
let s = match resp {
Ok(SocketReply::Logs(content)) => {
if content.is_empty() {
format!("(no journal output for {agent})")
} else {
content
}
}
Ok(SocketReply::Err(m)) => format!("get_logs failed: {m}"),
Ok(other) => format!("get_logs unexpected response: {other:?}"),
Err(e) => format!("get_logs transport error: {e:#}"),
};
annotate_retries(s, retries)
})
.await
}
}
// `ServerHandler` implementation for the manager flavour. The impl body is
// empty; the `#[tool_handler]` attribute carries the `instructions` text
// (presumably surfaced to the MCP client as the server's instructions —
// confirm against rmcp).
//
// NOTE(review): the instructions below do not mention `start`, `restart`,
// `update`, `get_logs` or `remind`, all of which `allowed_mcp_tools` lists
// for `Flavor::Manager`. Confirm whether they should be advertised here.
#[tool_handler(
instructions = "You are the hyperhive manager (hm1nd). You coordinate sub-agents and \
relay between them and the operator. Use `send` to talk to agents/operator, `recv` \
to drain your inbox. Privileged: `request_spawn` (new agent, gated on operator \
approval), `kill` (graceful stop), `request_apply_commit` (config change for \
any agent including yourself), `ask` (structured question to the operator or a \
sub-agent — non-blocking, answer arrives later as a `question_answered` event), \
`answer` (respond to a `question_asked` event directed at you), \
`get_open_threads` (hive-wide loose ends — pending approvals + unanswered \
questions across the swarm), `whoami` (self-introspection — canonical name, \
role, current hyperhive rev). The manager's own config lives at \
`/agents/hm1nd/config/agent.nix`."
)]
impl ServerHandler for ManagerServer {}
/// Name of the hyperhive MCP server inside claude's view. Claude prefixes
/// tools as `mcp__<this>__<tool>` (e.g. `mcp__hyperhive__send`).
///
/// Also the `mcpServers` key that `render_claude_config` registers the
/// built-in server under. An extra MCP server declared with this same
/// name is skipped by both `render_claude_config` and `allowed_mcp_tools`.
pub const SERVER_NAME: &str = "hyperhive";
/// Built-in claude tools the turn loop enables via `--tools`. Anything not
/// in this list literally doesn't exist in the session (claude won't even
/// try to call it). Web egress (`WebFetch`/`WebSearch`) and nested agents
/// (`Task`) are intentionally omitted for now; `Bash` is allowed pending a
/// finer-grained allow-list system for shell command patterns. `TodoWrite`
/// is omitted because the todo list lives in claude's in-process session
/// state and silently evaporates on /compact or session reset — agents
/// should plan in /state notes instead. Edit later as our trust model
/// evolves.
///
/// `builtin_tools_arg` joins this list for `--tools`; `allowed_tools_arg`
/// puts the same names at the front of the `--allowedTools` value.
pub const ALLOWED_BUILTIN_TOOLS: &[&str] = &["Bash", "Edit", "Glob", "Grep", "Read", "Write"];
/// Which MCP tool surface to advertise via `--allowedTools`. The agent
/// list is the strict subset of the manager list, so we just thread the
/// flavor through.
#[derive(Debug, Clone, Copy)]
pub enum Flavor {
/// Sub-agent surface: selects the shorter tool list in
/// `allowed_mcp_tools`.
Agent,
/// Manager surface: the agent tools plus the manager-only ones listed in
/// `allowed_mcp_tools` (`request_spawn`, `kill`, `start`, `restart`,
/// `update`, `request_apply_commit`, `get_logs`).
Manager,
}
/// Fully-qualified MCP tool names (`mcp__<server>__<tool>`) that claude may
/// call without prompting for the given `flavor`.
///
/// The hyperhive names are hard-coded here, so a tool added to the
/// corresponding `#[tool_router]` impl must be added to the matching list
/// as well. After the hyperhive entries come the patterns of every extra
/// MCP server returned by `load_extra_mcp`, in the map's key order.
#[must_use]
pub fn allowed_mcp_tools(flavor: Flavor) -> Vec<String> {
    const AGENT_TOOLS: &[&str] = &[
        "send",
        "recv",
        "ask",
        "answer",
        "remind",
        "get_open_threads",
        "whoami",
    ];
    const MANAGER_TOOLS: &[&str] = &[
        "send",
        "recv",
        "request_spawn",
        "kill",
        "start",
        "restart",
        "update",
        "request_apply_commit",
        "ask",
        "answer",
        "get_logs",
        "get_open_threads",
        "remind",
        "whoami",
    ];

    let builtin = match flavor {
        Flavor::Agent => AGENT_TOOLS,
        Flavor::Manager => MANAGER_TOOLS,
    };
    let mut allowed = Vec::with_capacity(builtin.len());
    for tool in builtin {
        allowed.push(format!("mcp__{SERVER_NAME}__{tool}"));
    }

    // Extra MCP servers come from `hyperhive.extraMcpServers` in the
    // agent's NixOS config. Each `allowedTools` pattern becomes
    // `mcp__<server>__<pattern>`, so the default `["*"]` yields
    // `mcp__<server>__*` (every tool of that server). An entry that reuses
    // the built-in server name is skipped.
    let extras = load_extra_mcp()
        .into_iter()
        .filter(|(server, _)| server.as_str() != SERVER_NAME)
        .flat_map(|(server, spec)| {
            spec.allowed_tools
                .into_iter()
                .map(move |pat| format!("mcp__{server}__{pat}"))
        });
    allowed.extend(extras);
    allowed
}
/// Value for `--allowedTools` (auto-approve): the built-in tool names
/// followed by the MCP surface for `flavor`, joined with commas.
#[must_use]
pub fn allowed_tools_arg(flavor: Flavor) -> String {
    ALLOWED_BUILTIN_TOOLS
        .iter()
        .map(|name| (*name).to_owned())
        .chain(allowed_mcp_tools(flavor))
        .collect::<Vec<String>>()
        .join(",")
}
/// Value for `--tools` (which built-ins exist in this session): the
/// entries of `ALLOWED_BUILTIN_TOOLS` joined with commas.
#[must_use]
pub fn builtin_tools_arg() -> String {
    let tools: &[&str] = ALLOWED_BUILTIN_TOOLS;
    tools.join(",")
}
/// Where the NixOS module writes the per-agent extra-MCP spec (see
/// `nix/templates/harness-base.nix`). Each entry becomes an additional
/// `mcpServers.<key>` block in the rendered claude config + a
/// `mcp__<key>__<tool>` pattern in `--allowedTools`.
///
/// `load_extra_mcp` reads this file and parses it as a JSON object mapping
/// server name to an `ExtraMcpServer` entry.
const EXTRA_MCP_PATH: &str = "/etc/hyperhive/extra-mcp.json";
/// Where the NixOS module writes the per-agent send allow-list (see
/// `nix/templates/harness-base.nix`). Empty list = unrestricted (the
/// default). Non-empty list constrains `mcp__hyperhive__send`'s `to`
/// field; the manager is always implicitly permitted regardless of
/// the list contents.
///
/// `check_send_allowed` reads this file and parses it as a JSON array of
/// recipient names.
const SEND_ALLOW_PATH: &str = "/etc/hyperhive/send-allow.json";
/// Enforce the per-agent send allow-list stored at `SEND_ALLOW_PATH`.
///
/// Returns `Ok(())` when `to` may be addressed:
/// - `to` is the manager (always allowed),
/// - the allow-list file is missing, unreadable or unparsable (the policy
///   fails open; the unreadable and unparsable cases log a warning),
/// - the list is empty, or
/// - the list contains `to`.
///
/// # Errors
///
/// Returns `Err(refusal)` with a claude-readable string when the recipient
/// is blocked. The harness surfaces the refusal as the tool result so
/// claude knows the message didn't land and can react (e.g. route via the
/// manager instead).
fn check_send_allowed(to: &str) -> Result<(), String> {
    if to == hive_sh4re::MANAGER_AGENT {
        // Always allow agents to talk to the manager — otherwise a
        // misconfigured allow-list could leave a sub-agent unable
        // to ask for help.
        return Ok(());
    }
    let raw = match std::fs::read_to_string(SEND_ALLOW_PATH) {
        Ok(raw) => raw,
        Err(e) => {
            // A missing file means no policy is configured. Any other read
            // failure (permissions, invalid UTF-8, …) also falls back to
            // unrestricted, but it must not go unnoticed, because it
            // silently disables a configured policy.
            if e.kind() != std::io::ErrorKind::NotFound {
                tracing::warn!(
                    path = SEND_ALLOW_PATH,
                    error = ?e,
                    "send allow-list unreadable; falling back to unrestricted",
                );
            }
            return Ok(());
        }
    };
    let allow: Vec<String> = match serde_json::from_str(&raw) {
        Ok(v) => v,
        Err(e) => {
            tracing::warn!(
                path = SEND_ALLOW_PATH,
                error = ?e,
                "send allow-list parse failed; falling back to unrestricted",
            );
            return Ok(());
        }
    };
    // Empty list = unrestricted (back-compat).
    if allow.is_empty() || allow.iter().any(|n| n == to) {
        return Ok(());
    }
    // The manager's name is taken from the same constant as the check
    // above, so the routing hint cannot drift from the actual rule.
    Err(format!(
        "send refused: recipient '{to}' not in hyperhive.allowedRecipients \
         (configured in agent.nix). Allowed: {allow:?}. The manager is \
         always reachable — route through `send(to: \"{manager}\", …)` if \
         you need to reach someone outside the allow-list.",
        manager = hive_sh4re::MANAGER_AGENT,
    ))
}
/// One entry of the extra-MCP spec parsed by `load_extra_mcp`. The map key
/// it is stored under is the server name.
#[derive(Debug, serde::Deserialize)]
struct ExtraMcpServer {
/// Emitted as the server's `command` by `render_claude_config`. Required
/// in the JSON (no serde default).
command: String,
/// Emitted as the server's `args`. Defaults to an empty list.
#[serde(default)]
args: Vec<String>,
/// Emitted as the server's `env`. Defaults to an empty map.
#[serde(default)]
env: std::collections::BTreeMap<String, String>,
/// Tool patterns, read from the JSON key `allowedTools`, that
/// `allowed_mcp_tools` turns into `mcp__<server>__<pattern>`. Defaults to
/// `["*"]` via `default_allowed_tools`.
#[serde(default = "default_allowed_tools")]
#[serde(rename = "allowedTools")]
allowed_tools: Vec<String>,
}
/// Serde default for `ExtraMcpServer::allowed_tools`: the single wildcard
/// pattern `*`.
fn default_allowed_tools() -> Vec<String> {
    vec![String::from("*")]
}
/// Read + parse the extra-MCP spec at `EXTRA_MCP_PATH`.
///
/// Returns an empty map when the file is missing (the agent has none
/// configured), unreadable, or malformed — every case degrades to "no
/// extra servers". The unreadable and malformed cases log a warning so a
/// broken spec is visible instead of silently dropping the servers.
fn load_extra_mcp() -> std::collections::BTreeMap<String, ExtraMcpServer> {
    let raw = match std::fs::read_to_string(EXTRA_MCP_PATH) {
        Ok(raw) => raw,
        Err(e) => {
            // A missing file is the normal "nothing configured" case; any
            // other read failure is worth a log line.
            if e.kind() != std::io::ErrorKind::NotFound {
                tracing::warn!(
                    path = EXTRA_MCP_PATH,
                    error = ?e,
                    "extra-mcp spec unreadable; ignoring",
                );
            }
            return std::collections::BTreeMap::new();
        }
    };
    serde_json::from_str(&raw).unwrap_or_else(|e| {
        tracing::warn!(
            path = EXTRA_MCP_PATH,
            error = ?e,
            "extra-mcp spec parse failed; ignoring",
        );
        std::collections::BTreeMap::new()
    })
}
/// Render the MCP config blob claude reads from `--mcp-config <path>`.
/// `agent_binary` is the path (or PATH-resolvable name) of the `hive-ag3nt`
/// executable; `socket` is the hyperhive per-agent socket bind-mounted into
/// the container (forwarded to the child as `--socket <path>`). Merges in
/// any extra MCP servers declared via `hyperhive.extraMcpServers` in the
/// agent's NixOS config.
#[must_use]
pub fn render_claude_config(agent_binary: &str, socket: &std::path::Path) -> String {
let mut servers = serde_json::Map::new();
servers.insert(
SERVER_NAME.to_owned(),
serde_json::json!({
"command": agent_binary,
"args": ["--socket", socket.display().to_string(), "mcp"],
"env": {}
}),
);
for (name, spec) in load_extra_mcp() {
if name == SERVER_NAME {
tracing::warn!(
"extra MCP server name `{SERVER_NAME}` collides with the built-in surface; ignoring",
);
continue;
}
servers.insert(
name,
serde_json::json!({
"command": spec.command,
"args": spec.args,
"env": spec.env,
}),
);
}
let config = serde_json::json!({ "mcpServers": servers });
serde_json::to_string_pretty(&config).unwrap_or_else(|_| "{}".into())
}