event handler, claude bridge, frontmatter parser, rate limiter, full message loop

This commit is contained in:
Damocles 2026-04-29 22:03:03 +02:00
parent 1cc02e6f36
commit 4c17146b6f
2 changed files with 297 additions and 14 deletions

View file

@ -53,9 +53,13 @@ async fn main() -> anyhow::Result<()> {
client.restore_session(session.user_session).await?; client.restore_session(session.user_session).await?;
// need at least one sync so the client knows about joined rooms // need at least one sync so the client knows about joined rooms
client.sync_once(matrix_sdk::config::SyncSettings::default()).await?; client
.sync_once(matrix_sdk::config::SyncSettings::default())
.await?;
let room = client.get_room(&room_id).context("room not found - has the bot joined it?")?; let room = client
.get_room(&room_id)
.context("room not found - has the bot joined it?")?;
let content = RoomMessageEventContent::text_plain(&message); let content = RoomMessageEventContent::text_plain(&message);
room.send(content).await?; room.send(content).await?;

View file

@ -1,4 +1,5 @@
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{Context, bail}; use anyhow::{Context, bail};
use matrix_sdk::{ use matrix_sdk::{
@ -6,12 +7,16 @@ use matrix_sdk::{
authentication::matrix::MatrixSession, authentication::matrix::MatrixSession,
config::SyncSettings, config::SyncSettings,
ruma::{ ruma::{
OwnedRoomId, OwnedUserId,
api::client::filter::FilterDefinition, api::client::filter::FilterDefinition,
events::room::message::{MessageType, OriginalSyncRoomMessageEvent}, events::room::message::{
MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent,
},
}, },
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::fs; use tokio::fs;
use tokio::sync::Mutex;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
@ -30,8 +35,27 @@ struct PersistedSession {
sync_token: Option<String>, sync_token: Option<String>,
} }
#[derive(Clone, Debug)]
struct ChatMessage {
sender: OwnedUserId,
sender_name: String,
body: String,
is_self: bool,
}
struct DaemonState {
own_user_id: OwnedUserId,
room_history: std::collections::HashMap<OwnedRoomId, Vec<ChatMessage>>,
pending_rooms: Vec<OwnedRoomId>,
rate_budget: u32,
last_rate_reset: std::time::Instant,
}
const STATE_DIR: &str = "/persist/damocles-lab/state"; const STATE_DIR: &str = "/persist/damocles-lab/state";
const IDENTITY_DIR: &str = "/persist/damocles-lab/state/identity";
const CONFIG_PATH: &str = "/persist/damocles-lab/config.json"; const CONFIG_PATH: &str = "/persist/damocles-lab/config.json";
const MAX_HISTORY: usize = 20;
const RATE_LIMIT_PER_MIN: u32 = 2;
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
@ -43,6 +67,7 @@ async fn main() -> anyhow::Result<()> {
let state_dir = Path::new(STATE_DIR); let state_dir = Path::new(STATE_DIR);
fs::create_dir_all(state_dir).await?; fs::create_dir_all(state_dir).await?;
fs::create_dir_all(Path::new(IDENTITY_DIR)).await?;
let session_file = state_dir.join("session.json"); let session_file = state_dir.join("session.json");
let db_path = state_dir.join("db"); let db_path = state_dir.join("db");
@ -54,7 +79,25 @@ async fn main() -> anyhow::Result<()> {
(login(&config, &db_path, &session_file).await?, None) (login(&config, &db_path, &session_file).await?, None)
}; };
sync(client, sync_token, &session_file).await let own_user_id = client.user_id().context("not logged in")?.to_owned();
tracing::info!(user = %own_user_id, "ready");
let state = Arc::new(Mutex::new(DaemonState {
own_user_id: own_user_id.clone(),
room_history: std::collections::HashMap::new(),
pending_rooms: Vec::new(),
rate_budget: RATE_LIMIT_PER_MIN,
last_rate_reset: std::time::Instant::now(),
}));
// spawn the processor that invokes claude for pending messages
let processor_state = state.clone();
let processor_client = client.clone();
tokio::spawn(async move {
process_loop(processor_state, processor_client).await;
});
sync(client, sync_token, &session_file, state).await
} }
async fn load_config() -> anyhow::Result<Config> { async fn load_config() -> anyhow::Result<Config> {
@ -81,11 +124,7 @@ async fn restore_session(session_file: &Path) -> anyhow::Result<(Client, Option<
Ok((client, session.sync_token)) Ok((client, session.sync_token))
} }
async fn login( async fn login(config: &Config, db_path: &Path, session_file: &Path) -> anyhow::Result<Client> {
config: &Config,
db_path: &Path,
session_file: &Path,
) -> anyhow::Result<Client> {
tracing::info!(homeserver = %config.homeserver, user = %config.username, "logging in"); tracing::info!(homeserver = %config.homeserver, user = %config.username, "logging in");
let client = Client::builder() let client = Client::builder()
@ -121,6 +160,7 @@ async fn sync(
client: Client, client: Client,
initial_sync_token: Option<String>, initial_sync_token: Option<String>,
session_file: &Path, session_file: &Path,
state: Arc<Mutex<DaemonState>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
tracing::info!("initial sync (ignoring past messages)"); tracing::info!("initial sync (ignoring past messages)");
@ -131,7 +171,7 @@ async fn sync(
sync_settings = sync_settings.token(token); sync_settings = sync_settings.token(token);
} }
// initial sync loop - retries on transient errors // initial sync - ignore old messages
loop { loop {
match client.sync_once(sync_settings.clone()).await { match client.sync_once(sync_settings.clone()).await {
Ok(response) => { Ok(response) => {
@ -147,9 +187,14 @@ async fn sync(
tracing::info!("synced, listening for messages"); tracing::info!("synced, listening for messages");
client.add_event_handler(on_room_message); // register event handler with shared state
client.add_event_handler(move |event: OriginalSyncRoomMessageEvent, room: Room| {
let state = state.clone();
async move {
on_room_message(event, room, state).await;
}
});
// main sync loop
client.sync(sync_settings).await?; client.sync(sync_settings).await?;
bail!("sync loop exited unexpectedly") bail!("sync loop exited unexpectedly")
@ -163,7 +208,11 @@ async fn persist_sync_token(session_file: &Path, sync_token: String) -> anyhow::
Ok(()) Ok(())
} }
async fn on_room_message(event: OriginalSyncRoomMessageEvent, room: Room) { async fn on_room_message(
event: OriginalSyncRoomMessageEvent,
room: Room,
state: Arc<Mutex<DaemonState>>,
) {
if room.state() != RoomState::Joined { if room.state() != RoomState::Joined {
return; return;
} }
@ -171,16 +220,246 @@ async fn on_room_message(event: OriginalSyncRoomMessageEvent, room: Room) {
return; return;
}; };
let room_id = room.room_id().to_owned();
let room_name = room let room_name = room
.display_name() .display_name()
.await .await
.map(|n| n.to_string()) .map(|n| n.to_string())
.unwrap_or_else(|_| room.room_id().to_string()); .unwrap_or_else(|_| room_id.to_string());
let mut state = state.lock().await;
let is_self = event.sender == state.own_user_id;
let msg = ChatMessage {
sender: event.sender.clone(),
sender_name: event.sender.localpart().to_owned(),
body: text_content.body.clone(),
is_self,
};
tracing::info!( tracing::info!(
room = %room_name, room = %room_name,
sender = %event.sender, sender = %event.sender,
self_msg = is_self,
"{}", "{}",
text_content.body text_content.body
); );
// add to history
let history = state.room_history.entry(room_id.clone()).or_default();
history.push(msg);
if history.len() > MAX_HISTORY {
history.drain(..history.len() - MAX_HISTORY);
}
// only invoke claude for non-self messages
if !is_self && !state.pending_rooms.contains(&room_id) {
state.pending_rooms.push(room_id);
}
}
async fn process_loop(state: Arc<Mutex<DaemonState>>, client: Client) {
loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let room_id = {
let mut state = state.lock().await;
// reset rate budget every 60s
if state.last_rate_reset.elapsed() >= std::time::Duration::from_secs(60) {
state.rate_budget = RATE_LIMIT_PER_MIN;
state.last_rate_reset = std::time::Instant::now();
}
if state.rate_budget == 0 {
continue;
}
state.pending_rooms.pop()
};
let Some(room_id) = room_id else {
continue;
};
let history = {
let state = state.lock().await;
state
.room_history
.get(&room_id)
.cloned()
.unwrap_or_default()
};
let room_name = client
.get_room(&room_id)
.and_then(|r| {
// can't easily get display_name synchronously, use room_id
Some(r.room_id().to_string())
})
.unwrap_or_else(|| room_id.to_string());
match invoke_claude(&room_id, &room_name, &history).await {
Ok(Some(response)) => {
if let Some(room) = client.get_room(&response.room) {
let content = RoomMessageEventContent::text_plain(&response.body);
match room.send(content).await {
Ok(_) => {
let mut state = state.lock().await;
state.rate_budget = state.rate_budget.saturating_sub(1);
tracing::info!(
room = %response.room,
"sent response ({} budget remaining)",
state.rate_budget
);
}
Err(e) => tracing::error!("failed to send: {e}"),
}
} else {
tracing::warn!(room = %response.room, "target room not found");
}
}
Ok(None) => {
tracing::debug!(room = %room_id, "claude chose to skip");
}
Err(e) => {
tracing::error!(room = %room_id, "claude invocation failed: {e}");
}
}
}
}
struct ClaudeResponse {
room: OwnedRoomId,
body: String,
}
async fn invoke_claude(
source_room: &OwnedRoomId,
room_name: &str,
history: &[ChatMessage],
) -> anyhow::Result<Option<ClaudeResponse>> {
// build the prompt with conversation context
let mut prompt = String::new();
prompt.push_str(&format!("[room: {} ({})]\n", source_room, room_name));
prompt.push_str("[new messages below this line]\n");
for msg in history {
let prefix = if msg.is_self { "(you) " } else { "" };
prompt.push_str(&format!("{}{}: {}\n", prefix, msg.sender, msg.body));
}
tracing::debug!("invoking claude with {} messages", history.len());
let output = tokio::process::Command::new("claude")
.args([
"--print",
"--bare",
"--add-dir",
IDENTITY_DIR,
"--allowedTools",
"Read Edit Write Glob Grep",
])
.current_dir(IDENTITY_DIR)
.arg(&prompt)
.output()
.await
.context("failed to run claude")?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
bail!("claude exited with {}: {}", output.status, stderr);
}
let raw = String::from_utf8_lossy(&output.stdout).to_string();
parse_response(&raw, source_room)
}
fn parse_response(raw: &str, default_room: &OwnedRoomId) -> anyhow::Result<Option<ClaudeResponse>> {
let trimmed = raw.trim();
// check for frontmatter
if trimmed.starts_with("---") {
let parts: Vec<&str> = trimmed.splitn(3, "---").collect();
if parts.len() >= 3 {
let frontmatter = parts[1].trim();
let body = parts[2].trim();
// check for skip
if frontmatter.contains("skip: true") || frontmatter.contains("skip:true") {
return Ok(None);
}
// extract room override
let room = frontmatter
.lines()
.find(|l| l.starts_with("room:"))
.and_then(|l| l.strip_prefix("room:"))
.and_then(|r| r.trim().parse().ok())
.unwrap_or_else(|| default_room.clone());
if body.is_empty() {
return Ok(None);
}
return Ok(Some(ClaudeResponse {
room,
body: body.to_owned(),
}));
}
}
// no frontmatter - treat entire output as message to default room
if trimmed.is_empty() {
return Ok(None);
}
Ok(Some(ClaudeResponse {
room: default_room.clone(),
body: trimmed.to_owned(),
}))
}
#[cfg(test)]
mod tests {
use super::*;
fn test_room() -> OwnedRoomId {
"!test:example.com".parse().unwrap()
}
#[test]
fn parse_frontmatter_response() {
let raw = "---\nroom: !other:server\n---\nhello world";
let resp = parse_response(raw, &test_room()).unwrap().unwrap();
assert_eq!(resp.room.as_str(), "!other:server");
assert_eq!(resp.body, "hello world");
}
#[test]
fn parse_skip_response() {
let raw = "---\nskip: true\n---\n";
assert!(parse_response(raw, &test_room()).unwrap().is_none());
}
#[test]
fn parse_plain_response() {
let raw = "just a message";
let resp = parse_response(raw, &test_room()).unwrap().unwrap();
assert_eq!(resp.room, test_room());
assert_eq!(resp.body, "just a message");
}
#[test]
fn parse_empty_response() {
assert!(parse_response("", &test_room()).unwrap().is_none());
assert!(parse_response(" \n ", &test_room()).unwrap().is_none());
}
#[test]
fn parse_default_room() {
let raw = "---\n---\nhello";
let resp = parse_response(raw, &test_room()).unwrap().unwrap();
assert_eq!(resp.room, test_room());
}
} }