matrix read receipts; seed last_shown from stored receipts on restart

This commit is contained in:
Damocles 2026-04-30 21:09:03 +02:00
parent 5ab592071e
commit 31ad42f637

View file

@ -11,10 +11,15 @@ use matrix_sdk::{
config::SyncSettings,
ruma::{
OwnedEventId, OwnedRoomId, OwnedUserId, UserId,
api::client::filter::FilterDefinition,
events::room::{
member::StrippedRoomMemberEvent,
message::{MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent},
api::client::{
filter::FilterDefinition, receipt::create_receipt::v3::ReceiptType as CreateReceiptType,
},
events::{
receipt::ReceiptThread,
room::{
member::StrippedRoomMemberEvent,
message::{MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent},
},
},
},
};
@ -434,16 +439,42 @@ async fn process_loop(state: Arc<Mutex<DaemonState>>, client: Client) {
continue;
};
// Snapshot last_shown for this room so we can mark seen vs new
let prev_last_shown = {
let state = state.lock().await;
state.last_shown.get(&room_id).cloned()
};
let Some(room) = client.get_room(&room_id) else {
tracing::warn!(room = %room_id, "room not found in client");
continue;
};
// Snapshot last_shown for this room so we can mark seen vs new.
// If we don't have one in memory (e.g. fresh daemon start), seed from
// the stored read receipt so we don't reprocess old messages.
let in_memory = {
let s = state.lock().await;
s.last_shown.get(&room_id).cloned()
};
let prev_last_shown = if let Some(eid) = in_memory {
Some(eid)
} else {
let from_receipt = match room
.load_user_receipt(
matrix_sdk::ruma::events::receipt::ReceiptType::Read,
ReceiptThread::Unthreaded,
client.user_id().expect("logged in"),
)
.await
{
Ok(Some((eid, _))) => Some(eid),
Ok(None) => None,
Err(e) => {
tracing::warn!(room = %room_id, "failed to load receipt: {e}");
None
}
};
if let Some(ref eid) = from_receipt {
let mut s = state.lock().await;
s.last_shown.insert(room_id.clone(), eid.clone());
}
from_receipt
};
let room_name = room
.display_name()
.await
@ -489,7 +520,7 @@ async fn process_loop(state: Arc<Mutex<DaemonState>>, client: Client) {
Ok(_) => {
let mut state = state.lock().await;
state.rate_budget = state.rate_budget.saturating_sub(1);
if let Some(eid) = new_last_event_id {
if let Some(eid) = new_last_event_id.clone() {
state.last_shown.insert(room_id.clone(), eid);
}
tracing::info!(
@ -497,6 +528,8 @@ async fn process_loop(state: Arc<Mutex<DaemonState>>, client: Client) {
"sent response ({} budget remaining)",
state.rate_budget
);
drop(state);
send_read_receipt(&room, new_last_event_id.clone()).await;
}
Err(e) => tracing::error!("failed to send: {e}"),
}
@ -506,10 +539,13 @@ async fn process_loop(state: Arc<Mutex<DaemonState>>, client: Client) {
}
Ok(None) => {
tracing::debug!(room = %room_id, "claude chose to skip");
let mut state = state.lock().await;
if let Some(eid) = new_last_event_id {
state.last_shown.insert(room_id.clone(), eid);
{
let mut state = state.lock().await;
if let Some(eid) = new_last_event_id.clone() {
state.last_shown.insert(room_id.clone(), eid);
}
}
send_read_receipt(&room, new_last_event_id.clone()).await;
}
Err(e) => {
tracing::error!(room = %room_id, "claude invocation failed: {e}");
@ -518,6 +554,18 @@ async fn process_loop(state: Arc<Mutex<DaemonState>>, client: Client) {
}
}
async fn send_read_receipt(room: &Room, event_id: Option<OwnedEventId>) {
let Some(eid) = event_id else {
return;
};
if let Err(e) = room
.send_single_receipt(CreateReceiptType::Read, ReceiptThread::Unthreaded, eid)
.await
{
tracing::warn!(room = %room.room_id(), "failed to send read receipt: {e}");
}
}
/// Load the last N text messages from the room's persistent event cache.
/// Returns oldest-first list of (event_id, sender, body, ts_secs).
async fn load_recent_messages(