From bf29e9f7d9149b72fe93f09ffe5cc044c6c8fe6c Mon Sep 17 00:00:00 2001 From: Damocles Date: Thu, 30 Apr 2026 22:27:51 +0200 Subject: [PATCH] surface reactions: messages get short event_id prefix, reactions appear as standalone timeline events --- src/main.rs | 331 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 239 insertions(+), 92 deletions(-) diff --git a/src/main.rs b/src/main.rs index f5a7469..94f3d3c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -51,12 +51,52 @@ struct PersistedSession { } #[derive(Clone, Debug)] -struct ChatMessage { - sender: OwnedUserId, - body: String, - is_self: bool, - /// Unix seconds. 0 if unknown. - ts: i64, +enum TimelineItem { + Message { + event_id: OwnedEventId, + sender: OwnedUserId, + body: String, + is_self: bool, + /// Unix seconds. 0 if unknown. + ts: i64, + in_reply_to: Option, + }, + Reaction { + sender: OwnedUserId, + target_event_id: OwnedEventId, + key: String, + is_self: bool, + ts: i64, + }, +} + +impl TimelineItem { + fn ts(&self) -> i64 { + match self { + TimelineItem::Message { ts, .. } | TimelineItem::Reaction { ts, .. } => *ts, + } + } + + fn event_id(&self) -> Option<&OwnedEventId> { + match self { + TimelineItem::Message { event_id, .. } => Some(event_id), + TimelineItem::Reaction { .. } => None, + } + } + + fn sender(&self) -> &OwnedUserId { + match self { + TimelineItem::Message { sender, .. } | TimelineItem::Reaction { sender, .. } => sender, + } + } + + fn is_self(&self) -> bool { + match self { + TimelineItem::Message { is_self, .. } | TimelineItem::Reaction { is_self, .. } => { + *is_self + } + } + } } struct DaemonState { @@ -493,28 +533,39 @@ async fn process_loop(state: Arc>, client: Client) { ) }; - // Load recent history from matrix-sdk's persistent event cache - let mut history = match load_recent_messages(&room, max_history).await { - Ok(h) => h, + // Load recent timeline (messages + reactions) from matrix-sdk's + // persistent event cache. + let mut timeline = match load_timeline(&room, max_history, &own_user).await { + Ok(t) => t, Err(e) => { - tracing::error!(room = %room_id, "failed to load history: {e}"); + tracing::error!(room = %room_id, "failed to load timeline: {e}"); continue; } }; // For any new messages that reply to events outside the window, fetch // the replied-to event from cache and prepend it as extra context. + let in_window: std::collections::HashSet = timeline + .iter() + .filter_map(TimelineItem::event_id) + .cloned() + .collect(); let seen_idx_initial = prev_last_shown .as_ref() - .and_then(|id| history.iter().position(|(eid, _, _, _, _)| eid == id)) + .and_then(|id| { + timeline.iter().position(|t| match t { + TimelineItem::Message { event_id, .. } => event_id == id, + _ => false, + }) + }) .map_or(0, |pos| pos + 1); - let in_window: std::collections::HashSet = history - .iter() - .map(|(eid, _, _, _, _)| eid.clone()) - .collect(); let mut reply_targets: Vec = Vec::new(); - for (_, _, _, _, in_reply_to) in history.iter().skip(seen_idx_initial) { - if let Some(target) = in_reply_to { + for item in timeline.iter().skip(seen_idx_initial) { + if let TimelineItem::Message { + in_reply_to: Some(target), + .. + } = item + { if !in_window.contains(target) && !reply_targets.contains(target) { reply_targets.push(target.clone()); } @@ -523,32 +574,32 @@ async fn process_loop(state: Arc>, client: Client) { if !reply_targets.is_empty() { if let Ok((cache, _h)) = room.event_cache().await { for target in &reply_targets { - if let Some(found) = fetch_message(&cache, target).await { - history.insert(0, found); + if let Some(found) = fetch_message(&cache, target, &own_user).await { + timeline.insert(0, found); } } } } - let chat_msgs: Vec = history - .iter() - .map(|(_, sender, body, ts, _)| ChatMessage { - sender: sender.clone(), - body: body.clone(), - is_self: sender == &own_user, - ts: *ts, - }) - .collect(); - - // Determine seen split: everything before (and including) prev_last_shown is "seen" + // Determine seen split based on the last_shown message event id let seen_idx = prev_last_shown .as_ref() - .and_then(|id| history.iter().position(|(eid, _, _, _, _)| eid == id)) + .and_then(|id| { + timeline.iter().position(|t| match t { + TimelineItem::Message { event_id, .. } => event_id == id, + _ => false, + }) + }) .map_or(0, |pos| pos + 1); - let new_last_event_id = history.last().map(|(eid, _, _, _, _)| eid.clone()); + // The "last shown" pointer should advance to the latest message we've + // loaded (not a reaction). + let new_last_event_id = timeline.iter().rev().find_map(|t| match t { + TimelineItem::Message { event_id, .. } => Some(event_id.clone()), + _ => None, + }); - let docs = match invoke_claude(&room_id, &room_name, &chat_msgs, seen_idx, &model).await { + let docs = match invoke_claude(&room_id, &room_name, &timeline, seen_idx, &model).await { Ok(d) => d, Err(e) => { tracing::error!(room = %room_id, "claude invocation failed: {e}"); @@ -645,62 +696,151 @@ async fn send_read_receipt(room: &Room, event_id: Option) { } } -/// Load the last N text messages from the room's persistent event cache. -/// Returns oldest-first list of (event_id, sender, body, ts_secs). -/// Returns oldest-first list of (event_id, sender, body, ts_secs, in_reply_to). -async fn load_recent_messages( +/// Load the last N timeline items (messages + reactions) from the room's +/// persistent event cache. Returns oldest-first. +/// +/// We walk events newest-first, collect messages until we have `limit`, then +/// also include any reactions whose timestamps fall within the message window. +async fn load_timeline( room: &Room, limit: usize, -) -> anyhow::Result)>> { + own_user: &OwnedUserId, +) -> anyhow::Result> { use matrix_sdk::ruma::events::AnySyncTimelineEvent; let (cache, _handles) = room.event_cache().await?; let events = cache.events().await; - let mut out: Vec<(OwnedEventId, OwnedUserId, String, i64, Option)> = Vec::new(); + let mut messages: Vec = Vec::new(); + let mut reactions: Vec = Vec::new(); + let mut earliest_message_ts: Option = None; + for ev in events.iter().rev() { - if out.len() >= limit { - break; - } let raw = ev.raw(); let Ok(deserialized) = raw.deserialize() else { continue; }; - if let AnySyncTimelineEvent::MessageLike(msg) = deserialized { - if let matrix_sdk::ruma::events::AnySyncMessageLikeEvent::RoomMessage( + let AnySyncTimelineEvent::MessageLike(msg) = deserialized else { + continue; + }; + + match msg { + matrix_sdk::ruma::events::AnySyncMessageLikeEvent::RoomMessage( matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig), - ) = msg - { - if let MessageType::Text(text) = &orig.content.msgtype { - let ts_ms: u64 = orig.origin_server_ts.0.into(); - let ts_secs: i64 = i64::try_from(ts_ms).unwrap_or(0) / 1000; - let in_reply_to = match &orig.content.relates_to { - Some(matrix_sdk::ruma::events::room::message::Relation::Reply { - in_reply_to, - }) => Some(in_reply_to.event_id.clone()), - _ => None, - }; - out.push(( - orig.event_id.clone(), - orig.sender.clone(), - text.body.clone(), - ts_secs, - in_reply_to, - )); + ) => { + if messages.len() >= limit { + continue; } + let MessageType::Text(text) = &orig.content.msgtype else { + continue; + }; + let ts = ts_secs_from(orig.origin_server_ts.0); + let in_reply_to = match &orig.content.relates_to { + Some(matrix_sdk::ruma::events::room::message::Relation::Reply { + in_reply_to, + }) => Some(in_reply_to.event_id.clone()), + _ => None, + }; + if earliest_message_ts.is_none_or(|e| ts < e) { + earliest_message_ts = Some(ts); + } + messages.push(TimelineItem::Message { + event_id: orig.event_id.clone(), + sender: orig.sender.clone(), + body: text.body.clone(), + is_self: &orig.sender == own_user, + ts, + in_reply_to, + }); } + matrix_sdk::ruma::events::AnySyncMessageLikeEvent::Reaction( + matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig), + ) => { + let ts = ts_secs_from(orig.origin_server_ts.0); + reactions.push(TimelineItem::Reaction { + sender: orig.sender.clone(), + target_event_id: orig.content.relates_to.event_id.clone(), + key: orig.content.relates_to.key.clone(), + is_self: &orig.sender == own_user, + ts, + }); + } + _ => {} } } - out.reverse(); - Ok(out) + // Drop reactions older than the oldest visible message, to avoid + // referencing messages we don't have in scope. + if let Some(min_ts) = earliest_message_ts { + reactions.retain(|r| r.ts() >= min_ts); + } + + let mut combined: Vec = Vec::with_capacity(messages.len() + reactions.len()); + combined.extend(messages); + combined.extend(reactions); + combined.sort_by_key(TimelineItem::ts); + Ok(combined) +} + +/// Render one timeline item into the prompt. +/// Messages: `[ts] $eid... [(you) ]@user: body` +/// Reactions: `[ts] [(you) ]@user reacted to $eid... with KEY` +fn render_timeline_item(prompt: &mut String, item: &TimelineItem) { + match item { + TimelineItem::Message { + event_id, + sender, + body, + is_self, + ts, + .. + } => { + let ts_str = format_ts(*ts); + let id = short_event_id(event_id); + let prefix = if *is_self { "(you) " } else { "" }; + writeln!(prompt, "[{ts_str}] {id} {prefix}{sender}: {body}").unwrap(); + } + TimelineItem::Reaction { + sender, + target_event_id, + key, + is_self, + ts, + } => { + let ts_str = format_ts(*ts); + let id = short_event_id(target_event_id); + let prefix = if *is_self { "(you) " } else { "" }; + writeln!( + prompt, + "[{ts_str}] {prefix}{sender} reacted to {id} with {key}" + ) + .unwrap(); + } + } +} + +/// Shorten an event id for prompt display: `$abc123def456...` → `$abc123de`. +fn short_event_id(id: &OwnedEventId) -> String { + let s = id.as_str(); + let prefix: String = s.chars().take(9).collect(); + if s.len() > 9 { + format!("{prefix}…") + } else { + prefix + } +} + +fn ts_secs_from(ts: matrix_sdk::ruma::UInt) -> i64 { + let ms: u64 = ts.into(); + i64::try_from(ms).unwrap_or(0) / 1000 } /// Fetch a single text message by event_id from the room's event cache. async fn fetch_message( cache: &matrix_sdk::event_cache::RoomEventCache, event_id: &matrix_sdk::ruma::EventId, -) -> Option<(OwnedEventId, OwnedUserId, String, i64, Option)> { + own_user: &OwnedUserId, +) -> Option { use matrix_sdk::ruma::events::AnySyncTimelineEvent; let ev = cache.find_event(event_id).await?; @@ -717,15 +857,15 @@ async fn fetch_message( let MessageType::Text(text) = &orig.content.msgtype else { return None; }; - let ts_ms: u64 = orig.origin_server_ts.0.into(); - let ts_secs: i64 = i64::try_from(ts_ms).unwrap_or(0) / 1000; - Some(( - orig.event_id.clone(), - orig.sender.clone(), - text.body.clone(), - ts_secs, - None, - )) + let ts = ts_secs_from(orig.origin_server_ts.0); + Some(TimelineItem::Message { + event_id: orig.event_id.clone(), + sender: orig.sender.clone(), + body: text.body.clone(), + is_self: &orig.sender == own_user, + ts, + in_reply_to: None, + }) } enum ResponseTarget { @@ -750,7 +890,7 @@ enum ClaudeDoc { async fn invoke_claude( source_room: &OwnedRoomId, room_name: &str, - history: &[ChatMessage], + timeline: &[TimelineItem], seen_idx: usize, model: &str, ) -> anyhow::Result> { @@ -766,11 +906,11 @@ async fn invoke_claude( ) .unwrap(); - // Collect unique non-self senders for per-person note discovery - let mut senders: Vec<&OwnedUserId> = history + // Collect unique non-self participants (message senders + reactors) + let mut senders: Vec<&OwnedUserId> = timeline .iter() - .filter(|m| !m.is_self) - .map(|m| &m.sender) + .filter(|t| !t.is_self()) + .map(TimelineItem::sender) .collect(); senders.sort(); senders.dedup(); @@ -785,30 +925,37 @@ async fn invoke_claude( } } - let seen = seen_idx.min(history.len()); - let (old, new) = history.split_at(seen); + let seen = seen_idx.min(timeline.len()); + let (old, new) = timeline.split_at(seen); if !old.is_empty() { - writeln!(prompt, "\n[previously seen messages — for context]").unwrap(); - for msg in old { - let prefix = if msg.is_self { "(you) " } else { "" }; - let ts = format_ts(msg.ts); - writeln!(prompt, "[{ts}] {prefix}{}: {}", msg.sender, msg.body).unwrap(); + writeln!(prompt, "\n[previously seen events — for context]").unwrap(); + for item in old { + render_timeline_item(&mut prompt, item); } } - writeln!(prompt, "\n[new messages — respond to these]").unwrap(); + writeln!(prompt, "\n[new events — respond to these]").unwrap(); if new.is_empty() { writeln!(prompt, "(none)").unwrap(); } else { - for msg in new { - let prefix = if msg.is_self { "(you) " } else { "" }; - let ts = format_ts(msg.ts); - writeln!(prompt, "[{ts}] {prefix}{}: {}", msg.sender, msg.body).unwrap(); + for item in new { + render_timeline_item(&mut prompt, item); } } - tracing::info!("invoking claude: {} new, {} seen", new.len(), old.len()); + let new_msg_count = new + .iter() + .filter(|t| matches!(t, TimelineItem::Message { .. })) + .count(); + let new_react_count = new.len() - new_msg_count; + tracing::info!( + "invoking claude: {} new ({} msg + {} react), {} seen", + new.len(), + new_msg_count, + new_react_count, + old.len() + ); tracing::trace!("full prompt:\n{prompt}"); use tokio::process::Command;