From 4c17146b6f69f5ac9e5e8300125151140475fa83 Mon Sep 17 00:00:00 2001 From: Damocles Date: Wed, 29 Apr 2026 22:03:03 +0200 Subject: [PATCH] event handler, claude bridge, frontmatter parser, rate limiter, full message loop --- src/bin/send.rs | 8 +- src/main.rs | 303 ++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 297 insertions(+), 14 deletions(-) diff --git a/src/bin/send.rs b/src/bin/send.rs index a4d27e6..3c035bb 100644 --- a/src/bin/send.rs +++ b/src/bin/send.rs @@ -53,9 +53,13 @@ async fn main() -> anyhow::Result<()> { client.restore_session(session.user_session).await?; // need at least one sync so the client knows about joined rooms - client.sync_once(matrix_sdk::config::SyncSettings::default()).await?; + client + .sync_once(matrix_sdk::config::SyncSettings::default()) + .await?; - let room = client.get_room(&room_id).context("room not found - has the bot joined it?")?; + let room = client + .get_room(&room_id) + .context("room not found - has the bot joined it?")?; let content = RoomMessageEventContent::text_plain(&message); room.send(content).await?; diff --git a/src/main.rs b/src/main.rs index b238a42..666ec9a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,5 @@ use std::path::{Path, PathBuf}; +use std::sync::Arc; use anyhow::{Context, bail}; use matrix_sdk::{ @@ -6,12 +7,16 @@ use matrix_sdk::{ authentication::matrix::MatrixSession, config::SyncSettings, ruma::{ + OwnedRoomId, OwnedUserId, api::client::filter::FilterDefinition, - events::room::message::{MessageType, OriginalSyncRoomMessageEvent}, + events::room::message::{ + MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent, + }, }, }; use serde::{Deserialize, Serialize}; use tokio::fs; +use tokio::sync::Mutex; use tracing_subscriber::EnvFilter; #[derive(Debug, Deserialize)] @@ -30,8 +35,27 @@ struct PersistedSession { sync_token: Option, } +#[derive(Clone, Debug)] +struct ChatMessage { + sender: OwnedUserId, + sender_name: String, + body: String, + is_self: bool, +} + +struct DaemonState { + own_user_id: OwnedUserId, + room_history: std::collections::HashMap>, + pending_rooms: Vec, + rate_budget: u32, + last_rate_reset: std::time::Instant, +} + const STATE_DIR: &str = "/persist/damocles-lab/state"; +const IDENTITY_DIR: &str = "/persist/damocles-lab/state/identity"; const CONFIG_PATH: &str = "/persist/damocles-lab/config.json"; +const MAX_HISTORY: usize = 20; +const RATE_LIMIT_PER_MIN: u32 = 2; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -43,6 +67,7 @@ async fn main() -> anyhow::Result<()> { let state_dir = Path::new(STATE_DIR); fs::create_dir_all(state_dir).await?; + fs::create_dir_all(Path::new(IDENTITY_DIR)).await?; let session_file = state_dir.join("session.json"); let db_path = state_dir.join("db"); @@ -54,7 +79,25 @@ async fn main() -> anyhow::Result<()> { (login(&config, &db_path, &session_file).await?, None) }; - sync(client, sync_token, &session_file).await + let own_user_id = client.user_id().context("not logged in")?.to_owned(); + tracing::info!(user = %own_user_id, "ready"); + + let state = Arc::new(Mutex::new(DaemonState { + own_user_id: own_user_id.clone(), + room_history: std::collections::HashMap::new(), + pending_rooms: Vec::new(), + rate_budget: RATE_LIMIT_PER_MIN, + last_rate_reset: std::time::Instant::now(), + })); + + // spawn the processor that invokes claude for pending messages + let processor_state = state.clone(); + let processor_client = client.clone(); + tokio::spawn(async move { + process_loop(processor_state, processor_client).await; + }); + + sync(client, sync_token, &session_file, state).await } async fn load_config() -> anyhow::Result { @@ -81,11 +124,7 @@ async fn restore_session(session_file: &Path) -> anyhow::Result<(Client, Option< Ok((client, session.sync_token)) } -async fn login( - config: &Config, - db_path: &Path, - session_file: &Path, -) -> anyhow::Result { +async fn login(config: &Config, db_path: &Path, session_file: &Path) -> anyhow::Result { tracing::info!(homeserver = %config.homeserver, user = %config.username, "logging in"); let client = Client::builder() @@ -121,6 +160,7 @@ async fn sync( client: Client, initial_sync_token: Option, session_file: &Path, + state: Arc>, ) -> anyhow::Result<()> { tracing::info!("initial sync (ignoring past messages)"); @@ -131,7 +171,7 @@ async fn sync( sync_settings = sync_settings.token(token); } - // initial sync loop - retries on transient errors + // initial sync - ignore old messages loop { match client.sync_once(sync_settings.clone()).await { Ok(response) => { @@ -147,9 +187,14 @@ async fn sync( tracing::info!("synced, listening for messages"); - client.add_event_handler(on_room_message); + // register event handler with shared state + client.add_event_handler(move |event: OriginalSyncRoomMessageEvent, room: Room| { + let state = state.clone(); + async move { + on_room_message(event, room, state).await; + } + }); - // main sync loop client.sync(sync_settings).await?; bail!("sync loop exited unexpectedly") @@ -163,7 +208,11 @@ async fn persist_sync_token(session_file: &Path, sync_token: String) -> anyhow:: Ok(()) } -async fn on_room_message(event: OriginalSyncRoomMessageEvent, room: Room) { +async fn on_room_message( + event: OriginalSyncRoomMessageEvent, + room: Room, + state: Arc>, +) { if room.state() != RoomState::Joined { return; } @@ -171,16 +220,246 @@ async fn on_room_message(event: OriginalSyncRoomMessageEvent, room: Room) { return; }; + let room_id = room.room_id().to_owned(); let room_name = room .display_name() .await .map(|n| n.to_string()) - .unwrap_or_else(|_| room.room_id().to_string()); + .unwrap_or_else(|_| room_id.to_string()); + + let mut state = state.lock().await; + let is_self = event.sender == state.own_user_id; + + let msg = ChatMessage { + sender: event.sender.clone(), + sender_name: event.sender.localpart().to_owned(), + body: text_content.body.clone(), + is_self, + }; tracing::info!( room = %room_name, sender = %event.sender, + self_msg = is_self, "{}", text_content.body ); + + // add to history + let history = state.room_history.entry(room_id.clone()).or_default(); + history.push(msg); + if history.len() > MAX_HISTORY { + history.drain(..history.len() - MAX_HISTORY); + } + + // only invoke claude for non-self messages + if !is_self && !state.pending_rooms.contains(&room_id) { + state.pending_rooms.push(room_id); + } +} + +async fn process_loop(state: Arc>, client: Client) { + loop { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + let room_id = { + let mut state = state.lock().await; + + // reset rate budget every 60s + if state.last_rate_reset.elapsed() >= std::time::Duration::from_secs(60) { + state.rate_budget = RATE_LIMIT_PER_MIN; + state.last_rate_reset = std::time::Instant::now(); + } + + if state.rate_budget == 0 { + continue; + } + + state.pending_rooms.pop() + }; + + let Some(room_id) = room_id else { + continue; + }; + + let history = { + let state = state.lock().await; + state + .room_history + .get(&room_id) + .cloned() + .unwrap_or_default() + }; + + let room_name = client + .get_room(&room_id) + .and_then(|r| { + // can't easily get display_name synchronously, use room_id + Some(r.room_id().to_string()) + }) + .unwrap_or_else(|| room_id.to_string()); + + match invoke_claude(&room_id, &room_name, &history).await { + Ok(Some(response)) => { + if let Some(room) = client.get_room(&response.room) { + let content = RoomMessageEventContent::text_plain(&response.body); + match room.send(content).await { + Ok(_) => { + let mut state = state.lock().await; + state.rate_budget = state.rate_budget.saturating_sub(1); + tracing::info!( + room = %response.room, + "sent response ({} budget remaining)", + state.rate_budget + ); + } + Err(e) => tracing::error!("failed to send: {e}"), + } + } else { + tracing::warn!(room = %response.room, "target room not found"); + } + } + Ok(None) => { + tracing::debug!(room = %room_id, "claude chose to skip"); + } + Err(e) => { + tracing::error!(room = %room_id, "claude invocation failed: {e}"); + } + } + } +} + +struct ClaudeResponse { + room: OwnedRoomId, + body: String, +} + +async fn invoke_claude( + source_room: &OwnedRoomId, + room_name: &str, + history: &[ChatMessage], +) -> anyhow::Result> { + // build the prompt with conversation context + let mut prompt = String::new(); + prompt.push_str(&format!("[room: {} ({})]\n", source_room, room_name)); + prompt.push_str("[new messages below this line]\n"); + + for msg in history { + let prefix = if msg.is_self { "(you) " } else { "" }; + prompt.push_str(&format!("{}{}: {}\n", prefix, msg.sender, msg.body)); + } + + tracing::debug!("invoking claude with {} messages", history.len()); + + let output = tokio::process::Command::new("claude") + .args([ + "--print", + "--bare", + "--add-dir", + IDENTITY_DIR, + "--allowedTools", + "Read Edit Write Glob Grep", + ]) + .current_dir(IDENTITY_DIR) + .arg(&prompt) + .output() + .await + .context("failed to run claude")?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + bail!("claude exited with {}: {}", output.status, stderr); + } + + let raw = String::from_utf8_lossy(&output.stdout).to_string(); + parse_response(&raw, source_room) +} + +fn parse_response(raw: &str, default_room: &OwnedRoomId) -> anyhow::Result> { + let trimmed = raw.trim(); + + // check for frontmatter + if trimmed.starts_with("---") { + let parts: Vec<&str> = trimmed.splitn(3, "---").collect(); + if parts.len() >= 3 { + let frontmatter = parts[1].trim(); + let body = parts[2].trim(); + + // check for skip + if frontmatter.contains("skip: true") || frontmatter.contains("skip:true") { + return Ok(None); + } + + // extract room override + let room = frontmatter + .lines() + .find(|l| l.starts_with("room:")) + .and_then(|l| l.strip_prefix("room:")) + .and_then(|r| r.trim().parse().ok()) + .unwrap_or_else(|| default_room.clone()); + + if body.is_empty() { + return Ok(None); + } + + return Ok(Some(ClaudeResponse { + room, + body: body.to_owned(), + })); + } + } + + // no frontmatter - treat entire output as message to default room + if trimmed.is_empty() { + return Ok(None); + } + + Ok(Some(ClaudeResponse { + room: default_room.clone(), + body: trimmed.to_owned(), + })) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_room() -> OwnedRoomId { + "!test:example.com".parse().unwrap() + } + + #[test] + fn parse_frontmatter_response() { + let raw = "---\nroom: !other:server\n---\nhello world"; + let resp = parse_response(raw, &test_room()).unwrap().unwrap(); + assert_eq!(resp.room.as_str(), "!other:server"); + assert_eq!(resp.body, "hello world"); + } + + #[test] + fn parse_skip_response() { + let raw = "---\nskip: true\n---\n"; + assert!(parse_response(raw, &test_room()).unwrap().is_none()); + } + + #[test] + fn parse_plain_response() { + let raw = "just a message"; + let resp = parse_response(raw, &test_room()).unwrap().unwrap(); + assert_eq!(resp.room, test_room()); + assert_eq!(resp.body, "just a message"); + } + + #[test] + fn parse_empty_response() { + assert!(parse_response("", &test_room()).unwrap().is_none()); + assert!(parse_response(" \n ", &test_room()).unwrap().is_none()); + } + + #[test] + fn parse_default_room() { + let raw = "---\n---\nhello"; + let resp = parse_response(raw, &test_room()).unwrap().unwrap(); + assert_eq!(resp.room, test_room()); + } }