diff --git a/hive-c0re/src/coordinator.rs b/hive-c0re/src/coordinator.rs index ad151a8..6f348cf 100644 --- a/hive-c0re/src/coordinator.rs +++ b/hive-c0re/src/coordinator.rs @@ -118,6 +118,10 @@ impl Coordinator { } } + pub fn list_agents(&self) -> Vec { + self.agents.lock().unwrap().keys().cloned().collect() + } + /// Mark an agent as in-progress (only one state per agent for now). pub fn set_transient(&self, name: &str, kind: TransientKind) { self.transient.lock().unwrap().insert( diff --git a/hive-c0re/src/main.rs b/hive-c0re/src/main.rs index 663f0ad..5908b7b 100644 --- a/hive-c0re/src/main.rs +++ b/hive-c0re/src/main.rs @@ -157,6 +157,52 @@ async fn main() -> Result<()> { // when a previously-running container goes away without an // operator-initiated transient state. crash_watch::spawn(coord.clone()); + // Reminder scheduler: checks for due reminders every 5 seconds, + // delivers them as inbox messages from "reminder". + let reminder_coord = coord.clone(); + tokio::spawn(async move { + use hive_sh4re::Message; + loop { + // Get all agents currently registered + let agents = reminder_coord.list_agents(); + for agent in agents { + match reminder_coord.broker.get_due_reminders(&agent) { + Ok(reminders) => { + for (id, message, _file_path) in reminders { + // Deliver as inbox message from "reminder" + if let Err(e) = reminder_coord.broker.send(&Message { + from: "reminder".to_owned(), + to: agent.clone(), + body: message.clone(), + }) { + tracing::warn!( + reminder_id = id, + %agent, + error = ?e, + "failed to deliver reminder" + ); + continue; + } + // Mark as sent + if let Err(e) = reminder_coord.broker.mark_reminder_sent(id) { + tracing::warn!( + reminder_id = id, + error = ?e, + "failed to mark reminder sent" + ); + } + } + } + Err(e) => tracing::warn!( + %agent, + error = ?e, + "failed to query due reminders" + ), + } + } + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + }); let dash_coord = coord.clone(); tokio::spawn(async move { if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await {