diff --git a/hive-ag3nt/src/client.rs b/hive-ag3nt/src/client.rs index 5a314bd..64e1293 100644 --- a/hive-ag3nt/src/client.rs +++ b/hive-ag3nt/src/client.rs @@ -1,33 +1,109 @@ use std::path::Path; +use std::time::Duration; -use anyhow::{Context, Result, bail}; +use anyhow::{Context, Result, anyhow}; use serde::Serialize; use serde::de::DeserializeOwned; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::UnixStream; -/// Generic JSON-line request/response over a unix socket. One request, one -/// response, then drop. Used by both the agent and manager harnesses. +/// Backoff schedule between attempts. Five entries → up to 5 retries on +/// top of the initial attempt; total wall-clock cap = 2+4+8+16+30 = 60s. +/// Sized to ride out a hive-c0re restart (systemd usually has the unix +/// socket back inside ~5s) without the agent-side claude session having +/// to handle the transient itself — burning tokens on a tool-error retry +/// loop is more expensive than 60s of in-harness sleep. +const RETRY_BACKOFFS_MS: &[u64] = &[2_000, 4_000, 8_000, 16_000, 30_000]; + +/// Transparent retry wrapper around [`request_retried`] that throws away +/// the retry count. Use this from non-tool callers (the harness serve +/// loop, web UI, CLI subcommands) where we just want the socket-restart +/// resilience without surfacing the bookkeeping. pub async fn request(socket: &Path, req: &Req) -> Result +where + Req: Serialize + ?Sized, + Resp: DeserializeOwned, +{ + request_retried(socket, req).await.map(|(resp, _)| resp) +} + +/// Same wire shape as [`request`], but reports how many retries it took +/// past the initial attempt (0 = succeeded first try). MCP tool handlers +/// use this so they can append a one-line hint to the tool result when +/// retries happened — that way claude knows the prior socket flake +/// wasn't a content error and shouldn't trigger an LLM-level retry of +/// its own. 
+pub async fn request_retried<Req, Resp>(socket: &Path, req: &Req) -> Result<(Resp, u32)>
+where
+    Req: Serialize + ?Sized,
+    Resp: DeserializeOwned,
+{
+    // `retries` counts failed attempts so far; each one sleeps its schedule entry.
+    let mut retries: u32 = 0;
+    for &sleep_ms in RETRY_BACKOFFS_MS {
+        match try_once::<Req, Resp>(socket, req).await {
+            Ok(resp) => return Ok((resp, retries)),
+            Err(RequestError::Fatal(e)) => return Err(e),
+            Err(RequestError::Transient(e)) => {
+                retries += 1;
+                tracing::warn!(
+                    attempt = retries,
+                    sleep_ms,
+                    error = %e,
+                    "hive socket attempt failed; retrying"
+                );
+                tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
+            }
+        }
+    }
+    // Schedule exhausted: one final attempt, and whichever class of error it
+    // hits is handed to the caller unchanged.
+    match try_once::<Req, Resp>(socket, req).await {
+        Ok(resp) => Ok((resp, retries)),
+        Err(RequestError::Transient(e) | RequestError::Fatal(e)) => Err(e),
+    }
+}
+
+/// Transient = connect / IO error worth a retry (server restart, broken
+/// pipe). Fatal = serialization / deserialization / protocol error
+/// where retrying would just repeat the same failure.
+enum RequestError { + Transient(anyhow::Error), + Fatal(anyhow::Error), +} + +async fn try_once(socket: &Path, req: &Req) -> Result where Req: Serialize + ?Sized, Resp: DeserializeOwned, { let stream = UnixStream::connect(socket) .await - .with_context(|| format!("connect to {}", socket.display()))?; + .with_context(|| format!("connect to {}", socket.display())) + .map_err(RequestError::Transient)?; let (read, mut write) = stream.into_split(); - let mut payload = serde_json::to_string(req)?; + let mut payload = serde_json::to_string(req).map_err(|e| RequestError::Fatal(e.into()))?; payload.push('\n'); - write.write_all(payload.as_bytes()).await?; - write.flush().await?; + write + .write_all(payload.as_bytes()) + .await + .map_err(|e| RequestError::Transient(e.into()))?; + write + .flush() + .await + .map_err(|e| RequestError::Transient(e.into()))?; let mut reader = BufReader::new(read); let mut line = String::new(); - reader.read_line(&mut line).await?; - if line.is_empty() { - bail!("server closed connection without responding"); + let read_bytes = reader + .read_line(&mut line) + .await + .map_err(|e| RequestError::Transient(e.into()))?; + if read_bytes == 0 || line.is_empty() { + return Err(RequestError::Transient(anyhow!( + "server closed connection without responding" + ))); } - Ok(serde_json::from_str(line.trim())?) + serde_json::from_str(line.trim()).map_err(|e| RequestError::Fatal(e.into())) } diff --git a/hive-ag3nt/src/mcp.rs b/hive-ag3nt/src/mcp.rs index 90a8e73..0a72639 100644 --- a/hive-ag3nt/src/mcp.rs +++ b/hive-ag3nt/src/mcp.rs @@ -107,6 +107,22 @@ where result } +/// Append a short note to a tool result when the underlying socket call +/// took retries to land. Lets claude distinguish "my request was wrong" +/// from "c0re flickered and the harness rode it out" — without the +/// hint, a tool result that took 30s to come back looks identical to a +/// content failure and the model would burn a turn retrying it. 
+pub fn annotate_retries(mut s: String, retries: u32) -> String { + if retries > 0 { + let suffix = if retries == 1 { "retry" } else { "retries" }; + s.push_str(&format!( + "\n\n(note: hive socket connect needed {retries} {suffix} — c0re likely \ + restarted. Your request did succeed on the final attempt; no action needed.)" + )); + } + s +} + #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] pub struct SendArgs { /// Logical agent name to deliver the message to (e.g. `"manager"`, @@ -138,6 +154,19 @@ impl AgentServer { pub fn new(socket: PathBuf) -> Self { Self { socket } } + + /// Issue any `AgentRequest` through the retry-aware client and pull + /// the reply through `SocketReply`. Returns the retry count so tool + /// handlers can annotate their result (see `annotate_retries`). + async fn dispatch( + &self, + req: hive_sh4re::AgentRequest, + ) -> (Result, u32) { + match client::request_retried::<_, hive_sh4re::AgentResponse>(&self.socket, &req).await { + Ok((r, n)) => (Ok(SocketReply::from(r)), n), + Err(e) => (Err(e), 0), + } + } } #[tool_router] @@ -153,16 +182,13 @@ impl AgentServer { return run_tool_envelope("send", log, async move { refusal }).await; } run_tool_envelope("send", log, async move { - let resp = client::request::<_, hive_sh4re::AgentResponse>( - &self.socket, - &hive_sh4re::AgentRequest::Send { + let (resp, retries) = self + .dispatch(hive_sh4re::AgentRequest::Send { to: args.to, body: args.body, - }, - ) - .await - .map(SocketReply::from); - format_ack(resp, "send", format!("sent to {to}")) + }) + .await; + annotate_retries(format_ack(resp, "send", format!("sent to {to}")), retries) }) .await } @@ -182,18 +208,15 @@ impl AgentServer { async fn ask_operator(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); run_tool_envelope("ask_operator", log, async move { - let resp = client::request::<_, hive_sh4re::AgentResponse>( - &self.socket, - &hive_sh4re::AgentRequest::AskOperator { + let (resp, retries) = self 
+ .dispatch(hive_sh4re::AgentRequest::AskOperator { question: args.question, options: args.options, multi: args.multi, ttl_seconds: args.ttl_seconds, - }, - ) - .await - .map(SocketReply::from); - match resp { + }) + .await; + let s = match resp { Ok(SocketReply::QuestionQueued(id)) => format!( "question queued (id={id}); operator's answer will arrive as a system \ `operator_answered` event in your inbox" @@ -201,7 +224,8 @@ impl AgentServer { Ok(SocketReply::Err(m)) => format!("ask_operator failed: {m}"), Ok(other) => format!("ask_operator unexpected response: {other:?}"), Err(e) => format!("ask_operator transport error: {e:#}"), - } + }; + annotate_retries(s, retries) }) .await } @@ -219,15 +243,12 @@ impl AgentServer { async fn recv(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); run_tool_envelope("recv", log, async move { - let resp = client::request::<_, hive_sh4re::AgentResponse>( - &self.socket, - &hive_sh4re::AgentRequest::Recv { + let (resp, retries) = self + .dispatch(hive_sh4re::AgentRequest::Recv { wait_seconds: args.wait_seconds, - }, - ) - .await - .map(SocketReply::from); - format_recv(resp) + }) + .await; + annotate_retries(format_recv(resp), retries) }) .await } @@ -341,15 +362,18 @@ impl ManagerServer { Self { socket } } - /// Helper: issue any `ManagerRequest`, convert the reply through - /// `SocketReply`. Manager tools that just need an `Ok` ack share this. + /// Helper: issue any `ManagerRequest` through the retry-aware + /// client, convert the reply through `SocketReply`, and return the + /// retry count alongside so the tool handler can `annotate_retries` + /// on the final string. 
async fn dispatch( &self, req: hive_sh4re::ManagerRequest, - ) -> Result { - client::request::<_, hive_sh4re::ManagerResponse>(&self.socket, &req) - .await - .map(SocketReply::from) + ) -> (Result, u32) { + match client::request_retried::<_, hive_sh4re::ManagerResponse>(&self.socket, &req).await { + Ok((r, n)) => (Ok(SocketReply::from(r)), n), + Err(e) => (Err(e), 0), + } } } @@ -363,13 +387,13 @@ impl ManagerServer { let log = format!("{args:?}"); let to = args.to.clone(); run_tool_envelope("send", log, async move { - let resp = self + let (resp, retries) = self .dispatch(hive_sh4re::ManagerRequest::Send { to: args.to, body: args.body, }) .await; - format_ack(resp, "send", format!("sent to {to}")) + annotate_retries(format_ack(resp, "send", format!("sent to {to}")), retries) }) .await } @@ -384,12 +408,12 @@ impl ManagerServer { async fn recv(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); run_tool_envelope("recv", log, async move { - let resp = self + let (resp, retries) = self .dispatch(hive_sh4re::ManagerRequest::Recv { wait_seconds: args.wait_seconds, }) .await; - format_recv(resp) + annotate_retries(format_recv(resp), retries) }) .await } @@ -402,16 +426,19 @@ impl ManagerServer { let log = format!("{args:?}"); let name = args.name.clone(); run_tool_envelope("request_spawn", log, async move { - let resp = self + let (resp, retries) = self .dispatch(hive_sh4re::ManagerRequest::RequestSpawn { name: args.name, description: args.description, }) .await; - format_ack( - resp, - "request_spawn", - format!("spawn approval queued for {name}"), + annotate_retries( + format_ack( + resp, + "request_spawn", + format!("spawn approval queued for {name}"), + ), + retries, ) }) .await @@ -425,10 +452,10 @@ impl ManagerServer { let log = format!("{args:?}"); let name = args.name.clone(); run_tool_envelope("kill", log, async move { - let resp = self + let (resp, retries) = self .dispatch(hive_sh4re::ManagerRequest::Kill { name: args.name }) 
.await; - format_ack(resp, "kill", format!("killed {name}")) + annotate_retries(format_ack(resp, "kill", format!("killed {name}")), retries) }) .await } @@ -441,10 +468,10 @@ impl ManagerServer { let log = format!("{args:?}"); let name = args.name.clone(); run_tool_envelope("start", log, async move { - let resp = self + let (resp, retries) = self .dispatch(hive_sh4re::ManagerRequest::Start { name: args.name }) .await; - format_ack(resp, "start", format!("started {name}")) + annotate_retries(format_ack(resp, "start", format!("started {name}")), retries) }) .await } @@ -454,10 +481,13 @@ impl ManagerServer { let log = format!("{args:?}"); let name = args.name.clone(); run_tool_envelope("restart", log, async move { - let resp = self + let (resp, retries) = self .dispatch(hive_sh4re::ManagerRequest::Restart { name: args.name }) .await; - format_ack(resp, "restart", format!("restarted {name}")) + annotate_retries( + format_ack(resp, "restart", format!("restarted {name}")), + retries, + ) }) .await } @@ -471,10 +501,13 @@ impl ManagerServer { let log = format!("{args:?}"); let name = args.name.clone(); run_tool_envelope("update", log, async move { - let resp = self + let (resp, retries) = self .dispatch(hive_sh4re::ManagerRequest::Update { name: args.name }) .await; - format_ack(resp, "update", format!("updated {name}")) + annotate_retries( + format_ack(resp, "update", format!("updated {name}")), + retries, + ) }) .await } @@ -494,7 +527,7 @@ impl ManagerServer { async fn ask_operator(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); run_tool_envelope("ask_operator", log, async move { - let resp = self + let (resp, retries) = self .dispatch(hive_sh4re::ManagerRequest::AskOperator { question: args.question, options: args.options, @@ -502,7 +535,7 @@ impl ManagerServer { ttl_seconds: args.ttl_seconds, }) .await; - match resp { + let s = match resp { Ok(SocketReply::QuestionQueued(id)) => format!( "question queued (id={id}); operator's answer 
will arrive as a system \ `operator_answered` event in your inbox" @@ -510,7 +543,8 @@ impl ManagerServer { Ok(SocketReply::Err(m)) => format!("ask_operator failed: {m}"), Ok(other) => format!("ask_operator unexpected response: {other:?}"), Err(e) => format!("ask_operator transport error: {e:#}"), - } + }; + annotate_retries(s, retries) }) .await } @@ -528,17 +562,20 @@ impl ManagerServer { let agent = args.agent.clone(); let commit_ref = args.commit_ref.clone(); run_tool_envelope("request_apply_commit", log, async move { - let resp = self + let (resp, retries) = self .dispatch(hive_sh4re::ManagerRequest::RequestApplyCommit { agent: args.agent, commit_ref: args.commit_ref, description: args.description, }) .await; - format_ack( - resp, - "request_apply_commit", - format!("apply approval queued for {agent} @ {commit_ref}"), + annotate_retries( + format_ack( + resp, + "request_apply_commit", + format!("apply approval queued for {agent} @ {commit_ref}"), + ), + retries, ) }) .await