reminder: add sqlite storage + broker methods + dispatch
This commit is contained in:
parent
7e9fd8e978
commit
4fc9c02934
2 changed files with 79 additions and 4 deletions
|
|
@ -193,10 +193,28 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc<Coordinator>) ->
|
||||||
timing,
|
timing,
|
||||||
file_path,
|
file_path,
|
||||||
} => {
|
} => {
|
||||||
// TODO: submit to reminder scheduler
|
use hive_sh4re::ReminderTiming;
|
||||||
// For now, return a stub response
|
let due_at = match timing {
|
||||||
AgentResponse::Err {
|
ReminderTiming::InSeconds { seconds } => {
|
||||||
message: "remind not yet implemented".to_owned(),
|
std::time::SystemTime::now()
|
||||||
|
.checked_add(std::time::Duration::from_secs(*seconds))
|
||||||
|
.and_then(|t| {
|
||||||
|
t.duration_since(std::time::UNIX_EPOCH)
|
||||||
|
.ok()
|
||||||
|
.and_then(|d| i64::try_from(d.as_secs()).ok())
|
||||||
|
})
|
||||||
|
.unwrap_or(0)
|
||||||
|
}
|
||||||
|
ReminderTiming::At { unix_timestamp } => *unix_timestamp,
|
||||||
|
};
|
||||||
|
match broker.store_reminder(agent, message, file_path.as_deref(), due_at) {
|
||||||
|
Ok(id) => {
|
||||||
|
tracing::info!(%id, %agent, %due_at, "reminder scheduled");
|
||||||
|
AgentResponse::Ok
|
||||||
|
}
|
||||||
|
Err(e) => AgentResponse::Err {
|
||||||
|
message: format!("failed to store reminder: {e:#}"),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,18 @@ CREATE TABLE IF NOT EXISTS messages (
|
||||||
);
|
);
|
||||||
CREATE INDEX IF NOT EXISTS idx_messages_undelivered
|
CREATE INDEX IF NOT EXISTS idx_messages_undelivered
|
||||||
ON messages (recipient, id) WHERE delivered_at IS NULL;
|
ON messages (recipient, id) WHERE delivered_at IS NULL;
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS reminders (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
agent TEXT NOT NULL,
|
||||||
|
message TEXT NOT NULL,
|
||||||
|
file_path TEXT,
|
||||||
|
due_at INTEGER NOT NULL,
|
||||||
|
created_at INTEGER NOT NULL,
|
||||||
|
sent_at INTEGER
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_reminders_due
|
||||||
|
ON reminders (agent, due_at) WHERE sent_at IS NULL;
|
||||||
";
|
";
|
||||||
|
|
||||||
/// Capacity of the live event channel. Slow subscribers (e.g. an idle browser)
|
/// Capacity of the live event channel. Slow subscribers (e.g. an idle browser)
|
||||||
|
|
@ -205,6 +217,51 @@ impl Broker {
|
||||||
});
|
});
|
||||||
Ok(Some(Message { from, to, body }))
|
Ok(Some(Message { from, to, body }))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Store a new reminder. Returns the reminder id.
|
||||||
|
pub fn store_reminder(
|
||||||
|
&self,
|
||||||
|
agent: &str,
|
||||||
|
message: &str,
|
||||||
|
file_path: Option<&str>,
|
||||||
|
due_at: i64,
|
||||||
|
) -> Result<i64> {
|
||||||
|
let conn = self.conn.lock().unwrap();
|
||||||
|
conn.execute(
|
||||||
|
"INSERT INTO reminders (agent, message, file_path, due_at, created_at) VALUES (?1, ?2, ?3, ?4, ?5)",
|
||||||
|
params![agent, message, file_path, due_at, now_unix()],
|
||||||
|
)?;
|
||||||
|
let id = conn.last_insert_rowid();
|
||||||
|
Ok(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get all reminders for an agent that are due now or in the past.
|
||||||
|
/// Returns (id, message, file_path) tuples.
|
||||||
|
pub fn get_due_reminders(&self, agent: &str) -> Result<Vec<(i64, String, Option<String>)>> {
|
||||||
|
let conn = self.conn.lock().unwrap();
|
||||||
|
let mut stmt = conn.prepare(
|
||||||
|
"SELECT id, message, file_path FROM reminders WHERE agent = ?1 AND due_at <= ?2 AND sent_at IS NULL ORDER BY due_at ASC"
|
||||||
|
)?;
|
||||||
|
let rows = stmt.query_map(params![agent, now_unix()], |row| {
|
||||||
|
Ok((
|
||||||
|
row.get::<_, i64>(0)?,
|
||||||
|
row.get::<_, String>(1)?,
|
||||||
|
row.get::<_, Option<String>>(2)?,
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
rows.collect::<rusqlite::Result<Vec<_>>>()
|
||||||
|
.context("query reminders")
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark a reminder as sent (delivered).
|
||||||
|
pub fn mark_reminder_sent(&self, id: i64) -> Result<()> {
|
||||||
|
let conn = self.conn.lock().unwrap();
|
||||||
|
conn.execute(
|
||||||
|
"UPDATE reminders SET sent_at = ?1 WHERE id = ?2",
|
||||||
|
params![now_unix(), id],
|
||||||
|
)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn now_unix() -> i64 {
|
fn now_unix() -> i64 {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue