session-start and room-joined synthetic notices via generic notices list
This commit is contained in:
parent
994835d1db
commit
780f80615d
6 changed files with 77 additions and 24 deletions
|
|
@ -126,7 +126,12 @@ There's also a `kind: "edit"` event type that appears chronologically when an ed
|
|||
|
||||
You'll see edits in real-time as they're made. Don't reply to an edit event itself - it's a signal, not a message. Roast as you would for `edit_history` (sparingly, the spicy ones).
|
||||
|
||||
**Synthetic events** (`kind: "notice"`) appear inline in `new_events` when the daemon needs to tell you something out-of-band. Currently the only kind is rate-limit notification ("rate_limit: events held for Xs..."). They are NOT real Matrix messages - don't reply to them, don't react to them, just incorporate the info into your reasoning.
|
||||
**Synthetic events** (`kind: "notice"`) appear inline in `new_events` when the daemon needs to tell you something out-of-band. Examples:
|
||||
- `Session start: this is the first turn since you were (re)spawned. In-session memory is empty - rely on your notes files for prior context.` — first turn after a fresh shard spawn (idle gap, max events, mtime change, or crash recovery)
|
||||
- `rate_limit: events were held for Xs before reaching you...` — when input-side queueing held this turn for ≥30s
|
||||
- `Auto-joined this room (you were invited)...` — first turn for a room you were just auto-joined into
|
||||
|
||||
They are NOT real Matrix messages - don't reply to them, don't react to them. Incorporate the info into your reasoning. The session-start notice in particular is a chance to glance at room/people notes for context before responding.
|
||||
|
||||
To browse other room notes (cross-room awareness), look in `../rooms/`. To browse what you know about people, look in `../people/`. Each can have its own `notes.md` keyed by user_id.
|
||||
|
||||
|
|
|
|||
|
|
@ -29,9 +29,9 @@ pub struct MatrixTurn {
|
|||
|
||||
/// Build a matrix_turn envelope for one room. If `include_history` is false,
|
||||
/// the `previously_seen` array is empty (shard already has that context from
|
||||
/// earlier turns in this session). If `delay_notice_seconds` is `Some(n)`, a
|
||||
/// synthetic Notice event is prepended to `new_events` informing the shard
|
||||
/// that this turn was held by rate limiting for that many seconds.
|
||||
/// earlier turns in this session). `notices` are synthetic system messages
|
||||
/// (rate-limit delay, session start, room joined, etc.) prepended to
|
||||
/// `new_events` so the shard sees them in chronological order.
|
||||
pub fn build_turn(
|
||||
source_room: &OwnedRoomId,
|
||||
room_name: &str,
|
||||
|
|
@ -39,7 +39,7 @@ pub fn build_turn(
|
|||
seen_idx: usize,
|
||||
read_markers: &HashMap<OwnedEventId, Vec<OwnedUserId>>,
|
||||
include_history: bool,
|
||||
delay_notice_seconds: Option<u64>,
|
||||
notices: Vec<String>,
|
||||
) -> MatrixTurn {
|
||||
let mut senders: Vec<&OwnedUserId> = timeline
|
||||
.iter()
|
||||
|
|
@ -69,13 +69,12 @@ pub fn build_turn(
|
|||
.map(|d| d.as_secs() as i64)
|
||||
.unwrap_or(0);
|
||||
|
||||
if let Some(secs) = delay_notice_seconds {
|
||||
// Prepend notices in order so the shard sees them at the top of new_events.
|
||||
for (idx, text) in notices.into_iter().enumerate() {
|
||||
new_events.insert(
|
||||
0,
|
||||
idx,
|
||||
WireEvent::Notice {
|
||||
text: format!(
|
||||
"rate_limit: events were held for {secs}s before reaching you. context may be slightly stale; respond accordingly."
|
||||
),
|
||||
text,
|
||||
ts: now,
|
||||
ts_human: format!("{} UTC", format_ts(now)),
|
||||
},
|
||||
|
|
|
|||
|
|
@ -91,7 +91,13 @@ pub async fn on_reaction(
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn on_stripped_state_member(event: StrippedRoomMemberEvent, client: Client, room: Room) {
|
||||
pub async fn on_stripped_state_member(
|
||||
event: StrippedRoomMemberEvent,
|
||||
client: Client,
|
||||
room: Room,
|
||||
state: std::sync::Arc<tokio::sync::Mutex<crate::types::DaemonState>>,
|
||||
notify: std::sync::Arc<tokio::sync::Notify>,
|
||||
) {
|
||||
let Some(my_id) = client.user_id() else {
|
||||
return;
|
||||
};
|
||||
|
|
@ -109,6 +115,17 @@ pub async fn on_stripped_state_member(event: StrippedRoomMemberEvent, client: Cl
|
|||
if let Err(e) = ensure_room_notes(&room).await {
|
||||
tracing::warn!(room = %room_id, "failed to write room notes: {e}");
|
||||
}
|
||||
// Mark room as just-joined and queue it so the shard
|
||||
// gets a synthetic "you were just invited" notice on
|
||||
// the first turn for this room.
|
||||
{
|
||||
let mut s = state.lock().await;
|
||||
s.just_joined_rooms.insert(room_id.clone());
|
||||
if !s.pending_rooms.iter().any(|(r, _)| r == &room_id) {
|
||||
s.pending_rooms.push((room_id.clone(), std::time::Instant::now()));
|
||||
}
|
||||
}
|
||||
notify.notify_one();
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
|
|
|
|||
53
src/main.rs
53
src/main.rs
|
|
@ -101,6 +101,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
max_history,
|
||||
session_idle_minutes,
|
||||
session_max_events,
|
||||
just_joined_rooms: std::collections::HashSet::new(),
|
||||
}));
|
||||
|
||||
// Notify dispatcher when new events arrive (instant wake-up)
|
||||
|
|
@ -210,7 +211,19 @@ fn register_event_handlers(
|
|||
},
|
||||
);
|
||||
|
||||
client.add_event_handler(handlers::on_stripped_state_member);
|
||||
let join_state = state.clone();
|
||||
let join_notify = notify.clone();
|
||||
client.add_event_handler(
|
||||
move |event: matrix_sdk::ruma::events::room::member::StrippedRoomMemberEvent,
|
||||
client: Client,
|
||||
room: Room| {
|
||||
let state = join_state.clone();
|
||||
let notify = join_notify.clone();
|
||||
async move {
|
||||
handlers::on_stripped_state_member(event, client, room, state, notify).await;
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
/// The dispatcher loop: owns one long-running ShardSession across rooms,
|
||||
|
|
@ -314,20 +327,36 @@ async fn process_loop(
|
|||
}
|
||||
}
|
||||
|
||||
// Compute delay since this room first entered the queue. If
|
||||
// significant (>30s), surface it to the shard via a synthetic notice.
|
||||
// Compute synthetic notices for this turn:
|
||||
// - rate-limit delay (if held >30s)
|
||||
// - session start (very first turn after spawn)
|
||||
// - room just-joined (since last turn for it)
|
||||
let mut notices: Vec<String> = Vec::new();
|
||||
|
||||
let sess = session.as_mut().unwrap();
|
||||
if sess.turn_count == 0 {
|
||||
notices.push("Session start: this is the first turn since you were (re)spawned. In-session memory is empty - rely on your notes files for prior context.".to_owned());
|
||||
}
|
||||
|
||||
let delay = queued_at.elapsed();
|
||||
let delay_notice = if delay.as_secs() >= 30 {
|
||||
Some(delay.as_secs())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
if delay.as_secs() >= 30 {
|
||||
notices.push(format!(
|
||||
"rate_limit: events were held for {}s before reaching you. context may be slightly stale; respond accordingly.",
|
||||
delay.as_secs()
|
||||
));
|
||||
}
|
||||
|
||||
{
|
||||
let mut s = state.lock().await;
|
||||
if s.just_joined_rooms.remove(&room_id) {
|
||||
notices.push("Auto-joined this room (you were invited). The current matrix_turn is your first context for it.".to_owned());
|
||||
}
|
||||
}
|
||||
|
||||
// Process the room. If the turn fails, drop the session and let next
|
||||
// iteration respawn.
|
||||
let sess = session.as_mut().unwrap();
|
||||
if let Err(e) =
|
||||
process_room(&state, &client, &room_id, &room, sess, delay_notice).await
|
||||
process_room(&state, &client, &room_id, &room, sess, notices).await
|
||||
{
|
||||
tracing::error!(room = %room_id, "turn failed, dropping session: {e}");
|
||||
if let Some(s) = session.take() {
|
||||
|
|
@ -343,7 +372,7 @@ async fn process_room(
|
|||
room_id: &OwnedRoomId,
|
||||
room: &Room,
|
||||
session: &mut shard::ShardSession,
|
||||
delay_notice_seconds: Option<u64>,
|
||||
notices: Vec<String>,
|
||||
) -> anyhow::Result<()> {
|
||||
// Snapshot last_shown for this room so we can mark seen vs new.
|
||||
let in_memory = {
|
||||
|
|
@ -451,7 +480,7 @@ async fn process_room(
|
|||
seen_idx,
|
||||
&read_markers,
|
||||
include_history,
|
||||
delay_notice_seconds,
|
||||
notices,
|
||||
);
|
||||
let turn_text = claude::turn_to_text(&turn);
|
||||
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ pub struct ShardSession {
|
|||
/// Last time a turn finished.
|
||||
last_used: Instant,
|
||||
/// Number of turns processed.
|
||||
turn_count: u32,
|
||||
pub turn_count: u32,
|
||||
/// Rooms we've sent at least one turn for in this session. Used to decide
|
||||
/// whether to include `previously_seen` context in a turn.
|
||||
pub rooms_seen: HashSet<OwnedRoomId>,
|
||||
|
|
|
|||
|
|
@ -194,5 +194,8 @@ pub struct DaemonState {
|
|||
pub max_history: usize,
|
||||
pub session_idle_minutes: u64,
|
||||
pub session_max_events: u32,
|
||||
/// Rooms we've auto-joined since their last turn. The dispatcher emits a
|
||||
/// "just joined" notice on the first turn for the room, then removes it.
|
||||
pub just_joined_rooms: std::collections::HashSet<OwnedRoomId>,
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue