hive-c0re: in-memory broker + per-agent sockets + coordinator state

This commit is contained in:
müde 2026-05-14 21:42:51 +02:00
parent 4545c08908
commit d79b5a39a1
6 changed files with 220 additions and 9 deletions

View file

@ -0,0 +1,101 @@
//! Per-agent socket listener. Each socket file's existence on disk
//! authenticates the caller: connecting to `<.../agents/foo/mcp.sock>` means
//! you are `foo`.
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{Context, Result};
use hive_sh4re::{AgentRequest, AgentResponse, Message};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{UnixListener, UnixStream};
use tokio::task::JoinHandle;
use crate::broker::Broker;
pub struct AgentSocket {
pub path: PathBuf,
pub handle: JoinHandle<()>,
}
pub async fn start(
agent: String,
socket_path: PathBuf,
broker: Arc<Broker>,
) -> Result<AgentSocket> {
if let Some(parent) = socket_path.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("create agent socket dir {}", parent.display()))?;
}
if socket_path.exists() {
std::fs::remove_file(&socket_path).context("remove stale agent socket")?;
}
let listener = UnixListener::bind(&socket_path)
.with_context(|| format!("bind agent socket {}", socket_path.display()))?;
tracing::info!(%agent, socket = %socket_path.display(), "agent socket listening");
let path = socket_path.clone();
let handle = tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((stream, _)) => {
let agent = agent.clone();
let broker = broker.clone();
tokio::spawn(async move {
if let Err(e) = serve(stream, agent, broker).await {
tracing::warn!(error = ?e, "agent connection failed");
}
});
}
Err(e) => {
tracing::warn!(error = ?e, "agent listener accept failed; exiting");
return;
}
}
}
});
Ok(AgentSocket { path, handle })
}
async fn serve(stream: UnixStream, agent: String, broker: Arc<Broker>) -> Result<()> {
let (read, mut write) = stream.into_split();
let mut reader = BufReader::new(read);
let mut line = String::new();
loop {
line.clear();
let n = reader.read_line(&mut line).await?;
if n == 0 {
return Ok(());
}
let resp = match serde_json::from_str::<AgentRequest>(line.trim()) {
Ok(req) => dispatch(&req, &agent, &broker),
Err(e) => AgentResponse::Err {
message: format!("parse error: {e}"),
},
};
let mut payload = serde_json::to_string(&resp)?;
payload.push('\n');
write.write_all(payload.as_bytes()).await?;
write.flush().await?;
}
}
fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResponse {
match req {
AgentRequest::Send { to, body } => {
broker.send(Message {
from: agent.to_owned(),
to: to.clone(),
body: body.clone(),
});
AgentResponse::Ok
}
AgentRequest::Recv => match broker.recv(agent) {
Some(msg) => AgentResponse::Message {
from: msg.from,
body: msg.body,
},
None => AgentResponse::Empty,
},
}
}

30
hive-c0re/src/broker.rs Normal file
View file

@ -0,0 +1,30 @@
//! In-memory message broker. Phase 3 replaces this with a sqlite-backed store.
use std::collections::{HashMap, VecDeque};
use std::sync::Mutex;
use hive_sh4re::Message;
#[derive(Default)]
pub struct Broker {
queues: Mutex<HashMap<String, VecDeque<Message>>>,
}
impl Broker {
pub fn new() -> Self {
Self::default()
}
pub fn send(&self, message: Message) {
let mut queues = self.queues.lock().unwrap();
queues
.entry(message.to.clone())
.or_default()
.push_back(message);
}
pub fn recv(&self, recipient: &str) -> Option<Message> {
let mut queues = self.queues.lock().unwrap();
queues.get_mut(recipient).and_then(|q| q.pop_front())
}
}

View file

@ -0,0 +1,56 @@
//! Runtime state shared between the host admin socket and the per-agent
//! sockets: the broker plus a map of `name -> AgentSocket`.
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use anyhow::{Context, Result};
use crate::agent_server::{self, AgentSocket};
use crate::broker::Broker;
const AGENT_RUNTIME_ROOT: &str = "/run/hyperhive/agents";
pub struct Coordinator {
pub broker: Arc<Broker>,
agents: Mutex<HashMap<String, AgentSocket>>,
}
impl Coordinator {
pub fn new() -> Self {
Self {
broker: Arc::new(Broker::new()),
agents: Mutex::new(HashMap::new()),
}
}
pub async fn register_agent(&self, name: &str) -> Result<PathBuf> {
let agent_dir = Self::agent_dir(name);
std::fs::create_dir_all(&agent_dir)
.with_context(|| format!("create agent dir {}", agent_dir.display()))?;
let socket_path = Self::socket_path(name);
let socket =
agent_server::start(name.to_owned(), socket_path, self.broker.clone()).await?;
self.agents
.lock()
.unwrap()
.insert(name.to_owned(), socket);
Ok(agent_dir)
}
pub fn unregister_agent(&self, name: &str) {
if let Some(socket) = self.agents.lock().unwrap().remove(name) {
socket.handle.abort();
let _ = std::fs::remove_file(&socket.path);
}
}
pub fn agent_dir(name: &str) -> PathBuf {
PathBuf::from(format!("{AGENT_RUNTIME_ROOT}/{name}"))
}
pub fn socket_path(name: &str) -> PathBuf {
Self::agent_dir(name).join("mcp.sock")
}
}

View file

@ -1,18 +1,24 @@
//! Thin async wrappers over `nixos-container`. //! Thin async wrappers over `nixos-container`.
use std::path::Path;
use anyhow::{Context, Result, bail}; use anyhow::{Context, Result, bail};
use tokio::process::Command; use tokio::process::Command;
pub const AGENT_PREFIX: &str = "hive-agent-"; pub const AGENT_PREFIX: &str = "hive-agent-";
pub const HIVE_PREFIX: &str = "hive-"; pub const HIVE_PREFIX: &str = "hive-";
/// Mount point of the per-agent runtime directory inside the container.
pub const CONTAINER_RUNTIME_MOUNT: &str = "/run/hive";
pub fn container_name(name: &str) -> String { pub fn container_name(name: &str) -> String {
format!("{AGENT_PREFIX}{name}") format!("{AGENT_PREFIX}{name}")
} }
pub async fn spawn(name: &str, agent_flake: &str) -> Result<()> { pub async fn spawn(name: &str, agent_flake: &str, agent_dir: &Path) -> Result<()> {
let container = container_name(name); let container = container_name(name);
run(&["create", &container, "--flake", agent_flake]).await?; let bind = format!("{}:{CONTAINER_RUNTIME_MOUNT}", agent_dir.display());
run(&["create", &container, "--flake", agent_flake, "--bind", &bind]).await?;
run(&["start", &container]).await run(&["start", &container]).await
} }

View file

@ -1,13 +1,19 @@
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{Result, bail}; use anyhow::{Result, bail};
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use hive_sh4re::{HostRequest, HostResponse}; use hive_sh4re::{HostRequest, HostResponse};
mod agent_server;
mod broker;
mod client; mod client;
mod coordinator;
mod lifecycle; mod lifecycle;
mod server; mod server;
use coordinator::Coordinator;
#[derive(Parser)] #[derive(Parser)]
#[command(name = "hive-c0re", about = "hyperhive coordinator daemon and CLI")] #[command(name = "hive-c0re", about = "hyperhive coordinator daemon and CLI")]
struct Cli { struct Cli {
@ -48,7 +54,10 @@ async fn main() -> Result<()> {
let cli = Cli::parse(); let cli = Cli::parse();
match cli.cmd { match cli.cmd {
Cmd::Serve { agent_flake } => server::serve(&cli.socket, &agent_flake).await, Cmd::Serve { agent_flake } => {
let coord = Arc::new(Coordinator::new());
server::serve(&cli.socket, &agent_flake, coord).await
}
Cmd::Spawn { name } => { Cmd::Spawn { name } => {
render(client::request(&cli.socket, HostRequest::Spawn { name }).await?) render(client::request(&cli.socket, HostRequest::Spawn { name }).await?)
} }

View file

@ -1,13 +1,15 @@
use std::path::Path; use std::path::Path;
use std::sync::Arc;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use hive_sh4re::{HostRequest, HostResponse}; use hive_sh4re::{HostRequest, HostResponse};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{UnixListener, UnixStream}; use tokio::net::{UnixListener, UnixStream};
use crate::coordinator::Coordinator;
use crate::lifecycle; use crate::lifecycle;
pub async fn serve(socket: &Path, agent_flake: &str) -> Result<()> { pub async fn serve(socket: &Path, agent_flake: &str, coord: Arc<Coordinator>) -> Result<()> {
if let Some(parent) = socket.parent() { if let Some(parent) = socket.parent() {
std::fs::create_dir_all(parent) std::fs::create_dir_all(parent)
.with_context(|| format!("create socket parent {}", parent.display()))?; .with_context(|| format!("create socket parent {}", parent.display()))?;
@ -23,15 +25,16 @@ pub async fn serve(socket: &Path, agent_flake: &str) -> Result<()> {
loop { loop {
let (stream, _) = listener.accept().await.context("accept connection")?; let (stream, _) = listener.accept().await.context("accept connection")?;
let agent_flake = agent_flake.to_owned(); let agent_flake = agent_flake.to_owned();
let coord = coord.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = handle(stream, &agent_flake).await { if let Err(e) = handle(stream, &agent_flake, coord).await {
tracing::warn!(error = ?e, "connection failed"); tracing::warn!(error = ?e, "connection failed");
} }
}); });
} }
} }
async fn handle(stream: UnixStream, agent_flake: &str) -> Result<()> { async fn handle(stream: UnixStream, agent_flake: &str, coord: Arc<Coordinator>) -> Result<()> {
let (read, mut write) = stream.into_split(); let (read, mut write) = stream.into_split();
let mut reader = BufReader::new(read); let mut reader = BufReader::new(read);
let mut line = String::new(); let mut line = String::new();
@ -43,7 +46,7 @@ async fn handle(stream: UnixStream, agent_flake: &str) -> Result<()> {
return Ok(()); return Ok(());
} }
let resp = match serde_json::from_str::<HostRequest>(line.trim()) { let resp = match serde_json::from_str::<HostRequest>(line.trim()) {
Ok(req) => dispatch(&req, agent_flake).await, Ok(req) => dispatch(&req, agent_flake, &coord).await,
Err(e) => HostResponse::error(format!("parse error: {e}")), Err(e) => HostResponse::error(format!("parse error: {e}")),
}; };
let mut payload = serde_json::to_string(&resp)?; let mut payload = serde_json::to_string(&resp)?;
@ -53,17 +56,23 @@ async fn handle(stream: UnixStream, agent_flake: &str) -> Result<()> {
} }
} }
async fn dispatch(req: &HostRequest, agent_flake: &str) -> HostResponse { async fn dispatch(req: &HostRequest, agent_flake: &str, coord: &Coordinator) -> HostResponse {
let result: anyhow::Result<HostResponse> = async { let result: anyhow::Result<HostResponse> = async {
Ok(match req { Ok(match req {
HostRequest::Spawn { name } => { HostRequest::Spawn { name } => {
tracing::info!(%name, "spawn"); tracing::info!(%name, "spawn");
lifecycle::spawn(name, agent_flake).await?; let agent_dir = coord.register_agent(name).await?;
if let Err(e) = lifecycle::spawn(name, agent_flake, &agent_dir).await {
// Roll back socket registration if container creation failed.
coord.unregister_agent(name);
return Err(e);
}
HostResponse::success() HostResponse::success()
} }
HostRequest::Kill { name } => { HostRequest::Kill { name } => {
tracing::info!(%name, "kill"); tracing::info!(%name, "kill");
lifecycle::kill(name).await?; lifecycle::kill(name).await?;
coord.unregister_agent(name);
HostResponse::success() HostResponse::success()
} }
HostRequest::Rebuild { name } => { HostRequest::Rebuild { name } => {