recv: optional wait_seconds parameter, capped at 60s
AgentRequest::Recv and ManagerRequest::Recv grow an optional wait_seconds field (default None → 30s, capped at 60s server-side). agent_server / manager_server clamp via recv_timeout(). MCP tool schemas advertise the param so claude can pick its own poll window — useful when an agent wants to throttle wakes without entering a distinct nap state. both harness loops still pass None, keeping the existing 30s default behaviour for system-level Recvs.
This commit is contained in:
parent
637085644d
commit
f65ee88269
6 changed files with 73 additions and 21 deletions
|
|
@ -116,7 +116,8 @@ async fn serve(
|
||||||
let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hive-ag3nt".into());
|
let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hive-ag3nt".into());
|
||||||
let system_prompt = turn::write_system_prompt(socket, &label, mcp::Flavor::Agent).await?;
|
let system_prompt = turn::write_system_prompt(socket, &label, mcp::Flavor::Agent).await?;
|
||||||
loop {
|
loop {
|
||||||
let recv: Result<AgentResponse> = client::request(socket, &AgentRequest::Recv).await;
|
let recv: Result<AgentResponse> =
|
||||||
|
client::request(socket, &AgentRequest::Recv { wait_seconds: None }).await;
|
||||||
match recv {
|
match recv {
|
||||||
Ok(AgentResponse::Message { from, body }) => {
|
Ok(AgentResponse::Message { from, body }) => {
|
||||||
tracing::info!(%from, %body, "inbox");
|
tracing::info!(%from, %body, "inbox");
|
||||||
|
|
|
||||||
|
|
@ -96,7 +96,8 @@ async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> {
|
||||||
let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hm1nd".into());
|
let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hm1nd".into());
|
||||||
let system_prompt = turn::write_system_prompt(socket, &label, mcp::Flavor::Manager).await?;
|
let system_prompt = turn::write_system_prompt(socket, &label, mcp::Flavor::Manager).await?;
|
||||||
loop {
|
loop {
|
||||||
let recv: Result<ManagerResponse> = client::request(socket, &ManagerRequest::Recv).await;
|
let recv: Result<ManagerResponse> =
|
||||||
|
client::request(socket, &ManagerRequest::Recv { wait_seconds: None }).await;
|
||||||
match recv {
|
match recv {
|
||||||
Ok(ManagerResponse::Message { from, body }) => {
|
Ok(ManagerResponse::Message { from, body }) => {
|
||||||
if from == SYSTEM_SENDER {
|
if from == SYSTEM_SENDER {
|
||||||
|
|
|
||||||
|
|
@ -116,7 +116,14 @@ pub struct SendArgs {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
|
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
|
||||||
pub struct RecvArgs {}
|
pub struct RecvArgs {
|
||||||
|
/// How long to long-poll for a new message before returning the
|
||||||
|
/// empty marker. Capped at 60s server-side. Default (None) is
|
||||||
|
/// 30s. Useful when an agent wants to throttle wakes without
|
||||||
|
/// actually napping — pick a longer wait to coalesce bursts.
|
||||||
|
#[serde(default)]
|
||||||
|
pub wait_seconds: Option<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
/// Per-agent tool surface. Holds the socket path so each tool call doesn't
|
/// Per-agent tool surface. Holds the socket path so each tool call doesn't
|
||||||
/// re-derive it; the socket itself is the per-container `/run/hive/mcp.sock`.
|
/// re-derive it; the socket itself is the per-container `/run/hive/mcp.sock`.
|
||||||
|
|
@ -158,13 +165,17 @@ impl AgentServer {
|
||||||
|
|
||||||
#[tool(
|
#[tool(
|
||||||
description = "Pop one message from this agent's inbox. Returns the sender and body, \
|
description = "Pop one message from this agent's inbox. Returns the sender and body, \
|
||||||
or an empty marker if nothing is waiting."
|
or an empty marker if nothing is waiting. Optional `wait_seconds` long-polls \
|
||||||
|
for that many seconds (capped at 60) before returning empty — default 30."
|
||||||
)]
|
)]
|
||||||
async fn recv(&self, Parameters(_args): Parameters<RecvArgs>) -> String {
|
async fn recv(&self, Parameters(args): Parameters<RecvArgs>) -> String {
|
||||||
run_tool_envelope("recv", String::new(), async move {
|
let log = format!("{args:?}");
|
||||||
|
run_tool_envelope("recv", log, async move {
|
||||||
let resp = client::request::<_, hive_sh4re::AgentResponse>(
|
let resp = client::request::<_, hive_sh4re::AgentResponse>(
|
||||||
&self.socket,
|
&self.socket,
|
||||||
&hive_sh4re::AgentRequest::Recv,
|
&hive_sh4re::AgentRequest::Recv {
|
||||||
|
wait_seconds: args.wait_seconds,
|
||||||
|
},
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map(SocketReply::from);
|
.map(SocketReply::from);
|
||||||
|
|
@ -303,11 +314,17 @@ impl ManagerServer {
|
||||||
|
|
||||||
#[tool(
|
#[tool(
|
||||||
description = "Pop one message from the manager inbox. Returns sender + body, or \
|
description = "Pop one message from the manager inbox. Returns sender + body, or \
|
||||||
empty."
|
empty. Optional `wait_seconds` long-polls (capped at 60, default 30) so the \
|
||||||
|
manager can sit on Recv when there's nothing to do without burning turns."
|
||||||
)]
|
)]
|
||||||
async fn recv(&self, Parameters(_args): Parameters<RecvArgs>) -> String {
|
async fn recv(&self, Parameters(args): Parameters<RecvArgs>) -> String {
|
||||||
run_tool_envelope("recv", String::new(), async move {
|
let log = format!("{args:?}");
|
||||||
let resp = self.dispatch(hive_sh4re::ManagerRequest::Recv).await;
|
run_tool_envelope("recv", log, async move {
|
||||||
|
let resp = self
|
||||||
|
.dispatch(hive_sh4re::ManagerRequest::Recv {
|
||||||
|
wait_seconds: args.wait_seconds,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
format_recv(resp)
|
format_recv(resp)
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
|
|
|
||||||
|
|
@ -77,9 +77,19 @@ async fn serve(stream: UnixStream, agent: String, broker: Arc<Broker>) -> Result
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// How long the long-poll `Recv` holds a connection open waiting for new
|
/// Default and max long-poll window for `Recv`. Caller can request a
|
||||||
/// mail. Set well below typical TCP/proxy idle limits.
|
/// shorter (or longer up to `RECV_LONG_POLL_MAX`) wait via the
|
||||||
const RECV_LONG_POLL: std::time::Duration = std::time::Duration::from_secs(30);
|
/// `wait_seconds` field; values above the cap are clamped. Set well
|
||||||
|
/// below typical TCP / proxy idle limits.
|
||||||
|
const RECV_LONG_POLL_DEFAULT: std::time::Duration = std::time::Duration::from_secs(30);
|
||||||
|
const RECV_LONG_POLL_MAX: std::time::Duration = std::time::Duration::from_secs(60);
|
||||||
|
|
||||||
|
fn recv_timeout(wait_seconds: Option<u64>) -> std::time::Duration {
|
||||||
|
match wait_seconds {
|
||||||
|
Some(s) => std::time::Duration::from_secs(s).min(RECV_LONG_POLL_MAX),
|
||||||
|
None => RECV_LONG_POLL_DEFAULT,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResponse {
|
async fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResponse {
|
||||||
match req {
|
match req {
|
||||||
|
|
@ -95,7 +105,10 @@ async fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResp
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
AgentRequest::Recv => match broker.recv_blocking(agent, RECV_LONG_POLL).await {
|
AgentRequest::Recv { wait_seconds } => match broker
|
||||||
|
.recv_blocking(agent, recv_timeout(*wait_seconds))
|
||||||
|
.await
|
||||||
|
{
|
||||||
Ok(Some(msg)) => AgentResponse::Message {
|
Ok(Some(msg)) => AgentResponse::Message {
|
||||||
from: msg.from,
|
from: msg.from,
|
||||||
body: msg.body,
|
body: msg.body,
|
||||||
|
|
|
||||||
|
|
@ -69,7 +69,17 @@ async fn serve(stream: UnixStream, coord: Arc<Coordinator>) -> Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const MANAGER_RECV_LONG_POLL: std::time::Duration = std::time::Duration::from_secs(30);
|
/// Default and max long-poll window for manager `Recv`. Caller can
|
||||||
|
/// request a shorter or longer (up to MAX) wait via `wait_seconds`.
|
||||||
|
const MANAGER_RECV_LONG_POLL_DEFAULT: std::time::Duration = std::time::Duration::from_secs(30);
|
||||||
|
const MANAGER_RECV_LONG_POLL_MAX: std::time::Duration = std::time::Duration::from_secs(60);
|
||||||
|
|
||||||
|
fn manager_recv_timeout(wait_seconds: Option<u64>) -> std::time::Duration {
|
||||||
|
match wait_seconds {
|
||||||
|
Some(s) => std::time::Duration::from_secs(s).min(MANAGER_RECV_LONG_POLL_MAX),
|
||||||
|
None => MANAGER_RECV_LONG_POLL_DEFAULT,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(clippy::too_many_lines)]
|
#[allow(clippy::too_many_lines)]
|
||||||
async fn dispatch(req: &ManagerRequest, coord: &Arc<Coordinator>) -> ManagerResponse {
|
async fn dispatch(req: &ManagerRequest, coord: &Arc<Coordinator>) -> ManagerResponse {
|
||||||
|
|
@ -106,9 +116,9 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc<Coordinator>) -> ManagerResp
|
||||||
message: format!("{e:#}"),
|
message: format!("{e:#}"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
ManagerRequest::Recv => match coord
|
ManagerRequest::Recv { wait_seconds } => match coord
|
||||||
.broker
|
.broker
|
||||||
.recv_blocking(MANAGER_AGENT, MANAGER_RECV_LONG_POLL)
|
.recv_blocking(MANAGER_AGENT, manager_recv_timeout(*wait_seconds))
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(Some(msg)) => ManagerResponse::Message {
|
Ok(Some(msg)) => ManagerResponse::Message {
|
||||||
|
|
|
||||||
|
|
@ -166,8 +166,13 @@ pub struct InboxRow {
|
||||||
pub enum AgentRequest {
|
pub enum AgentRequest {
|
||||||
/// Send a message to another agent.
|
/// Send a message to another agent.
|
||||||
Send { to: String, body: String },
|
Send { to: String, body: String },
|
||||||
/// Pop one pending message from this agent's inbox.
|
/// Pop one pending message from this agent's inbox. Long-polls
|
||||||
Recv,
|
/// up to `wait_seconds` (capped at 60s server-side, default 30s
|
||||||
|
/// when None) before returning `Empty`.
|
||||||
|
Recv {
|
||||||
|
#[serde(default)]
|
||||||
|
wait_seconds: Option<u64>,
|
||||||
|
},
|
||||||
/// Non-mutating: how many pending messages are addressed to me?
|
/// Non-mutating: how many pending messages are addressed to me?
|
||||||
/// Used by the harness to render a status line after each tool call.
|
/// Used by the harness to render a status line after each tool call.
|
||||||
Status,
|
Status,
|
||||||
|
|
@ -274,7 +279,12 @@ pub enum ManagerRequest {
|
||||||
to: String,
|
to: String,
|
||||||
body: String,
|
body: String,
|
||||||
},
|
},
|
||||||
Recv,
|
/// Same shape as `AgentRequest::Recv` — caller-tunable long-poll
|
||||||
|
/// duration, capped at 60s server-side, default 30s when None.
|
||||||
|
Recv {
|
||||||
|
#[serde(default)]
|
||||||
|
wait_seconds: Option<u64>,
|
||||||
|
},
|
||||||
/// Non-mutating: pending message count, used to render a status line
|
/// Non-mutating: pending message count, used to render a status line
|
||||||
/// after each MCP tool call (mirrors `AgentRequest::Status`).
|
/// after each MCP tool call (mirrors `AgentRequest::Status`).
|
||||||
Status,
|
Status,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue