split main.rs into types, timeline, claude, handlers, session modules

This commit is contained in:
Damocles 2026-05-01 02:15:03 +02:00
parent 8d2f43b6c5
commit 09259ee5fa
6 changed files with 1337 additions and 1322 deletions

419
src/claude.rs Normal file
View file

@ -0,0 +1,419 @@
use std::collections::HashMap;
use std::fmt::Write as _;
use anyhow::{Context, bail};
use matrix_sdk::ruma::{OwnedEventId, OwnedRoomId, OwnedUserId};
use crate::paths;
use crate::timeline::render_timeline_item;
use crate::types::{ClaudeDoc, ResponseTarget, TimelineItem};
pub async fn invoke_claude(
source_room: &OwnedRoomId,
room_name: &str,
timeline: &[TimelineItem],
seen_idx: usize,
model: &str,
read_markers: &HashMap<OwnedEventId, Vec<OwnedUserId>>,
) -> anyhow::Result<Vec<ClaudeDoc>> {
let identity_dir = paths::identity_dir();
let identity_str = identity_dir.to_string_lossy();
let mut prompt = String::new();
writeln!(prompt, "[room_id: {source_room}]").unwrap();
writeln!(prompt, "[room_name: {room_name}]").unwrap();
writeln!(
prompt,
"[room notes path: ../rooms/{source_room}/notes.md (create dir if needed)]"
)
.unwrap();
// Collect unique non-self participants (message senders + reactors)
let mut senders: Vec<&OwnedUserId> = timeline
.iter()
.filter(|t| !t.is_self())
.map(TimelineItem::sender)
.collect();
senders.sort();
senders.dedup();
if !senders.is_empty() {
writeln!(
prompt,
"\n[people in this room - check ../people/<user_id>/notes.md for each]"
)
.unwrap();
for s in &senders {
writeln!(prompt, " {s}").unwrap();
}
}
let seen = seen_idx.min(timeline.len());
let (old, new) = timeline.split_at(seen);
if !old.is_empty() {
writeln!(prompt, "\n[previously seen events - for context]").unwrap();
for item in old {
render_timeline_item(&mut prompt, item, read_markers);
}
}
writeln!(prompt, "\n[new events - respond to these]").unwrap();
if new.is_empty() {
writeln!(prompt, "(none)").unwrap();
} else {
for item in new {
render_timeline_item(&mut prompt, item, read_markers);
}
}
let new_msg_count = new
.iter()
.filter(|t| matches!(t, TimelineItem::Message { .. }))
.count();
let new_react_count = new.len() - new_msg_count;
tracing::info!(
"invoking claude: {} new ({} msg + {} react), {} seen",
new.len(),
new_msg_count,
new_react_count,
old.len()
);
tracing::trace!("full prompt:\n{prompt}");
use tokio::process::Command;
let mut cmd = Command::new("claude");
cmd.args([
"--print",
"--model",
model,
"--add-dir",
&identity_str,
"--allowedTools",
"Read Edit Write Glob Grep",
"-p",
&prompt,
]);
cmd.current_dir(&identity_dir);
cmd.stdin(std::process::Stdio::null());
let output = cmd.output().await.context("failed to run claude")?;
let stderr = String::from_utf8_lossy(&output.stderr);
let stdout = String::from_utf8_lossy(&output.stdout);
if !output.status.success() {
bail!(
"claude exited with {}:\nstdout: {}\nstderr: {}",
output.status,
stdout,
stderr
);
}
if !stderr.is_empty() {
tracing::warn!("claude stderr: {stderr}");
}
let raw = String::from_utf8_lossy(&output.stdout).to_string();
Ok(parse_response(&raw, source_room))
}
/// Parse Claude's stdout into a list of documents.
///
/// Format: each doc starts with a line `=== <type> [arg]`. Body is everything
/// until the next `===` line or EOF. Types:
/// - `=== thought` -> `ClaudeDoc::Thought` (logged, not sent)
/// - `=== room [<room_id>]` -> `ClaudeDoc::Message` to that room (or source room if no arg)
/// - `=== dm <user_id>` -> `ClaudeDoc::Message` as DM
/// - `=== skip` -> `ClaudeDoc::Skip` (no-op)
///
/// Anything before the first `===` line is treated as a preamble thought.
/// Bare text with no `===` is treated as a single message to default_room.
pub fn parse_response(raw: &str, default_room: &OwnedRoomId) -> Vec<ClaudeDoc> {
let trimmed = raw.trim();
if trimmed.is_empty() {
return Vec::new();
}
let mut docs = Vec::new();
let mut current_header: Option<String> = None;
let mut current_body = String::new();
let mut preamble = String::new();
for line in trimmed.lines() {
if let Some(header) = line.strip_prefix("===") {
if let Some(h) = current_header.take() {
if let Some(doc) = build_doc(&h, current_body.trim(), default_room) {
docs.push(doc);
}
current_body.clear();
} else {
let p = preamble.trim();
if !p.is_empty() {
docs.push(ClaudeDoc::Thought(p.to_owned()));
}
preamble.clear();
}
current_header = Some(header.trim().to_owned());
} else if current_header.is_some() {
current_body.push_str(line);
current_body.push('\n');
} else {
preamble.push_str(line);
preamble.push('\n');
}
}
if let Some(h) = current_header {
if let Some(doc) = build_doc(&h, current_body.trim(), default_room) {
docs.push(doc);
}
} else {
let p = preamble.trim();
if !p.is_empty() {
docs.push(ClaudeDoc::Message {
target: ResponseTarget::Room(default_room.clone()),
body: p.to_owned(),
});
}
}
docs
}
fn build_doc(header: &str, body: &str, default_room: &OwnedRoomId) -> Option<ClaudeDoc> {
let mut parts = header.splitn(2, char::is_whitespace);
let kind = parts.next().unwrap_or("").trim();
let arg = parts.next().unwrap_or("").trim();
match kind {
"skip" => Some(ClaudeDoc::Skip),
"thought" => {
if body.is_empty() {
None
} else {
Some(ClaudeDoc::Thought(body.to_owned()))
}
}
"room" => {
if body.is_empty() {
return None;
}
let target = if arg.is_empty() {
ResponseTarget::Room(default_room.clone())
} else {
match arg.parse::<OwnedRoomId>() {
Ok(rid) => ResponseTarget::Room(rid),
Err(_) => return None,
}
};
Some(ClaudeDoc::Message {
target,
body: body.to_owned(),
})
}
"dm" => {
if body.is_empty() {
return None;
}
match arg.parse::<OwnedUserId>() {
Ok(uid) => Some(ClaudeDoc::Message {
target: ResponseTarget::Dm(uid),
body: body.to_owned(),
}),
Err(_) => None,
}
}
"react" => {
let mut header_parts = arg.splitn(2, char::is_whitespace);
let eid_arg = header_parts.next().unwrap_or("").trim();
let key_in_header = header_parts.next().unwrap_or("").trim();
if eid_arg.is_empty() {
return None;
}
let key = if !key_in_header.is_empty() {
key_in_header.to_owned()
} else if !body.is_empty() {
body.to_owned()
} else {
return None;
};
Some(ClaudeDoc::Reaction {
target_id_arg: eid_arg.to_owned(),
key,
})
}
_ => {
if body.is_empty() {
None
} else {
Some(ClaudeDoc::Thought(format!(
"[unknown header '{header}'] {body}"
)))
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_room() -> OwnedRoomId {
"!test:example.com".parse().unwrap()
}
fn first_message(docs: &[ClaudeDoc]) -> (&ResponseTarget, &str) {
for d in docs {
if let ClaudeDoc::Message { target, body } = d {
return (target, body.as_str());
}
}
panic!("no message doc found");
}
fn assert_room(target: &ResponseTarget, expected: &str) {
match target {
ResponseTarget::Room(r) => assert_eq!(r.as_str(), expected),
ResponseTarget::Dm(_) => panic!("expected room target, got dm"),
}
}
#[test]
fn parse_room_with_arg() {
let raw = "=== room !other:server\nhello world";
let docs = parse_response(raw, &test_room());
let (target, body) = first_message(&docs);
assert_room(target, "!other:server");
assert_eq!(body, "hello world");
}
#[test]
fn parse_room_no_arg_uses_default() {
let raw = "=== room\nhi";
let docs = parse_response(raw, &test_room());
let (target, _) = first_message(&docs);
assert_room(target, "!test:example.com");
}
#[test]
fn parse_skip() {
let raw = "=== skip";
let docs = parse_response(raw, &test_room());
assert_eq!(docs.len(), 1);
assert!(matches!(docs[0], ClaudeDoc::Skip));
}
#[test]
fn parse_plain_text_no_header() {
let raw = "just a message";
let docs = parse_response(raw, &test_room());
let (target, body) = first_message(&docs);
assert_room(target, "!test:example.com");
assert_eq!(body, "just a message");
}
#[test]
fn parse_empty() {
assert!(parse_response("", &test_room()).is_empty());
assert!(parse_response(" \n ", &test_room()).is_empty());
}
#[test]
fn parse_dm() {
let raw = "=== dm @alice:example.com\nhi alice";
let docs = parse_response(raw, &test_room());
let (target, body) = first_message(&docs);
match target {
ResponseTarget::Dm(u) => assert_eq!(u.as_str(), "@alice:example.com"),
ResponseTarget::Room(_) => panic!("expected dm target"),
}
assert_eq!(body, "hi alice");
}
#[test]
fn parse_thought() {
let raw = "=== thought\nthinking about whether to reply...";
let docs = parse_response(raw, &test_room());
assert_eq!(docs.len(), 1);
match &docs[0] {
ClaudeDoc::Thought(s) => assert_eq!(s, "thinking about whether to reply..."),
_ => panic!("expected thought"),
}
}
#[test]
fn parse_multi_doc() {
let raw = "\
=== thought
let me check notes
=== room !x:y
hi
=== dm @u:s
private
=== skip
";
let docs = parse_response(raw, &test_room());
assert_eq!(docs.len(), 4);
assert!(matches!(docs[0], ClaudeDoc::Thought(_)));
assert!(matches!(
docs[1],
ClaudeDoc::Message {
target: ResponseTarget::Room(_),
..
}
));
assert!(matches!(
docs[2],
ClaudeDoc::Message {
target: ResponseTarget::Dm(_),
..
}
));
assert!(matches!(docs[3], ClaudeDoc::Skip));
}
#[test]
fn parse_preamble_becomes_thought() {
let raw = "preamble line\n=== room !x:y\nhello";
let docs = parse_response(raw, &test_room());
assert_eq!(docs.len(), 2);
assert!(matches!(docs[0], ClaudeDoc::Thought(_)));
assert!(matches!(docs[1], ClaudeDoc::Message { .. }));
}
#[test]
fn parse_react_with_key_in_header() {
let raw = "=== react $abc12345… 👀";
let docs = parse_response(raw, &test_room());
assert_eq!(docs.len(), 1);
match &docs[0] {
ClaudeDoc::Reaction { target_id_arg, key } => {
assert_eq!(target_id_arg, "$abc12345…");
assert_eq!(key, "👀");
}
_ => panic!("expected reaction"),
}
}
#[test]
fn parse_react_with_key_in_body() {
let raw = "=== react $abc12345…\n🔥";
let docs = parse_response(raw, &test_room());
assert_eq!(docs.len(), 1);
match &docs[0] {
ClaudeDoc::Reaction { key, .. } => assert_eq!(key, "🔥"),
_ => panic!("expected reaction"),
}
}
#[test]
fn parse_unknown_header_becomes_thought() {
let raw = "=== mystery foo\nbody";
let docs = parse_response(raw, &test_room());
assert_eq!(docs.len(), 1);
assert!(matches!(docs[0], ClaudeDoc::Thought(_)));
}
}

217
src/handlers.rs Normal file
View file

@ -0,0 +1,217 @@
use std::path::Path;
use std::sync::Arc;
use matrix_sdk::{
Client, Room, RoomState,
ruma::{
UserId,
api::client::receipt::create_receipt::v3::ReceiptType as CreateReceiptType,
events::{
receipt::ReceiptThread,
room::{
member::StrippedRoomMemberEvent,
message::{MessageType, OriginalSyncRoomMessageEvent},
},
},
},
};
use tokio::fs;
use tokio::sync::Mutex;
use crate::paths;
use crate::timeline::chrono_now;
use crate::types::{DaemonState, PersistedSession};
pub async fn on_room_message(
event: OriginalSyncRoomMessageEvent,
room: Room,
state: Arc<Mutex<DaemonState>>,
) {
if room.state() != RoomState::Joined {
return;
}
let MessageType::Text(text_content) = &event.content.msgtype else {
return;
};
let room_id = room.room_id().to_owned();
let is_self = {
let state = state.lock().await;
event.sender == state.own_user_id
};
tracing::info!(
room = %room_id,
sender = %event.sender,
self_msg = is_self,
"{}",
text_content.body
);
if let Err(e) = ensure_room_notes(&room).await {
tracing::warn!(room = %room_id, "failed to ensure room notes: {e}");
}
if !is_self {
if let Err(e) = ensure_person_notes(&room, &event.sender).await {
tracing::warn!(sender = %event.sender, "failed to ensure person notes: {e}");
}
}
if !is_self {
let mut state = state.lock().await;
if !state.pending_rooms.contains(&room_id) {
state.pending_rooms.push(room_id);
}
}
}
pub async fn on_reaction(
event: matrix_sdk::ruma::events::reaction::OriginalSyncReactionEvent,
room: Room,
state: Arc<Mutex<DaemonState>>,
) {
if room.state() != RoomState::Joined {
return;
}
let room_id = room.room_id().to_owned();
let mut state = state.lock().await;
let is_self = event.sender == state.own_user_id;
tracing::info!(
room = %room_id,
sender = %event.sender,
self_react = is_self,
target = %event.content.relates_to.event_id,
key = %event.content.relates_to.key,
"reaction"
);
if !is_self && !state.pending_rooms.contains(&room_id) {
state.pending_rooms.push(room_id);
}
}
pub async fn on_stripped_state_member(event: StrippedRoomMemberEvent, client: Client, room: Room) {
let Some(my_id) = client.user_id() else {
return;
};
if event.state_key != my_id {
return;
}
let room_id = room.room_id().to_owned();
tokio::spawn(async move {
tracing::info!(room = %room_id, "auto-joining invite");
let mut delay = 2u64;
loop {
match room.join().await {
Ok(()) => {
tracing::info!(room = %room_id, "joined");
if let Err(e) = ensure_room_notes(&room).await {
tracing::warn!(room = %room_id, "failed to write room notes: {e}");
}
return;
}
Err(e) => {
tracing::warn!(room = %room_id, "join failed, retry in {delay}s: {e}");
tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
delay = (delay * 2).min(300);
if delay >= 300 {
tracing::error!(room = %room_id, "giving up on auto-join");
return;
}
}
}
}
});
}
/// Create state/rooms/<room_id>/notes.md if it doesn't exist.
pub async fn ensure_room_notes(room: &Room) -> anyhow::Result<()> {
let room_id = room.room_id();
let dir = paths::state_dir().join("rooms").join(room_id.as_str());
let file = dir.join("notes.md");
if file.exists() {
return Ok(());
}
fs::create_dir_all(&dir).await?;
let display_name = room
.display_name()
.await
.map(|n| n.to_string())
.unwrap_or_else(|_| room_id.to_string());
let now = chrono_now();
let body = format!("# {room_id}\n\nDisplay name: {display_name}\n\nFirst joined: {now}\n");
fs::write(&file, body).await?;
tracing::info!(room = %room_id, "created room notes");
Ok(())
}
/// Create state/people/<user_id>/notes.md if it doesn't exist.
/// Pre-fill with display name (from this room's member info) and the room as
/// "first met in".
pub async fn ensure_person_notes(room: &Room, user_id: &UserId) -> anyhow::Result<()> {
let dir = paths::state_dir().join("people").join(user_id.as_str());
let file = dir.join("notes.md");
if file.exists() {
return Ok(());
}
fs::create_dir_all(&dir).await?;
let display_name = room
.get_member_no_sync(user_id)
.await
.ok()
.flatten()
.and_then(|m| m.display_name().map(ToOwned::to_owned))
.unwrap_or_default();
let display_line = if display_name.is_empty() {
String::new()
} else {
format!("Display name: {display_name}\n")
};
let room_id = room.room_id();
let now = chrono_now();
let body = format!("# {user_id}\n\n{display_line}First met in: {room_id} on {now}\n");
fs::write(&file, body).await?;
tracing::info!(user = %user_id, "created person notes");
Ok(())
}
/// Find an existing DM room with the given user, or create one.
pub async fn find_or_create_dm(client: &Client, user_id: &UserId) -> anyhow::Result<Room> {
for room in client.joined_rooms() {
if room.is_direct().await.unwrap_or(false)
&& room
.direct_targets()
.iter()
.any(|t| t.as_str() == user_id.as_str())
{
return Ok(room);
}
}
tracing::info!(user = %user_id, "creating new DM room");
Ok(client.create_dm(user_id).await?)
}
pub async fn send_read_receipt(room: &Room, event_id: Option<matrix_sdk::ruma::OwnedEventId>) {
let Some(eid) = event_id else {
return;
};
if let Err(e) = room
.send_single_receipt(CreateReceiptType::Read, ReceiptThread::Unthreaded, eid)
.await
{
tracing::warn!(room = %room.room_id(), "failed to send read receipt: {e}");
}
}
pub async fn persist_sync_token(session_file: &Path, sync_token: String) -> anyhow::Result<()> {
let data = fs::read_to_string(session_file).await?;
let mut session: PersistedSession = serde_json::from_str(&data)?;
session.sync_token = Some(sync_token);
fs::write(session_file, serde_json::to_string_pretty(&session)?).await?;
Ok(())
}

File diff suppressed because it is too large Load diff

64
src/session.rs Normal file
View file

@ -0,0 +1,64 @@
use std::path::Path;
use anyhow::Context;
use matrix_sdk::Client;
use tokio::fs;
use crate::paths;
use crate::types::{Config, PersistedSession};
pub async fn load_config() -> anyhow::Result<Config> {
let data = fs::read_to_string(paths::config_path())
.await
.context("failed to read config.json")?;
serde_json::from_str(&data).context("failed to parse config.json")
}
pub async fn restore_session(session_file: &Path) -> anyhow::Result<(Client, Option<String>)> {
let data = fs::read_to_string(session_file).await?;
let session: PersistedSession = serde_json::from_str(&data)?;
tracing::info!(user = %session.user_session.meta.user_id, "restoring session");
let client = Client::builder()
.homeserver_url(&session.homeserver)
.sqlite_store(paths::db_path(), None)
.build()
.await?;
client.restore_session(session.user_session).await?;
Ok((client, session.sync_token))
}
pub async fn login(config: &Config, db_path: &Path, session_file: &Path) -> anyhow::Result<Client> {
tracing::info!(homeserver = %config.homeserver, user = %config.username, "logging in");
let client = Client::builder()
.homeserver_url(&config.homeserver)
.sqlite_store(db_path, None)
.build()
.await?;
client
.matrix_auth()
.login_username(&config.username, &config.password)
.initial_device_display_name("damocles-daemon")
.await?;
let user_session = client
.matrix_auth()
.session()
.context("no session after login")?;
let persisted = PersistedSession {
homeserver: config.homeserver.clone(),
db_path: db_path.to_owned(),
user_session,
sync_token: None,
};
fs::write(session_file, serde_json::to_string_pretty(&persisted)?).await?;
tracing::info!("session persisted");
Ok(client)
}

328
src/timeline.rs Normal file
View file

@ -0,0 +1,328 @@
use std::collections::HashMap;
use std::fmt::Write as _;
use matrix_sdk::{
Room,
ruma::{OwnedEventId, OwnedUserId, events::room::message::MessageType},
};
use crate::types::TimelineItem;
/// Format a unix-seconds timestamp as `YYYY-MM-DD HH:MM` UTC. Returns "?" for 0.
pub fn format_ts(secs: i64) -> String {
if secs == 0 {
return "?".into();
}
let days = secs.div_euclid(86400);
let day_secs = secs.rem_euclid(86400);
let (y, m, d) = days_to_ymd(days);
let h = day_secs / 3600;
let min = (day_secs % 3600) / 60;
format!("{y:04}-{m:02}-{d:02} {h:02}:{min:02}")
}
pub fn chrono_now() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0);
let days = secs / 86400;
let (y, m, d) = days_to_ymd(days);
format!("{y:04}-{m:02}-{d:02}")
}
/// Convert days-since-1970-01-01 to (year, month, day). Civil-date algorithm.
fn days_to_ymd(z: i64) -> (i64, u32, u32) {
let z = z + 719_468;
let era = z.div_euclid(146_097);
let doe = z.rem_euclid(146_097) as u32;
let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365;
let y = yoe as i64 + era * 400;
let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
let mp = (5 * doy + 2) / 153;
let d = doy - (153 * mp + 2) / 5 + 1;
let m = if mp < 10 { mp + 3 } else { mp - 9 };
(if m <= 2 { y + 1 } else { y }, m, d)
}
pub fn ts_secs_from(ts: matrix_sdk::ruma::UInt) -> i64 {
let ms: u64 = ts.into();
i64::try_from(ms).unwrap_or(0) / 1000
}
/// Shorten an event id for prompt display: `$abc123def456...` -> `$abc123de`.
pub fn short_event_id(id: &OwnedEventId) -> String {
let s = id.as_str();
let prefix: String = s.chars().take(9).collect();
if s.len() > 9 {
format!("{prefix}")
} else {
prefix
}
}
/// Resolve a (possibly shortened/ellipsized) event id to a full one by
/// looking up against the timeline. Returns the matching message's full
/// event id if found.
pub fn resolve_event_id(timeline: &[TimelineItem], arg: &str) -> Option<OwnedEventId> {
let cleaned = arg.trim_end_matches('…').trim_end_matches('.').trim();
if cleaned.is_empty() {
return None;
}
for item in timeline {
if let TimelineItem::Message { event_id, .. } = item {
if event_id.as_str() == cleaned || event_id.as_str().starts_with(cleaned) {
return Some(event_id.clone());
}
}
}
None
}
/// Render one timeline item into the prompt.
/// Messages: `[ts] $eid... [(you) ]@user: body [read by: ...]`
/// Reactions: `[ts] [(you) ]@user reacted to $eid... with KEY`
pub fn render_timeline_item(
prompt: &mut String,
item: &TimelineItem,
read_markers: &HashMap<OwnedEventId, Vec<OwnedUserId>>,
) {
match item {
TimelineItem::Message {
event_id,
sender,
body,
is_self,
ts,
..
} => {
let ts_str = format_ts(*ts);
let id = short_event_id(event_id);
let prefix = if *is_self { "(you) " } else { "" };
let readers_str = match read_markers.get(event_id) {
Some(rs) if !rs.is_empty() => {
let mut sorted = rs.clone();
sorted.sort();
let names: Vec<String> = sorted.iter().map(|u| u.to_string()).collect();
format!(" [read by: {}]", names.join(", "))
}
_ => String::new(),
};
writeln!(
prompt,
"[{ts_str}] {id} {prefix}{sender}: {body}{readers_str}"
)
.unwrap();
}
TimelineItem::Reaction {
sender,
target_event_id,
key,
is_self,
ts,
} => {
let ts_str = format_ts(*ts);
let id = short_event_id(target_event_id);
let prefix = if *is_self { "(you) " } else { "" };
writeln!(
prompt,
"[{ts_str}] {prefix}{sender} reacted to {id} with {key}"
)
.unwrap();
}
}
}
/// Load the last N timeline items (messages + reactions) from the room's
/// persistent event cache. Returns oldest-first.
///
/// We walk events newest-first, collect messages until we have `limit`, then
/// also include any reactions whose timestamps fall within the message window.
pub async fn load_timeline(
room: &Room,
limit: usize,
own_user: &OwnedUserId,
) -> anyhow::Result<Vec<TimelineItem>> {
use matrix_sdk::ruma::events::AnySyncTimelineEvent;
let (cache, _handles) = room.event_cache().await?;
let events = cache.events().await;
let mut messages: Vec<TimelineItem> = Vec::new();
let mut reactions: Vec<TimelineItem> = Vec::new();
let mut earliest_message_ts: Option<i64> = None;
for ev in events.iter().rev() {
let raw = ev.raw();
let Ok(deserialized) = raw.deserialize() else {
continue;
};
let AnySyncTimelineEvent::MessageLike(msg) = deserialized else {
continue;
};
match msg {
matrix_sdk::ruma::events::AnySyncMessageLikeEvent::RoomMessage(
matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig),
) => {
if messages.len() >= limit {
continue;
}
let MessageType::Text(text) = &orig.content.msgtype else {
continue;
};
let ts = ts_secs_from(orig.origin_server_ts.0);
let in_reply_to = match &orig.content.relates_to {
Some(matrix_sdk::ruma::events::room::message::Relation::Reply {
in_reply_to,
}) => Some(in_reply_to.event_id.clone()),
_ => None,
};
if earliest_message_ts.is_none_or(|e| ts < e) {
earliest_message_ts = Some(ts);
}
messages.push(TimelineItem::Message {
event_id: orig.event_id.clone(),
sender: orig.sender.clone(),
body: text.body.clone(),
is_self: &orig.sender == own_user,
ts,
in_reply_to,
});
}
matrix_sdk::ruma::events::AnySyncMessageLikeEvent::Reaction(
matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig),
) => {
let ts = ts_secs_from(orig.origin_server_ts.0);
reactions.push(TimelineItem::Reaction {
sender: orig.sender.clone(),
target_event_id: orig.content.relates_to.event_id.clone(),
key: orig.content.relates_to.key.clone(),
is_self: &orig.sender == own_user,
ts,
});
}
_ => {}
}
}
if let Some(min_ts) = earliest_message_ts {
reactions.retain(|r| r.ts() >= min_ts);
}
let mut combined: Vec<TimelineItem> = Vec::with_capacity(messages.len() + reactions.len());
combined.extend(messages);
combined.extend(reactions);
combined.sort_by_key(TimelineItem::ts);
Ok(combined)
}
/// Fetch a single text message by event_id from the room's event cache.
pub async fn fetch_message(
cache: &matrix_sdk::event_cache::RoomEventCache,
event_id: &matrix_sdk::ruma::EventId,
own_user: &OwnedUserId,
) -> Option<TimelineItem> {
use matrix_sdk::ruma::events::AnySyncTimelineEvent;
let ev = cache.find_event(event_id).await?;
let deserialized = ev.raw().deserialize().ok()?;
let AnySyncTimelineEvent::MessageLike(msg) = deserialized else {
return None;
};
let matrix_sdk::ruma::events::AnySyncMessageLikeEvent::RoomMessage(
matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig),
) = msg
else {
return None;
};
let MessageType::Text(text) = &orig.content.msgtype else {
return None;
};
let ts = ts_secs_from(orig.origin_server_ts.0);
Some(TimelineItem::Message {
event_id: orig.event_id.clone(),
sender: orig.sender.clone(),
body: text.body.clone(),
is_self: &orig.sender == own_user,
ts,
in_reply_to: None,
})
}
/// For each message in the timeline, compute the list of OTHER users who
/// have a read receipt at or after that message. Self is excluded.
pub async fn compute_read_markers(
room: &Room,
timeline: &[TimelineItem],
own_user: &OwnedUserId,
) -> HashMap<OwnedEventId, Vec<OwnedUserId>> {
use matrix_sdk::ruma::events::receipt::{ReceiptThread, ReceiptType};
let mut users: Vec<OwnedUserId> = timeline
.iter()
.filter(|t| !t.is_self())
.map(|t| t.sender().clone())
.collect();
users.sort();
users.dedup();
let positions: HashMap<OwnedEventId, usize> = timeline
.iter()
.enumerate()
.filter_map(|(i, t)| match t {
TimelineItem::Message { event_id, .. } => Some((event_id.clone(), i)),
_ => None,
})
.collect();
let mut readers: HashMap<OwnedEventId, Vec<OwnedUserId>> = HashMap::new();
for user in &users {
if user == own_user {
continue;
}
let (receipt_eid, receipt_ts) = match room
.load_user_receipt(ReceiptType::Read, ReceiptThread::Unthreaded, user)
.await
{
Ok(Some((eid, r))) => {
let ts = r.ts.map(|t| ts_secs_from(t.0)).unwrap_or(0);
(eid, ts)
}
_ => continue,
};
let user_msg_idx_inclusive: Option<usize> = if let Some(&p) = positions.get(&receipt_eid) {
Some(p)
} else {
let newest_msg_ts = timeline
.iter()
.rev()
.find_map(|t| match t {
TimelineItem::Message { ts, .. } => Some(*ts),
_ => None,
})
.unwrap_or(0);
if receipt_ts > 0 && receipt_ts >= newest_msg_ts {
Some(timeline.len().saturating_sub(1))
} else {
None
}
};
if let Some(up_to) = user_msg_idx_inclusive {
for item in timeline.iter().take(up_to + 1) {
if let TimelineItem::Message { event_id, .. } = item {
readers
.entry(event_id.clone())
.or_default()
.push(user.clone());
}
}
}
}
readers
}

112
src/types.rs Normal file
View file

@ -0,0 +1,112 @@
use std::collections::HashMap;
use matrix_sdk::{
authentication::matrix::MatrixSession,
ruma::{OwnedEventId, OwnedRoomId, OwnedUserId},
};
use serde::{Deserialize, Serialize};
pub const DEFAULT_MODEL: &str = "claude-sonnet-4-6";
pub const DEFAULT_MAX_HISTORY: usize = 20;
pub const DEFAULT_RATE_LIMIT_PER_MIN: u32 = 1;
#[derive(Debug, Deserialize)]
pub struct Config {
pub homeserver: String,
pub username: String,
pub password: String,
pub rate_limit_per_min: Option<u32>,
pub model: Option<String>,
pub max_history: Option<usize>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct PersistedSession {
pub homeserver: String,
pub db_path: std::path::PathBuf,
pub user_session: MatrixSession,
#[serde(skip_serializing_if = "Option::is_none")]
pub sync_token: Option<String>,
}
#[derive(Clone, Debug)]
pub enum TimelineItem {
Message {
event_id: OwnedEventId,
sender: OwnedUserId,
body: String,
is_self: bool,
/// Unix seconds. 0 if unknown.
ts: i64,
in_reply_to: Option<OwnedEventId>,
},
Reaction {
sender: OwnedUserId,
target_event_id: OwnedEventId,
key: String,
is_self: bool,
ts: i64,
},
}
impl TimelineItem {
pub fn ts(&self) -> i64 {
match self {
Self::Message { ts, .. } | Self::Reaction { ts, .. } => *ts,
}
}
pub fn event_id(&self) -> Option<&OwnedEventId> {
match self {
Self::Message { event_id, .. } => Some(event_id),
Self::Reaction { .. } => None,
}
}
pub fn sender(&self) -> &OwnedUserId {
match self {
Self::Message { sender, .. } | Self::Reaction { sender, .. } => sender,
}
}
pub fn is_self(&self) -> bool {
match self {
Self::Message { is_self, .. } | Self::Reaction { is_self, .. } => *is_self,
}
}
}
pub struct DaemonState {
pub own_user_id: OwnedUserId,
/// Per-room: the latest event_id that's been "shown" to Claude. Events
/// after this are "new" on the next invocation. Cleared on daemon restart.
pub last_shown: HashMap<OwnedRoomId, OwnedEventId>,
pub pending_rooms: Vec<OwnedRoomId>,
pub rate_budget: u32,
pub rate_limit_per_min: u32,
pub last_rate_reset: std::time::Instant,
pub model: String,
pub max_history: usize,
}
pub enum ResponseTarget {
Room(OwnedRoomId),
Dm(OwnedUserId),
}
/// One document within Claude's multi-doc output. Each doc has its own
/// frontmatter; the daemon routes based on which fields are present.
pub enum ClaudeDoc {
/// A chat message to send.
Message {
target: ResponseTarget,
body: String,
},
/// A reaction to a message. `target_id_arg` is the event id (possibly
/// shortened) the agent saw in the prompt; daemon expands by prefix match.
Reaction { target_id_arg: String, key: String },
/// Agent's internal monologue. Not sent to chat. Logged to tracing.
Thought(String),
/// Explicit "do nothing for this slot". Useful as a placeholder.
Skip,
}