From d79b5a39a1d411fefb2ac61a5e5c0b7c74a8e05e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?m=C3=BCde?= Date: Thu, 14 May 2026 21:42:51 +0200 Subject: [PATCH] hive-c0re: in-memory broker + per-agent sockets + coordinator state --- hive-c0re/src/agent_server.rs | 101 ++++++++++++++++++++++++++++++++++ hive-c0re/src/broker.rs | 30 ++++++++++ hive-c0re/src/coordinator.rs | 56 +++++++++++++++++++ hive-c0re/src/lifecycle.rs | 10 +++- hive-c0re/src/main.rs | 11 +++- hive-c0re/src/server.rs | 21 +++++-- 6 files changed, 220 insertions(+), 9 deletions(-) create mode 100644 hive-c0re/src/agent_server.rs create mode 100644 hive-c0re/src/broker.rs create mode 100644 hive-c0re/src/coordinator.rs diff --git a/hive-c0re/src/agent_server.rs b/hive-c0re/src/agent_server.rs new file mode 100644 index 0000000..89104a8 --- /dev/null +++ b/hive-c0re/src/agent_server.rs @@ -0,0 +1,101 @@ +//! Per-agent socket listener. Each socket file's existence on disk +//! authenticates the caller: connecting to `<.../agents/foo/mcp.sock>` means +//! you are `foo`. + +use std::path::PathBuf; +use std::sync::Arc; + +use anyhow::{Context, Result}; +use hive_sh4re::{AgentRequest, AgentResponse, Message}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::{UnixListener, UnixStream}; +use tokio::task::JoinHandle; + +use crate::broker::Broker; + +pub struct AgentSocket { + pub path: PathBuf, + pub handle: JoinHandle<()>, +} + +pub async fn start( + agent: String, + socket_path: PathBuf, + broker: Arc, +) -> Result { + if let Some(parent) = socket_path.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("create agent socket dir {}", parent.display()))?; + } + if socket_path.exists() { + std::fs::remove_file(&socket_path).context("remove stale agent socket")?; + } + let listener = UnixListener::bind(&socket_path) + .with_context(|| format!("bind agent socket {}", socket_path.display()))?; + tracing::info!(%agent, socket = %socket_path.display(), "agent socket listening"); + + let path = socket_path.clone(); + let handle = tokio::spawn(async move { + loop { + match listener.accept().await { + Ok((stream, _)) => { + let agent = agent.clone(); + let broker = broker.clone(); + tokio::spawn(async move { + if let Err(e) = serve(stream, agent, broker).await { + tracing::warn!(error = ?e, "agent connection failed"); + } + }); + } + Err(e) => { + tracing::warn!(error = ?e, "agent listener accept failed; exiting"); + return; + } + } + } + }); + Ok(AgentSocket { path, handle }) +} + +async fn serve(stream: UnixStream, agent: String, broker: Arc) -> Result<()> { + let (read, mut write) = stream.into_split(); + let mut reader = BufReader::new(read); + let mut line = String::new(); + loop { + line.clear(); + let n = reader.read_line(&mut line).await?; + if n == 0 { + return Ok(()); + } + let resp = match serde_json::from_str::(line.trim()) { + Ok(req) => dispatch(&req, &agent, &broker), + Err(e) => AgentResponse::Err { + message: format!("parse error: {e}"), + }, + }; + let mut payload = serde_json::to_string(&resp)?; + payload.push('\n'); + write.write_all(payload.as_bytes()).await?; + write.flush().await?; + } +} + +fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResponse { + match req { + AgentRequest::Send { to, body } => { + broker.send(Message { + from: agent.to_owned(), + to: to.clone(), + body: body.clone(), + }); + AgentResponse::Ok + } + AgentRequest::Recv => match broker.recv(agent) { + Some(msg) => AgentResponse::Message { + from: msg.from, + body: msg.body, + }, + None => AgentResponse::Empty, + }, + } +} diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs new file mode 100644 index 0000000..2867c1f --- /dev/null +++ b/hive-c0re/src/broker.rs @@ -0,0 +1,30 @@ +//! In-memory message broker. Phase 3 replaces this with a sqlite-backed store. + +use std::collections::{HashMap, VecDeque}; +use std::sync::Mutex; + +use hive_sh4re::Message; + +#[derive(Default)] +pub struct Broker { + queues: Mutex>>, +} + +impl Broker { + pub fn new() -> Self { + Self::default() + } + + pub fn send(&self, message: Message) { + let mut queues = self.queues.lock().unwrap(); + queues + .entry(message.to.clone()) + .or_default() + .push_back(message); + } + + pub fn recv(&self, recipient: &str) -> Option { + let mut queues = self.queues.lock().unwrap(); + queues.get_mut(recipient).and_then(|q| q.pop_front()) + } +} diff --git a/hive-c0re/src/coordinator.rs b/hive-c0re/src/coordinator.rs new file mode 100644 index 0000000..66d8b7d --- /dev/null +++ b/hive-c0re/src/coordinator.rs @@ -0,0 +1,56 @@ +//! Runtime state shared between the host admin socket and the per-agent +//! sockets: the broker plus a map of `name -> AgentSocket`. + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; + +use anyhow::{Context, Result}; + +use crate::agent_server::{self, AgentSocket}; +use crate::broker::Broker; + +const AGENT_RUNTIME_ROOT: &str = "/run/hyperhive/agents"; + +pub struct Coordinator { + pub broker: Arc, + agents: Mutex>, +} + +impl Coordinator { + pub fn new() -> Self { + Self { + broker: Arc::new(Broker::new()), + agents: Mutex::new(HashMap::new()), + } + } + + pub async fn register_agent(&self, name: &str) -> Result { + let agent_dir = Self::agent_dir(name); + std::fs::create_dir_all(&agent_dir) + .with_context(|| format!("create agent dir {}", agent_dir.display()))?; + let socket_path = Self::socket_path(name); + let socket = + agent_server::start(name.to_owned(), socket_path, self.broker.clone()).await?; + self.agents + .lock() + .unwrap() + .insert(name.to_owned(), socket); + Ok(agent_dir) + } + + pub fn unregister_agent(&self, name: &str) { + if let Some(socket) = self.agents.lock().unwrap().remove(name) { + socket.handle.abort(); + let _ = std::fs::remove_file(&socket.path); + } + } + + pub fn agent_dir(name: &str) -> PathBuf { + PathBuf::from(format!("{AGENT_RUNTIME_ROOT}/{name}")) + } + + pub fn socket_path(name: &str) -> PathBuf { + Self::agent_dir(name).join("mcp.sock") + } +} diff --git a/hive-c0re/src/lifecycle.rs b/hive-c0re/src/lifecycle.rs index 60f1f71..72c26cd 100644 --- a/hive-c0re/src/lifecycle.rs +++ b/hive-c0re/src/lifecycle.rs @@ -1,18 +1,24 @@ //! Thin async wrappers over `nixos-container`. +use std::path::Path; + use anyhow::{Context, Result, bail}; use tokio::process::Command; pub const AGENT_PREFIX: &str = "hive-agent-"; pub const HIVE_PREFIX: &str = "hive-"; +/// Mount point of the per-agent runtime directory inside the container. +pub const CONTAINER_RUNTIME_MOUNT: &str = "/run/hive"; + pub fn container_name(name: &str) -> String { format!("{AGENT_PREFIX}{name}") } -pub async fn spawn(name: &str, agent_flake: &str) -> Result<()> { +pub async fn spawn(name: &str, agent_flake: &str, agent_dir: &Path) -> Result<()> { let container = container_name(name); - run(&["create", &container, "--flake", agent_flake]).await?; + let bind = format!("{}:{CONTAINER_RUNTIME_MOUNT}", agent_dir.display()); + run(&["create", &container, "--flake", agent_flake, "--bind", &bind]).await?; run(&["start", &container]).await } diff --git a/hive-c0re/src/main.rs b/hive-c0re/src/main.rs index c03df93..760d193 100644 --- a/hive-c0re/src/main.rs +++ b/hive-c0re/src/main.rs @@ -1,13 +1,19 @@ use std::path::PathBuf; +use std::sync::Arc; use anyhow::{Result, bail}; use clap::{Parser, Subcommand}; use hive_sh4re::{HostRequest, HostResponse}; +mod agent_server; +mod broker; mod client; +mod coordinator; mod lifecycle; mod server; +use coordinator::Coordinator; + #[derive(Parser)] #[command(name = "hive-c0re", about = "hyperhive coordinator daemon and CLI")] struct Cli { @@ -48,7 +54,10 @@ async fn main() -> Result<()> { let cli = Cli::parse(); match cli.cmd { - Cmd::Serve { agent_flake } => server::serve(&cli.socket, &agent_flake).await, + Cmd::Serve { agent_flake } => { + let coord = Arc::new(Coordinator::new()); + server::serve(&cli.socket, &agent_flake, coord).await + } Cmd::Spawn { name } => { render(client::request(&cli.socket, HostRequest::Spawn { name }).await?) } diff --git a/hive-c0re/src/server.rs b/hive-c0re/src/server.rs index 48eeb56..131e1b4 100644 --- a/hive-c0re/src/server.rs +++ b/hive-c0re/src/server.rs @@ -1,13 +1,15 @@ use std::path::Path; +use std::sync::Arc; use anyhow::{Context, Result}; use hive_sh4re::{HostRequest, HostResponse}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{UnixListener, UnixStream}; +use crate::coordinator::Coordinator; use crate::lifecycle; -pub async fn serve(socket: &Path, agent_flake: &str) -> Result<()> { +pub async fn serve(socket: &Path, agent_flake: &str, coord: Arc) -> Result<()> { if let Some(parent) = socket.parent() { std::fs::create_dir_all(parent) .with_context(|| format!("create socket parent {}", parent.display()))?; @@ -23,15 +25,16 @@ pub async fn serve(socket: &Path, agent_flake: &str) -> Result<()> { loop { let (stream, _) = listener.accept().await.context("accept connection")?; let agent_flake = agent_flake.to_owned(); + let coord = coord.clone(); tokio::spawn(async move { - if let Err(e) = handle(stream, &agent_flake).await { + if let Err(e) = handle(stream, &agent_flake, coord).await { tracing::warn!(error = ?e, "connection failed"); } }); } } -async fn handle(stream: UnixStream, agent_flake: &str) -> Result<()> { +async fn handle(stream: UnixStream, agent_flake: &str, coord: Arc) -> Result<()> { let (read, mut write) = stream.into_split(); let mut reader = BufReader::new(read); let mut line = String::new(); @@ -43,7 +46,7 @@ async fn handle(stream: UnixStream, agent_flake: &str) -> Result<()> { return Ok(()); } let resp = match serde_json::from_str::(line.trim()) { - Ok(req) => dispatch(&req, agent_flake).await, + Ok(req) => dispatch(&req, agent_flake, &coord).await, Err(e) => HostResponse::error(format!("parse error: {e}")), }; let mut payload = serde_json::to_string(&resp)?; @@ -53,17 +56,23 @@ async fn handle(stream: UnixStream, agent_flake: &str) -> Result<()> { } } -async fn dispatch(req: &HostRequest, agent_flake: &str) -> HostResponse { +async fn dispatch(req: &HostRequest, agent_flake: &str, coord: &Coordinator) -> HostResponse { let result: anyhow::Result = async { Ok(match req { HostRequest::Spawn { name } => { tracing::info!(%name, "spawn"); - lifecycle::spawn(name, agent_flake).await?; + let agent_dir = coord.register_agent(name).await?; + if let Err(e) = lifecycle::spawn(name, agent_flake, &agent_dir).await { + // Roll back socket registration if container creation failed. + coord.unregister_agent(name); + return Err(e); + } HostResponse::success() } HostRequest::Kill { name } => { tracing::info!(%name, "kill"); lifecycle::kill(name).await?; + coord.unregister_agent(name); HostResponse::success() } HostRequest::Rebuild { name } => {