rebuild_queue: add module with types + dedup + cancellation
This commit is contained in:
parent
73871f18c3
commit
5890e6796a
2 changed files with 565 additions and 0 deletions
|
|
@ -27,6 +27,7 @@ mod meta;
|
||||||
mod migrate;
|
mod migrate;
|
||||||
mod operator_questions;
|
mod operator_questions;
|
||||||
mod questions;
|
mod questions;
|
||||||
|
mod rebuild_queue;
|
||||||
mod reminder_scheduler;
|
mod reminder_scheduler;
|
||||||
mod server;
|
mod server;
|
||||||
|
|
||||||
|
|
|
||||||
564
hive-c0re/src/rebuild_queue.rs
Normal file
564
hive-c0re/src/rebuild_queue.rs
Normal file
|
|
@ -0,0 +1,564 @@
|
||||||
|
//! Global rebuild queue.
|
||||||
|
//!
|
||||||
|
//! Every long-running container/meta operation (rebuild, meta-update,
|
||||||
|
//! first-spawn) goes through this queue. A single background worker
|
||||||
|
//! drains it in FIFO order so we never overlap two `nixos-container
|
||||||
|
//! update` runs on the same agent and never start a fresh agent rebuild
|
||||||
|
//! while a meta-update's lock bump is mid-flight.
|
||||||
|
//!
|
||||||
|
//! ## Why one queue
|
||||||
|
//!
|
||||||
|
//! Before this module landed, four independent call paths could fire
|
||||||
|
//! `auto_update::rebuild_agent` concurrently:
|
||||||
|
//! - dashboard manual rebuild button
|
||||||
|
//! - `update-all` / `meta-update` cascade
|
||||||
|
//! - approval handler (apply-commit / spawn)
|
||||||
|
//! - startup auto-update sweep
|
||||||
|
//!
|
||||||
|
//! Nothing serialised them. nix-daemon serialises the actual store
|
||||||
|
//! ops, but the rest of `rebuild_agent` (token sync, kick, rescan,
|
||||||
|
//! lock-bump emit) interleaved unpredictably. The single-worker queue
|
||||||
|
//! gives operators a visible, ordered runway and lets the UI render
|
||||||
|
//! "what's about to happen" instead of "something might be happening
|
||||||
|
//! somewhere."
|
||||||
|
//!
|
||||||
|
//! ## Scope
|
||||||
|
//!
|
||||||
|
//! In-queue kinds:
|
||||||
|
//! - `Rebuild` — a single-agent rebuild (covers manual / approval-driven /
|
||||||
|
//! auto-update / meta-update cascade variants — they all funnel here).
|
||||||
|
//! - `MetaUpdate` — `nix flake update` on the meta flake. The worker
|
||||||
|
//! runs the lock bump itself, then enqueues a cascade of `Rebuild`
|
||||||
|
//! entries with `parent_id` set to the meta-update's id.
|
||||||
|
//! - `Spawn` — first-deploy of an agent (approval-driven). Same
|
||||||
|
//! serialisation as `Rebuild` from the operator's POV.
|
||||||
|
//! - `Destroy` — for future use (`destroy --purge` does real I/O); not
|
||||||
|
//! currently routed through the queue.
|
||||||
|
//!
|
||||||
|
//! Out of scope (intentionally not queued — these are sub-second ops
|
||||||
|
//! and adding them adds visual noise without serving the "one at a
|
||||||
|
//! time" goal):
|
||||||
|
//! - `start` / `stop` / `restart`
|
||||||
|
//! - `kill`
|
||||||
|
//!
|
||||||
|
//! ## Dedup
|
||||||
|
//!
|
||||||
|
//! Enqueueing `(kind, agent)` that already has a `Queued` entry returns
|
||||||
|
//! the existing entry's id and appends the new reason as an
|
||||||
|
//! "also requested by …" line. Running entries do not dedup — a
|
||||||
|
//! re-queue during a run is legitimate (something changed since the
|
||||||
|
//! current run started).
|
||||||
|
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
use serde::Serialize;
|
||||||
|
use tokio::sync::Notify;
|
||||||
|
|
||||||
|
/// What the queue can run. Each variant maps to a specific worker
|
||||||
|
/// execution path; `agent` (in `QueueEntry`) names the target where
|
||||||
|
/// relevant.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum QueueKind {
|
||||||
|
/// Rebuild a single agent's container (`auto_update::rebuild_agent`).
|
||||||
|
Rebuild,
|
||||||
|
/// Run `nix flake update` on the meta flake. Triggers cascade
|
||||||
|
/// `Rebuild` entries (with `parent_id`) once the lock bump lands.
|
||||||
|
MetaUpdate,
|
||||||
|
/// First-deploy spawn of a new agent (approval-driven).
|
||||||
|
Spawn,
|
||||||
|
/// Destroy with `--purge` (real fs work). Not yet routed here; the
|
||||||
|
/// variant exists so the wire shape doesn't need to change later.
|
||||||
|
Destroy,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl QueueKind {
|
||||||
|
pub fn as_str(self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
QueueKind::Rebuild => "rebuild",
|
||||||
|
QueueKind::MetaUpdate => "meta_update",
|
||||||
|
QueueKind::Spawn => "spawn",
|
||||||
|
QueueKind::Destroy => "destroy",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Where the enqueue request originated. Drives the "why" chip on the
|
||||||
|
/// dashboard and lets the UI group cascade entries under their parent
|
||||||
|
/// without parsing the reason text.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum QueueSource {
|
||||||
|
/// Operator clicked rebuild / update-all / meta-update on the
|
||||||
|
/// dashboard, or any other direct human action (CLI, manager tool).
|
||||||
|
Manual,
|
||||||
|
/// Spawned as a cascade from a `MetaUpdate` entry's lock-bump
|
||||||
|
/// fan-out. The `parent_id` on the `QueueEntry` points back at
|
||||||
|
/// the originating meta-update.
|
||||||
|
MetaUpdate,
|
||||||
|
/// `auto_update::run` startup sweep — rebuild every container on
|
||||||
|
/// hive-c0re boot.
|
||||||
|
AutoUpdate,
|
||||||
|
/// Crash recovery path (future use — currently no auto-rebuild on
|
||||||
|
/// crash, but the variant exists for the imminent feature).
|
||||||
|
CrashRecover,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl QueueSource {
|
||||||
|
pub fn as_str(self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
QueueSource::Manual => "manual",
|
||||||
|
QueueSource::MetaUpdate => "meta_update",
|
||||||
|
QueueSource::AutoUpdate => "auto_update",
|
||||||
|
QueueSource::CrashRecover => "crash_recover",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Lifecycle state of an entry. `Done` / `Failed` / `Cancelled` are
|
||||||
|
/// retained in the queue snapshot for a short tail (`MAX_HISTORY_PER_KIND`)
|
||||||
|
/// so the dashboard can show "last few" runs alongside live state.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum QueueState {
|
||||||
|
Queued,
|
||||||
|
Running,
|
||||||
|
Done,
|
||||||
|
Failed,
|
||||||
|
Cancelled,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl QueueState {
|
||||||
|
pub fn is_terminal(self) -> bool {
|
||||||
|
matches!(self, QueueState::Done | QueueState::Failed | QueueState::Cancelled)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A single queue entry — what's pending, running, or recently finished.
|
||||||
|
/// Serialised verbatim onto the dashboard event channel and the
|
||||||
|
/// `/api/state` snapshot.
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct QueueEntry {
|
||||||
|
/// Monotonic per-process id. Stable for the lifetime of the entry
|
||||||
|
/// so SSE upserts land in place rather than churning the list.
|
||||||
|
pub id: u64,
|
||||||
|
/// Target agent name, or the literal `"hyperhive"` for entries
|
||||||
|
/// (MetaUpdate) that affect the meta flake rather than a single
|
||||||
|
/// agent.
|
||||||
|
pub agent: String,
|
||||||
|
pub kind: QueueKind,
|
||||||
|
pub state: QueueState,
|
||||||
|
pub source: QueueSource,
|
||||||
|
/// Groups cascade entries under their originating parent. For a
|
||||||
|
/// `MetaUpdate` entry this is `None`; for the per-agent rebuilds
|
||||||
|
/// the worker enqueues after the lock bump it's `Some(meta_id)`.
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub parent_id: Option<u64>,
|
||||||
|
/// Human-readable "why" — populated by the enqueuer (`"manual via
|
||||||
|
/// dashboard"`, `"meta-update cascade (hyperhive bumped)"`,
|
||||||
|
/// `"startup sweep"`). Free-form; dedup appends `(also requested
|
||||||
|
/// by …)` lines on repeated enqueues.
|
||||||
|
pub reason: String,
|
||||||
|
pub enqueued_at: i64,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub started_at: Option<i64>,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub finished_at: Option<i64>,
|
||||||
|
/// Populated when `state == Failed`. Carries the worker's error
|
||||||
|
/// string (already truncated to a reasonable length by the caller).
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// How many terminal-state entries (`Done` / `Failed` / `Cancelled`)
|
||||||
|
/// to retain per kind in the snapshot. Older entries get evicted to
|
||||||
|
/// keep `/api/state` tight; the live event channel is unaffected.
|
||||||
|
const MAX_HISTORY_PER_KIND: usize = 5;
|
||||||
|
|
||||||
|
/// Inner state guarded by a single mutex. Held briefly — every
|
||||||
|
/// operation is constant-time relative to the queue's depth, and
|
||||||
|
/// the depths in practice are tiny (single-digit).
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
struct Inner {
|
||||||
|
entries: VecDeque<QueueEntry>,
|
||||||
|
next_id: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Global rebuild queue. Lives on `Coordinator` (one per hive-c0re
|
||||||
|
/// process). The associated `Notify` wakes the worker when something
|
||||||
|
/// new arrives.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct RebuildQueue {
|
||||||
|
inner: Mutex<Inner>,
|
||||||
|
/// Worker wakes on this signal. The worker checks the queue and
|
||||||
|
/// loops back to `notified().await` when there's nothing to run.
|
||||||
|
pub(crate) notify: Notify,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for RebuildQueue {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
inner: Mutex::new(Inner::default()),
|
||||||
|
notify: Notify::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RebuildQueue {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self::default()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add an entry to the queue. Returns the entry's id (newly-allocated
|
||||||
|
/// or — on dedup — the existing entry's id with the new reason
|
||||||
|
/// appended).
|
||||||
|
///
|
||||||
|
/// Dedup rule: a `Queued` entry with the same `(kind, agent)` swallows
|
||||||
|
/// the new request and returns its existing id. Running and terminal
|
||||||
|
/// entries do not dedup — operators are free to re-queue a rebuild
|
||||||
|
/// that's currently running (something changed since it started) or
|
||||||
|
/// re-run one that just finished.
|
||||||
|
pub fn enqueue(
|
||||||
|
&self,
|
||||||
|
kind: QueueKind,
|
||||||
|
agent: String,
|
||||||
|
source: QueueSource,
|
||||||
|
reason: String,
|
||||||
|
parent_id: Option<u64>,
|
||||||
|
) -> u64 {
|
||||||
|
let mut inner = self.inner.lock().expect("rebuild_queue mutex poisoned");
|
||||||
|
// Dedup against a pending entry with the same (kind, agent).
|
||||||
|
for entry in inner.entries.iter_mut() {
|
||||||
|
if entry.state == QueueState::Queued && entry.kind == kind && entry.agent == agent {
|
||||||
|
if !entry.reason.contains(&reason) {
|
||||||
|
entry.reason.push_str(&format!("\nalso requested by: {reason}"));
|
||||||
|
}
|
||||||
|
return entry.id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
inner.next_id += 1;
|
||||||
|
let id = inner.next_id;
|
||||||
|
let entry = QueueEntry {
|
||||||
|
id,
|
||||||
|
agent,
|
||||||
|
kind,
|
||||||
|
state: QueueState::Queued,
|
||||||
|
source,
|
||||||
|
parent_id,
|
||||||
|
reason,
|
||||||
|
enqueued_at: now_unix(),
|
||||||
|
started_at: None,
|
||||||
|
finished_at: None,
|
||||||
|
error: None,
|
||||||
|
};
|
||||||
|
inner.entries.push_back(entry);
|
||||||
|
// Wake the worker. `notify_one` is a no-op when there's no
|
||||||
|
// waiter; the next `notified().await` returns immediately.
|
||||||
|
self.notify.notify_one();
|
||||||
|
id
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Pop the next `Queued` entry and mark it `Running`. Returns the
|
||||||
|
/// entry (a clone — the original stays in the queue so live state
|
||||||
|
/// reflects "this is currently running"). Returns `None` when there's
|
||||||
|
/// nothing queued.
|
||||||
|
pub fn take_next(&self) -> Option<QueueEntry> {
|
||||||
|
let mut inner = self.inner.lock().expect("rebuild_queue mutex poisoned");
|
||||||
|
let pos = inner
|
||||||
|
.entries
|
||||||
|
.iter()
|
||||||
|
.position(|e| e.state == QueueState::Queued)?;
|
||||||
|
let entry = &mut inner.entries[pos];
|
||||||
|
entry.state = QueueState::Running;
|
||||||
|
entry.started_at = Some(now_unix());
|
||||||
|
Some(entry.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark an entry terminal. `error` is populated for `Failed`;
|
||||||
|
/// `Done` / `Cancelled` ignore it. Trims the history tail.
|
||||||
|
pub fn finish(&self, id: u64, state: QueueState, error: Option<String>) {
|
||||||
|
debug_assert!(state.is_terminal(), "finish() called with non-terminal {state:?}");
|
||||||
|
let mut inner = self.inner.lock().expect("rebuild_queue mutex poisoned");
|
||||||
|
if let Some(entry) = inner.entries.iter_mut().find(|e| e.id == id) {
|
||||||
|
entry.state = state;
|
||||||
|
entry.finished_at = Some(now_unix());
|
||||||
|
entry.error = error.filter(|_| state == QueueState::Failed);
|
||||||
|
}
|
||||||
|
Self::trim_history(&mut inner);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Snapshot the queue for `/api/state` and `RebuildQueueChanged`.
|
||||||
|
/// Cheap clone — entries are small (~hundreds of bytes each).
|
||||||
|
pub fn snapshot(&self) -> Vec<QueueEntry> {
|
||||||
|
let inner = self.inner.lock().expect("rebuild_queue mutex poisoned");
|
||||||
|
inner.entries.iter().cloned().collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cancel a `Queued` entry (no-op for `Running` / terminal — the
|
||||||
|
/// in-flight rebuild owns the agent's nix store and can't be
|
||||||
|
/// safely interrupted). Returns true when an entry was cancelled.
|
||||||
|
pub fn cancel(&self, id: u64) -> bool {
|
||||||
|
let mut inner = self.inner.lock().expect("rebuild_queue mutex poisoned");
|
||||||
|
if let Some(entry) = inner.entries.iter_mut().find(|e| e.id == id) {
|
||||||
|
if entry.state == QueueState::Queued {
|
||||||
|
entry.state = QueueState::Cancelled;
|
||||||
|
entry.finished_at = Some(now_unix());
|
||||||
|
Self::trim_history(&mut inner);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Keep only the most recent `MAX_HISTORY_PER_KIND` terminal entries
|
||||||
|
/// per kind. Pending + running entries are never evicted.
|
||||||
|
fn trim_history(inner: &mut Inner) {
|
||||||
|
let mut counts: std::collections::HashMap<QueueKind, usize> =
|
||||||
|
std::collections::HashMap::new();
|
||||||
|
// Walk newest-first; keep the first MAX_HISTORY_PER_KIND
|
||||||
|
// terminals per kind, evict the rest.
|
||||||
|
let entries: Vec<QueueEntry> = inner
|
||||||
|
.entries
|
||||||
|
.iter()
|
||||||
|
.rev()
|
||||||
|
.filter(|e| {
|
||||||
|
if !e.state.is_terminal() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
let n = counts.entry(e.kind).or_insert(0);
|
||||||
|
*n += 1;
|
||||||
|
*n <= MAX_HISTORY_PER_KIND
|
||||||
|
})
|
||||||
|
.cloned()
|
||||||
|
.collect();
|
||||||
|
inner.entries = entries.into_iter().rev().collect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Current unix timestamp in seconds. `now()` calls are pulled into a
|
||||||
|
/// helper so tests can swap them out later.
|
||||||
|
fn now_unix() -> i64 {
|
||||||
|
std::time::SystemTime::now()
|
||||||
|
.duration_since(std::time::UNIX_EPOCH)
|
||||||
|
.ok()
|
||||||
|
.and_then(|d| i64::try_from(d.as_secs()).ok())
|
||||||
|
.unwrap_or(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn enqueue_and_take_in_order() {
|
||||||
|
let q = RebuildQueue::new();
|
||||||
|
let a = q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
"agent-a".to_owned(),
|
||||||
|
QueueSource::Manual,
|
||||||
|
"first".to_owned(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let b = q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
"agent-b".to_owned(),
|
||||||
|
QueueSource::Manual,
|
||||||
|
"second".to_owned(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
assert_ne!(a, b);
|
||||||
|
let next = q.take_next().expect("queued");
|
||||||
|
assert_eq!(next.id, a);
|
||||||
|
assert_eq!(next.state, QueueState::Running);
|
||||||
|
let next = q.take_next().expect("queued");
|
||||||
|
assert_eq!(next.id, b);
|
||||||
|
assert!(q.take_next().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn dedup_pending_same_kind_and_agent() {
|
||||||
|
let q = RebuildQueue::new();
|
||||||
|
let a = q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
"agent-a".to_owned(),
|
||||||
|
QueueSource::Manual,
|
||||||
|
"first".to_owned(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let b = q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
"agent-a".to_owned(),
|
||||||
|
QueueSource::AutoUpdate,
|
||||||
|
"auto sweep".to_owned(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
assert_eq!(a, b, "dedup should return existing id");
|
||||||
|
let snap = q.snapshot();
|
||||||
|
assert_eq!(snap.len(), 1);
|
||||||
|
assert!(snap[0].reason.contains("first"));
|
||||||
|
assert!(snap[0].reason.contains("auto sweep"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn dedup_does_not_apply_across_kinds_or_agents() {
|
||||||
|
let q = RebuildQueue::new();
|
||||||
|
let a = q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
"agent-a".to_owned(),
|
||||||
|
QueueSource::Manual,
|
||||||
|
"r".to_owned(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let b = q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
"agent-b".to_owned(),
|
||||||
|
QueueSource::Manual,
|
||||||
|
"r".to_owned(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let c = q.enqueue(
|
||||||
|
QueueKind::Spawn,
|
||||||
|
"agent-a".to_owned(),
|
||||||
|
QueueSource::Manual,
|
||||||
|
"s".to_owned(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
assert_ne!(a, b);
|
||||||
|
assert_ne!(a, c);
|
||||||
|
assert_eq!(q.snapshot().len(), 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn dedup_skips_running_entries() {
|
||||||
|
let q = RebuildQueue::new();
|
||||||
|
q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
"agent-a".to_owned(),
|
||||||
|
QueueSource::Manual,
|
||||||
|
"first".to_owned(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let running = q.take_next().expect("queued");
|
||||||
|
assert_eq!(running.state, QueueState::Running);
|
||||||
|
// While the original is running, re-enqueue is legitimate.
|
||||||
|
let again = q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
"agent-a".to_owned(),
|
||||||
|
QueueSource::Manual,
|
||||||
|
"config bumped during build".to_owned(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
assert_ne!(running.id, again);
|
||||||
|
let snap = q.snapshot();
|
||||||
|
assert_eq!(snap.len(), 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn finish_marks_state_and_keeps_history() {
|
||||||
|
let q = RebuildQueue::new();
|
||||||
|
let id = q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
"agent-a".to_owned(),
|
||||||
|
QueueSource::Manual,
|
||||||
|
"r".to_owned(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
q.take_next();
|
||||||
|
q.finish(id, QueueState::Done, None);
|
||||||
|
let snap = q.snapshot();
|
||||||
|
assert_eq!(snap.len(), 1);
|
||||||
|
assert_eq!(snap[0].state, QueueState::Done);
|
||||||
|
assert!(snap[0].finished_at.is_some());
|
||||||
|
assert!(snap[0].error.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn finish_with_failure_records_error() {
|
||||||
|
let q = RebuildQueue::new();
|
||||||
|
let id = q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
"agent-a".to_owned(),
|
||||||
|
QueueSource::Manual,
|
||||||
|
"r".to_owned(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
q.take_next();
|
||||||
|
q.finish(id, QueueState::Failed, Some("nix build failed".to_owned()));
|
||||||
|
let snap = q.snapshot();
|
||||||
|
assert_eq!(snap[0].state, QueueState::Failed);
|
||||||
|
assert_eq!(snap[0].error.as_deref(), Some("nix build failed"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn history_evicts_old_terminals_per_kind() {
|
||||||
|
let q = RebuildQueue::new();
|
||||||
|
for i in 0..(MAX_HISTORY_PER_KIND + 3) {
|
||||||
|
let id = q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
format!("agent-{i}"),
|
||||||
|
QueueSource::Manual,
|
||||||
|
"r".to_owned(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
q.take_next();
|
||||||
|
q.finish(id, QueueState::Done, None);
|
||||||
|
}
|
||||||
|
let snap = q.snapshot();
|
||||||
|
assert_eq!(snap.len(), MAX_HISTORY_PER_KIND);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cancel_clears_queued_entry() {
|
||||||
|
let q = RebuildQueue::new();
|
||||||
|
let id = q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
"agent-a".to_owned(),
|
||||||
|
QueueSource::Manual,
|
||||||
|
"r".to_owned(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
assert!(q.cancel(id));
|
||||||
|
let snap = q.snapshot();
|
||||||
|
assert_eq!(snap[0].state, QueueState::Cancelled);
|
||||||
|
assert!(q.take_next().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cancel_refuses_running_entry() {
|
||||||
|
let q = RebuildQueue::new();
|
||||||
|
let id = q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
"agent-a".to_owned(),
|
||||||
|
QueueSource::Manual,
|
||||||
|
"r".to_owned(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
q.take_next();
|
||||||
|
assert!(!q.cancel(id));
|
||||||
|
let snap = q.snapshot();
|
||||||
|
assert_eq!(snap[0].state, QueueState::Running);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parent_id_groups_cascade() {
|
||||||
|
let q = RebuildQueue::new();
|
||||||
|
let meta = q.enqueue(
|
||||||
|
QueueKind::MetaUpdate,
|
||||||
|
"hyperhive".to_owned(),
|
||||||
|
QueueSource::Manual,
|
||||||
|
"lock bump".to_owned(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let child = q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
"agent-a".to_owned(),
|
||||||
|
QueueSource::MetaUpdate,
|
||||||
|
"cascade".to_owned(),
|
||||||
|
Some(meta),
|
||||||
|
);
|
||||||
|
let snap = q.snapshot();
|
||||||
|
let child_entry = snap.iter().find(|e| e.id == child).expect("child queued");
|
||||||
|
assert_eq!(child_entry.parent_id, Some(meta));
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue