diff --git a/hive-c0re/src/coordinator.rs b/hive-c0re/src/coordinator.rs
index d4d41ff..357e9b4 100644
--- a/hive-c0re/src/coordinator.rs
+++ b/hive-c0re/src/coordinator.rs
@@ -88,6 +88,12 @@ pub struct Coordinator {
     /// tokio mutex so the rescan can `await` `lifecycle::list` /
     /// `is_running` without blocking other coordinator paths.
     last_containers: tokio::sync::Mutex>,
+    /// Global rebuild queue. Every long-running container/meta op
+    /// (rebuild, meta-update, first-spawn) goes through this queue so
+    /// hive-c0re runs at most one at a time and the dashboard can
+    /// render a single ordered view of pending + running work. See
+    /// `rebuild_queue.rs` for the dedup rules + history retention.
+    pub rebuild_queue: Arc<crate::rebuild_queue::RebuildQueue>,
     /// Shutdown signal broadcast to all background tasks. Sending
     /// `true` asks every loop to exit after its current work item.
     /// Use `shutdown_rx()` to subscribe; `request_shutdown()` to fire.
@@ -202,10 +208,23 @@ impl Coordinator {
             event_seq: AtomicU64::new(0),
             meta_updates_active: AtomicU64::new(0),
             last_containers: tokio::sync::Mutex::new(HashMap::new()),
+            rebuild_queue: Arc::new(crate::rebuild_queue::RebuildQueue::new()),
             shutdown_tx,
         })
     }
 
+    /// Emit a `RebuildQueueChanged` snapshot event. Called from the
+    /// queue mutation helpers (`enqueue` / `finish` / `cancel`-adjacent
+    /// wrappers below) and the worker so every state transition
+    /// surfaces on the dashboard without extra plumbing.
+    pub fn emit_rebuild_queue_snapshot(self: &Arc<Self>) {
+        let queue = self.rebuild_queue.snapshot();
+        self.emit_dashboard_event(DashboardEvent::RebuildQueueChanged {
+            seq: self.next_seq(),
+            queue,
+        });
+    }
+
     /// Subscribe to the shutdown watch channel. Background tasks call
     /// this at spawn time and break their loop when the receiver
     /// transitions to `true` (via `Coordinator::request_shutdown`).
diff --git a/hive-c0re/src/dashboard_events.rs b/hive-c0re/src/dashboard_events.rs
index f82814d..fcea9c7 100644
--- a/hive-c0re/src/dashboard_events.rs
+++ b/hive-c0re/src/dashboard_events.rs
@@ -27,6 +27,7 @@ use serde::Serialize;
 
 use crate::container_view::ContainerView;
 use crate::dashboard::{MetaInputView, TombstoneView};
+use crate::rebuild_queue::QueueEntry;
 
 #[derive(Debug, Clone, Serialize)]
 #[serde(rename_all = "snake_case", tag = "kind")]
@@ -204,4 +205,16 @@ pub enum DashboardEvent {
     /// when the active-run count crosses 0, so concurrent updates flip
     /// the flag exactly once.
     MetaUpdateRunning { seq: u64, running: bool },
+    /// Full snapshot of the rebuild queue (`hive-c0re::rebuild_queue`)
+    /// — every entry, in enqueue order, including the few most-recent
+    /// terminal entries the queue retains for history. Same
+    /// snapshot-shape rationale as `TombstonesChanged` /
+    /// `MetaInputsChanged`: the list is small, snapshot semantics avoid
+    /// the add/remove races a per-row event would have, and the
+    /// dashboard's grouping (parent_id) is most naturally re-derived
+    /// from the full list.
+    RebuildQueueChanged {
+        seq: u64,
+        queue: Vec<QueueEntry>,
+    },
 }
diff --git a/hive-c0re/src/main.rs b/hive-c0re/src/main.rs
index 39794a7..5cd24a3 100644
--- a/hive-c0re/src/main.rs
+++ b/hive-c0re/src/main.rs
@@ -233,6 +233,18 @@ async fn cmd_serve(
     // Reminder scheduler: drains due reminders + handles
     // file_path payload persistence. See reminder_scheduler.rs.
     reminder_scheduler::spawn(coord.clone());
+    // Rebuild-queue worker: drains the global rebuild/meta-update/
+    // spawn queue FIFO so hive-c0re never runs two heavyweight
+    // container ops concurrently. Existing rebuild call sites
+    // (auto_update, dashboard, manager, approval handler) enqueue
+    // here instead of awaiting `rebuild_agent` inline. See
+    // `rebuild_queue.rs`.
+    {
+        let q_coord = coord.clone();
+        tokio::spawn(async move {
+            rebuild_queue::run_worker(q_coord).await;
+        });
+    }
     // Forward every broker event onto the unified dashboard
     // channel with a freshly-stamped seq, so the dashboard SSE
     // sees broker messages + future mutation events on one
diff --git a/hive-c0re/src/rebuild_queue.rs b/hive-c0re/src/rebuild_queue.rs
index 8bc8943..3515cb0 100644
--- a/hive-c0re/src/rebuild_queue.rs
+++ b/hive-c0re/src/rebuild_queue.rs
@@ -169,6 +169,12 @@ pub struct QueueEntry {
     /// string (already truncated to a reasonable length by the caller).
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub error: Option<String>,
+    /// `MetaUpdate`-only payload: the list of meta flake inputs to run
+    /// through `nix flake update`. Empty / absent on `Rebuild` /
+    /// `Spawn` / `Destroy` entries; absent on the wire (never
+    /// serialised) when the entry kind doesn't have meaningful inputs.
+    #[serde(default, skip_serializing_if = "Vec::is_empty")]
+    pub inputs: Vec<String>,
 }
 
 /// How many terminal-state entries (`Done` / `Failed` / `Cancelled`)
@@ -226,6 +232,21 @@ impl RebuildQueue {
         source: QueueSource,
         reason: String,
         parent_id: Option<u64>,
+    ) -> u64 {
+        self.enqueue_with_inputs(kind, agent, source, reason, parent_id, Vec::new())
+    }
+
+    /// Same as `enqueue` but carries an `inputs` payload — used by
+    /// `MetaUpdate` enqueues to tell the worker which meta-flake
+    /// inputs to bump.
+    pub fn enqueue_with_inputs(
+        &self,
+        kind: QueueKind,
+        agent: String,
+        source: QueueSource,
+        reason: String,
+        parent_id: Option<u64>,
+        inputs: Vec<String>,
     ) -> u64 {
         let mut inner = self.inner.lock().expect("rebuild_queue mutex poisoned");
         // Dedup against a pending entry with the same (kind, agent).
@@ -251,6 +272,7 @@ impl RebuildQueue {
             started_at: None,
             finished_at: None,
             error: None,
+            inputs,
         };
         inner.entries.push_back(entry);
         // Wake the worker.
`notify_one` is a no-op when there's no
@@ -336,6 +358,191 @@ impl RebuildQueue {
     }
 }
 
+/// Background worker that drains the queue. Spawned once at hive-c0re
+/// startup from `main.rs`. Loops forever:
+///   1. Pop the next `Queued` entry (`take_next` marks it `Running` and
+///      fires a `RebuildQueueChanged` snapshot via the caller).
+///   2. Dispatch by kind — single-agent rebuild, meta-update + cascade,
+///      or first-spawn.
+///   3. Mark the entry terminal (`finish`) and emit another snapshot.
+///   4. When the queue is empty, `await` on `notify` until something
+///      new lands.
+///
+/// Shutdown semantics: subscribes to `coord.shutdown_rx()`. On a true
+/// signal the worker exits after its current entry finishes; pending
+/// `Queued` entries are dropped (they'll either be replayed by the
+/// startup sweep on next boot or left for an operator to re-queue).
+/// The flag is re-checked before every claim, so a backlog of queued
+/// entries cannot keep the worker alive after shutdown was requested.
+pub async fn run_worker(coord: std::sync::Arc<crate::coordinator::Coordinator>) {
+    let mut shutdown = coord.shutdown_rx();
+    loop {
+        // Drain everything available now.
+        loop {
+            // `take_next` flips the entry to `Running`, so the shutdown
+            // check has to come before the claim, not after it.
+            if *shutdown.borrow() {
+                tracing::info!("rebuild_queue: worker exiting on shutdown");
+                return;
+            }
+            let entry = match coord.rebuild_queue.take_next() {
+                Some(entry) => entry,
+                None => break,
+            };
+            coord.emit_rebuild_queue_snapshot();
+            tracing::info!(
+                id = entry.id,
+                kind = entry.kind.as_str(),
+                agent = %entry.agent,
+                source = entry.source.as_str(),
+                "rebuild_queue: running"
+            );
+            let result = dispatch(&coord, &entry).await;
+            match result {
+                Ok(()) => {
+                    coord.rebuild_queue.finish(entry.id, QueueState::Done, None);
+                    tracing::info!(id = entry.id, "rebuild_queue: done");
+                }
+                Err(e) => {
+                    let msg = format!("{e:#}");
+                    let truncated = if msg.len() > 2_000 {
+                        // The cut is a byte offset; slicing inside a
+                        // multi-byte char panics, so back off to the
+                        // nearest preceding char boundary first.
+                        let mut cut = 2_000;
+                        while !msg.is_char_boundary(cut) {
+                            cut -= 1;
+                        }
+                        format!("{}…", &msg[..cut])
+                    } else {
+                        msg.clone()
+                    };
+                    coord
+                        .rebuild_queue
+                        .finish(entry.id, QueueState::Failed, Some(truncated));
+                    tracing::warn!(id = entry.id, error = %msg, "rebuild_queue: failed");
+                }
+            }
+            coord.emit_rebuild_queue_snapshot();
+        }
+        // Park until something new is enqueued OR shutdown fires.
+        tokio::select! {
+            biased;
+            res = shutdown.changed() => {
+                if res.is_err() || *shutdown.borrow() {
+                    tracing::info!("rebuild_queue: worker exiting on shutdown");
+                    return;
+                }
+            }
+            _ = coord.rebuild_queue.notify.notified() => {
+                // New entry — back to the drain loop.
+            }
+        }
+    }
+}
+
+/// Run a single queue entry to completion. Kind-dispatched; failures
+/// bubble up to the worker which marks the entry `Failed`.
+///
+/// `Rebuild` rebuilds `entry.agent`, passing the current flake rev (or
+/// its `Default` value when the rev cannot be read).
+async fn dispatch(
+    coord: &std::sync::Arc<crate::coordinator::Coordinator>,
+    entry: &QueueEntry,
+) -> anyhow::Result<()> {
+    match entry.kind {
+        QueueKind::Rebuild => {
+            let current_rev = crate::auto_update::current_flake_rev(&coord.hyperhive_flake)
+                .unwrap_or_default();
+            crate::auto_update::rebuild_agent(coord, &entry.agent, &current_rev).await
+        }
+        QueueKind::MetaUpdate => run_meta_update(coord, entry).await,
+        QueueKind::Spawn => {
+            // First-deploy spawns route through `actions::approve_spawn`
+            // / `actions::approve_apply_commit` today; they enqueue a
+            // Spawn entry only to claim the queue slot, the actual
+            // spawn work runs inside those handlers before completion.
+            // Keeping this arm a no-op so we don't double-run.
+            tracing::debug!(
+                id = entry.id,
+                agent = %entry.agent,
+                "rebuild_queue: Spawn entry is a queue claim; actual work elsewhere"
+            );
+            Ok(())
+        }
+        QueueKind::Destroy => {
+            // Reserved for future `destroy --purge` integration.
+            anyhow::bail!("Destroy kind not yet implemented in rebuild_queue worker");
+        }
+    }
+}
+
+/// Run one `MetaUpdate` entry: bump the meta flake's locks for the
+/// requested inputs, then enqueue a cascade of `Rebuild` entries
+/// (with `parent_id` set to this entry's id) for every agent affected
+/// by the bump. Mirrors the previous `dashboard::run_meta_update`
+/// semantics; that path now enqueues into this queue rather than
+/// running the bump + rebuild loop inline.
+async fn run_meta_update(
+    coord: &std::sync::Arc<crate::coordinator::Coordinator>,
+    entry: &QueueEntry,
+) -> anyhow::Result<()> {
+    let _progress = coord.meta_update_guard();
+    let inputs = entry.inputs.as_slice();
+    tracing::info!(?inputs, parent = entry.id, "rebuild_queue: meta-update starting");
+    // An empty list needs no special case: the same slice is passed
+    // either way (the cascade below treats empty as "all inputs").
+    crate::meta::lock_update(inputs).await?;
+
+    // Decide which agents to rebuild. Same logic as the previous
+    // `run_meta_update` — anything in the hyperhive subtree affects
+    // every agent; anything in `agent-<name>/...` only the named agent.
+    let touched_hyperhive = inputs
+        .iter()
+        .any(|i| i == "hyperhive" || i.starts_with("hyperhive/"));
+    let touched_agents: Vec<String> = inputs
+        .iter()
+        .filter_map(|i| i.strip_prefix("agent-"))
+        .map(|rest| rest.split('/').next().unwrap_or(rest).to_owned())
+        .collect();
+    let agents_to_rebuild: Vec<String> = if touched_hyperhive || inputs.is_empty() {
+        // NOTE(review): a failed `lifecycle::list` is swallowed here and
+        // yields an empty cascade — confirm that is intended.
+        crate::lifecycle::list()
+            .await
+            .unwrap_or_default()
+            .into_iter()
+            .filter_map(|c| {
+                if c == crate::lifecycle::MANAGER_NAME {
+                    Some(crate::lifecycle::MANAGER_NAME.to_owned())
+                } else {
+                    c.strip_prefix(crate::lifecycle::AGENT_PREFIX).map(str::to_owned)
+                }
+            })
+            .collect()
+    } else {
+        touched_agents
+    };
+
+    let reason_hint = if inputs.is_empty() {
+        "meta-update cascade (all inputs)".to_owned()
+    } else {
+        format!("meta-update cascade ({})", inputs.join(", "))
+    };
+    for name in agents_to_rebuild {
+        coord.rebuild_queue.enqueue(
+            QueueKind::Rebuild,
+            name,
+            QueueSource::MetaUpdate,
+            reason_hint.clone(),
+            Some(entry.id),
+        );
+    }
+    // Lock file changed — meta-inputs panel re-renders.
+    crate::dashboard::emit_meta_inputs_snapshot(coord.as_ref());
+    Ok(())
+}
+
 /// Current unix timestamp in seconds. `now()` calls are pulled into a
 /// helper so tests can swap them out later.
 fn now_unix() -> i64 {