hyperhive/hive-c0re/src/coordinator.rs
müde 313121a6e9 fix: transient state leak via RAII guard
bare set_transient/clear_transient pairs leak the in-memory transient
on task cancellation, panics, or any early return between the two
calls — dashboard then shows the agent stuck in 'rebuilding…'
forever (coder hit this today). add Coordinator::transient_guard
returning a TransientGuard whose Drop clears, and convert every
caller (dashboard lifecycle_action, auto_update::rebuild_agent,
manager_server Update, actions::destroy, actions Spawn task,
migrate phase 4). destroy() now takes &Arc<Coordinator> so it can
hold a guard. existing stuck transients clear on next hive-c0re
restart since transient state is in-memory only.
2026-05-16 19:47:52 +02:00

332 lines
13 KiB
Rust

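//! Runtime state + config shared between the host admin socket, the manager
//! socket, and the per-agent sockets: the broker, approvals and operator
//! questions, the configured `hyperhive_flake`, the map of registered agent
//! sockets, and the in-memory transient (in-flight lifecycle action) state.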
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use anyhow::{Context, Result};
use crate::agent_server::{self, AgentSocket};
use crate::approvals::Approvals;
use crate::broker::Broker;
use crate::operator_questions::OperatorQuestions;
/// Runtime root for per-agent dirs (`Coordinator::agent_dir`); each one holds
/// that agent's `mcp.sock`. Recreated on demand by `register_agent`, since
/// `/run/hyperhive` may be cleared across hive-c0re restarts.
const AGENT_RUNTIME_ROOT: &str = "/run/hyperhive/agents";
/// Runtime dir for the manager (`Coordinator::manager_dir`); holds the
/// manager's `mcp.sock`, whose listener `manager_server::start` owns.
const MANAGER_RUNTIME_ROOT: &str = "/run/hyperhive/manager";
/// Manager-editable per-agent config repos. Bind-mounted RW into the manager
/// container as `/agents/<name>/`. Hive-c0re only writes to these on first
/// spawn (initial commit); after that it's manager-only.
const AGENT_STATE_ROOT: &str = "/var/lib/hyperhive/agents";
/// Hive-c0re-only authoritative per-agent config repos. Containers build from
/// these. Manager has no filesystem access; the only way to update is via
/// `request_apply_commit` + user approval.
const APPLIED_STATE_ROOT: &str = "/var/lib/hyperhive/applied";
/// Shared hive-c0re runtime state: the message broker, the approvals and
/// operator-question stores, static config that gets inlined into per-agent
/// flakes, the registry of per-agent sockets, and the in-memory map of
/// in-flight lifecycle actions.
pub struct Coordinator {
    pub broker: Arc<Broker>,
    pub approvals: Arc<Approvals>,
    pub questions: Arc<OperatorQuestions>,
    /// URL of the hyperhive flake (no fragment). Inlined into per-agent
    /// `flake.nix` files as `inputs.hyperhive.url`.
    pub hyperhive_flake: String,
    /// TCP port the host's hive-c0re dashboard listens on. Inlined into
    /// each per-agent flake so the agent's web UI can build the right
    /// rebuild-button URL pointing back at the dashboard.
    pub dashboard_port: u16,
    /// Operator pronouns (free text) — `she/her` by default, set via
    /// the NixOS module option `services.hive-c0re.operatorPronouns`.
    /// Reaches each container as the `HIVE_OPERATOR_PRONOUNS` env var
    /// (injected into systemd.services.<harness>.environment by the
    /// meta flake); the harness substitutes it into the agent /
    /// manager system prompt at boot.
    pub operator_pronouns: String,
    /// Per-agent MCP socket listeners, keyed by agent name. Filled by
    /// `register_agent`, emptied by `unregister_agent`. The manager's
    /// socket is not tracked here (`ensure_runtime` skips registration
    /// for it).
    agents: Mutex<HashMap<String, AgentSocket>>,
    /// Agents with a lifecycle action in flight (spawn, start, stop,
    /// restart, rebuild, destroy — see `TransientKind`), at most one
    /// entry per agent. Read by the dashboard to render a spinner;
    /// cleared when the action resolves (success or failure), normally
    /// by dropping a `TransientGuard`. Held only in memory, so it starts
    /// empty on every hive-c0re start.
    transient: Mutex<HashMap<String, TransientState>>,
}
/// Per-agent in-progress state that the dashboard surfaces while a
/// lifecycle action (see `TransientKind`) is running — for a spawn, that
/// is the window between the approve click and the container being ready.
#[derive(Debug, Clone)]
pub struct TransientState {
    /// Which lifecycle action is in flight.
    pub kind: TransientKind,
    /// When the state was set (`Instant::now()` in `set_transient`).
    /// Monotonic and process-local — not a wall-clock timestamp.
    pub since: std::time::Instant,
}
/// RAII handle returned by `Coordinator::transient_guard`. The agent's
/// transient state is cleared when the guard is dropped — including
/// drop-via-cancellation and panic unwinding, the paths that bare
/// `set_transient` / `clear_transient` pairs leaked through. Holds an
/// `Arc<Coordinator>` so the guard is freely returnable / movable (e.g.
/// into a spawned task).
///
/// Keep it in a named binding (`let _guard = ...`) for the whole action.
/// Discarding it, or binding it to the `_` pattern (which drops the value
/// at the end of the statement), clears the state immediately and the
/// dashboard never shows the spinner. `#[must_use]` catches the former.
#[must_use = "the transient state is cleared as soon as this guard is dropped; \
              bind it to a named local for the duration of the action"]
pub struct TransientGuard {
    coord: Arc<Coordinator>,
    name: String,
}
impl Drop for TransientGuard {
fn drop(&mut self) {
self.coord.clear_transient(&self.name);
}
}
/// Which lifecycle action an agent's `TransientState` represents; the
/// dashboard shows it while the action is in flight.
#[derive(Debug, Clone, Copy)]
pub enum TransientKind {
    /// `lifecycle::spawn` is running (nixos-container create + update + start).
    Spawning,
    /// `lifecycle::start` is running.
    Starting,
    /// `lifecycle::kill` is running.
    Stopping,
    /// `lifecycle::restart` is running.
    Restarting,
    /// `lifecycle::rebuild` is running (nixos-container update).
    Rebuilding,
    /// `actions::destroy` is running.
    Destroying,
}
impl Coordinator {
    /// Open the persistent stores in `db_path` and assemble the shared
    /// runtime state. Starts with no registered agent sockets and no
    /// transient states.
    ///
    /// # Errors
    /// Fails if the broker, approvals, or operator-questions store can't
    /// be opened.
    pub fn open(
        db_path: &Path,
        hyperhive_flake: String,
        dashboard_port: u16,
        operator_pronouns: String,
    ) -> Result<Self> {
        let broker = Broker::open(db_path).context("open broker")?;
        let approvals = Approvals::open(db_path).context("open approvals")?;
        let questions = OperatorQuestions::open(db_path).context("open operator_questions")?;
        Ok(Self {
            broker: Arc::new(broker),
            approvals: Arc::new(approvals),
            questions: Arc::new(questions),
            hyperhive_flake,
            dashboard_port,
            operator_pronouns,
            agents: Mutex::new(HashMap::new()),
            transient: Mutex::new(HashMap::new()),
        })
    }

    /// Create (or recreate) the runtime dir and MCP socket listener for
    /// sub-agent `name` and record it in the registry. Returns the runtime
    /// dir (`agent_dir(name)`).
    ///
    /// # Errors
    /// Fails if the runtime dir can't be created or the listener can't be
    /// started.
    pub fn register_agent(self: &Arc<Self>, name: &str) -> Result<PathBuf> {
        // Idempotent: drop any existing listener so re-registration (e.g. on rebuild,
        // or after a hive-c0re restart cleared /run/hyperhive) gets a fresh socket.
        self.unregister_agent(name);
        let agent_dir = Self::agent_dir(name);
        std::fs::create_dir_all(&agent_dir)
            .with_context(|| format!("create agent dir {}", agent_dir.display()))?;
        let socket_path = Self::socket_path(name);
        // Hand the full Coordinator to the per-agent socket — it
        // needs broker + operator_questions to handle the agent-side
        // `ask_operator` tool, not just the broker.
        let socket = agent_server::start(name, &socket_path, self.clone())?;
        // A concurrent `register_agent` for the same name can insert its
        // listener between our `unregister_agent` above and this insert.
        // `insert` hands that entry back; merely dropping it may leave its
        // listener task running (a tokio `JoinHandle` detaches on drop —
        // TODO confirm `handle`'s type), so abort it like `unregister_agent`
        // does. Its socket file lives at the same path we just bound, so it
        // is deliberately not removed.
        let replaced = self.agents.lock().unwrap().insert(name.to_owned(), socket);
        if let Some(stale) = replaced {
            stale.handle.abort();
        }
        Ok(agent_dir)
    }

    /// Stop `name`'s socket listener and remove its socket file, if one is
    /// registered. No-op otherwise.
    pub fn unregister_agent(&self, name: &str) {
        // The registry guard created in the scrutinee lives until the end of
        // the `if let` body, so the abort and file removal happen under the
        // lock. Releasing it earlier would let a concurrent `register_agent`
        // bind a fresh socket that this `remove_file` then deletes.
        if let Some(socket) = self.agents.lock().unwrap().remove(name) {
            socket.handle.abort();
            let _ = std::fs::remove_file(&socket.path);
        }
    }

    /// Names of all currently-registered sub-agents, in arbitrary order.
    #[must_use]
    pub fn list_agents(&self) -> Vec<String> {
        self.agents.lock().unwrap().keys().cloned().collect()
    }

    /// Mark an agent as in-progress (only one state per agent for now).
    ///
    /// Prefer `transient_guard` when possible — it auto-clears on drop
    /// even if the surrounding future is cancelled (HTTP request
    /// aborted, runtime shutdown mid-rebuild, panic between set and
    /// clear). The bare `set_transient` / `clear_transient` pair leaks
    /// the transient on any of those paths and the dashboard then
    /// shows the agent stuck in "rebuilding…" forever.
    pub fn set_transient(&self, name: &str, kind: TransientKind) {
        self.transient.lock().unwrap().insert(
            name.to_owned(),
            TransientState {
                kind,
                since: std::time::Instant::now(),
            },
        );
    }

    /// Remove any transient state for `name`; no-op if none is set.
    pub fn clear_transient(&self, name: &str) {
        self.transient.lock().unwrap().remove(name);
    }

    /// Set a transient state and return a guard that clears it on drop.
    /// Use this from any path where the surrounding future could be
    /// cancelled or panic between set and clear (HTTP handlers, spawned
    /// tasks). The guard's `Drop` runs even on task cancellation, so
    /// the dashboard's spinner can't get pinned forever.
    ///
    /// Keep the guard in a named binding (`let _guard = ...`) for the
    /// whole action: `let _ = ...` drops it, and clears the state,
    /// immediately. Clearing is keyed by agent name only, so if two guards
    /// for the same agent overlap, the first one dropped removes the single
    /// entry even though the other action is still running.
    pub fn transient_guard(self: &Arc<Self>, name: &str, kind: TransientKind) -> TransientGuard {
        self.set_transient(name, kind);
        TransientGuard {
            coord: self.clone(),
            name: name.to_owned(),
        }
    }

    /// Point-in-time copy of every agent's transient state, so readers
    /// don't hold the lock while using it.
    #[must_use]
    pub fn transient_snapshot(&self) -> HashMap<String, TransientState> {
        self.transient.lock().unwrap().clone()
    }

    /// Drop a system message into the given agent's inbox. Wakes the
    /// turn loop with a "you were just (re)started" hint — operator
    /// caused the transition, agent picks up where it left off
    /// (notes are in /state/, last turn is in --continue's session).
    /// Best-effort; broker errors are logged but don't propagate.
    pub fn kick_agent(&self, name: &str, reason: &str) {
        let body = format!(
            "{reason}\n\nYou were just (re)started by the operator. \
             If you were mid-task, check `/state/` for your notes \
             and pick up where you left off. claude's `--continue` \
             session is intact, so prior context is still in your \
             window."
        );
        if let Err(e) = self.broker.send(&hive_sh4re::Message {
            from: hive_sh4re::SYSTEM_SENDER.to_owned(),
            to: name.to_owned(),
            body,
        }) {
            tracing::warn!(error = ?e, %name, "kick_agent: broker.send failed");
        }
    }

    /// Push a `HelperEvent` into the manager's inbox. Encoded as JSON in
    /// `Message::body`; sender = `SYSTEM_SENDER`. The manager harness
    /// recognises the sender and parses the body. Best-effort: a serde or
    /// broker error is logged but does not propagate.
    pub fn notify_manager(&self, event: &hive_sh4re::HelperEvent) {
        self.notify_agent(hive_sh4re::MANAGER_AGENT, event);
    }

    /// Push a `HelperEvent` into an arbitrary agent's inbox. Encoded
    /// the same way as `notify_manager` (sender = `SYSTEM_SENDER`,
    /// body = JSON-encoded event). Used to route `OperatorAnswered`
    /// events back to the agent that called `ask_operator`, not just
    /// the manager. Best-effort: errors are logged, not returned.
    pub fn notify_agent(&self, agent: &str, event: &hive_sh4re::HelperEvent) {
        let body = match serde_json::to_string(event) {
            Ok(s) => s,
            Err(e) => {
                tracing::warn!(error = ?e, "failed to encode helper event");
                return;
            }
        };
        if let Err(e) = self.broker.send(&hive_sh4re::Message {
            from: hive_sh4re::SYSTEM_SENDER.to_owned(),
            to: agent.to_owned(),
            body,
        }) {
            tracing::warn!(error = ?e, target = %agent, "failed to push helper event");
        }
    }

    /// Deliver `body` to every currently-registered agent, appending the
    /// standard broadcast hint. Returns a list of per-agent error strings
    /// for any that failed (empty = all ok). Recipients are not filtered
    /// by `from`, so a registered sender also receives its own broadcast —
    /// the hint text tells agents to ignore it if no action is needed.
    pub fn broadcast_send(&self, from: &str, body: &str) -> Vec<String> {
        const HINT: &str =
            "\n\n⚠️ _hint: this was a broadcast and may not need any action from you_";
        let broadcast_body = format!("{body}{HINT}");
        let mut errors = Vec::new();
        for agent_name in self.list_agents() {
            if let Err(e) = self.broker.send(&hive_sh4re::Message {
                from: from.to_owned(),
                to: agent_name.clone(),
                body: broadcast_body.clone(),
            }) {
                errors.push(format!("{agent_name}: {e}"));
            }
        }
        errors
    }

    /// Runtime dir for sub-agent `name` under `AGENT_RUNTIME_ROOT`.
    #[must_use]
    pub fn agent_dir(name: &str) -> PathBuf {
        PathBuf::from(format!("{AGENT_RUNTIME_ROOT}/{name}"))
    }

    /// MCP socket path for sub-agent `name` (inside `agent_dir`).
    #[must_use]
    pub fn socket_path(name: &str) -> PathBuf {
        Self::agent_dir(name).join("mcp.sock")
    }

    /// Runtime dir for the manager.
    #[must_use]
    pub fn manager_dir() -> PathBuf {
        PathBuf::from(MANAGER_RUNTIME_ROOT)
    }

    /// MCP socket path for the manager (inside `manager_dir`).
    #[must_use]
    pub fn manager_socket_path() -> PathBuf {
        Self::manager_dir().join("mcp.sock")
    }

    /// Ensure a runtime dir + (for sub-agents) per-agent socket exists. For
    /// the manager, `manager_server::start` owns the socket — just return
    /// the dir. For sub-agents this is `register_agent` (creates a fresh
    /// listener bound to `socket_path(name)`). Source directory of the
    /// `/run/hive/mcp.sock` bind that ends up in `set_nspawn_flags`.
    ///
    /// # Errors
    /// Fails if the dir can't be created or (sub-agents) the listener
    /// can't be started.
    pub fn ensure_runtime(self: &Arc<Self>, name: &str) -> Result<PathBuf> {
        if name == crate::lifecycle::MANAGER_NAME {
            let dir = Self::manager_dir();
            std::fs::create_dir_all(&dir)
                .with_context(|| format!("create manager dir {}", dir.display()))?;
            return Ok(dir);
        }
        self.register_agent(name)
    }

    /// Per-agent state root (parent of `config/`, future `prompts/`, etc.).
    #[must_use]
    pub fn agent_state_root(name: &str) -> PathBuf {
        PathBuf::from(format!("{AGENT_STATE_ROOT}/{name}"))
    }

    /// Manager-editable proposed config repo. Bind-mounted into the manager
    /// container as `/agents/<name>/config/`.
    #[must_use]
    pub fn agent_proposed_dir(name: &str) -> PathBuf {
        Self::agent_state_root(name).join("config")
    }

    /// Per-agent Claude credentials dir. Bind-mounted RW into the agent
    /// container at `/root/.claude` so OAuth state survives container
    /// destroy/recreate. Each agent owns its own token lineage — sharing
    /// would break on the first refresh-token rotation.
    #[must_use]
    pub fn agent_claude_dir(name: &str) -> PathBuf {
        Self::agent_state_root(name).join("claude")
    }

    /// Per-agent durable knowledge dir. Bind-mounted RW into the agent
    /// container at `/state`. Survives destroy/recreate alongside the
    /// claude dir. Agents are told (via the system prompt) to write
    /// long-lived notes / scratch state here.
    #[must_use]
    pub fn agent_notes_dir(name: &str) -> PathBuf {
        Self::agent_state_root(name).join("state")
    }

    /// Authoritative applied config repo. Hive-c0re-only.
    #[must_use]
    pub fn agent_applied_dir(name: &str) -> PathBuf {
        PathBuf::from(format!("{APPLIED_STATE_ROOT}/{name}"))
    }

    /// Enumerate names that have a persistent state dir under
    /// `/var/lib/hyperhive/agents/` (i.e. config / claude creds /
    /// notes survive). Includes both currently-existing containers and
    /// destroyed-but-kept tombstones; callers filter the latter by
    /// subtracting `lifecycle::list()`. Returns an empty list if the root
    /// can't be read; unreadable entries and non-UTF-8 names are skipped.
    /// The result is sorted.
    #[must_use]
    pub fn kept_state_names() -> Vec<String> {
        let Ok(rd) = std::fs::read_dir(AGENT_STATE_ROOT) else {
            return Vec::new();
        };
        let mut out: Vec<String> = rd
            .flatten()
            .filter(|e| e.file_type().is_ok_and(|t| t.is_dir()))
            .filter_map(|e| e.file_name().into_string().ok())
            .collect();
        out.sort();
        out
    }
}