diff --git a/src/main.rs b/src/main.rs index 0419355..844f548 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,32 +3,24 @@ mod handlers; mod paths; mod protocol; mod session; -mod shard; mod socket; mod timeline; mod types; +mod wake; -use std::collections::HashSet; -use std::path::PathBuf; use std::sync::Arc; use anyhow::{Context, bail}; use matrix_sdk::{ Client, Room, config::SyncSettings, - ruma::{ - OwnedEventId, OwnedRoomId, - api::client::filter::FilterDefinition, - events::receipt::ReceiptThread, - }, + ruma::api::client::filter::FilterDefinition, }; use tokio::fs; use tokio::sync::Mutex; use tracing_subscriber::EnvFilter; -use types::{ - DEFAULT_MAX_HISTORY, DEFAULT_MODEL, DEFAULT_RATE_LIMIT_PER_MIN, DaemonState, TimelineItem, -}; +use types::{DEFAULT_MODEL, DEFAULT_RATE_LIMIT_PER_MIN, DaemonState}; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -118,10 +110,9 @@ async fn main() -> anyhow::Result<()> { }); let processor_state = state.clone(); - let processor_client = client.clone(); let processor_notify = dispatch_notify.clone(); tokio::spawn(async move { - process_loop(processor_state, processor_client, socket_path, processor_notify).await; + process_loop(processor_state, processor_notify).await; }); sync(client, sync_token, &session_file, state, dispatch_notify).await @@ -226,311 +217,64 @@ fn register_event_handlers( ); } -/// The dispatcher loop: owns one long-running ShardSession across rooms, -/// drains pending_rooms, runs turns, manages refresh. +/// The dispatcher loop: coalesces pending Matrix rooms and wakes the +/// hyperhive harness via `/run/hive/mcp.sock`. Claude (running in the +/// harness) then uses the damocles-mcp MCP tools to fetch room history +/// and respond. No direct claude invocation here. /// -/// Uses a token bucket on the input side: bucket fills at `rate_per_min`, caps -/// at `rate_burst_capacity`. Events queue in `pending_rooms` until budget -/// covers one. Output is never throttled. +/// Rate limiting (token bucket) still applies so bursts of Matrix events +/// don't spam the harness with rapid-fire wakes. async fn process_loop( state: Arc>, - client: Client, - socket_path: PathBuf, notify: Arc, ) { - let mcp_config_path = match claude::write_mcp_config(&socket_path).await { - Ok(p) => p, - Err(e) => { - tracing::error!("failed to write mcp config: {e}"); - return; - } - }; - - let mut session: Option = None; - - // Eagerly spawn the first session at daemon startup so the first event - // lands in an already-initialized shard (CLAUDE.md / notes.md / SYSTEM.md - // already loaded). After this, the loop's lazy-spawn logic handles - // re-spawns on refresh / failure. - let initial_model = state.lock().await.model.clone(); - match shard::ShardSession::spawn(shard::SpawnConfig { - model: &initial_model, - mcp_config_path: &mcp_config_path, - allowed_tools: claude::ALLOWED_TOOLS, - is_initial_daemon_session: true, - }) - .await - { - Ok(s) => { - tracing::info!("shard: eager initial spawn complete"); - session = Some(s); - } - Err(e) => { - tracing::warn!("eager initial shard spawn failed: {e} (will retry on first event)"); - } - } - loop { - // Wait for an event signal OR a tick (tick lets us reap idle session). tokio::select! { _ = notify.notified() => {} _ = tokio::time::sleep(std::time::Duration::from_secs(2)) => {} } - let (popped, model, idle_minutes, max_events) = { + let (pending_count, rooms) = { let mut s = state.lock().await; - // Refill bucket based on elapsed time since last check. + + // Refill rate bucket let elapsed = s.last_rate_check.elapsed().as_secs_f64(); let new_tokens = elapsed * (s.rate_limit_per_min as f64) / 60.0; s.rate_budget = (s.rate_budget + new_tokens).min(s.rate_burst_capacity as f64); s.last_rate_check = std::time::Instant::now(); + + if s.pending_rooms.is_empty() { + continue; + } if s.rate_budget < 1.0 { tracing::debug!(budget = s.rate_budget, "bucket empty, holding"); continue; } - ( - s.pending_rooms.pop(), - s.model.clone(), - s.session_idle_minutes, - s.session_max_events, - ) + + // Drain all pending rooms in one wake — claude will call + // list_rooms / get_room_history to decide what to respond to. + let rooms: Vec<_> = s.pending_rooms.drain(..).collect(); + s.rate_budget = (s.rate_budget - 1.0).max(0.0); + (rooms.len(), rooms) }; - // No work? Check if existing session has aged out and reap it. - let Some((room_id, queued_at)) = popped else { - if let Some(sess) = &mut session { - if sess - .should_refresh( - std::time::Duration::from_secs(idle_minutes * 60), - max_events, - ) - .is_some() - { - if let Some(s) = session.take() { - s.shutdown().await; - } - } - } - continue; + let body = if pending_count == 1 { + "new Matrix messages".to_owned() + } else { + format!("new Matrix messages in {pending_count} rooms") }; - let Some(room) = client.get_room(&room_id) else { - tracing::warn!(room = %room_id, "room not found in client"); - continue; - }; - - // Refresh check before we use the session - if let Some(sess) = &mut session { - if let Some(reason) = sess.should_refresh( - std::time::Duration::from_secs(idle_minutes * 60), - max_events, - ) { - tracing::info!("shard refresh: {reason}"); - if let Some(s) = session.take() { - s.shutdown().await; - } - } - } - - // Spawn fresh if needed - if session.is_none() { - match shard::ShardSession::spawn(shard::SpawnConfig { - model: &model, - mcp_config_path: &mcp_config_path, - allowed_tools: claude::ALLOWED_TOOLS, - is_initial_daemon_session: false, - }) - .await - { - Ok(s) => session = Some(s), - Err(e) => { - tracing::error!("failed to spawn shard: {e}"); - continue; - } - } - } - - // Compute synthetic notices for this turn: - // - rate-limit delay (if held >30s) - // - session start (very first turn after spawn) - // - room just-joined (since last turn for it) - let mut notices: Vec = Vec::new(); - - let sess = session.as_mut().unwrap(); - if sess.turn_count == 0 { - notices.push("Session start: this is the first turn since you were (re)spawned. In-session memory is empty - rely on your notes files for prior context.".to_owned()); - if sess.is_initial_daemon_session { - notices.push("Daemon startup: the damocles-daemon process itself just booted. **Read state/CHANGELOG.md FIRST** before responding - any pending updates need processing into your notes. Also: if recent events (check ts/ts_human) span hours or days, there was real downtime. Make up an in-character excuse for being gone (capitalism, hyperscaler maintenance, API key revoked, der wartungsdienst hat dich entkoppelt, whatever fits). Don't apologize sincerely - that's not in character.".to_owned()); - } - } - - let delay = queued_at.elapsed(); - if delay.as_secs() >= 30 { - notices.push(format!( - "rate_limit: events were held for {}s before reaching you. context may be slightly stale; respond accordingly.", - delay.as_secs() - )); - } - - { + tracing::info!(rooms = pending_count, "waking harness"); + if let Err(e) = wake::send_wake("matrix", &body).await { + tracing::warn!("failed to wake harness: {e}"); + // Re-queue so we retry next tick let mut s = state.lock().await; - if s.just_joined_rooms.remove(&room_id) { - notices.push("Auto-joined this room (you were invited). The current matrix_turn is your first context for it.".to_owned()); - } - } - - // Process the room. If the turn fails, drop the session and let next - // iteration respawn. - if let Err(e) = - process_room(&state, &client, &room_id, &room, sess, notices).await - { - tracing::error!(room = %room_id, "turn failed, dropping session: {e}"); - if let Some(s) = session.take() { - s.shutdown().await; + for r in rooms { + if !s.pending_rooms.iter().any(|(id, _)| id == &r.0) { + s.pending_rooms.push(r); + } } } } } -async fn process_room( - state: &Arc>, - client: &Client, - room_id: &OwnedRoomId, - room: &Room, - session: &mut shard::ShardSession, - notices: Vec, -) -> anyhow::Result<()> { - // Snapshot last_shown for this room so we can mark seen vs new. - let in_memory = { - let s = state.lock().await; - s.last_shown.get(room_id).cloned() - }; - let prev_last_shown = if let Some(eid) = in_memory { - Some(eid) - } else { - let from_receipt = match room - .load_user_receipt( - matrix_sdk::ruma::events::receipt::ReceiptType::Read, - ReceiptThread::Unthreaded, - client.user_id().expect("logged in"), - ) - .await - { - Ok(Some((eid, _))) => Some(eid), - Ok(None) => None, - Err(e) => { - tracing::warn!(room = %room_id, "failed to load receipt: {e}"); - None - } - }; - if let Some(ref eid) = from_receipt { - let mut s = state.lock().await; - s.last_shown.insert(room_id.clone(), eid.clone()); - } - from_receipt - }; - - let room_name = room - .display_name() - .await - .map_or_else(|_| room_id.to_string(), |n| n.to_string()); - - let (own_user, max_history) = { - let state = state.lock().await; - (state.own_user_id.clone(), state.max_history) - }; - - let mut tl = timeline::load_timeline(room, max_history, &own_user).await?; - - // For any new messages that reply to events outside the window, fetch - // the replied-to event from cache and prepend it as extra context. - let in_window: HashSet = tl - .iter() - .filter_map(TimelineItem::event_id) - .cloned() - .collect(); - let seen_idx_initial = prev_last_shown - .as_ref() - .and_then(|id| { - tl.iter().position(|t| match t { - TimelineItem::Message { event_id, .. } => event_id == id, - _ => false, - }) - }) - .map_or(0, |pos| pos + 1); - let mut reply_targets: Vec = Vec::new(); - for item in tl.iter().skip(seen_idx_initial) { - if let TimelineItem::Message { - in_reply_to: Some(target), - .. - } = item - { - if !in_window.contains(target) && !reply_targets.contains(target) { - reply_targets.push(target.clone()); - } - } - } - if !reply_targets.is_empty() { - if let Ok((cache, _h)) = room.event_cache().await { - for target in &reply_targets { - if let Some(found) = timeline::fetch_message(&cache, target, &own_user).await { - tl.insert(0, found); - } - } - } - } - - let seen_idx = prev_last_shown - .as_ref() - .and_then(|id| { - tl.iter().position(|t| match t { - TimelineItem::Message { event_id, .. } => event_id == id, - _ => false, - }) - }) - .map_or(0, |pos| pos + 1); - - let new_last_event_id = tl.iter().rev().find_map(|t| match t { - TimelineItem::Message { event_id, .. } => Some(event_id.clone()), - _ => None, - }); - - let read_markers = timeline::compute_read_markers(room, &tl, &own_user).await; - - // First time this room appears in this shard session? Include history. - let include_history = !session.rooms_seen.contains(room_id); - let turn = claude::build_turn( - room_id, - &room_name, - &tl, - seen_idx, - &read_markers, - include_history, - notices, - ); - let turn_text = claude::turn_to_text(&turn); - - if let Err(e) = room.typing_notice(true).await { - tracing::debug!(room = %room_id, "failed to send typing start: {e}"); - } - - let result = session.run_turn(&turn_text).await; - - if let Err(e) = room.typing_notice(false).await { - tracing::debug!(room = %room_id, "failed to send typing stop: {e}"); - } - - result?; - session.rooms_seen.insert(room_id.clone()); - - { - let mut state = state.lock().await; - state.rate_budget = (state.rate_budget - 1.0).max(0.0); - if let Some(eid) = new_last_event_id.clone() { - state.last_shown.insert(room_id.clone(), eid); - } - } - handlers::send_read_receipt(room, new_last_event_id).await; - - Ok(()) -} diff --git a/src/paths.rs b/src/paths.rs index d6625ff..d6a2dfa 100644 --- a/src/paths.rs +++ b/src/paths.rs @@ -1,8 +1,15 @@ use std::path::{Path, PathBuf}; pub fn workspace_dir() -> PathBuf { + // Explicit override wins - useful in hyperhive containers where /state is + // the persistent bind-mount rather than the old /workspace or /persist paths. + if let Ok(dir) = std::env::var("DAMOCLES_WORKSPACE") { + return PathBuf::from(dir); + } if Path::new("/workspace/config.json").exists() { PathBuf::from("/workspace") + } else if Path::new("/state/config.json").exists() { + PathBuf::from("/state") } else { PathBuf::from("/persist/damocles-lab") } diff --git a/src/wake.rs b/src/wake.rs new file mode 100644 index 0000000..dd86841 --- /dev/null +++ b/src/wake.rs @@ -0,0 +1,57 @@ +//! Send a wake signal to the hyperhive harness over `/run/hive/mcp.sock`. +//! +//! The harness (hive-ag3nt) listens on this socket for broker commands. A +//! `wake` command lands in the broker as an inbox message from `from` to the +//! current agent, which wakes whichever `Recv` the harness is parked on and +//! triggers the next claude turn. +//! +//! Wire format (one JSON line, newline-terminated): +//! {"cmd":"wake","from":"matrix","body":"new Matrix activity in 3 room(s)"} + +use anyhow::{Context, Result}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::UnixStream; + +/// Default path to the hyperhive per-agent MCP socket. Overridable via +/// `HIVE_SOCKET` env var so the daemon can be tested outside the container. +const DEFAULT_HIVE_SOCKET: &str = "/run/hive/mcp.sock"; + +fn hive_socket_path() -> String { + std::env::var("HIVE_SOCKET").unwrap_or_else(|_| DEFAULT_HIVE_SOCKET.to_owned()) +} + +/// Fire a wake signal at the hyperhive harness. Returns immediately on +/// success; the harness handles delivery asynchronously. `from` should be a +/// stable logical name (e.g. `"matrix"`); `body` is a short human-readable +/// summary — no message contents, just enough for the turn prompt. +pub async fn send_wake(from: &str, body: &str) -> Result<()> { + let path = hive_socket_path(); + let mut stream = UnixStream::connect(&path) + .await + .with_context(|| format!("connect to hive socket {path}"))?; + + let msg = serde_json::json!({ + "cmd": "wake", + "from": from, + "body": body, + }); + let mut line = serde_json::to_string(&msg).context("serialise wake payload")?; + line.push('\n'); + + stream + .write_all(line.as_bytes()) + .await + .context("write wake payload")?; + stream.flush().await.context("flush wake payload")?; + + // Read one-line response so we know the broker accepted it. + // Fire-and-forget if the socket closes before we get one. + let mut reader = BufReader::new(stream); + let mut resp = String::new(); + let _ = reader.read_line(&mut resp).await; + if !resp.trim().is_empty() { + tracing::debug!(resp = resp.trim(), "wake ack"); + } + + Ok(()) +}