use std::collections::HashMap; use matrix_sdk::{ Room, ruma::{OwnedEventId, OwnedUserId, events::room::message::MessageType}, }; use crate::types::{EditRecord, TimelineItem}; /// Format a unix-seconds timestamp as `YYYY-MM-DD HH:MM` UTC. Returns "?" for 0. pub fn format_ts(secs: i64) -> String { if secs == 0 { return "?".into(); } let days = secs.div_euclid(86400); let day_secs = secs.rem_euclid(86400); let (y, m, d) = days_to_ymd(days); let h = day_secs / 3600; let min = (day_secs % 3600) / 60; format!("{y:04}-{m:02}-{d:02} {h:02}:{min:02}") } pub fn chrono_now() -> String { use std::time::{SystemTime, UNIX_EPOCH}; let secs = SystemTime::now() .duration_since(UNIX_EPOCH) .map(|d| d.as_secs() as i64) .unwrap_or(0); let days = secs / 86400; let (y, m, d) = days_to_ymd(days); format!("{y:04}-{m:02}-{d:02}") } /// Convert days-since-1970-01-01 to (year, month, day). Civil-date algorithm. fn days_to_ymd(z: i64) -> (i64, u32, u32) { let z = z + 719_468; let era = z.div_euclid(146_097); let doe = z.rem_euclid(146_097) as u32; let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365; let y = yoe as i64 + era * 400; let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); let mp = (5 * doy + 2) / 153; let d = doy - (153 * mp + 2) / 5 + 1; let m = if mp < 10 { mp + 3 } else { mp - 9 }; (if m <= 2 { y + 1 } else { y }, m, d) } pub fn ts_secs_from(ts: matrix_sdk::ruma::UInt) -> i64 { let ms: u64 = ts.into(); i64::try_from(ms).unwrap_or(0) / 1000 } /// Resolve a (possibly shortened/ellipsized) event id to a full one by /// looking up against the timeline. Returns the matching message's full /// event id if found. pub fn resolve_event_id(timeline: &[TimelineItem], arg: &str) -> Option { let cleaned = arg.trim_end_matches('…').trim_end_matches('.').trim(); if cleaned.is_empty() { return None; } for item in timeline { if let TimelineItem::Message { event_id, .. } = item { if event_id.as_str() == cleaned || event_id.as_str().starts_with(cleaned) { return Some(event_id.clone()); } } } None } /// Load the last N timeline items (messages + reactions) from the room's /// persistent event cache. Returns oldest-first. /// /// We walk events newest-first, collect messages until we have `limit`, then /// also include any reactions whose timestamps fall within the message window. pub async fn load_timeline( room: &Room, limit: usize, own_user: &OwnedUserId, ) -> anyhow::Result> { use matrix_sdk::ruma::events::AnySyncTimelineEvent; let (cache, _handles) = room.event_cache().await?; let events = cache.events().await; use matrix_sdk::ruma::events::room::message::Relation; struct StashedEdit { sender: OwnedUserId, target: OwnedEventId, new_body: String, ts: i64, is_self: bool, } let mut messages: Vec = Vec::new(); let mut reactions: Vec = Vec::new(); let mut stashed_edits: Vec = Vec::new(); let mut earliest_message_ts: Option = None; for ev in events.iter().rev() { let raw = ev.raw(); let Ok(deserialized) = raw.deserialize() else { continue; }; let AnySyncTimelineEvent::MessageLike(msg) = deserialized else { continue; }; match msg { matrix_sdk::ruma::events::AnySyncMessageLikeEvent::RoomMessage( matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig), ) => { let ts = ts_secs_from(orig.origin_server_ts.0); // Edit event? Stash for second pass; don't count toward limit. if let Some(Relation::Replacement(replacement)) = &orig.content.relates_to { if let MessageType::Text(text) = &replacement.new_content.msgtype { stashed_edits.push(StashedEdit { sender: orig.sender.clone(), target: replacement.event_id.clone(), new_body: text.body.clone(), ts, is_self: &orig.sender == own_user, }); } continue; } if messages.len() >= limit { continue; } let MessageType::Text(text) = &orig.content.msgtype else { continue; }; let in_reply_to = match &orig.content.relates_to { Some(Relation::Reply { in_reply_to }) => Some(in_reply_to.event_id.clone()), _ => None, }; if earliest_message_ts.is_none_or(|e| ts < e) { earliest_message_ts = Some(ts); } messages.push(TimelineItem::Message { event_id: orig.event_id.clone(), sender: orig.sender.clone(), body: text.body.clone(), is_self: &orig.sender == own_user, ts, in_reply_to, edit_history: Vec::new(), }); } matrix_sdk::ruma::events::AnySyncMessageLikeEvent::Reaction( matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig), ) => { let ts = ts_secs_from(orig.origin_server_ts.0); reactions.push(TimelineItem::Reaction { sender: orig.sender.clone(), target_event_id: orig.content.relates_to.event_id.clone(), key: orig.content.relates_to.key.clone(), is_self: &orig.sender == own_user, ts, }); } _ => {} } } // Build per-target chain (original + edits, sorted oldest-first), used // for both edit_history population AND old_body lookup on Edit items. let mut chains: HashMap> = HashMap::new(); for msg in &messages { if let TimelineItem::Message { event_id, body, ts, .. } = msg { chains .entry(event_id.clone()) .or_default() .push((body.clone(), *ts)); } } for edit in &stashed_edits { chains .entry(edit.target.clone()) .or_default() .push((edit.new_body.clone(), edit.ts)); } for chain in chains.values_mut() { chain.sort_by_key(|(_, t)| *t); } // Update messages with edit_history + latest body. for item in &mut messages { if let TimelineItem::Message { event_id, body, edit_history, .. } = item { let Some(chain) = chains.get(event_id) else { continue; }; if chain.len() <= 1 { continue; } let (latest, _) = chain.last().unwrap(); *body = latest.clone(); *edit_history = chain[..chain.len() - 1] .iter() .map(|(b, t)| EditRecord { body: b.clone(), ts: *t, ts_human: format!("{} UTC", format_ts(*t)), }) .collect(); } } // Build Edit timeline items (only for edits whose target is in window). let in_window_targets: std::collections::HashSet = messages .iter() .filter_map(|m| match m { TimelineItem::Message { event_id, .. } => Some(event_id.clone()), _ => None, }) .collect(); let mut edit_items: Vec = Vec::new(); for edit in &stashed_edits { if !in_window_targets.contains(&edit.target) { continue; } let chain = chains.get(&edit.target).expect("chain exists for in-window target"); let pos = chain .iter() .position(|(b, t)| *t == edit.ts && *b == edit.new_body) .unwrap_or(0); let old_body = if pos > 0 { chain[pos - 1].0.clone() } else { String::new() }; edit_items.push(TimelineItem::Edit { sender: edit.sender.clone(), target_event_id: edit.target.clone(), old_body, new_body: edit.new_body.clone(), is_self: edit.is_self, ts: edit.ts, }); } if let Some(min_ts) = earliest_message_ts { reactions.retain(|r| r.ts() >= min_ts); edit_items.retain(|e| e.ts() >= min_ts); } let mut combined: Vec = Vec::with_capacity(messages.len() + reactions.len() + edit_items.len()); combined.extend(messages); combined.extend(reactions); combined.extend(edit_items); combined.sort_by_key(TimelineItem::ts); Ok(combined) } /// Fetch a single text message by event_id from the room's event cache. pub async fn fetch_message( cache: &matrix_sdk::event_cache::RoomEventCache, event_id: &matrix_sdk::ruma::EventId, own_user: &OwnedUserId, ) -> Option { use matrix_sdk::ruma::events::AnySyncTimelineEvent; let ev = cache.find_event(event_id).await?; let deserialized = ev.raw().deserialize().ok()?; let AnySyncTimelineEvent::MessageLike(msg) = deserialized else { return None; }; let matrix_sdk::ruma::events::AnySyncMessageLikeEvent::RoomMessage( matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig), ) = msg else { return None; }; let MessageType::Text(text) = &orig.content.msgtype else { return None; }; let ts = ts_secs_from(orig.origin_server_ts.0); Some(TimelineItem::Message { event_id: orig.event_id.clone(), sender: orig.sender.clone(), body: text.body.clone(), is_self: &orig.sender == own_user, ts, in_reply_to: None, edit_history: Vec::new(), }) } /// For each message in the timeline, compute the list of OTHER users who /// have a read receipt at or after that message. Self is excluded. pub async fn compute_read_markers( room: &Room, timeline: &[TimelineItem], own_user: &OwnedUserId, ) -> HashMap> { use matrix_sdk::ruma::events::receipt::{ReceiptThread, ReceiptType}; let mut users: Vec = timeline .iter() .filter(|t| !t.is_self()) .map(|t| t.sender().clone()) .collect(); users.sort(); users.dedup(); let positions: HashMap = timeline .iter() .enumerate() .filter_map(|(i, t)| match t { TimelineItem::Message { event_id, .. } => Some((event_id.clone(), i)), _ => None, }) .collect(); let mut readers: HashMap> = HashMap::new(); for user in &users { if user == own_user { continue; } let (receipt_eid, receipt_ts) = match room .load_user_receipt(ReceiptType::Read, ReceiptThread::Unthreaded, user) .await { Ok(Some((eid, r))) => { let ts = r.ts.map(|t| ts_secs_from(t.0)).unwrap_or(0); (eid, ts) } _ => continue, }; let user_msg_idx_inclusive: Option = if let Some(&p) = positions.get(&receipt_eid) { Some(p) } else { let newest_msg_ts = timeline .iter() .rev() .find_map(|t| match t { TimelineItem::Message { ts, .. } => Some(*ts), _ => None, }) .unwrap_or(0); if receipt_ts > 0 && receipt_ts >= newest_msg_ts { Some(timeline.len().saturating_sub(1)) } else { None } }; if let Some(up_to) = user_msg_idx_inclusive { for item in timeline.iter().take(up_to + 1) { if let TimelineItem::Message { event_id, .. } = item { readers .entry(event_id.clone()) .or_default() .push(user.clone()); } } } } readers }