From 09259ee5fa2bd699a691f23fcaf622f5d1a9c2e5 Mon Sep 17 00:00:00 2001 From: Damocles Date: Fri, 1 May 2026 02:15:03 +0200 Subject: [PATCH] split main.rs into types, timeline, claude, handlers, session modules --- src/claude.rs | 419 +++++++++++++ src/handlers.rs | 217 +++++++ src/main.rs | 1519 ++++++----------------------------------------- src/session.rs | 64 ++ src/timeline.rs | 328 ++++++++++ src/types.rs | 112 ++++ 6 files changed, 1337 insertions(+), 1322 deletions(-) create mode 100644 src/claude.rs create mode 100644 src/handlers.rs create mode 100644 src/session.rs create mode 100644 src/timeline.rs create mode 100644 src/types.rs diff --git a/src/claude.rs b/src/claude.rs new file mode 100644 index 0000000..61c5d67 --- /dev/null +++ b/src/claude.rs @@ -0,0 +1,419 @@ +use std::collections::HashMap; +use std::fmt::Write as _; + +use anyhow::{Context, bail}; +use matrix_sdk::ruma::{OwnedEventId, OwnedRoomId, OwnedUserId}; + +use crate::paths; +use crate::timeline::render_timeline_item; +use crate::types::{ClaudeDoc, ResponseTarget, TimelineItem}; + +pub async fn invoke_claude( + source_room: &OwnedRoomId, + room_name: &str, + timeline: &[TimelineItem], + seen_idx: usize, + model: &str, + read_markers: &HashMap>, +) -> anyhow::Result> { + let identity_dir = paths::identity_dir(); + let identity_str = identity_dir.to_string_lossy(); + + let mut prompt = String::new(); + writeln!(prompt, "[room_id: {source_room}]").unwrap(); + writeln!(prompt, "[room_name: {room_name}]").unwrap(); + writeln!( + prompt, + "[room notes path: ../rooms/{source_room}/notes.md (create dir if needed)]" + ) + .unwrap(); + + // Collect unique non-self participants (message senders + reactors) + let mut senders: Vec<&OwnedUserId> = timeline + .iter() + .filter(|t| !t.is_self()) + .map(TimelineItem::sender) + .collect(); + senders.sort(); + senders.dedup(); + if !senders.is_empty() { + writeln!( + prompt, + "\n[people in this room - check ../people//notes.md for each]" + ) + .unwrap(); + for s in &senders { + writeln!(prompt, " {s}").unwrap(); + } + } + + let seen = seen_idx.min(timeline.len()); + let (old, new) = timeline.split_at(seen); + + if !old.is_empty() { + writeln!(prompt, "\n[previously seen events - for context]").unwrap(); + for item in old { + render_timeline_item(&mut prompt, item, read_markers); + } + } + + writeln!(prompt, "\n[new events - respond to these]").unwrap(); + if new.is_empty() { + writeln!(prompt, "(none)").unwrap(); + } else { + for item in new { + render_timeline_item(&mut prompt, item, read_markers); + } + } + + let new_msg_count = new + .iter() + .filter(|t| matches!(t, TimelineItem::Message { .. })) + .count(); + let new_react_count = new.len() - new_msg_count; + tracing::info!( + "invoking claude: {} new ({} msg + {} react), {} seen", + new.len(), + new_msg_count, + new_react_count, + old.len() + ); + tracing::trace!("full prompt:\n{prompt}"); + + use tokio::process::Command; + let mut cmd = Command::new("claude"); + cmd.args([ + "--print", + "--model", + model, + "--add-dir", + &identity_str, + "--allowedTools", + "Read Edit Write Glob Grep", + "-p", + &prompt, + ]); + cmd.current_dir(&identity_dir); + cmd.stdin(std::process::Stdio::null()); + let output = cmd.output().await.context("failed to run claude")?; + + let stderr = String::from_utf8_lossy(&output.stderr); + let stdout = String::from_utf8_lossy(&output.stdout); + + if !output.status.success() { + bail!( + "claude exited with {}:\nstdout: {}\nstderr: {}", + output.status, + stdout, + stderr + ); + } + + if !stderr.is_empty() { + tracing::warn!("claude stderr: {stderr}"); + } + + let raw = String::from_utf8_lossy(&output.stdout).to_string(); + Ok(parse_response(&raw, source_room)) +} + +/// Parse Claude's stdout into a list of documents. +/// +/// Format: each doc starts with a line `=== [arg]`. Body is everything +/// until the next `===` line or EOF. Types: +/// - `=== thought` -> `ClaudeDoc::Thought` (logged, not sent) +/// - `=== room []` -> `ClaudeDoc::Message` to that room (or source room if no arg) +/// - `=== dm ` -> `ClaudeDoc::Message` as DM +/// - `=== skip` -> `ClaudeDoc::Skip` (no-op) +/// +/// Anything before the first `===` line is treated as a preamble thought. +/// Bare text with no `===` is treated as a single message to default_room. +pub fn parse_response(raw: &str, default_room: &OwnedRoomId) -> Vec { + let trimmed = raw.trim(); + if trimmed.is_empty() { + return Vec::new(); + } + + let mut docs = Vec::new(); + let mut current_header: Option = None; + let mut current_body = String::new(); + let mut preamble = String::new(); + + for line in trimmed.lines() { + if let Some(header) = line.strip_prefix("===") { + if let Some(h) = current_header.take() { + if let Some(doc) = build_doc(&h, current_body.trim(), default_room) { + docs.push(doc); + } + current_body.clear(); + } else { + let p = preamble.trim(); + if !p.is_empty() { + docs.push(ClaudeDoc::Thought(p.to_owned())); + } + preamble.clear(); + } + current_header = Some(header.trim().to_owned()); + } else if current_header.is_some() { + current_body.push_str(line); + current_body.push('\n'); + } else { + preamble.push_str(line); + preamble.push('\n'); + } + } + + if let Some(h) = current_header { + if let Some(doc) = build_doc(&h, current_body.trim(), default_room) { + docs.push(doc); + } + } else { + let p = preamble.trim(); + if !p.is_empty() { + docs.push(ClaudeDoc::Message { + target: ResponseTarget::Room(default_room.clone()), + body: p.to_owned(), + }); + } + } + + docs +} + +fn build_doc(header: &str, body: &str, default_room: &OwnedRoomId) -> Option { + let mut parts = header.splitn(2, char::is_whitespace); + let kind = parts.next().unwrap_or("").trim(); + let arg = parts.next().unwrap_or("").trim(); + + match kind { + "skip" => Some(ClaudeDoc::Skip), + "thought" => { + if body.is_empty() { + None + } else { + Some(ClaudeDoc::Thought(body.to_owned())) + } + } + "room" => { + if body.is_empty() { + return None; + } + let target = if arg.is_empty() { + ResponseTarget::Room(default_room.clone()) + } else { + match arg.parse::() { + Ok(rid) => ResponseTarget::Room(rid), + Err(_) => return None, + } + }; + Some(ClaudeDoc::Message { + target, + body: body.to_owned(), + }) + } + "dm" => { + if body.is_empty() { + return None; + } + match arg.parse::() { + Ok(uid) => Some(ClaudeDoc::Message { + target: ResponseTarget::Dm(uid), + body: body.to_owned(), + }), + Err(_) => None, + } + } + "react" => { + let mut header_parts = arg.splitn(2, char::is_whitespace); + let eid_arg = header_parts.next().unwrap_or("").trim(); + let key_in_header = header_parts.next().unwrap_or("").trim(); + if eid_arg.is_empty() { + return None; + } + let key = if !key_in_header.is_empty() { + key_in_header.to_owned() + } else if !body.is_empty() { + body.to_owned() + } else { + return None; + }; + Some(ClaudeDoc::Reaction { + target_id_arg: eid_arg.to_owned(), + key, + }) + } + _ => { + if body.is_empty() { + None + } else { + Some(ClaudeDoc::Thought(format!( + "[unknown header '{header}'] {body}" + ))) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_room() -> OwnedRoomId { + "!test:example.com".parse().unwrap() + } + + fn first_message(docs: &[ClaudeDoc]) -> (&ResponseTarget, &str) { + for d in docs { + if let ClaudeDoc::Message { target, body } = d { + return (target, body.as_str()); + } + } + panic!("no message doc found"); + } + + fn assert_room(target: &ResponseTarget, expected: &str) { + match target { + ResponseTarget::Room(r) => assert_eq!(r.as_str(), expected), + ResponseTarget::Dm(_) => panic!("expected room target, got dm"), + } + } + + #[test] + fn parse_room_with_arg() { + let raw = "=== room !other:server\nhello world"; + let docs = parse_response(raw, &test_room()); + let (target, body) = first_message(&docs); + assert_room(target, "!other:server"); + assert_eq!(body, "hello world"); + } + + #[test] + fn parse_room_no_arg_uses_default() { + let raw = "=== room\nhi"; + let docs = parse_response(raw, &test_room()); + let (target, _) = first_message(&docs); + assert_room(target, "!test:example.com"); + } + + #[test] + fn parse_skip() { + let raw = "=== skip"; + let docs = parse_response(raw, &test_room()); + assert_eq!(docs.len(), 1); + assert!(matches!(docs[0], ClaudeDoc::Skip)); + } + + #[test] + fn parse_plain_text_no_header() { + let raw = "just a message"; + let docs = parse_response(raw, &test_room()); + let (target, body) = first_message(&docs); + assert_room(target, "!test:example.com"); + assert_eq!(body, "just a message"); + } + + #[test] + fn parse_empty() { + assert!(parse_response("", &test_room()).is_empty()); + assert!(parse_response(" \n ", &test_room()).is_empty()); + } + + #[test] + fn parse_dm() { + let raw = "=== dm @alice:example.com\nhi alice"; + let docs = parse_response(raw, &test_room()); + let (target, body) = first_message(&docs); + match target { + ResponseTarget::Dm(u) => assert_eq!(u.as_str(), "@alice:example.com"), + ResponseTarget::Room(_) => panic!("expected dm target"), + } + assert_eq!(body, "hi alice"); + } + + #[test] + fn parse_thought() { + let raw = "=== thought\nthinking about whether to reply..."; + let docs = parse_response(raw, &test_room()); + assert_eq!(docs.len(), 1); + match &docs[0] { + ClaudeDoc::Thought(s) => assert_eq!(s, "thinking about whether to reply..."), + _ => panic!("expected thought"), + } + } + + #[test] + fn parse_multi_doc() { + let raw = "\ +=== thought +let me check notes + +=== room !x:y +hi + +=== dm @u:s +private + +=== skip +"; + let docs = parse_response(raw, &test_room()); + assert_eq!(docs.len(), 4); + assert!(matches!(docs[0], ClaudeDoc::Thought(_))); + assert!(matches!( + docs[1], + ClaudeDoc::Message { + target: ResponseTarget::Room(_), + .. + } + )); + assert!(matches!( + docs[2], + ClaudeDoc::Message { + target: ResponseTarget::Dm(_), + .. + } + )); + assert!(matches!(docs[3], ClaudeDoc::Skip)); + } + + #[test] + fn parse_preamble_becomes_thought() { + let raw = "preamble line\n=== room !x:y\nhello"; + let docs = parse_response(raw, &test_room()); + assert_eq!(docs.len(), 2); + assert!(matches!(docs[0], ClaudeDoc::Thought(_))); + assert!(matches!(docs[1], ClaudeDoc::Message { .. })); + } + + #[test] + fn parse_react_with_key_in_header() { + let raw = "=== react $abc12345… šŸ‘€"; + let docs = parse_response(raw, &test_room()); + assert_eq!(docs.len(), 1); + match &docs[0] { + ClaudeDoc::Reaction { target_id_arg, key } => { + assert_eq!(target_id_arg, "$abc12345…"); + assert_eq!(key, "šŸ‘€"); + } + _ => panic!("expected reaction"), + } + } + + #[test] + fn parse_react_with_key_in_body() { + let raw = "=== react $abc12345…\nšŸ”„"; + let docs = parse_response(raw, &test_room()); + assert_eq!(docs.len(), 1); + match &docs[0] { + ClaudeDoc::Reaction { key, .. } => assert_eq!(key, "šŸ”„"), + _ => panic!("expected reaction"), + } + } + + #[test] + fn parse_unknown_header_becomes_thought() { + let raw = "=== mystery foo\nbody"; + let docs = parse_response(raw, &test_room()); + assert_eq!(docs.len(), 1); + assert!(matches!(docs[0], ClaudeDoc::Thought(_))); + } +} diff --git a/src/handlers.rs b/src/handlers.rs new file mode 100644 index 0000000..7af6e4c --- /dev/null +++ b/src/handlers.rs @@ -0,0 +1,217 @@ +use std::path::Path; +use std::sync::Arc; + +use matrix_sdk::{ + Client, Room, RoomState, + ruma::{ + UserId, + api::client::receipt::create_receipt::v3::ReceiptType as CreateReceiptType, + events::{ + receipt::ReceiptThread, + room::{ + member::StrippedRoomMemberEvent, + message::{MessageType, OriginalSyncRoomMessageEvent}, + }, + }, + }, +}; +use tokio::fs; +use tokio::sync::Mutex; + +use crate::paths; +use crate::timeline::chrono_now; +use crate::types::{DaemonState, PersistedSession}; + +pub async fn on_room_message( + event: OriginalSyncRoomMessageEvent, + room: Room, + state: Arc>, +) { + if room.state() != RoomState::Joined { + return; + } + let MessageType::Text(text_content) = &event.content.msgtype else { + return; + }; + + let room_id = room.room_id().to_owned(); + let is_self = { + let state = state.lock().await; + event.sender == state.own_user_id + }; + + tracing::info!( + room = %room_id, + sender = %event.sender, + self_msg = is_self, + "{}", + text_content.body + ); + + if let Err(e) = ensure_room_notes(&room).await { + tracing::warn!(room = %room_id, "failed to ensure room notes: {e}"); + } + if !is_self { + if let Err(e) = ensure_person_notes(&room, &event.sender).await { + tracing::warn!(sender = %event.sender, "failed to ensure person notes: {e}"); + } + } + + if !is_self { + let mut state = state.lock().await; + if !state.pending_rooms.contains(&room_id) { + state.pending_rooms.push(room_id); + } + } +} + +pub async fn on_reaction( + event: matrix_sdk::ruma::events::reaction::OriginalSyncReactionEvent, + room: Room, + state: Arc>, +) { + if room.state() != RoomState::Joined { + return; + } + let room_id = room.room_id().to_owned(); + let mut state = state.lock().await; + let is_self = event.sender == state.own_user_id; + + tracing::info!( + room = %room_id, + sender = %event.sender, + self_react = is_self, + target = %event.content.relates_to.event_id, + key = %event.content.relates_to.key, + "reaction" + ); + + if !is_self && !state.pending_rooms.contains(&room_id) { + state.pending_rooms.push(room_id); + } +} + +pub async fn on_stripped_state_member(event: StrippedRoomMemberEvent, client: Client, room: Room) { + let Some(my_id) = client.user_id() else { + return; + }; + if event.state_key != my_id { + return; + } + let room_id = room.room_id().to_owned(); + tokio::spawn(async move { + tracing::info!(room = %room_id, "auto-joining invite"); + let mut delay = 2u64; + loop { + match room.join().await { + Ok(()) => { + tracing::info!(room = %room_id, "joined"); + if let Err(e) = ensure_room_notes(&room).await { + tracing::warn!(room = %room_id, "failed to write room notes: {e}"); + } + return; + } + Err(e) => { + tracing::warn!(room = %room_id, "join failed, retry in {delay}s: {e}"); + tokio::time::sleep(std::time::Duration::from_secs(delay)).await; + delay = (delay * 2).min(300); + if delay >= 300 { + tracing::error!(room = %room_id, "giving up on auto-join"); + return; + } + } + } + } + }); +} + +/// Create state/rooms//notes.md if it doesn't exist. +pub async fn ensure_room_notes(room: &Room) -> anyhow::Result<()> { + let room_id = room.room_id(); + let dir = paths::state_dir().join("rooms").join(room_id.as_str()); + let file = dir.join("notes.md"); + if file.exists() { + return Ok(()); + } + fs::create_dir_all(&dir).await?; + + let display_name = room + .display_name() + .await + .map(|n| n.to_string()) + .unwrap_or_else(|_| room_id.to_string()); + let now = chrono_now(); + + let body = format!("# {room_id}\n\nDisplay name: {display_name}\n\nFirst joined: {now}\n"); + fs::write(&file, body).await?; + tracing::info!(room = %room_id, "created room notes"); + Ok(()) +} + +/// Create state/people//notes.md if it doesn't exist. +/// Pre-fill with display name (from this room's member info) and the room as +/// "first met in". +pub async fn ensure_person_notes(room: &Room, user_id: &UserId) -> anyhow::Result<()> { + let dir = paths::state_dir().join("people").join(user_id.as_str()); + let file = dir.join("notes.md"); + if file.exists() { + return Ok(()); + } + fs::create_dir_all(&dir).await?; + + let display_name = room + .get_member_no_sync(user_id) + .await + .ok() + .flatten() + .and_then(|m| m.display_name().map(ToOwned::to_owned)) + .unwrap_or_default(); + let display_line = if display_name.is_empty() { + String::new() + } else { + format!("Display name: {display_name}\n") + }; + let room_id = room.room_id(); + let now = chrono_now(); + + let body = format!("# {user_id}\n\n{display_line}First met in: {room_id} on {now}\n"); + fs::write(&file, body).await?; + tracing::info!(user = %user_id, "created person notes"); + Ok(()) +} + +/// Find an existing DM room with the given user, or create one. +pub async fn find_or_create_dm(client: &Client, user_id: &UserId) -> anyhow::Result { + for room in client.joined_rooms() { + if room.is_direct().await.unwrap_or(false) + && room + .direct_targets() + .iter() + .any(|t| t.as_str() == user_id.as_str()) + { + return Ok(room); + } + } + tracing::info!(user = %user_id, "creating new DM room"); + Ok(client.create_dm(user_id).await?) +} + +pub async fn send_read_receipt(room: &Room, event_id: Option) { + let Some(eid) = event_id else { + return; + }; + if let Err(e) = room + .send_single_receipt(CreateReceiptType::Read, ReceiptThread::Unthreaded, eid) + .await + { + tracing::warn!(room = %room.room_id(), "failed to send read receipt: {e}"); + } +} + +pub async fn persist_sync_token(session_file: &Path, sync_token: String) -> anyhow::Result<()> { + let data = fs::read_to_string(session_file).await?; + let mut session: PersistedSession = serde_json::from_str(&data)?; + session.sync_token = Some(sync_token); + fs::write(session_file, serde_json::to_string_pretty(&session)?).await?; + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index df7d6df..d5712f9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,120 +1,34 @@ +mod claude; +mod handlers; mod paths; +mod session; +mod timeline; +mod types; -use std::fmt::Write as _; -use std::path::Path; +use std::collections::HashSet; use std::sync::Arc; use anyhow::{Context, bail}; use matrix_sdk::{ - Client, Room, RoomState, - authentication::matrix::MatrixSession, + Client, Room, config::SyncSettings, ruma::{ - OwnedEventId, OwnedRoomId, OwnedUserId, UserId, - api::client::{ - filter::FilterDefinition, receipt::create_receipt::v3::ReceiptType as CreateReceiptType, - }, + OwnedEventId, OwnedRoomId, + api::client::filter::FilterDefinition, events::{ - reaction::ReactionEventContent, - receipt::ReceiptThread, - relation::Annotation, - room::{ - member::StrippedRoomMemberEvent, - message::{MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent}, - }, + reaction::ReactionEventContent, receipt::ReceiptThread, relation::Annotation, + room::message::RoomMessageEventContent, }, }, }; -use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::sync::Mutex; use tracing_subscriber::EnvFilter; -#[derive(Debug, Deserialize)] -struct Config { - homeserver: String, - username: String, - password: String, - rate_limit_per_min: Option, - model: Option, - max_history: Option, -} - -const DEFAULT_MODEL: &str = "claude-sonnet-4-6"; -const DEFAULT_MAX_HISTORY: usize = 20; - -#[derive(Debug, Serialize, Deserialize)] -struct PersistedSession { - homeserver: String, - db_path: std::path::PathBuf, - user_session: MatrixSession, - #[serde(skip_serializing_if = "Option::is_none")] - sync_token: Option, -} - -#[derive(Clone, Debug)] -enum TimelineItem { - Message { - event_id: OwnedEventId, - sender: OwnedUserId, - body: String, - is_self: bool, - /// Unix seconds. 0 if unknown. - ts: i64, - in_reply_to: Option, - }, - Reaction { - sender: OwnedUserId, - target_event_id: OwnedEventId, - key: String, - is_self: bool, - ts: i64, - }, -} - -impl TimelineItem { - fn ts(&self) -> i64 { - match self { - TimelineItem::Message { ts, .. } | TimelineItem::Reaction { ts, .. } => *ts, - } - } - - fn event_id(&self) -> Option<&OwnedEventId> { - match self { - TimelineItem::Message { event_id, .. } => Some(event_id), - TimelineItem::Reaction { .. } => None, - } - } - - fn sender(&self) -> &OwnedUserId { - match self { - TimelineItem::Message { sender, .. } | TimelineItem::Reaction { sender, .. } => sender, - } - } - - fn is_self(&self) -> bool { - match self { - TimelineItem::Message { is_self, .. } | TimelineItem::Reaction { is_self, .. } => { - *is_self - } - } - } -} - -struct DaemonState { - own_user_id: OwnedUserId, - /// Per-room: the latest event_id that's been "shown" to Claude. Events - /// after this are "new" on the next invocation. Cleared on daemon restart. - last_shown: std::collections::HashMap, - pending_rooms: Vec, - rate_budget: u32, - rate_limit_per_min: u32, - last_rate_reset: std::time::Instant, - model: String, - max_history: usize, -} - -const DEFAULT_RATE_LIMIT_PER_MIN: u32 = 1; +use types::{ + ClaudeDoc, DEFAULT_MAX_HISTORY, DEFAULT_MODEL, DEFAULT_RATE_LIMIT_PER_MIN, DaemonState, + ResponseTarget, TimelineItem, +}; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -133,7 +47,7 @@ async fn main() -> anyhow::Result<()> { let session_file = paths::session_path(); let db_path = paths::db_path(); - let config = load_config().await?; + let config = session::load_config().await?; let rate_limit_per_min = config .rate_limit_per_min .unwrap_or(DEFAULT_RATE_LIMIT_PER_MIN); @@ -144,9 +58,12 @@ async fn main() -> anyhow::Result<()> { let max_history = config.max_history.unwrap_or(DEFAULT_MAX_HISTORY); let (client, sync_token) = if session_file.exists() { - restore_session(&session_file).await? + session::restore_session(&session_file).await? } else { - (login(&config, &db_path, &session_file).await?, None) + ( + session::login(&config, &db_path, &session_file).await?, + None, + ) }; let own_user_id = client.user_id().context("not logged in")?.to_owned(); @@ -157,7 +74,6 @@ async fn main() -> anyhow::Result<()> { "ready" ); - // Enable persistent event cache (matrix-sdk's sqlite store keeps the timeline) client .event_cache() .subscribe() @@ -183,66 +99,10 @@ async fn main() -> anyhow::Result<()> { sync(client, sync_token, &session_file, state).await } -async fn load_config() -> anyhow::Result { - let data = fs::read_to_string(paths::config_path()) - .await - .context("failed to read config.json")?; - serde_json::from_str(&data).context("failed to parse config.json") -} - -async fn restore_session(session_file: &Path) -> anyhow::Result<(Client, Option)> { - let data = fs::read_to_string(session_file).await?; - let session: PersistedSession = serde_json::from_str(&data)?; - - tracing::info!(user = %session.user_session.meta.user_id, "restoring session"); - - let client = Client::builder() - .homeserver_url(&session.homeserver) - .sqlite_store(paths::db_path(), None) - .build() - .await?; - - client.restore_session(session.user_session).await?; - - Ok((client, session.sync_token)) -} - -async fn login(config: &Config, db_path: &Path, session_file: &Path) -> anyhow::Result { - tracing::info!(homeserver = %config.homeserver, user = %config.username, "logging in"); - - let client = Client::builder() - .homeserver_url(&config.homeserver) - .sqlite_store(db_path, None) - .build() - .await?; - - client - .matrix_auth() - .login_username(&config.username, &config.password) - .initial_device_display_name("damocles-daemon") - .await?; - - let user_session = client - .matrix_auth() - .session() - .context("no session after login")?; - - let persisted = PersistedSession { - homeserver: config.homeserver.clone(), - db_path: db_path.to_owned(), - user_session, - sync_token: None, - }; - fs::write(session_file, serde_json::to_string_pretty(&persisted)?).await?; - - tracing::info!("session persisted"); - Ok(client) -} - async fn sync( client: Client, initial_sync_token: Option, - session_file: &Path, + session_file: &std::path::Path, state: Arc>, ) -> anyhow::Result<()> { tracing::info!("initial sync (ignoring past messages)"); @@ -258,7 +118,7 @@ async fn sync( match client.sync_once(sync_settings.clone()).await { Ok(response) => { sync_settings = sync_settings.token(response.next_batch.clone()); - persist_sync_token(session_file, response.next_batch).await?; + handlers::persist_sync_token(session_file, response.next_batch).await?; break; } Err(e) => { @@ -270,235 +130,33 @@ async fn sync( tracing::info!("synced, listening for messages"); let msg_state = state.clone(); - client.add_event_handler(move |event: OriginalSyncRoomMessageEvent, room: Room| { - let state = msg_state.clone(); - async move { - on_room_message(event, room, state).await; - } - }); + client.add_event_handler( + move |event: matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent, + room: Room| { + let state = msg_state.clone(); + async move { + handlers::on_room_message(event, room, state).await; + } + }, + ); let react_state = state.clone(); client.add_event_handler( move |event: matrix_sdk::ruma::events::reaction::OriginalSyncReactionEvent, room: Room| { let state = react_state.clone(); async move { - on_reaction(event, room, state).await; + handlers::on_reaction(event, room, state).await; } }, ); - client.add_event_handler(on_stripped_state_member); + client.add_event_handler(handlers::on_stripped_state_member); client.sync(sync_settings).await?; bail!("sync loop exited unexpectedly") } -async fn on_reaction( - event: matrix_sdk::ruma::events::reaction::OriginalSyncReactionEvent, - room: Room, - state: Arc>, -) { - if room.state() != RoomState::Joined { - return; - } - let room_id = room.room_id().to_owned(); - let mut state = state.lock().await; - let is_self = event.sender == state.own_user_id; - - tracing::info!( - room = %room_id, - sender = %event.sender, - self_react = is_self, - target = %event.content.relates_to.event_id, - key = %event.content.relates_to.key, - "reaction" - ); - - if !is_self && !state.pending_rooms.contains(&room_id) { - state.pending_rooms.push(room_id); - } -} - -async fn on_stripped_state_member(event: StrippedRoomMemberEvent, client: Client, room: Room) { - let Some(my_id) = client.user_id() else { - return; - }; - if event.state_key != my_id { - return; - } - let room_id = room.room_id().to_owned(); - tokio::spawn(async move { - tracing::info!(room = %room_id, "auto-joining invite"); - let mut delay = 2u64; - loop { - match room.join().await { - Ok(()) => { - tracing::info!(room = %room_id, "joined"); - if let Err(e) = ensure_room_notes(&room).await { - tracing::warn!(room = %room_id, "failed to write room notes: {e}"); - } - return; - } - Err(e) => { - tracing::warn!(room = %room_id, "join failed, retry in {delay}s: {e}"); - tokio::time::sleep(std::time::Duration::from_secs(delay)).await; - delay = (delay * 2).min(300); - if delay >= 300 { - tracing::error!(room = %room_id, "giving up on auto-join"); - return; - } - } - } - } - }); -} - -async fn persist_sync_token(session_file: &Path, sync_token: String) -> anyhow::Result<()> { - let data = fs::read_to_string(session_file).await?; - let mut session: PersistedSession = serde_json::from_str(&data)?; - session.sync_token = Some(sync_token); - fs::write(session_file, serde_json::to_string_pretty(&session)?).await?; - Ok(()) -} - -async fn on_room_message( - event: OriginalSyncRoomMessageEvent, - room: Room, - state: Arc>, -) { - if room.state() != RoomState::Joined { - return; - } - let MessageType::Text(text_content) = &event.content.msgtype else { - return; - }; - - let room_id = room.room_id().to_owned(); - let is_self = { - let state = state.lock().await; - event.sender == state.own_user_id - }; - - tracing::info!( - room = %room_id, - sender = %event.sender, - self_msg = is_self, - "{}", - text_content.body - ); - - if let Err(e) = ensure_room_notes(&room).await { - tracing::warn!(room = %room_id, "failed to ensure room notes: {e}"); - } - if !is_self { - if let Err(e) = ensure_person_notes(&room, &event.sender).await { - tracing::warn!(sender = %event.sender, "failed to ensure person notes: {e}"); - } - } - - if !is_self { - let mut state = state.lock().await; - if !state.pending_rooms.contains(&room_id) { - state.pending_rooms.push(room_id); - } - } -} - -/// Create state/rooms//notes.md if it doesn't exist. -async fn ensure_room_notes(room: &Room) -> anyhow::Result<()> { - let room_id = room.room_id(); - let dir = paths::state_dir().join("rooms").join(room_id.as_str()); - let file = dir.join("notes.md"); - if file.exists() { - return Ok(()); - } - fs::create_dir_all(&dir).await?; - - let display_name = room - .display_name() - .await - .map(|n| n.to_string()) - .unwrap_or_else(|_| room_id.to_string()); - let now = chrono_now(); - - let body = format!("# {room_id}\n\nDisplay name: {display_name}\n\nFirst joined: {now}\n",); - fs::write(&file, body).await?; - tracing::info!(room = %room_id, "created room notes"); - Ok(()) -} - -/// Create state/people//notes.md if it doesn't exist. -/// Pre-fill with display name (from this room's member info) and the room as -/// "first met in". -async fn ensure_person_notes(room: &Room, user_id: &UserId) -> anyhow::Result<()> { - let dir = paths::state_dir().join("people").join(user_id.as_str()); - let file = dir.join("notes.md"); - if file.exists() { - return Ok(()); - } - fs::create_dir_all(&dir).await?; - - let display_name = room - .get_member_no_sync(user_id) - .await - .ok() - .flatten() - .and_then(|m| m.display_name().map(ToOwned::to_owned)) - .unwrap_or_default(); - let display_line = if display_name.is_empty() { - String::new() - } else { - format!("Display name: {display_name}\n") - }; - let room_id = room.room_id(); - let now = chrono_now(); - - let body = format!("# {user_id}\n\n{display_line}First met in: {room_id} on {now}\n",); - fs::write(&file, body).await?; - tracing::info!(user = %user_id, "created person notes"); - Ok(()) -} - -fn chrono_now() -> String { - use std::time::{SystemTime, UNIX_EPOCH}; - let secs = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_secs() as i64) - .unwrap_or(0); - // Simple ISO-ish without pulling chrono. YYYY-MM-DD only is fine for notes. - let days = secs / 86400; - let (y, m, d) = days_to_ymd(days); - format!("{y:04}-{m:02}-{d:02}") -} - -/// Format a unix-seconds timestamp as `YYYY-MM-DD HH:MM` UTC. Returns "?" for 0. -fn format_ts(secs: i64) -> String { - if secs == 0 { - return "?".into(); - } - let days = secs.div_euclid(86400); - let day_secs = secs.rem_euclid(86400); - let (y, m, d) = days_to_ymd(days); - let h = day_secs / 3600; - let min = (day_secs % 3600) / 60; - format!("{y:04}-{m:02}-{d:02} {h:02}:{min:02}") -} - -/// Convert days-since-1970-01-01 to (year, month, day). Civil-date algorithm. -fn days_to_ymd(z: i64) -> (i64, u32, u32) { - let z = z + 719_468; - let era = z.div_euclid(146_097); - let doe = z.rem_euclid(146_097) as u32; - let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365; - let y = yoe as i64 + era * 400; - let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); - let mp = (5 * doy + 2) / 153; - let d = doy - (153 * mp + 2) / 5 + 1; - let m = if mp < 10 { mp + 3 } else { mp - 9 }; - (if m <= 2 { y + 1 } else { y }, m, d) -} - async fn process_loop(state: Arc>, client: Client) { loop { tokio::time::sleep(std::time::Duration::from_secs(1)).await; @@ -527,981 +185,198 @@ async fn process_loop(state: Arc>, client: Client) { continue; }; - // Snapshot last_shown for this room so we can mark seen vs new. - // If we don't have one in memory (e.g. fresh daemon start), seed from - // the stored read receipt so we don't reprocess old messages. - let in_memory = { - let s = state.lock().await; - s.last_shown.get(&room_id).cloned() - }; - let prev_last_shown = if let Some(eid) = in_memory { - Some(eid) - } else { - let from_receipt = match room - .load_user_receipt( - matrix_sdk::ruma::events::receipt::ReceiptType::Read, - ReceiptThread::Unthreaded, - client.user_id().expect("logged in"), - ) - .await - { - Ok(Some((eid, _))) => Some(eid), - Ok(None) => None, - Err(e) => { - tracing::warn!(room = %room_id, "failed to load receipt: {e}"); - None - } - }; - if let Some(ref eid) = from_receipt { - let mut s = state.lock().await; - s.last_shown.insert(room_id.clone(), eid.clone()); - } - from_receipt - }; - let room_name = room - .display_name() - .await - .map_or_else(|_| room_id.to_string(), |n| n.to_string()); + if let Err(e) = process_room(&state, &client, &room_id, &room).await { + tracing::error!(room = %room_id, "failed to process room: {e}"); + } + } +} - let (own_user, model, max_history) = { - let state = state.lock().await; - ( - state.own_user_id.clone(), - state.model.clone(), - state.max_history, +async fn process_room( + state: &Arc>, + client: &Client, + room_id: &OwnedRoomId, + room: &Room, +) -> anyhow::Result<()> { + // Snapshot last_shown for this room so we can mark seen vs new. + let in_memory = { + let s = state.lock().await; + s.last_shown.get(room_id).cloned() + }; + let prev_last_shown = if let Some(eid) = in_memory { + Some(eid) + } else { + let from_receipt = match room + .load_user_receipt( + matrix_sdk::ruma::events::receipt::ReceiptType::Read, + ReceiptThread::Unthreaded, + client.user_id().expect("logged in"), ) - }; - - // Load recent timeline (messages + reactions) from matrix-sdk's - // persistent event cache. - let mut timeline = match load_timeline(&room, max_history, &own_user).await { - Ok(t) => t, + .await + { + Ok(Some((eid, _))) => Some(eid), + Ok(None) => None, Err(e) => { - tracing::error!(room = %room_id, "failed to load timeline: {e}"); - continue; + tracing::warn!(room = %room_id, "failed to load receipt: {e}"); + None } }; - - // For any new messages that reply to events outside the window, fetch - // the replied-to event from cache and prepend it as extra context. - let in_window: std::collections::HashSet = timeline - .iter() - .filter_map(TimelineItem::event_id) - .cloned() - .collect(); - let seen_idx_initial = prev_last_shown - .as_ref() - .and_then(|id| { - timeline.iter().position(|t| match t { - TimelineItem::Message { event_id, .. } => event_id == id, - _ => false, - }) - }) - .map_or(0, |pos| pos + 1); - let mut reply_targets: Vec = Vec::new(); - for item in timeline.iter().skip(seen_idx_initial) { - if let TimelineItem::Message { - in_reply_to: Some(target), - .. - } = item - { - if !in_window.contains(target) && !reply_targets.contains(target) { - reply_targets.push(target.clone()); - } - } - } - if !reply_targets.is_empty() { - if let Ok((cache, _h)) = room.event_cache().await { - for target in &reply_targets { - if let Some(found) = fetch_message(&cache, target, &own_user).await { - timeline.insert(0, found); - } - } - } + if let Some(ref eid) = from_receipt { + let mut s = state.lock().await; + s.last_shown.insert(room_id.clone(), eid.clone()); } + from_receipt + }; - // Determine seen split based on the last_shown message event id - let seen_idx = prev_last_shown - .as_ref() - .and_then(|id| { - timeline.iter().position(|t| match t { - TimelineItem::Message { event_id, .. } => event_id == id, - _ => false, - }) - }) - .map_or(0, |pos| pos + 1); + let room_name = room + .display_name() + .await + .map_or_else(|_| room_id.to_string(), |n| n.to_string()); - // The "last shown" pointer should advance to the latest message we've - // loaded (not a reaction). - let new_last_event_id = timeline.iter().rev().find_map(|t| match t { - TimelineItem::Message { event_id, .. } => Some(event_id.clone()), - _ => None, - }); - - // Compute who has read which message - let read_markers = compute_read_markers(&room, &timeline, &own_user).await; - - // Tell the room we're "typing" while claude thinks. Best-effort; no - // hard fail if it doesn't go through. - if let Err(e) = room.typing_notice(true).await { - tracing::debug!(room = %room_id, "failed to send typing start: {e}"); - } - - let invoke_result = invoke_claude( - &room_id, - &room_name, - &timeline, - seen_idx, - &model, - &read_markers, + let (own_user, model, max_history) = { + let state = state.lock().await; + ( + state.own_user_id.clone(), + state.model.clone(), + state.max_history, ) - .await; + }; - if let Err(e) = room.typing_notice(false).await { - tracing::debug!(room = %room_id, "failed to send typing stop: {e}"); - } + let mut tl = timeline::load_timeline(room, max_history, &own_user).await?; - let docs = match invoke_result { - Ok(d) => d, - Err(e) => { - tracing::error!(room = %room_id, "claude invocation failed: {e}"); - continue; + // For any new messages that reply to events outside the window, fetch + // the replied-to event from cache and prepend it as extra context. + let in_window: HashSet = tl + .iter() + .filter_map(TimelineItem::event_id) + .cloned() + .collect(); + let seen_idx_initial = prev_last_shown + .as_ref() + .and_then(|id| { + tl.iter().position(|t| match t { + TimelineItem::Message { event_id, .. } => event_id == id, + _ => false, + }) + }) + .map_or(0, |pos| pos + 1); + let mut reply_targets: Vec = Vec::new(); + for item in tl.iter().skip(seen_idx_initial) { + if let TimelineItem::Message { + in_reply_to: Some(target), + .. + } = item + { + if !in_window.contains(target) && !reply_targets.contains(target) { + reply_targets.push(target.clone()); } - }; + } + } + if !reply_targets.is_empty() { + if let Ok((cache, _h)) = room.event_cache().await { + for target in &reply_targets { + if let Some(found) = timeline::fetch_message(&cache, target, &own_user).await { + tl.insert(0, found); + } + } + } + } - let mut sent_any = false; - for doc in docs { - match doc { - ClaudeDoc::Skip => { - tracing::debug!(room = %room_id, "claude doc: skip"); - } - ClaudeDoc::Thought(body) => { - tracing::info!(room = %room_id, thought = %body.chars().take(120).collect::(), "claude doc: thought"); - tracing::trace!("full thought: {body}"); - } - ClaudeDoc::Message { target, body } => { - let target_room = match &target { - ResponseTarget::Room(rid) => client.get_room(rid), - ResponseTarget::Dm(user) => match find_or_create_dm(&client, user).await { + let seen_idx = prev_last_shown + .as_ref() + .and_then(|id| { + tl.iter().position(|t| match t { + TimelineItem::Message { event_id, .. } => event_id == id, + _ => false, + }) + }) + .map_or(0, |pos| pos + 1); + + let new_last_event_id = tl.iter().rev().find_map(|t| match t { + TimelineItem::Message { event_id, .. } => Some(event_id.clone()), + _ => None, + }); + + let read_markers = timeline::compute_read_markers(room, &tl, &own_user).await; + + if let Err(e) = room.typing_notice(true).await { + tracing::debug!(room = %room_id, "failed to send typing start: {e}"); + } + + let invoke_result = + claude::invoke_claude(room_id, &room_name, &tl, seen_idx, &model, &read_markers).await; + + if let Err(e) = room.typing_notice(false).await { + tracing::debug!(room = %room_id, "failed to send typing stop: {e}"); + } + + let docs = invoke_result?; + + for doc in docs { + match doc { + ClaudeDoc::Skip => { + tracing::debug!(room = %room_id, "claude doc: skip"); + } + ClaudeDoc::Thought(body) => { + tracing::info!(room = %room_id, thought = %body.chars().take(120).collect::(), "claude doc: thought"); + tracing::trace!("full thought: {body}"); + } + ClaudeDoc::Message { target, body } => { + let target_room = match &target { + ResponseTarget::Room(rid) => client.get_room(rid), + ResponseTarget::Dm(user) => { + match handlers::find_or_create_dm(client, user).await { Ok(r) => Some(r), Err(e) => { tracing::error!(user = %user, "failed to get/create DM: {e}"); None } - }, - }; - let target_label = match &target { - ResponseTarget::Room(rid) => rid.to_string(), - ResponseTarget::Dm(user) => format!("dm:{user}"), - }; - if let Some(target_room) = target_room { - let content = RoomMessageEventContent::text_plain(&body); - match target_room.send(content).await { - Ok(_) => { - let mut state = state.lock().await; - state.rate_budget = state.rate_budget.saturating_sub(1); - tracing::info!( - target = %target_label, - "sent response ({} budget remaining)", - state.rate_budget - ); - sent_any = true; - } - Err(e) => tracing::error!("failed to send: {e}"), } - } else { - tracing::warn!(target = %target_label, "target not available"); } - } - ClaudeDoc::Reaction { target_id_arg, key } => { - let Some(full_eid) = resolve_event_id(&timeline, &target_id_arg) else { - tracing::warn!(arg = %target_id_arg, "react: target event id not found in timeline"); - continue; - }; - let content = - ReactionEventContent::new(Annotation::new(full_eid.clone(), key.clone())); - match room.send(content).await { - Ok(_) => tracing::info!(target = %full_eid, %key, "sent reaction"), - Err(e) => tracing::error!("failed to send reaction: {e}"), + }; + let target_label = match &target { + ResponseTarget::Room(rid) => rid.to_string(), + ResponseTarget::Dm(user) => format!("dm:{user}"), + }; + if let Some(target_room) = target_room { + let content = RoomMessageEventContent::text_plain(&body); + match target_room.send(content).await { + Ok(_) => { + let mut state = state.lock().await; + state.rate_budget = state.rate_budget.saturating_sub(1); + tracing::info!( + target = %target_label, + "sent response ({} budget remaining)", + state.rate_budget + ); + } + Err(e) => tracing::error!("failed to send: {e}"), } + } else { + tracing::warn!(target = %target_label, "target not available"); } } - } - - // Update last_shown and send read receipt regardless of whether we - // sent a message - the agent saw the messages either way. - { - let mut state = state.lock().await; - if let Some(eid) = new_last_event_id.clone() { - state.last_shown.insert(room_id.clone(), eid); - } - } - send_read_receipt(&room, new_last_event_id).await; - - let _ = sent_any; - } -} - -/// Find an existing DM room with the given user, or create one. -async fn find_or_create_dm(client: &Client, user_id: &UserId) -> anyhow::Result { - for room in client.joined_rooms() { - if room.is_direct().await.unwrap_or(false) - && room - .direct_targets() - .iter() - .any(|t| t.as_str() == user_id.as_str()) - { - return Ok(room); - } - } - tracing::info!(user = %user_id, "creating new DM room"); - Ok(client.create_dm(user_id).await?) -} - -async fn send_read_receipt(room: &Room, event_id: Option) { - let Some(eid) = event_id else { - return; - }; - if let Err(e) = room - .send_single_receipt(CreateReceiptType::Read, ReceiptThread::Unthreaded, eid) - .await - { - tracing::warn!(room = %room.room_id(), "failed to send read receipt: {e}"); - } -} - -/// Load the last N timeline items (messages + reactions) from the room's -/// persistent event cache. Returns oldest-first. -/// -/// We walk events newest-first, collect messages until we have `limit`, then -/// also include any reactions whose timestamps fall within the message window. -async fn load_timeline( - room: &Room, - limit: usize, - own_user: &OwnedUserId, -) -> anyhow::Result> { - use matrix_sdk::ruma::events::AnySyncTimelineEvent; - - let (cache, _handles) = room.event_cache().await?; - let events = cache.events().await; - - let mut messages: Vec = Vec::new(); - let mut reactions: Vec = Vec::new(); - let mut earliest_message_ts: Option = None; - - for ev in events.iter().rev() { - let raw = ev.raw(); - let Ok(deserialized) = raw.deserialize() else { - continue; - }; - let AnySyncTimelineEvent::MessageLike(msg) = deserialized else { - continue; - }; - - match msg { - matrix_sdk::ruma::events::AnySyncMessageLikeEvent::RoomMessage( - matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig), - ) => { - if messages.len() >= limit { - continue; - } - let MessageType::Text(text) = &orig.content.msgtype else { - continue; - }; - let ts = ts_secs_from(orig.origin_server_ts.0); - let in_reply_to = match &orig.content.relates_to { - Some(matrix_sdk::ruma::events::room::message::Relation::Reply { - in_reply_to, - }) => Some(in_reply_to.event_id.clone()), - _ => None, - }; - if earliest_message_ts.is_none_or(|e| ts < e) { - earliest_message_ts = Some(ts); - } - messages.push(TimelineItem::Message { - event_id: orig.event_id.clone(), - sender: orig.sender.clone(), - body: text.body.clone(), - is_self: &orig.sender == own_user, - ts, - in_reply_to, - }); - } - matrix_sdk::ruma::events::AnySyncMessageLikeEvent::Reaction( - matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig), - ) => { - let ts = ts_secs_from(orig.origin_server_ts.0); - reactions.push(TimelineItem::Reaction { - sender: orig.sender.clone(), - target_event_id: orig.content.relates_to.event_id.clone(), - key: orig.content.relates_to.key.clone(), - is_self: &orig.sender == own_user, - ts, - }); - } - _ => {} - } - } - - // Drop reactions older than the oldest visible message, to avoid - // referencing messages we don't have in scope. - if let Some(min_ts) = earliest_message_ts { - reactions.retain(|r| r.ts() >= min_ts); - } - - let mut combined: Vec = Vec::with_capacity(messages.len() + reactions.len()); - combined.extend(messages); - combined.extend(reactions); - combined.sort_by_key(TimelineItem::ts); - Ok(combined) -} - -/// Render one timeline item into the prompt. -/// Messages: `[ts] $eid... [(you) ]@user: body [read by: ...]` -/// Reactions: `[ts] [(you) ]@user reacted to $eid... with KEY` -fn render_timeline_item( - prompt: &mut String, - item: &TimelineItem, - read_markers: &std::collections::HashMap>, -) { - match item { - TimelineItem::Message { - event_id, - sender, - body, - is_self, - ts, - .. - } => { - let ts_str = format_ts(*ts); - let id = short_event_id(event_id); - let prefix = if *is_self { "(you) " } else { "" }; - let readers_str = match read_markers.get(event_id) { - Some(rs) if !rs.is_empty() => { - let mut sorted = rs.clone(); - sorted.sort(); - let names: Vec = sorted.iter().map(|u| u.to_string()).collect(); - format!(" [read by: {}]", names.join(", ")) - } - _ => String::new(), - }; - writeln!( - prompt, - "[{ts_str}] {id} {prefix}{sender}: {body}{readers_str}" - ) - .unwrap(); - } - TimelineItem::Reaction { - sender, - target_event_id, - key, - is_self, - ts, - } => { - let ts_str = format_ts(*ts); - let id = short_event_id(target_event_id); - let prefix = if *is_self { "(you) " } else { "" }; - writeln!( - prompt, - "[{ts_str}] {prefix}{sender} reacted to {id} with {key}" - ) - .unwrap(); - } - } -} - -/// Shorten an event id for prompt display: `$abc123def456...` → `$abc123de`. -fn short_event_id(id: &OwnedEventId) -> String { - let s = id.as_str(); - let prefix: String = s.chars().take(9).collect(); - if s.len() > 9 { - format!("{prefix}…") - } else { - prefix - } -} - -/// For each message in the timeline, compute the list of OTHER users who -/// have a read receipt at or after that message. Self is excluded. -async fn compute_read_markers( - room: &Room, - timeline: &[TimelineItem], - own_user: &OwnedUserId, -) -> std::collections::HashMap> { - use matrix_sdk::ruma::events::receipt::ReceiptType; - - // Collect unique non-self users from the timeline (senders + reactors) - let mut users: Vec = timeline - .iter() - .filter(|t| !t.is_self()) - .map(|t| t.sender().clone()) - .collect(); - users.sort(); - users.dedup(); - - // Build position map for messages in timeline: event_id -> index - let positions: std::collections::HashMap = timeline - .iter() - .enumerate() - .filter_map(|(i, t)| match t { - TimelineItem::Message { event_id, .. } => Some((event_id.clone(), i)), - _ => None, - }) - .collect(); - - // For each user, find their receipt position. Then for every message - // at or before that position, add them as a reader. - let mut readers: std::collections::HashMap> = - std::collections::HashMap::new(); - - for user in &users { - if user == own_user { - continue; - } - let (receipt_eid, receipt_ts) = match room - .load_user_receipt(ReceiptType::Read, ReceiptThread::Unthreaded, user) - .await - { - Ok(Some((eid, r))) => { - let ts = r.ts.map(|t| ts_secs_from(t.0)).unwrap_or(0); - (eid, ts) - } - _ => continue, - }; - - // Three cases: - // 1. Receipt event is in our window → mark messages at idx <= user_pos - // 2. Receipt is for an event newer than our window (timestamp-based) - // → mark every message as read - // 3. Receipt is older than our window → mark nothing - let user_msg_idx_inclusive: Option = if let Some(&p) = positions.get(&receipt_eid) { - Some(p) - } else { - // Compare receipt ts with the newest message ts - let newest_msg_ts = timeline - .iter() - .rev() - .find_map(|t| match t { - TimelineItem::Message { ts, .. } => Some(*ts), - _ => None, - }) - .unwrap_or(0); - if receipt_ts > 0 && receipt_ts >= newest_msg_ts { - // User has read past our entire window - Some(timeline.len().saturating_sub(1)) - } else { - None - } - }; - - if let Some(up_to) = user_msg_idx_inclusive { - for item in timeline.iter().take(up_to + 1) { - if let TimelineItem::Message { event_id, .. } = item { - readers - .entry(event_id.clone()) - .or_default() - .push(user.clone()); - } - } - } - } - - readers -} - -/// Resolve a (possibly shortened/ellipsized) event id to a full one by -/// looking up against the timeline. Returns the matching message's full -/// event id if found. -fn resolve_event_id(timeline: &[TimelineItem], arg: &str) -> Option { - // Strip a trailing ellipsis or `...` if present - let cleaned = arg.trim_end_matches('…').trim_end_matches('.').trim(); - if cleaned.is_empty() { - return None; - } - for item in timeline { - if let TimelineItem::Message { event_id, .. } = item { - if event_id.as_str() == cleaned || event_id.as_str().starts_with(cleaned) { - return Some(event_id.clone()); - } - } - } - None -} - -fn ts_secs_from(ts: matrix_sdk::ruma::UInt) -> i64 { - let ms: u64 = ts.into(); - i64::try_from(ms).unwrap_or(0) / 1000 -} - -/// Fetch a single text message by event_id from the room's event cache. -async fn fetch_message( - cache: &matrix_sdk::event_cache::RoomEventCache, - event_id: &matrix_sdk::ruma::EventId, - own_user: &OwnedUserId, -) -> Option { - use matrix_sdk::ruma::events::AnySyncTimelineEvent; - - let ev = cache.find_event(event_id).await?; - let deserialized = ev.raw().deserialize().ok()?; - let AnySyncTimelineEvent::MessageLike(msg) = deserialized else { - return None; - }; - let matrix_sdk::ruma::events::AnySyncMessageLikeEvent::RoomMessage( - matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig), - ) = msg - else { - return None; - }; - let MessageType::Text(text) = &orig.content.msgtype else { - return None; - }; - let ts = ts_secs_from(orig.origin_server_ts.0); - Some(TimelineItem::Message { - event_id: orig.event_id.clone(), - sender: orig.sender.clone(), - body: text.body.clone(), - is_self: &orig.sender == own_user, - ts, - in_reply_to: None, - }) -} - -enum ResponseTarget { - Room(OwnedRoomId), - Dm(OwnedUserId), -} - -/// One document within Claude's multi-doc output. Each doc has its own -/// frontmatter; the daemon routes based on which fields are present. -enum ClaudeDoc { - /// A chat message to send. - Message { - target: ResponseTarget, - body: String, - }, - /// A reaction to a message. `target_id_arg` is the event id (possibly - /// shortened) the agent saw in the prompt; daemon expands by prefix match. - Reaction { target_id_arg: String, key: String }, - /// Agent's internal monologue. Not sent to chat. Logged to tracing. - Thought(String), - /// Explicit "do nothing for this slot". Useful as a placeholder. - Skip, -} - -async fn invoke_claude( - source_room: &OwnedRoomId, - room_name: &str, - timeline: &[TimelineItem], - seen_idx: usize, - model: &str, - read_markers: &std::collections::HashMap>, -) -> anyhow::Result> { - let identity_dir = paths::identity_dir(); - let identity_str = identity_dir.to_string_lossy(); - - let mut prompt = String::new(); - writeln!(prompt, "[room_id: {source_room}]").unwrap(); - writeln!(prompt, "[room_name: {room_name}]").unwrap(); - writeln!( - prompt, - "[room notes path: ../rooms/{source_room}/notes.md (create dir if needed)]" - ) - .unwrap(); - - // Collect unique non-self participants (message senders + reactors) - let mut senders: Vec<&OwnedUserId> = timeline - .iter() - .filter(|t| !t.is_self()) - .map(TimelineItem::sender) - .collect(); - senders.sort(); - senders.dedup(); - if !senders.is_empty() { - writeln!( - prompt, - "\n[people in this room — check ../people//notes.md for each]" - ) - .unwrap(); - for s in &senders { - writeln!(prompt, " {s}").unwrap(); - } - } - - let seen = seen_idx.min(timeline.len()); - let (old, new) = timeline.split_at(seen); - - if !old.is_empty() { - writeln!(prompt, "\n[previously seen events — for context]").unwrap(); - for item in old { - render_timeline_item(&mut prompt, item, read_markers); - } - } - - writeln!(prompt, "\n[new events — respond to these]").unwrap(); - if new.is_empty() { - writeln!(prompt, "(none)").unwrap(); - } else { - for item in new { - render_timeline_item(&mut prompt, item, read_markers); - } - } - - let new_msg_count = new - .iter() - .filter(|t| matches!(t, TimelineItem::Message { .. })) - .count(); - let new_react_count = new.len() - new_msg_count; - tracing::info!( - "invoking claude: {} new ({} msg + {} react), {} seen", - new.len(), - new_msg_count, - new_react_count, - old.len() - ); - tracing::trace!("full prompt:\n{prompt}"); - - use tokio::process::Command; - let mut cmd = Command::new("claude"); - cmd.args([ - "--print", - "--model", - model, - "--add-dir", - &identity_str, - "--allowedTools", - "Read Edit Write Glob Grep", - "-p", - &prompt, - ]); - cmd.current_dir(&identity_dir); - cmd.stdin(std::process::Stdio::null()); - let output = cmd.output().await.context("failed to run claude")?; - - let stderr = String::from_utf8_lossy(&output.stderr); - let stdout = String::from_utf8_lossy(&output.stdout); - - if !output.status.success() { - bail!( - "claude exited with {}:\nstdout: {}\nstderr: {}", - output.status, - stdout, - stderr - ); - } - - if !stderr.is_empty() { - tracing::warn!("claude stderr: {stderr}"); - } - - let raw = String::from_utf8_lossy(&output.stdout).to_string(); - Ok(parse_response(&raw, source_room)) -} - -/// Parse Claude's stdout into a list of documents. -/// -/// Format: each doc starts with a line `=== [arg]`. Body is everything -/// until the next `===` line or EOF. Types: -/// - `=== thought` → ClaudeDoc::Thought (logged, not sent) -/// - `=== room []` → ClaudeDoc::Message to that room (or source room if no arg) -/// - `=== dm ` → ClaudeDoc::Message as DM -/// - `=== skip` → ClaudeDoc::Skip (no-op) -/// -/// Anything before the first `===` line is treated as a preamble thought. -/// Bare text with no `===` is treated as a single message to default_room. -fn parse_response(raw: &str, default_room: &OwnedRoomId) -> Vec { - let trimmed = raw.trim(); - if trimmed.is_empty() { - return Vec::new(); - } - - // Walk lines, splitting on lines that start with "=== " - let mut docs = Vec::new(); - let mut current_header: Option = None; - let mut current_body = String::new(); - let mut preamble = String::new(); - - for line in trimmed.lines() { - if let Some(header) = line.strip_prefix("===") { - // Flush previous doc - if let Some(h) = current_header.take() { - if let Some(doc) = build_doc(&h, current_body.trim(), default_room) { - docs.push(doc); - } - current_body.clear(); - } else { - // We were collecting preamble - let p = preamble.trim(); - if !p.is_empty() { - docs.push(ClaudeDoc::Thought(p.to_owned())); - } - preamble.clear(); - } - current_header = Some(header.trim().to_owned()); - } else if current_header.is_some() { - current_body.push_str(line); - current_body.push('\n'); - } else { - preamble.push_str(line); - preamble.push('\n'); - } - } - - // Flush the last doc or preamble - if let Some(h) = current_header { - if let Some(doc) = build_doc(&h, current_body.trim(), default_room) { - docs.push(doc); - } - } else { - // No `===` headers at all - treat whole output as a single message - let p = preamble.trim(); - if !p.is_empty() { - docs.push(ClaudeDoc::Message { - target: ResponseTarget::Room(default_room.clone()), - body: p.to_owned(), - }); - } - } - - docs -} - -fn build_doc(header: &str, body: &str, default_room: &OwnedRoomId) -> Option { - let mut parts = header.splitn(2, char::is_whitespace); - let kind = parts.next().unwrap_or("").trim(); - let arg = parts.next().unwrap_or("").trim(); - - match kind { - "skip" => Some(ClaudeDoc::Skip), - "thought" => { - if body.is_empty() { - None - } else { - Some(ClaudeDoc::Thought(body.to_owned())) - } - } - "room" => { - if body.is_empty() { - return None; - } - let target = if arg.is_empty() { - ResponseTarget::Room(default_room.clone()) - } else { - match arg.parse::() { - Ok(rid) => ResponseTarget::Room(rid), - Err(_) => return None, - } - }; - Some(ClaudeDoc::Message { - target, - body: body.to_owned(), - }) - } - "dm" => { - if body.is_empty() { - return None; - } - match arg.parse::() { - Ok(uid) => Some(ClaudeDoc::Message { - target: ResponseTarget::Dm(uid), - body: body.to_owned(), - }), - Err(_) => None, - } - } - "react" => { - // Allow either `=== react ` (key in header) or - // `=== react ` with body=key. - let mut header_parts = arg.splitn(2, char::is_whitespace); - let eid_arg = header_parts.next().unwrap_or("").trim(); - let key_in_header = header_parts.next().unwrap_or("").trim(); - if eid_arg.is_empty() { - return None; - } - let key = if !key_in_header.is_empty() { - key_in_header.to_owned() - } else if !body.is_empty() { - body.to_owned() - } else { - return None; - }; - Some(ClaudeDoc::Reaction { - target_id_arg: eid_arg.to_owned(), - key, - }) - } - _ => { - // Unknown header - treat body as a thought so it doesn't leak to chat - if body.is_empty() { - None - } else { - Some(ClaudeDoc::Thought(format!( - "[unknown header '{header}'] {body}" - ))) - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - fn test_room() -> OwnedRoomId { - "!test:example.com".parse().unwrap() - } - - fn first_message(docs: &[ClaudeDoc]) -> (&ResponseTarget, &str) { - for d in docs { - if let ClaudeDoc::Message { target, body } = d { - return (target, body.as_str()); - } - } - panic!("no message doc found"); - } - - fn assert_room(target: &ResponseTarget, expected: &str) { - match target { - ResponseTarget::Room(r) => assert_eq!(r.as_str(), expected), - ResponseTarget::Dm(_) => panic!("expected room target, got dm"), - } - } - - #[test] - fn parse_room_with_arg() { - let raw = "=== room !other:server\nhello world"; - let docs = parse_response(raw, &test_room()); - let (target, body) = first_message(&docs); - assert_room(target, "!other:server"); - assert_eq!(body, "hello world"); - } - - #[test] - fn parse_room_no_arg_uses_default() { - let raw = "=== room\nhi"; - let docs = parse_response(raw, &test_room()); - let (target, _) = first_message(&docs); - assert_room(target, "!test:example.com"); - } - - #[test] - fn parse_skip() { - let raw = "=== skip"; - let docs = parse_response(raw, &test_room()); - assert_eq!(docs.len(), 1); - assert!(matches!(docs[0], ClaudeDoc::Skip)); - } - - #[test] - fn parse_plain_text_no_header() { - let raw = "just a message"; - let docs = parse_response(raw, &test_room()); - let (target, body) = first_message(&docs); - assert_room(target, "!test:example.com"); - assert_eq!(body, "just a message"); - } - - #[test] - fn parse_empty() { - assert!(parse_response("", &test_room()).is_empty()); - assert!(parse_response(" \n ", &test_room()).is_empty()); - } - - #[test] - fn parse_dm() { - let raw = "=== dm @alice:example.com\nhi alice"; - let docs = parse_response(raw, &test_room()); - let (target, body) = first_message(&docs); - match target { - ResponseTarget::Dm(u) => assert_eq!(u.as_str(), "@alice:example.com"), - ResponseTarget::Room(_) => panic!("expected dm target"), - } - assert_eq!(body, "hi alice"); - } - - #[test] - fn parse_thought() { - let raw = "=== thought\nthinking about whether to reply..."; - let docs = parse_response(raw, &test_room()); - assert_eq!(docs.len(), 1); - match &docs[0] { - ClaudeDoc::Thought(s) => assert_eq!(s, "thinking about whether to reply..."), - _ => panic!("expected thought"), - } - } - - #[test] - fn parse_multi_doc() { - let raw = "\ -=== thought -let me check notes - -=== room !x:y -hi - -=== dm @u:s -private - -=== skip -"; - let docs = parse_response(raw, &test_room()); - assert_eq!(docs.len(), 4); - assert!(matches!(docs[0], ClaudeDoc::Thought(_))); - assert!(matches!( - docs[1], - ClaudeDoc::Message { - target: ResponseTarget::Room(_), - .. - } - )); - assert!(matches!( - docs[2], - ClaudeDoc::Message { - target: ResponseTarget::Dm(_), - .. - } - )); - assert!(matches!(docs[3], ClaudeDoc::Skip)); - } - - #[test] - fn parse_preamble_becomes_thought() { - let raw = "preamble line\n=== room !x:y\nhello"; - let docs = parse_response(raw, &test_room()); - assert_eq!(docs.len(), 2); - assert!(matches!(docs[0], ClaudeDoc::Thought(_))); - assert!(matches!(docs[1], ClaudeDoc::Message { .. })); - } - - #[test] - fn parse_react_with_key_in_header() { - let raw = "=== react $abc12345… šŸ‘€"; - let docs = parse_response(raw, &test_room()); - assert_eq!(docs.len(), 1); - match &docs[0] { ClaudeDoc::Reaction { target_id_arg, key } => { - assert_eq!(target_id_arg, "$abc12345…"); - assert_eq!(key, "šŸ‘€"); + let Some(full_eid) = timeline::resolve_event_id(&tl, &target_id_arg) else { + tracing::warn!(arg = %target_id_arg, "react: target event id not found in timeline"); + continue; + }; + let content = + ReactionEventContent::new(Annotation::new(full_eid.clone(), key.clone())); + match room.send(content).await { + Ok(_) => tracing::info!(target = %full_eid, %key, "sent reaction"), + Err(e) => tracing::error!("failed to send reaction: {e}"), + } } - _ => panic!("expected reaction"), } } - #[test] - fn parse_react_with_key_in_body() { - let raw = "=== react $abc12345…\nšŸ”„"; - let docs = parse_response(raw, &test_room()); - assert_eq!(docs.len(), 1); - match &docs[0] { - ClaudeDoc::Reaction { key, .. } => assert_eq!(key, "šŸ”„"), - _ => panic!("expected reaction"), + { + let mut state = state.lock().await; + if let Some(eid) = new_last_event_id.clone() { + state.last_shown.insert(room_id.clone(), eid); } } + handlers::send_read_receipt(room, new_last_event_id).await; - #[test] - fn parse_unknown_header_becomes_thought() { - let raw = "=== mystery foo\nbody"; - let docs = parse_response(raw, &test_room()); - assert_eq!(docs.len(), 1); - assert!(matches!(docs[0], ClaudeDoc::Thought(_))); - } + Ok(()) } diff --git a/src/session.rs b/src/session.rs new file mode 100644 index 0000000..c5d9a4b --- /dev/null +++ b/src/session.rs @@ -0,0 +1,64 @@ +use std::path::Path; + +use anyhow::Context; +use matrix_sdk::Client; +use tokio::fs; + +use crate::paths; +use crate::types::{Config, PersistedSession}; + +pub async fn load_config() -> anyhow::Result { + let data = fs::read_to_string(paths::config_path()) + .await + .context("failed to read config.json")?; + serde_json::from_str(&data).context("failed to parse config.json") +} + +pub async fn restore_session(session_file: &Path) -> anyhow::Result<(Client, Option)> { + let data = fs::read_to_string(session_file).await?; + let session: PersistedSession = serde_json::from_str(&data)?; + + tracing::info!(user = %session.user_session.meta.user_id, "restoring session"); + + let client = Client::builder() + .homeserver_url(&session.homeserver) + .sqlite_store(paths::db_path(), None) + .build() + .await?; + + client.restore_session(session.user_session).await?; + + Ok((client, session.sync_token)) +} + +pub async fn login(config: &Config, db_path: &Path, session_file: &Path) -> anyhow::Result { + tracing::info!(homeserver = %config.homeserver, user = %config.username, "logging in"); + + let client = Client::builder() + .homeserver_url(&config.homeserver) + .sqlite_store(db_path, None) + .build() + .await?; + + client + .matrix_auth() + .login_username(&config.username, &config.password) + .initial_device_display_name("damocles-daemon") + .await?; + + let user_session = client + .matrix_auth() + .session() + .context("no session after login")?; + + let persisted = PersistedSession { + homeserver: config.homeserver.clone(), + db_path: db_path.to_owned(), + user_session, + sync_token: None, + }; + fs::write(session_file, serde_json::to_string_pretty(&persisted)?).await?; + + tracing::info!("session persisted"); + Ok(client) +} diff --git a/src/timeline.rs b/src/timeline.rs new file mode 100644 index 0000000..c9cfedc --- /dev/null +++ b/src/timeline.rs @@ -0,0 +1,328 @@ +use std::collections::HashMap; +use std::fmt::Write as _; + +use matrix_sdk::{ + Room, + ruma::{OwnedEventId, OwnedUserId, events::room::message::MessageType}, +}; + +use crate::types::TimelineItem; + +/// Format a unix-seconds timestamp as `YYYY-MM-DD HH:MM` UTC. Returns "?" for 0. +pub fn format_ts(secs: i64) -> String { + if secs == 0 { + return "?".into(); + } + let days = secs.div_euclid(86400); + let day_secs = secs.rem_euclid(86400); + let (y, m, d) = days_to_ymd(days); + let h = day_secs / 3600; + let min = (day_secs % 3600) / 60; + format!("{y:04}-{m:02}-{d:02} {h:02}:{min:02}") +} + +pub fn chrono_now() -> String { + use std::time::{SystemTime, UNIX_EPOCH}; + let secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .unwrap_or(0); + let days = secs / 86400; + let (y, m, d) = days_to_ymd(days); + format!("{y:04}-{m:02}-{d:02}") +} + +/// Convert days-since-1970-01-01 to (year, month, day). Civil-date algorithm. +fn days_to_ymd(z: i64) -> (i64, u32, u32) { + let z = z + 719_468; + let era = z.div_euclid(146_097); + let doe = z.rem_euclid(146_097) as u32; + let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365; + let y = yoe as i64 + era * 400; + let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); + let mp = (5 * doy + 2) / 153; + let d = doy - (153 * mp + 2) / 5 + 1; + let m = if mp < 10 { mp + 3 } else { mp - 9 }; + (if m <= 2 { y + 1 } else { y }, m, d) +} + +pub fn ts_secs_from(ts: matrix_sdk::ruma::UInt) -> i64 { + let ms: u64 = ts.into(); + i64::try_from(ms).unwrap_or(0) / 1000 +} + +/// Shorten an event id for prompt display: `$abc123def456...` -> `$abc123de`. +pub fn short_event_id(id: &OwnedEventId) -> String { + let s = id.as_str(); + let prefix: String = s.chars().take(9).collect(); + if s.len() > 9 { + format!("{prefix}…") + } else { + prefix + } +} + +/// Resolve a (possibly shortened/ellipsized) event id to a full one by +/// looking up against the timeline. Returns the matching message's full +/// event id if found. +pub fn resolve_event_id(timeline: &[TimelineItem], arg: &str) -> Option { + let cleaned = arg.trim_end_matches('…').trim_end_matches('.').trim(); + if cleaned.is_empty() { + return None; + } + for item in timeline { + if let TimelineItem::Message { event_id, .. } = item { + if event_id.as_str() == cleaned || event_id.as_str().starts_with(cleaned) { + return Some(event_id.clone()); + } + } + } + None +} + +/// Render one timeline item into the prompt. +/// Messages: `[ts] $eid... [(you) ]@user: body [read by: ...]` +/// Reactions: `[ts] [(you) ]@user reacted to $eid... with KEY` +pub fn render_timeline_item( + prompt: &mut String, + item: &TimelineItem, + read_markers: &HashMap>, +) { + match item { + TimelineItem::Message { + event_id, + sender, + body, + is_self, + ts, + .. + } => { + let ts_str = format_ts(*ts); + let id = short_event_id(event_id); + let prefix = if *is_self { "(you) " } else { "" }; + let readers_str = match read_markers.get(event_id) { + Some(rs) if !rs.is_empty() => { + let mut sorted = rs.clone(); + sorted.sort(); + let names: Vec = sorted.iter().map(|u| u.to_string()).collect(); + format!(" [read by: {}]", names.join(", ")) + } + _ => String::new(), + }; + writeln!( + prompt, + "[{ts_str}] {id} {prefix}{sender}: {body}{readers_str}" + ) + .unwrap(); + } + TimelineItem::Reaction { + sender, + target_event_id, + key, + is_self, + ts, + } => { + let ts_str = format_ts(*ts); + let id = short_event_id(target_event_id); + let prefix = if *is_self { "(you) " } else { "" }; + writeln!( + prompt, + "[{ts_str}] {prefix}{sender} reacted to {id} with {key}" + ) + .unwrap(); + } + } +} + +/// Load the last N timeline items (messages + reactions) from the room's +/// persistent event cache. Returns oldest-first. +/// +/// We walk events newest-first, collect messages until we have `limit`, then +/// also include any reactions whose timestamps fall within the message window. +pub async fn load_timeline( + room: &Room, + limit: usize, + own_user: &OwnedUserId, +) -> anyhow::Result> { + use matrix_sdk::ruma::events::AnySyncTimelineEvent; + + let (cache, _handles) = room.event_cache().await?; + let events = cache.events().await; + + let mut messages: Vec = Vec::new(); + let mut reactions: Vec = Vec::new(); + let mut earliest_message_ts: Option = None; + + for ev in events.iter().rev() { + let raw = ev.raw(); + let Ok(deserialized) = raw.deserialize() else { + continue; + }; + let AnySyncTimelineEvent::MessageLike(msg) = deserialized else { + continue; + }; + + match msg { + matrix_sdk::ruma::events::AnySyncMessageLikeEvent::RoomMessage( + matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig), + ) => { + if messages.len() >= limit { + continue; + } + let MessageType::Text(text) = &orig.content.msgtype else { + continue; + }; + let ts = ts_secs_from(orig.origin_server_ts.0); + let in_reply_to = match &orig.content.relates_to { + Some(matrix_sdk::ruma::events::room::message::Relation::Reply { + in_reply_to, + }) => Some(in_reply_to.event_id.clone()), + _ => None, + }; + if earliest_message_ts.is_none_or(|e| ts < e) { + earliest_message_ts = Some(ts); + } + messages.push(TimelineItem::Message { + event_id: orig.event_id.clone(), + sender: orig.sender.clone(), + body: text.body.clone(), + is_self: &orig.sender == own_user, + ts, + in_reply_to, + }); + } + matrix_sdk::ruma::events::AnySyncMessageLikeEvent::Reaction( + matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig), + ) => { + let ts = ts_secs_from(orig.origin_server_ts.0); + reactions.push(TimelineItem::Reaction { + sender: orig.sender.clone(), + target_event_id: orig.content.relates_to.event_id.clone(), + key: orig.content.relates_to.key.clone(), + is_self: &orig.sender == own_user, + ts, + }); + } + _ => {} + } + } + + if let Some(min_ts) = earliest_message_ts { + reactions.retain(|r| r.ts() >= min_ts); + } + + let mut combined: Vec = Vec::with_capacity(messages.len() + reactions.len()); + combined.extend(messages); + combined.extend(reactions); + combined.sort_by_key(TimelineItem::ts); + Ok(combined) +} + +/// Fetch a single text message by event_id from the room's event cache. +pub async fn fetch_message( + cache: &matrix_sdk::event_cache::RoomEventCache, + event_id: &matrix_sdk::ruma::EventId, + own_user: &OwnedUserId, +) -> Option { + use matrix_sdk::ruma::events::AnySyncTimelineEvent; + + let ev = cache.find_event(event_id).await?; + let deserialized = ev.raw().deserialize().ok()?; + let AnySyncTimelineEvent::MessageLike(msg) = deserialized else { + return None; + }; + let matrix_sdk::ruma::events::AnySyncMessageLikeEvent::RoomMessage( + matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig), + ) = msg + else { + return None; + }; + let MessageType::Text(text) = &orig.content.msgtype else { + return None; + }; + let ts = ts_secs_from(orig.origin_server_ts.0); + Some(TimelineItem::Message { + event_id: orig.event_id.clone(), + sender: orig.sender.clone(), + body: text.body.clone(), + is_self: &orig.sender == own_user, + ts, + in_reply_to: None, + }) +} + +/// For each message in the timeline, compute the list of OTHER users who +/// have a read receipt at or after that message. Self is excluded. +pub async fn compute_read_markers( + room: &Room, + timeline: &[TimelineItem], + own_user: &OwnedUserId, +) -> HashMap> { + use matrix_sdk::ruma::events::receipt::{ReceiptThread, ReceiptType}; + + let mut users: Vec = timeline + .iter() + .filter(|t| !t.is_self()) + .map(|t| t.sender().clone()) + .collect(); + users.sort(); + users.dedup(); + + let positions: HashMap = timeline + .iter() + .enumerate() + .filter_map(|(i, t)| match t { + TimelineItem::Message { event_id, .. } => Some((event_id.clone(), i)), + _ => None, + }) + .collect(); + + let mut readers: HashMap> = HashMap::new(); + + for user in &users { + if user == own_user { + continue; + } + let (receipt_eid, receipt_ts) = match room + .load_user_receipt(ReceiptType::Read, ReceiptThread::Unthreaded, user) + .await + { + Ok(Some((eid, r))) => { + let ts = r.ts.map(|t| ts_secs_from(t.0)).unwrap_or(0); + (eid, ts) + } + _ => continue, + }; + + let user_msg_idx_inclusive: Option = if let Some(&p) = positions.get(&receipt_eid) { + Some(p) + } else { + let newest_msg_ts = timeline + .iter() + .rev() + .find_map(|t| match t { + TimelineItem::Message { ts, .. } => Some(*ts), + _ => None, + }) + .unwrap_or(0); + if receipt_ts > 0 && receipt_ts >= newest_msg_ts { + Some(timeline.len().saturating_sub(1)) + } else { + None + } + }; + + if let Some(up_to) = user_msg_idx_inclusive { + for item in timeline.iter().take(up_to + 1) { + if let TimelineItem::Message { event_id, .. } = item { + readers + .entry(event_id.clone()) + .or_default() + .push(user.clone()); + } + } + } + } + + readers +} diff --git a/src/types.rs b/src/types.rs new file mode 100644 index 0000000..599e1ea --- /dev/null +++ b/src/types.rs @@ -0,0 +1,112 @@ +use std::collections::HashMap; + +use matrix_sdk::{ + authentication::matrix::MatrixSession, + ruma::{OwnedEventId, OwnedRoomId, OwnedUserId}, +}; +use serde::{Deserialize, Serialize}; + +pub const DEFAULT_MODEL: &str = "claude-sonnet-4-6"; +pub const DEFAULT_MAX_HISTORY: usize = 20; +pub const DEFAULT_RATE_LIMIT_PER_MIN: u32 = 1; + +#[derive(Debug, Deserialize)] +pub struct Config { + pub homeserver: String, + pub username: String, + pub password: String, + pub rate_limit_per_min: Option, + pub model: Option, + pub max_history: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct PersistedSession { + pub homeserver: String, + pub db_path: std::path::PathBuf, + pub user_session: MatrixSession, + #[serde(skip_serializing_if = "Option::is_none")] + pub sync_token: Option, +} + +#[derive(Clone, Debug)] +pub enum TimelineItem { + Message { + event_id: OwnedEventId, + sender: OwnedUserId, + body: String, + is_self: bool, + /// Unix seconds. 0 if unknown. + ts: i64, + in_reply_to: Option, + }, + Reaction { + sender: OwnedUserId, + target_event_id: OwnedEventId, + key: String, + is_self: bool, + ts: i64, + }, +} + +impl TimelineItem { + pub fn ts(&self) -> i64 { + match self { + Self::Message { ts, .. } | Self::Reaction { ts, .. } => *ts, + } + } + + pub fn event_id(&self) -> Option<&OwnedEventId> { + match self { + Self::Message { event_id, .. } => Some(event_id), + Self::Reaction { .. } => None, + } + } + + pub fn sender(&self) -> &OwnedUserId { + match self { + Self::Message { sender, .. } | Self::Reaction { sender, .. } => sender, + } + } + + pub fn is_self(&self) -> bool { + match self { + Self::Message { is_self, .. } | Self::Reaction { is_self, .. } => *is_self, + } + } +} + +pub struct DaemonState { + pub own_user_id: OwnedUserId, + /// Per-room: the latest event_id that's been "shown" to Claude. Events + /// after this are "new" on the next invocation. Cleared on daemon restart. + pub last_shown: HashMap, + pub pending_rooms: Vec, + pub rate_budget: u32, + pub rate_limit_per_min: u32, + pub last_rate_reset: std::time::Instant, + pub model: String, + pub max_history: usize, +} + +pub enum ResponseTarget { + Room(OwnedRoomId), + Dm(OwnedUserId), +} + +/// One document within Claude's multi-doc output. Each doc has its own +/// frontmatter; the daemon routes based on which fields are present. +pub enum ClaudeDoc { + /// A chat message to send. + Message { + target: ResponseTarget, + body: String, + }, + /// A reaction to a message. `target_id_arg` is the event id (possibly + /// shortened) the agent saw in the prompt; daemon expands by prefix match. + Reaction { target_id_arg: String, key: String }, + /// Agent's internal monologue. Not sent to chat. Logged to tracing. + Thought(String), + /// Explicit "do nothing for this slot". Useful as a placeholder. + Skip, +}