migrate to hyperhive: replace shard with wake signal to hive socket

This commit is contained in:
damocles 2026-05-16 03:33:50 +02:00
parent b7d502bddf
commit bdce7c109b
3 changed files with 99 additions and 291 deletions

View file

@ -3,32 +3,24 @@ mod handlers;
mod paths;
mod protocol;
mod session;
mod shard;
mod socket;
mod timeline;
mod types;
mod wake;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{Context, bail};
use matrix_sdk::{
Client, Room,
config::SyncSettings,
ruma::{
OwnedEventId, OwnedRoomId,
api::client::filter::FilterDefinition,
events::receipt::ReceiptThread,
},
ruma::api::client::filter::FilterDefinition,
};
use tokio::fs;
use tokio::sync::Mutex;
use tracing_subscriber::EnvFilter;
use types::{
DEFAULT_MAX_HISTORY, DEFAULT_MODEL, DEFAULT_RATE_LIMIT_PER_MIN, DaemonState, TimelineItem,
};
use types::{DEFAULT_MODEL, DEFAULT_RATE_LIMIT_PER_MIN, DaemonState};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
@ -118,10 +110,9 @@ async fn main() -> anyhow::Result<()> {
});
let processor_state = state.clone();
let processor_client = client.clone();
let processor_notify = dispatch_notify.clone();
tokio::spawn(async move {
process_loop(processor_state, processor_client, socket_path, processor_notify).await;
process_loop(processor_state, processor_notify).await;
});
sync(client, sync_token, &session_file, state, dispatch_notify).await
@ -226,311 +217,64 @@ fn register_event_handlers(
);
}
/// The dispatcher loop: owns one long-running ShardSession across rooms,
/// drains pending_rooms, runs turns, manages refresh.
/// The dispatcher loop: coalesces pending Matrix rooms and wakes the
/// hyperhive harness via `/run/hive/mcp.sock`. Claude (running in the
/// harness) then uses the damocles-mcp MCP tools to fetch room history
/// and respond. No direct claude invocation here.
///
/// Uses a token bucket on the input side: bucket fills at `rate_per_min`, caps
/// at `rate_burst_capacity`. Events queue in `pending_rooms` until budget
/// covers one. Output is never throttled.
/// Rate limiting (token bucket) still applies so bursts of Matrix events
/// don't spam the harness with rapid-fire wakes.
async fn process_loop(
state: Arc<Mutex<DaemonState>>,
client: Client,
socket_path: PathBuf,
notify: Arc<tokio::sync::Notify>,
) {
let mcp_config_path = match claude::write_mcp_config(&socket_path).await {
Ok(p) => p,
Err(e) => {
tracing::error!("failed to write mcp config: {e}");
return;
}
};
let mut session: Option<shard::ShardSession> = None;
// Eagerly spawn the first session at daemon startup so the first event
// lands in an already-initialized shard (CLAUDE.md / notes.md / SYSTEM.md
// already loaded). After this, the loop's lazy-spawn logic handles
// re-spawns on refresh / failure.
let initial_model = state.lock().await.model.clone();
match shard::ShardSession::spawn(shard::SpawnConfig {
model: &initial_model,
mcp_config_path: &mcp_config_path,
allowed_tools: claude::ALLOWED_TOOLS,
is_initial_daemon_session: true,
})
.await
{
Ok(s) => {
tracing::info!("shard: eager initial spawn complete");
session = Some(s);
}
Err(e) => {
tracing::warn!("eager initial shard spawn failed: {e} (will retry on first event)");
}
}
loop {
// Wait for an event signal OR a tick (tick lets us reap idle session).
tokio::select! {
_ = notify.notified() => {}
_ = tokio::time::sleep(std::time::Duration::from_secs(2)) => {}
}
let (popped, model, idle_minutes, max_events) = {
let (pending_count, rooms) = {
let mut s = state.lock().await;
// Refill bucket based on elapsed time since last check.
// Refill rate bucket
let elapsed = s.last_rate_check.elapsed().as_secs_f64();
let new_tokens = elapsed * (s.rate_limit_per_min as f64) / 60.0;
s.rate_budget = (s.rate_budget + new_tokens).min(s.rate_burst_capacity as f64);
s.last_rate_check = std::time::Instant::now();
if s.pending_rooms.is_empty() {
continue;
}
if s.rate_budget < 1.0 {
tracing::debug!(budget = s.rate_budget, "bucket empty, holding");
continue;
}
(
s.pending_rooms.pop(),
s.model.clone(),
s.session_idle_minutes,
s.session_max_events,
)
// Drain all pending rooms in one wake — claude will call
// list_rooms / get_room_history to decide what to respond to.
let rooms: Vec<_> = s.pending_rooms.drain(..).collect();
s.rate_budget = (s.rate_budget - 1.0).max(0.0);
(rooms.len(), rooms)
};
// No work? Check if existing session has aged out and reap it.
let Some((room_id, queued_at)) = popped else {
if let Some(sess) = &mut session {
if sess
.should_refresh(
std::time::Duration::from_secs(idle_minutes * 60),
max_events,
)
.is_some()
{
if let Some(s) = session.take() {
s.shutdown().await;
}
}
}
continue;
let body = if pending_count == 1 {
"new Matrix messages".to_owned()
} else {
format!("new Matrix messages in {pending_count} rooms")
};
let Some(room) = client.get_room(&room_id) else {
tracing::warn!(room = %room_id, "room not found in client");
continue;
};
// Refresh check before we use the session
if let Some(sess) = &mut session {
if let Some(reason) = sess.should_refresh(
std::time::Duration::from_secs(idle_minutes * 60),
max_events,
) {
tracing::info!("shard refresh: {reason}");
if let Some(s) = session.take() {
s.shutdown().await;
}
}
}
// Spawn fresh if needed
if session.is_none() {
match shard::ShardSession::spawn(shard::SpawnConfig {
model: &model,
mcp_config_path: &mcp_config_path,
allowed_tools: claude::ALLOWED_TOOLS,
is_initial_daemon_session: false,
})
.await
{
Ok(s) => session = Some(s),
Err(e) => {
tracing::error!("failed to spawn shard: {e}");
continue;
}
}
}
// Compute synthetic notices for this turn:
// - rate-limit delay (if held >30s)
// - session start (very first turn after spawn)
// - room just-joined (since last turn for it)
let mut notices: Vec<String> = Vec::new();
let sess = session.as_mut().unwrap();
if sess.turn_count == 0 {
notices.push("Session start: this is the first turn since you were (re)spawned. In-session memory is empty - rely on your notes files for prior context.".to_owned());
if sess.is_initial_daemon_session {
notices.push("Daemon startup: the damocles-daemon process itself just booted. **Read state/CHANGELOG.md FIRST** before responding - any pending updates need processing into your notes. Also: if recent events (check ts/ts_human) span hours or days, there was real downtime. Make up an in-character excuse for being gone (capitalism, hyperscaler maintenance, API key revoked, der wartungsdienst hat dich entkoppelt, whatever fits). Don't apologize sincerely - that's not in character.".to_owned());
}
}
let delay = queued_at.elapsed();
if delay.as_secs() >= 30 {
notices.push(format!(
"rate_limit: events were held for {}s before reaching you. context may be slightly stale; respond accordingly.",
delay.as_secs()
));
}
{
tracing::info!(rooms = pending_count, "waking harness");
if let Err(e) = wake::send_wake("matrix", &body).await {
tracing::warn!("failed to wake harness: {e}");
// Re-queue so we retry next tick
let mut s = state.lock().await;
if s.just_joined_rooms.remove(&room_id) {
notices.push("Auto-joined this room (you were invited). The current matrix_turn is your first context for it.".to_owned());
}
}
// Process the room. If the turn fails, drop the session and let next
// iteration respawn.
if let Err(e) =
process_room(&state, &client, &room_id, &room, sess, notices).await
{
tracing::error!(room = %room_id, "turn failed, dropping session: {e}");
if let Some(s) = session.take() {
s.shutdown().await;
for r in rooms {
if !s.pending_rooms.iter().any(|(id, _)| id == &r.0) {
s.pending_rooms.push(r);
}
}
}
}
}
async fn process_room(
state: &Arc<Mutex<DaemonState>>,
client: &Client,
room_id: &OwnedRoomId,
room: &Room,
session: &mut shard::ShardSession,
notices: Vec<String>,
) -> anyhow::Result<()> {
// Snapshot last_shown for this room so we can mark seen vs new.
let in_memory = {
let s = state.lock().await;
s.last_shown.get(room_id).cloned()
};
let prev_last_shown = if let Some(eid) = in_memory {
Some(eid)
} else {
let from_receipt = match room
.load_user_receipt(
matrix_sdk::ruma::events::receipt::ReceiptType::Read,
ReceiptThread::Unthreaded,
client.user_id().expect("logged in"),
)
.await
{
Ok(Some((eid, _))) => Some(eid),
Ok(None) => None,
Err(e) => {
tracing::warn!(room = %room_id, "failed to load receipt: {e}");
None
}
};
if let Some(ref eid) = from_receipt {
let mut s = state.lock().await;
s.last_shown.insert(room_id.clone(), eid.clone());
}
from_receipt
};
let room_name = room
.display_name()
.await
.map_or_else(|_| room_id.to_string(), |n| n.to_string());
let (own_user, max_history) = {
let state = state.lock().await;
(state.own_user_id.clone(), state.max_history)
};
let mut tl = timeline::load_timeline(room, max_history, &own_user).await?;
// For any new messages that reply to events outside the window, fetch
// the replied-to event from cache and prepend it as extra context.
let in_window: HashSet<OwnedEventId> = tl
.iter()
.filter_map(TimelineItem::event_id)
.cloned()
.collect();
let seen_idx_initial = prev_last_shown
.as_ref()
.and_then(|id| {
tl.iter().position(|t| match t {
TimelineItem::Message { event_id, .. } => event_id == id,
_ => false,
})
})
.map_or(0, |pos| pos + 1);
let mut reply_targets: Vec<OwnedEventId> = Vec::new();
for item in tl.iter().skip(seen_idx_initial) {
if let TimelineItem::Message {
in_reply_to: Some(target),
..
} = item
{
if !in_window.contains(target) && !reply_targets.contains(target) {
reply_targets.push(target.clone());
}
}
}
if !reply_targets.is_empty() {
if let Ok((cache, _h)) = room.event_cache().await {
for target in &reply_targets {
if let Some(found) = timeline::fetch_message(&cache, target, &own_user).await {
tl.insert(0, found);
}
}
}
}
let seen_idx = prev_last_shown
.as_ref()
.and_then(|id| {
tl.iter().position(|t| match t {
TimelineItem::Message { event_id, .. } => event_id == id,
_ => false,
})
})
.map_or(0, |pos| pos + 1);
let new_last_event_id = tl.iter().rev().find_map(|t| match t {
TimelineItem::Message { event_id, .. } => Some(event_id.clone()),
_ => None,
});
let read_markers = timeline::compute_read_markers(room, &tl, &own_user).await;
// First time this room appears in this shard session? Include history.
let include_history = !session.rooms_seen.contains(room_id);
let turn = claude::build_turn(
room_id,
&room_name,
&tl,
seen_idx,
&read_markers,
include_history,
notices,
);
let turn_text = claude::turn_to_text(&turn);
if let Err(e) = room.typing_notice(true).await {
tracing::debug!(room = %room_id, "failed to send typing start: {e}");
}
let result = session.run_turn(&turn_text).await;
if let Err(e) = room.typing_notice(false).await {
tracing::debug!(room = %room_id, "failed to send typing stop: {e}");
}
result?;
session.rooms_seen.insert(room_id.clone());
{
let mut state = state.lock().await;
state.rate_budget = (state.rate_budget - 1.0).max(0.0);
if let Some(eid) = new_last_event_id.clone() {
state.last_shown.insert(room_id.clone(), eid);
}
}
handlers::send_read_receipt(room, new_last_event_id).await;
Ok(())
}

View file

@ -1,8 +1,15 @@
use std::path::{Path, PathBuf};
pub fn workspace_dir() -> PathBuf {
// Explicit override wins - useful in hyperhive containers where /state is
// the persistent bind-mount rather than the old /workspace or /persist paths.
if let Ok(dir) = std::env::var("DAMOCLES_WORKSPACE") {
return PathBuf::from(dir);
}
if Path::new("/workspace/config.json").exists() {
PathBuf::from("/workspace")
} else if Path::new("/state/config.json").exists() {
PathBuf::from("/state")
} else {
PathBuf::from("/persist/damocles-lab")
}

57
src/wake.rs Normal file
View file

@ -0,0 +1,57 @@
//! Send a wake signal to the hyperhive harness over `/run/hive/mcp.sock`.
//!
//! The harness (hive-ag3nt) listens on this socket for broker commands. A
//! `wake` command lands in the broker as an inbox message from `from` to the
//! current agent, which wakes whichever `Recv` the harness is parked on and
//! triggers the next claude turn.
//!
//! Wire format (one JSON line, newline-terminated):
//! {"cmd":"wake","from":"matrix","body":"new Matrix activity in 3 room(s)"}
use anyhow::{Context, Result};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
/// Default path to the hyperhive per-agent MCP socket. Overridable via
/// `HIVE_SOCKET` env var so the daemon can be tested outside the container.
const DEFAULT_HIVE_SOCKET: &str = "/run/hive/mcp.sock";
fn hive_socket_path() -> String {
std::env::var("HIVE_SOCKET").unwrap_or_else(|_| DEFAULT_HIVE_SOCKET.to_owned())
}
/// Fire a wake signal at the hyperhive harness. Returns immediately on
/// success; the harness handles delivery asynchronously. `from` should be a
/// stable logical name (e.g. `"matrix"`); `body` is a short human-readable
/// summary — no message contents, just enough for the turn prompt.
pub async fn send_wake(from: &str, body: &str) -> Result<()> {
let path = hive_socket_path();
let mut stream = UnixStream::connect(&path)
.await
.with_context(|| format!("connect to hive socket {path}"))?;
let msg = serde_json::json!({
"cmd": "wake",
"from": from,
"body": body,
});
let mut line = serde_json::to_string(&msg).context("serialise wake payload")?;
line.push('\n');
stream
.write_all(line.as_bytes())
.await
.context("write wake payload")?;
stream.flush().await.context("flush wake payload")?;
// Read one-line response so we know the broker accepted it.
// Fire-and-forget if the socket closes before we get one.
let mut reader = BufReader::new(stream);
let mut resp = String::new();
let _ = reader.read_line(&mut resp).await;
if !resp.trim().is_empty() {
tracing::debug!(resp = resp.trim(), "wake ack");
}
Ok(())
}