diff --git a/TODO.md b/TODO.md index 23277de..5301610 100644 --- a/TODO.md +++ b/TODO.md @@ -10,8 +10,14 @@ - Handle text overflow → suggest file_path option for long messages - Per-agent reminder limits (burst capacity, rate limiting) +- **File path delivery**: currently unused in scheduler delivery loop — implement file write/delivery to /state//reminders/ or similar +- **Orphan reminders**: handle partial failures (e.g. delivery succeeds but mark_reminder_sent fails) to avoid resending +- **Unbounded batches**: implement per-cycle delivery limit so burst of 10k reminders doesn't flood the broker in one cycle +- **Scheduler shutdown**: add graceful shutdown signal when coordinator is destroyed (currently runs forever) +- **DB lock contention**: under high reminder volume, many concurrent mark_reminder_sent calls may serialize behind the Mutex lock — consider batch updates ## Dashboard - Per-agent reminder status (pending, delivered) - Reminder query interface for debugging +- Display reminder delivery errors (failed sends, mark failures) diff --git a/hive-c0re/src/agent_server.rs b/hive-c0re/src/agent_server.rs index a4cad21..961de51 100644 --- a/hive-c0re/src/agent_server.rs +++ b/hive-c0re/src/agent_server.rs @@ -194,26 +194,48 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc) -> file_path, } => { use hive_sh4re::ReminderTiming; - let due_at = match timing { + + // Calculate the due_at timestamp, propagating errors instead of silently + // defaulting to epoch 1970 on overflow/conversion failure. + let due_at_result: Result = match timing { ReminderTiming::InSeconds { seconds } => { - std::time::SystemTime::now() - .checked_add(std::time::Duration::from_secs(*seconds)) - .and_then(|t| { - t.duration_since(std::time::UNIX_EPOCH) - .ok() - .and_then(|d| i64::try_from(d.as_secs()).ok()) - }) - .unwrap_or(0) + let now = std::time::SystemTime::now(); + let future = match now.checked_add(std::time::Duration::from_secs(*seconds)) { + Some(t) => t, + None => return AgentResponse::Err { + message: format!("InSeconds overflow: {seconds}s exceeds system time range"), + }, + }; + let duration = match future.duration_since(std::time::UNIX_EPOCH) { + Ok(d) => d, + Err(e) => return AgentResponse::Err { + message: format!("system time before UNIX_EPOCH: {e}"), + }, + }; + match i64::try_from(duration.as_secs()) { + Ok(ts) => Ok(ts), + Err(e) => return AgentResponse::Err { + message: format!("unix timestamp exceeds i64 range: {e}"), + }, + } } - ReminderTiming::At { unix_timestamp } => *unix_timestamp, + ReminderTiming::At { unix_timestamp } => Ok(*unix_timestamp), }; - match broker.store_reminder(agent, message, file_path.as_deref(), due_at) { - Ok(id) => { - tracing::info!(%id, %agent, %due_at, "reminder scheduled"); - AgentResponse::Ok + + match due_at_result { + Ok(due_at) => { + match broker.store_reminder(agent, message, file_path.as_deref(), due_at) { + Ok(id) => { + tracing::info!(%id, %agent, %due_at, "reminder scheduled"); + AgentResponse::Ok + } + Err(e) => AgentResponse::Err { + message: format!("failed to store reminder: {e:#}"), + }, + } } Err(e) => AgentResponse::Err { - message: format!("failed to store reminder: {e:#}"), + message: format!("invalid reminder timing: {e:#}"), }, } } diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs index 2077326..7935cde 100644 --- a/hive-c0re/src/broker.rs +++ b/hive-c0re/src/broker.rs @@ -253,6 +253,25 @@ impl Broker { .context("query reminders") } + /// Get all due reminders across all agents in a single query. + /// Returns a vec of (agent, id, message, file_path) tuples. + pub fn get_all_due_reminders(&self) -> Result)>> { + let conn = self.conn.lock().unwrap(); + let mut stmt = conn.prepare( + "SELECT agent, id, message, file_path FROM reminders WHERE due_at <= ?1 AND sent_at IS NULL ORDER BY agent, due_at ASC" + )?; + let rows = stmt.query_map(params![now_unix()], |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, i64>(1)?, + row.get::<_, String>(2)?, + row.get::<_, Option>(3)?, + )) + })?; + rows.collect::>>() + .context("query all due reminders") + } + /// Mark a reminder as sent (delivered). pub fn mark_reminder_sent(&self, id: i64) -> Result<()> { let conn = self.conn.lock().unwrap(); diff --git a/hive-c0re/src/main.rs b/hive-c0re/src/main.rs index 5908b7b..8d03682 100644 --- a/hive-c0re/src/main.rs +++ b/hive-c0re/src/main.rs @@ -163,41 +163,36 @@ async fn main() -> Result<()> { tokio::spawn(async move { use hive_sh4re::Message; loop { - // Get all agents currently registered - let agents = reminder_coord.list_agents(); - for agent in agents { - match reminder_coord.broker.get_due_reminders(&agent) { - Ok(reminders) => { - for (id, message, _file_path) in reminders { - // Deliver as inbox message from "reminder" - if let Err(e) = reminder_coord.broker.send(&Message { - from: "reminder".to_owned(), - to: agent.clone(), - body: message.clone(), - }) { - tracing::warn!( - reminder_id = id, - %agent, - error = ?e, - "failed to deliver reminder" - ); - continue; - } - // Mark as sent - if let Err(e) = reminder_coord.broker.mark_reminder_sent(id) { - tracing::warn!( - reminder_id = id, - error = ?e, - "failed to mark reminder sent" - ); - } + // Query all due reminders in a single DB call + match reminder_coord.broker.get_all_due_reminders() { + Ok(reminders) => { + for (agent, id, message, _file_path) in reminders { + // Deliver as inbox message from "reminder" + if let Err(e) = reminder_coord.broker.send(&Message { + from: "reminder".to_owned(), + to: agent.clone(), + body: message.clone(), + }) { + tracing::warn!( + reminder_id = id, + %agent, + error = ?e, + "failed to deliver reminder" + ); + continue; + } + // Mark as sent + if let Err(e) = reminder_coord.broker.mark_reminder_sent(id) { + tracing::warn!( + reminder_id = id, + error = ?e, + "failed to mark reminder sent" + ); } } - Err(e) => tracing::warn!( - %agent, - error = ?e, - "failed to query due reminders" - ), + } + Err(e) => { + tracing::warn!(error = ?e, "failed to query due reminders"); } } tokio::time::sleep(std::time::Duration::from_secs(5)).await;