hyperhive/hive-c0re/src/coordinator.rs

57 lines
1.9 KiB
Rust

//! Runtime state shared between the host admin socket and the per-agent
//! sockets: the broker plus a map of `name -> AgentSocket`.
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use anyhow::{Context, Result};
use crate::agent_server::{self, AgentSocket};
use crate::broker::Broker;
const AGENT_RUNTIME_ROOT: &str = "/run/hyperhive/agents";
pub struct Coordinator {
pub broker: Arc<Broker>,
agents: Mutex<HashMap<String, AgentSocket>>,
}
impl Coordinator {
pub fn open(db_path: &Path) -> Result<Self> {
let broker = Broker::open(db_path).context("open broker")?;
Ok(Self {
broker: Arc::new(broker),
agents: Mutex::new(HashMap::new()),
})
}
pub async fn register_agent(&self, name: &str) -> Result<PathBuf> {
// Idempotent: drop any existing listener so re-registration (e.g. on rebuild,
// or after a hive-c0re restart cleared /run/hyperhive) gets a fresh socket.
self.unregister_agent(name);
let agent_dir = Self::agent_dir(name);
std::fs::create_dir_all(&agent_dir)
.with_context(|| format!("create agent dir {}", agent_dir.display()))?;
let socket_path = Self::socket_path(name);
let socket =
agent_server::start(name.to_owned(), socket_path, self.broker.clone()).await?;
self.agents.lock().unwrap().insert(name.to_owned(), socket);
Ok(agent_dir)
}
pub fn unregister_agent(&self, name: &str) {
if let Some(socket) = self.agents.lock().unwrap().remove(name) {
socket.handle.abort();
let _ = std::fs::remove_file(&socket.path);
}
}
pub fn agent_dir(name: &str) -> PathBuf {
PathBuf::from(format!("{AGENT_RUNTIME_ROOT}/{name}"))
}
pub fn socket_path(name: &str) -> PathBuf {
Self::agent_dir(name).join("mcp.sock")
}
}