diff --git a/hive-c0re/src/coordinator.rs b/hive-c0re/src/coordinator.rs index 881ce04..30c738e 100644 --- a/hive-c0re/src/coordinator.rs +++ b/hive-c0re/src/coordinator.rs @@ -8,7 +8,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use anyhow::{Context, Result}; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, watch}; use crate::agent_server::{self, AgentSocket}; use crate::approvals::Approvals; @@ -73,6 +73,10 @@ pub struct Coordinator { /// tokio mutex so the rescan can `await` `lifecycle::list` / /// `is_running` without blocking other coordinator paths. last_containers: tokio::sync::Mutex>, + /// Shutdown signal broadcast to all background tasks. Sending + /// `true` asks every loop to exit after its current work item. + /// Use `shutdown_rx()` to subscribe; `request_shutdown()` to fire. + shutdown_tx: watch::Sender, } /// Per-agent in-progress state that the dashboard surfaces between approve @@ -140,6 +144,7 @@ impl Coordinator { let approvals = Approvals::open(db_path).context("open approvals")?; let questions = OperatorQuestions::open(db_path).context("open operator_questions")?; let (dashboard_events, _) = broadcast::channel(DASHBOARD_CHANNEL); + let (shutdown_tx, _) = watch::channel(false); Ok(Self { broker: Arc::new(broker), approvals: Arc::new(approvals), @@ -152,9 +157,27 @@ impl Coordinator { dashboard_events, event_seq: AtomicU64::new(0), last_containers: tokio::sync::Mutex::new(HashMap::new()), + shutdown_tx, }) } + /// Subscribe to the shutdown watch channel. Background tasks call + /// this at spawn time and break their loop when the receiver + /// transitions to `true` (via `Coordinator::request_shutdown`). + /// A closed channel (i.e. the Coordinator was dropped) also + /// signals tasks to exit. + pub fn shutdown_rx(&self) -> watch::Receiver { + self.shutdown_tx.subscribe() + } + + /// Signal all background tasks to exit cleanly. The tasks break + /// out of their poll loop after completing their current work item. + /// Best-effort — does nothing if all receivers have already been + /// dropped (e.g. process is already mid-shutdown). + pub fn request_shutdown(&self) { + let _ = self.shutdown_tx.send(true); + } + /// Subscribe to the unified dashboard event channel. Used by the /// `/dashboard/stream` SSE handler and by the broker-to-dashboard /// forwarder task. diff --git a/hive-c0re/src/crash_watch.rs b/hive-c0re/src/crash_watch.rs index 724bd8e..34270f2 100644 --- a/hive-c0re/src/crash_watch.rs +++ b/hive-c0re/src/crash_watch.rs @@ -26,6 +26,7 @@ use crate::lifecycle::{self, AGENT_PREFIX, MANAGER_NAME}; const POLL_INTERVAL: Duration = Duration::from_secs(10); pub fn spawn(coord: Arc) { + let mut shutdown = coord.shutdown_rx(); tokio::spawn(async move { let mut prev_running: HashSet = HashSet::new(); let mut prev_logged_in: HashSet = HashSet::new(); @@ -80,7 +81,13 @@ pub fn spawn(coord: Arc) { prev_updated = current_updated; seeded = true; - tokio::time::sleep(POLL_INTERVAL).await; + tokio::select! { + _ = tokio::time::sleep(POLL_INTERVAL) => {} + _ = shutdown.changed() => { + tracing::info!("crash watcher: shutdown signal received"); + break; + } + } } }); } diff --git a/hive-c0re/src/events_vacuum.rs b/hive-c0re/src/events_vacuum.rs index 53362b0..2c99fed 100644 --- a/hive-c0re/src/events_vacuum.rs +++ b/hive-c0re/src/events_vacuum.rs @@ -24,13 +24,17 @@ const KEEP_SECS: i64 = 7 * 24 * 3600; /// the vacuum SQL against its events.sqlite if present. Errors are /// logged but don't tear the loop down. pub fn spawn(coord: Arc) { + let mut shutdown = coord.shutdown_rx(); tokio::spawn(async move { loop { sweep_once(); - // touching coord keeps the type wired in case future sweeps - // need approvals/etc.; the ref is otherwise unused today. - let _ = &coord; - tokio::time::sleep(VACUUM_INTERVAL).await; + tokio::select! { + _ = tokio::time::sleep(VACUUM_INTERVAL) => {} + _ = shutdown.changed() => { + tracing::info!("events vacuum: shutdown signal received"); + break; + } + } } }); } diff --git a/hive-c0re/src/main.rs b/hive-c0re/src/main.rs index df9e989..00ad47e 100644 --- a/hive-c0re/src/main.rs +++ b/hive-c0re/src/main.rs @@ -154,8 +154,9 @@ async fn main() -> Result<()> { // requeue_inflight) and undelivered rows are always kept. // Runs hourly; first sweep happens immediately. let vacuum_coord = coord.clone(); + let mut vacuum_shutdown = coord.shutdown_rx(); tokio::spawn(async move { - let interval_secs = 3600u64; + let interval = std::time::Duration::from_secs(3600); let keep_secs: i64 = 30 * 24 * 3600; loop { match vacuum_coord.broker.vacuum_delivered(keep_secs) { @@ -163,7 +164,13 @@ async fn main() -> Result<()> { Ok(n) => tracing::info!(removed = n, "broker vacuum"), Err(e) => tracing::warn!(error = ?e, "broker vacuum failed"), } - tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await; + tokio::select! { + _ = tokio::time::sleep(interval) => {} + _ = vacuum_shutdown.changed() => { + tracing::info!("broker vacuum: shutdown signal received"); + break; + } + } } }); // Per-agent events.sqlite vacuum: host-side so the harness @@ -191,7 +198,27 @@ async fn main() -> Result<()> { tracing::error!(error = ?e, "dashboard failed"); } }); - server::serve(&cli.socket, coord).await + // Run the admin socket until a signal arrives; then signal + // all background tasks so they exit cleanly before the + // process terminates. + let coord_sig = coord.clone(); + tokio::select! { + res = server::serve(&cli.socket, coord) => { res? } + _ = tokio::signal::ctrl_c() => { + tracing::info!("SIGINT received — requesting shutdown"); + coord_sig.request_shutdown(); + } + _ = async { + let mut sig = tokio::signal::unix::signal( + tokio::signal::unix::SignalKind::terminate() + ).expect("failed to install SIGTERM handler"); + sig.recv().await; + } => { + tracing::info!("SIGTERM received — requesting shutdown"); + coord_sig.request_shutdown(); + } + } + Ok(()) } Cmd::Spawn { name } => { render(client::request(&cli.socket, HostRequest::Spawn { name }).await?) diff --git a/hive-c0re/src/reminder_scheduler.rs b/hive-c0re/src/reminder_scheduler.rs index 92ce426..95c53ff 100644 --- a/hive-c0re/src/reminder_scheduler.rs +++ b/hive-c0re/src/reminder_scheduler.rs @@ -52,10 +52,17 @@ const REMINDER_BATCH_LIMIT: u64 = 100; const POLL_INTERVAL: Duration = Duration::from_secs(5); pub fn spawn(coord: Arc) { + let mut shutdown = coord.shutdown_rx(); tokio::spawn(async move { loop { tick(&coord); - tokio::time::sleep(POLL_INTERVAL).await; + tokio::select! { + _ = tokio::time::sleep(POLL_INTERVAL) => {} + _ = shutdown.changed() => { + tracing::info!("reminder scheduler: shutdown signal received"); + break; + } + } } }); } diff --git a/hive-c0re/src/stats_vacuum.rs b/hive-c0re/src/stats_vacuum.rs index 17db547..ab252bd 100644 --- a/hive-c0re/src/stats_vacuum.rs +++ b/hive-c0re/src/stats_vacuum.rs @@ -22,11 +22,17 @@ const KEEP_SECS: i64 = 90 * 24 * 3600; /// the vacuum SQL against its turn-stats.sqlite if present. Errors /// are logged but don't tear the loop down. pub fn spawn(coord: Arc) { + let mut shutdown = coord.shutdown_rx(); tokio::spawn(async move { loop { sweep_once(); - let _ = &coord; - tokio::time::sleep(VACUUM_INTERVAL).await; + tokio::select! { + _ = tokio::time::sleep(VACUUM_INTERVAL) => {} + _ = shutdown.changed() => { + tracing::info!("stats vacuum: shutdown signal received"); + break; + } + } } }); }