mod paths; use std::fmt::Write as _; use std::path::Path; use std::sync::Arc; use anyhow::{Context, bail}; use matrix_sdk::{ Client, Room, RoomState, authentication::matrix::MatrixSession, config::SyncSettings, ruma::{ OwnedRoomId, OwnedUserId, api::client::filter::FilterDefinition, events::room::message::{ MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent, }, }, }; use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::sync::Mutex; use tracing_subscriber::EnvFilter; #[derive(Debug, Deserialize)] struct Config { homeserver: String, username: String, password: String, rate_limit_per_min: Option, } #[derive(Debug, Serialize, Deserialize)] struct PersistedSession { homeserver: String, db_path: std::path::PathBuf, user_session: MatrixSession, #[serde(skip_serializing_if = "Option::is_none")] sync_token: Option, } #[derive(Clone, Debug)] struct ChatMessage { sender: OwnedUserId, body: String, is_self: bool, } struct DaemonState { own_user_id: OwnedUserId, room_history: std::collections::HashMap>, /// For each room, the index in room_history up to which messages have been /// shown to Claude. Messages at indexes >= this value are "new". seen_count: std::collections::HashMap, pending_rooms: Vec, rate_budget: u32, rate_limit_per_min: u32, last_rate_reset: std::time::Instant, } const MAX_HISTORY: usize = 20; const DEFAULT_RATE_LIMIT_PER_MIN: u32 = 1; #[tokio::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) .init(); tracing::info!("damocles-daemon starting"); let state_dir = paths::state_dir(); fs::create_dir_all(&state_dir).await?; fs::create_dir_all(paths::identity_dir()).await?; fs::create_dir_all(state_dir.join("rooms")).await?; fs::create_dir_all(state_dir.join("people")).await?; let session_file = paths::session_path(); let db_path = paths::db_path(); let config = load_config().await?; let rate_limit_per_min = config .rate_limit_per_min .unwrap_or(DEFAULT_RATE_LIMIT_PER_MIN); let (client, sync_token) = if session_file.exists() { restore_session(&session_file).await? } else { (login(&config, &db_path, &session_file).await?, None) }; let own_user_id = client.user_id().context("not logged in")?.to_owned(); tracing::info!(user = %own_user_id, rate_limit = rate_limit_per_min, "ready"); let state = Arc::new(Mutex::new(DaemonState { own_user_id, room_history: std::collections::HashMap::new(), seen_count: std::collections::HashMap::new(), pending_rooms: Vec::new(), rate_budget: rate_limit_per_min, rate_limit_per_min, last_rate_reset: std::time::Instant::now(), })); let processor_state = state.clone(); let processor_client = client.clone(); tokio::spawn(async move { process_loop(processor_state, processor_client).await; }); sync(client, sync_token, &session_file, state).await } async fn load_config() -> anyhow::Result { let data = fs::read_to_string(paths::config_path()) .await .context("failed to read config.json")?; serde_json::from_str(&data).context("failed to parse config.json") } async fn restore_session(session_file: &Path) -> anyhow::Result<(Client, Option)> { let data = fs::read_to_string(session_file).await?; let session: PersistedSession = serde_json::from_str(&data)?; tracing::info!(user = %session.user_session.meta.user_id, "restoring session"); let client = Client::builder() .homeserver_url(&session.homeserver) .sqlite_store(paths::db_path(), None) .build() .await?; client.restore_session(session.user_session).await?; Ok((client, session.sync_token)) } async fn login(config: &Config, db_path: &Path, session_file: &Path) -> anyhow::Result { tracing::info!(homeserver = %config.homeserver, user = %config.username, "logging in"); let client = Client::builder() .homeserver_url(&config.homeserver) .sqlite_store(db_path, None) .build() .await?; client .matrix_auth() .login_username(&config.username, &config.password) .initial_device_display_name("damocles-daemon") .await?; let user_session = client .matrix_auth() .session() .context("no session after login")?; let persisted = PersistedSession { homeserver: config.homeserver.clone(), db_path: db_path.to_owned(), user_session, sync_token: None, }; fs::write(session_file, serde_json::to_string_pretty(&persisted)?).await?; tracing::info!("session persisted"); Ok(client) } async fn sync( client: Client, initial_sync_token: Option, session_file: &Path, state: Arc>, ) -> anyhow::Result<()> { tracing::info!("initial sync (ignoring past messages)"); let filter = FilterDefinition::with_lazy_loading(); let mut sync_settings = SyncSettings::default().filter(filter.into()); if let Some(token) = initial_sync_token { sync_settings = sync_settings.token(token); } loop { match client.sync_once(sync_settings.clone()).await { Ok(response) => { sync_settings = sync_settings.token(response.next_batch.clone()); persist_sync_token(session_file, response.next_batch).await?; break; } Err(e) => { tracing::warn!("initial sync failed, retrying: {e}"); } } } tracing::info!("synced, listening for messages"); client.add_event_handler(move |event: OriginalSyncRoomMessageEvent, room: Room| { let state = state.clone(); async move { on_room_message(event, room, state).await; } }); client.sync(sync_settings).await?; bail!("sync loop exited unexpectedly") } async fn persist_sync_token(session_file: &Path, sync_token: String) -> anyhow::Result<()> { let data = fs::read_to_string(session_file).await?; let mut session: PersistedSession = serde_json::from_str(&data)?; session.sync_token = Some(sync_token); fs::write(session_file, serde_json::to_string_pretty(&session)?).await?; Ok(()) } async fn on_room_message( event: OriginalSyncRoomMessageEvent, room: Room, state: Arc>, ) { if room.state() != RoomState::Joined { return; } let MessageType::Text(text_content) = &event.content.msgtype else { return; }; let room_id = room.room_id().to_owned(); let room_name = room .display_name() .await .map_or_else(|_| room_id.to_string(), |n| n.to_string()); let mut state = state.lock().await; let is_self = event.sender == state.own_user_id; let msg = ChatMessage { sender: event.sender.clone(), body: text_content.body.clone(), is_self, }; tracing::info!( room = %room_name, sender = %event.sender, self_msg = is_self, "{}", text_content.body ); let history = state.room_history.entry(room_id.clone()).or_default(); history.push(msg); if history.len() > MAX_HISTORY { history.drain(..history.len() - MAX_HISTORY); } if !is_self && !state.pending_rooms.contains(&room_id) { state.pending_rooms.push(room_id); } } async fn process_loop(state: Arc>, client: Client) { loop { tokio::time::sleep(std::time::Duration::from_secs(1)).await; let room_id = { let mut state = state.lock().await; if state.last_rate_reset.elapsed() >= std::time::Duration::from_secs(60) { state.rate_budget = state.rate_limit_per_min; state.last_rate_reset = std::time::Instant::now(); } if state.rate_budget == 0 { continue; } state.pending_rooms.pop() }; let Some(room_id) = room_id else { continue; }; let (history, seen_idx) = { let state = state.lock().await; let history = state .room_history .get(&room_id) .cloned() .unwrap_or_default(); let seen = state.seen_count.get(&room_id).copied().unwrap_or(0); (history, seen) }; let room_name = client .get_room(&room_id) .map_or_else(|| room_id.to_string(), |r| r.room_id().to_string()); match invoke_claude(&room_id, &room_name, &history, seen_idx).await { Ok(Some(response)) => { if let Some(room) = client.get_room(&response.room) { let content = RoomMessageEventContent::text_plain(&response.body); match room.send(content).await { Ok(_) => { let mut state = state.lock().await; state.rate_budget = state.rate_budget.saturating_sub(1); // Mark current history as seen state.seen_count.insert(room_id.clone(), history.len()); tracing::info!( room = %response.room, "sent response ({} budget remaining)", state.rate_budget ); } Err(e) => tracing::error!("failed to send: {e}"), } } else { tracing::warn!(room = %response.room, "target room not found"); } } Ok(None) => { tracing::debug!(room = %room_id, "claude chose to skip"); // Even on skip, mark messages as seen so we don't reprocess let mut state = state.lock().await; state.seen_count.insert(room_id.clone(), history.len()); } Err(e) => { tracing::error!(room = %room_id, "claude invocation failed: {e}"); } } } } struct ClaudeResponse { room: OwnedRoomId, body: String, } async fn invoke_claude( source_room: &OwnedRoomId, room_name: &str, history: &[ChatMessage], seen_idx: usize, ) -> anyhow::Result> { let identity_dir = paths::identity_dir(); let identity_str = identity_dir.to_string_lossy(); let mut prompt = String::new(); writeln!(prompt, "[room_id: {source_room}]").unwrap(); writeln!(prompt, "[room_name: {room_name}]").unwrap(); writeln!( prompt, "[room notes path: ../rooms/{source_room}/notes.md (create dir if needed)]" ) .unwrap(); // Collect unique non-self senders for per-person note discovery let mut senders: Vec<&OwnedUserId> = history .iter() .filter(|m| !m.is_self) .map(|m| &m.sender) .collect(); senders.sort(); senders.dedup(); if !senders.is_empty() { writeln!( prompt, "\n[people in this room — check ../people//notes.md for each]" ) .unwrap(); for s in &senders { writeln!(prompt, " {s}").unwrap(); } } let seen = seen_idx.min(history.len()); let (old, new) = history.split_at(seen); if !old.is_empty() { writeln!(prompt, "\n[previously seen messages — for context]").unwrap(); for msg in old { let prefix = if msg.is_self { "(you) " } else { "" }; writeln!(prompt, "{prefix}{}: {}", msg.sender, msg.body).unwrap(); } } writeln!(prompt, "\n[new messages — respond to these]").unwrap(); if new.is_empty() { writeln!(prompt, "(none)").unwrap(); } else { for msg in new { let prefix = if msg.is_self { "(you) " } else { "" }; writeln!(prompt, "{prefix}{}: {}", msg.sender, msg.body).unwrap(); } } tracing::info!("invoking claude: {} new, {} seen", new.len(), old.len()); tracing::trace!("full prompt:\n{prompt}"); use tokio::process::Command; let mut cmd = Command::new("claude"); cmd.args([ "--print", "--add-dir", &identity_str, "--allowedTools", "Read Edit Write Glob Grep", "-p", &prompt, ]); cmd.current_dir(&identity_dir); cmd.stdin(std::process::Stdio::null()); let output = cmd.output().await.context("failed to run claude")?; let stderr = String::from_utf8_lossy(&output.stderr); let stdout = String::from_utf8_lossy(&output.stdout); if !output.status.success() { bail!( "claude exited with {}:\nstdout: {}\nstderr: {}", output.status, stdout, stderr ); } if !stderr.is_empty() { tracing::warn!("claude stderr: {stderr}"); } let raw = String::from_utf8_lossy(&output.stdout).to_string(); Ok(parse_response(&raw, source_room)) } fn parse_response(raw: &str, default_room: &OwnedRoomId) -> Option { let trimmed = raw.trim(); if trimmed.starts_with("---") { let parts: Vec<&str> = trimmed.splitn(3, "---").collect(); if parts.len() >= 3 { let frontmatter = parts[1].trim(); let body = parts[2].trim(); if frontmatter.contains("skip: true") || frontmatter.contains("skip:true") { return None; } let room = frontmatter .lines() .find(|l| l.starts_with("room:")) .and_then(|l| l.strip_prefix("room:")) .and_then(|r| r.trim().parse().ok()) .unwrap_or_else(|| default_room.clone()); if body.is_empty() { return None; } return Some(ClaudeResponse { room, body: body.to_owned(), }); } } if trimmed.is_empty() { return None; } Some(ClaudeResponse { room: default_room.clone(), body: trimmed.to_owned(), }) } #[cfg(test)] mod tests { use super::*; fn test_room() -> OwnedRoomId { "!test:example.com".parse().unwrap() } #[test] fn parse_frontmatter_response() { let raw = "---\nroom: !other:server\n---\nhello world"; let resp = parse_response(raw, &test_room()).unwrap(); assert_eq!(resp.room.as_str(), "!other:server"); assert_eq!(resp.body, "hello world"); } #[test] fn parse_skip_response() { let raw = "---\nskip: true\n---\n"; assert!(parse_response(raw, &test_room()).is_none()); } #[test] fn parse_plain_response() { let raw = "just a message"; let resp = parse_response(raw, &test_room()).unwrap(); assert_eq!(resp.room, test_room()); assert_eq!(resp.body, "just a message"); } #[test] fn parse_empty_response() { assert!(parse_response("", &test_room()).is_none()); assert!(parse_response(" \n ", &test_room()).is_none()); } #[test] fn parse_default_room() { let raw = "---\n---\nhello"; let resp = parse_response(raw, &test_room()).unwrap(); assert_eq!(resp.room, test_room()); } }