From 5890e6796af08663a20941a658ef3e796bd326a5 Mon Sep 17 00:00:00 2001
From: damocles
Date: Sat, 23 May 2026 11:45:10 +0200
Subject: [PATCH] rebuild_queue: add module with types + dedup + cancellation

---
 hive-c0re/src/main.rs          |   1 +
 hive-c0re/src/rebuild_queue.rs | 564 +++++++++++++++++++++++++++++++++
 2 files changed, 565 insertions(+)
 create mode 100644 hive-c0re/src/rebuild_queue.rs

diff --git a/hive-c0re/src/main.rs b/hive-c0re/src/main.rs
index 07e6d17..39794a7 100644
--- a/hive-c0re/src/main.rs
+++ b/hive-c0re/src/main.rs
@@ -27,6 +27,7 @@ mod meta;
 mod migrate;
 mod operator_questions;
 mod questions;
+mod rebuild_queue;
 mod reminder_scheduler;
 mod server;
 
diff --git a/hive-c0re/src/rebuild_queue.rs b/hive-c0re/src/rebuild_queue.rs
new file mode 100644
index 0000000..8bc8943
--- /dev/null
+++ b/hive-c0re/src/rebuild_queue.rs
@@ -0,0 +1,564 @@
+//! Global rebuild queue.
+//!
+//! All long-running container/meta operations (rebuild, meta-update,
+//! first-spawn) are funnelled through this queue. One background worker
+//! drains it in FIFO order, so two `nixos-container update` runs never
+//! overlap on the same agent and a fresh agent rebuild never starts
+//! while a meta-update's lock bump is mid-flight.
+//!
+//! ## Why one queue
+//!
+//! Before this module landed, four independent call paths could fire
+//! `auto_update::rebuild_agent` concurrently:
+//! - dashboard manual rebuild button
+//! - `update-all` / `meta-update` cascade
+//! - approval handler (apply-commit / spawn)
+//! - startup auto-update sweep
+//!
+//! Nothing serialised them. nix-daemon serialises the actual store
+//! ops, but everything else in `rebuild_agent` (token sync, kick,
+//! rescan, lock-bump emit) interleaved unpredictably. A single-worker
+//! queue gives operators a visible, ordered runway and lets the UI
+//! render "what's about to happen" instead of "something might be
+//! happening somewhere."
+//!
+//! ## Scope
+//!
+//! In-queue kinds:
+//! - `Rebuild` — a single-agent rebuild (manual, approval-driven,
+//!   auto-update and meta-update cascade variants all funnel here).
+//! - `MetaUpdate` — `nix flake update` on the meta flake. The worker
+//!   runs the lock bump itself, then enqueues a cascade of `Rebuild`
+//!   entries with `parent_id` set to the meta-update's id.
+//! - `Spawn` — first-deploy of an agent (approval-driven). Same
+//!   serialisation as `Rebuild` from the operator's POV.
+//! - `Destroy` — for future use (`destroy --purge` does real I/O); not
+//!   currently routed through the queue.
+//!
+//! Out of scope (intentionally not queued — these are sub-second ops,
+//! and queueing them adds visual noise without serving the "one at a
+//! time" goal):
+//! - `start` / `stop` / `restart`
+//! - `kill`
+//!
+//! ## Dedup
+//!
+//! Enqueueing a `(kind, agent)` pair that already has a `Queued` entry
+//! returns the existing entry's id and appends the new reason as an
+//! "also requested by …" line. Running entries do not dedup — a
+//! re-queue during a run is legitimate (something changed since the
+//! current run started).
+
+use std::collections::VecDeque;
+use std::sync::{Arc, Mutex};
+
+use serde::Serialize;
+use tokio::sync::Notify;
+
+/// The operations the queue knows how to run. Every variant corresponds
+/// to one worker execution path; the target, where there is one, is
+/// carried separately as `agent` on the `QueueEntry`.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize)]
+#[serde(rename_all = "snake_case")]
+pub enum QueueKind {
+    /// Rebuild a single agent's container (`auto_update::rebuild_agent`).
+    Rebuild,
+    /// Run `nix flake update` on the meta flake. Once the lock bump
+    /// lands this triggers cascade `Rebuild` entries (with `parent_id`).
+    MetaUpdate,
+    /// First-deploy spawn of a new agent (approval-driven).
+    Spawn,
+    /// Destroy with `--purge` (real fs work). Not yet routed here; the
+    /// variant exists so the wire shape doesn't need to change later.
+    Destroy,
+}
+
+impl QueueKind {
+    /// Static lowercase name of the kind. The strings are the same ones
+    /// the `snake_case` serde rename produces for each variant.
+    pub fn as_str(self) -> &'static str {
+        match self {
+            Self::Rebuild => "rebuild",
+            Self::MetaUpdate => "meta_update",
+            Self::Spawn => "spawn",
+            Self::Destroy => "destroy",
+        }
+    }
+}
+
+/// Origin of an enqueue request. Drives the "why" chip on the dashboard
+/// and lets the UI group cascade entries under their parent without
+/// parsing the reason text.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
+#[serde(rename_all = "snake_case")]
+pub enum QueueSource {
+    /// Operator clicked rebuild / update-all / meta-update on the
+    /// dashboard, or any other direct human action (CLI, manager tool).
+    Manual,
+    /// Spawned as a cascade from a `MetaUpdate` entry's lock-bump
+    /// fan-out. The `parent_id` on the `QueueEntry` points back at
+    /// the originating meta-update.
+    MetaUpdate,
+    /// `auto_update::run` startup sweep — rebuild every container on
+    /// hive-c0re boot.
+    AutoUpdate,
+    /// Crash recovery path (future use — currently no auto-rebuild on
+    /// crash, but the variant exists for the imminent feature).
+    CrashRecover,
+}
+
+impl QueueSource {
+    /// Static lowercase name of the source. The strings are the same
+    /// ones the `snake_case` serde rename produces for each variant.
+    pub fn as_str(self) -> &'static str {
+        match self {
+            Self::Manual => "manual",
+            Self::MetaUpdate => "meta_update",
+            Self::AutoUpdate => "auto_update",
+            Self::CrashRecover => "crash_recover",
+        }
+    }
+}
+
+/// Lifecycle state of an entry. `Done` / `Failed` / `Cancelled` are
+/// retained in the queue snapshot for a short tail (`MAX_HISTORY_PER_KIND`)
+/// so the dashboard can show "last few" runs alongside live state.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
+#[serde(rename_all = "snake_case")]
+pub enum QueueState {
+    Queued,
+    Running,
+    Done,
+    Failed,
+    Cancelled,
+}
+
+impl QueueState {
+    /// `true` for the three end states (`Done`, `Failed`, `Cancelled`),
+    /// `false` while the entry is still `Queued` or `Running`.
+    pub fn is_terminal(self) -> bool {
+        // Spelled out per variant so adding a state forces a decision here.
+        match self {
+            Self::Done | Self::Failed | Self::Cancelled => true,
+            Self::Queued | Self::Running => false,
+        }
+    }
+}
+
+/// A single queue entry — what's pending, running, or recently finished.
+/// Serialised verbatim onto the dashboard event channel and the
+/// `/api/state` snapshot.
+#[derive(Debug, Clone, Serialize)]
+pub struct QueueEntry {
+    /// Monotonic per-process id. Stable for the lifetime of the entry
+    /// so SSE upserts land in place rather than churning the list.
+    pub id: u64,
+    /// Target agent name, or the literal `"hyperhive"` for entries
+    /// (MetaUpdate) that affect the meta flake rather than a single
+    /// agent.
+    pub agent: String,
+    pub kind: QueueKind,
+    pub state: QueueState,
+    pub source: QueueSource,
+    /// Groups cascade entries under their originating parent. For a
+    /// `MetaUpdate` entry this is `None`; for the per-agent rebuilds
+    /// the worker enqueues after the lock bump it's `Some(meta_id)`.
+    /// Omitted from the serialised form when `None`.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub parent_id: Option<u64>,
+    /// Human-readable "why" — populated by the enqueuer (`"manual via
+    /// dashboard"`, `"meta-update cascade (hyperhive bumped)"`,
+    /// `"startup sweep"`). Free-form; on a deduplicated enqueue the
+    /// queue appends one `also requested by: …` line per extra reason.
+    pub reason: String,
+    /// Unix timestamp (seconds) taken when the entry was created.
+    pub enqueued_at: i64,
+    /// Unix timestamp (seconds) taken when the entry was marked
+    /// `Running`; `None` until then. Omitted when `None`.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub started_at: Option<i64>,
+    /// Unix timestamp (seconds) taken when the entry reached a
+    /// terminal state; `None` until then. Omitted when `None`.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub finished_at: Option<i64>,
+    /// Populated when `state == Failed`. Carries the worker's error
+    /// string (already truncated to a reasonable length by the caller).
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub error: Option<String>,
+}
+
+/// How many terminal-state entries (`Done` / `Failed` / `Cancelled`)
+/// to retain per kind in the snapshot. Older entries get evicted to
+/// keep `/api/state` tight; the live event channel is unaffected.
+const MAX_HISTORY_PER_KIND: usize = 5;
+
+/// Inner state guarded by a single mutex. Held briefly — each
+/// operation is at most a short linear scan over the entries, and
+/// the depths in practice are tiny (single-digit).
+#[derive(Debug, Default)]
+struct Inner {
+    /// Every live and recently-finished entry, oldest first.
+    entries: VecDeque<QueueEntry>,
+    /// Last id handed out; incremented before each new entry, so the
+    /// first entry gets id 1.
+    next_id: u64,
+}
+
+/// Global rebuild queue. Lives on `Coordinator` (one per hive-c0re
+/// process). The associated `Notify` wakes the worker when something
+/// new arrives.
+#[derive(Debug)]
+pub struct RebuildQueue {
+    inner: Mutex<Inner>,
+    /// Worker wakes on this signal. The worker checks the queue and
+    /// loops back to `notified().await` when there's nothing to run.
+    pub(crate) notify: Notify,
+}
+
+impl Default for RebuildQueue {
+    fn default() -> Self {
+        Self {
+            inner: Mutex::new(Inner::default()),
+            notify: Notify::new(),
+        }
+    }
+}
+
+impl RebuildQueue {
+    /// Create an empty queue.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Lock the inner state.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the mutex is poisoned, i.e. another thread panicked
+    /// while holding it.
+    fn locked(&self) -> std::sync::MutexGuard<'_, Inner> {
+        self.inner.lock().expect("rebuild_queue mutex poisoned")
+    }
+
+    /// Add an entry to the queue. Returns the entry's id (newly-allocated
+    /// or — on dedup — the existing entry's id with the new reason
+    /// appended).
+    ///
+    /// Dedup rule: a `Queued` entry with the same `(kind, agent)` swallows
+    /// the new request and returns its existing id. Running and terminal
+    /// entries do not dedup — operators are free to re-queue a rebuild
+    /// that's currently running (something changed since it started) or
+    /// re-run one that just finished.
+    ///
+    /// On dedup the existing entry keeps its own `source` and `parent_id`;
+    /// only `reason` grows, and only when this exact reason has not been
+    /// recorded on the entry yet. An empty reason is never appended.
+    pub fn enqueue(
+        &self,
+        kind: QueueKind,
+        agent: String,
+        source: QueueSource,
+        reason: String,
+        parent_id: Option<u64>,
+    ) -> u64 {
+        /// Separator written between the reasons recorded on one entry.
+        const ALSO_REQUESTED: &str = "\nalso requested by: ";
+
+        let mut inner = self.locked();
+        // Dedup against a pending entry with the same (kind, agent).
+        let pending = inner.entries.iter_mut().find(|e| {
+            e.state == QueueState::Queued && e.kind == kind && e.agent == agent
+        });
+        if let Some(entry) = pending {
+            // Compare whole recorded reasons rather than substrings, so
+            // e.g. "manual" is still recorded next to "manual via dashboard".
+            let already_recorded = entry
+                .reason
+                .split(ALSO_REQUESTED)
+                .any(|seen| seen == reason);
+            if !reason.is_empty() && !already_recorded {
+                entry.reason.push_str(ALSO_REQUESTED);
+                entry.reason.push_str(&reason);
+            }
+            return entry.id;
+        }
+
+        inner.next_id += 1;
+        let id = inner.next_id;
+        inner.entries.push_back(QueueEntry {
+            id,
+            agent,
+            kind,
+            state: QueueState::Queued,
+            source,
+            parent_id,
+            reason,
+            enqueued_at: now_unix(),
+            started_at: None,
+            finished_at: None,
+            error: None,
+        });
+        // Wake the worker. With no waiter parked, `notify_one` stores a
+        // single permit instead, so the worker's next `notified().await`
+        // completes immediately and the wake-up is not lost.
+        self.notify.notify_one();
+        id
+    }
+
+    /// Pick the oldest `Queued` entry and mark it `Running`. Returns the
+    /// entry (a clone — the original stays in the queue so live state
+    /// reflects "this is currently running"). Returns `None` when there's
+    /// nothing queued.
+    pub fn take_next(&self) -> Option<QueueEntry> {
+        let mut inner = self.locked();
+        let entry = inner
+            .entries
+            .iter_mut()
+            .find(|e| e.state == QueueState::Queued)?;
+        entry.state = QueueState::Running;
+        entry.started_at = Some(now_unix());
+        Some(entry.clone())
+    }
+
+    /// Mark an entry terminal. `error` is kept only for `Failed`;
+    /// `Done` / `Cancelled` discard it. An unknown `id` changes no
+    /// entry. The history tail is trimmed either way.
+    ///
+    /// # Panics
+    ///
+    /// In debug builds, panics when `state` is not terminal.
+    pub fn finish(&self, id: u64, state: QueueState, error: Option<String>) {
+        debug_assert!(state.is_terminal(), "finish() called with non-terminal {state:?}");
+        let mut inner = self.locked();
+        if let Some(entry) = inner.entries.iter_mut().find(|e| e.id == id) {
+            entry.state = state;
+            entry.finished_at = Some(now_unix());
+            entry.error = error.filter(|_| state == QueueState::Failed);
+        }
+        Self::trim_history(&mut inner);
+    }
+
+    /// Snapshot the queue for `/api/state` and `RebuildQueueChanged`.
+    /// Returns clones of all retained entries, oldest first.
+    pub fn snapshot(&self) -> Vec<QueueEntry> {
+        self.locked().entries.iter().cloned().collect()
+    }
+
+    /// Cancel a `Queued` entry (no-op for `Running` / terminal — the
+    /// in-flight rebuild owns the agent's nix store and can't be
+    /// safely interrupted). Returns true when an entry was cancelled.
+    pub fn cancel(&self, id: u64) -> bool {
+        let mut inner = self.locked();
+        let cancelled = match inner.entries.iter_mut().find(|e| e.id == id) {
+            Some(entry) if entry.state == QueueState::Queued => {
+                entry.state = QueueState::Cancelled;
+                entry.finished_at = Some(now_unix());
+                true
+            }
+            _ => false,
+        };
+        if cancelled {
+            // A new terminal entry may push an older one out of the tail.
+            Self::trim_history(&mut inner);
+        }
+        cancelled
+    }
+
+    /// Keep only the most recent `MAX_HISTORY_PER_KIND` terminal entries
+    /// per kind. Pending + running entries are never evicted, and the
+    /// relative order of the survivors is unchanged.
+    fn trim_history(inner: &mut Inner) {
+        // First pass: how many terminal entries of each kind exceed the cap.
+        let mut excess: std::collections::HashMap<QueueKind, usize> =
+            std::collections::HashMap::new();
+        for entry in inner.entries.iter().filter(|e| e.state.is_terminal()) {
+            *excess.entry(entry.kind).or_insert(0) += 1;
+        }
+        for count in excess.values_mut() {
+            *count = count.saturating_sub(MAX_HISTORY_PER_KIND);
+        }
+        // Second pass, oldest first: drop exactly the surplus, which
+        // leaves the newest `MAX_HISTORY_PER_KIND` terminals per kind.
+        inner.entries.retain(|entry| {
+            if !entry.state.is_terminal() {
+                return true;
+            }
+            match excess.get_mut(&entry.kind) {
+                Some(surplus) if *surplus > 0 => {
+                    *surplus -= 1;
+                    false
+                }
+                _ => true,
+            }
+        });
+    }
+}
+
+/// Current unix timestamp in seconds. `now()` calls are pulled into a
+/// helper so tests can swap them out later.
+fn now_unix() -> i64 {
+    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
+    match since_epoch {
+        // Seconds that do not fit an `i64` collapse to 0.
+        Ok(elapsed) => i64::try_from(elapsed.as_secs()).unwrap_or(0),
+        // System clock reads earlier than the epoch: report 0.
+        Err(_) => 0,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Enqueue with `QueueSource::Manual` and no parent — the shape
+    /// most tests below need.
+    fn enqueue_manual(q: &RebuildQueue, kind: QueueKind, agent: &str, reason: &str) -> u64 {
+        q.enqueue(
+            kind,
+            agent.to_owned(),
+            QueueSource::Manual,
+            reason.to_owned(),
+            None,
+        )
+    }
+
+    #[test]
+    fn enqueue_and_take_in_order() {
+        let q = RebuildQueue::new();
+        let a = enqueue_manual(&q, QueueKind::Rebuild, "agent-a", "first");
+        let b = enqueue_manual(&q, QueueKind::Rebuild, "agent-b", "second");
+        assert_ne!(a, b);
+
+        let first = q.take_next().expect("queued");
+        assert_eq!(first.id, a);
+        assert_eq!(first.state, QueueState::Running);
+
+        let second = q.take_next().expect("queued");
+        assert_eq!(second.id, b);
+
+        assert!(q.take_next().is_none());
+    }
+
+    #[test]
+    fn dedup_pending_same_kind_and_agent() {
+        let q = RebuildQueue::new();
+        let a = enqueue_manual(&q, QueueKind::Rebuild, "agent-a", "first");
+        let b = q.enqueue(
+            QueueKind::Rebuild,
+            "agent-a".to_owned(),
+            QueueSource::AutoUpdate,
+            "auto sweep".to_owned(),
+            None,
+        );
+        assert_eq!(a, b, "dedup should return existing id");
+
+        let snap = q.snapshot();
+        assert_eq!(snap.len(), 1);
+        let merged = &snap[0].reason;
+        assert!(merged.contains("first"));
+        assert!(merged.contains("auto sweep"));
+    }
+
+    #[test]
+    fn dedup_does_not_apply_across_kinds_or_agents() {
+        let q = RebuildQueue::new();
+        let a = enqueue_manual(&q, QueueKind::Rebuild, "agent-a", "r");
+        let b = enqueue_manual(&q, QueueKind::Rebuild, "agent-b", "r");
+        let c = enqueue_manual(&q, QueueKind::Spawn, "agent-a", "s");
+        assert_ne!(a, b);
+        assert_ne!(a, c);
+        assert_eq!(q.snapshot().len(), 3);
+    }
+
+    #[test]
+    fn dedup_skips_running_entries() {
+        let q = RebuildQueue::new();
+        enqueue_manual(&q, QueueKind::Rebuild, "agent-a", "first");
+        let running = q.take_next().expect("queued");
+        assert_eq!(running.state, QueueState::Running);
+
+        // While the original is running, re-enqueue is legitimate.
+        let again = enqueue_manual(
+            &q,
+            QueueKind::Rebuild,
+            "agent-a",
+            "config bumped during build",
+        );
+        assert_ne!(running.id, again);
+        assert_eq!(q.snapshot().len(), 2);
+    }
+
+    #[test]
+    fn finish_marks_state_and_keeps_history() {
+        let q = RebuildQueue::new();
+        let id = enqueue_manual(&q, QueueKind::Rebuild, "agent-a", "r");
+        q.take_next();
+        q.finish(id, QueueState::Done, None);
+
+        let snap = q.snapshot();
+        assert_eq!(snap.len(), 1);
+        let done = &snap[0];
+        assert_eq!(done.state, QueueState::Done);
+        assert!(done.finished_at.is_some());
+        assert!(done.error.is_none());
+    }
+
+    #[test]
+    fn finish_with_failure_records_error() {
+        let q = RebuildQueue::new();
+        let id = enqueue_manual(&q, QueueKind::Rebuild, "agent-a", "r");
+        q.take_next();
+        q.finish(id, QueueState::Failed, Some("nix build failed".to_owned()));
+
+        let snap = q.snapshot();
+        assert_eq!(snap[0].state, QueueState::Failed);
+        assert_eq!(snap[0].error.as_deref(), Some("nix build failed"));
+    }
+
+    #[test]
+    fn history_evicts_old_terminals_per_kind() {
+        let q = RebuildQueue::new();
+        let runs = MAX_HISTORY_PER_KIND + 3;
+        for i in 0..runs {
+            let id = enqueue_manual(&q, QueueKind::Rebuild, &format!("agent-{i}"), "r");
+            q.take_next();
+            q.finish(id, QueueState::Done, None);
+        }
+        assert_eq!(q.snapshot().len(), MAX_HISTORY_PER_KIND);
+    }
+
+    #[test]
+    fn cancel_clears_queued_entry() {
+        let q = RebuildQueue::new();
+        let id = enqueue_manual(&q, QueueKind::Rebuild, "agent-a", "r");
+        assert!(q.cancel(id));
+
+        let snap = q.snapshot();
+        assert_eq!(snap[0].state, QueueState::Cancelled);
+        assert!(q.take_next().is_none());
+    }
+
+    #[test]
+    fn cancel_refuses_running_entry() {
+        let q = RebuildQueue::new();
+        let id = enqueue_manual(&q, QueueKind::Rebuild, "agent-a", "r");
+        q.take_next();
+        assert!(!q.cancel(id));
+
+        let snap = q.snapshot();
+        assert_eq!(snap[0].state, QueueState::Running);
+    }
+
+    #[test]
+    fn parent_id_groups_cascade() {
+        let q = RebuildQueue::new();
+        let meta = enqueue_manual(&q, QueueKind::MetaUpdate, "hyperhive", "lock bump");
+        let child = q.enqueue(
+            QueueKind::Rebuild,
+            "agent-a".to_owned(),
+            QueueSource::MetaUpdate,
+            "cascade".to_owned(),
+            Some(meta),
+        );
+
+        let snap = q.snapshot();
+        let child_entry = snap
+            .iter()
+            .find(|entry| entry.id == child)
+            .expect("child queued");
+        assert_eq!(child_entry.parent_id, Some(meta));
+    }
+}