//! Manager socket listener. Privileged tool surface: agent-style send/recv //! plus lifecycle verbs (Phase 4). Phase 5 will gate Spawn/Kill behind the //! commit-approval flow; for now they hit the same code path the host admin //! socket uses. use std::sync::Arc; use anyhow::{Context, Result}; use hive_sh4re::{MANAGER_AGENT, ManagerRequest, ManagerResponse, Message}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{UnixListener, UnixStream}; use crate::coordinator::Coordinator; use crate::lifecycle; pub fn start(coord: Arc) -> Result<()> { let dir = Coordinator::manager_dir(); std::fs::create_dir_all(&dir) .with_context(|| format!("create manager dir {}", dir.display()))?; let socket = Coordinator::manager_socket_path(); if socket.exists() { std::fs::remove_file(&socket).context("remove stale manager socket")?; } let listener = UnixListener::bind(&socket) .with_context(|| format!("bind manager socket {}", socket.display()))?; tracing::info!(socket = %socket.display(), "manager socket listening"); tokio::spawn(async move { loop { match listener.accept().await { Ok((stream, _)) => { let coord = coord.clone(); tokio::spawn(async move { if let Err(e) = serve(stream, coord).await { tracing::warn!(error = ?e, "manager connection failed"); } }); } Err(e) => { tracing::warn!(error = ?e, "manager listener accept failed"); return; } } } }); Ok(()) } async fn serve(stream: UnixStream, coord: Arc) -> Result<()> { let (read, mut write) = stream.into_split(); let mut reader = BufReader::new(read); let mut line = String::new(); loop { line.clear(); let n = reader.read_line(&mut line).await?; if n == 0 { return Ok(()); } let resp = match serde_json::from_str::(line.trim()) { Ok(req) => dispatch(&req, &coord).await, Err(e) => ManagerResponse::Err { message: format!("parse error: {e}"), }, }; let mut payload = serde_json::to_string(&resp)?; payload.push('\n'); write.write_all(payload.as_bytes()).await?; write.flush().await?; } } const MANAGER_RECV_LONG_POLL: std::time::Duration = std::time::Duration::from_secs(30); #[allow(clippy::too_many_lines)] async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse { match req { ManagerRequest::Send { to, body } => match coord.broker.send(&Message { from: MANAGER_AGENT.to_owned(), to: to.clone(), body: body.clone(), }) { Ok(()) => ManagerResponse::Ok, Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, }, ManagerRequest::OperatorMsg { body } => match coord.broker.send(&Message { from: hive_sh4re::OPERATOR_RECIPIENT.to_owned(), to: MANAGER_AGENT.to_owned(), body: body.clone(), }) { Ok(()) => ManagerResponse::Ok, Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, }, ManagerRequest::Status => match coord.broker.count_pending(MANAGER_AGENT) { Ok(unread) => ManagerResponse::Status { unread }, Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, }, ManagerRequest::Recv => match coord .broker .recv_blocking(MANAGER_AGENT, MANAGER_RECV_LONG_POLL) .await { Ok(Some(msg)) => ManagerResponse::Message { from: msg.from, body: msg.body, }, Ok(None) => ManagerResponse::Empty, Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, }, ManagerRequest::RequestSpawn { name } => { tracing::info!(%name, "manager: request_spawn"); match coord .approvals .submit_kind(name, hive_sh4re::ApprovalKind::Spawn, "") { Ok(id) => { tracing::info!(%id, %name, "spawn approval queued"); ManagerResponse::Ok } Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, } } ManagerRequest::Kill { name } => { tracing::info!(%name, "manager: kill"); if name == crate::lifecycle::MANAGER_NAME { return ManagerResponse::Err { message: "refusing to kill the manager".into(), }; } let result: Result<()> = async { lifecycle::kill(name).await?; coord.unregister_agent(name); Ok(()) } .await; match result { Ok(()) => { coord.notify_manager(&hive_sh4re::HelperEvent::Killed { agent: name.clone(), }); ManagerResponse::Ok } Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, } } ManagerRequest::Start { name } => { tracing::info!(%name, "manager: start"); if name == crate::lifecycle::MANAGER_NAME { return ManagerResponse::Err { message: "refusing to start the manager from itself".into(), }; } match lifecycle::start(name).await { Ok(()) => ManagerResponse::Ok, Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, } } ManagerRequest::Restart { name } => { tracing::info!(%name, "manager: restart"); if name == crate::lifecycle::MANAGER_NAME { return ManagerResponse::Err { message: "refusing to restart the manager from itself".into(), }; } match lifecycle::restart(name).await { Ok(()) => ManagerResponse::Ok, Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, } } ManagerRequest::AskOperator { question, options } => { tracing::info!(%question, ?options, "manager: ask_operator"); match coord.questions.submit(MANAGER_AGENT, question, options) { Ok(id) => { tracing::info!(%id, "operator question queued"); ManagerResponse::QuestionQueued { id } } Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, } } ManagerRequest::RequestApplyCommit { agent, commit_ref } => { tracing::info!(%agent, %commit_ref, "manager: request_apply_commit"); match coord.approvals.submit(agent, commit_ref) { Ok(id) => { tracing::info!(%id, %agent, %commit_ref, "approval queued"); ManagerResponse::Ok } Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, } } } }