1259 lines
40 KiB
Rust
1259 lines
40 KiB
Rust
mod paths;
|
|
|
|
use std::fmt::Write as _;
|
|
use std::path::Path;
|
|
use std::sync::Arc;
|
|
|
|
use anyhow::{Context, bail};
|
|
use matrix_sdk::{
|
|
Client, Room, RoomState,
|
|
authentication::matrix::MatrixSession,
|
|
config::SyncSettings,
|
|
ruma::{
|
|
OwnedEventId, OwnedRoomId, OwnedUserId, UserId,
|
|
api::client::{
|
|
filter::FilterDefinition, receipt::create_receipt::v3::ReceiptType as CreateReceiptType,
|
|
},
|
|
events::{
|
|
receipt::ReceiptThread,
|
|
room::{
|
|
member::StrippedRoomMemberEvent,
|
|
message::{MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent},
|
|
},
|
|
},
|
|
},
|
|
};
|
|
use serde::{Deserialize, Serialize};
|
|
use tokio::fs;
|
|
use tokio::sync::Mutex;
|
|
use tracing_subscriber::EnvFilter;
|
|
|
|
#[derive(Debug, Deserialize)]
|
|
struct Config {
|
|
homeserver: String,
|
|
username: String,
|
|
password: String,
|
|
rate_limit_per_min: Option<u32>,
|
|
model: Option<String>,
|
|
max_history: Option<usize>,
|
|
}
|
|
|
|
const DEFAULT_MODEL: &str = "claude-sonnet-4-6";
|
|
const DEFAULT_MAX_HISTORY: usize = 20;
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
struct PersistedSession {
|
|
homeserver: String,
|
|
db_path: std::path::PathBuf,
|
|
user_session: MatrixSession,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
sync_token: Option<String>,
|
|
}
|
|
|
|
#[derive(Clone, Debug)]
|
|
enum TimelineItem {
|
|
Message {
|
|
event_id: OwnedEventId,
|
|
sender: OwnedUserId,
|
|
body: String,
|
|
is_self: bool,
|
|
/// Unix seconds. 0 if unknown.
|
|
ts: i64,
|
|
in_reply_to: Option<OwnedEventId>,
|
|
},
|
|
Reaction {
|
|
sender: OwnedUserId,
|
|
target_event_id: OwnedEventId,
|
|
key: String,
|
|
is_self: bool,
|
|
ts: i64,
|
|
},
|
|
}
|
|
|
|
impl TimelineItem {
|
|
fn ts(&self) -> i64 {
|
|
match self {
|
|
TimelineItem::Message { ts, .. } | TimelineItem::Reaction { ts, .. } => *ts,
|
|
}
|
|
}
|
|
|
|
fn event_id(&self) -> Option<&OwnedEventId> {
|
|
match self {
|
|
TimelineItem::Message { event_id, .. } => Some(event_id),
|
|
TimelineItem::Reaction { .. } => None,
|
|
}
|
|
}
|
|
|
|
fn sender(&self) -> &OwnedUserId {
|
|
match self {
|
|
TimelineItem::Message { sender, .. } | TimelineItem::Reaction { sender, .. } => sender,
|
|
}
|
|
}
|
|
|
|
fn is_self(&self) -> bool {
|
|
match self {
|
|
TimelineItem::Message { is_self, .. } | TimelineItem::Reaction { is_self, .. } => {
|
|
*is_self
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
struct DaemonState {
|
|
own_user_id: OwnedUserId,
|
|
/// Per-room: the latest event_id that's been "shown" to Claude. Events
|
|
/// after this are "new" on the next invocation. Cleared on daemon restart.
|
|
last_shown: std::collections::HashMap<OwnedRoomId, OwnedEventId>,
|
|
pending_rooms: Vec<OwnedRoomId>,
|
|
rate_budget: u32,
|
|
rate_limit_per_min: u32,
|
|
last_rate_reset: std::time::Instant,
|
|
model: String,
|
|
max_history: usize,
|
|
}
|
|
|
|
const DEFAULT_RATE_LIMIT_PER_MIN: u32 = 1;
|
|
|
|
#[tokio::main]
|
|
async fn main() -> anyhow::Result<()> {
|
|
tracing_subscriber::fmt()
|
|
.with_env_filter(EnvFilter::from_default_env())
|
|
.init();
|
|
|
|
tracing::info!("damocles-daemon starting");
|
|
|
|
let state_dir = paths::state_dir();
|
|
fs::create_dir_all(&state_dir).await?;
|
|
fs::create_dir_all(paths::identity_dir()).await?;
|
|
fs::create_dir_all(state_dir.join("rooms")).await?;
|
|
fs::create_dir_all(state_dir.join("people")).await?;
|
|
|
|
let session_file = paths::session_path();
|
|
let db_path = paths::db_path();
|
|
|
|
let config = load_config().await?;
|
|
let rate_limit_per_min = config
|
|
.rate_limit_per_min
|
|
.unwrap_or(DEFAULT_RATE_LIMIT_PER_MIN);
|
|
let model = config
|
|
.model
|
|
.clone()
|
|
.unwrap_or_else(|| DEFAULT_MODEL.to_owned());
|
|
let max_history = config.max_history.unwrap_or(DEFAULT_MAX_HISTORY);
|
|
|
|
let (client, sync_token) = if session_file.exists() {
|
|
restore_session(&session_file).await?
|
|
} else {
|
|
(login(&config, &db_path, &session_file).await?, None)
|
|
};
|
|
|
|
let own_user_id = client.user_id().context("not logged in")?.to_owned();
|
|
tracing::info!(
|
|
user = %own_user_id,
|
|
rate_limit = rate_limit_per_min,
|
|
model = %model,
|
|
"ready"
|
|
);
|
|
|
|
// Enable persistent event cache (matrix-sdk's sqlite store keeps the timeline)
|
|
client
|
|
.event_cache()
|
|
.subscribe()
|
|
.context("subscribe event cache")?;
|
|
|
|
let state = Arc::new(Mutex::new(DaemonState {
|
|
own_user_id,
|
|
last_shown: std::collections::HashMap::new(),
|
|
pending_rooms: Vec::new(),
|
|
rate_budget: rate_limit_per_min,
|
|
rate_limit_per_min,
|
|
last_rate_reset: std::time::Instant::now(),
|
|
model,
|
|
max_history,
|
|
}));
|
|
|
|
let processor_state = state.clone();
|
|
let processor_client = client.clone();
|
|
tokio::spawn(async move {
|
|
process_loop(processor_state, processor_client).await;
|
|
});
|
|
|
|
sync(client, sync_token, &session_file, state).await
|
|
}
|
|
|
|
async fn load_config() -> anyhow::Result<Config> {
|
|
let data = fs::read_to_string(paths::config_path())
|
|
.await
|
|
.context("failed to read config.json")?;
|
|
serde_json::from_str(&data).context("failed to parse config.json")
|
|
}
|
|
|
|
async fn restore_session(session_file: &Path) -> anyhow::Result<(Client, Option<String>)> {
|
|
let data = fs::read_to_string(session_file).await?;
|
|
let session: PersistedSession = serde_json::from_str(&data)?;
|
|
|
|
tracing::info!(user = %session.user_session.meta.user_id, "restoring session");
|
|
|
|
let client = Client::builder()
|
|
.homeserver_url(&session.homeserver)
|
|
.sqlite_store(paths::db_path(), None)
|
|
.build()
|
|
.await?;
|
|
|
|
client.restore_session(session.user_session).await?;
|
|
|
|
Ok((client, session.sync_token))
|
|
}
|
|
|
|
async fn login(config: &Config, db_path: &Path, session_file: &Path) -> anyhow::Result<Client> {
|
|
tracing::info!(homeserver = %config.homeserver, user = %config.username, "logging in");
|
|
|
|
let client = Client::builder()
|
|
.homeserver_url(&config.homeserver)
|
|
.sqlite_store(db_path, None)
|
|
.build()
|
|
.await?;
|
|
|
|
client
|
|
.matrix_auth()
|
|
.login_username(&config.username, &config.password)
|
|
.initial_device_display_name("damocles-daemon")
|
|
.await?;
|
|
|
|
let user_session = client
|
|
.matrix_auth()
|
|
.session()
|
|
.context("no session after login")?;
|
|
|
|
let persisted = PersistedSession {
|
|
homeserver: config.homeserver.clone(),
|
|
db_path: db_path.to_owned(),
|
|
user_session,
|
|
sync_token: None,
|
|
};
|
|
fs::write(session_file, serde_json::to_string_pretty(&persisted)?).await?;
|
|
|
|
tracing::info!("session persisted");
|
|
Ok(client)
|
|
}
|
|
|
|
async fn sync(
|
|
client: Client,
|
|
initial_sync_token: Option<String>,
|
|
session_file: &Path,
|
|
state: Arc<Mutex<DaemonState>>,
|
|
) -> anyhow::Result<()> {
|
|
tracing::info!("initial sync (ignoring past messages)");
|
|
|
|
let filter = FilterDefinition::with_lazy_loading();
|
|
let mut sync_settings = SyncSettings::default().filter(filter.into());
|
|
|
|
if let Some(token) = initial_sync_token {
|
|
sync_settings = sync_settings.token(token);
|
|
}
|
|
|
|
loop {
|
|
match client.sync_once(sync_settings.clone()).await {
|
|
Ok(response) => {
|
|
sync_settings = sync_settings.token(response.next_batch.clone());
|
|
persist_sync_token(session_file, response.next_batch).await?;
|
|
break;
|
|
}
|
|
Err(e) => {
|
|
tracing::warn!("initial sync failed, retrying: {e}");
|
|
}
|
|
}
|
|
}
|
|
|
|
tracing::info!("synced, listening for messages");
|
|
|
|
client.add_event_handler(move |event: OriginalSyncRoomMessageEvent, room: Room| {
|
|
let state = state.clone();
|
|
async move {
|
|
on_room_message(event, room, state).await;
|
|
}
|
|
});
|
|
|
|
client.add_event_handler(on_stripped_state_member);
|
|
|
|
client.sync(sync_settings).await?;
|
|
|
|
bail!("sync loop exited unexpectedly")
|
|
}
|
|
|
|
async fn on_stripped_state_member(event: StrippedRoomMemberEvent, client: Client, room: Room) {
|
|
let Some(my_id) = client.user_id() else {
|
|
return;
|
|
};
|
|
if event.state_key != my_id {
|
|
return;
|
|
}
|
|
let room_id = room.room_id().to_owned();
|
|
tokio::spawn(async move {
|
|
tracing::info!(room = %room_id, "auto-joining invite");
|
|
let mut delay = 2u64;
|
|
loop {
|
|
match room.join().await {
|
|
Ok(()) => {
|
|
tracing::info!(room = %room_id, "joined");
|
|
if let Err(e) = ensure_room_notes(&room).await {
|
|
tracing::warn!(room = %room_id, "failed to write room notes: {e}");
|
|
}
|
|
return;
|
|
}
|
|
Err(e) => {
|
|
tracing::warn!(room = %room_id, "join failed, retry in {delay}s: {e}");
|
|
tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
|
|
delay = (delay * 2).min(300);
|
|
if delay >= 300 {
|
|
tracing::error!(room = %room_id, "giving up on auto-join");
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
async fn persist_sync_token(session_file: &Path, sync_token: String) -> anyhow::Result<()> {
|
|
let data = fs::read_to_string(session_file).await?;
|
|
let mut session: PersistedSession = serde_json::from_str(&data)?;
|
|
session.sync_token = Some(sync_token);
|
|
fs::write(session_file, serde_json::to_string_pretty(&session)?).await?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn on_room_message(
|
|
event: OriginalSyncRoomMessageEvent,
|
|
room: Room,
|
|
state: Arc<Mutex<DaemonState>>,
|
|
) {
|
|
if room.state() != RoomState::Joined {
|
|
return;
|
|
}
|
|
let MessageType::Text(text_content) = &event.content.msgtype else {
|
|
return;
|
|
};
|
|
|
|
let room_id = room.room_id().to_owned();
|
|
let is_self = {
|
|
let state = state.lock().await;
|
|
event.sender == state.own_user_id
|
|
};
|
|
|
|
tracing::info!(
|
|
room = %room_id,
|
|
sender = %event.sender,
|
|
self_msg = is_self,
|
|
"{}",
|
|
text_content.body
|
|
);
|
|
|
|
if let Err(e) = ensure_room_notes(&room).await {
|
|
tracing::warn!(room = %room_id, "failed to ensure room notes: {e}");
|
|
}
|
|
if !is_self {
|
|
if let Err(e) = ensure_person_notes(&room, &event.sender).await {
|
|
tracing::warn!(sender = %event.sender, "failed to ensure person notes: {e}");
|
|
}
|
|
}
|
|
|
|
if !is_self {
|
|
let mut state = state.lock().await;
|
|
if !state.pending_rooms.contains(&room_id) {
|
|
state.pending_rooms.push(room_id);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Create state/rooms/<room_id>/notes.md if it doesn't exist.
|
|
async fn ensure_room_notes(room: &Room) -> anyhow::Result<()> {
|
|
let room_id = room.room_id();
|
|
let dir = paths::state_dir().join("rooms").join(room_id.as_str());
|
|
let file = dir.join("notes.md");
|
|
if file.exists() {
|
|
return Ok(());
|
|
}
|
|
fs::create_dir_all(&dir).await?;
|
|
|
|
let display_name = room
|
|
.display_name()
|
|
.await
|
|
.map(|n| n.to_string())
|
|
.unwrap_or_else(|_| room_id.to_string());
|
|
let now = chrono_now();
|
|
|
|
let body = format!("# {room_id}\n\nDisplay name: {display_name}\n\nFirst joined: {now}\n",);
|
|
fs::write(&file, body).await?;
|
|
tracing::info!(room = %room_id, "created room notes");
|
|
Ok(())
|
|
}
|
|
|
|
/// Create state/people/<user_id>/notes.md if it doesn't exist.
|
|
/// Pre-fill with display name (from this room's member info) and the room as
|
|
/// "first met in".
|
|
async fn ensure_person_notes(room: &Room, user_id: &UserId) -> anyhow::Result<()> {
|
|
let dir = paths::state_dir().join("people").join(user_id.as_str());
|
|
let file = dir.join("notes.md");
|
|
if file.exists() {
|
|
return Ok(());
|
|
}
|
|
fs::create_dir_all(&dir).await?;
|
|
|
|
let display_name = room
|
|
.get_member_no_sync(user_id)
|
|
.await
|
|
.ok()
|
|
.flatten()
|
|
.and_then(|m| m.display_name().map(ToOwned::to_owned))
|
|
.unwrap_or_default();
|
|
let display_line = if display_name.is_empty() {
|
|
String::new()
|
|
} else {
|
|
format!("Display name: {display_name}\n")
|
|
};
|
|
let room_id = room.room_id();
|
|
let now = chrono_now();
|
|
|
|
let body = format!("# {user_id}\n\n{display_line}First met in: {room_id} on {now}\n",);
|
|
fs::write(&file, body).await?;
|
|
tracing::info!(user = %user_id, "created person notes");
|
|
Ok(())
|
|
}
|
|
|
|
fn chrono_now() -> String {
|
|
use std::time::{SystemTime, UNIX_EPOCH};
|
|
let secs = SystemTime::now()
|
|
.duration_since(UNIX_EPOCH)
|
|
.map(|d| d.as_secs() as i64)
|
|
.unwrap_or(0);
|
|
// Simple ISO-ish without pulling chrono. YYYY-MM-DD only is fine for notes.
|
|
let days = secs / 86400;
|
|
let (y, m, d) = days_to_ymd(days);
|
|
format!("{y:04}-{m:02}-{d:02}")
|
|
}
|
|
|
|
/// Format a unix-seconds timestamp as `YYYY-MM-DD HH:MM` UTC. Returns "?" for 0.
|
|
fn format_ts(secs: i64) -> String {
|
|
if secs == 0 {
|
|
return "?".into();
|
|
}
|
|
let days = secs.div_euclid(86400);
|
|
let day_secs = secs.rem_euclid(86400);
|
|
let (y, m, d) = days_to_ymd(days);
|
|
let h = day_secs / 3600;
|
|
let min = (day_secs % 3600) / 60;
|
|
format!("{y:04}-{m:02}-{d:02} {h:02}:{min:02}")
|
|
}
|
|
|
|
/// Convert days-since-1970-01-01 to (year, month, day). Civil-date algorithm.
|
|
fn days_to_ymd(z: i64) -> (i64, u32, u32) {
|
|
let z = z + 719_468;
|
|
let era = z.div_euclid(146_097);
|
|
let doe = z.rem_euclid(146_097) as u32;
|
|
let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365;
|
|
let y = yoe as i64 + era * 400;
|
|
let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
|
|
let mp = (5 * doy + 2) / 153;
|
|
let d = doy - (153 * mp + 2) / 5 + 1;
|
|
let m = if mp < 10 { mp + 3 } else { mp - 9 };
|
|
(if m <= 2 { y + 1 } else { y }, m, d)
|
|
}
|
|
|
|
async fn process_loop(state: Arc<Mutex<DaemonState>>, client: Client) {
|
|
loop {
|
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
|
|
|
let room_id = {
|
|
let mut state = state.lock().await;
|
|
|
|
if state.last_rate_reset.elapsed() >= std::time::Duration::from_secs(60) {
|
|
state.rate_budget = state.rate_limit_per_min;
|
|
state.last_rate_reset = std::time::Instant::now();
|
|
}
|
|
|
|
if state.rate_budget == 0 {
|
|
continue;
|
|
}
|
|
|
|
state.pending_rooms.pop()
|
|
};
|
|
|
|
let Some(room_id) = room_id else {
|
|
continue;
|
|
};
|
|
|
|
let Some(room) = client.get_room(&room_id) else {
|
|
tracing::warn!(room = %room_id, "room not found in client");
|
|
continue;
|
|
};
|
|
|
|
// Snapshot last_shown for this room so we can mark seen vs new.
|
|
// If we don't have one in memory (e.g. fresh daemon start), seed from
|
|
// the stored read receipt so we don't reprocess old messages.
|
|
let in_memory = {
|
|
let s = state.lock().await;
|
|
s.last_shown.get(&room_id).cloned()
|
|
};
|
|
let prev_last_shown = if let Some(eid) = in_memory {
|
|
Some(eid)
|
|
} else {
|
|
let from_receipt = match room
|
|
.load_user_receipt(
|
|
matrix_sdk::ruma::events::receipt::ReceiptType::Read,
|
|
ReceiptThread::Unthreaded,
|
|
client.user_id().expect("logged in"),
|
|
)
|
|
.await
|
|
{
|
|
Ok(Some((eid, _))) => Some(eid),
|
|
Ok(None) => None,
|
|
Err(e) => {
|
|
tracing::warn!(room = %room_id, "failed to load receipt: {e}");
|
|
None
|
|
}
|
|
};
|
|
if let Some(ref eid) = from_receipt {
|
|
let mut s = state.lock().await;
|
|
s.last_shown.insert(room_id.clone(), eid.clone());
|
|
}
|
|
from_receipt
|
|
};
|
|
let room_name = room
|
|
.display_name()
|
|
.await
|
|
.map_or_else(|_| room_id.to_string(), |n| n.to_string());
|
|
|
|
let (own_user, model, max_history) = {
|
|
let state = state.lock().await;
|
|
(
|
|
state.own_user_id.clone(),
|
|
state.model.clone(),
|
|
state.max_history,
|
|
)
|
|
};
|
|
|
|
// Load recent timeline (messages + reactions) from matrix-sdk's
|
|
// persistent event cache.
|
|
let mut timeline = match load_timeline(&room, max_history, &own_user).await {
|
|
Ok(t) => t,
|
|
Err(e) => {
|
|
tracing::error!(room = %room_id, "failed to load timeline: {e}");
|
|
continue;
|
|
}
|
|
};
|
|
|
|
// For any new messages that reply to events outside the window, fetch
|
|
// the replied-to event from cache and prepend it as extra context.
|
|
let in_window: std::collections::HashSet<OwnedEventId> = timeline
|
|
.iter()
|
|
.filter_map(TimelineItem::event_id)
|
|
.cloned()
|
|
.collect();
|
|
let seen_idx_initial = prev_last_shown
|
|
.as_ref()
|
|
.and_then(|id| {
|
|
timeline.iter().position(|t| match t {
|
|
TimelineItem::Message { event_id, .. } => event_id == id,
|
|
_ => false,
|
|
})
|
|
})
|
|
.map_or(0, |pos| pos + 1);
|
|
let mut reply_targets: Vec<OwnedEventId> = Vec::new();
|
|
for item in timeline.iter().skip(seen_idx_initial) {
|
|
if let TimelineItem::Message {
|
|
in_reply_to: Some(target),
|
|
..
|
|
} = item
|
|
{
|
|
if !in_window.contains(target) && !reply_targets.contains(target) {
|
|
reply_targets.push(target.clone());
|
|
}
|
|
}
|
|
}
|
|
if !reply_targets.is_empty() {
|
|
if let Ok((cache, _h)) = room.event_cache().await {
|
|
for target in &reply_targets {
|
|
if let Some(found) = fetch_message(&cache, target, &own_user).await {
|
|
timeline.insert(0, found);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Determine seen split based on the last_shown message event id
|
|
let seen_idx = prev_last_shown
|
|
.as_ref()
|
|
.and_then(|id| {
|
|
timeline.iter().position(|t| match t {
|
|
TimelineItem::Message { event_id, .. } => event_id == id,
|
|
_ => false,
|
|
})
|
|
})
|
|
.map_or(0, |pos| pos + 1);
|
|
|
|
// The "last shown" pointer should advance to the latest message we've
|
|
// loaded (not a reaction).
|
|
let new_last_event_id = timeline.iter().rev().find_map(|t| match t {
|
|
TimelineItem::Message { event_id, .. } => Some(event_id.clone()),
|
|
_ => None,
|
|
});
|
|
|
|
let docs = match invoke_claude(&room_id, &room_name, &timeline, seen_idx, &model).await {
|
|
Ok(d) => d,
|
|
Err(e) => {
|
|
tracing::error!(room = %room_id, "claude invocation failed: {e}");
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let mut sent_any = false;
|
|
for doc in docs {
|
|
match doc {
|
|
ClaudeDoc::Skip => {
|
|
tracing::debug!(room = %room_id, "claude doc: skip");
|
|
}
|
|
ClaudeDoc::Thought(body) => {
|
|
tracing::info!(room = %room_id, thought = %body.chars().take(120).collect::<String>(), "claude doc: thought");
|
|
tracing::trace!("full thought: {body}");
|
|
}
|
|
ClaudeDoc::Message { target, body } => {
|
|
let target_room = match &target {
|
|
ResponseTarget::Room(rid) => client.get_room(rid),
|
|
ResponseTarget::Dm(user) => match find_or_create_dm(&client, user).await {
|
|
Ok(r) => Some(r),
|
|
Err(e) => {
|
|
tracing::error!(user = %user, "failed to get/create DM: {e}");
|
|
None
|
|
}
|
|
},
|
|
};
|
|
let target_label = match &target {
|
|
ResponseTarget::Room(rid) => rid.to_string(),
|
|
ResponseTarget::Dm(user) => format!("dm:{user}"),
|
|
};
|
|
if let Some(target_room) = target_room {
|
|
let content = RoomMessageEventContent::text_plain(&body);
|
|
match target_room.send(content).await {
|
|
Ok(_) => {
|
|
let mut state = state.lock().await;
|
|
state.rate_budget = state.rate_budget.saturating_sub(1);
|
|
tracing::info!(
|
|
target = %target_label,
|
|
"sent response ({} budget remaining)",
|
|
state.rate_budget
|
|
);
|
|
sent_any = true;
|
|
}
|
|
Err(e) => tracing::error!("failed to send: {e}"),
|
|
}
|
|
} else {
|
|
tracing::warn!(target = %target_label, "target not available");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Update last_shown and send read receipt regardless of whether we
|
|
// sent a message - the agent saw the messages either way.
|
|
{
|
|
let mut state = state.lock().await;
|
|
if let Some(eid) = new_last_event_id.clone() {
|
|
state.last_shown.insert(room_id.clone(), eid);
|
|
}
|
|
}
|
|
send_read_receipt(&room, new_last_event_id).await;
|
|
|
|
let _ = sent_any;
|
|
}
|
|
}
|
|
|
|
/// Find an existing DM room with the given user, or create one.
|
|
async fn find_or_create_dm(client: &Client, user_id: &UserId) -> anyhow::Result<Room> {
|
|
for room in client.joined_rooms() {
|
|
if room.is_direct().await.unwrap_or(false)
|
|
&& room
|
|
.direct_targets()
|
|
.iter()
|
|
.any(|t| t.as_str() == user_id.as_str())
|
|
{
|
|
return Ok(room);
|
|
}
|
|
}
|
|
tracing::info!(user = %user_id, "creating new DM room");
|
|
Ok(client.create_dm(user_id).await?)
|
|
}
|
|
|
|
async fn send_read_receipt(room: &Room, event_id: Option<OwnedEventId>) {
|
|
let Some(eid) = event_id else {
|
|
return;
|
|
};
|
|
if let Err(e) = room
|
|
.send_single_receipt(CreateReceiptType::Read, ReceiptThread::Unthreaded, eid)
|
|
.await
|
|
{
|
|
tracing::warn!(room = %room.room_id(), "failed to send read receipt: {e}");
|
|
}
|
|
}
|
|
|
|
/// Load the last N timeline items (messages + reactions) from the room's
|
|
/// persistent event cache. Returns oldest-first.
|
|
///
|
|
/// We walk events newest-first, collect messages until we have `limit`, then
|
|
/// also include any reactions whose timestamps fall within the message window.
|
|
async fn load_timeline(
|
|
room: &Room,
|
|
limit: usize,
|
|
own_user: &OwnedUserId,
|
|
) -> anyhow::Result<Vec<TimelineItem>> {
|
|
use matrix_sdk::ruma::events::AnySyncTimelineEvent;
|
|
|
|
let (cache, _handles) = room.event_cache().await?;
|
|
let events = cache.events().await;
|
|
|
|
let mut messages: Vec<TimelineItem> = Vec::new();
|
|
let mut reactions: Vec<TimelineItem> = Vec::new();
|
|
let mut earliest_message_ts: Option<i64> = None;
|
|
|
|
for ev in events.iter().rev() {
|
|
let raw = ev.raw();
|
|
let Ok(deserialized) = raw.deserialize() else {
|
|
continue;
|
|
};
|
|
let AnySyncTimelineEvent::MessageLike(msg) = deserialized else {
|
|
continue;
|
|
};
|
|
|
|
match msg {
|
|
matrix_sdk::ruma::events::AnySyncMessageLikeEvent::RoomMessage(
|
|
matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig),
|
|
) => {
|
|
if messages.len() >= limit {
|
|
continue;
|
|
}
|
|
let MessageType::Text(text) = &orig.content.msgtype else {
|
|
continue;
|
|
};
|
|
let ts = ts_secs_from(orig.origin_server_ts.0);
|
|
let in_reply_to = match &orig.content.relates_to {
|
|
Some(matrix_sdk::ruma::events::room::message::Relation::Reply {
|
|
in_reply_to,
|
|
}) => Some(in_reply_to.event_id.clone()),
|
|
_ => None,
|
|
};
|
|
if earliest_message_ts.is_none_or(|e| ts < e) {
|
|
earliest_message_ts = Some(ts);
|
|
}
|
|
messages.push(TimelineItem::Message {
|
|
event_id: orig.event_id.clone(),
|
|
sender: orig.sender.clone(),
|
|
body: text.body.clone(),
|
|
is_self: &orig.sender == own_user,
|
|
ts,
|
|
in_reply_to,
|
|
});
|
|
}
|
|
matrix_sdk::ruma::events::AnySyncMessageLikeEvent::Reaction(
|
|
matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig),
|
|
) => {
|
|
let ts = ts_secs_from(orig.origin_server_ts.0);
|
|
reactions.push(TimelineItem::Reaction {
|
|
sender: orig.sender.clone(),
|
|
target_event_id: orig.content.relates_to.event_id.clone(),
|
|
key: orig.content.relates_to.key.clone(),
|
|
is_self: &orig.sender == own_user,
|
|
ts,
|
|
});
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
|
|
// Drop reactions older than the oldest visible message, to avoid
|
|
// referencing messages we don't have in scope.
|
|
if let Some(min_ts) = earliest_message_ts {
|
|
reactions.retain(|r| r.ts() >= min_ts);
|
|
}
|
|
|
|
let mut combined: Vec<TimelineItem> = Vec::with_capacity(messages.len() + reactions.len());
|
|
combined.extend(messages);
|
|
combined.extend(reactions);
|
|
combined.sort_by_key(TimelineItem::ts);
|
|
Ok(combined)
|
|
}
|
|
|
|
/// Render one timeline item into the prompt.
|
|
/// Messages: `[ts] $eid... [(you) ]@user: body`
|
|
/// Reactions: `[ts] [(you) ]@user reacted to $eid... with KEY`
|
|
fn render_timeline_item(prompt: &mut String, item: &TimelineItem) {
|
|
match item {
|
|
TimelineItem::Message {
|
|
event_id,
|
|
sender,
|
|
body,
|
|
is_self,
|
|
ts,
|
|
..
|
|
} => {
|
|
let ts_str = format_ts(*ts);
|
|
let id = short_event_id(event_id);
|
|
let prefix = if *is_self { "(you) " } else { "" };
|
|
writeln!(prompt, "[{ts_str}] {id} {prefix}{sender}: {body}").unwrap();
|
|
}
|
|
TimelineItem::Reaction {
|
|
sender,
|
|
target_event_id,
|
|
key,
|
|
is_self,
|
|
ts,
|
|
} => {
|
|
let ts_str = format_ts(*ts);
|
|
let id = short_event_id(target_event_id);
|
|
let prefix = if *is_self { "(you) " } else { "" };
|
|
writeln!(
|
|
prompt,
|
|
"[{ts_str}] {prefix}{sender} reacted to {id} with {key}"
|
|
)
|
|
.unwrap();
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Shorten an event id for prompt display: `$abc123def456...` → `$abc123de`.
|
|
fn short_event_id(id: &OwnedEventId) -> String {
|
|
let s = id.as_str();
|
|
let prefix: String = s.chars().take(9).collect();
|
|
if s.len() > 9 {
|
|
format!("{prefix}…")
|
|
} else {
|
|
prefix
|
|
}
|
|
}
|
|
|
|
fn ts_secs_from(ts: matrix_sdk::ruma::UInt) -> i64 {
|
|
let ms: u64 = ts.into();
|
|
i64::try_from(ms).unwrap_or(0) / 1000
|
|
}
|
|
|
|
/// Fetch a single text message by event_id from the room's event cache.
|
|
async fn fetch_message(
|
|
cache: &matrix_sdk::event_cache::RoomEventCache,
|
|
event_id: &matrix_sdk::ruma::EventId,
|
|
own_user: &OwnedUserId,
|
|
) -> Option<TimelineItem> {
|
|
use matrix_sdk::ruma::events::AnySyncTimelineEvent;
|
|
|
|
let ev = cache.find_event(event_id).await?;
|
|
let deserialized = ev.raw().deserialize().ok()?;
|
|
let AnySyncTimelineEvent::MessageLike(msg) = deserialized else {
|
|
return None;
|
|
};
|
|
let matrix_sdk::ruma::events::AnySyncMessageLikeEvent::RoomMessage(
|
|
matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(orig),
|
|
) = msg
|
|
else {
|
|
return None;
|
|
};
|
|
let MessageType::Text(text) = &orig.content.msgtype else {
|
|
return None;
|
|
};
|
|
let ts = ts_secs_from(orig.origin_server_ts.0);
|
|
Some(TimelineItem::Message {
|
|
event_id: orig.event_id.clone(),
|
|
sender: orig.sender.clone(),
|
|
body: text.body.clone(),
|
|
is_self: &orig.sender == own_user,
|
|
ts,
|
|
in_reply_to: None,
|
|
})
|
|
}
|
|
|
|
enum ResponseTarget {
|
|
Room(OwnedRoomId),
|
|
Dm(OwnedUserId),
|
|
}
|
|
|
|
/// One document within Claude's multi-doc output. Each doc has its own
|
|
/// frontmatter; the daemon routes based on which fields are present.
|
|
enum ClaudeDoc {
|
|
/// A chat message to send.
|
|
Message {
|
|
target: ResponseTarget,
|
|
body: String,
|
|
},
|
|
/// Agent's internal monologue. Not sent to chat. Logged to tracing.
|
|
Thought(String),
|
|
/// Explicit "do nothing for this slot". Useful as a placeholder.
|
|
Skip,
|
|
}
|
|
|
|
async fn invoke_claude(
|
|
source_room: &OwnedRoomId,
|
|
room_name: &str,
|
|
timeline: &[TimelineItem],
|
|
seen_idx: usize,
|
|
model: &str,
|
|
) -> anyhow::Result<Vec<ClaudeDoc>> {
|
|
let identity_dir = paths::identity_dir();
|
|
let identity_str = identity_dir.to_string_lossy();
|
|
|
|
let mut prompt = String::new();
|
|
writeln!(prompt, "[room_id: {source_room}]").unwrap();
|
|
writeln!(prompt, "[room_name: {room_name}]").unwrap();
|
|
writeln!(
|
|
prompt,
|
|
"[room notes path: ../rooms/{source_room}/notes.md (create dir if needed)]"
|
|
)
|
|
.unwrap();
|
|
|
|
// Collect unique non-self participants (message senders + reactors)
|
|
let mut senders: Vec<&OwnedUserId> = timeline
|
|
.iter()
|
|
.filter(|t| !t.is_self())
|
|
.map(TimelineItem::sender)
|
|
.collect();
|
|
senders.sort();
|
|
senders.dedup();
|
|
if !senders.is_empty() {
|
|
writeln!(
|
|
prompt,
|
|
"\n[people in this room — check ../people/<user_id>/notes.md for each]"
|
|
)
|
|
.unwrap();
|
|
for s in &senders {
|
|
writeln!(prompt, " {s}").unwrap();
|
|
}
|
|
}
|
|
|
|
let seen = seen_idx.min(timeline.len());
|
|
let (old, new) = timeline.split_at(seen);
|
|
|
|
if !old.is_empty() {
|
|
writeln!(prompt, "\n[previously seen events — for context]").unwrap();
|
|
for item in old {
|
|
render_timeline_item(&mut prompt, item);
|
|
}
|
|
}
|
|
|
|
writeln!(prompt, "\n[new events — respond to these]").unwrap();
|
|
if new.is_empty() {
|
|
writeln!(prompt, "(none)").unwrap();
|
|
} else {
|
|
for item in new {
|
|
render_timeline_item(&mut prompt, item);
|
|
}
|
|
}
|
|
|
|
let new_msg_count = new
|
|
.iter()
|
|
.filter(|t| matches!(t, TimelineItem::Message { .. }))
|
|
.count();
|
|
let new_react_count = new.len() - new_msg_count;
|
|
tracing::info!(
|
|
"invoking claude: {} new ({} msg + {} react), {} seen",
|
|
new.len(),
|
|
new_msg_count,
|
|
new_react_count,
|
|
old.len()
|
|
);
|
|
tracing::trace!("full prompt:\n{prompt}");
|
|
|
|
use tokio::process::Command;
|
|
let mut cmd = Command::new("claude");
|
|
cmd.args([
|
|
"--print",
|
|
"--model",
|
|
model,
|
|
"--add-dir",
|
|
&identity_str,
|
|
"--allowedTools",
|
|
"Read Edit Write Glob Grep",
|
|
"-p",
|
|
&prompt,
|
|
]);
|
|
cmd.current_dir(&identity_dir);
|
|
cmd.stdin(std::process::Stdio::null());
|
|
let output = cmd.output().await.context("failed to run claude")?;
|
|
|
|
let stderr = String::from_utf8_lossy(&output.stderr);
|
|
let stdout = String::from_utf8_lossy(&output.stdout);
|
|
|
|
if !output.status.success() {
|
|
bail!(
|
|
"claude exited with {}:\nstdout: {}\nstderr: {}",
|
|
output.status,
|
|
stdout,
|
|
stderr
|
|
);
|
|
}
|
|
|
|
if !stderr.is_empty() {
|
|
tracing::warn!("claude stderr: {stderr}");
|
|
}
|
|
|
|
let raw = String::from_utf8_lossy(&output.stdout).to_string();
|
|
Ok(parse_response(&raw, source_room))
|
|
}
|
|
|
|
/// Parse Claude's stdout into a list of documents.
|
|
///
|
|
/// Format: each doc starts with a line `=== <type> [arg]`. Body is everything
|
|
/// until the next `===` line or EOF. Types:
|
|
/// - `=== thought` → ClaudeDoc::Thought (logged, not sent)
|
|
/// - `=== room [<room_id>]` → ClaudeDoc::Message to that room (or source room if no arg)
|
|
/// - `=== dm <user_id>` → ClaudeDoc::Message as DM
|
|
/// - `=== skip` → ClaudeDoc::Skip (no-op)
|
|
///
|
|
/// Anything before the first `===` line is treated as a preamble thought.
|
|
/// Bare text with no `===` is treated as a single message to default_room.
|
|
fn parse_response(raw: &str, default_room: &OwnedRoomId) -> Vec<ClaudeDoc> {
|
|
let trimmed = raw.trim();
|
|
if trimmed.is_empty() {
|
|
return Vec::new();
|
|
}
|
|
|
|
// Walk lines, splitting on lines that start with "=== "
|
|
let mut docs = Vec::new();
|
|
let mut current_header: Option<String> = None;
|
|
let mut current_body = String::new();
|
|
let mut preamble = String::new();
|
|
|
|
for line in trimmed.lines() {
|
|
if let Some(header) = line.strip_prefix("===") {
|
|
// Flush previous doc
|
|
if let Some(h) = current_header.take() {
|
|
if let Some(doc) = build_doc(&h, current_body.trim(), default_room) {
|
|
docs.push(doc);
|
|
}
|
|
current_body.clear();
|
|
} else {
|
|
// We were collecting preamble
|
|
let p = preamble.trim();
|
|
if !p.is_empty() {
|
|
docs.push(ClaudeDoc::Thought(p.to_owned()));
|
|
}
|
|
preamble.clear();
|
|
}
|
|
current_header = Some(header.trim().to_owned());
|
|
} else if current_header.is_some() {
|
|
current_body.push_str(line);
|
|
current_body.push('\n');
|
|
} else {
|
|
preamble.push_str(line);
|
|
preamble.push('\n');
|
|
}
|
|
}
|
|
|
|
// Flush the last doc or preamble
|
|
if let Some(h) = current_header {
|
|
if let Some(doc) = build_doc(&h, current_body.trim(), default_room) {
|
|
docs.push(doc);
|
|
}
|
|
} else {
|
|
// No `===` headers at all - treat whole output as a single message
|
|
let p = preamble.trim();
|
|
if !p.is_empty() {
|
|
docs.push(ClaudeDoc::Message {
|
|
target: ResponseTarget::Room(default_room.clone()),
|
|
body: p.to_owned(),
|
|
});
|
|
}
|
|
}
|
|
|
|
docs
|
|
}
|
|
|
|
fn build_doc(header: &str, body: &str, default_room: &OwnedRoomId) -> Option<ClaudeDoc> {
|
|
let mut parts = header.splitn(2, char::is_whitespace);
|
|
let kind = parts.next().unwrap_or("").trim();
|
|
let arg = parts.next().unwrap_or("").trim();
|
|
|
|
match kind {
|
|
"skip" => Some(ClaudeDoc::Skip),
|
|
"thought" => {
|
|
if body.is_empty() {
|
|
None
|
|
} else {
|
|
Some(ClaudeDoc::Thought(body.to_owned()))
|
|
}
|
|
}
|
|
"room" => {
|
|
if body.is_empty() {
|
|
return None;
|
|
}
|
|
let target = if arg.is_empty() {
|
|
ResponseTarget::Room(default_room.clone())
|
|
} else {
|
|
match arg.parse::<OwnedRoomId>() {
|
|
Ok(rid) => ResponseTarget::Room(rid),
|
|
Err(_) => return None,
|
|
}
|
|
};
|
|
Some(ClaudeDoc::Message {
|
|
target,
|
|
body: body.to_owned(),
|
|
})
|
|
}
|
|
"dm" => {
|
|
if body.is_empty() {
|
|
return None;
|
|
}
|
|
match arg.parse::<OwnedUserId>() {
|
|
Ok(uid) => Some(ClaudeDoc::Message {
|
|
target: ResponseTarget::Dm(uid),
|
|
body: body.to_owned(),
|
|
}),
|
|
Err(_) => None,
|
|
}
|
|
}
|
|
_ => {
|
|
// Unknown header - treat body as a thought so it doesn't leak to chat
|
|
if body.is_empty() {
|
|
None
|
|
} else {
|
|
Some(ClaudeDoc::Thought(format!(
|
|
"[unknown header '{header}'] {body}"
|
|
)))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
|
|
fn test_room() -> OwnedRoomId {
|
|
"!test:example.com".parse().unwrap()
|
|
}
|
|
|
|
fn first_message(docs: &[ClaudeDoc]) -> (&ResponseTarget, &str) {
|
|
for d in docs {
|
|
if let ClaudeDoc::Message { target, body } = d {
|
|
return (target, body.as_str());
|
|
}
|
|
}
|
|
panic!("no message doc found");
|
|
}
|
|
|
|
fn assert_room(target: &ResponseTarget, expected: &str) {
|
|
match target {
|
|
ResponseTarget::Room(r) => assert_eq!(r.as_str(), expected),
|
|
ResponseTarget::Dm(_) => panic!("expected room target, got dm"),
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn parse_room_with_arg() {
|
|
let raw = "=== room !other:server\nhello world";
|
|
let docs = parse_response(raw, &test_room());
|
|
let (target, body) = first_message(&docs);
|
|
assert_room(target, "!other:server");
|
|
assert_eq!(body, "hello world");
|
|
}
|
|
|
|
#[test]
|
|
fn parse_room_no_arg_uses_default() {
|
|
let raw = "=== room\nhi";
|
|
let docs = parse_response(raw, &test_room());
|
|
let (target, _) = first_message(&docs);
|
|
assert_room(target, "!test:example.com");
|
|
}
|
|
|
|
#[test]
|
|
fn parse_skip() {
|
|
let raw = "=== skip";
|
|
let docs = parse_response(raw, &test_room());
|
|
assert_eq!(docs.len(), 1);
|
|
assert!(matches!(docs[0], ClaudeDoc::Skip));
|
|
}
|
|
|
|
#[test]
|
|
fn parse_plain_text_no_header() {
|
|
let raw = "just a message";
|
|
let docs = parse_response(raw, &test_room());
|
|
let (target, body) = first_message(&docs);
|
|
assert_room(target, "!test:example.com");
|
|
assert_eq!(body, "just a message");
|
|
}
|
|
|
|
#[test]
|
|
fn parse_empty() {
|
|
assert!(parse_response("", &test_room()).is_empty());
|
|
assert!(parse_response(" \n ", &test_room()).is_empty());
|
|
}
|
|
|
|
#[test]
|
|
fn parse_dm() {
|
|
let raw = "=== dm @alice:example.com\nhi alice";
|
|
let docs = parse_response(raw, &test_room());
|
|
let (target, body) = first_message(&docs);
|
|
match target {
|
|
ResponseTarget::Dm(u) => assert_eq!(u.as_str(), "@alice:example.com"),
|
|
ResponseTarget::Room(_) => panic!("expected dm target"),
|
|
}
|
|
assert_eq!(body, "hi alice");
|
|
}
|
|
|
|
#[test]
|
|
fn parse_thought() {
|
|
let raw = "=== thought\nthinking about whether to reply...";
|
|
let docs = parse_response(raw, &test_room());
|
|
assert_eq!(docs.len(), 1);
|
|
match &docs[0] {
|
|
ClaudeDoc::Thought(s) => assert_eq!(s, "thinking about whether to reply..."),
|
|
_ => panic!("expected thought"),
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn parse_multi_doc() {
|
|
let raw = "\
|
|
=== thought
|
|
let me check notes
|
|
|
|
=== room !x:y
|
|
hi
|
|
|
|
=== dm @u:s
|
|
private
|
|
|
|
=== skip
|
|
";
|
|
let docs = parse_response(raw, &test_room());
|
|
assert_eq!(docs.len(), 4);
|
|
assert!(matches!(docs[0], ClaudeDoc::Thought(_)));
|
|
assert!(matches!(
|
|
docs[1],
|
|
ClaudeDoc::Message {
|
|
target: ResponseTarget::Room(_),
|
|
..
|
|
}
|
|
));
|
|
assert!(matches!(
|
|
docs[2],
|
|
ClaudeDoc::Message {
|
|
target: ResponseTarget::Dm(_),
|
|
..
|
|
}
|
|
));
|
|
assert!(matches!(docs[3], ClaudeDoc::Skip));
|
|
}
|
|
|
|
#[test]
|
|
fn parse_preamble_becomes_thought() {
|
|
let raw = "preamble line\n=== room !x:y\nhello";
|
|
let docs = parse_response(raw, &test_room());
|
|
assert_eq!(docs.len(), 2);
|
|
assert!(matches!(docs[0], ClaudeDoc::Thought(_)));
|
|
assert!(matches!(docs[1], ClaudeDoc::Message { .. }));
|
|
}
|
|
|
|
#[test]
|
|
fn parse_unknown_header_becomes_thought() {
|
|
let raw = "=== mystery foo\nbody";
|
|
let docs = parse_response(raw, &test_room());
|
|
assert_eq!(docs.len(), 1);
|
|
assert!(matches!(docs[0], ClaudeDoc::Thought(_)));
|
|
}
|
|
}
|