diff --git a/flake.nix b/flake.nix index 4d84593..660dcf2 100644 --- a/flake.nix +++ b/flake.nix @@ -105,6 +105,7 @@ rust-analyzer rustc rustfmt + sqlite ]; }; } diff --git a/hive-c0re/src/coordinator.rs b/hive-c0re/src/coordinator.rs index b2ba2f9..c79e04d 100644 --- a/hive-c0re/src/coordinator.rs +++ b/hive-c0re/src/coordinator.rs @@ -1,5 +1,6 @@ -//! Runtime state shared between the host admin socket and the per-agent -//! sockets: the broker plus a map of `name -> AgentSocket`. +//! Runtime state + config shared between the host admin socket, the manager +//! socket, and the per-agent sockets: the broker, configured `agent_flake`, +//! and the map of registered agent sockets. use std::collections::HashMap; use std::path::{Path, PathBuf}; @@ -11,17 +12,20 @@ use crate::agent_server::{self, AgentSocket}; use crate::broker::Broker; const AGENT_RUNTIME_ROOT: &str = "/run/hyperhive/agents"; +const MANAGER_RUNTIME_ROOT: &str = "/run/hyperhive/manager"; pub struct Coordinator { pub broker: Arc, + pub agent_flake: String, agents: Mutex>, } impl Coordinator { - pub fn open(db_path: &Path) -> Result { + pub fn open(db_path: &Path, agent_flake: String) -> Result { let broker = Broker::open(db_path).context("open broker")?; Ok(Self { broker: Arc::new(broker), + agent_flake, agents: Mutex::new(HashMap::new()), }) } @@ -34,7 +38,8 @@ impl Coordinator { std::fs::create_dir_all(&agent_dir) .with_context(|| format!("create agent dir {}", agent_dir.display()))?; let socket_path = Self::socket_path(name); - let socket = agent_server::start(name.to_owned(), socket_path, self.broker.clone()).await?; + let socket = + agent_server::start(name.to_owned(), socket_path, self.broker.clone()).await?; self.agents.lock().unwrap().insert(name.to_owned(), socket); Ok(agent_dir) } @@ -53,4 +58,12 @@ impl Coordinator { pub fn socket_path(name: &str) -> PathBuf { Self::agent_dir(name).join("mcp.sock") } + + pub fn manager_dir() -> PathBuf { + PathBuf::from(MANAGER_RUNTIME_ROOT) + } + + pub fn manager_socket_path() -> PathBuf { + Self::manager_dir().join("mcp.sock") + } } diff --git a/hive-c0re/src/main.rs b/hive-c0re/src/main.rs index 6346cf9..c3f57cc 100644 --- a/hive-c0re/src/main.rs +++ b/hive-c0re/src/main.rs @@ -10,6 +10,7 @@ mod broker; mod client; mod coordinator; mod lifecycle; +mod manager_server; mod server; use coordinator::Coordinator; @@ -58,8 +59,9 @@ async fn main() -> Result<()> { let cli = Cli::parse(); match cli.cmd { Cmd::Serve { agent_flake, db } => { - let coord = Arc::new(Coordinator::open(&db)?); - server::serve(&cli.socket, &agent_flake, coord).await + let coord = Arc::new(Coordinator::open(&db, agent_flake)?); + manager_server::start(coord.clone()).await?; + server::serve(&cli.socket, coord).await } Cmd::Spawn { name } => { render(client::request(&cli.socket, HostRequest::Spawn { name }).await?) diff --git a/hive-c0re/src/manager_server.rs b/hive-c0re/src/manager_server.rs new file mode 100644 index 0000000..f784c6d --- /dev/null +++ b/hive-c0re/src/manager_server.rs @@ -0,0 +1,128 @@ +//! Manager socket listener. Privileged tool surface: agent-style send/recv +//! plus lifecycle verbs (Phase 4). Phase 5 will gate Spawn/Kill behind the +//! commit-approval flow; for now they hit the same code path the host admin +//! socket uses. + +use std::sync::Arc; + +use anyhow::{Context, Result}; +use hive_sh4re::{MANAGER_AGENT, ManagerRequest, ManagerResponse, Message}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::{UnixListener, UnixStream}; + +use crate::coordinator::Coordinator; +use crate::lifecycle; + +pub async fn start(coord: Arc) -> Result<()> { + let dir = Coordinator::manager_dir(); + std::fs::create_dir_all(&dir) + .with_context(|| format!("create manager dir {}", dir.display()))?; + let socket = Coordinator::manager_socket_path(); + if socket.exists() { + std::fs::remove_file(&socket).context("remove stale manager socket")?; + } + let listener = UnixListener::bind(&socket) + .with_context(|| format!("bind manager socket {}", socket.display()))?; + tracing::info!(socket = %socket.display(), "manager socket listening"); + + tokio::spawn(async move { + loop { + match listener.accept().await { + Ok((stream, _)) => { + let coord = coord.clone(); + tokio::spawn(async move { + if let Err(e) = serve(stream, coord).await { + tracing::warn!(error = ?e, "manager connection failed"); + } + }); + } + Err(e) => { + tracing::warn!(error = ?e, "manager listener accept failed"); + return; + } + } + } + }); + Ok(()) +} + +async fn serve(stream: UnixStream, coord: Arc) -> Result<()> { + let (read, mut write) = stream.into_split(); + let mut reader = BufReader::new(read); + let mut line = String::new(); + loop { + line.clear(); + let n = reader.read_line(&mut line).await?; + if n == 0 { + return Ok(()); + } + let resp = match serde_json::from_str::(line.trim()) { + Ok(req) => dispatch(&req, &coord).await, + Err(e) => ManagerResponse::Err { + message: format!("parse error: {e}"), + }, + }; + let mut payload = serde_json::to_string(&resp)?; + payload.push('\n'); + write.write_all(payload.as_bytes()).await?; + write.flush().await?; + } +} + +async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse { + match req { + ManagerRequest::Send { to, body } => match coord.broker.send(Message { + from: MANAGER_AGENT.to_owned(), + to: to.clone(), + body: body.clone(), + }) { + Ok(()) => ManagerResponse::Ok, + Err(e) => ManagerResponse::Err { + message: format!("{e:#}"), + }, + }, + ManagerRequest::Recv => match coord.broker.recv(MANAGER_AGENT) { + Ok(Some(msg)) => ManagerResponse::Message { + from: msg.from, + body: msg.body, + }, + Ok(None) => ManagerResponse::Empty, + Err(e) => ManagerResponse::Err { + message: format!("{e:#}"), + }, + }, + ManagerRequest::Spawn { name } => { + tracing::info!(%name, "manager: spawn"); + let result: Result<()> = async { + let agent_dir = coord.register_agent(name).await?; + if let Err(e) = lifecycle::spawn(name, &coord.agent_flake, &agent_dir).await { + coord.unregister_agent(name); + return Err(e); + } + Ok(()) + } + .await; + match result { + Ok(()) => ManagerResponse::Ok, + Err(e) => ManagerResponse::Err { + message: format!("{e:#}"), + }, + } + } + ManagerRequest::Kill { name } => { + tracing::info!(%name, "manager: kill"); + let result: Result<()> = async { + lifecycle::kill(name).await?; + coord.unregister_agent(name); + Ok(()) + } + .await; + match result { + Ok(()) => ManagerResponse::Ok, + Err(e) => ManagerResponse::Err { + message: format!("{e:#}"), + }, + } + } + } +} diff --git a/hive-c0re/src/server.rs b/hive-c0re/src/server.rs index 2e34a9b..06a3635 100644 --- a/hive-c0re/src/server.rs +++ b/hive-c0re/src/server.rs @@ -9,7 +9,7 @@ use tokio::net::{UnixListener, UnixStream}; use crate::coordinator::Coordinator; use crate::lifecycle; -pub async fn serve(socket: &Path, agent_flake: &str, coord: Arc) -> Result<()> { +pub async fn serve(socket: &Path, coord: Arc) -> Result<()> { if let Some(parent) = socket.parent() { std::fs::create_dir_all(parent) .with_context(|| format!("create socket parent {}", parent.display()))?; @@ -20,21 +20,20 @@ pub async fn serve(socket: &Path, agent_flake: &str, coord: Arc) -> let listener = UnixListener::bind(socket) .with_context(|| format!("bind admin socket {}", socket.display()))?; - tracing::info!(socket = %socket.display(), %agent_flake, "hive-c0re listening"); + tracing::info!(socket = %socket.display(), agent_flake = %coord.agent_flake, "hive-c0re admin listening"); loop { let (stream, _) = listener.accept().await.context("accept connection")?; - let agent_flake = agent_flake.to_owned(); let coord = coord.clone(); tokio::spawn(async move { - if let Err(e) = handle(stream, &agent_flake, coord).await { + if let Err(e) = handle(stream, coord).await { tracing::warn!(error = ?e, "connection failed"); } }); } } -async fn handle(stream: UnixStream, agent_flake: &str, coord: Arc) -> Result<()> { +async fn handle(stream: UnixStream, coord: Arc) -> Result<()> { let (read, mut write) = stream.into_split(); let mut reader = BufReader::new(read); let mut line = String::new(); @@ -46,7 +45,7 @@ async fn handle(stream: UnixStream, agent_flake: &str, coord: Arc) return Ok(()); } let resp = match serde_json::from_str::(line.trim()) { - Ok(req) => dispatch(&req, agent_flake, &coord).await, + Ok(req) => dispatch(&req, &coord).await, Err(e) => HostResponse::error(format!("parse error: {e}")), }; let mut payload = serde_json::to_string(&resp)?; @@ -56,13 +55,13 @@ async fn handle(stream: UnixStream, agent_flake: &str, coord: Arc) } } -async fn dispatch(req: &HostRequest, agent_flake: &str, coord: &Coordinator) -> HostResponse { +async fn dispatch(req: &HostRequest, coord: &Coordinator) -> HostResponse { let result: anyhow::Result = async { Ok(match req { HostRequest::Spawn { name } => { tracing::info!(%name, "spawn"); let agent_dir = coord.register_agent(name).await?; - if let Err(e) = lifecycle::spawn(name, agent_flake, &agent_dir).await { + if let Err(e) = lifecycle::spawn(name, &coord.agent_flake, &agent_dir).await { // Roll back socket registration if container creation failed. coord.unregister_agent(name); return Err(e); @@ -78,7 +77,7 @@ async fn dispatch(req: &HostRequest, agent_flake: &str, coord: &Coordinator) -> HostRequest::Rebuild { name } => { tracing::info!(%name, "rebuild"); let agent_dir = coord.register_agent(name).await?; - lifecycle::rebuild(name, agent_flake, &agent_dir).await?; + lifecycle::rebuild(name, &coord.agent_flake, &agent_dir).await?; HostResponse::success() } HostRequest::List => HostResponse::list(lifecycle::list().await?), diff --git a/hive-sh4re/src/lib.rs b/hive-sh4re/src/lib.rs index 0668311..fed4176 100644 --- a/hive-sh4re/src/lib.rs +++ b/hive-sh4re/src/lib.rs @@ -94,3 +94,33 @@ pub enum AgentResponse { /// `Recv` found nothing pending. Empty, } + +// ----------------------------------------------------------------------------- +// Manager socket — /run/hyperhive/manager/mcp.sock on the host, bind-mounted +// into the manager container at /run/hive/mcp.sock. +// ----------------------------------------------------------------------------- + +/// Logical name the broker uses for the manager. +pub const MANAGER_AGENT: &str = "manager"; + +/// Requests on the manager socket. Manager has the agent surface (send/recv) +/// plus privileged lifecycle verbs. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "cmd", rename_all = "snake_case")] +pub enum ManagerRequest { + Send { to: String, body: String }, + Recv, + /// Spawn a sub-agent. Phase 5 will gate this on user approval. + Spawn { name: String }, + /// Stop a sub-agent (graceful). + Kill { name: String }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum ManagerResponse { + Ok, + Err { message: String }, + Message { from: String, body: String }, + Empty, +}