use matrix-sdk event cache for persistent room history, drop in-memory message buffers
This commit is contained in:
parent
6a3e7a8019
commit
50e2695e93
1 changed files with 103 additions and 43 deletions
146
src/main.rs
146
src/main.rs
|
|
@ -10,7 +10,7 @@ use matrix_sdk::{
|
||||||
authentication::matrix::MatrixSession,
|
authentication::matrix::MatrixSession,
|
||||||
config::SyncSettings,
|
config::SyncSettings,
|
||||||
ruma::{
|
ruma::{
|
||||||
OwnedRoomId, OwnedUserId,
|
OwnedEventId, OwnedRoomId, OwnedUserId,
|
||||||
api::client::filter::FilterDefinition,
|
api::client::filter::FilterDefinition,
|
||||||
events::room::message::{
|
events::room::message::{
|
||||||
MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent,
|
MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent,
|
||||||
|
|
@ -48,10 +48,9 @@ struct ChatMessage {
|
||||||
|
|
||||||
struct DaemonState {
|
struct DaemonState {
|
||||||
own_user_id: OwnedUserId,
|
own_user_id: OwnedUserId,
|
||||||
room_history: std::collections::HashMap<OwnedRoomId, Vec<ChatMessage>>,
|
/// Per-room: the latest event_id that's been "shown" to Claude. Events
|
||||||
/// For each room, the index in room_history up to which messages have been
|
/// after this are "new" on the next invocation. Cleared on daemon restart.
|
||||||
/// shown to Claude. Messages at indexes >= this value are "new".
|
last_shown: std::collections::HashMap<OwnedRoomId, OwnedEventId>,
|
||||||
seen_count: std::collections::HashMap<OwnedRoomId, usize>,
|
|
||||||
pending_rooms: Vec<OwnedRoomId>,
|
pending_rooms: Vec<OwnedRoomId>,
|
||||||
rate_budget: u32,
|
rate_budget: u32,
|
||||||
rate_limit_per_min: u32,
|
rate_limit_per_min: u32,
|
||||||
|
|
@ -92,10 +91,15 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let own_user_id = client.user_id().context("not logged in")?.to_owned();
|
let own_user_id = client.user_id().context("not logged in")?.to_owned();
|
||||||
tracing::info!(user = %own_user_id, rate_limit = rate_limit_per_min, "ready");
|
tracing::info!(user = %own_user_id, rate_limit = rate_limit_per_min, "ready");
|
||||||
|
|
||||||
|
// Enable persistent event cache (matrix-sdk's sqlite store keeps the timeline)
|
||||||
|
client
|
||||||
|
.event_cache()
|
||||||
|
.subscribe()
|
||||||
|
.context("subscribe event cache")?;
|
||||||
|
|
||||||
let state = Arc::new(Mutex::new(DaemonState {
|
let state = Arc::new(Mutex::new(DaemonState {
|
||||||
own_user_id,
|
own_user_id,
|
||||||
room_history: std::collections::HashMap::new(),
|
last_shown: std::collections::HashMap::new(),
|
||||||
seen_count: std::collections::HashMap::new(),
|
|
||||||
pending_rooms: Vec::new(),
|
pending_rooms: Vec::new(),
|
||||||
rate_budget: rate_limit_per_min,
|
rate_budget: rate_limit_per_min,
|
||||||
rate_limit_per_min,
|
rate_limit_per_min,
|
||||||
|
|
@ -230,34 +234,17 @@ async fn on_room_message(
|
||||||
};
|
};
|
||||||
|
|
||||||
let room_id = room.room_id().to_owned();
|
let room_id = room.room_id().to_owned();
|
||||||
let room_name = room
|
|
||||||
.display_name()
|
|
||||||
.await
|
|
||||||
.map_or_else(|_| room_id.to_string(), |n| n.to_string());
|
|
||||||
|
|
||||||
let mut state = state.lock().await;
|
let mut state = state.lock().await;
|
||||||
let is_self = event.sender == state.own_user_id;
|
let is_self = event.sender == state.own_user_id;
|
||||||
|
|
||||||
let msg = ChatMessage {
|
|
||||||
sender: event.sender.clone(),
|
|
||||||
body: text_content.body.clone(),
|
|
||||||
is_self,
|
|
||||||
};
|
|
||||||
|
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
room = %room_name,
|
room = %room_id,
|
||||||
sender = %event.sender,
|
sender = %event.sender,
|
||||||
self_msg = is_self,
|
self_msg = is_self,
|
||||||
"{}",
|
"{}",
|
||||||
text_content.body
|
text_content.body
|
||||||
);
|
);
|
||||||
|
|
||||||
let history = state.room_history.entry(room_id.clone()).or_default();
|
|
||||||
history.push(msg);
|
|
||||||
if history.len() > MAX_HISTORY {
|
|
||||||
history.drain(..history.len() - MAX_HISTORY);
|
|
||||||
}
|
|
||||||
|
|
||||||
if !is_self && !state.pending_rooms.contains(&room_id) {
|
if !is_self && !state.pending_rooms.contains(&room_id) {
|
||||||
state.pending_rooms.push(room_id);
|
state.pending_rooms.push(room_id);
|
||||||
}
|
}
|
||||||
|
|
@ -286,31 +273,63 @@ async fn process_loop(state: Arc<Mutex<DaemonState>>, client: Client) {
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
||||||
let (history, seen_idx) = {
|
// Snapshot last_shown for this room so we can mark seen vs new
|
||||||
|
let prev_last_shown = {
|
||||||
let state = state.lock().await;
|
let state = state.lock().await;
|
||||||
let history = state
|
state.last_shown.get(&room_id).cloned()
|
||||||
.room_history
|
|
||||||
.get(&room_id)
|
|
||||||
.cloned()
|
|
||||||
.unwrap_or_default();
|
|
||||||
let seen = state.seen_count.get(&room_id).copied().unwrap_or(0);
|
|
||||||
(history, seen)
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let room_name = client
|
let Some(room) = client.get_room(&room_id) else {
|
||||||
.get_room(&room_id)
|
tracing::warn!(room = %room_id, "room not found in client");
|
||||||
.map_or_else(|| room_id.to_string(), |r| r.room_id().to_string());
|
continue;
|
||||||
|
};
|
||||||
|
let room_name = room
|
||||||
|
.display_name()
|
||||||
|
.await
|
||||||
|
.map_or_else(|_| room_id.to_string(), |n| n.to_string());
|
||||||
|
|
||||||
match invoke_claude(&room_id, &room_name, &history, seen_idx).await {
|
// Load recent history from matrix-sdk's persistent event cache
|
||||||
|
let history = match load_recent_messages(&room, MAX_HISTORY).await {
|
||||||
|
Ok(h) => h,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(room = %room_id, "failed to load history: {e}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let own_user = {
|
||||||
|
let state = state.lock().await;
|
||||||
|
state.own_user_id.clone()
|
||||||
|
};
|
||||||
|
|
||||||
|
let chat_msgs: Vec<ChatMessage> = history
|
||||||
|
.iter()
|
||||||
|
.map(|(_, sender, body)| ChatMessage {
|
||||||
|
sender: sender.clone(),
|
||||||
|
body: body.clone(),
|
||||||
|
is_self: sender == &own_user,
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Determine seen split: everything before (and including) prev_last_shown is "seen"
|
||||||
|
let seen_idx = prev_last_shown
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|id| history.iter().position(|(eid, _, _)| eid == id))
|
||||||
|
.map_or(0, |pos| pos + 1);
|
||||||
|
|
||||||
|
let new_last_event_id = history.last().map(|(eid, _, _)| eid.clone());
|
||||||
|
|
||||||
|
match invoke_claude(&room_id, &room_name, &chat_msgs, seen_idx).await {
|
||||||
Ok(Some(response)) => {
|
Ok(Some(response)) => {
|
||||||
if let Some(room) = client.get_room(&response.room) {
|
if let Some(target_room) = client.get_room(&response.room) {
|
||||||
let content = RoomMessageEventContent::text_plain(&response.body);
|
let content = RoomMessageEventContent::text_plain(&response.body);
|
||||||
match room.send(content).await {
|
match target_room.send(content).await {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
let mut state = state.lock().await;
|
let mut state = state.lock().await;
|
||||||
state.rate_budget = state.rate_budget.saturating_sub(1);
|
state.rate_budget = state.rate_budget.saturating_sub(1);
|
||||||
// Mark current history as seen
|
if let Some(eid) = new_last_event_id {
|
||||||
state.seen_count.insert(room_id.clone(), history.len());
|
state.last_shown.insert(room_id.clone(), eid);
|
||||||
|
}
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
room = %response.room,
|
room = %response.room,
|
||||||
"sent response ({} budget remaining)",
|
"sent response ({} budget remaining)",
|
||||||
|
|
@ -325,9 +344,10 @@ async fn process_loop(state: Arc<Mutex<DaemonState>>, client: Client) {
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
tracing::debug!(room = %room_id, "claude chose to skip");
|
tracing::debug!(room = %room_id, "claude chose to skip");
|
||||||
// Even on skip, mark messages as seen so we don't reprocess
|
|
||||||
let mut state = state.lock().await;
|
let mut state = state.lock().await;
|
||||||
state.seen_count.insert(room_id.clone(), history.len());
|
if let Some(eid) = new_last_event_id {
|
||||||
|
state.last_shown.insert(room_id.clone(), eid);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!(room = %room_id, "claude invocation failed: {e}");
|
tracing::error!(room = %room_id, "claude invocation failed: {e}");
|
||||||
|
|
@ -336,6 +356,46 @@ async fn process_loop(state: Arc<Mutex<DaemonState>>, client: Client) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Load the last N text messages from the room's persistent event cache.
|
||||||
|
/// Returns oldest-first list of (event_id, sender, body).
|
||||||
|
async fn load_recent_messages(
|
||||||
|
room: &Room,
|
||||||
|
limit: usize,
|
||||||
|
) -> anyhow::Result<Vec<(OwnedEventId, OwnedUserId, String)>> {
|
||||||
|
use matrix_sdk::ruma::events::AnySyncTimelineEvent;
|
||||||
|
|
||||||
|
let (cache, _handles) = room.event_cache().await?;
|
||||||
|
let events = cache.events().await;
|
||||||
|
|
||||||
|
let mut out: Vec<(OwnedEventId, OwnedUserId, String)> = Vec::new();
|
||||||
|
for ev in events.iter().rev() {
|
||||||
|
if out.len() >= limit {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let raw = ev.raw();
|
||||||
|
let Ok(deserialized) = raw.deserialize() else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
if let AnySyncTimelineEvent::MessageLike(msg) = deserialized {
|
||||||
|
if let matrix_sdk::ruma::events::AnySyncMessageLikeEvent::RoomMessage(
|
||||||
|
matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig),
|
||||||
|
) = msg
|
||||||
|
{
|
||||||
|
if let MessageType::Text(text) = &orig.content.msgtype {
|
||||||
|
out.push((
|
||||||
|
orig.event_id.clone(),
|
||||||
|
orig.sender.clone(),
|
||||||
|
text.body.clone(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
out.reverse();
|
||||||
|
Ok(out)
|
||||||
|
}
|
||||||
|
|
||||||
struct ClaudeResponse {
|
struct ClaudeResponse {
|
||||||
room: OwnedRoomId,
|
room: OwnedRoomId,
|
||||||
body: String,
|
body: String,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue