use std::path::Path; use std::sync::Arc; use anyhow::{Context, Result}; use hive_sh4re::{HostRequest, HostResponse}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{UnixListener, UnixStream}; use crate::actions; use crate::coordinator::Coordinator; use crate::lifecycle; pub async fn serve(socket: &Path, coord: Arc) -> Result<()> { if let Some(parent) = socket.parent() { std::fs::create_dir_all(parent) .with_context(|| format!("create socket parent {}", parent.display()))?; } if socket.exists() { std::fs::remove_file(socket).context("remove stale socket")?; } let listener = UnixListener::bind(socket) .with_context(|| format!("bind admin socket {}", socket.display()))?; tracing::info!(socket = %socket.display(), hyperhive_flake = %coord.hyperhive_flake, "hive-c0re admin listening"); loop { let (stream, _) = listener.accept().await.context("accept connection")?; let coord = coord.clone(); tokio::spawn(async move { if let Err(e) = handle(stream, coord).await { tracing::warn!(error = ?e, "connection failed"); } }); } } async fn handle(stream: UnixStream, coord: Arc) -> Result<()> { let (read, mut write) = stream.into_split(); let mut reader = BufReader::new(read); let mut line = String::new(); loop { line.clear(); let n = reader.read_line(&mut line).await?; if n == 0 { return Ok(()); } let resp = match serde_json::from_str::(line.trim()) { Ok(req) => dispatch(&req, coord.clone()).await, Err(e) => HostResponse::error(format!("parse error: {e}")), }; let mut payload = serde_json::to_string(&resp)?; payload.push('\n'); write.write_all(payload.as_bytes()).await?; write.flush().await?; } } async fn dispatch(req: &HostRequest, coord: Arc) -> HostResponse { let result: anyhow::Result = async { Ok(match req { HostRequest::Spawn { name } => { tracing::info!(%name, "spawn"); let agent_dir = coord.ensure_runtime(name)?; let proposed_dir = Coordinator::agent_proposed_dir(name); let applied_dir = Coordinator::agent_applied_dir(name); let claude_dir = Coordinator::agent_claude_dir(name); let notes_dir = Coordinator::agent_notes_dir(name); match lifecycle::spawn( name, &coord.hyperhive_flake, &agent_dir, &proposed_dir, &applied_dir, &claude_dir, ¬es_dir, coord.dashboard_port, ) .await { Ok(()) => { coord.notify_manager(&hive_sh4re::HelperEvent::Spawned { agent: name.clone(), ok: true, note: None, sha: None, }); } Err(e) => { // Roll back socket registration if container creation failed. coord.unregister_agent(name); coord.notify_manager(&hive_sh4re::HelperEvent::Spawned { agent: name.clone(), ok: false, note: Some(format!("{e:#}")), sha: None, }); return Err(e); } } HostResponse::success() } HostRequest::RequestSpawn { name } => { tracing::info!(%name, "request_spawn"); let id = coord .approvals .submit_kind(name, hive_sh4re::ApprovalKind::Spawn, "")?; tracing::info!(%id, %name, "spawn approval queued"); HostResponse::success() } HostRequest::Kill { name } => { tracing::info!(%name, "kill"); lifecycle::kill(name).await?; coord.unregister_agent(name); coord.notify_manager(&hive_sh4re::HelperEvent::Killed { agent: name.clone(), }); HostResponse::success() } HostRequest::Destroy { name, purge } => { actions::destroy(&coord, name, *purge).await?; HostResponse::success() } HostRequest::Rebuild { name } => { tracing::info!(%name, "rebuild"); let agent_dir = coord.ensure_runtime(name)?; let applied_dir = Coordinator::agent_applied_dir(name); let claude_dir = Coordinator::agent_claude_dir(name); let notes_dir = Coordinator::agent_notes_dir(name); lifecycle::rebuild( name, &coord.hyperhive_flake, &agent_dir, &applied_dir, &claude_dir, ¬es_dir, coord.dashboard_port, ) .await?; HostResponse::success() } HostRequest::List => HostResponse::list(lifecycle::list().await?), HostRequest::Pending => HostResponse::pending(coord.approvals.pending()?), HostRequest::Approve { id } => { actions::approve(coord.clone(), *id).await?; HostResponse::success() } HostRequest::Deny { id } => { actions::deny(&coord, *id, None).await?; HostResponse::success() } }) } .await; match result { Ok(r) => r, Err(e) => HostResponse::error(format!("{e:#}")), } }