auto_update: rebuild all on startup, needs_update = applied HEAD vs deployed sha
This commit is contained in:
parent
4814aaefdb
commit
433bc85b91
4 changed files with 44 additions and 139 deletions
|
|
@ -1,12 +1,9 @@
|
||||||
//! Startup auto-update: on `hive-c0re serve` boot, rebuild any sub-agent
|
//! Startup auto-update: on `hive-c0re serve` boot, rebuild every known
|
||||||
//! container whose recorded "hyperhive rev" differs from the current one,
|
//! container unconditionally. `nixos-container update` is a no-op at the
|
||||||
//! then write the new rev as the marker. Skips rebuild when nothing changed
|
//! nix level when nothing changed (same store path), so the cost of always
|
||||||
//! so warm restarts are near-free.
|
//! running it on startup is low and avoids the complexity of rev-marker
|
||||||
//!
|
//! staleness (issue #179: all agents always needed update when any meta
|
||||||
//! "Rev" is the canonical filesystem path of the configured hyperhive flake
|
//! commit landed).
|
||||||
//! (e.g. `/nix/store/<hash>-source` when `/etc/hyperhive` is a symlink the
|
|
||||||
//! NixOS module wires up). For non-path flake URLs we don't have a cheap rev
|
|
||||||
//! signal, so auto-update is a no-op — operators rebuild manually.
|
|
||||||
|
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
@ -38,50 +35,29 @@ pub fn current_flake_rev(hyperhive_flake: &str) -> Option<String> {
|
||||||
.map(|p| p.display().to_string())
|
.map(|p| p.display().to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Read the current git HEAD of the meta flake at
|
/// Returns true when the applied repo has commits that have not yet been
|
||||||
/// `/var/lib/hyperhive/meta`. Returns `None` when the repo does not exist
|
/// deployed (i.e. the applied HEAD differs from the sha currently locked in
|
||||||
/// or `git rev-parse HEAD` fails (non-path flake, first-boot before
|
/// meta's flake.lock). This is the semantic the dashboard `needs_update` chip
|
||||||
/// `sync_agents` has run, etc.). Callers treat `None` as "unknown" and
|
/// conveys: "there is a config change ready to apply via rebuild."
|
||||||
/// skip the meta-rev component of the combined marker.
|
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn current_meta_rev() -> Option<String> {
|
pub fn agent_config_pending(name: &str, deployed_sha: Option<&str>) -> bool {
|
||||||
let out = std::process::Command::new("git")
|
let applied_head = std::process::Command::new("git")
|
||||||
.args(["-C", "/var/lib/hyperhive/meta", "rev-parse", "HEAD"])
|
.args([
|
||||||
|
"-C",
|
||||||
|
&format!("/var/lib/hyperhive/applied/{name}"),
|
||||||
|
"rev-parse",
|
||||||
|
"HEAD",
|
||||||
|
])
|
||||||
.output()
|
.output()
|
||||||
.ok()?;
|
|
||||||
if !out.status.success() {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
let rev = String::from_utf8(out.stdout).ok()?;
|
|
||||||
let rev = rev.trim().to_owned();
|
|
||||||
if rev.is_empty() {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(rev)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Combine the hyperhive package rev and the optional meta flake rev into
|
|
||||||
/// one opaque marker string stored on disk. Including the meta rev means a
|
|
||||||
/// `sync_agents` run that rewrites the meta flake (e.g. adding a new
|
|
||||||
/// `HIVE_CONTEXT_WINDOW_TOKENS_*` env var) is detected and triggers a
|
|
||||||
/// container rebuild on the next hive-c0re boot.
|
|
||||||
#[must_use]
|
|
||||||
pub fn combined_rev(hyperhive_rev: &str, meta_rev: Option<&str>) -> String {
|
|
||||||
match meta_rev {
|
|
||||||
Some(m) => format!("{hyperhive_rev}:{m}"),
|
|
||||||
None => hyperhive_rev.to_owned(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read the marker for `name` and return whether the recorded rev matches
|
|
||||||
/// `current_rev`. Missing/unreadable marker counts as out-of-date.
|
|
||||||
#[must_use]
|
|
||||||
pub fn agent_needs_update(name: &str, current_rev: &str) -> bool {
|
|
||||||
let prev = std::fs::read_to_string(rev_marker_path(name))
|
|
||||||
.ok()
|
.ok()
|
||||||
|
.filter(|o| o.status.success())
|
||||||
|
.and_then(|o| String::from_utf8(o.stdout).ok())
|
||||||
.map(|s| s.trim().to_owned());
|
.map(|s| s.trim().to_owned());
|
||||||
prev.as_deref() != Some(current_rev)
|
|
||||||
|
match (applied_head.as_deref(), deployed_sha) {
|
||||||
|
(Some(head), Some(sha)) => !head.starts_with(sha) && !sha.starts_with(head),
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Rebuild one sub-agent and refresh its marker. Used by both the startup
|
/// Rebuild one sub-agent and refresh its marker. Used by both the startup
|
||||||
|
|
@ -159,10 +135,7 @@ pub async fn rebuild_agent(coord: &Arc<Coordinator>, name: &str, current_rev: &s
|
||||||
/// the approval queue — manager is required infrastructure. Idempotent.
|
/// the approval queue — manager is required infrastructure. Idempotent.
|
||||||
pub async fn ensure_manager(coord: &Arc<Coordinator>) -> Result<()> {
|
pub async fn ensure_manager(coord: &Arc<Coordinator>) -> Result<()> {
|
||||||
let existing = lifecycle::list().await.unwrap_or_default();
|
let existing = lifecycle::list().await.unwrap_or_default();
|
||||||
let flake_rev = current_flake_rev(&coord.hyperhive_flake);
|
let current_rev = current_flake_rev(&coord.hyperhive_flake);
|
||||||
let meta_rev = current_meta_rev();
|
|
||||||
let current_rev =
|
|
||||||
flake_rev.as_deref().map(|f| combined_rev(f, meta_rev.as_deref()));
|
|
||||||
if existing.iter().any(|c| c == MANAGER_NAME) {
|
if existing.iter().any(|c| c == MANAGER_NAME) {
|
||||||
// Container exists already. If it predates the unified lifecycle
|
// Container exists already. If it predates the unified lifecycle
|
||||||
// (no applied flake on disk) we must rebuild — otherwise it's
|
// (no applied flake on disk) we must rebuild — otherwise it's
|
||||||
|
|
@ -176,7 +149,7 @@ pub async fn ensure_manager(coord: &Arc<Coordinator>) -> Result<()> {
|
||||||
"manager container exists but no applied flake — forcing rebuild to migrate"
|
"manager container exists but no applied flake — forcing rebuild to migrate"
|
||||||
);
|
);
|
||||||
let coord_clone = coord.clone();
|
let coord_clone = coord.clone();
|
||||||
if let Err(e) = rebuild_agent(&coord_clone, MANAGER_NAME, rev).await {
|
if let Err(e) = rebuild_agent(&coord_clone, MANAGER_NAME, rev.as_str()).await {
|
||||||
tracing::warn!(error = ?e, "manager migration rebuild failed");
|
tracing::warn!(error = ?e, "manager migration rebuild failed");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -204,29 +177,16 @@ pub async fn ensure_manager(coord: &Arc<Coordinator>) -> Result<()> {
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
if let Some(rev) = current_rev {
|
if let Some(rev) = current_rev {
|
||||||
let _ = std::fs::write(rev_marker_path(MANAGER_NAME), rev);
|
let _ = std::fs::write(rev_marker_path(MANAGER_NAME), &rev);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Rebuild every sub-agent whose marker differs from the current rev. Logs
|
/// Rebuild every container on startup. Sequential to avoid nix-store sqlite
|
||||||
/// per-agent outcomes and continues past failures. Returns Ok even if some
|
/// races and keep logs readable. Returns Ok even if some rebuilds failed.
|
||||||
/// rebuilds failed — startup shouldn't be blocked by a broken agent.
|
|
||||||
pub async fn run(coord: Arc<Coordinator>) -> Result<()> {
|
pub async fn run(coord: Arc<Coordinator>) -> Result<()> {
|
||||||
let Some(flake_rev) = current_flake_rev(&coord.hyperhive_flake) else {
|
// Bump meta's hyperhive input up-front so per-agent rebuilds build
|
||||||
tracing::info!(
|
// against the latest base. Non-fatal on failure.
|
||||||
flake = %coord.hyperhive_flake,
|
|
||||||
"auto-update: hyperhive_flake has no canonical path; skipping",
|
|
||||||
);
|
|
||||||
return Ok(());
|
|
||||||
};
|
|
||||||
let meta_rev = current_meta_rev();
|
|
||||||
let current_rev = combined_rev(&flake_rev, meta_rev.as_deref());
|
|
||||||
tracing::info!(rev = %current_rev, "auto-update: scanning agents");
|
|
||||||
|
|
||||||
// Bump meta's hyperhive input up-front so the per-agent rebuilds
|
|
||||||
// below build against the new base. Failure here is logged but
|
|
||||||
// not fatal — individual rebuilds will surface concrete errors.
|
|
||||||
if let Err(e) = crate::meta::lock_update_hyperhive().await {
|
if let Err(e) = crate::meta::lock_update_hyperhive().await {
|
||||||
tracing::warn!(error = ?e, "auto-update: meta lock_update_hyperhive failed");
|
tracing::warn!(error = ?e, "auto-update: meta lock_update_hyperhive failed");
|
||||||
}
|
}
|
||||||
|
|
@ -239,27 +199,17 @@ pub async fn run(coord: Arc<Coordinator>) -> Result<()> {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Sequential, one agent at a time. Parallel rebuilds collide on
|
let current_rev =
|
||||||
// nix-store's sqlite cache (the "sqlite db busy, not using
|
current_flake_rev(&coord.hyperhive_flake).unwrap_or_default();
|
||||||
// cache" warning) and also race the meta-lock mutex; the
|
|
||||||
// resulting log interleave was bad enough on its own. Builds
|
tracing::info!(agents = containers.len(), "auto-update: rebuilding all on startup");
|
||||||
// serialize on nix-daemon internally anyway, so this isn't a
|
|
||||||
// throughput loss in practice.
|
|
||||||
for container in containers {
|
for container in containers {
|
||||||
// Manager and sub-agents share the same lifecycle now; both go
|
|
||||||
// through rebuild_agent with name-derived paths.
|
|
||||||
let logical = if container == MANAGER_NAME {
|
let logical = if container == MANAGER_NAME {
|
||||||
Some(MANAGER_NAME.to_owned())
|
Some(MANAGER_NAME.to_owned())
|
||||||
} else {
|
} else {
|
||||||
container.strip_prefix(AGENT_PREFIX).map(str::to_owned)
|
container.strip_prefix(AGENT_PREFIX).map(str::to_owned)
|
||||||
};
|
};
|
||||||
let Some(name) = logical else {
|
let Some(name) = logical else { continue };
|
||||||
continue;
|
|
||||||
};
|
|
||||||
if !agent_needs_update(&name, ¤t_rev) {
|
|
||||||
tracing::debug!(%name, "auto-update: up-to-date");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if let Err(e) = rebuild_agent(&coord, &name, ¤t_rev).await {
|
if let Err(e) = rebuild_agent(&coord, &name, ¤t_rev).await {
|
||||||
tracing::warn!(%name, error = ?e, "auto-update: rebuild failed");
|
tracing::warn!(%name, error = ?e, "auto-update: rebuild failed");
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -56,11 +56,6 @@ pub struct ContainerView {
|
||||||
/// resolves every per-agent attribute the dashboard surfaces.
|
/// resolves every per-agent attribute the dashboard surfaces.
|
||||||
pub async fn build_all(coord: &Coordinator) -> Vec<ContainerView> {
|
pub async fn build_all(coord: &Coordinator) -> Vec<ContainerView> {
|
||||||
let raw = lifecycle::list().await.unwrap_or_default();
|
let raw = lifecycle::list().await.unwrap_or_default();
|
||||||
let current_rev = crate::auto_update::current_flake_rev(&coord.hyperhive_flake)
|
|
||||||
.map(|flake_rev| {
|
|
||||||
let meta_rev = crate::auto_update::current_meta_rev();
|
|
||||||
crate::auto_update::combined_rev(&flake_rev, meta_rev.as_deref())
|
|
||||||
});
|
|
||||||
let locked = read_meta_locked_revs();
|
let locked = read_meta_locked_revs();
|
||||||
let mut out = Vec::new();
|
let mut out = Vec::new();
|
||||||
for c in &raw {
|
for c in &raw {
|
||||||
|
|
@ -71,13 +66,11 @@ pub async fn build_all(coord: &Coordinator) -> Vec<ContainerView> {
|
||||||
} else {
|
} else {
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
let needs_update =
|
let deployed_full = locked.get(&format!("agent-{logical}")).map(|s| s.as_str());
|
||||||
current_rev.as_deref().is_some_and(|rev| crate::auto_update::agent_needs_update(&logical, rev));
|
let needs_update = crate::auto_update::agent_config_pending(&logical, deployed_full);
|
||||||
let needs_login =
|
let needs_login =
|
||||||
!is_manager && !claude_has_session(&Coordinator::agent_claude_dir(&logical));
|
!is_manager && !claude_has_session(&Coordinator::agent_claude_dir(&logical));
|
||||||
let deployed_sha = locked
|
let deployed_sha = deployed_full.map(|s| s[..s.len().min(12)].to_owned());
|
||||||
.get(&format!("agent-{logical}"))
|
|
||||||
.map(|s| s[..s.len().min(12)].to_owned());
|
|
||||||
// Recipient name the broker uses for this agent — sub-agents
|
// Recipient name the broker uses for this agent — sub-agents
|
||||||
// are addressed by logical name, the manager by the
|
// are addressed by logical name, the manager by the
|
||||||
// MANAGER_AGENT constant. Mirrors the rest of the broker
|
// MANAGER_AGENT constant. Mirrors the rest of the broker
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
//! Per-container state watcher. Polls every managed container on a
|
//! Per-container state watcher. Polls every managed container on a
|
||||||
//! fixed interval, tracks three orthogonal state-sets across ticks,
|
//! fixed interval, tracks two orthogonal state-sets across ticks,
|
||||||
//! and emits a `HelperEvent` to the manager on each transition:
|
//! and emits a `HelperEvent` to the manager on each transition:
|
||||||
//!
|
//!
|
||||||
//! - **running**: container is up. running → stopped without an
|
//! - **running**: container is up. running → stopped without an
|
||||||
|
|
@ -8,9 +8,9 @@
|
||||||
//! - **logged-in**: claude session dir is populated. ! → ✓ →
|
//! - **logged-in**: claude session dir is populated. ! → ✓ →
|
||||||
//! `LoggedIn`; ✓ → ! → `NeedsLogin` (rare — usually only fires
|
//! `LoggedIn`; ✓ → ! → `NeedsLogin` (rare — usually only fires
|
||||||
//! on a fresh spawn / purge).
|
//! on a fresh spawn / purge).
|
||||||
//! - **up-to-date**: agent's recorded flake rev matches current. ✓
|
//!
|
||||||
//! → ! → `NeedsUpdate`. The reverse direction (`NeedsUpdate`
|
//! `NeedsUpdate` events are now fired from the apply-commit path
|
||||||
//! resolved) is covered by `Rebuilt`, so no separate event.
|
//! directly rather than via rev-marker polling (issue #179 cleanup).
|
||||||
//!
|
//!
|
||||||
//! D-Bus subscription would be lower-latency for the first axis,
|
//! D-Bus subscription would be lower-latency for the first axis,
|
||||||
//! but polling is simpler and a 10s detection delay is fine.
|
//! but polling is simpler and a 10s detection delay is fine.
|
||||||
|
|
@ -30,14 +30,11 @@ pub fn spawn(coord: Arc<Coordinator>) {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut prev_running: HashSet<String> = HashSet::new();
|
let mut prev_running: HashSet<String> = HashSet::new();
|
||||||
let mut prev_logged_in: HashSet<String> = HashSet::new();
|
let mut prev_logged_in: HashSet<String> = HashSet::new();
|
||||||
let mut prev_updated: HashSet<String> = HashSet::new();
|
|
||||||
let mut seeded = false;
|
let mut seeded = false;
|
||||||
loop {
|
loop {
|
||||||
let raw = lifecycle::list().await.unwrap_or_default();
|
let raw = lifecycle::list().await.unwrap_or_default();
|
||||||
let current_rev = crate::auto_update::current_flake_rev(&coord.hyperhive_flake);
|
|
||||||
let mut current_running = HashSet::new();
|
let mut current_running = HashSet::new();
|
||||||
let mut current_logged_in = HashSet::new();
|
let mut current_logged_in = HashSet::new();
|
||||||
let mut current_updated = HashSet::new();
|
|
||||||
let mut sub_agents: Vec<String> = Vec::new();
|
let mut sub_agents: Vec<String> = Vec::new();
|
||||||
for c in &raw {
|
for c in &raw {
|
||||||
let logical = if c == MANAGER_NAME {
|
let logical = if c == MANAGER_NAME {
|
||||||
|
|
@ -58,17 +55,11 @@ pub fn spawn(coord: Arc<Coordinator>) {
|
||||||
{
|
{
|
||||||
current_logged_in.insert(logical.clone());
|
current_logged_in.insert(logical.clone());
|
||||||
}
|
}
|
||||||
if let Some(rev) = current_rev.as_deref()
|
|
||||||
&& !crate::auto_update::agent_needs_update(&logical, rev)
|
|
||||||
{
|
|
||||||
current_updated.insert(logical.clone());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if seeded {
|
if seeded {
|
||||||
emit_crash_transitions(&coord, &prev_running, ¤t_running);
|
emit_crash_transitions(&coord, &prev_running, ¤t_running);
|
||||||
emit_login_transitions(&coord, &prev_logged_in, ¤t_logged_in, &sub_agents);
|
emit_login_transitions(&coord, &prev_logged_in, ¤t_logged_in, &sub_agents);
|
||||||
emit_update_transitions(&coord, &prev_updated, ¤t_updated, &sub_agents);
|
|
||||||
}
|
}
|
||||||
// Periodic container rescan — catches state flips that
|
// Periodic container rescan — catches state flips that
|
||||||
// happen outside our mutation surface (operator runs
|
// happen outside our mutation surface (operator runs
|
||||||
|
|
@ -78,7 +69,6 @@ pub fn spawn(coord: Arc<Coordinator>) {
|
||||||
coord.rescan_containers_and_emit().await;
|
coord.rescan_containers_and_emit().await;
|
||||||
prev_running = current_running;
|
prev_running = current_running;
|
||||||
prev_logged_in = current_logged_in;
|
prev_logged_in = current_logged_in;
|
||||||
prev_updated = current_updated;
|
|
||||||
seeded = true;
|
seeded = true;
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
|
@ -151,28 +141,3 @@ fn emit_login_transitions(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn emit_update_transitions(
|
|
||||||
coord: &Coordinator,
|
|
||||||
prev_updated: &HashSet<String>,
|
|
||||||
current_updated: &HashSet<String>,
|
|
||||||
sub_agents: &[String],
|
|
||||||
) {
|
|
||||||
// Fired on the "was up-to-date, now isn't" transition. The
|
|
||||||
// reverse (rebuilt) is already covered by HelperEvent::Rebuilt.
|
|
||||||
let prev_stale: HashSet<&str> = sub_agents
|
|
||||||
.iter()
|
|
||||||
.map(String::as_str)
|
|
||||||
.filter(|n| !prev_updated.contains(*n))
|
|
||||||
.collect();
|
|
||||||
let current_stale: HashSet<&str> = sub_agents
|
|
||||||
.iter()
|
|
||||||
.map(String::as_str)
|
|
||||||
.filter(|n| !current_updated.contains(*n))
|
|
||||||
.collect();
|
|
||||||
for agent in current_stale.difference(&prev_stale) {
|
|
||||||
tracing::info!(%agent, "agent needs update");
|
|
||||||
coord.notify_manager(&hive_sh4re::HelperEvent::NeedsUpdate {
|
|
||||||
agent: (*agent).to_owned(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -1684,9 +1684,6 @@ async fn post_update_all(State(state): State<AppState>) -> Response {
|
||||||
} else {
|
} else {
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
if !crate::auto_update::agent_needs_update(&logical, ¤t_rev) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if let Err(e) =
|
if let Err(e) =
|
||||||
crate::auto_update::rebuild_agent(&state.coord, &logical, ¤t_rev).await
|
crate::auto_update::rebuild_agent(&state.coord, &logical, ¤t_rev).await
|
||||||
{
|
{
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue