//! Per-agent socket listener. Each socket file's existence on disk //! authenticates the caller: connecting to `<.../agents/foo/mcp.sock>` means //! you are `foo`. use std::path::{Path, PathBuf}; use std::sync::Arc; use anyhow::{Context, Result}; use hive_sh4re::{AgentRequest, AgentResponse, Message}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{UnixListener, UnixStream}; use tokio::task::JoinHandle; use crate::coordinator::Coordinator; pub struct AgentSocket { pub path: PathBuf, pub handle: JoinHandle<()>, } pub fn start(agent: &str, socket_path: &Path, coord: Arc) -> Result { let agent = agent.to_owned(); if let Some(parent) = socket_path.parent() { std::fs::create_dir_all(parent) .with_context(|| format!("create agent socket dir {}", parent.display()))?; } if socket_path.exists() { std::fs::remove_file(socket_path).context("remove stale agent socket")?; } let listener = UnixListener::bind(socket_path) .with_context(|| format!("bind agent socket {}", socket_path.display()))?; tracing::info!(%agent, socket = %socket_path.display(), "agent socket listening"); let path = socket_path.to_path_buf(); let handle = tokio::spawn(async move { loop { match listener.accept().await { Ok((stream, _)) => { let agent = agent.clone(); let coord = coord.clone(); tokio::spawn(async move { if let Err(e) = serve(stream, agent, coord).await { tracing::warn!(error = ?e, "agent connection failed"); } }); } Err(e) => { tracing::warn!(error = ?e, "agent listener accept failed; exiting"); return; } } } }); Ok(AgentSocket { path, handle }) } async fn serve(stream: UnixStream, agent: String, coord: Arc) -> Result<()> { let (read, mut write) = stream.into_split(); let mut reader = BufReader::new(read); let mut line = String::new(); loop { line.clear(); let n = reader.read_line(&mut line).await?; if n == 0 { return Ok(()); } let resp = match serde_json::from_str::(line.trim()) { Ok(req) => dispatch(&req, &agent, &coord).await, Err(e) => AgentResponse::Err { message: format!("parse error: {e}"), }, }; let mut payload = serde_json::to_string(&resp)?; payload.push('\n'); write.write_all(payload.as_bytes()).await?; write.flush().await?; } } /// Max long-poll window the caller can ask for; values above the /// cap are clamped. 180s keeps us under typical TCP/proxy idle /// limits while still letting agents park their turn until a /// message arrives. Omitting `wait_seconds` (or passing `0`) means /// "peek, don't wait" — claude can call recv whenever it wants a /// cheap "is there anything pending?" check without blocking the /// turn for 30 seconds. To actually park, the caller passes a /// positive `wait_seconds`. const RECV_LONG_POLL_MAX: std::time::Duration = std::time::Duration::from_secs(180); fn recv_timeout(wait_seconds: Option) -> std::time::Duration { match wait_seconds { Some(s) => std::time::Duration::from_secs(s).min(RECV_LONG_POLL_MAX), None => std::time::Duration::ZERO, } } async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc) -> AgentResponse { let broker = &coord.broker; match req { AgentRequest::Send { to, body } => handle_send(coord, agent, to, body), AgentRequest::Recv { wait_seconds } => match broker .recv_blocking(agent, recv_timeout(*wait_seconds)) .await { Ok(Some(msg)) => AgentResponse::Message { from: msg.from, body: msg.body, }, Ok(None) => AgentResponse::Empty, Err(e) => AgentResponse::Err { message: format!("{e:#}"), }, }, AgentRequest::Status => match broker.count_pending(agent) { Ok(unread) => AgentResponse::Status { unread }, Err(e) => AgentResponse::Err { message: format!("{e:#}"), }, }, AgentRequest::OperatorMsg { body } => match broker.send(&Message { from: hive_sh4re::OPERATOR_RECIPIENT.to_owned(), to: agent.to_owned(), body: body.clone(), }) { Ok(()) => AgentResponse::Ok, Err(e) => AgentResponse::Err { message: format!("{e:#}"), }, }, AgentRequest::Wake { from, body } => match broker.send(&Message { from: from.clone(), to: agent.to_owned(), body: body.clone(), }) { Ok(()) => AgentResponse::Ok, Err(e) => AgentResponse::Err { message: format!("{e:#}"), }, }, AgentRequest::Recent { limit } => match broker.recent_for(agent, *limit) { Ok(rows) => AgentResponse::Recent { rows }, Err(e) => AgentResponse::Err { message: format!("{e:#}"), }, }, AgentRequest::Ask { question, options, multi, ttl_seconds, to, } => crate::questions::handle_ask( coord, agent, question, options, *multi, *ttl_seconds, to.as_deref(), ) .map_or_else( |message| AgentResponse::Err { message }, |id| AgentResponse::QuestionQueued { id }, ), AgentRequest::Answer { id, answer } => crate::questions::handle_answer( coord, agent, *id, answer, ) .map_or_else( |message| AgentResponse::Err { message }, |()| AgentResponse::Ok, ), AgentRequest::Remind { message, timing, file_path, } => handle_remind(coord, agent, message, timing, file_path.as_deref()), } } /// Common Send handler shared between dispatch arms. Applies the /// 1 KiB body cap, then routes broadcast (`to == "*"`) vs unicast /// through their respective broker calls. Pulled out of `dispatch` /// to keep that function under the clippy too-many-lines limit; the /// behaviour is identical to inlining. fn handle_send(coord: &Arc, agent: &str, to: &str, body: &str) -> AgentResponse { if let Err(message) = crate::limits::check_size("send", body) { return AgentResponse::Err { message }; } if to == "*" { let errors = coord.broadcast_send(agent, body); return if errors.is_empty() { AgentResponse::Ok } else { AgentResponse::Err { message: format!("broadcast failed for agents: {}", errors.join(", ")), } }; } match coord.broker.send(&Message { from: agent.to_owned(), to: to.to_owned(), body: body.to_owned(), }) { Ok(()) => AgentResponse::Ok, Err(e) => AgentResponse::Err { message: format!("{e:#}"), }, } } fn handle_remind( coord: &Arc, agent: &str, message: &str, timing: &hive_sh4re::ReminderTiming, file_path: Option<&str>, ) -> AgentResponse { match store_remind(coord, agent, message, timing, file_path) { Ok(()) => AgentResponse::Ok, Err(message) => AgentResponse::Err { message }, } } /// Shared remind-storage path used by both the agent and the manager /// dispatchers. Validates timing, applies the auto-file overflow /// dance (see [`prepare_remind_storage`]), and writes the reminder /// row. Returns `Ok(())` on success, or a caller-ready error string /// the dispatcher wraps in `*Response::Err`. pub(crate) fn store_remind( coord: &Arc, agent: &str, message: &str, timing: &hive_sh4re::ReminderTiming, file_path: Option<&str>, ) -> Result<(), String> { let due_at = resolve_due_at(timing).map_err(|e| format!("invalid reminder timing: {e:#}"))?; let (stored_message, stored_path) = prepare_remind_storage(agent, message, file_path)?; let id = coord .broker .store_reminder(agent, &stored_message, stored_path.as_deref(), due_at) .map_err(|e| format!("failed to store reminder: {e:#}"))?; tracing::info!(%id, %agent, %due_at, "reminder scheduled"); Ok(()) } /// Decide what we actually store in the reminders row, applying the /// same byte cap as the rest of the wire protocol /// ([`crate::limits::MESSAGE_MAX_BYTES`]). Three outcomes: /// /// 1. Body within the cap → stored verbatim, with whatever `file_path` /// the caller passed (None or Some). The scheduler honours /// `file_path` at delivery time as before. /// 2. Body over the cap, no caller `file_path` → auto-generate a path /// under `/agents//state/reminders/auto-.md`, write the /// body to disk now, store a short pointer hint as the message and /// clear `file_path` (so the scheduler doesn't re-write at /// delivery and overwrite the body with the hint). /// 3. Body over the cap, caller provided `file_path` → honour the /// caller's path: write the body to it now, store the same hint /// and clear `file_path` for the same reason as (2). /// /// Returns `(stored_message, stored_file_path)` on success, or a /// caller-ready error string on auto-save failure (which is the only /// way a Remind request can be refused for size — the agent never has /// to think about the cap). fn prepare_remind_storage( agent: &str, message: &str, file_path: Option<&str>, ) -> Result<(String, Option), String> { if message.len() <= crate::limits::MESSAGE_MAX_BYTES { return Ok((message.to_owned(), file_path.map(str::to_owned))); } let req_path = match file_path { Some(p) => p.to_owned(), None => auto_reminder_path(agent), }; let host_path = crate::reminder_scheduler::resolve_host_path(agent, &req_path) .map_err(|reason| format!("auto-save path `{req_path}` rejected: {reason}"))?; crate::reminder_scheduler::write_payload(agent, &host_path, message) .map_err(|reason| format!("auto-save of large reminder body to `{req_path}` failed: {reason}"))?; let hint = format!( "[reminder body of {} bytes auto-saved to `{req_path}`; read with your filesystem tools]", message.len() ); Ok((hint, None)) } /// Generate a per-agent path for an auto-saved reminder body. Uses /// `unix_nanos` plus the agent name to keep collisions infinitesimal /// across the agent's own state subtree (we're not stamping a hostname /// since hive-c0re is single-host). fn auto_reminder_path(agent: &str) -> String { let ts_ns = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map(|d| d.as_nanos()) .unwrap_or(0); format!("/agents/{agent}/state/reminders/auto-{ts_ns}.md") } /// Resolve the `due_at` unix timestamp for a Remind request. Returns /// distinct error messages for each failure mode (overflow on /// `InSeconds`, pre-epoch clock, `i64` cast wrap) so the caller can tell /// what went wrong without inspecting the chain. fn resolve_due_at(timing: &hive_sh4re::ReminderTiming) -> anyhow::Result { use hive_sh4re::ReminderTiming; match timing { ReminderTiming::InSeconds { seconds } => { let now = std::time::SystemTime::now(); let future = now .checked_add(std::time::Duration::from_secs(*seconds)) .ok_or_else(|| { anyhow::anyhow!("InSeconds overflow: {seconds}s exceeds system time range") })?; let duration = future .duration_since(std::time::UNIX_EPOCH) .map_err(|e| anyhow::anyhow!("system time before UNIX_EPOCH: {e}"))?; i64::try_from(duration.as_secs()) .map_err(|e| anyhow::anyhow!("unix timestamp exceeds i64 range: {e}")) } ReminderTiming::At { unix_timestamp } => Ok(*unix_timestamp), } } #[cfg(test)] mod tests { use super::*; #[test] fn auto_reminder_path_format() { let p = auto_reminder_path("damocles"); assert!(p.starts_with("/agents/damocles/state/reminders/auto-")); assert!(p.ends_with(".md")); } #[test] fn prepare_remind_storage_passthrough_under_cap() { let (msg, fp) = prepare_remind_storage("foo", "small body", None).unwrap(); assert_eq!(msg, "small body"); assert_eq!(fp, None); } #[test] fn prepare_remind_storage_passthrough_with_caller_file_path() { let (msg, fp) = prepare_remind_storage("foo", "small", Some("/agents/foo/state/x.md")).unwrap(); assert_eq!(msg, "small"); assert_eq!(fp.as_deref(), Some("/agents/foo/state/x.md")); } }