add fetch_event tool and backfill get_room_history beyond event cache

This commit is contained in:
Damocles 2026-05-01 03:33:04 +02:00
parent e538be2c3a
commit 829a60854f
4 changed files with 251 additions and 41 deletions

View file

@ -58,6 +58,13 @@ mod protocol_inline {
room_id: String,
limit: Option<usize>,
},
#[serde(rename = "fetch_event")]
FetchEvent {
room_id: String,
event_id: String,
context_before: Option<u32>,
},
}
#[derive(Debug, Serialize, Deserialize)]
@ -125,6 +132,24 @@ struct GetRoomHistoryParams {
limit: Option<usize>,
}
#[derive(Debug, Deserialize, JsonSchema)]
struct FetchEventParams {
/// The event ID to fetch. Can be the shortened form shown in the
/// timeline (e.g. $abc123de...) or a full ID. Use this to dereference
/// reply targets shown as [reply to $abc...] in the prompt, or to look
/// up any specific event by ID.
event_id: String,
/// Number of context messages BEFORE the target to include. Default 0.
/// Pass 5-10 for conversational context. The response also includes an
/// `earlier_handle` event_id you can pass to fetch_event to page further
/// back into history.
#[serde(default)]
context_before: Option<u32>,
/// Room ID to look in. Defaults to source room if omitted.
#[serde(default)]
room_id: Option<String>,
}
// ---------------------------------------------------------------------------
// MCP server struct
// ---------------------------------------------------------------------------
@ -268,7 +293,7 @@ impl MatrixBridge {
Self::response_to_result(resp)
}
#[tool(description = "Get recent message history for any joined room. Returns JSON list of messages and reactions with timestamps.")]
#[tool(description = "Get recent message history for any joined room. Returns JSON list of messages and reactions with timestamps. Backfills via /messages if cache is short.")]
async fn get_room_history(
&self,
Parameters(params): Parameters<GetRoomHistoryParams>,
@ -281,6 +306,22 @@ impl MatrixBridge {
.await?;
Self::response_to_result(resp)
}
#[tool(description = "Fetch a specific event by ID with optional context messages before it. Use to dereference [reply to $...] markers or arbitrary event IDs. Response includes earlier_handle for paging further back.")]
async fn fetch_event(
&self,
Parameters(params): Parameters<FetchEventParams>,
) -> Result<CallToolResult, McpError> {
let room_id = params.room_id.unwrap_or_else(|| self.source_room.clone());
let resp = self
.call(&DaemonRequest::FetchEvent {
room_id,
event_id: params.event_id,
context_before: params.context_before,
})
.await?;
Self::response_to_result(resp)
}
}
// ---------------------------------------------------------------------------

View file

@ -58,7 +58,7 @@ pub async fn invoke_claude(
"--add-dir",
&identity_str,
"--allowedTools",
"Read,Edit,Write,Glob,Grep,mcp__matrix__send_message,mcp__matrix__send_dm,mcp__matrix__send_reaction,mcp__matrix__send_reply,mcp__matrix__list_rooms,mcp__matrix__list_room_members,mcp__matrix__get_room_history",
"Read,Edit,Write,Glob,Grep,mcp__matrix__send_message,mcp__matrix__send_dm,mcp__matrix__send_reaction,mcp__matrix__send_reply,mcp__matrix__list_rooms,mcp__matrix__list_room_members,mcp__matrix__get_room_history,mcp__matrix__fetch_event",
"--mcp-config",
&mcp_config_str,
"-p",

View file

@ -35,6 +35,13 @@ pub enum DaemonRequest {
room_id: String,
limit: Option<usize>,
},
#[serde(rename = "fetch_event")]
FetchEvent {
room_id: String,
event_id: String,
context_before: Option<u32>,
},
}
/// Response from daemon to MCP server.

View file

@ -77,6 +77,11 @@ async fn handle_request(request: DaemonRequest, client: &Client) -> DaemonRespon
DaemonRequest::GetRoomHistory { room_id, limit } => {
get_room_history(client, &room_id, limit).await
}
DaemonRequest::FetchEvent {
room_id,
event_id,
context_before,
} => fetch_event(client, &room_id, &event_id, context_before).await,
}
}
@ -251,45 +256,202 @@ async fn get_room_history(
};
let limit = limit.unwrap_or(20).min(100);
let tl = match timeline::load_timeline(&room, limit, &own_user).await {
Ok(t) => t,
Err(e) => return DaemonResponse::err(format!("failed to load timeline: {e}")),
// Backfill via /messages if cache is short
if let Ok((cache, _)) = room.event_cache().await {
let mut tl = match timeline::load_timeline(&room, limit, &own_user).await {
Ok(t) => t,
Err(e) => return DaemonResponse::err(format!("failed to load timeline: {e}")),
};
let mut tries = 0;
while tl.len() < limit && tries < 5 {
tries += 1;
match cache.pagination().run_backwards_once((limit - tl.len()) as u16).await {
Ok(outcome) => {
if outcome.reached_start {
break;
}
}
Err(e) => {
tracing::warn!("backfill failed: {e}");
break;
}
}
tl = match timeline::load_timeline(&room, limit, &own_user).await {
Ok(t) => t,
Err(e) => return DaemonResponse::err(format!("reload after backfill failed: {e}")),
};
}
let items: Vec<_> = tl.iter().map(timeline_item_to_json).collect();
DaemonResponse::ok(items)
} else {
DaemonResponse::err("event cache not available".to_owned())
}
}
fn timeline_item_to_json(item: &crate::types::TimelineItem) -> serde_json::Value {
match item {
crate::types::TimelineItem::Message {
event_id,
sender,
body,
is_self,
ts,
in_reply_to,
} => json!({
"kind": "message",
"event_id": event_id.as_str(),
"sender": sender.as_str(),
"body": body,
"is_self": is_self,
"ts": ts,
"in_reply_to": in_reply_to.as_ref().map(|e| e.as_str()),
}),
crate::types::TimelineItem::Reaction {
sender,
target_event_id,
key,
is_self,
ts,
} => json!({
"kind": "reaction",
"sender": sender.as_str(),
"target_event_id": target_event_id.as_str(),
"key": key,
"is_self": is_self,
"ts": ts,
}),
}
}
/// Fetch a specific event by ID via the homeserver `/context` endpoint.
/// Returns the event plus `context_before` events before it. Includes one
/// extra event as `earlier_handle` so the shard can page further backward
/// by calling fetch_event again with that id.
async fn fetch_event(
client: &Client,
room_id: &str,
event_id: &str,
context_before: Option<u32>,
) -> DaemonResponse {
use matrix_sdk::ruma::events::AnySyncTimelineEvent;
use matrix_sdk::ruma::events::room::message::MessageType;
let rid = match room_id.parse::<OwnedRoomId>() {
Ok(r) => r,
Err(e) => return DaemonResponse::err(format!("invalid room_id: {e}")),
};
let Some(room) = client.get_room(&rid) else {
return DaemonResponse::err(format!("room {rid} not found"));
};
let own_user = match client.user_id() {
Some(u) => u.to_owned(),
None => return DaemonResponse::err("not logged in".to_owned()),
};
let items: Vec<_> = tl
.iter()
.map(|item| match item {
crate::types::TimelineItem::Message {
event_id,
sender,
body,
is_self,
ts,
in_reply_to,
} => json!({
"kind": "message",
"event_id": event_id.as_str(),
"sender": sender.as_str(),
"body": body,
"is_self": is_self,
"ts": ts,
"in_reply_to": in_reply_to.as_ref().map(|e| e.as_str()),
}),
crate::types::TimelineItem::Reaction {
sender,
target_event_id,
key,
is_self,
ts,
} => json!({
"kind": "reaction",
"sender": sender.as_str(),
"target_event_id": target_event_id.as_str(),
"key": key,
"is_self": is_self,
"ts": ts,
}),
})
.collect();
DaemonResponse::ok(items)
// Resolve possibly-shortened event id against recent timeline first;
// fall back to parsing as a full id.
let resolve_tl = timeline::load_timeline(&room, 50, &own_user)
.await
.unwrap_or_default();
let full_eid = match timeline::resolve_event_id(&resolve_tl, event_id) {
Some(eid) => eid,
None => match event_id.parse::<matrix_sdk::ruma::OwnedEventId>() {
Ok(eid) => eid,
Err(e) => {
return DaemonResponse::err(format!("invalid or unknown event_id: {e}"));
}
},
};
// Request one extra event so we can split it off as the paging handle
let context_before = context_before.unwrap_or(0).min(50);
let request_size = context_before + 1;
let response = match room
.event_with_context(
&full_eid,
false,
matrix_sdk::ruma::UInt::from(request_size),
None,
)
.await
{
Ok(r) => r,
Err(e) => return DaemonResponse::err(format!("event_with_context failed: {e}")),
};
let render = |raw: &matrix_sdk::deserialized_responses::TimelineEvent| -> Option<serde_json::Value> {
let deserialized = raw.raw().deserialize().ok()?;
let AnySyncTimelineEvent::MessageLike(msg) = deserialized else {
return None;
};
match msg {
matrix_sdk::ruma::events::AnySyncMessageLikeEvent::RoomMessage(
matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig),
) => {
let MessageType::Text(text) = &orig.content.msgtype else {
return None;
};
let ms: u64 = orig.origin_server_ts.0.into();
let ts = (ms / 1000) as i64;
let in_reply_to = match &orig.content.relates_to {
Some(matrix_sdk::ruma::events::room::message::Relation::Reply { in_reply_to }) => {
Some(in_reply_to.event_id.as_str().to_owned())
}
_ => None,
};
Some(json!({
"kind": "message",
"event_id": orig.event_id.as_str(),
"sender": orig.sender.as_str(),
"body": text.body,
"is_self": orig.sender == own_user,
"ts": ts,
"in_reply_to": in_reply_to,
}))
}
matrix_sdk::ruma::events::AnySyncMessageLikeEvent::Reaction(
matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig),
) => {
let ms: u64 = orig.origin_server_ts.0.into();
let ts = (ms / 1000) as i64;
Some(json!({
"kind": "reaction",
"sender": orig.sender.as_str(),
"target_event_id": orig.content.relates_to.event_id.as_str(),
"key": orig.content.relates_to.key,
"is_self": orig.sender == own_user,
"ts": ts,
}))
}
_ => None,
}
};
// events_before is newest-first per matrix /context spec - reverse for chronological
let mut before: Vec<_> = response.events_before.iter().filter_map(render).collect();
before.reverse();
// The "earlier handle" is the oldest event we got, if we asked for more than 0
// It's used by the shard to page further back via another fetch_event call
let earlier_handle = if context_before > 0 && before.len() > context_before as usize {
before.first().and_then(|e| e.get("event_id").and_then(|v| v.as_str()).map(String::from))
} else {
None
};
let context_events: Vec<_> = if earlier_handle.is_some() {
// First event is the handle - skip it from the main events list
before.into_iter().skip(1).collect()
} else {
before
};
let target = response.event.as_ref().and_then(render);
DaemonResponse::ok(json!({
"event": target,
"context_before": context_events,
"earlier_handle": earlier_handle,
}))
}