diff --git a/src/bin/mcp.rs b/src/bin/mcp.rs index def1cf3..85e40e5 100644 --- a/src/bin/mcp.rs +++ b/src/bin/mcp.rs @@ -58,6 +58,13 @@ mod protocol_inline { room_id: String, limit: Option, }, + + #[serde(rename = "fetch_event")] + FetchEvent { + room_id: String, + event_id: String, + context_before: Option, + }, } #[derive(Debug, Serialize, Deserialize)] @@ -125,6 +132,24 @@ struct GetRoomHistoryParams { limit: Option, } +#[derive(Debug, Deserialize, JsonSchema)] +struct FetchEventParams { + /// The event ID to fetch. Can be the shortened form shown in the + /// timeline (e.g. $abc123de...) or a full ID. Use this to dereference + /// reply targets shown as [reply to $abc...] in the prompt, or to look + /// up any specific event by ID. + event_id: String, + /// Number of context messages BEFORE the target to include. Default 0. + /// Pass 5-10 for conversational context. The response also includes an + /// `earlier_handle` event_id you can pass to fetch_event to page further + /// back into history. + #[serde(default)] + context_before: Option, + /// Room ID to look in. Defaults to source room if omitted. + #[serde(default)] + room_id: Option, +} + // --------------------------------------------------------------------------- // MCP server struct // --------------------------------------------------------------------------- @@ -268,7 +293,7 @@ impl MatrixBridge { Self::response_to_result(resp) } - #[tool(description = "Get recent message history for any joined room. Returns JSON list of messages and reactions with timestamps.")] + #[tool(description = "Get recent message history for any joined room. Returns JSON list of messages and reactions with timestamps. Backfills via /messages if cache is short.")] async fn get_room_history( &self, Parameters(params): Parameters, @@ -281,6 +306,22 @@ impl MatrixBridge { .await?; Self::response_to_result(resp) } + + #[tool(description = "Fetch a specific event by ID with optional context messages before it. Use to dereference [reply to $...] markers or arbitrary event IDs. Response includes earlier_handle for paging further back.")] + async fn fetch_event( + &self, + Parameters(params): Parameters, + ) -> Result { + let room_id = params.room_id.unwrap_or_else(|| self.source_room.clone()); + let resp = self + .call(&DaemonRequest::FetchEvent { + room_id, + event_id: params.event_id, + context_before: params.context_before, + }) + .await?; + Self::response_to_result(resp) + } } // --------------------------------------------------------------------------- diff --git a/src/claude.rs b/src/claude.rs index dc44b7a..4a68b67 100644 --- a/src/claude.rs +++ b/src/claude.rs @@ -58,7 +58,7 @@ pub async fn invoke_claude( "--add-dir", &identity_str, "--allowedTools", - "Read,Edit,Write,Glob,Grep,mcp__matrix__send_message,mcp__matrix__send_dm,mcp__matrix__send_reaction,mcp__matrix__send_reply,mcp__matrix__list_rooms,mcp__matrix__list_room_members,mcp__matrix__get_room_history", + "Read,Edit,Write,Glob,Grep,mcp__matrix__send_message,mcp__matrix__send_dm,mcp__matrix__send_reaction,mcp__matrix__send_reply,mcp__matrix__list_rooms,mcp__matrix__list_room_members,mcp__matrix__get_room_history,mcp__matrix__fetch_event", "--mcp-config", &mcp_config_str, "-p", diff --git a/src/protocol.rs b/src/protocol.rs index eb426f4..eb17b9a 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -35,6 +35,13 @@ pub enum DaemonRequest { room_id: String, limit: Option, }, + + #[serde(rename = "fetch_event")] + FetchEvent { + room_id: String, + event_id: String, + context_before: Option, + }, } /// Response from daemon to MCP server. diff --git a/src/socket.rs b/src/socket.rs index 1f0380f..08f1e70 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -77,6 +77,11 @@ async fn handle_request(request: DaemonRequest, client: &Client) -> DaemonRespon DaemonRequest::GetRoomHistory { room_id, limit } => { get_room_history(client, &room_id, limit).await } + DaemonRequest::FetchEvent { + room_id, + event_id, + context_before, + } => fetch_event(client, &room_id, &event_id, context_before).await, } } @@ -251,45 +256,202 @@ async fn get_room_history( }; let limit = limit.unwrap_or(20).min(100); - let tl = match timeline::load_timeline(&room, limit, &own_user).await { - Ok(t) => t, - Err(e) => return DaemonResponse::err(format!("failed to load timeline: {e}")), + // Backfill via /messages if cache is short + if let Ok((cache, _)) = room.event_cache().await { + let mut tl = match timeline::load_timeline(&room, limit, &own_user).await { + Ok(t) => t, + Err(e) => return DaemonResponse::err(format!("failed to load timeline: {e}")), + }; + let mut tries = 0; + while tl.len() < limit && tries < 5 { + tries += 1; + match cache.pagination().run_backwards_once((limit - tl.len()) as u16).await { + Ok(outcome) => { + if outcome.reached_start { + break; + } + } + Err(e) => { + tracing::warn!("backfill failed: {e}"); + break; + } + } + tl = match timeline::load_timeline(&room, limit, &own_user).await { + Ok(t) => t, + Err(e) => return DaemonResponse::err(format!("reload after backfill failed: {e}")), + }; + } + + let items: Vec<_> = tl.iter().map(timeline_item_to_json).collect(); + DaemonResponse::ok(items) + } else { + DaemonResponse::err("event cache not available".to_owned()) + } +} + +fn timeline_item_to_json(item: &crate::types::TimelineItem) -> serde_json::Value { + match item { + crate::types::TimelineItem::Message { + event_id, + sender, + body, + is_self, + ts, + in_reply_to, + } => json!({ + "kind": "message", + "event_id": event_id.as_str(), + "sender": sender.as_str(), + "body": body, + "is_self": is_self, + "ts": ts, + "in_reply_to": in_reply_to.as_ref().map(|e| e.as_str()), + }), + crate::types::TimelineItem::Reaction { + sender, + target_event_id, + key, + is_self, + ts, + } => json!({ + "kind": "reaction", + "sender": sender.as_str(), + "target_event_id": target_event_id.as_str(), + "key": key, + "is_self": is_self, + "ts": ts, + }), + } +} + +/// Fetch a specific event by ID via the homeserver `/context` endpoint. +/// Returns the event plus `context_before` events before it. Includes one +/// extra event as `earlier_handle` so the shard can page further backward +/// by calling fetch_event again with that id. +async fn fetch_event( + client: &Client, + room_id: &str, + event_id: &str, + context_before: Option, +) -> DaemonResponse { + use matrix_sdk::ruma::events::AnySyncTimelineEvent; + use matrix_sdk::ruma::events::room::message::MessageType; + + let rid = match room_id.parse::() { + Ok(r) => r, + Err(e) => return DaemonResponse::err(format!("invalid room_id: {e}")), + }; + let Some(room) = client.get_room(&rid) else { + return DaemonResponse::err(format!("room {rid} not found")); + }; + let own_user = match client.user_id() { + Some(u) => u.to_owned(), + None => return DaemonResponse::err("not logged in".to_owned()), }; - let items: Vec<_> = tl - .iter() - .map(|item| match item { - crate::types::TimelineItem::Message { - event_id, - sender, - body, - is_self, - ts, - in_reply_to, - } => json!({ - "kind": "message", - "event_id": event_id.as_str(), - "sender": sender.as_str(), - "body": body, - "is_self": is_self, - "ts": ts, - "in_reply_to": in_reply_to.as_ref().map(|e| e.as_str()), - }), - crate::types::TimelineItem::Reaction { - sender, - target_event_id, - key, - is_self, - ts, - } => json!({ - "kind": "reaction", - "sender": sender.as_str(), - "target_event_id": target_event_id.as_str(), - "key": key, - "is_self": is_self, - "ts": ts, - }), - }) - .collect(); - DaemonResponse::ok(items) + // Resolve possibly-shortened event id against recent timeline first; + // fall back to parsing as a full id. + let resolve_tl = timeline::load_timeline(&room, 50, &own_user) + .await + .unwrap_or_default(); + let full_eid = match timeline::resolve_event_id(&resolve_tl, event_id) { + Some(eid) => eid, + None => match event_id.parse::() { + Ok(eid) => eid, + Err(e) => { + return DaemonResponse::err(format!("invalid or unknown event_id: {e}")); + } + }, + }; + + // Request one extra event so we can split it off as the paging handle + let context_before = context_before.unwrap_or(0).min(50); + let request_size = context_before + 1; + + let response = match room + .event_with_context( + &full_eid, + false, + matrix_sdk::ruma::UInt::from(request_size), + None, + ) + .await + { + Ok(r) => r, + Err(e) => return DaemonResponse::err(format!("event_with_context failed: {e}")), + }; + + let render = |raw: &matrix_sdk::deserialized_responses::TimelineEvent| -> Option { + let deserialized = raw.raw().deserialize().ok()?; + let AnySyncTimelineEvent::MessageLike(msg) = deserialized else { + return None; + }; + match msg { + matrix_sdk::ruma::events::AnySyncMessageLikeEvent::RoomMessage( + matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig), + ) => { + let MessageType::Text(text) = &orig.content.msgtype else { + return None; + }; + let ms: u64 = orig.origin_server_ts.0.into(); + let ts = (ms / 1000) as i64; + let in_reply_to = match &orig.content.relates_to { + Some(matrix_sdk::ruma::events::room::message::Relation::Reply { in_reply_to }) => { + Some(in_reply_to.event_id.as_str().to_owned()) + } + _ => None, + }; + Some(json!({ + "kind": "message", + "event_id": orig.event_id.as_str(), + "sender": orig.sender.as_str(), + "body": text.body, + "is_self": orig.sender == own_user, + "ts": ts, + "in_reply_to": in_reply_to, + })) + } + matrix_sdk::ruma::events::AnySyncMessageLikeEvent::Reaction( + matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig), + ) => { + let ms: u64 = orig.origin_server_ts.0.into(); + let ts = (ms / 1000) as i64; + Some(json!({ + "kind": "reaction", + "sender": orig.sender.as_str(), + "target_event_id": orig.content.relates_to.event_id.as_str(), + "key": orig.content.relates_to.key, + "is_self": orig.sender == own_user, + "ts": ts, + })) + } + _ => None, + } + }; + + // events_before is newest-first per matrix /context spec - reverse for chronological + let mut before: Vec<_> = response.events_before.iter().filter_map(render).collect(); + before.reverse(); + + // The "earlier handle" is the oldest event we got, if we asked for more than 0 + // It's used by the shard to page further back via another fetch_event call + let earlier_handle = if context_before > 0 && before.len() > context_before as usize { + before.first().and_then(|e| e.get("event_id").and_then(|v| v.as_str()).map(String::from)) + } else { + None + }; + let context_events: Vec<_> = if earlier_handle.is_some() { + // First event is the handle - skip it from the main events list + before.into_iter().skip(1).collect() + } else { + before + }; + + let target = response.event.as_ref().and_then(render); + + DaemonResponse::ok(json!({ + "event": target, + "context_before": context_events, + "earlier_handle": earlier_handle, + })) }