diff --git a/hive-ag3nt/src/bin/hive-ag3nt.rs b/hive-ag3nt/src/bin/hive-ag3nt.rs index ed9f32b..7c661a0 100644 --- a/hive-ag3nt/src/bin/hive-ag3nt.rs +++ b/hive-ag3nt/src/bin/hive-ag3nt.rs @@ -116,7 +116,8 @@ async fn serve( let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hive-ag3nt".into()); let system_prompt = turn::write_system_prompt(socket, &label, mcp::Flavor::Agent).await?; loop { - let recv: Result = client::request(socket, &AgentRequest::Recv).await; + let recv: Result = + client::request(socket, &AgentRequest::Recv { wait_seconds: None }).await; match recv { Ok(AgentResponse::Message { from, body }) => { tracing::info!(%from, %body, "inbox"); diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index 7d3113c..bf99716 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -96,7 +96,8 @@ async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> { let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hm1nd".into()); let system_prompt = turn::write_system_prompt(socket, &label, mcp::Flavor::Manager).await?; loop { - let recv: Result = client::request(socket, &ManagerRequest::Recv).await; + let recv: Result = + client::request(socket, &ManagerRequest::Recv { wait_seconds: None }).await; match recv { Ok(ManagerResponse::Message { from, body }) => { if from == SYSTEM_SENDER { diff --git a/hive-ag3nt/src/mcp.rs b/hive-ag3nt/src/mcp.rs index d47a021..c610ccf 100644 --- a/hive-ag3nt/src/mcp.rs +++ b/hive-ag3nt/src/mcp.rs @@ -116,7 +116,14 @@ pub struct SendArgs { } #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -pub struct RecvArgs {} +pub struct RecvArgs { + /// How long to long-poll for a new message before returning the + /// empty marker. Capped at 60s server-side. Default (None) is + /// 30s. Useful when an agent wants to throttle wakes without + /// actually napping — pick a longer wait to coalesce bursts. + #[serde(default)] + pub wait_seconds: Option, +} /// Per-agent tool surface. Holds the socket path so each tool call doesn't /// re-derive it; the socket itself is the per-container `/run/hive/mcp.sock`. @@ -158,13 +165,17 @@ impl AgentServer { #[tool( description = "Pop one message from this agent's inbox. Returns the sender and body, \ - or an empty marker if nothing is waiting." + or an empty marker if nothing is waiting. Optional `wait_seconds` long-polls \ + for that many seconds (capped at 60) before returning empty — default 30." )] - async fn recv(&self, Parameters(_args): Parameters) -> String { - run_tool_envelope("recv", String::new(), async move { + async fn recv(&self, Parameters(args): Parameters) -> String { + let log = format!("{args:?}"); + run_tool_envelope("recv", log, async move { let resp = client::request::<_, hive_sh4re::AgentResponse>( &self.socket, - &hive_sh4re::AgentRequest::Recv, + &hive_sh4re::AgentRequest::Recv { + wait_seconds: args.wait_seconds, + }, ) .await .map(SocketReply::from); @@ -303,11 +314,17 @@ impl ManagerServer { #[tool( description = "Pop one message from the manager inbox. Returns sender + body, or \ - empty." + empty. Optional `wait_seconds` long-polls (capped at 60, default 30) so the \ + manager can sit on Recv when there's nothing to do without burning turns." )] - async fn recv(&self, Parameters(_args): Parameters) -> String { - run_tool_envelope("recv", String::new(), async move { - let resp = self.dispatch(hive_sh4re::ManagerRequest::Recv).await; + async fn recv(&self, Parameters(args): Parameters) -> String { + let log = format!("{args:?}"); + run_tool_envelope("recv", log, async move { + let resp = self + .dispatch(hive_sh4re::ManagerRequest::Recv { + wait_seconds: args.wait_seconds, + }) + .await; format_recv(resp) }) .await diff --git a/hive-c0re/src/agent_server.rs b/hive-c0re/src/agent_server.rs index 1dacf46..3904959 100644 --- a/hive-c0re/src/agent_server.rs +++ b/hive-c0re/src/agent_server.rs @@ -77,9 +77,19 @@ async fn serve(stream: UnixStream, agent: String, broker: Arc) -> Result } } -/// How long the long-poll `Recv` holds a connection open waiting for new -/// mail. Set well below typical TCP/proxy idle limits. -const RECV_LONG_POLL: std::time::Duration = std::time::Duration::from_secs(30); +/// Default and max long-poll window for `Recv`. Caller can request a +/// shorter (or longer up to `RECV_LONG_POLL_MAX`) wait via the +/// `wait_seconds` field; values above the cap are clamped. Set well +/// below typical TCP / proxy idle limits. +const RECV_LONG_POLL_DEFAULT: std::time::Duration = std::time::Duration::from_secs(30); +const RECV_LONG_POLL_MAX: std::time::Duration = std::time::Duration::from_secs(60); + +fn recv_timeout(wait_seconds: Option) -> std::time::Duration { + match wait_seconds { + Some(s) => std::time::Duration::from_secs(s).min(RECV_LONG_POLL_MAX), + None => RECV_LONG_POLL_DEFAULT, + } +} async fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResponse { match req { @@ -95,7 +105,10 @@ async fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResp }, } } - AgentRequest::Recv => match broker.recv_blocking(agent, RECV_LONG_POLL).await { + AgentRequest::Recv { wait_seconds } => match broker + .recv_blocking(agent, recv_timeout(*wait_seconds)) + .await + { Ok(Some(msg)) => AgentResponse::Message { from: msg.from, body: msg.body, diff --git a/hive-c0re/src/manager_server.rs b/hive-c0re/src/manager_server.rs index d946b27..8765c75 100644 --- a/hive-c0re/src/manager_server.rs +++ b/hive-c0re/src/manager_server.rs @@ -69,7 +69,17 @@ async fn serve(stream: UnixStream, coord: Arc) -> Result<()> { } } -const MANAGER_RECV_LONG_POLL: std::time::Duration = std::time::Duration::from_secs(30); +/// Default and max long-poll window for manager `Recv`. Caller can +/// request a shorter or longer (up to MAX) wait via `wait_seconds`. +const MANAGER_RECV_LONG_POLL_DEFAULT: std::time::Duration = std::time::Duration::from_secs(30); +const MANAGER_RECV_LONG_POLL_MAX: std::time::Duration = std::time::Duration::from_secs(60); + +fn manager_recv_timeout(wait_seconds: Option) -> std::time::Duration { + match wait_seconds { + Some(s) => std::time::Duration::from_secs(s).min(MANAGER_RECV_LONG_POLL_MAX), + None => MANAGER_RECV_LONG_POLL_DEFAULT, + } +} #[allow(clippy::too_many_lines)] async fn dispatch(req: &ManagerRequest, coord: &Arc) -> ManagerResponse { @@ -106,9 +116,9 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc) -> ManagerResp message: format!("{e:#}"), }, }, - ManagerRequest::Recv => match coord + ManagerRequest::Recv { wait_seconds } => match coord .broker - .recv_blocking(MANAGER_AGENT, MANAGER_RECV_LONG_POLL) + .recv_blocking(MANAGER_AGENT, manager_recv_timeout(*wait_seconds)) .await { Ok(Some(msg)) => ManagerResponse::Message { diff --git a/hive-sh4re/src/lib.rs b/hive-sh4re/src/lib.rs index 52f8000..90949d8 100644 --- a/hive-sh4re/src/lib.rs +++ b/hive-sh4re/src/lib.rs @@ -166,8 +166,13 @@ pub struct InboxRow { pub enum AgentRequest { /// Send a message to another agent. Send { to: String, body: String }, - /// Pop one pending message from this agent's inbox. - Recv, + /// Pop one pending message from this agent's inbox. Long-polls + /// up to `wait_seconds` (capped at 60s server-side, default 30s + /// when None) before returning `Empty`. + Recv { + #[serde(default)] + wait_seconds: Option, + }, /// Non-mutating: how many pending messages are addressed to me? /// Used by the harness to render a status line after each tool call. Status, @@ -274,7 +279,12 @@ pub enum ManagerRequest { to: String, body: String, }, - Recv, + /// Same shape as `AgentRequest::Recv` — caller-tunable long-poll + /// duration, capped at 60s server-side, default 30s when None. + Recv { + #[serde(default)] + wait_seconds: Option, + }, /// Non-mutating: pending message count, used to render a status line /// after each MCP tool call (mirrors `AgentRequest::Status`). Status,