use std::path::Path;
use std::time::Duration;

use anyhow::{Context, Result, anyhow};
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;

/// Backoff schedule between attempts. Five entries → up to 5 retries on
/// top of the initial attempt; the sleeps alone add up to
/// 2+4+8+16+30 = 60s. That is not a hard wall-clock cap: the individual
/// connect/write/read steps carry no timeout, so the total also includes
/// however long each attempt itself takes (unbounded if the server accepts
/// the connection but never replies).
/// Sized to ride out a hive-c0re restart (systemd usually has the unix
/// socket back inside ~5s) without the agent-side claude session having
/// to handle the transient itself — burning tokens on a tool-error retry
/// loop is more expensive than 60s of in-harness sleep.
const RETRY_BACKOFFS_MS: &[u64] = &[2_000, 4_000, 8_000, 16_000, 30_000];

/// Transparent retry wrapper around [`request_retried`] that throws away
/// the retry count. Use this from non-tool callers (the harness serve
/// loop, web UI, CLI subcommands) where we just want the socket-restart
/// resilience without surfacing the bookkeeping.
///
/// # Errors
///
/// Exactly the errors of [`request_retried`].
pub async fn request<Req, Resp>(socket: &Path, req: &Req) -> Result<Resp>
where
    Req: Serialize + ?Sized,
    Resp: DeserializeOwned,
{
    let (resp, _retries) = request_retried(socket, req).await?;
    Ok(resp)
}

/// Same wire shape as [`request`], but reports how many retries it took
/// past the initial attempt (0 = succeeded first try). MCP tool handlers
/// use this so they can append a one-line hint to the tool result when
/// retries happened — that way claude knows the prior socket flake
/// wasn't a content error and shouldn't trigger an LLM-level retry of
/// its own.
///
/// # Errors
///
/// A failure to serialize the request or deserialize the reply is returned
/// without further retries. Connect/IO failures (including the server
/// closing the connection before replying) are retried per
/// `RETRY_BACKOFFS_MS`, and the last one is returned once that schedule is
/// exhausted.
pub async fn request_retried(socket: &Path, req: &Req) -> Result<(Resp, u32)> where Req: Serialize + ?Sized, Resp: DeserializeOwned, { let mut last_err: Option = None; let max_retries = u32::try_from(RETRY_BACKOFFS_MS.len()).unwrap(); for attempt in 0..=max_retries { match try_once::(socket, req).await { Ok(resp) => return Ok((resp, attempt)), Err(RequestError::Fatal(e)) => return Err(e), Err(RequestError::Transient(e)) => { if attempt < max_retries { let sleep_ms = RETRY_BACKOFFS_MS[attempt as usize]; tracing::warn!( attempt = attempt + 1, sleep_ms, error = %e, "hive socket attempt failed; retrying" ); last_err = Some(e); tokio::time::sleep(Duration::from_millis(sleep_ms)).await; } else { last_err = Some(e); } } } } Err(last_err.unwrap_or_else(|| anyhow!("hive socket: retries exhausted"))) } /// Transient = connect / IO error worth a retry (server restart, broken /// pipe). Fatal = serialization / deserialization / protocol error /// where retrying would just repeat the same failure. enum RequestError { Transient(anyhow::Error), Fatal(anyhow::Error), } async fn try_once(socket: &Path, req: &Req) -> Result where Req: Serialize + ?Sized, Resp: DeserializeOwned, { let stream = UnixStream::connect(socket) .await .with_context(|| format!("connect to {}", socket.display())) .map_err(RequestError::Transient)?; let (read, mut write) = stream.into_split(); let mut payload = serde_json::to_string(req).map_err(|e| RequestError::Fatal(e.into()))?; payload.push('\n'); write .write_all(payload.as_bytes()) .await .map_err(|e| RequestError::Transient(e.into()))?; write .flush() .await .map_err(|e| RequestError::Transient(e.into()))?; let mut reader = BufReader::new(read); let mut line = String::new(); let read_bytes = reader .read_line(&mut line) .await .map_err(|e| RequestError::Transient(e.into()))?; if read_bytes == 0 || line.is_empty() { return Err(RequestError::Transient(anyhow!( "server closed connection without responding" ))); } 
serde_json::from_str(line.trim()).map_err(|e| RequestError::Fatal(e.into())) }