//! Global rebuild queue. //! //! Every long-running container/meta operation (rebuild, meta-update, //! first-spawn) goes through this queue. A single background worker //! drains it in FIFO order so we never overlap two `nixos-container //! update` runs on the same agent and never start a fresh agent rebuild //! while a meta-update's lock bump is mid-flight. //! //! ## Why one queue //! //! Before this module landed, four independent call paths could fire //! `auto_update::rebuild_agent` concurrently: //! - dashboard manual rebuild button //! - `update-all` / `meta-update` cascade //! - approval handler (apply-commit / spawn) //! - startup auto-update sweep //! //! Nothing serialised them. nix-daemon serialises the actual store //! ops, but the rest of `rebuild_agent` (token sync, kick, rescan, //! lock-bump emit) interleaved unpredictably. The single-worker queue //! gives operators a visible, ordered runway and lets the UI render //! "what's about to happen" instead of "something might be happening //! somewhere." //! //! ## Scope //! //! In-queue kinds: //! - `Rebuild` — a single-agent rebuild (covers manual / approval-driven / //! auto-update / meta-update cascade variants — they all funnel here). //! - `MetaUpdate` — `nix flake update` on the meta flake. The worker //! runs the lock bump itself, then enqueues a cascade of `Rebuild` //! entries with `parent_id` set to the meta-update's id. //! - `Spawn` — first-deploy of an agent (approval-driven). Same //! serialisation as `Rebuild` from the operator's POV. //! - `Destroy` — for future use (`destroy --purge` does real I/O); not //! currently routed through the queue. //! //! Out of scope (intentionally not queued — these are sub-second ops //! and adding them adds visual noise without serving the "one at a //! time" goal): //! - `start` / `stop` / `restart` //! - `kill` //! //! ## Dedup //! //! Enqueueing `(kind, agent)` that already has a `Queued` entry returns //! the existing entry's id and appends the new reason as an //! "also requested by …" line. Running entries do not dedup — a //! re-queue during a run is legitimate (something changed since the //! current run started). use std::collections::VecDeque; use std::sync::{Arc, Mutex}; use serde::Serialize; use tokio::sync::Notify; /// What the queue can run. Each variant maps to a specific worker /// execution path; `agent` (in `QueueEntry`) names the target where /// relevant. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize)] #[serde(rename_all = "snake_case")] pub enum QueueKind { /// Rebuild a single agent's container (`auto_update::rebuild_agent`). Rebuild, /// Run `nix flake update` on the meta flake. Triggers cascade /// `Rebuild` entries (with `parent_id`) once the lock bump lands. MetaUpdate, /// First-deploy spawn of a new agent (approval-driven). Spawn, /// Destroy with `--purge` (real fs work). Not yet routed here; the /// variant exists so the wire shape doesn't need to change later. Destroy, } impl QueueKind { pub fn as_str(self) -> &'static str { match self { QueueKind::Rebuild => "rebuild", QueueKind::MetaUpdate => "meta_update", QueueKind::Spawn => "spawn", QueueKind::Destroy => "destroy", } } } /// Where the enqueue request originated. Drives the "why" chip on the /// dashboard and lets the UI group cascade entries under their parent /// without parsing the reason text. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] #[serde(rename_all = "snake_case")] pub enum QueueSource { /// Operator clicked rebuild / update-all / meta-update on the /// dashboard, or any other direct human action (CLI, manager tool). Manual, /// Spawned as a cascade from a `MetaUpdate` entry's lock-bump /// fan-out. The `parent_id` on the `QueueEntry` points back at /// the originating meta-update. MetaUpdate, /// `auto_update::run` startup sweep — rebuild every container on /// hive-c0re boot. AutoUpdate, /// Crash recovery path (future use — currently no auto-rebuild on /// crash, but the variant exists for the imminent feature). CrashRecover, } impl QueueSource { pub fn as_str(self) -> &'static str { match self { QueueSource::Manual => "manual", QueueSource::MetaUpdate => "meta_update", QueueSource::AutoUpdate => "auto_update", QueueSource::CrashRecover => "crash_recover", } } } /// Lifecycle state of an entry. `Done` / `Failed` / `Cancelled` are /// retained in the queue snapshot for a short tail (`MAX_HISTORY_PER_KIND`) /// so the dashboard can show "last few" runs alongside live state. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] #[serde(rename_all = "snake_case")] pub enum QueueState { Queued, Running, Done, Failed, Cancelled, } impl QueueState { pub fn is_terminal(self) -> bool { matches!(self, QueueState::Done | QueueState::Failed | QueueState::Cancelled) } } /// A single queue entry — what's pending, running, or recently finished. /// Serialised verbatim onto the dashboard event channel and the /// `/api/state` snapshot. #[derive(Debug, Clone, Serialize)] pub struct QueueEntry { /// Monotonic per-process id. Stable for the lifetime of the entry /// so SSE upserts land in place rather than churning the list. pub id: u64, /// Target agent name, or the literal `"hyperhive"` for entries /// (MetaUpdate) that affect the meta flake rather than a single /// agent. pub agent: String, pub kind: QueueKind, pub state: QueueState, pub source: QueueSource, /// Groups cascade entries under their originating parent. For a /// `MetaUpdate` entry this is `None`; for the per-agent rebuilds /// the worker enqueues after the lock bump it's `Some(meta_id)`. #[serde(default, skip_serializing_if = "Option::is_none")] pub parent_id: Option, /// Human-readable "why" — populated by the enqueuer (`"manual via /// dashboard"`, `"meta-update cascade (hyperhive bumped)"`, /// `"startup sweep"`). Free-form; dedup appends `(also requested /// by …)` lines on repeated enqueues. pub reason: String, pub enqueued_at: i64, #[serde(default, skip_serializing_if = "Option::is_none")] pub started_at: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub finished_at: Option, /// Populated when `state == Failed`. Carries the worker's error /// string (already truncated to a reasonable length by the caller). #[serde(default, skip_serializing_if = "Option::is_none")] pub error: Option, /// `MetaUpdate`-only payload: the list of meta flake inputs to run /// through `nix flake update`. Empty / absent on `Rebuild` / /// `Spawn` / `Destroy` entries; absent on the wire (never /// serialised) when the entry kind doesn't have meaningful inputs. #[serde(default, skip_serializing_if = "Vec::is_empty")] pub inputs: Vec, } /// How many terminal-state entries (`Done` / `Failed` / `Cancelled`) /// to retain per kind in the snapshot. Older entries get evicted to /// keep `/api/state` tight; the live event channel is unaffected. const MAX_HISTORY_PER_KIND: usize = 5; /// Inner state guarded by a single mutex. Held briefly — every /// operation is constant-time relative to the queue's depth, and /// the depths in practice are tiny (single-digit). #[derive(Debug, Default)] struct Inner { entries: VecDeque, next_id: u64, } /// Global rebuild queue. Lives on `Coordinator` (one per hive-c0re /// process). The associated `Notify` wakes the worker when something /// new arrives. #[derive(Debug)] pub struct RebuildQueue { inner: Mutex, /// Worker wakes on this signal. The worker checks the queue and /// loops back to `notified().await` when there's nothing to run. pub(crate) notify: Notify, } impl Default for RebuildQueue { fn default() -> Self { Self { inner: Mutex::new(Inner::default()), notify: Notify::new(), } } } impl RebuildQueue { pub fn new() -> Self { Self::default() } /// Add an entry to the queue. Returns the entry's id (newly-allocated /// or — on dedup — the existing entry's id with the new reason /// appended). /// /// Dedup rule: a `Queued` entry with the same `(kind, agent)` swallows /// the new request and returns its existing id. Running and terminal /// entries do not dedup — operators are free to re-queue a rebuild /// that's currently running (something changed since it started) or /// re-run one that just finished. pub fn enqueue( &self, kind: QueueKind, agent: String, source: QueueSource, reason: String, parent_id: Option, ) -> u64 { self.enqueue_with_inputs(kind, agent, source, reason, parent_id, Vec::new()) } /// Same as `enqueue` but carries an `inputs` payload — used by /// `MetaUpdate` enqueues to tell the worker which meta-flake /// inputs to bump. pub fn enqueue_with_inputs( &self, kind: QueueKind, agent: String, source: QueueSource, reason: String, parent_id: Option, inputs: Vec, ) -> u64 { let mut inner = self.inner.lock().expect("rebuild_queue mutex poisoned"); // Dedup against a pending entry with the same (kind, agent). for entry in inner.entries.iter_mut() { if entry.state == QueueState::Queued && entry.kind == kind && entry.agent == agent { if !entry.reason.contains(&reason) { entry.reason.push_str(&format!("\nalso requested by: {reason}")); } return entry.id; } } inner.next_id += 1; let id = inner.next_id; let entry = QueueEntry { id, agent, kind, state: QueueState::Queued, source, parent_id, reason, enqueued_at: now_unix(), started_at: None, finished_at: None, error: None, inputs, }; inner.entries.push_back(entry); // Wake the worker. `notify_one` is a no-op when there's no // waiter; the next `notified().await` returns immediately. self.notify.notify_one(); id } /// Pop the next `Queued` entry and mark it `Running`. Returns the /// entry (a clone — the original stays in the queue so live state /// reflects "this is currently running"). Returns `None` when there's /// nothing queued. pub fn take_next(&self) -> Option { let mut inner = self.inner.lock().expect("rebuild_queue mutex poisoned"); let pos = inner .entries .iter() .position(|e| e.state == QueueState::Queued)?; let entry = &mut inner.entries[pos]; entry.state = QueueState::Running; entry.started_at = Some(now_unix()); Some(entry.clone()) } /// Mark an entry terminal. `error` is populated for `Failed`; /// `Done` / `Cancelled` ignore it. Trims the history tail. pub fn finish(&self, id: u64, state: QueueState, error: Option) { debug_assert!(state.is_terminal(), "finish() called with non-terminal {state:?}"); let mut inner = self.inner.lock().expect("rebuild_queue mutex poisoned"); if let Some(entry) = inner.entries.iter_mut().find(|e| e.id == id) { entry.state = state; entry.finished_at = Some(now_unix()); entry.error = error.filter(|_| state == QueueState::Failed); } Self::trim_history(&mut inner); } /// Snapshot the queue for `/api/state` and `RebuildQueueChanged`. /// Cheap clone — entries are small (~hundreds of bytes each). pub fn snapshot(&self) -> Vec { let inner = self.inner.lock().expect("rebuild_queue mutex poisoned"); inner.entries.iter().cloned().collect() } /// Cancel a `Queued` entry (no-op for `Running` / terminal — the /// in-flight rebuild owns the agent's nix store and can't be /// safely interrupted). Returns true when an entry was cancelled. pub fn cancel(&self, id: u64) -> bool { let mut inner = self.inner.lock().expect("rebuild_queue mutex poisoned"); if let Some(entry) = inner.entries.iter_mut().find(|e| e.id == id) { if entry.state == QueueState::Queued { entry.state = QueueState::Cancelled; entry.finished_at = Some(now_unix()); Self::trim_history(&mut inner); return true; } } false } /// Keep only the most recent `MAX_HISTORY_PER_KIND` terminal entries /// per kind. Pending + running entries are never evicted. fn trim_history(inner: &mut Inner) { let mut counts: std::collections::HashMap = std::collections::HashMap::new(); // Walk newest-first; keep the first MAX_HISTORY_PER_KIND // terminals per kind, evict the rest. let entries: Vec = inner .entries .iter() .rev() .filter(|e| { if !e.state.is_terminal() { return true; } let n = counts.entry(e.kind).or_insert(0); *n += 1; *n <= MAX_HISTORY_PER_KIND }) .cloned() .collect(); inner.entries = entries.into_iter().rev().collect(); } } /// Background worker that drains the queue. Spawned once at hive-c0re /// startup from `main.rs`. Loops forever: /// 1. Pop the next `Queued` entry (`take_next` marks it `Running` and /// fires a `RebuildQueueChanged` snapshot via the caller). /// 2. Dispatch by kind — single-agent rebuild, meta-update + cascade, /// or first-spawn. /// 3. Mark the entry terminal (`finish`) and emit another snapshot. /// 4. When the queue is empty, `await` on `notify` until something /// new lands. /// /// Shutdown semantics: subscribes to `coord.shutdown_rx()`. On a true /// signal the worker exits after its current entry finishes; pending /// `Queued` entries are dropped (they'll either be replayed by the /// startup sweep on next boot or left for an operator to re-queue). pub async fn run_worker(coord: std::sync::Arc) { let mut shutdown = coord.shutdown_rx(); loop { // Drain everything available now. while let Some(entry) = coord.rebuild_queue.take_next() { coord.emit_rebuild_queue_snapshot(); tracing::info!( id = entry.id, kind = entry.kind.as_str(), agent = %entry.agent, source = entry.source.as_str(), "rebuild_queue: running" ); let result = dispatch(&coord, &entry).await; match result { Ok(()) => { coord.rebuild_queue.finish(entry.id, QueueState::Done, None); tracing::info!(id = entry.id, "rebuild_queue: done"); } Err(e) => { let msg = format!("{e:#}"); let truncated = if msg.len() > 2_000 { format!("{}…", &msg[..2_000]) } else { msg.clone() }; coord .rebuild_queue .finish(entry.id, QueueState::Failed, Some(truncated)); tracing::warn!(id = entry.id, error = %msg, "rebuild_queue: failed"); } } coord.emit_rebuild_queue_snapshot(); } // Park until something new is enqueued OR shutdown fires. tokio::select! { biased; res = shutdown.changed() => { if res.is_err() || *shutdown.borrow() { tracing::info!("rebuild_queue: worker exiting on shutdown"); return; } } _ = coord.rebuild_queue.notify.notified() => { // New entry — back to the drain loop. } } } } /// Run a single queue entry to completion. Kind-dispatched; failures /// bubble up to the worker which marks the entry `Failed`. async fn dispatch( coord: &std::sync::Arc, entry: &QueueEntry, ) -> anyhow::Result<()> { match entry.kind { QueueKind::Rebuild => { let current_rev = crate::auto_update::current_flake_rev(&coord.hyperhive_flake) .unwrap_or_default(); crate::auto_update::rebuild_agent(coord, &entry.agent, ¤t_rev).await } QueueKind::MetaUpdate => run_meta_update(coord, entry).await, QueueKind::Spawn => { // First-deploy spawns route through `actions::approve_spawn` // / `actions::approve_apply_commit` today; they enqueue a // Spawn entry only to claim the queue slot, the actual // spawn work runs inside those handlers before completion. // Keeping this arm a no-op so we don't double-run. tracing::debug!( id = entry.id, agent = %entry.agent, "rebuild_queue: Spawn entry is a queue claim; actual work elsewhere" ); Ok(()) } QueueKind::Destroy => { // Reserved for future `destroy --purge` integration. anyhow::bail!("Destroy kind not yet implemented in rebuild_queue worker"); } } } /// Run one `MetaUpdate` entry: bump the meta flake's locks for the /// requested inputs, then enqueue a cascade of `Rebuild` entries /// (with `parent_id` set to this entry's id) for every agent affected /// by the bump. Mirrors the previous `dashboard::run_meta_update` /// semantics; that path now enqueues into this queue rather than /// running the bump + rebuild loop inline. async fn run_meta_update( coord: &std::sync::Arc, entry: &QueueEntry, ) -> anyhow::Result<()> { let _progress = coord.meta_update_guard(); let inputs = entry.inputs.clone(); tracing::info!(?inputs, parent = entry.id, "rebuild_queue: meta-update starting"); if inputs.is_empty() { crate::meta::lock_update(&[]).await?; } else { crate::meta::lock_update(&inputs).await?; } // Decide which agents to rebuild. Same logic as the previous // `run_meta_update` — anything in the hyperhive subtree affects // every agent; anything in `agent-/...` only the named agent. let touched_hyperhive = inputs .iter() .any(|i| i == "hyperhive" || i.starts_with("hyperhive/")); let touched_agents: Vec = inputs .iter() .filter_map(|i| i.strip_prefix("agent-")) .map(|rest| rest.split('/').next().unwrap_or(rest).to_owned()) .collect(); let agents_to_rebuild: Vec = if touched_hyperhive || inputs.is_empty() { crate::lifecycle::list() .await .unwrap_or_default() .into_iter() .filter_map(|c| { if c == crate::lifecycle::MANAGER_NAME { Some(crate::lifecycle::MANAGER_NAME.to_owned()) } else { c.strip_prefix(crate::lifecycle::AGENT_PREFIX).map(str::to_owned) } }) .collect() } else { touched_agents }; let reason_hint = if inputs.is_empty() { "meta-update cascade (all inputs)".to_owned() } else { format!("meta-update cascade ({})", inputs.join(", ")) }; for name in agents_to_rebuild { coord.rebuild_queue.enqueue( QueueKind::Rebuild, name, QueueSource::MetaUpdate, reason_hint.clone(), Some(entry.id), ); } // Lock file changed — meta-inputs panel re-renders. crate::dashboard::emit_meta_inputs_snapshot(coord.as_ref()); Ok(()) } /// Current unix timestamp in seconds. `now()` calls are pulled into a /// helper so tests can swap them out later. fn now_unix() -> i64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .ok() .and_then(|d| i64::try_from(d.as_secs()).ok()) .unwrap_or(0) } #[cfg(test)] mod tests { use super::*; #[test] fn enqueue_and_take_in_order() { let q = RebuildQueue::new(); let a = q.enqueue( QueueKind::Rebuild, "agent-a".to_owned(), QueueSource::Manual, "first".to_owned(), None, ); let b = q.enqueue( QueueKind::Rebuild, "agent-b".to_owned(), QueueSource::Manual, "second".to_owned(), None, ); assert_ne!(a, b); let next = q.take_next().expect("queued"); assert_eq!(next.id, a); assert_eq!(next.state, QueueState::Running); let next = q.take_next().expect("queued"); assert_eq!(next.id, b); assert!(q.take_next().is_none()); } #[test] fn dedup_pending_same_kind_and_agent() { let q = RebuildQueue::new(); let a = q.enqueue( QueueKind::Rebuild, "agent-a".to_owned(), QueueSource::Manual, "first".to_owned(), None, ); let b = q.enqueue( QueueKind::Rebuild, "agent-a".to_owned(), QueueSource::AutoUpdate, "auto sweep".to_owned(), None, ); assert_eq!(a, b, "dedup should return existing id"); let snap = q.snapshot(); assert_eq!(snap.len(), 1); assert!(snap[0].reason.contains("first")); assert!(snap[0].reason.contains("auto sweep")); } #[test] fn dedup_does_not_apply_across_kinds_or_agents() { let q = RebuildQueue::new(); let a = q.enqueue( QueueKind::Rebuild, "agent-a".to_owned(), QueueSource::Manual, "r".to_owned(), None, ); let b = q.enqueue( QueueKind::Rebuild, "agent-b".to_owned(), QueueSource::Manual, "r".to_owned(), None, ); let c = q.enqueue( QueueKind::Spawn, "agent-a".to_owned(), QueueSource::Manual, "s".to_owned(), None, ); assert_ne!(a, b); assert_ne!(a, c); assert_eq!(q.snapshot().len(), 3); } #[test] fn dedup_skips_running_entries() { let q = RebuildQueue::new(); q.enqueue( QueueKind::Rebuild, "agent-a".to_owned(), QueueSource::Manual, "first".to_owned(), None, ); let running = q.take_next().expect("queued"); assert_eq!(running.state, QueueState::Running); // While the original is running, re-enqueue is legitimate. let again = q.enqueue( QueueKind::Rebuild, "agent-a".to_owned(), QueueSource::Manual, "config bumped during build".to_owned(), None, ); assert_ne!(running.id, again); let snap = q.snapshot(); assert_eq!(snap.len(), 2); } #[test] fn finish_marks_state_and_keeps_history() { let q = RebuildQueue::new(); let id = q.enqueue( QueueKind::Rebuild, "agent-a".to_owned(), QueueSource::Manual, "r".to_owned(), None, ); q.take_next(); q.finish(id, QueueState::Done, None); let snap = q.snapshot(); assert_eq!(snap.len(), 1); assert_eq!(snap[0].state, QueueState::Done); assert!(snap[0].finished_at.is_some()); assert!(snap[0].error.is_none()); } #[test] fn finish_with_failure_records_error() { let q = RebuildQueue::new(); let id = q.enqueue( QueueKind::Rebuild, "agent-a".to_owned(), QueueSource::Manual, "r".to_owned(), None, ); q.take_next(); q.finish(id, QueueState::Failed, Some("nix build failed".to_owned())); let snap = q.snapshot(); assert_eq!(snap[0].state, QueueState::Failed); assert_eq!(snap[0].error.as_deref(), Some("nix build failed")); } #[test] fn history_evicts_old_terminals_per_kind() { let q = RebuildQueue::new(); for i in 0..(MAX_HISTORY_PER_KIND + 3) { let id = q.enqueue( QueueKind::Rebuild, format!("agent-{i}"), QueueSource::Manual, "r".to_owned(), None, ); q.take_next(); q.finish(id, QueueState::Done, None); } let snap = q.snapshot(); assert_eq!(snap.len(), MAX_HISTORY_PER_KIND); } #[test] fn cancel_clears_queued_entry() { let q = RebuildQueue::new(); let id = q.enqueue( QueueKind::Rebuild, "agent-a".to_owned(), QueueSource::Manual, "r".to_owned(), None, ); assert!(q.cancel(id)); let snap = q.snapshot(); assert_eq!(snap[0].state, QueueState::Cancelled); assert!(q.take_next().is_none()); } #[test] fn cancel_refuses_running_entry() { let q = RebuildQueue::new(); let id = q.enqueue( QueueKind::Rebuild, "agent-a".to_owned(), QueueSource::Manual, "r".to_owned(), None, ); q.take_next(); assert!(!q.cancel(id)); let snap = q.snapshot(); assert_eq!(snap[0].state, QueueState::Running); } #[test] fn parent_id_groups_cascade() { let q = RebuildQueue::new(); let meta = q.enqueue( QueueKind::MetaUpdate, "hyperhive".to_owned(), QueueSource::Manual, "lock bump".to_owned(), None, ); let child = q.enqueue( QueueKind::Rebuild, "agent-a".to_owned(), QueueSource::MetaUpdate, "cascade".to_owned(), Some(meta), ); let snap = q.snapshot(); let child_entry = snap.iter().find(|e| e.id == child).expect("child queued"); assert_eq!(child_entry.parent_id, Some(meta)); } }