From 4fc9c029341dabe735eb70c6231c7b8075c2c3fb Mon Sep 17 00:00:00 2001 From: damocles Date: Sat, 16 May 2026 12:40:38 +0200 Subject: [PATCH] reminder: add sqlite storage + broker methods + dispatch --- hive-c0re/src/agent_server.rs | 26 +++++++++++++--- hive-c0re/src/broker.rs | 57 +++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 4 deletions(-) diff --git a/hive-c0re/src/agent_server.rs b/hive-c0re/src/agent_server.rs index f77f953..a4cad21 100644 --- a/hive-c0re/src/agent_server.rs +++ b/hive-c0re/src/agent_server.rs @@ -193,10 +193,28 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc) -> timing, file_path, } => { - // TODO: submit to reminder scheduler - // For now, return a stub response - AgentResponse::Err { - message: "remind not yet implemented".to_owned(), + use hive_sh4re::ReminderTiming; + let due_at = match timing { + ReminderTiming::InSeconds { seconds } => { + std::time::SystemTime::now() + .checked_add(std::time::Duration::from_secs(*seconds)) + .and_then(|t| { + t.duration_since(std::time::UNIX_EPOCH) + .ok() + .and_then(|d| i64::try_from(d.as_secs()).ok()) + }) + .unwrap_or(0) + } + ReminderTiming::At { unix_timestamp } => *unix_timestamp, + }; + match broker.store_reminder(agent, message, file_path.as_deref(), due_at) { + Ok(id) => { + tracing::info!(%id, %agent, %due_at, "reminder scheduled"); + AgentResponse::Ok + } + Err(e) => AgentResponse::Err { + message: format!("failed to store reminder: {e:#}"), + }, } } } diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs index 72486ba..2077326 100644 --- a/hive-c0re/src/broker.rs +++ b/hive-c0re/src/broker.rs @@ -22,6 +22,18 @@ CREATE TABLE IF NOT EXISTS messages ( ); CREATE INDEX IF NOT EXISTS idx_messages_undelivered ON messages (recipient, id) WHERE delivered_at IS NULL; + +CREATE TABLE IF NOT EXISTS reminders ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + agent TEXT NOT NULL, + message TEXT NOT NULL, + file_path TEXT, + due_at INTEGER NOT NULL, + created_at INTEGER NOT NULL, + sent_at INTEGER +); +CREATE INDEX IF NOT EXISTS idx_reminders_due + ON reminders (agent, due_at) WHERE sent_at IS NULL; "; /// Capacity of the live event channel. Slow subscribers (e.g. an idle browser) @@ -205,6 +217,51 @@ impl Broker { }); Ok(Some(Message { from, to, body })) } + + /// Store a new reminder. Returns the reminder id. + pub fn store_reminder( + &self, + agent: &str, + message: &str, + file_path: Option<&str>, + due_at: i64, + ) -> Result { + let conn = self.conn.lock().unwrap(); + conn.execute( + "INSERT INTO reminders (agent, message, file_path, due_at, created_at) VALUES (?1, ?2, ?3, ?4, ?5)", + params![agent, message, file_path, due_at, now_unix()], + )?; + let id = conn.last_insert_rowid(); + Ok(id) + } + + /// Get all reminders for an agent that are due now or in the past. + /// Returns (id, message, file_path) tuples. + pub fn get_due_reminders(&self, agent: &str) -> Result)>> { + let conn = self.conn.lock().unwrap(); + let mut stmt = conn.prepare( + "SELECT id, message, file_path FROM reminders WHERE agent = ?1 AND due_at <= ?2 AND sent_at IS NULL ORDER BY due_at ASC" + )?; + let rows = stmt.query_map(params![agent, now_unix()], |row| { + Ok(( + row.get::<_, i64>(0)?, + row.get::<_, String>(1)?, + row.get::<_, Option>(2)?, + )) + })?; + rows.collect::>>() + .context("query reminders") + } + + /// Mark a reminder as sent (delivered). + pub fn mark_reminder_sent(&self, id: i64) -> Result<()> { + let conn = self.conn.lock().unwrap(); + conn.execute( + "UPDATE reminders SET sent_at = ?1 WHERE id = ?2", + params![now_unix(), id], + )?; + Ok(()) + } } fn now_unix() -> i64 {