//! Per-container state watcher. Polls every managed container on a
//! fixed interval, tracks two orthogonal state-sets across ticks,
//! and emits a `HelperEvent` to the manager on each transition:
//!
//! - **running**: container is up. running → stopped without an
//!   operator-initiated transient (`Stopping` / `Restarting` /
//!   `Destroying` / `Rebuilding`) → `ContainerCrash`.
//! - **logged-in**: claude session dir is populated. ! → ✓ →
//!   `LoggedIn`; ✓ → ! → `NeedsLogin` (rare — usually only fires
//!   on a fresh spawn / purge).
//!
//! `NeedsUpdate` events are now fired from the apply-commit path
//! directly rather than via rev-marker polling (issue #179 cleanup).
//!
//! D-Bus subscription would be lower-latency for the first axis,
//! but polling is simpler and a 10s detection delay is fine.
|
|
|
|
use std::collections::HashSet;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use crate::container_view::claude_has_session;
|
|
use crate::coordinator::{Coordinator, TransientKind};
|
|
use crate::lifecycle::{self, AGENT_PREFIX, MANAGER_NAME};
|
|
|
|
/// Delay between two consecutive polling passes of the watcher loop in
/// [`spawn`]; also the upper bound on how long a state flip can go unnoticed.
const POLL_INTERVAL: Duration = Duration::from_secs(10);
|
|
|
|
pub fn spawn(coord: Arc<Coordinator>) {
|
|
let mut shutdown = coord.shutdown_rx();
|
|
tokio::spawn(async move {
|
|
let mut prev_running: HashSet<String> = HashSet::new();
|
|
let mut prev_logged_in: HashSet<String> = HashSet::new();
|
|
let mut prev_sub_agents: HashSet<String> = HashSet::new();
|
|
let mut seeded = false;
|
|
loop {
|
|
let raw = lifecycle::list().await.unwrap_or_default();
|
|
let mut current_running = HashSet::new();
|
|
let mut current_logged_in = HashSet::new();
|
|
let mut sub_agents: Vec<String> = Vec::new();
|
|
for c in &raw {
|
|
let logical = if c == MANAGER_NAME {
|
|
MANAGER_NAME.to_owned()
|
|
} else if let Some(n) = c.strip_prefix(AGENT_PREFIX) {
|
|
n.to_owned()
|
|
} else {
|
|
continue;
|
|
};
|
|
if logical != MANAGER_NAME {
|
|
sub_agents.push(logical.clone());
|
|
}
|
|
if lifecycle::is_running(&logical).await {
|
|
current_running.insert(logical.clone());
|
|
}
|
|
if logical != MANAGER_NAME
|
|
&& claude_has_session(&Coordinator::agent_claude_dir(&logical))
|
|
{
|
|
current_logged_in.insert(logical.clone());
|
|
}
|
|
}
|
|
|
|
if seeded {
|
|
emit_crash_transitions(&coord, &prev_running, ¤t_running);
|
|
emit_login_transitions(
|
|
&coord,
|
|
&prev_logged_in,
|
|
¤t_logged_in,
|
|
&sub_agents,
|
|
&prev_sub_agents,
|
|
);
|
|
}
|
|
// Periodic container rescan — catches state flips that
|
|
// happen outside our mutation surface (operator runs
|
|
// `nixos-container stop` over ssh, agent logs in via its
|
|
// own web UI, etc.) so the dashboard converges within one
|
|
// POLL_INTERVAL. Idempotent + cheap when nothing changed.
|
|
coord.rescan_containers_and_emit().await;
|
|
prev_running = current_running;
|
|
prev_logged_in = current_logged_in;
|
|
prev_sub_agents = sub_agents.into_iter().collect();
|
|
seeded = true;
|
|
|
|
tokio::select! {
|
|
() = tokio::time::sleep(POLL_INTERVAL) => {}
|
|
_ = shutdown.changed() => {
|
|
tracing::info!("crash watcher: shutdown signal received");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
fn emit_crash_transitions(coord: &Coordinator, prev: &HashSet<String>, current: &HashSet<String>) {
|
|
let transients = coord.transient_snapshot();
|
|
for stopped in prev.difference(current) {
|
|
let deliberate = transients.get(stopped).is_some_and(|st| {
|
|
matches!(
|
|
st.kind,
|
|
TransientKind::Stopping
|
|
| TransientKind::Restarting
|
|
| TransientKind::Destroying
|
|
| TransientKind::Rebuilding
|
|
)
|
|
});
|
|
if deliberate {
|
|
continue;
|
|
}
|
|
tracing::warn!(agent = %stopped, "container crash detected");
|
|
coord.notify_manager(&hive_sh4re::HelperEvent::ContainerCrash {
|
|
agent: stopped.clone(),
|
|
note: Some("container stopped without an operator action".into()),
|
|
});
|
|
}
|
|
}
|
|
|
|
fn emit_login_transitions(
|
|
coord: &Coordinator,
|
|
prev: &HashSet<String>,
|
|
current: &HashSet<String>,
|
|
sub_agents: &[String],
|
|
prev_sub_agents: &HashSet<String>,
|
|
) {
|
|
for agent in current.difference(prev) {
|
|
tracing::info!(%agent, "agent logged in");
|
|
coord.notify_manager(&hive_sh4re::HelperEvent::LoggedIn {
|
|
agent: agent.clone(),
|
|
});
|
|
}
|
|
// Detect transitions into "needs login": an agent that was previously
|
|
// logged-in goes unsigned (credentials deleted), OR a brand-new agent
|
|
// appears without a session.
|
|
//
|
|
// prev_needs uses prev_sub_agents (the agent set from the last tick) so
|
|
// that a newly-spawned agent — which does not appear in prev_sub_agents —
|
|
// is absent from prev_needs even though it's not in prev_logged_in.
|
|
// Without this, new agents land in both prev_needs and current_needs and
|
|
// the set difference is empty, silently dropping the event.
|
|
let prev_needs: HashSet<&str> = prev_sub_agents
|
|
.iter()
|
|
.map(String::as_str)
|
|
.filter(|n| !prev.contains(*n))
|
|
.collect();
|
|
let current_needs: HashSet<&str> = sub_agents
|
|
.iter()
|
|
.map(String::as_str)
|
|
.filter(|n| !current.contains(*n))
|
|
.collect();
|
|
for agent in current_needs.difference(&prev_needs) {
|
|
tracing::info!(%agent, "agent needs login");
|
|
coord.notify_manager(&hive_sh4re::HelperEvent::NeedsLogin {
|
|
agent: (*agent).to_owned(),
|
|
});
|
|
}
|
|
}
|
|
|