retry hive socket up to 5x over 60s, surface retry count to claude

socket client now retries connect/IO failures with 2-4-8-16-30s
backoffs (60s total budget). transparent for non-tool callers via
request(); tool handlers go through request_retried() which also
returns the retry count, then annotate_retries() appends a one-line
note to the tool result so claude knows the slow round-trip was a
c0re flicker, not a content failure — avoids burning tokens on an
LLM-level retry.
This commit is contained in:
müde 2026-05-16 15:28:18 +02:00
parent 4a8a668348
commit 7d33da3727
2 changed files with 180 additions and 67 deletions

View file

@ -1,33 +1,109 @@
use std::path::Path;
use std::time::Duration;
use anyhow::{Context, Result, bail};
use anyhow::{Context, Result, anyhow};
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
/// Generic JSON-line request/response over a unix socket. One request, one
/// response, then drop. Used by both the agent and manager harnesses.
/// Backoff schedule between attempts, in milliseconds. Five entries → up
/// to 5 retries on top of the initial attempt; total time spent sleeping
/// is capped at 2+4+8+16+30 = 60s.
/// NOTE(review): that is a sleep budget rather than a hard wall-clock
/// cap — the attempts in this file are not wrapped in a timeout, so a
/// hung connect or read would add to the total; confirm that is intended.
/// Sized to ride out a hive-c0re restart (systemd usually has the unix
/// socket back inside ~5s) without the agent-side claude session having
/// to handle the transient itself — burning tokens on a tool-error retry
/// loop is more expensive than 60s of in-harness sleep.
const RETRY_BACKOFFS_MS: &[u64] = &[2_000, 4_000, 8_000, 16_000, 30_000];
/// Sends `req` over the unix socket at `socket` and returns the decoded
/// response, retrying transient socket failures behind the scenes.
///
/// This is [`request_retried`] with the retry count discarded. It is the
/// entry point for non-tool callers (the harness serve loop, web UI, CLI
/// subcommands) that want socket-restart resilience but have no use for
/// the bookkeeping.
///
/// # Errors
///
/// Returns whatever error [`request_retried`] returns.
pub async fn request<Req, Resp>(socket: &Path, req: &Req) -> Result<Resp>
where
    Req: Serialize + ?Sized,
    Resp: DeserializeOwned,
{
    let (resp, _retries) = request_retried::<Req, Resp>(socket, req).await?;
    Ok(resp)
}
/// Same wire shape as [`request`], but also reports how many retries it
/// took past the initial attempt (0 = succeeded first try).
///
/// Transient failures are retried after sleeping for the next entry of
/// [`RETRY_BACKOFFS_MS`]. MCP tool handlers use the returned count to
/// append a one-line hint to the tool result when retries happened —
/// that way claude knows the prior socket flake wasn't a content error
/// and shouldn't trigger an LLM-level retry of its own.
///
/// # Errors
///
/// * A fatal failure from `try_once` is returned immediately, without
///   any retry.
/// * Once every backoff entry has been used, the error from the final
///   transient failure is returned.
///
/// NOTE(review): `try_once` can report a transient failure after the
/// request line has already been written (for example when the read
/// fails), so a retried request may reach the server more than once —
/// confirm that the handlers tolerate a replay.
pub async fn request_retried<Req, Resp>(socket: &Path, req: &Req) -> Result<(Resp, u32)>
where
    Req: Serialize + ?Sized,
    Resp: DeserializeOwned,
{
    // Each entry pulled from the schedule pays for exactly one retry;
    // running out of entries is the "retries exhausted" condition, so no
    // separate counter bound, index or placeholder error is needed.
    let mut backoffs = RETRY_BACKOFFS_MS.iter().copied();
    let mut retries: u32 = 0;
    loop {
        let err = match try_once::<Req, Resp>(socket, req).await {
            Ok(resp) => return Ok((resp, retries)),
            Err(RequestError::Fatal(e)) => return Err(e),
            Err(RequestError::Transient(e)) => e,
        };
        let sleep_ms = match backoffs.next() {
            Some(ms) => ms,
            // Schedule used up: surface the last transient error as-is.
            None => return Err(err),
        };
        tracing::warn!(
            attempt = retries + 1,
            sleep_ms,
            error = %err,
            "hive socket attempt failed; retrying"
        );
        tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
        retries += 1;
    }
}
/// Classification of a failed attempt made by `try_once`, used by
/// `request_retried` to decide between retrying and giving up.
///
/// Transient = connect / IO error worth a retry (server restart, broken
/// pipe). Fatal = serialization / deserialization / protocol error
/// where retrying would just repeat the same failure.
enum RequestError {
/// Connect, write, flush or read failure, or the server closing the
/// connection without sending a reply; the caller may retry.
Transient(anyhow::Error),
/// The request could not be serialized or the reply could not be
/// deserialized; the caller returns this error without retrying.
Fatal(anyhow::Error),
}
/// Performs exactly one request/response exchange: connect to `socket`,
/// write `req` as a single newline-terminated JSON line, read one line
/// back and decode it as `Resp`.
///
/// Failures are tagged for the retry loop: connect, write, flush and
/// read errors — and a connection closed before any reply arrives — are
/// `RequestError::Transient`; JSON encode/decode errors are
/// `RequestError::Fatal`.
///
/// NOTE(review): this body shows superseded statements (bare `?`
/// propagation and the `bail!` branch) next to their `map_err`
/// replacements — confirm that only the `RequestError`-mapped form
/// remains in the tree.
async fn try_once<Req, Resp>(socket: &Path, req: &Req) -> Result<Resp, RequestError>
where
Req: Serialize + ?Sized,
Resp: DeserializeOwned,
{
let stream = UnixStream::connect(socket)
.await
.with_context(|| format!("connect to {}", socket.display()))?;
.with_context(|| format!("connect to {}", socket.display()))
.map_err(RequestError::Transient)?;
let (read, mut write) = stream.into_split();
let mut payload = serde_json::to_string(req)?;
let mut payload = serde_json::to_string(req).map_err(|e| RequestError::Fatal(e.into()))?;
payload.push('\n');
write.write_all(payload.as_bytes()).await?;
write.flush().await?;
write
.write_all(payload.as_bytes())
.await
.map_err(|e| RequestError::Transient(e.into()))?;
write
.flush()
.await
.map_err(|e| RequestError::Transient(e.into()))?;
let mut reader = BufReader::new(read);
let mut line = String::new();
reader.read_line(&mut line).await?;
if line.is_empty() {
bail!("server closed connection without responding");
let read_bytes = reader
.read_line(&mut line)
.await
.map_err(|e| RequestError::Transient(e.into()))?;
if read_bytes == 0 || line.is_empty() {
return Err(RequestError::Transient(anyhow!(
"server closed connection without responding"
)));
}
Ok(serde_json::from_str(line.trim())?)
serde_json::from_str(line.trim()).map_err(|e| RequestError::Fatal(e.into()))
}