//! Sqlite-backed message broker. Survives `hive-c0re` restart. use std::path::Path; use std::sync::Mutex; use std::time::{SystemTime, UNIX_EPOCH}; use anyhow::{Context, Result}; use hive_sh4re::Message; use rusqlite::{Connection, OptionalExtension, params}; const SCHEMA: &str = r" CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, sender TEXT NOT NULL, recipient TEXT NOT NULL, body TEXT NOT NULL, sent_at INTEGER NOT NULL, delivered_at INTEGER ); CREATE INDEX IF NOT EXISTS idx_messages_undelivered ON messages (recipient, id) WHERE delivered_at IS NULL; "; pub struct Broker { conn: Mutex, } impl Broker { pub fn open(path: &Path) -> Result { if let Some(parent) = path.parent() { std::fs::create_dir_all(parent) .with_context(|| format!("create db parent {}", parent.display()))?; } let conn = Connection::open(path).with_context(|| format!("open broker db {}", path.display()))?; conn.execute_batch(SCHEMA).context("apply broker schema")?; Ok(Self { conn: Mutex::new(conn), }) } pub fn send(&self, message: &Message) -> Result<()> { let conn = self.conn.lock().unwrap(); conn.execute( "INSERT INTO messages (sender, recipient, body, sent_at) VALUES (?1, ?2, ?3, ?4)", params![message.from, message.to, message.body, now_unix()], )?; Ok(()) } pub fn recv(&self, recipient: &str) -> Result> { let conn = self.conn.lock().unwrap(); let row: Option<(i64, String, String, String)> = conn .query_row( "SELECT id, sender, recipient, body FROM messages WHERE recipient = ?1 AND delivered_at IS NULL ORDER BY id ASC LIMIT 1", params![recipient], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)), ) .optional()?; let Some((id, from, to, body)) = row else { return Ok(None); }; conn.execute( "UPDATE messages SET delivered_at = ?1 WHERE id = ?2", params![now_unix(), id], )?; Ok(Some(Message { from, to, body })) } } fn now_unix() -> i64 { SystemTime::now() .duration_since(UNIX_EPOCH) .ok() .and_then(|d| i64::try_from(d.as_secs()).ok()) .unwrap_or(0) }