broker: sqlite-backed (survives hive-c0re restart)

This commit is contained in:
müde 2026-05-14 22:17:16 +02:00
parent af464e27f4
commit d220720f6a
6 changed files with 90 additions and 27 deletions

View file

@ -14,6 +14,7 @@ version = "0.1.0"
anyhow = "1" anyhow = "1"
clap = { version = "4", features = ["derive"] } clap = { version = "4", features = ["derive"] }
hive-sh4re = { path = "hive-sh4re" } hive-sh4re = { path = "hive-sh4re" }
rusqlite = { version = "0.37", features = ["bundled"] }
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
tokio = { version = "1", features = ["io-util", "macros", "net", "process", "rt-multi-thread", "signal", "time"] } tokio = { version = "1", features = ["io-util", "macros", "net", "process", "rt-multi-thread", "signal", "time"] }

View file

@ -7,6 +7,7 @@ version.workspace = true
anyhow.workspace = true anyhow.workspace = true
clap.workspace = true clap.workspace = true
hive-sh4re.workspace = true hive-sh4re.workspace = true
rusqlite.workspace = true
serde.workspace = true serde.workspace = true
serde_json.workspace = true serde_json.workspace = true
tokio.workspace = true tokio.workspace = true

View file

@ -83,19 +83,26 @@ async fn serve(stream: UnixStream, agent: String, broker: Arc<Broker>) -> Result
fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResponse { fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResponse {
match req { match req {
AgentRequest::Send { to, body } => { AgentRequest::Send { to, body } => {
broker.send(Message { match broker.send(Message {
from: agent.to_owned(), from: agent.to_owned(),
to: to.clone(), to: to.clone(),
body: body.clone(), body: body.clone(),
}); }) {
AgentResponse::Ok Ok(()) => AgentResponse::Ok,
Err(e) => AgentResponse::Err {
message: format!("{e:#}"),
},
}
} }
AgentRequest::Recv => match broker.recv(agent) { AgentRequest::Recv => match broker.recv(agent) {
Some(msg) => AgentResponse::Message { Ok(Some(msg)) => AgentResponse::Message {
from: msg.from, from: msg.from,
body: msg.body, body: msg.body,
}, },
None => AgentResponse::Empty, Ok(None) => AgentResponse::Empty,
Err(e) => AgentResponse::Err {
message: format!("{e:#}"),
},
}, },
} }
} }

View file

@ -1,30 +1,80 @@
//! In-memory message broker. Phase 3 replaces this with a sqlite-backed store. //! Sqlite-backed message broker. Survives `hive-c0re` restart.
use std::collections::{HashMap, VecDeque}; use std::path::Path;
use std::sync::Mutex; use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
use anyhow::{Context, Result};
use hive_sh4re::Message; use hive_sh4re::Message;
use rusqlite::{Connection, OptionalExtension, params};
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sender TEXT NOT NULL,
recipient TEXT NOT NULL,
body TEXT NOT NULL,
sent_at INTEGER NOT NULL,
delivered_at INTEGER
);
CREATE INDEX IF NOT EXISTS idx_messages_undelivered
ON messages (recipient, id) WHERE delivered_at IS NULL;
"#;
#[derive(Default)]
pub struct Broker { pub struct Broker {
queues: Mutex<HashMap<String, VecDeque<Message>>>, conn: Mutex<Connection>,
} }
impl Broker { impl Broker {
pub fn new() -> Self { pub fn open(path: &Path) -> Result<Self> {
Self::default() if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("create db parent {}", parent.display()))?;
}
let conn = Connection::open(path)
.with_context(|| format!("open broker db {}", path.display()))?;
conn.execute_batch(SCHEMA).context("apply broker schema")?;
Ok(Self {
conn: Mutex::new(conn),
})
} }
pub fn send(&self, message: Message) { pub fn send(&self, message: Message) -> Result<()> {
let mut queues = self.queues.lock().unwrap(); let conn = self.conn.lock().unwrap();
queues conn.execute(
.entry(message.to.clone()) "INSERT INTO messages (sender, recipient, body, sent_at) VALUES (?1, ?2, ?3, ?4)",
.or_default() params![message.from, message.to, message.body, now_unix()],
.push_back(message); )?;
Ok(())
} }
pub fn recv(&self, recipient: &str) -> Option<Message> { pub fn recv(&self, recipient: &str) -> Result<Option<Message>> {
let mut queues = self.queues.lock().unwrap(); let conn = self.conn.lock().unwrap();
queues.get_mut(recipient).and_then(|q| q.pop_front()) let row: Option<(i64, String, String, String)> = conn
.query_row(
"SELECT id, sender, recipient, body
FROM messages
WHERE recipient = ?1 AND delivered_at IS NULL
ORDER BY id ASC
LIMIT 1",
params![recipient],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
)
.optional()?;
let Some((id, from, to, body)) = row else {
return Ok(None);
};
conn.execute(
"UPDATE messages SET delivered_at = ?1 WHERE id = ?2",
params![now_unix(), id],
)?;
Ok(Some(Message { from, to, body }))
} }
} }
fn now_unix() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0)
}

View file

@ -2,7 +2,7 @@
//! sockets: the broker plus a map of `name -> AgentSocket`. //! sockets: the broker plus a map of `name -> AgentSocket`.
use std::collections::HashMap; use std::collections::HashMap;
use std::path::PathBuf; use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
@ -18,11 +18,12 @@ pub struct Coordinator {
} }
impl Coordinator { impl Coordinator {
pub fn new() -> Self { pub fn open(db_path: &Path) -> Result<Self> {
Self { let broker = Broker::open(db_path).context("open broker")?;
broker: Arc::new(Broker::new()), Ok(Self {
broker: Arc::new(broker),
agents: Mutex::new(HashMap::new()), agents: Mutex::new(HashMap::new()),
} })
} }
pub async fn register_agent(&self, name: &str) -> Result<PathBuf> { pub async fn register_agent(&self, name: &str) -> Result<PathBuf> {

View file

@ -32,6 +32,9 @@ enum Cmd {
/// Flake reference for the agent base template. /// Flake reference for the agent base template.
#[arg(long, default_value = "/etc/hyperhive#agent-base")] #[arg(long, default_value = "/etc/hyperhive#agent-base")]
agent_flake: String, agent_flake: String,
/// Path to the sqlite message store.
#[arg(long, default_value = "/var/lib/hyperhive/broker.sqlite")]
db: PathBuf,
}, },
/// Spawn a new agent container (`hive-agent-<name>`). /// Spawn a new agent container (`hive-agent-<name>`).
Spawn { name: String }, Spawn { name: String },
@ -54,8 +57,8 @@ async fn main() -> Result<()> {
let cli = Cli::parse(); let cli = Cli::parse();
match cli.cmd { match cli.cmd {
Cmd::Serve { agent_flake } => { Cmd::Serve { agent_flake, db } => {
let coord = Arc::new(Coordinator::new()); let coord = Arc::new(Coordinator::open(&db)?);
server::serve(&cli.socket, &agent_flake, coord).await server::serve(&cli.socket, &agent_flake, coord).await
} }
Cmd::Spawn { name } => { Cmd::Spawn { name } => {