fix: transient state leak via RAII guard
bare set_transient/clear_transient pairs leak the in-memory transient on task cancellation, panics, or any early return between the two calls — dashboard then shows the agent stuck in 'rebuilding…' forever (coder hit this today). add Coordinator::transient_guard returning a TransientGuard whose Drop clears, and convert every caller (dashboard lifecycle_action, auto_update::rebuild_agent, manager_server Update, actions::destroy, actions Spawn task, migrate phase 4). destroy() now takes &Arc<Coordinator> so it can hold a guard. existing stuck transients clear on next hive-c0re restart since transient state is in-memory only.
This commit is contained in:
parent
1a36c38a54
commit
313121a6e9
6 changed files with 56 additions and 18 deletions
|
|
@ -55,10 +55,14 @@ pub async fn approve(coord: Arc<Coordinator>, id: i64) -> Result<()> {
|
||||||
ApprovalKind::Spawn => {
|
ApprovalKind::Spawn => {
|
||||||
// Run the spawn in the background so the approve POST returns
|
// Run the spawn in the background so the approve POST returns
|
||||||
// immediately. The dashboard reads `transient` to render a spinner.
|
// immediately. The dashboard reads `transient` to render a spinner.
|
||||||
coord.set_transient(&approval.agent, TransientKind::Spawning);
|
// Guard is created synchronously here (so the spinner appears
|
||||||
|
// the moment the operator clicks approve) and moved into the
|
||||||
|
// task; it auto-clears even if the runtime drops the task.
|
||||||
let coord_bg = coord.clone();
|
let coord_bg = coord.clone();
|
||||||
let approval_bg = approval.clone();
|
let approval_bg = approval.clone();
|
||||||
|
let guard = coord_bg.transient_guard(&approval_bg.agent, TransientKind::Spawning);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
let guard = guard;
|
||||||
let agent_bg = approval_bg.agent.clone();
|
let agent_bg = approval_bg.agent.clone();
|
||||||
let result = lifecycle::spawn(
|
let result = lifecycle::spawn(
|
||||||
&approval_bg.agent,
|
&approval_bg.agent,
|
||||||
|
|
@ -72,7 +76,7 @@ pub async fn approve(coord: Arc<Coordinator>, id: i64) -> Result<()> {
|
||||||
&coord_bg.operator_pronouns,
|
&coord_bg.operator_pronouns,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
coord_bg.clear_transient(&agent_bg);
|
drop(guard);
|
||||||
if let Err(e) = finish_approval(&coord_bg, &approval_bg, result, None) {
|
if let Err(e) = finish_approval(&coord_bg, &approval_bg, result, None) {
|
||||||
tracing::warn!(agent = %agent_bg, error = ?e, "spawn approval failed");
|
tracing::warn!(agent = %agent_bg, error = ?e, "spawn approval failed");
|
||||||
}
|
}
|
||||||
|
|
@ -285,17 +289,15 @@ async fn run_apply_commit(
|
||||||
/// anyway. With `purge=true` the persistent trees are also wiped — config
|
/// anyway. With `purge=true` the persistent trees are also wiped — config
|
||||||
/// history, claude creds, notes — there is no undo.
|
/// history, claude creds, notes — there is no undo.
|
||||||
/// Refuses the manager (declarative; would fight with the host's nixos config).
|
/// Refuses the manager (declarative; would fight with the host's nixos config).
|
||||||
pub async fn destroy(coord: &Coordinator, name: &str, purge: bool) -> Result<()> {
|
pub async fn destroy(coord: &Arc<Coordinator>, name: &str, purge: bool) -> Result<()> {
|
||||||
if name == MANAGER_NAME || name == MANAGER_AGENT {
|
if name == MANAGER_NAME || name == MANAGER_AGENT {
|
||||||
bail!("refusing to destroy the manager ({name})");
|
bail!("refusing to destroy the manager ({name})");
|
||||||
}
|
}
|
||||||
tracing::info!(%name, purge, "destroy");
|
tracing::info!(%name, purge, "destroy");
|
||||||
coord.set_transient(name, TransientKind::Destroying);
|
// Guard auto-clears on the success path's final scope exit and on
|
||||||
let result = lifecycle::destroy(name).await;
|
// every early-return / cancellation along the way.
|
||||||
if result.is_err() {
|
let _guard = coord.transient_guard(name, TransientKind::Destroying);
|
||||||
coord.clear_transient(name);
|
lifecycle::destroy(name).await?;
|
||||||
}
|
|
||||||
result?;
|
|
||||||
coord.unregister_agent(name);
|
coord.unregister_agent(name);
|
||||||
let runtime = Coordinator::agent_dir(name);
|
let runtime = Coordinator::agent_dir(name);
|
||||||
if runtime.exists() {
|
if runtime.exists() {
|
||||||
|
|
@ -329,7 +331,7 @@ pub async fn destroy(coord: &Coordinator, name: &str, purge: bool) -> Result<()>
|
||||||
"agent destroyed"
|
"agent destroyed"
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
coord.clear_transient(name);
|
drop(_guard);
|
||||||
coord.notify_manager(&HelperEvent::Destroyed {
|
coord.notify_manager(&HelperEvent::Destroyed {
|
||||||
agent: name.to_owned(),
|
agent: name.to_owned(),
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -63,7 +63,7 @@ pub async fn rebuild_agent(coord: &Arc<Coordinator>, name: &str, current_rev: &s
|
||||||
// lifecycle::rebuild. Dashboard rebuilds already do this via
|
// lifecycle::rebuild. Dashboard rebuilds already do this via
|
||||||
// lifecycle_action; this catches the auto-update scan + any
|
// lifecycle_action; this catches the auto-update scan + any
|
||||||
// other direct caller.
|
// other direct caller.
|
||||||
coord.set_transient(name, crate::coordinator::TransientKind::Rebuilding);
|
let _guard = coord.transient_guard(name, crate::coordinator::TransientKind::Rebuilding);
|
||||||
let result = lifecycle::rebuild(
|
let result = lifecycle::rebuild(
|
||||||
name,
|
name,
|
||||||
&coord.hyperhive_flake,
|
&coord.hyperhive_flake,
|
||||||
|
|
@ -75,7 +75,7 @@ pub async fn rebuild_agent(coord: &Arc<Coordinator>, name: &str, current_rev: &s
|
||||||
&coord.operator_pronouns,
|
&coord.operator_pronouns,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
coord.clear_transient(name);
|
drop(_guard);
|
||||||
match &result {
|
match &result {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
if let Err(e) = std::fs::write(rev_marker_path(name), current_rev) {
|
if let Err(e) = std::fs::write(rev_marker_path(name), current_rev) {
|
||||||
|
|
|
||||||
|
|
@ -57,6 +57,21 @@ pub struct TransientState {
|
||||||
pub since: std::time::Instant,
|
pub since: std::time::Instant,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// RAII handle returned by `Coordinator::transient_guard`. Cleared on
|
||||||
|
/// drop — including drop-via-cancellation, the path that bare
|
||||||
|
/// `set_transient` / `clear_transient` pairs leaked through. Holds an
|
||||||
|
/// `Arc<Coordinator>` so the guard is freely returnable / movable.
|
||||||
|
pub struct TransientGuard {
|
||||||
|
coord: Arc<Coordinator>,
|
||||||
|
name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for TransientGuard {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.coord.clear_transient(&self.name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone, Copy)]
|
||||||
pub enum TransientKind {
|
pub enum TransientKind {
|
||||||
/// `lifecycle::spawn` is running (nixos-container create + update + start).
|
/// `lifecycle::spawn` is running (nixos-container create + update + start).
|
||||||
|
|
@ -122,6 +137,13 @@ impl Coordinator {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark an agent as in-progress (only one state per agent for now).
|
/// Mark an agent as in-progress (only one state per agent for now).
|
||||||
|
///
|
||||||
|
/// Prefer `transient_guard` when possible — it auto-clears on drop
|
||||||
|
/// even if the surrounding future is cancelled (HTTP request
|
||||||
|
/// aborted, runtime shutdown mid-rebuild, panic between set and
|
||||||
|
/// clear). The bare `set_transient` / `clear_transient` pair leaks
|
||||||
|
/// the transient on any of those paths and the dashboard then
|
||||||
|
/// shows the agent stuck in "rebuilding…" forever.
|
||||||
pub fn set_transient(&self, name: &str, kind: TransientKind) {
|
pub fn set_transient(&self, name: &str, kind: TransientKind) {
|
||||||
self.transient.lock().unwrap().insert(
|
self.transient.lock().unwrap().insert(
|
||||||
name.to_owned(),
|
name.to_owned(),
|
||||||
|
|
@ -136,6 +158,19 @@ impl Coordinator {
|
||||||
self.transient.lock().unwrap().remove(name);
|
self.transient.lock().unwrap().remove(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set a transient state and return a guard that clears it on drop.
|
||||||
|
/// Use this from any path where the surrounding future could be
|
||||||
|
/// cancelled or panic between set and clear (HTTP handlers, spawned
|
||||||
|
/// tasks). The guard's `Drop` runs even on task cancellation, so
|
||||||
|
/// the dashboard's spinner can't get pinned forever.
|
||||||
|
pub fn transient_guard(self: &Arc<Self>, name: &str, kind: TransientKind) -> TransientGuard {
|
||||||
|
self.set_transient(name, kind);
|
||||||
|
TransientGuard {
|
||||||
|
coord: self.clone(),
|
||||||
|
name: name.to_owned(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn transient_snapshot(&self) -> HashMap<String, TransientState> {
|
pub fn transient_snapshot(&self) -> HashMap<String, TransientState> {
|
||||||
self.transient.lock().unwrap().clone()
|
self.transient.lock().unwrap().clone()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1125,9 +1125,9 @@ where
|
||||||
Fut: std::future::Future<Output = anyhow::Result<()>>,
|
Fut: std::future::Future<Output = anyhow::Result<()>>,
|
||||||
{
|
{
|
||||||
let logical = strip_container_prefix(name);
|
let logical = strip_container_prefix(name);
|
||||||
state.coord.set_transient(&logical, kind);
|
let _guard = state.coord.transient_guard(&logical, kind);
|
||||||
let result = body(logical.clone()).await;
|
let result = body(logical.clone()).await;
|
||||||
state.coord.clear_transient(&logical);
|
drop(_guard);
|
||||||
match result {
|
match result {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
extra(state, &logical);
|
extra(state, &logical);
|
||||||
|
|
|
||||||
|
|
@ -228,9 +228,10 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc<Coordinator>) -> ManagerResp
|
||||||
message: "update: hyperhive_flake has no canonical path".into(),
|
message: "update: hyperhive_flake has no canonical path".into(),
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
coord.set_transient(name, crate::coordinator::TransientKind::Rebuilding);
|
let _guard =
|
||||||
|
coord.transient_guard(name, crate::coordinator::TransientKind::Rebuilding);
|
||||||
let result = crate::auto_update::rebuild_agent(coord, name, ¤t_rev).await;
|
let result = crate::auto_update::rebuild_agent(coord, name, ¤t_rev).await;
|
||||||
coord.clear_transient(name);
|
drop(_guard);
|
||||||
match result {
|
match result {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
coord.kick_agent(name, "container rebuilt");
|
coord.kick_agent(name, "container rebuilt");
|
||||||
|
|
|
||||||
|
|
@ -101,9 +101,9 @@ pub async fn run(coord: &Arc<Coordinator>) -> Result<()> {
|
||||||
// update activation triggers. Without this, crash_watch
|
// update activation triggers. Without this, crash_watch
|
||||||
// would fire ContainerCrash for every agent here and the
|
// would fire ContainerCrash for every agent here and the
|
||||||
// manager would spuriously try to recover them.
|
// manager would spuriously try to recover them.
|
||||||
coord.set_transient(name, crate::coordinator::TransientKind::Rebuilding);
|
let _guard = coord.transient_guard(name, crate::coordinator::TransientKind::Rebuilding);
|
||||||
let result = repoint_container(name).await;
|
let result = repoint_container(name).await;
|
||||||
coord.clear_transient(name);
|
drop(_guard);
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
tracing::warn!(%name, error = ?e, "migration: container repoint failed");
|
tracing::warn!(%name, error = ?e, "migration: container repoint failed");
|
||||||
all_ok = false;
|
all_ok = false;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue