Phase 5b: per-agent config flakes; approve validates + advances commit

This commit is contained in:
müde 2026-05-14 23:09:35 +02:00
parent 22b65d35f3
commit 433c0d212e
6 changed files with 182 additions and 25 deletions

View file

@ -14,22 +14,25 @@ use crate::broker::Broker;
const AGENT_RUNTIME_ROOT: &str = "/run/hyperhive/agents";
const MANAGER_RUNTIME_ROOT: &str = "/run/hyperhive/manager";
const AGENT_STATE_ROOT: &str = "/var/lib/hyperhive/agents";
pub struct Coordinator {
pub broker: Arc<Broker>,
pub approvals: Arc<Approvals>,
pub agent_flake: String,
/// URL of the hyperhive flake (no fragment). Inlined into per-agent
/// `flake.nix` files as `inputs.hyperhive.url`.
pub hyperhive_flake: String,
agents: Mutex<HashMap<String, AgentSocket>>,
}
impl Coordinator {
pub fn open(db_path: &Path, agent_flake: String) -> Result<Self> {
pub fn open(db_path: &Path, hyperhive_flake: String) -> Result<Self> {
let broker = Broker::open(db_path).context("open broker")?;
let approvals = Approvals::open(db_path).context("open approvals")?;
Ok(Self {
broker: Arc::new(broker),
approvals: Arc::new(approvals),
agent_flake,
hyperhive_flake,
agents: Mutex::new(HashMap::new()),
})
}
@ -69,4 +72,8 @@ impl Coordinator {
/// Path of the manager's MCP socket: the file `mcp.sock` inside the
/// directory returned by `Self::manager_dir()`.
pub fn manager_socket_path() -> PathBuf {
Self::manager_dir().join("mcp.sock")
}
/// Location of the config directory for agent `name`, i.e.
/// `<AGENT_STATE_ROOT>/<name>/config`.
///
/// The path is only computed here; nothing is created on disk.
pub fn agent_config_dir(name: &str) -> PathBuf {
    let joined = [AGENT_STATE_ROOT, name, "config"].join("/");
    PathBuf::from(joined)
}
}

View file

@ -1,4 +1,4 @@
//! Thin async wrappers over `nixos-container`.
//! `nixos-container` lifecycle + per-agent config flake generation.
use std::path::Path;
@ -16,6 +16,9 @@ pub const MANAGER_NAME: &str = "hm1nd";
/// Mount point of the per-agent runtime directory inside the container.
pub const CONTAINER_RUNTIME_MOUNT: &str = "/run/hive";
/// Author name passed as `user.name` for commits made in per-agent config repos.
const GIT_NAME: &str = "hive-c0re";
/// Author e-mail passed as `user.email` for commits made in per-agent config repos.
const GIT_EMAIL: &str = "hive-c0re@hyperhive";
/// Build the container name for agent `name` by prepending `AGENT_PREFIX`.
pub fn container_name(name: &str) -> String {
    let mut full = AGENT_PREFIX.to_string();
    full.push_str(name);
    full
}
@ -33,10 +36,17 @@ fn validate(name: &str) -> Result<()> {
Ok(())
}
pub async fn spawn(name: &str, agent_flake: &str, agent_dir: &Path) -> Result<()> {
pub async fn spawn(
name: &str,
hyperhive_flake: &str,
agent_dir: &Path,
config_dir: &Path,
) -> Result<()> {
validate(name)?;
setup_config(config_dir, name, hyperhive_flake).await?;
let container = container_name(name);
run(&["create", &container, "--flake", agent_flake]).await?;
let flake_ref = format!("{}#default", config_dir.display());
run(&["create", &container, "--flake", &flake_ref]).await?;
set_nspawn_flags(&container, agent_dir)?;
run(&["start", &container]).await
}
@ -47,11 +57,18 @@ pub async fn kill(name: &str) -> Result<()> {
run(&["stop", &container]).await
}
pub async fn rebuild(name: &str, agent_flake: &str, agent_dir: &Path) -> Result<()> {
pub async fn rebuild(
name: &str,
hyperhive_flake: &str,
agent_dir: &Path,
config_dir: &Path,
) -> Result<()> {
validate(name)?;
setup_config(config_dir, name, hyperhive_flake).await?;
let container = container_name(name);
let flake_ref = format!("{}#default", config_dir.display());
set_nspawn_flags(&container, agent_dir)?;
run(&["update", &container, "--flake", agent_flake]).await?;
run(&["update", &container, "--flake", &flake_ref]).await?;
// Restart so any nspawn-level changes (bind mounts, networking, etc.) apply.
run(&["stop", &container]).await?;
run(&["start", &container]).await
@ -78,6 +95,113 @@ pub async fn list() -> Result<Vec<String>> {
.collect())
}
/// Escape `raw` for interpolation into a double-quoted Nix string literal.
///
/// Backslash-escapes `\`, `"` and `$`, so the value can neither terminate the
/// literal nor open a `${...}` antiquotation. Values without those characters
/// are returned unchanged.
fn nix_escape(raw: &str) -> String {
    let mut escaped = String::with_capacity(raw.len());
    for ch in raw.chars() {
        if matches!(ch, '\\' | '"' | '$') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}

/// Ensure `config_dir` exists as a git repo containing a per-agent flake. The
/// `flake.nix` is rewritten every call (so a new hyperhive store path
/// propagates on rebuild); `agent.nix` is written only the first time
/// (manager-editable thereafter).
///
/// `name` and `hyperhive_flake` are escaped before being placed inside Nix
/// string literals, so this function is safe to call even when the caller has
/// not validated them.
///
/// After writing, everything in the directory is staged and, if the index
/// differs from `HEAD`, committed as `GIT_NAME <GIT_EMAIL>`.
///
/// # Errors
///
/// Fails if the directory or either file cannot be written, or if any git
/// invocation cannot be spawned or exits unsuccessfully.
pub async fn setup_config(config_dir: &Path, name: &str, hyperhive_flake: &str) -> Result<()> {
    std::fs::create_dir_all(config_dir)
        .with_context(|| format!("create {}", config_dir.display()))?;

    let flake_path = config_dir.join("flake.nix");
    let flake_body = format!(
        r#"{{
description = "hyperhive sub-agent {name}";
inputs.hyperhive.url = "{url}";
outputs =
{{ hyperhive, ... }}:
{{
nixosConfigurations.default = hyperhive.nixosConfigurations.agent-base.extendModules {{
modules = [ ./agent.nix ];
}};
}};
}}
"#,
        name = nix_escape(name),
        url = nix_escape(hyperhive_flake),
    );
    std::fs::write(&flake_path, flake_body)
        .with_context(|| format!("write {}", flake_path.display()))?;

    let agent_path = config_dir.join("agent.nix");
    if !agent_path.exists() {
        // The name lands inside a `#` line comment; a line break would end the
        // comment and let the remainder be parsed as Nix code.
        let comment_name = name.replace(['\r', '\n'], " ");
        let initial = format!(
            "{{ ... }}:\n{{\n # Per-agent overrides for {comment_name}. The manager edits this\n # file (and commits) to customise the agent's NixOS config.\n}}\n",
        );
        std::fs::write(&agent_path, initial)
            .with_context(|| format!("write {}", agent_path.display()))?;
    }

    if !config_dir.join(".git").exists() {
        git(config_dir, &["init", "--initial-branch=main"]).await?;
    }
    git(config_dir, &["add", "-A"]).await?;

    // `git diff --cached --quiet` exits 0 only when nothing is staged.
    let clean = git_status(config_dir, &["diff", "--cached", "--quiet"]).await?;
    if !clean {
        git(
            config_dir,
            &[
                "-c",
                &format!("user.name={GIT_NAME}"),
                "-c",
                &format!("user.email={GIT_EMAIL}"),
                "commit",
                "-m",
                "hive-c0re sync",
            ],
        )
        .await?;
    }
    Ok(())
}
/// Verify `commit_ref` names a commit in the config repo, advance `main` to
/// it, and reset the working tree. Caller is responsible for the subsequent
/// rebuild.
///
/// The ref is resolved exactly once, via `git rev-parse --verify` with a
/// `^{commit}` peel, and the resulting object id is used for both the
/// `update-ref` and the `reset`. Objects that are not commits (blobs, trees)
/// are therefore rejected up front, and the two later steps cannot disagree
/// about what the ref means.
///
/// # Errors
///
/// Fails if `commit_ref` is empty or begins with `-` (git would parse it as
/// an option), if it does not resolve to a commit in `config_dir`, or if any
/// git invocation cannot be spawned or exits unsuccessfully.
pub async fn apply_commit(config_dir: &Path, commit_ref: &str) -> Result<()> {
    // The ref originates from an approval record, so treat it as untrusted:
    // never let it be interpreted as a command-line option.
    if commit_ref.is_empty() || commit_ref.starts_with('-') {
        bail!("invalid commit ref {commit_ref:?}");
    }

    let spec = format!("{commit_ref}^{{commit}}");
    let out = Command::new("git")
        .current_dir(config_dir)
        .args(["rev-parse", "--verify", "--quiet", spec.as_str()])
        .output()
        .await
        .with_context(|| format!("git rev-parse in {}", config_dir.display()))?;
    if !out.status.success() {
        bail!(
            "commit {commit_ref} not found in {}",
            config_dir.display()
        );
    }
    let oid = String::from_utf8_lossy(&out.stdout).trim().to_owned();

    git(config_dir, &["update-ref", "refs/heads/main", oid.as_str()]).await?;
    git(config_dir, &["reset", "--hard", oid.as_str()]).await?;
    Ok(())
}
/// Run `git <args>` with `dir` as the working directory, capturing its output.
///
/// # Errors
///
/// Fails if the process cannot be run, or if git exits unsuccessfully; in the
/// latter case the error carries the exit status and the trimmed stderr.
async fn git(dir: &Path, args: &[&str]) -> Result<()> {
    let output = Command::new("git")
        .current_dir(dir)
        .args(args)
        .output()
        .await
        .with_context(|| format!("git {} in {}", args.join(" "), dir.display()))?;

    if output.status.success() {
        return Ok(());
    }

    let stderr = String::from_utf8_lossy(&output.stderr);
    bail!(
        "git {} failed ({}): {}",
        args.join(" "),
        output.status,
        stderr.trim()
    );
}
/// Run `git <args>` in `dir` and report whether it exited with status 0.
///
/// Any unsuccessful exit yields `Ok(false)`; an `Err` is returned only when
/// the process itself could not be run.
async fn git_status(dir: &Path, args: &[&str]) -> Result<bool> {
    let mut cmd = Command::new("git");
    cmd.current_dir(dir).args(args);
    let exit = cmd
        .status()
        .await
        .with_context(|| format!("git {} in {}", args.join(" "), dir.display()))?;
    Ok(exit.success())
}
/// Idempotently rewrite the `EXTRA_NSPAWN_FLAGS` line in
/// `/etc/nixos-containers/<container>.conf`. The start script expands this
/// variable unquoted into the `systemd-nspawn` command.

View file

@ -31,9 +31,10 @@ struct Cli {
enum Cmd {
/// Run the coordinator daemon.
Serve {
/// Flake reference for the agent base template.
#[arg(long, default_value = "/etc/hyperhive#agent-base")]
agent_flake: String,
/// URL of the hyperhive flake. Inlined into each per-agent
/// `flake.nix` as the `hyperhive` input.
#[arg(long, default_value = "/etc/hyperhive")]
hyperhive_flake: String,
/// Path to the sqlite message store.
#[arg(long, default_value = "/var/lib/hyperhive/broker.sqlite")]
db: PathBuf,
@ -65,8 +66,11 @@ async fn main() -> Result<()> {
let cli = Cli::parse();
match cli.cmd {
Cmd::Serve { agent_flake, db } => {
let coord = Arc::new(Coordinator::open(&db, agent_flake)?);
Cmd::Serve {
hyperhive_flake,
db,
} => {
let coord = Arc::new(Coordinator::open(&db, hyperhive_flake)?);
manager_server::start(coord.clone())?;
server::serve(&cli.socket, coord).await
}

View file

@ -95,7 +95,10 @@ async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse
tracing::info!(%name, "manager: spawn");
let result: Result<()> = async {
let agent_dir = coord.register_agent(name)?;
if let Err(e) = lifecycle::spawn(name, &coord.agent_flake, &agent_dir).await {
let config_dir = Coordinator::agent_config_dir(name);
if let Err(e) =
lifecycle::spawn(name, &coord.hyperhive_flake, &agent_dir, &config_dir).await
{
coord.unregister_agent(name);
return Err(e);
}

View file

@ -20,7 +20,7 @@ pub async fn serve(socket: &Path, coord: Arc<Coordinator>) -> Result<()> {
let listener = UnixListener::bind(socket)
.with_context(|| format!("bind admin socket {}", socket.display()))?;
tracing::info!(socket = %socket.display(), agent_flake = %coord.agent_flake, "hive-c0re admin listening");
tracing::info!(socket = %socket.display(), hyperhive_flake = %coord.hyperhive_flake, "hive-c0re admin listening");
loop {
let (stream, _) = listener.accept().await.context("accept connection")?;
@ -61,7 +61,10 @@ async fn dispatch(req: &HostRequest, coord: &Coordinator) -> HostResponse {
HostRequest::Spawn { name } => {
tracing::info!(%name, "spawn");
let agent_dir = coord.register_agent(name)?;
if let Err(e) = lifecycle::spawn(name, &coord.agent_flake, &agent_dir).await {
let config_dir = Coordinator::agent_config_dir(name);
if let Err(e) =
lifecycle::spawn(name, &coord.hyperhive_flake, &agent_dir, &config_dir).await
{
// Roll back socket registration if container creation failed.
coord.unregister_agent(name);
return Err(e);
@ -77,18 +80,29 @@ async fn dispatch(req: &HostRequest, coord: &Coordinator) -> HostResponse {
HostRequest::Rebuild { name } => {
tracing::info!(%name, "rebuild");
let agent_dir = coord.register_agent(name)?;
lifecycle::rebuild(name, &coord.agent_flake, &agent_dir).await?;
let config_dir = Coordinator::agent_config_dir(name);
lifecycle::rebuild(name, &coord.hyperhive_flake, &agent_dir, &config_dir).await?;
HostResponse::success()
}
HostRequest::List => HostResponse::list(lifecycle::list().await?),
HostRequest::Pending => HostResponse::pending(coord.approvals.pending()?),
HostRequest::Approve { id } => {
let approval = coord.approvals.mark_approved(*id)?;
tracing::info!(%approval.id, %approval.agent, %approval.commit_ref, "approval applied: rebuilding agent");
tracing::info!(%approval.id, %approval.agent, %approval.commit_ref, "approval applied: advancing main + rebuilding");
let agent_dir = coord.register_agent(&approval.agent)?;
if let Err(e) =
lifecycle::rebuild(&approval.agent, &coord.agent_flake, &agent_dir).await
{
let config_dir = Coordinator::agent_config_dir(&approval.agent);
let result: anyhow::Result<()> = async {
lifecycle::apply_commit(&config_dir, &approval.commit_ref).await?;
lifecycle::rebuild(
&approval.agent,
&coord.hyperhive_flake,
&agent_dir,
&config_dir,
)
.await
}
.await;
if let Err(e) = result {
let note = format!("{e:#}");
let _ = coord.approvals.mark_failed(approval.id, &note);
return Err(e);