//! Manager socket listener. Privileged tool surface: agent-style send/recv //! plus lifecycle verbs (Phase 4). Phase 5 will gate Spawn/Kill behind the //! commit-approval flow; for now they hit the same code path the host admin //! socket uses. use std::sync::Arc; use anyhow::{Context, Result}; use hive_sh4re::{MANAGER_AGENT, ManagerRequest, ManagerResponse, Message}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{UnixListener, UnixStream}; use crate::coordinator::Coordinator; use crate::lifecycle; pub fn start(coord: Arc) -> Result<()> { let dir = Coordinator::manager_dir(); std::fs::create_dir_all(&dir) .with_context(|| format!("create manager dir {}", dir.display()))?; let socket = Coordinator::manager_socket_path(); if socket.exists() { std::fs::remove_file(&socket).context("remove stale manager socket")?; } let listener = UnixListener::bind(&socket) .with_context(|| format!("bind manager socket {}", socket.display()))?; tracing::info!(socket = %socket.display(), "manager socket listening"); tokio::spawn(async move { loop { match listener.accept().await { Ok((stream, _)) => { let coord = coord.clone(); tokio::spawn(async move { if let Err(e) = serve(stream, coord).await { tracing::warn!(error = ?e, "manager connection failed"); } }); } Err(e) => { tracing::warn!(error = ?e, "manager listener accept failed"); return; } } } }); Ok(()) } async fn serve(stream: UnixStream, coord: Arc) -> Result<()> { let (read, mut write) = stream.into_split(); let mut reader = BufReader::new(read); let mut line = String::new(); loop { line.clear(); let n = reader.read_line(&mut line).await?; if n == 0 { return Ok(()); } let resp = match serde_json::from_str::(line.trim()) { Ok(req) => dispatch(&req, &coord).await, Err(e) => ManagerResponse::Err { message: format!("parse error: {e}"), }, }; let mut payload = serde_json::to_string(&resp)?; payload.push('\n'); write.write_all(payload.as_bytes()).await?; write.flush().await?; } } /// Max long-poll window for manager `Recv`. Same semantics as the /// sub-agent socket: omitted `wait_seconds` (or `0`) = peek and /// return immediately, positive value = park up to that many /// seconds (clamped at MAX). const MANAGER_RECV_LONG_POLL_MAX: std::time::Duration = std::time::Duration::from_secs(180); /// Same shape + rationale as `agent_server::RECV_BATCH_MAX`. Kept /// numerically aligned across surfaces so a tool description that /// quotes the cap stays accurate either way. const MANAGER_RECV_BATCH_MAX: u32 = 32; fn manager_recv_timeout(wait_seconds: Option) -> std::time::Duration { match wait_seconds { Some(s) => std::time::Duration::from_secs(s).min(MANAGER_RECV_LONG_POLL_MAX), None => std::time::Duration::ZERO, } } #[allow(clippy::too_many_lines)] async fn dispatch(req: &ManagerRequest, coord: &Arc) -> ManagerResponse { match req { ManagerRequest::Send { to, body } => { if let Err(message) = crate::limits::check_size("send", body) { return ManagerResponse::Err { message }; } if to == "*" { let errors = coord.broadcast_send(MANAGER_AGENT, body); if errors.is_empty() { ManagerResponse::Ok } else { ManagerResponse::Err { message: format!("broadcast failed for agents: {}", errors.join(", ")), } } } else { match coord.broker.send(&Message { from: MANAGER_AGENT.to_owned(), to: to.clone(), body: body.clone(), }) { Ok(()) => ManagerResponse::Ok, Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, } } } ManagerRequest::OperatorMsg { body } => match coord.broker.send(&Message { from: hive_sh4re::OPERATOR_RECIPIENT.to_owned(), to: MANAGER_AGENT.to_owned(), body: body.clone(), }) { Ok(()) => ManagerResponse::Ok, Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, }, ManagerRequest::Status => match coord.broker.count_pending(MANAGER_AGENT) { Ok(unread) => ManagerResponse::Status { unread }, Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, }, ManagerRequest::Recent { limit } => match coord.broker.recent_for(MANAGER_AGENT, *limit) { Ok(rows) => ManagerResponse::Recent { rows }, Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, }, ManagerRequest::Recv { wait_seconds, max } => { let cap = max.unwrap_or(1).min(MANAGER_RECV_BATCH_MAX) as usize; match coord .broker .recv_blocking_batch(MANAGER_AGENT, manager_recv_timeout(*wait_seconds), cap) .await { Ok(deliveries) => ManagerResponse::Messages { messages: deliveries .into_iter() .map(|d| hive_sh4re::DeliveredMessage { from: d.message.from, body: d.message.body, id: d.id, redelivered: d.redelivered, }) .collect(), }, Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, } } ManagerRequest::RequestSpawn { name, description } => { tracing::info!(%name, "manager: request_spawn"); match coord.approvals.submit_kind( name, hive_sh4re::ApprovalKind::Spawn, "", description.as_deref(), ) { Ok(id) => { tracing::info!(%id, %name, "spawn approval queued"); coord.emit_approval_added(id, name, "spawn", None, None, description.clone()); ManagerResponse::Ok } Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, } } ManagerRequest::Kill { name } => { tracing::info!(%name, "manager: kill"); if name == crate::lifecycle::MANAGER_NAME { return ManagerResponse::Err { message: "refusing to kill the manager".into(), }; } let result: Result<()> = async { lifecycle::kill(name).await?; coord.unregister_agent(name); Ok(()) } .await; match result { Ok(()) => { coord.notify_manager(&hive_sh4re::HelperEvent::Killed { agent: name.clone(), }); ManagerResponse::Ok } Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, } } ManagerRequest::Start { name } => { tracing::info!(%name, "manager: start"); if name == crate::lifecycle::MANAGER_NAME { return ManagerResponse::Err { message: "refusing to start the manager from itself".into(), }; } match lifecycle::start(name).await { Ok(()) => { coord.kick_agent(name, "container started"); ManagerResponse::Ok } Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, } } ManagerRequest::Restart { name } => { tracing::info!(%name, "manager: restart"); if name == crate::lifecycle::MANAGER_NAME { return ManagerResponse::Err { message: "refusing to restart the manager from itself".into(), }; } match lifecycle::restart(name).await { Ok(()) => { coord.kick_agent(name, "container restarted"); ManagerResponse::Ok } Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, } } ManagerRequest::Update { name } => { tracing::info!(%name, "manager: update"); let Some(current_rev) = crate::auto_update::current_flake_rev(&coord.hyperhive_flake) else { return ManagerResponse::Err { message: "update: hyperhive_flake has no canonical path".into(), }; }; let guard = coord.transient_guard(name, crate::coordinator::TransientKind::Rebuilding); let result = crate::auto_update::rebuild_agent(coord, name, ¤t_rev).await; drop(guard); match result { Ok(()) => { coord.kick_agent(name, "container rebuilt"); ManagerResponse::Ok } Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, } } ManagerRequest::Ask { question, options, multi, ttl_seconds, to, } => crate::questions::handle_ask( coord, MANAGER_AGENT, question, options, *multi, *ttl_seconds, to.as_deref(), ) .map_or_else( |message| ManagerResponse::Err { message }, |id| ManagerResponse::QuestionQueued { id }, ), ManagerRequest::Answer { id, answer } => { crate::questions::handle_answer(coord, MANAGER_AGENT, *id, answer).map_or_else( |message| ManagerResponse::Err { message }, |()| ManagerResponse::Ok, ) } ManagerRequest::GetLogs { agent, lines } => { let n = lines.unwrap_or(50); // `journalctl -M` wants the *machine* name, not the // logical agent name: `gui` → `h-gui`. `container_name` // does that and passes `hm1nd` through unprefixed — but // it doesn't know the broker-logical manager name // `"manager"` (it'd wrongly produce `h-manager`), so // handle that alias explicitly. Either manager spelling // resolves to the unprefixed `hm1nd` machine. let machine = if agent == MANAGER_AGENT { crate::lifecycle::MANAGER_NAME.to_owned() } else { crate::lifecycle::container_name(agent) }; tracing::info!(%agent, %machine, %n, "manager: get_logs"); match tokio::process::Command::new("journalctl") .args([ "-M", &machine, "-n", &n.to_string(), "--no-pager", "--output=short", ]) .output() .await { Ok(out) => { let content = if out.status.success() || !out.stdout.is_empty() { String::from_utf8_lossy(&out.stdout).into_owned() } else { let stderr = String::from_utf8_lossy(&out.stderr); format!("journalctl exited {}: {stderr}", out.status) }; ManagerResponse::Logs { content } } Err(e) => ManagerResponse::Err { message: format!("journalctl spawn failed: {e:#}"), }, } } ManagerRequest::Remind { message, timing, file_path, } => match crate::agent_server::store_remind( coord, MANAGER_AGENT, message, timing, file_path.as_deref(), ) { Ok(()) => ManagerResponse::Ok, Err(message) => ManagerResponse::Err { message }, }, ManagerRequest::RequestApplyCommit { agent, commit_ref, description, } => { tracing::info!(%agent, %commit_ref, "manager: request_apply_commit"); match submit_apply_commit(coord, agent, commit_ref, description.as_deref()).await { Ok((id, sha)) => { tracing::info!(%id, %agent, manager_ref = %commit_ref, %sha, "approval queued + proposal tag planted"); ManagerResponse::Ok } Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, } } ManagerRequest::GetLooseEnds => match crate::loose_ends::hive_wide(coord) { Ok(loose_ends) => ManagerResponse::LooseEnds { loose_ends }, Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, }, ManagerRequest::CountPendingReminders => { match coord.broker.count_pending_reminders_for(MANAGER_AGENT) { Ok(count) => ManagerResponse::PendingRemindersCount { count }, Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, } } ManagerRequest::Whoami => ManagerResponse::Whoami { name: MANAGER_AGENT.to_owned(), role: "manager".to_owned(), hyperhive_rev: crate::auto_update::current_flake_rev(&coord.hyperhive_flake), }, ManagerRequest::CancelLooseEnd { kind, id } => crate::questions::handle_cancel_loose_end( coord, MANAGER_AGENT, *kind, *id, ) .map_or_else( |message| ManagerResponse::Err { message }, |()| ManagerResponse::Ok, ), ManagerRequest::AckTurn => match coord.broker.ack_turn(MANAGER_AGENT) { Ok(_n) => ManagerResponse::Ok, Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, }, ManagerRequest::RequeueInflight => match coord.broker.requeue_inflight(MANAGER_AGENT) { Ok(n) => { if n > 0 { tracing::info!(agent = %MANAGER_AGENT, requeued = %n, "requeued in-flight messages"); } ManagerResponse::Ok } Err(e) => ManagerResponse::Err { message: format!("{e:#}"), }, }, } } /// `request_apply_commit` takes a commit SHA only — not a branch or /// tag name. A branch is mutable; pinning the proposal to a concrete /// sha keeps "what the manager asked to deploy" unambiguous and means /// the `proposal/` tag is a faithful record of the request. /// Accepts a 7..=40 char hex string (short or full sha); the exact /// commit is resolved + existence-checked against the proposed repo /// later in `lifecycle::git_fetch_to_tag`. fn validate_commit_ref(commit_ref: &str) -> Result<()> { let n = commit_ref.len(); let hex = commit_ref.chars().all(|c| c.is_ascii_hexdigit()); if !(7..=40).contains(&n) || !hex { anyhow::bail!( "commit_ref '{commit_ref}' is not a commit sha — request_apply_commit \ takes a 7-40 char hex sha, not a branch or tag name" ); } Ok(()) } /// Submit-time half of the apply flow: queue the approval row, then /// fetch the manager's commit from the proposed repo into applied and /// pin it as `refs/tags/proposal/`. From this point on the manager /// repo is irrelevant for this approval — even if the manager amends /// or force-pushes, the canonical sha hive-c0re will eventually /// approve/deny lives in applied's object DB. /// /// If anything fails after the row is inserted (sha missing in /// proposed, fs error, git plumbing crash) we mark the row failed and /// surface the error to the manager. We don't try to roll the row /// back — the failure is part of the audit trail. async fn submit_apply_commit( coord: &Arc, agent: &str, commit_ref: &str, description: Option<&str>, ) -> anyhow::Result<(i64, String)> { validate_commit_ref(commit_ref)?; let proposed_dir = crate::coordinator::Coordinator::agent_proposed_dir(agent); let applied_dir = crate::coordinator::Coordinator::agent_applied_dir(agent); if !proposed_dir.exists() { anyhow::bail!( "proposed repo missing for agent '{agent}' (expected at {})", proposed_dir.display() ); } if !applied_dir.join(".git").exists() { anyhow::bail!( "applied repo at {} is uninitialised — spawn the agent first", applied_dir.display() ); } let id = coord .approvals .submit_kind( agent, hive_sh4re::ApprovalKind::ApplyCommit, commit_ref, description, ) .map_err(|e| anyhow::anyhow!("queue approval row: {e:#}"))?; let tag = format!("proposal/{id}"); let sha = match crate::lifecycle::git_fetch_to_tag(&applied_dir, &proposed_dir, commit_ref, &tag) .await { Ok(s) => s, Err(e) => { // Surface the failure on the approval row so the // dashboard reflects it instead of leaving a phantom // pending entry. The note doubles as the operator-visible // explanation of why the approval can't be approved. let note = format!("{e:#}"); let _ = coord.approvals.mark_failed(id, ¬e); coord.emit_approval_resolved( id, agent, "apply_commit", None, "failed", Some(note), description.map(str::to_owned), ); return Err(anyhow::anyhow!("git_fetch_to_tag: {e:#}")); } }; coord .approvals .set_fetched_sha(id, &sha) .map_err(|e| anyhow::anyhow!("persist fetched_sha: {e:#}"))?; // Mirror the freshly-planted proposal/ tag to the forge. if let Err(e) = crate::forge::push_config(agent).await { tracing::warn!(%agent, %id, error = ?e, "forge: push_config after submit failed"); } // Phase 5b: surface the new pending approval on the dashboard // event channel. Compute the diff once here so live subscribers // get a fully-formed row without a snapshot refetch. let sha_short = sha[..sha.len().min(12)].to_owned(); let diff = crate::dashboard::approval_diff(agent, id).await; coord.emit_approval_added( id, agent, "apply_commit", Some(sha_short), Some(diff), description.map(str::to_owned), ); Ok((id, sha)) } /// On `Ask { ttl_seconds: Some(n) }`, sleep n seconds and then try to /// resolve the question with `[expired]`. If the operator (or any /// other path) already answered it, `answer()` returns Err and we /// no-op silently. Otherwise fire a `QuestionAnswered` helper event /// with `answerer = "ttl-watchdog"` so the asker can distinguish a /// real answer from a deadline trip without parsing the answer text. const TTL_SENTINEL: &str = "[expired]"; /// Synthetic `answerer` label used when the ttl watchdog resolves a /// question instead of a real human / agent. Lives in a distinct /// namespace from agent names + the operator so the asker can pattern /// match `event.answerer == "ttl-watchdog"`. const TTL_ANSWERER: &str = "ttl-watchdog"; pub fn spawn_question_watchdog(coord: &Arc, id: i64, ttl_secs: u64) { let coord = coord.clone(); tokio::spawn(async move { tokio::time::sleep(std::time::Duration::from_secs(ttl_secs)).await; // Watchdog has its own answerer label so the authorisation // check in `answer()` permits it for any target. We bypass // the public `answer()` path by calling it with the operator // identity, since the operator is always permitted; the // event we fire carries the real watchdog label for observers. if let Ok((question, asker, target)) = coord .questions .answer(id, TTL_SENTINEL, hive_sh4re::OPERATOR_RECIPIENT) { tracing::info!(%id, %asker, "question expired (ttl)"); coord.notify_agent( &asker, &hive_sh4re::HelperEvent::QuestionAnswered { id, question, answer: TTL_SENTINEL.to_owned(), answerer: TTL_ANSWERER.to_owned(), }, ); coord.emit_question_resolved(id, TTL_SENTINEL, TTL_ANSWERER, false, target.as_deref()); } }); } #[cfg(test)] mod tests { use super::validate_commit_ref; #[test] fn accepts_short_and_full_sha() { assert!(validate_commit_ref("e194f78").is_ok()); assert!(validate_commit_ref("e194f7812ab").is_ok()); assert!(validate_commit_ref(&"a".repeat(40)).is_ok()); // Uppercase hex resolves fine through `git rev-parse`. assert!(validate_commit_ref("E194F78").is_ok()); } #[test] fn rejects_branch_and_tag_names() { // The exact bug class this guard exists for. assert!(validate_commit_ref("main").is_err()); assert!(validate_commit_ref("HEAD").is_err()); assert!(validate_commit_ref("deployed/0").is_err()); assert!(validate_commit_ref("feature-branch").is_err()); } #[test] fn rejects_too_short_too_long_and_empty() { assert!(validate_commit_ref("").is_err()); assert!(validate_commit_ref("abc123").is_err()); // 6 chars assert!(validate_commit_ref(&"a".repeat(41)).is_err()); } }