diff --git a/src/main.rs b/src/main.rs index 878f5b8..d36ef89 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,7 +10,7 @@ use matrix_sdk::{ authentication::matrix::MatrixSession, config::SyncSettings, ruma::{ - OwnedRoomId, OwnedUserId, + OwnedEventId, OwnedRoomId, OwnedUserId, api::client::filter::FilterDefinition, events::room::message::{ MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent, @@ -48,10 +48,9 @@ struct ChatMessage { struct DaemonState { own_user_id: OwnedUserId, - room_history: std::collections::HashMap>, - /// For each room, the index in room_history up to which messages have been - /// shown to Claude. Messages at indexes >= this value are "new". - seen_count: std::collections::HashMap, + /// Per-room: the latest event_id that's been "shown" to Claude. Events + /// after this are "new" on the next invocation. Cleared on daemon restart. + last_shown: std::collections::HashMap, pending_rooms: Vec, rate_budget: u32, rate_limit_per_min: u32, @@ -92,10 +91,15 @@ async fn main() -> anyhow::Result<()> { let own_user_id = client.user_id().context("not logged in")?.to_owned(); tracing::info!(user = %own_user_id, rate_limit = rate_limit_per_min, "ready"); + // Enable persistent event cache (matrix-sdk's sqlite store keeps the timeline) + client + .event_cache() + .subscribe() + .context("subscribe event cache")?; + let state = Arc::new(Mutex::new(DaemonState { own_user_id, - room_history: std::collections::HashMap::new(), - seen_count: std::collections::HashMap::new(), + last_shown: std::collections::HashMap::new(), pending_rooms: Vec::new(), rate_budget: rate_limit_per_min, rate_limit_per_min, @@ -230,34 +234,17 @@ async fn on_room_message( }; let room_id = room.room_id().to_owned(); - let room_name = room - .display_name() - .await - .map_or_else(|_| room_id.to_string(), |n| n.to_string()); - let mut state = state.lock().await; let is_self = event.sender == state.own_user_id; - let msg = ChatMessage { - sender: event.sender.clone(), - body: text_content.body.clone(), - is_self, - }; - tracing::info!( - room = %room_name, + room = %room_id, sender = %event.sender, self_msg = is_self, "{}", text_content.body ); - let history = state.room_history.entry(room_id.clone()).or_default(); - history.push(msg); - if history.len() > MAX_HISTORY { - history.drain(..history.len() - MAX_HISTORY); - } - if !is_self && !state.pending_rooms.contains(&room_id) { state.pending_rooms.push(room_id); } @@ -286,31 +273,63 @@ async fn process_loop(state: Arc>, client: Client) { continue; }; - let (history, seen_idx) = { + // Snapshot last_shown for this room so we can mark seen vs new + let prev_last_shown = { let state = state.lock().await; - let history = state - .room_history - .get(&room_id) - .cloned() - .unwrap_or_default(); - let seen = state.seen_count.get(&room_id).copied().unwrap_or(0); - (history, seen) + state.last_shown.get(&room_id).cloned() }; - let room_name = client - .get_room(&room_id) - .map_or_else(|| room_id.to_string(), |r| r.room_id().to_string()); + let Some(room) = client.get_room(&room_id) else { + tracing::warn!(room = %room_id, "room not found in client"); + continue; + }; + let room_name = room + .display_name() + .await + .map_or_else(|_| room_id.to_string(), |n| n.to_string()); - match invoke_claude(&room_id, &room_name, &history, seen_idx).await { + // Load recent history from matrix-sdk's persistent event cache + let history = match load_recent_messages(&room, MAX_HISTORY).await { + Ok(h) => h, + Err(e) => { + tracing::error!(room = %room_id, "failed to load history: {e}"); + continue; + } + }; + + let own_user = { + let state = state.lock().await; + state.own_user_id.clone() + }; + + let chat_msgs: Vec = history + .iter() + .map(|(_, sender, body)| ChatMessage { + sender: sender.clone(), + body: body.clone(), + is_self: sender == &own_user, + }) + .collect(); + + // Determine seen split: everything before (and including) prev_last_shown is "seen" + let seen_idx = prev_last_shown + .as_ref() + .and_then(|id| history.iter().position(|(eid, _, _)| eid == id)) + .map_or(0, |pos| pos + 1); + + let new_last_event_id = history.last().map(|(eid, _, _)| eid.clone()); + + match invoke_claude(&room_id, &room_name, &chat_msgs, seen_idx).await { Ok(Some(response)) => { - if let Some(room) = client.get_room(&response.room) { + if let Some(target_room) = client.get_room(&response.room) { let content = RoomMessageEventContent::text_plain(&response.body); - match room.send(content).await { + match target_room.send(content).await { Ok(_) => { let mut state = state.lock().await; state.rate_budget = state.rate_budget.saturating_sub(1); - // Mark current history as seen - state.seen_count.insert(room_id.clone(), history.len()); + if let Some(eid) = new_last_event_id { + state.last_shown.insert(room_id.clone(), eid); + } tracing::info!( room = %response.room, "sent response ({} budget remaining)", @@ -325,9 +344,10 @@ async fn process_loop(state: Arc>, client: Client) { } Ok(None) => { tracing::debug!(room = %room_id, "claude chose to skip"); - // Even on skip, mark messages as seen so we don't reprocess let mut state = state.lock().await; - state.seen_count.insert(room_id.clone(), history.len()); + if let Some(eid) = new_last_event_id { + state.last_shown.insert(room_id.clone(), eid); + } } Err(e) => { tracing::error!(room = %room_id, "claude invocation failed: {e}"); @@ -336,6 +356,46 @@ async fn process_loop(state: Arc>, client: Client) { } } +/// Load the last N text messages from the room's persistent event cache. +/// Returns oldest-first list of (event_id, sender, body). +async fn load_recent_messages( + room: &Room, + limit: usize, +) -> anyhow::Result> { + use matrix_sdk::ruma::events::AnySyncTimelineEvent; + + let (cache, _handles) = room.event_cache().await?; + let events = cache.events().await; + + let mut out: Vec<(OwnedEventId, OwnedUserId, String)> = Vec::new(); + for ev in events.iter().rev() { + if out.len() >= limit { + break; + } + let raw = ev.raw(); + let Ok(deserialized) = raw.deserialize() else { + continue; + }; + if let AnySyncTimelineEvent::MessageLike(msg) = deserialized { + if let matrix_sdk::ruma::events::AnySyncMessageLikeEvent::RoomMessage( + matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig), + ) = msg + { + if let MessageType::Text(text) = &orig.content.msgtype { + out.push(( + orig.event_id.clone(), + orig.sender.clone(), + text.body.clone(), + )); + } + } + } + } + + out.reverse(); + Ok(out) +} + struct ClaudeResponse { room: OwnedRoomId, body: String,