diff --git a/Cargo.toml b/Cargo.toml index 8faa519..dec6e1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ version = "0.1.0" anyhow = "1" clap = { version = "4", features = ["derive"] } hive-sh4re = { path = "hive-sh4re" } +rusqlite = { version = "0.37", features = ["bundled"] } serde = { version = "1", features = ["derive"] } serde_json = "1" tokio = { version = "1", features = ["io-util", "macros", "net", "process", "rt-multi-thread", "signal", "time"] } diff --git a/hive-c0re/Cargo.toml b/hive-c0re/Cargo.toml index be3ed94..2d84474 100644 --- a/hive-c0re/Cargo.toml +++ b/hive-c0re/Cargo.toml @@ -7,6 +7,7 @@ version.workspace = true anyhow.workspace = true clap.workspace = true hive-sh4re.workspace = true +rusqlite.workspace = true serde.workspace = true serde_json.workspace = true tokio.workspace = true diff --git a/hive-c0re/src/agent_server.rs b/hive-c0re/src/agent_server.rs index 89104a8..05aae47 100644 --- a/hive-c0re/src/agent_server.rs +++ b/hive-c0re/src/agent_server.rs @@ -83,19 +83,26 @@ async fn serve(stream: UnixStream, agent: String, broker: Arc) -> Result fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResponse { match req { AgentRequest::Send { to, body } => { - broker.send(Message { + match broker.send(Message { from: agent.to_owned(), to: to.clone(), body: body.clone(), - }); - AgentResponse::Ok + }) { + Ok(()) => AgentResponse::Ok, + Err(e) => AgentResponse::Err { + message: format!("{e:#}"), + }, + } } AgentRequest::Recv => match broker.recv(agent) { - Some(msg) => AgentResponse::Message { + Ok(Some(msg)) => AgentResponse::Message { from: msg.from, body: msg.body, }, - None => AgentResponse::Empty, + Ok(None) => AgentResponse::Empty, + Err(e) => AgentResponse::Err { + message: format!("{e:#}"), + }, }, } } diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs index 2867c1f..0eb2555 100644 --- a/hive-c0re/src/broker.rs +++ b/hive-c0re/src/broker.rs @@ -1,30 +1,80 @@ -//! In-memory message broker. Phase 3 replaces this with a sqlite-backed store. +//! Sqlite-backed message broker. Survives `hive-c0re` restart. -use std::collections::{HashMap, VecDeque}; +use std::path::Path; use std::sync::Mutex; +use std::time::{SystemTime, UNIX_EPOCH}; +use anyhow::{Context, Result}; use hive_sh4re::Message; +use rusqlite::{Connection, OptionalExtension, params}; + +const SCHEMA: &str = r#" +CREATE TABLE IF NOT EXISTS messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + sender TEXT NOT NULL, + recipient TEXT NOT NULL, + body TEXT NOT NULL, + sent_at INTEGER NOT NULL, + delivered_at INTEGER +); +CREATE INDEX IF NOT EXISTS idx_messages_undelivered + ON messages (recipient, id) WHERE delivered_at IS NULL; +"#; -#[derive(Default)] pub struct Broker { - queues: Mutex>>, + conn: Mutex, } impl Broker { - pub fn new() -> Self { - Self::default() + pub fn open(path: &Path) -> Result { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("create db parent {}", parent.display()))?; + } + let conn = Connection::open(path) + .with_context(|| format!("open broker db {}", path.display()))?; + conn.execute_batch(SCHEMA).context("apply broker schema")?; + Ok(Self { + conn: Mutex::new(conn), + }) } - pub fn send(&self, message: Message) { - let mut queues = self.queues.lock().unwrap(); - queues - .entry(message.to.clone()) - .or_default() - .push_back(message); + pub fn send(&self, message: Message) -> Result<()> { + let conn = self.conn.lock().unwrap(); + conn.execute( + "INSERT INTO messages (sender, recipient, body, sent_at) VALUES (?1, ?2, ?3, ?4)", + params![message.from, message.to, message.body, now_unix()], + )?; + Ok(()) } - pub fn recv(&self, recipient: &str) -> Option { - let mut queues = self.queues.lock().unwrap(); - queues.get_mut(recipient).and_then(|q| q.pop_front()) + pub fn recv(&self, recipient: &str) -> Result> { + let conn = self.conn.lock().unwrap(); + let row: Option<(i64, String, String, String)> = conn + .query_row( + "SELECT id, sender, recipient, body + FROM messages + WHERE recipient = ?1 AND delivered_at IS NULL + ORDER BY id ASC + LIMIT 1", + params![recipient], + |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)), + ) + .optional()?; + let Some((id, from, to, body)) = row else { + return Ok(None); + }; + conn.execute( + "UPDATE messages SET delivered_at = ?1 WHERE id = ?2", + params![now_unix(), id], + )?; + Ok(Some(Message { from, to, body })) } } + +fn now_unix() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .unwrap_or(0) +} diff --git a/hive-c0re/src/coordinator.rs b/hive-c0re/src/coordinator.rs index 105e92a..04b5e6b 100644 --- a/hive-c0re/src/coordinator.rs +++ b/hive-c0re/src/coordinator.rs @@ -2,7 +2,7 @@ //! sockets: the broker plus a map of `name -> AgentSocket`. use std::collections::HashMap; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use anyhow::{Context, Result}; @@ -18,11 +18,12 @@ pub struct Coordinator { } impl Coordinator { - pub fn new() -> Self { - Self { - broker: Arc::new(Broker::new()), + pub fn open(db_path: &Path) -> Result { + let broker = Broker::open(db_path).context("open broker")?; + Ok(Self { + broker: Arc::new(broker), agents: Mutex::new(HashMap::new()), - } + }) } pub async fn register_agent(&self, name: &str) -> Result { diff --git a/hive-c0re/src/main.rs b/hive-c0re/src/main.rs index 760d193..6346cf9 100644 --- a/hive-c0re/src/main.rs +++ b/hive-c0re/src/main.rs @@ -32,6 +32,9 @@ enum Cmd { /// Flake reference for the agent base template. #[arg(long, default_value = "/etc/hyperhive#agent-base")] agent_flake: String, + /// Path to the sqlite message store. + #[arg(long, default_value = "/var/lib/hyperhive/broker.sqlite")] + db: PathBuf, }, /// Spawn a new agent container (`hive-agent-`). Spawn { name: String }, @@ -54,8 +57,8 @@ async fn main() -> Result<()> { let cli = Cli::parse(); match cli.cmd { - Cmd::Serve { agent_flake } => { - let coord = Arc::new(Coordinator::new()); + Cmd::Serve { agent_flake, db } => { + let coord = Arc::new(Coordinator::open(&db)?); server::serve(&cli.socket, &agent_flake, coord).await } Cmd::Spawn { name } => {