From 31ad42f637bed44e77c7f3dd28298ec4cff5a783 Mon Sep 17 00:00:00 2001 From: Damocles Date: Thu, 30 Apr 2026 21:09:03 +0200 Subject: [PATCH] matrix read receipts; seed last_shown from stored receipts on restart --- src/main.rs | 76 +++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 62 insertions(+), 14 deletions(-) diff --git a/src/main.rs b/src/main.rs index 15f7646..23fd631 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,10 +11,15 @@ use matrix_sdk::{ config::SyncSettings, ruma::{ OwnedEventId, OwnedRoomId, OwnedUserId, UserId, - api::client::filter::FilterDefinition, - events::room::{ - member::StrippedRoomMemberEvent, - message::{MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent}, + api::client::{ + filter::FilterDefinition, receipt::create_receipt::v3::ReceiptType as CreateReceiptType, + }, + events::{ + receipt::ReceiptThread, + room::{ + member::StrippedRoomMemberEvent, + message::{MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent}, + }, }, }, }; @@ -434,16 +439,42 @@ async fn process_loop(state: Arc>, client: Client) { continue; }; - // Snapshot last_shown for this room so we can mark seen vs new - let prev_last_shown = { - let state = state.lock().await; - state.last_shown.get(&room_id).cloned() - }; - let Some(room) = client.get_room(&room_id) else { tracing::warn!(room = %room_id, "room not found in client"); continue; }; + + // Snapshot last_shown for this room so we can mark seen vs new. + // If we don't have one in memory (e.g. fresh daemon start), seed from + // the stored read receipt so we don't reprocess old messages. + let in_memory = { + let s = state.lock().await; + s.last_shown.get(&room_id).cloned() + }; + let prev_last_shown = if let Some(eid) = in_memory { + Some(eid) + } else { + let from_receipt = match room + .load_user_receipt( + matrix_sdk::ruma::events::receipt::ReceiptType::Read, + ReceiptThread::Unthreaded, + client.user_id().expect("logged in"), + ) + .await + { + Ok(Some((eid, _))) => Some(eid), + Ok(None) => None, + Err(e) => { + tracing::warn!(room = %room_id, "failed to load receipt: {e}"); + None + } + }; + if let Some(ref eid) = from_receipt { + let mut s = state.lock().await; + s.last_shown.insert(room_id.clone(), eid.clone()); + } + from_receipt + }; let room_name = room .display_name() .await @@ -489,7 +520,7 @@ async fn process_loop(state: Arc>, client: Client) { Ok(_) => { let mut state = state.lock().await; state.rate_budget = state.rate_budget.saturating_sub(1); - if let Some(eid) = new_last_event_id { + if let Some(eid) = new_last_event_id.clone() { state.last_shown.insert(room_id.clone(), eid); } tracing::info!( @@ -497,6 +528,8 @@ async fn process_loop(state: Arc>, client: Client) { "sent response ({} budget remaining)", state.rate_budget ); + drop(state); + send_read_receipt(&room, new_last_event_id.clone()).await; } Err(e) => tracing::error!("failed to send: {e}"), } @@ -506,10 +539,13 @@ async fn process_loop(state: Arc>, client: Client) { } Ok(None) => { tracing::debug!(room = %room_id, "claude chose to skip"); - let mut state = state.lock().await; - if let Some(eid) = new_last_event_id { - state.last_shown.insert(room_id.clone(), eid); + { + let mut state = state.lock().await; + if let Some(eid) = new_last_event_id.clone() { + state.last_shown.insert(room_id.clone(), eid); + } } + send_read_receipt(&room, new_last_event_id.clone()).await; } Err(e) => { tracing::error!(room = %room_id, "claude invocation failed: {e}"); @@ -518,6 +554,18 @@ async fn process_loop(state: Arc>, client: Client) { } } +async fn send_read_receipt(room: &Room, event_id: Option) { + let Some(eid) = event_id else { + return; + }; + if let Err(e) = room + .send_single_receipt(CreateReceiptType::Read, ReceiptThread::Unthreaded, eid) + .await + { + tracing::warn!(room = %room.room_id(), "failed to send read receipt: {e}"); + } +} + /// Load the last N text messages from the room's persistent event cache. /// Returns oldest-first list of (event_id, sender, body, ts_secs). async fn load_recent_messages(