//! Sqlite-backed message broker. Survives `hive-c0re` restart, and taps every //! send/recv onto a broadcast channel so the dashboard can stream it. use std::path::Path; use std::sync::Mutex; use std::time::{SystemTime, UNIX_EPOCH}; use anyhow::{Context, Result}; use hive_sh4re::Message; use rusqlite::{Connection, OptionalExtension, params}; use serde::Serialize; use tokio::sync::broadcast; const SCHEMA: &str = r" CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, sender TEXT NOT NULL, recipient TEXT NOT NULL, body TEXT NOT NULL, sent_at INTEGER NOT NULL, delivered_at INTEGER ); CREATE INDEX IF NOT EXISTS idx_messages_undelivered ON messages (recipient, id) WHERE delivered_at IS NULL; "; /// Capacity of the live event channel. Slow subscribers (e.g. an idle browser) /// may drop events past this; we send a `lagged` notice in their stream. const EVENT_CHANNEL: usize = 256; #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "snake_case", tag = "kind")] pub enum MessageEvent { Sent { from: String, to: String, body: String, at: i64, }, Delivered { from: String, to: String, body: String, at: i64, }, } pub struct Broker { conn: Mutex, events: broadcast::Sender, } impl Broker { pub fn open(path: &Path) -> Result { if let Some(parent) = path.parent() { std::fs::create_dir_all(parent) .with_context(|| format!("create db parent {}", parent.display()))?; } let conn = Connection::open(path) .with_context(|| format!("open broker db {}", path.display()))?; conn.execute_batch(SCHEMA).context("apply broker schema")?; let (events, _) = broadcast::channel(EVENT_CHANNEL); Ok(Self { conn: Mutex::new(conn), events, }) } pub fn subscribe(&self) -> broadcast::Receiver { self.events.subscribe() } pub fn send(&self, message: &Message) -> Result<()> { let conn = self.conn.lock().unwrap(); conn.execute( "INSERT INTO messages (sender, recipient, body, sent_at) VALUES (?1, ?2, ?3, ?4)", params![message.from, message.to, message.body, now_unix()], )?; drop(conn); let _ = self.events.send(MessageEvent::Sent { from: message.from.clone(), to: message.to.clone(), body: message.body.clone(), at: now_unix(), }); Ok(()) } pub fn recv(&self, recipient: &str) -> Result> { let conn = self.conn.lock().unwrap(); let row: Option<(i64, String, String, String)> = conn .query_row( "SELECT id, sender, recipient, body FROM messages WHERE recipient = ?1 AND delivered_at IS NULL ORDER BY id ASC LIMIT 1", params![recipient], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)), ) .optional()?; let Some((id, from, to, body)) = row else { return Ok(None); }; conn.execute( "UPDATE messages SET delivered_at = ?1 WHERE id = ?2", params![now_unix(), id], )?; drop(conn); let _ = self.events.send(MessageEvent::Delivered { from: from.clone(), to: to.clone(), body: body.clone(), at: now_unix(), }); Ok(Some(Message { from, to, body })) } } fn now_unix() -> i64 { SystemTime::now() .duration_since(UNIX_EPOCH) .ok() .and_then(|d| i64::try_from(d.as_secs()).ok()) .unwrap_or(0) }