manager: needs_login / logged_in / needs_update events + update tool
crash_watch grows two more state-axes alongside running/stopped: - logged-in (claude session dir populated for the agent) - up-to-date (recorded flake rev matches current) per-tick transitions emit HelperEvent::NeedsLogin / LoggedIn / NeedsUpdate. seed-on-first-tick semantics retained — nothing fires on harness boot for agents that were already in their state. only needs_update fires the 'stale appeared' direction; the resolved direction is already covered by Rebuilt. new mcp__hyperhive__update(name) on the manager surface: idempotent rebuild via auto_update::rebuild_agent. transient-aware (Rebuilding) so the dashboard shows the spinner. login intentionally has NO tool — it's interactive OAuth, only the operator can complete it. prompts + approvals doc + turn-loop doc updated. todo grows a 'show per-agent applied config in dashboard' entry (separate follow-up).
This commit is contained in:
parent
b374f39b0d
commit
80229c6af9
8 changed files with 230 additions and 34 deletions
|
|
@ -1,15 +1,22 @@
|
|||
//! Container crash watcher. Polls every managed container's running
|
||||
//! state on a fixed interval; when a previously-running container is
|
||||
//! suddenly stopped AND no operator-initiated transient (`Stopping`,
|
||||
//! `Restarting`, `Destroying`) was set, fire `HelperEvent::ContainerCrash`
|
||||
//! into the manager's inbox. The manager can then react — usually
|
||||
//! a `start` or a config rebuild.
|
||||
//! Per-container state watcher. Polls every managed container on a
|
||||
//! fixed interval, tracks three orthogonal state-sets across ticks,
|
||||
//! and emits a `HelperEvent` to the manager on each transition:
|
||||
//!
|
||||
//! D-Bus subscription would be lower-latency, but polling is far
|
||||
//! simpler and the failure modes are honest (a crash discovered 10s
|
||||
//! late is fine for our scale).
|
||||
//! - **running**: container is up. running → stopped without an
|
||||
//! operator-initiated transient (`Stopping` / `Restarting` /
|
||||
//! `Destroying` / `Rebuilding`) → `ContainerCrash`.
|
||||
//! - **logged-in**: claude session dir is populated. ! → ✓ →
|
||||
//! `LoggedIn`; ✓ → ! → `NeedsLogin` (rare — usually only fires
|
||||
//! on a fresh spawn / purge).
|
||||
//! - **up-to-date**: agent's recorded flake rev matches current. ✓
|
||||
//! → ! → `NeedsUpdate`. The reverse direction (`NeedsUpdate`
|
||||
//! resolved) is covered by `Rebuilt`, so no separate event.
|
||||
//!
|
||||
//! D-Bus subscription would be lower-latency for the first axis,
|
||||
//! but polling is simpler and a 10s detection delay is fine.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
|
|
@ -20,14 +27,17 @@ const POLL_INTERVAL: Duration = Duration::from_secs(10);
|
|||
|
||||
pub fn spawn(coord: Arc<Coordinator>) {
|
||||
tokio::spawn(async move {
|
||||
// Seed the running-set from the first poll so we don't emit a
|
||||
// crash for every agent on startup. First tick fills it; only
|
||||
// running→stopped transitions across subsequent ticks count.
|
||||
let mut prev_running: HashSet<String> = HashSet::new();
|
||||
let mut prev_logged_in: HashSet<String> = HashSet::new();
|
||||
let mut prev_updated: HashSet<String> = HashSet::new();
|
||||
let mut seeded = false;
|
||||
loop {
|
||||
let raw = lifecycle::list().await.unwrap_or_default();
|
||||
let current_rev = crate::auto_update::current_flake_rev(&coord.hyperhive_flake);
|
||||
let mut current_running = HashSet::new();
|
||||
let mut current_logged_in = HashSet::new();
|
||||
let mut current_updated = HashSet::new();
|
||||
let mut sub_agents: Vec<String> = Vec::new();
|
||||
for c in &raw {
|
||||
let logical = if c == MANAGER_NAME {
|
||||
MANAGER_NAME.to_owned()
|
||||
|
|
@ -36,37 +46,131 @@ pub fn spawn(coord: Arc<Coordinator>) {
|
|||
} else {
|
||||
continue;
|
||||
};
|
||||
if logical != MANAGER_NAME {
|
||||
sub_agents.push(logical.clone());
|
||||
}
|
||||
if lifecycle::is_running(&logical).await {
|
||||
current_running.insert(logical);
|
||||
current_running.insert(logical.clone());
|
||||
}
|
||||
if logical != MANAGER_NAME
|
||||
&& claude_has_session(&Coordinator::agent_claude_dir(&logical))
|
||||
{
|
||||
current_logged_in.insert(logical.clone());
|
||||
}
|
||||
if let Some(rev) = current_rev.as_deref()
|
||||
&& !crate::auto_update::agent_needs_update(&logical, rev)
|
||||
{
|
||||
current_updated.insert(logical.clone());
|
||||
}
|
||||
}
|
||||
|
||||
if seeded {
|
||||
let transients = coord.transient_snapshot();
|
||||
for stopped in prev_running.difference(¤t_running) {
|
||||
let deliberate = transients.get(stopped).is_some_and(|st| {
|
||||
matches!(
|
||||
st.kind,
|
||||
TransientKind::Stopping
|
||||
| TransientKind::Restarting
|
||||
| TransientKind::Destroying
|
||||
| TransientKind::Rebuilding
|
||||
)
|
||||
});
|
||||
if deliberate {
|
||||
continue;
|
||||
}
|
||||
tracing::warn!(agent = %stopped, "container crash detected");
|
||||
coord.notify_manager(&hive_sh4re::HelperEvent::ContainerCrash {
|
||||
agent: stopped.clone(),
|
||||
note: Some("container stopped without an operator action".into()),
|
||||
});
|
||||
}
|
||||
emit_crash_transitions(&coord, &prev_running, ¤t_running);
|
||||
emit_login_transitions(&coord, &prev_logged_in, ¤t_logged_in, &sub_agents);
|
||||
emit_update_transitions(&coord, &prev_updated, ¤t_updated, &sub_agents);
|
||||
}
|
||||
prev_running = current_running;
|
||||
prev_logged_in = current_logged_in;
|
||||
prev_updated = current_updated;
|
||||
seeded = true;
|
||||
|
||||
tokio::time::sleep(POLL_INTERVAL).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn emit_crash_transitions(coord: &Coordinator, prev: &HashSet<String>, current: &HashSet<String>) {
|
||||
let transients = coord.transient_snapshot();
|
||||
for stopped in prev.difference(current) {
|
||||
let deliberate = transients.get(stopped).is_some_and(|st| {
|
||||
matches!(
|
||||
st.kind,
|
||||
TransientKind::Stopping
|
||||
| TransientKind::Restarting
|
||||
| TransientKind::Destroying
|
||||
| TransientKind::Rebuilding
|
||||
)
|
||||
});
|
||||
if deliberate {
|
||||
continue;
|
||||
}
|
||||
tracing::warn!(agent = %stopped, "container crash detected");
|
||||
coord.notify_manager(&hive_sh4re::HelperEvent::ContainerCrash {
|
||||
agent: stopped.clone(),
|
||||
note: Some("container stopped without an operator action".into()),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn emit_login_transitions(
|
||||
coord: &Coordinator,
|
||||
prev: &HashSet<String>,
|
||||
current: &HashSet<String>,
|
||||
sub_agents: &[String],
|
||||
) {
|
||||
for agent in current.difference(prev) {
|
||||
tracing::info!(%agent, "agent logged in");
|
||||
coord.notify_manager(&hive_sh4re::HelperEvent::LoggedIn {
|
||||
agent: agent.clone(),
|
||||
});
|
||||
}
|
||||
// Only count NeedsLogin transitions for agents that exist and
|
||||
// are *not* logged in — the difference set above already gives
|
||||
// us "was in prev, gone from current" but we also want to fire
|
||||
// for agents that newly appeared as not-logged-in (post-spawn /
|
||||
// post-purge). Treat sub_agents minus current as the
|
||||
// currently-needs-login set; emit when an agent enters it.
|
||||
let prev_needs: HashSet<&str> = sub_agents
|
||||
.iter()
|
||||
.map(String::as_str)
|
||||
.filter(|n| !prev.contains(*n))
|
||||
.collect();
|
||||
let current_needs: HashSet<&str> = sub_agents
|
||||
.iter()
|
||||
.map(String::as_str)
|
||||
.filter(|n| !current.contains(*n))
|
||||
.collect();
|
||||
for agent in current_needs.difference(&prev_needs) {
|
||||
tracing::info!(%agent, "agent needs login");
|
||||
coord.notify_manager(&hive_sh4re::HelperEvent::NeedsLogin {
|
||||
agent: (*agent).to_owned(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn emit_update_transitions(
|
||||
coord: &Coordinator,
|
||||
prev_updated: &HashSet<String>,
|
||||
current_updated: &HashSet<String>,
|
||||
sub_agents: &[String],
|
||||
) {
|
||||
// Fired on the "was up-to-date, now isn't" transition. The
|
||||
// reverse (rebuilt) is already covered by HelperEvent::Rebuilt.
|
||||
let prev_stale: HashSet<&str> = sub_agents
|
||||
.iter()
|
||||
.map(String::as_str)
|
||||
.filter(|n| !prev_updated.contains(*n))
|
||||
.collect();
|
||||
let current_stale: HashSet<&str> = sub_agents
|
||||
.iter()
|
||||
.map(String::as_str)
|
||||
.filter(|n| !current_updated.contains(*n))
|
||||
.collect();
|
||||
for agent in current_stale.difference(&prev_stale) {
|
||||
tracing::info!(%agent, "agent needs update");
|
||||
coord.notify_manager(&hive_sh4re::HelperEvent::NeedsUpdate {
|
||||
agent: (*agent).to_owned(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Mirrors `dashboard::claude_has_session`. Lives here too so the
|
||||
/// watcher doesn't depend on dashboard internals.
|
||||
fn claude_has_session(dir: &Path) -> bool {
|
||||
let Ok(entries) = std::fs::read_dir(dir) else {
|
||||
return false;
|
||||
};
|
||||
entries
|
||||
.flatten()
|
||||
.any(|e| e.file_type().is_ok_and(|t| t.is_file()))
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue