agent loop: claude drives; tool envelope (log/run/status/log)

This commit is contained in:
müde 2026-05-15 14:54:10 +02:00
parent a061f83cfa
commit 3c9d42b2a7
6 changed files with 147 additions and 47 deletions

View file

@ -173,9 +173,29 @@ The turn loop in `hive-ag3nt serve` writes
Each turn invokes: Each turn invokes:
``` ```
claude --print --mcp-config <path> --tools <builtins> --allowedTools <builtins+mcp> <prompt> claude --print --model haiku --mcp-config <path> --tools <builtins> --allowedTools <builtins+mcp> <prompt>
``` ```
**Loop control.** The harness pops one inbox message (the wake signal) per
cycle and hands claude a prompt naming the agent, the sender, the body,
and the MCP tools. Claude drives any further `recv`/`send` itself —
harness no longer relays claude's stdout as a reply. Stdout is logged for
debugging; the side effects (sends via MCP) are what matter.
**Tool envelope.** Every MCP tool handler in `hive_ag3nt::mcp::AgentServer`
wraps its logic in `run_tool(name, args_debug, async { ... })`. The
envelope guarantees:
1. Pre-log of the request (tool + args).
2. The tool's own logic runs.
3. A status line is appended to the result body
(`[status] N unread message(s) in inbox`) so claude always sees the
current inbox depth without an extra tool call.
4. Post-log of the full result.
`AgentRequest::Status` is the non-mutating peek that powers the status
line (broker's `count_pending`). When adding new tools (manager surface,
notes/state, etc.), use `run_tool` and they pick up the envelope for free.
**Tool whitelist** (see `ALLOWED_BUILTIN_TOOLS` in `hive-ag3nt::mcp`): **Tool whitelist** (see `ALLOWED_BUILTIN_TOOLS` in `hive-ag3nt::mcp`):
- Allowed built-ins: `Bash`, `Edit`, `Glob`, `Grep`, `NotebookEdit`, `Read`, - Allowed built-ins: `Bash`, `Edit`, `Glob`, `Grep`, `NotebookEdit`, `Read`,
`TodoWrite`, `Write`. `TodoWrite`, `Write`.

View file

@ -123,32 +123,21 @@ async fn serve(socket: &Path, interval: Duration, state: Arc<Mutex<LoginState>>)
tracing::info!(socket = %socket.display(), "hive-ag3nt serve"); tracing::info!(socket = %socket.display(), "hive-ag3nt serve");
let _ = state; // reserved for future state transitions (turn-loop -> needs-login) let _ = state; // reserved for future state transitions (turn-loop -> needs-login)
let mcp_config = write_mcp_config(socket).await?; let mcp_config = write_mcp_config(socket).await?;
let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hive-ag3nt".into());
loop { loop {
let recv: Result<AgentResponse> = client::request(socket, &AgentRequest::Recv).await; let recv: Result<AgentResponse> = client::request(socket, &AgentRequest::Recv).await;
match recv { match recv {
Ok(AgentResponse::Message { from, body }) => { Ok(AgentResponse::Message { from, body }) => {
tracing::info!(%from, %body, "inbox"); tracing::info!(%from, %body, "inbox");
// Don't auto-reply to echoes — prevents infinite ping-pong when let prompt = format_wake_prompt(&label, &from, &body);
// both ends are falling back to echo. Real loop control is the match invoke_claude(&prompt, &mcp_config).await {
// manager's job (Phase 4+). Ok(out) => tracing::info!(stdout = %out.trim(), "claude turn finished"),
if !body.starts_with("echo: ") { Err(e) => tracing::warn!(error = %format!("{e:#}"), "claude turn failed"),
let reply = compute_reply(&body, &mcp_config).await;
let send: Result<AgentResponse> = client::request(
socket,
&AgentRequest::Send {
to: from,
body: reply,
},
)
.await;
if let Err(e) = send {
tracing::warn!(error = ?e, "send reply failed");
}
} }
} }
Ok(AgentResponse::Empty) => {} Ok(AgentResponse::Empty) => {}
Ok(AgentResponse::Ok) => { Ok(AgentResponse::Ok | AgentResponse::Status { .. }) => {
tracing::warn!("recv produced Ok (unexpected)"); tracing::warn!("recv produced unexpected response kind");
} }
Ok(AgentResponse::Err { message }) => { Ok(AgentResponse::Err { message }) => {
tracing::warn!(%message, "recv error"); tracing::warn!(%message, "recv error");
@ -161,14 +150,27 @@ async fn serve(socket: &Path, interval: Duration, state: Arc<Mutex<LoginState>>)
} }
} }
async fn compute_reply(prompt: &str, mcp_config: &Path) -> String { /// System prompt handed to claude on each turn. The harness has already
match invoke_claude(prompt, mcp_config).await { /// popped one message off the inbox (the wake signal); claude is told
Ok(s) => s, /// about it and the MCP tools, and is expected to drive any further
Err(e) => { /// recv/send itself.
tracing::warn!(error = %format!("{e:#}"), "claude failed; falling back to echo"); fn format_wake_prompt(label: &str, from: &str, body: &str) -> String {
format!("echo: {prompt}") format!(
} "You are hyperhive agent `{label}` in a multi-agent system.\n\
} \n\
Incoming message from `{from}`:\n\
---\n\
{body}\n\
---\n\
\n\
Tools:\n\
- `mcp__hyperhive__recv()` drain one more message from your inbox \
(returns `(empty)` if nothing pending).\n\
- `mcp__hyperhive__send(to, body)` message a peer (by their name) \
or the operator (recipient `operator`, surfaces in the dashboard).\n\
\n\
Handle the inbox, then stop. Don't narrate intent act."
)
} }
async fn invoke_claude(prompt: &str, mcp_config: &Path) -> Result<String> { async fn invoke_claude(prompt: &str, mcp_config: &Path) -> Result<String> {

View file

@ -48,6 +48,50 @@ impl AgentServer {
pub fn new(socket: PathBuf) -> Self { pub fn new(socket: PathBuf) -> Self {
Self { socket } Self { socket }
} }
/// Wrap every tool handler in the same envelope:
/// 1. Log the request (tool name + args via `Debug`).
/// 2. Run the tool's actual logic.
/// 3. Append a status line (inbox state) to the result so claude always
/// has a current "how many unread messages" hint without an extra
/// tool call.
/// 4. Log the result body.
///
/// New tools just call `self.run_tool("name", &args, async { ... })`
/// and get the same shape for free.
async fn run_tool<F>(&self, tool: &'static str, args: String, body: F) -> String
where
F: std::future::Future<Output = String>,
{
tracing::info!(tool, %args, "tool: request");
let result = body.await;
let status = self.status_line().await;
let full = if status.is_empty() {
result
} else {
format!("{result}\n\n[status] {status}")
};
tracing::info!(tool, result = %full, "tool: result");
full
}
/// Non-mutating peek used in the status line. Falls back to a vague
/// note rather than failing the whole tool call when the socket
/// hiccups.
async fn status_line(&self) -> String {
match client::request::<_, hive_sh4re::AgentResponse>(
&self.socket,
&hive_sh4re::AgentRequest::Status,
)
.await
{
Ok(hive_sh4re::AgentResponse::Status { unread }) => {
format!("{unread} unread message(s) in inbox")
}
Ok(other) => format!("status: unexpected response {other:?}"),
Err(e) => format!("status: transport error: {e:#}"),
}
}
} }
#[tool_router] #[tool_router]
@ -57,33 +101,42 @@ impl AgentServer {
Use this to talk to peers or to surface output for the human at the dashboard." Use this to talk to peers or to surface output for the human at the dashboard."
)] )]
async fn send(&self, Parameters(args): Parameters<SendArgs>) -> String { async fn send(&self, Parameters(args): Parameters<SendArgs>) -> String {
let req = hive_sh4re::AgentRequest::Send { let log = format!("{args:?}");
to: args.to.clone(), let to = args.to.clone();
body: args.body, self.run_tool("send", log, async move {
}; let req = hive_sh4re::AgentRequest::Send {
match client::request::<_, hive_sh4re::AgentResponse>(&self.socket, &req).await { to: args.to,
Ok(hive_sh4re::AgentResponse::Ok) => format!("sent to {}", args.to), body: args.body,
Ok(hive_sh4re::AgentResponse::Err { message }) => format!("send failed: {message}"), };
Ok(other) => format!("send unexpected response: {other:?}"), match client::request::<_, hive_sh4re::AgentResponse>(&self.socket, &req).await {
Err(e) => format!("send transport error: {e:#}"), Ok(hive_sh4re::AgentResponse::Ok) => format!("sent to {to}"),
} Ok(hive_sh4re::AgentResponse::Err { message }) => format!("send failed: {message}"),
Ok(other) => format!("send unexpected response: {other:?}"),
Err(e) => format!("send transport error: {e:#}"),
}
})
.await
} }
#[tool( #[tool(
description = "Pop one message from this agent's inbox. Returns the sender and body, \ description = "Pop one message from this agent's inbox. Returns the sender and body, \
or an empty marker if nothing is waiting." or an empty marker if nothing is waiting."
)] )]
async fn recv(&self, Parameters(_): Parameters<RecvArgs>) -> String { async fn recv(&self, Parameters(args): Parameters<RecvArgs>) -> String {
let req = hive_sh4re::AgentRequest::Recv; let log = format!("{args:?}");
match client::request::<_, hive_sh4re::AgentResponse>(&self.socket, &req).await { self.run_tool("recv", log, async move {
Ok(hive_sh4re::AgentResponse::Message { from, body }) => { let req = hive_sh4re::AgentRequest::Recv;
format!("from: {from}\n\n{body}") match client::request::<_, hive_sh4re::AgentResponse>(&self.socket, &req).await {
Ok(hive_sh4re::AgentResponse::Message { from, body }) => {
format!("from: {from}\n\n{body}")
}
Ok(hive_sh4re::AgentResponse::Empty) => "(empty)".into(),
Ok(hive_sh4re::AgentResponse::Err { message }) => format!("recv failed: {message}"),
Ok(other) => format!("recv unexpected response: {other:?}"),
Err(e) => format!("recv transport error: {e:#}"),
} }
Ok(hive_sh4re::AgentResponse::Empty) => "(empty)".into(), })
Ok(hive_sh4re::AgentResponse::Err { message }) => format!("recv failed: {message}"), .await
Ok(other) => format!("recv unexpected response: {other:?}"),
Err(e) => format!("recv transport error: {e:#}"),
}
} }
} }

View file

@ -101,5 +101,11 @@ fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResponse {
message: format!("{e:#}"), message: format!("{e:#}"),
}, },
}, },
AgentRequest::Status => match broker.count_pending(agent) {
Ok(unread) => AgentResponse::Status { unread },
Err(e) => AgentResponse::Err {
message: format!("{e:#}"),
},
},
} }
} }

View file

@ -86,6 +86,20 @@ impl Broker {
Ok(()) Ok(())
} }
/// Number of undelivered messages addressed to `recipient`. Non-mutating
/// — used by the harness to surface "N unread" in tool-result status
/// lines without popping the queue.
pub fn count_pending(&self, recipient: &str) -> Result<u64> {
let conn = self.conn.lock().unwrap();
let n: i64 = conn.query_row(
"SELECT COUNT(*) FROM messages
WHERE recipient = ?1 AND delivered_at IS NULL",
params![recipient],
|row| row.get(0),
)?;
Ok(u64::try_from(n.max(0)).unwrap_or(0))
}
pub fn recv(&self, recipient: &str) -> Result<Option<Message>> { pub fn recv(&self, recipient: &str) -> Result<Option<Message>> {
let conn = self.conn.lock().unwrap(); let conn = self.conn.lock().unwrap();
let row: Option<(i64, String, String, String)> = conn let row: Option<(i64, String, String, String)> = conn

View file

@ -148,6 +148,9 @@ pub enum AgentRequest {
Send { to: String, body: String }, Send { to: String, body: String },
/// Pop one pending message from this agent's inbox. /// Pop one pending message from this agent's inbox.
Recv, Recv,
/// Non-mutating: how many pending messages are addressed to me?
/// Used by the harness to render a status line after each tool call.
Status,
} }
/// Responses on a per-agent socket. /// Responses on a per-agent socket.
@ -162,6 +165,8 @@ pub enum AgentResponse {
Message { from: String, body: String }, Message { from: String, body: String },
/// `Recv` found nothing pending. /// `Recv` found nothing pending.
Empty, Empty,
/// `Status` result: how many pending messages are in this agent's inbox.
Status { unread: u64 },
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------