rebuild_queue: pre-enqueue meta-update cascade at submit time (closes #347)
This commit is contained in:
parent
4c425ede69
commit
d81b430136
2 changed files with 166 additions and 28 deletions
|
|
@ -1578,14 +1578,30 @@ async fn post_meta_update(
|
||||||
if inputs.is_empty() {
|
if inputs.is_empty() {
|
||||||
return error_response("meta-update: no inputs selected");
|
return error_response("meta-update: no inputs selected");
|
||||||
}
|
}
|
||||||
state.coord.rebuild_queue.enqueue_with_inputs(
|
let inputs_label = inputs.join(", ");
|
||||||
|
let parent_id = state.coord.rebuild_queue.enqueue_with_inputs(
|
||||||
crate::rebuild_queue::QueueKind::MetaUpdate,
|
crate::rebuild_queue::QueueKind::MetaUpdate,
|
||||||
"hyperhive".to_owned(),
|
"hyperhive".to_owned(),
|
||||||
crate::rebuild_queue::QueueSource::Manual,
|
crate::rebuild_queue::QueueSource::Manual,
|
||||||
format!("meta-update via dashboard ({})", inputs.join(", ")),
|
format!("meta-update via dashboard ({inputs_label})"),
|
||||||
None,
|
None,
|
||||||
inputs,
|
inputs.clone(),
|
||||||
);
|
);
|
||||||
|
// Pre-enqueue cascade rebuilds NOW so they're visible in the queue
|
||||||
|
// alongside the parent (issue #347). The worker's MetaUpdate arm
|
||||||
|
// no longer enqueues children — it just runs the lock bump and
|
||||||
|
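// (on failure) cancels whichever of these children are still `Queued`. NOTE(review): the parent is enqueued before the `.await` below, so a parent that fails fast could run `cancel_children` before the children exist — confirm the worker cannot race this handler.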
|
||||||
|
let cascade_agents = crate::rebuild_queue::meta_update_cascade_agents(&inputs).await;
|
||||||
|
let cascade_reason = format!("meta-update cascade ({inputs_label})");
|
||||||
|
for name in cascade_agents {
|
||||||
|
state.coord.rebuild_queue.enqueue(
|
||||||
|
crate::rebuild_queue::QueueKind::Rebuild,
|
||||||
|
name,
|
||||||
|
crate::rebuild_queue::QueueSource::MetaUpdate,
|
||||||
|
cascade_reason.clone(),
|
||||||
|
Some(parent_id),
|
||||||
|
);
|
||||||
|
}
|
||||||
state.coord.emit_rebuild_queue_snapshot();
|
state.coord.emit_rebuild_queue_snapshot();
|
||||||
(StatusCode::OK, "ok").into_response()
|
(StatusCode::OK, "ok").into_response()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -317,6 +317,29 @@ impl RebuildQueue {
|
||||||
inner.entries.iter().cloned().collect()
|
inner.entries.iter().cloned().collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Cancel every `Queued` entry whose `parent_id` matches `parent`.
|
||||||
|
/// Used when a `MetaUpdate` parent fails its lock bump — the
|
||||||
|
/// cascade rebuilds the enqueuer pre-queued no longer apply
|
||||||
|
/// (nothing actually changed, so they'd be wasted work). Running
|
||||||
|
/// children are left alone — they were started under the parent's
|
||||||
|
/// assumption and can't be cleanly aborted from the queue side.
|
||||||
|
/// Returns the count of cancelled entries.
|
||||||
|
pub fn cancel_children(&self, parent: u64) -> usize {
|
||||||
|
let mut inner = self.inner.lock().expect("rebuild_queue mutex poisoned");
|
||||||
|
let mut count = 0;
|
||||||
|
for entry in inner.entries.iter_mut() {
|
||||||
|
if entry.parent_id == Some(parent) && entry.state == QueueState::Queued {
|
||||||
|
entry.state = QueueState::Cancelled;
|
||||||
|
entry.finished_at = Some(now_unix());
|
||||||
|
count += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if count > 0 {
|
||||||
|
Self::trim_history(&mut inner);
|
||||||
|
}
|
||||||
|
count
|
||||||
|
}
|
||||||
|
|
||||||
/// Cancel a `Queued` entry (no-op for `Running` / terminal — the
|
/// Cancel a `Queued` entry (no-op for `Running` / terminal — the
|
||||||
/// in-flight rebuild owns the agent's nix store and can't be
|
/// in-flight rebuild owns the agent's nix store and can't be
|
||||||
/// safely interrupted). Returns true when an entry was cancelled.
|
/// safely interrupted). Returns true when an entry was cancelled.
|
||||||
|
|
@ -468,15 +491,44 @@ async fn run_meta_update(
|
||||||
let _progress = coord.meta_update_guard();
|
let _progress = coord.meta_update_guard();
|
||||||
let inputs = entry.inputs.clone();
|
let inputs = entry.inputs.clone();
|
||||||
tracing::info!(?inputs, parent = entry.id, "rebuild_queue: meta-update starting");
|
tracing::info!(?inputs, parent = entry.id, "rebuild_queue: meta-update starting");
|
||||||
if inputs.is_empty() {
|
let result = if inputs.is_empty() {
|
||||||
crate::meta::lock_update(&[]).await?;
|
crate::meta::lock_update(&[]).await
|
||||||
} else {
|
} else {
|
||||||
crate::meta::lock_update(&inputs).await?;
|
crate::meta::lock_update(&inputs).await
|
||||||
|
};
|
||||||
|
if let Err(e) = result {
|
||||||
|
// Lock bump failed — cancel any pending cascade rebuilds the
|
||||||
|
// enqueuer pre-queued for this MetaUpdate. Their parent_id
|
||||||
|
// matches this entry; the children no longer make sense (we
|
||||||
|
// never bumped the lock that justified them).
|
||||||
|
let cancelled = coord.rebuild_queue.cancel_children(entry.id);
|
||||||
|
if cancelled > 0 {
|
||||||
|
tracing::warn!(
|
||||||
|
cancelled,
|
||||||
|
parent = entry.id,
|
||||||
|
"rebuild_queue: meta-update failed; cancelled cascade rebuilds"
|
||||||
|
);
|
||||||
|
coord.emit_rebuild_queue_snapshot();
|
||||||
}
|
}
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
// Lock file changed — meta-inputs panel re-renders. The cascade
|
||||||
|
// rebuilds were already enqueued at MetaUpdate submission time,
|
||||||
|
// so no further enqueue is needed here.
|
||||||
|
crate::dashboard::emit_meta_inputs_snapshot(coord.as_ref());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
// Decide which agents to rebuild. Same logic as the previous
|
/// Compute which agents a `nix flake update <inputs>` on the meta
|
||||||
// `run_meta_update` — anything in the hyperhive subtree affects
|
/// flake would affect. Used by callers that pre-enqueue cascade
|
||||||
// every agent; anything in `agent-<n>/...` only the named agent.
|
/// `Rebuild` entries at MetaUpdate submission time (issue #347) so the
|
||||||
|
/// dashboard can render the dependent work alongside its parent before
|
||||||
|
/// the lock bump actually runs.
|
||||||
|
///
|
||||||
|
/// Same selection rule `run_meta_update` used for its former post-bump fan-out. Empty `inputs`
|
||||||
|
/// or any input under `hyperhive` → every container; otherwise just
|
||||||
|
/// the agents named by `agent-<name>` inputs.
|
||||||
|
pub async fn meta_update_cascade_agents(inputs: &[String]) -> Vec<String> {
|
||||||
let touched_hyperhive = inputs
|
let touched_hyperhive = inputs
|
||||||
.iter()
|
.iter()
|
||||||
.any(|i| i == "hyperhive" || i.starts_with("hyperhive/"));
|
.any(|i| i == "hyperhive" || i.starts_with("hyperhive/"));
|
||||||
|
|
@ -485,7 +537,7 @@ async fn run_meta_update(
|
||||||
.filter_map(|i| i.strip_prefix("agent-"))
|
.filter_map(|i| i.strip_prefix("agent-"))
|
||||||
.map(|rest| rest.split('/').next().unwrap_or(rest).to_owned())
|
.map(|rest| rest.split('/').next().unwrap_or(rest).to_owned())
|
||||||
.collect();
|
.collect();
|
||||||
let agents_to_rebuild: Vec<String> = if touched_hyperhive || inputs.is_empty() {
|
if touched_hyperhive || inputs.is_empty() {
|
||||||
crate::lifecycle::list()
|
crate::lifecycle::list()
|
||||||
.await
|
.await
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
|
|
@ -500,25 +552,7 @@ async fn run_meta_update(
|
||||||
.collect()
|
.collect()
|
||||||
} else {
|
} else {
|
||||||
touched_agents
|
touched_agents
|
||||||
};
|
|
||||||
|
|
||||||
let reason_hint = if inputs.is_empty() {
|
|
||||||
"meta-update cascade (all inputs)".to_owned()
|
|
||||||
} else {
|
|
||||||
format!("meta-update cascade ({})", inputs.join(", "))
|
|
||||||
};
|
|
||||||
for name in agents_to_rebuild {
|
|
||||||
coord.rebuild_queue.enqueue(
|
|
||||||
QueueKind::Rebuild,
|
|
||||||
name,
|
|
||||||
QueueSource::MetaUpdate,
|
|
||||||
reason_hint.clone(),
|
|
||||||
Some(entry.id),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
// Lock file changed — meta-inputs panel re-renders.
|
|
||||||
crate::dashboard::emit_meta_inputs_snapshot(coord.as_ref());
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Current unix timestamp in seconds. `now()` calls are pulled into a
|
/// Current unix timestamp in seconds. `now()` calls are pulled into a
|
||||||
|
|
@ -746,4 +780,92 @@ mod tests {
|
||||||
let child_entry = snap.iter().find(|e| e.id == child).expect("child queued");
|
let child_entry = snap.iter().find(|e| e.id == child).expect("child queued");
|
||||||
assert_eq!(child_entry.parent_id, Some(meta));
|
assert_eq!(child_entry.parent_id, Some(meta));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cancel_children_marks_queued_descendants() {
|
||||||
|
let q = RebuildQueue::new();
|
||||||
|
let meta = q.enqueue(
|
||||||
|
QueueKind::MetaUpdate,
|
||||||
|
"hyperhive".to_owned(),
|
||||||
|
QueueSource::Manual,
|
||||||
|
"lock bump".to_owned(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let a = q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
"agent-a".to_owned(),
|
||||||
|
QueueSource::MetaUpdate,
|
||||||
|
"cascade".to_owned(),
|
||||||
|
Some(meta),
|
||||||
|
);
|
||||||
|
let b = q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
"agent-b".to_owned(),
|
||||||
|
QueueSource::MetaUpdate,
|
||||||
|
"cascade".to_owned(),
|
||||||
|
Some(meta),
|
||||||
|
);
|
||||||
|
// An unrelated queued entry must not be cancelled.
|
||||||
|
let c = q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
"agent-c".to_owned(),
|
||||||
|
QueueSource::Manual,
|
||||||
|
"operator queued".to_owned(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let cancelled = q.cancel_children(meta);
|
||||||
|
assert_eq!(cancelled, 2);
|
||||||
|
let snap = q.snapshot();
|
||||||
|
let find = |id: u64| snap.iter().find(|e| e.id == id).expect("present");
|
||||||
|
assert_eq!(find(a).state, QueueState::Cancelled);
|
||||||
|
assert_eq!(find(b).state, QueueState::Cancelled);
|
||||||
|
assert_eq!(find(c).state, QueueState::Queued);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cancel_children_skips_running_and_terminal() {
|
||||||
|
let q = RebuildQueue::new();
|
||||||
|
let meta = q.enqueue(
|
||||||
|
QueueKind::MetaUpdate,
|
||||||
|
"hyperhive".to_owned(),
|
||||||
|
QueueSource::Manual,
|
||||||
|
"lock bump".to_owned(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
// Running child — must NOT be cancelled.
|
||||||
|
let running = q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
"agent-a".to_owned(),
|
||||||
|
QueueSource::MetaUpdate,
|
||||||
|
"cascade".to_owned(),
|
||||||
|
Some(meta),
|
||||||
|
);
|
||||||
|
q.take_next(); // pops meta, marks it Running
|
||||||
|
q.take_next(); // pops `running`, marks it Running
|
||||||
|
// Terminal child — must NOT be re-cancelled (its state stays Done).
|
||||||
|
let done = q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
"agent-b".to_owned(),
|
||||||
|
QueueSource::MetaUpdate,
|
||||||
|
"cascade".to_owned(),
|
||||||
|
Some(meta),
|
||||||
|
);
|
||||||
|
q.take_next();
|
||||||
|
q.finish(done, QueueState::Done, None);
|
||||||
|
// Queued child that should be cancelled.
|
||||||
|
let queued = q.enqueue(
|
||||||
|
QueueKind::Rebuild,
|
||||||
|
"agent-c".to_owned(),
|
||||||
|
QueueSource::MetaUpdate,
|
||||||
|
"cascade".to_owned(),
|
||||||
|
Some(meta),
|
||||||
|
);
|
||||||
|
let n = q.cancel_children(meta);
|
||||||
|
assert_eq!(n, 1);
|
||||||
|
let snap = q.snapshot();
|
||||||
|
let find = |id: u64| snap.iter().find(|e| e.id == id).expect("present");
|
||||||
|
assert_eq!(find(running).state, QueueState::Running);
|
||||||
|
assert_eq!(find(done).state, QueueState::Done);
|
||||||
|
assert_eq!(find(queued).state, QueueState::Cancelled);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue