diff --git a/CLAUDE.md b/CLAUDE.md index 7bb8180..ba7fca0 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -173,9 +173,29 @@ The turn loop in `hive-ag3nt serve` writes Each turn invokes: ``` -claude --print --mcp-config --tools --allowedTools +claude --print --model haiku --mcp-config --tools --allowedTools ``` +**Loop control.** The harness pops one inbox message (the wake signal) per +cycle and hands claude a prompt naming the agent, the sender, the body, +and the MCP tools. Claude drives any further `recv`/`send` itself — +harness no longer relays claude's stdout as a reply. Stdout is logged for +debugging; the side effects (sends via MCP) are what matter. + +**Tool envelope.** Every MCP tool handler in `hive_ag3nt::mcp::AgentServer` +wraps its logic in `run_tool(name, args_debug, async { ... })`. The +envelope guarantees: +1. Pre-log of the request (tool + args). +2. The tool's own logic runs. +3. A status line is appended to the result body + (`[status] N unread message(s) in inbox`) so claude always sees the + current inbox depth without an extra tool call. +4. Post-log of the full result. + +`AgentRequest::Status` is the non-mutating peek that powers the status +line (broker's `count_pending`). When adding new tools (manager surface, +notes/state, etc.), use `run_tool` and they pick up the envelope for free. + **Tool whitelist** (see `ALLOWED_BUILTIN_TOOLS` in `hive-ag3nt::mcp`): - Allowed built-ins: `Bash`, `Edit`, `Glob`, `Grep`, `NotebookEdit`, `Read`, `TodoWrite`, `Write`. diff --git a/hive-ag3nt/src/bin/hive-ag3nt.rs b/hive-ag3nt/src/bin/hive-ag3nt.rs index 76c6254..ee2671c 100644 --- a/hive-ag3nt/src/bin/hive-ag3nt.rs +++ b/hive-ag3nt/src/bin/hive-ag3nt.rs @@ -123,32 +123,21 @@ async fn serve(socket: &Path, interval: Duration, state: Arc>) tracing::info!(socket = %socket.display(), "hive-ag3nt serve"); let _ = state; // reserved for future state transitions (turn-loop -> needs-login) let mcp_config = write_mcp_config(socket).await?; + let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hive-ag3nt".into()); loop { let recv: Result = client::request(socket, &AgentRequest::Recv).await; match recv { Ok(AgentResponse::Message { from, body }) => { tracing::info!(%from, %body, "inbox"); - // Don't auto-reply to echoes — prevents infinite ping-pong when - // both ends are falling back to echo. Real loop control is the - // manager's job (Phase 4+). - if !body.starts_with("echo: ") { - let reply = compute_reply(&body, &mcp_config).await; - let send: Result = client::request( - socket, - &AgentRequest::Send { - to: from, - body: reply, - }, - ) - .await; - if let Err(e) = send { - tracing::warn!(error = ?e, "send reply failed"); - } + let prompt = format_wake_prompt(&label, &from, &body); + match invoke_claude(&prompt, &mcp_config).await { + Ok(out) => tracing::info!(stdout = %out.trim(), "claude turn finished"), + Err(e) => tracing::warn!(error = %format!("{e:#}"), "claude turn failed"), } } Ok(AgentResponse::Empty) => {} - Ok(AgentResponse::Ok) => { - tracing::warn!("recv produced Ok (unexpected)"); + Ok(AgentResponse::Ok | AgentResponse::Status { .. }) => { + tracing::warn!("recv produced unexpected response kind"); } Ok(AgentResponse::Err { message }) => { tracing::warn!(%message, "recv error"); @@ -161,14 +150,27 @@ async fn serve(socket: &Path, interval: Duration, state: Arc>) } } -async fn compute_reply(prompt: &str, mcp_config: &Path) -> String { - match invoke_claude(prompt, mcp_config).await { - Ok(s) => s, - Err(e) => { - tracing::warn!(error = %format!("{e:#}"), "claude failed; falling back to echo"); - format!("echo: {prompt}") - } - } +/// System prompt handed to claude on each turn. The harness has already +/// popped one message off the inbox (the wake signal); claude is told +/// about it and the MCP tools, and is expected to drive any further +/// recv/send itself. +fn format_wake_prompt(label: &str, from: &str, body: &str) -> String { + format!( + "You are hyperhive agent `{label}` in a multi-agent system.\n\ + \n\ + Incoming message from `{from}`:\n\ + ---\n\ + {body}\n\ + ---\n\ + \n\ + Tools:\n\ + - `mcp__hyperhive__recv()` — drain one more message from your inbox \ + (returns `(empty)` if nothing pending).\n\ + - `mcp__hyperhive__send(to, body)` — message a peer (by their name) \ + or the operator (recipient `operator`, surfaces in the dashboard).\n\ + \n\ + Handle the inbox, then stop. Don't narrate intent — act." + ) } async fn invoke_claude(prompt: &str, mcp_config: &Path) -> Result { diff --git a/hive-ag3nt/src/mcp.rs b/hive-ag3nt/src/mcp.rs index 5e99ebf..264d1c3 100644 --- a/hive-ag3nt/src/mcp.rs +++ b/hive-ag3nt/src/mcp.rs @@ -48,6 +48,50 @@ impl AgentServer { pub fn new(socket: PathBuf) -> Self { Self { socket } } + + /// Wrap every tool handler in the same envelope: + /// 1. Log the request (tool name + args via `Debug`). + /// 2. Run the tool's actual logic. + /// 3. Append a status line (inbox state) to the result so claude always + /// has a current "how many unread messages" hint without an extra + /// tool call. + /// 4. Log the result body. + /// + /// New tools just call `self.run_tool("name", &args, async { ... })` + /// and get the same shape for free. + async fn run_tool(&self, tool: &'static str, args: String, body: F) -> String + where + F: std::future::Future, + { + tracing::info!(tool, %args, "tool: request"); + let result = body.await; + let status = self.status_line().await; + let full = if status.is_empty() { + result + } else { + format!("{result}\n\n[status] {status}") + }; + tracing::info!(tool, result = %full, "tool: result"); + full + } + + /// Non-mutating peek used in the status line. Falls back to a vague + /// note rather than failing the whole tool call when the socket + /// hiccups. + async fn status_line(&self) -> String { + match client::request::<_, hive_sh4re::AgentResponse>( + &self.socket, + &hive_sh4re::AgentRequest::Status, + ) + .await + { + Ok(hive_sh4re::AgentResponse::Status { unread }) => { + format!("{unread} unread message(s) in inbox") + } + Ok(other) => format!("status: unexpected response {other:?}"), + Err(e) => format!("status: transport error: {e:#}"), + } + } } #[tool_router] @@ -57,33 +101,42 @@ impl AgentServer { Use this to talk to peers or to surface output for the human at the dashboard." )] async fn send(&self, Parameters(args): Parameters) -> String { - let req = hive_sh4re::AgentRequest::Send { - to: args.to.clone(), - body: args.body, - }; - match client::request::<_, hive_sh4re::AgentResponse>(&self.socket, &req).await { - Ok(hive_sh4re::AgentResponse::Ok) => format!("sent to {}", args.to), - Ok(hive_sh4re::AgentResponse::Err { message }) => format!("send failed: {message}"), - Ok(other) => format!("send unexpected response: {other:?}"), - Err(e) => format!("send transport error: {e:#}"), - } + let log = format!("{args:?}"); + let to = args.to.clone(); + self.run_tool("send", log, async move { + let req = hive_sh4re::AgentRequest::Send { + to: args.to, + body: args.body, + }; + match client::request::<_, hive_sh4re::AgentResponse>(&self.socket, &req).await { + Ok(hive_sh4re::AgentResponse::Ok) => format!("sent to {to}"), + Ok(hive_sh4re::AgentResponse::Err { message }) => format!("send failed: {message}"), + Ok(other) => format!("send unexpected response: {other:?}"), + Err(e) => format!("send transport error: {e:#}"), + } + }) + .await } #[tool( description = "Pop one message from this agent's inbox. Returns the sender and body, \ or an empty marker if nothing is waiting." )] - async fn recv(&self, Parameters(_): Parameters) -> String { - let req = hive_sh4re::AgentRequest::Recv; - match client::request::<_, hive_sh4re::AgentResponse>(&self.socket, &req).await { - Ok(hive_sh4re::AgentResponse::Message { from, body }) => { - format!("from: {from}\n\n{body}") + async fn recv(&self, Parameters(args): Parameters) -> String { + let log = format!("{args:?}"); + self.run_tool("recv", log, async move { + let req = hive_sh4re::AgentRequest::Recv; + match client::request::<_, hive_sh4re::AgentResponse>(&self.socket, &req).await { + Ok(hive_sh4re::AgentResponse::Message { from, body }) => { + format!("from: {from}\n\n{body}") + } + Ok(hive_sh4re::AgentResponse::Empty) => "(empty)".into(), + Ok(hive_sh4re::AgentResponse::Err { message }) => format!("recv failed: {message}"), + Ok(other) => format!("recv unexpected response: {other:?}"), + Err(e) => format!("recv transport error: {e:#}"), } - Ok(hive_sh4re::AgentResponse::Empty) => "(empty)".into(), - Ok(hive_sh4re::AgentResponse::Err { message }) => format!("recv failed: {message}"), - Ok(other) => format!("recv unexpected response: {other:?}"), - Err(e) => format!("recv transport error: {e:#}"), - } + }) + .await } } diff --git a/hive-c0re/src/agent_server.rs b/hive-c0re/src/agent_server.rs index b08e18f..2525046 100644 --- a/hive-c0re/src/agent_server.rs +++ b/hive-c0re/src/agent_server.rs @@ -101,5 +101,11 @@ fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResponse { message: format!("{e:#}"), }, }, + AgentRequest::Status => match broker.count_pending(agent) { + Ok(unread) => AgentResponse::Status { unread }, + Err(e) => AgentResponse::Err { + message: format!("{e:#}"), + }, + }, } } diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs index 78d2810..eb8a2ec 100644 --- a/hive-c0re/src/broker.rs +++ b/hive-c0re/src/broker.rs @@ -86,6 +86,20 @@ impl Broker { Ok(()) } + /// Number of undelivered messages addressed to `recipient`. Non-mutating + /// — used by the harness to surface "N unread" in tool-result status + /// lines without popping the queue. + pub fn count_pending(&self, recipient: &str) -> Result { + let conn = self.conn.lock().unwrap(); + let n: i64 = conn.query_row( + "SELECT COUNT(*) FROM messages + WHERE recipient = ?1 AND delivered_at IS NULL", + params![recipient], + |row| row.get(0), + )?; + Ok(u64::try_from(n.max(0)).unwrap_or(0)) + } + pub fn recv(&self, recipient: &str) -> Result> { let conn = self.conn.lock().unwrap(); let row: Option<(i64, String, String, String)> = conn diff --git a/hive-sh4re/src/lib.rs b/hive-sh4re/src/lib.rs index 0cdb76d..b6a5dc3 100644 --- a/hive-sh4re/src/lib.rs +++ b/hive-sh4re/src/lib.rs @@ -148,6 +148,9 @@ pub enum AgentRequest { Send { to: String, body: String }, /// Pop one pending message from this agent's inbox. Recv, + /// Non-mutating: how many pending messages are addressed to me? + /// Used by the harness to render a status line after each tool call. + Status, } /// Responses on a per-agent socket. @@ -162,6 +165,8 @@ pub enum AgentResponse { Message { from: String, body: String }, /// `Recv` found nothing pending. Empty, + /// `Status` result: how many pending messages are in this agent's inbox. + Status { unread: u64 }, } // -----------------------------------------------------------------------------