From 6ce85bd6f229d497ff8c6c0ea621c6c4be048351 Mon Sep 17 00:00:00 2001 From: damocles Date: Sun, 17 May 2026 11:05:29 +0200 Subject: [PATCH] reminder: file_path delivery + extract scheduler into own module --- CLAUDE.md | 3 + TODO.md | 2 +- hive-c0re/src/main.rs | 45 +------- hive-c0re/src/reminder_scheduler.rs | 173 ++++++++++++++++++++++++++++ 4 files changed, 181 insertions(+), 42 deletions(-) create mode 100644 hive-c0re/src/reminder_scheduler.rs diff --git a/CLAUDE.md b/CLAUDE.md index 8a52318..22b8c5f 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -23,6 +23,9 @@ hive-c0re/ host daemon + CLI (one binary, subcommand-dispatched) hourly vacuum of delivered>30d src/approvals.rs sqlite Approval queue + kinds src/operator_questions.rs sqlite question queue backing `ask_operator` + src/reminder_scheduler.rs 5s poll loop: drains due reminders, + resolves file_path container→host, persists + payload + delivers pointer string src/events_vacuum.rs host-side hourly sweep of every agent's /state/hyperhive-events.sqlite src/crash_watch.rs poll every 10s; fire HelperEvent::ContainerCrash diff --git a/TODO.md b/TODO.md index 04aa69f..a83bd80 100644 --- a/TODO.md +++ b/TODO.md @@ -15,7 +15,7 @@ - Per-agent reminder limits (burst capacity, rate limiting) - ~~**Expose `remind` MCP tool**~~ ✓ fixed — `mcp__hyperhive__remind` now on `AgentServer`; takes `message`, exactly one of `delay_seconds` / `at_unix_timestamp`, optional `file_path`. Manager surface still missing (no `ManagerRequest::Remind` variant) — separate item below. - **Manager-side `remind`**: mirror of the agent tool but on `ManagerServer`. Needs `ManagerRequest::Remind` variant in hive-sh4re, dispatch in manager_server.rs, MCP tool wiring. -- **File path delivery**: currently unused in scheduler delivery loop — implement file write/delivery to /state//reminders/ or similar (also needed for the overflow-check escape hatch above to actually do anything useful). 
+- ~~**File path delivery**~~ ✓ fixed — scheduler now writes the reminder body to the requested `file_path` (mapped from container `/agents//state/...` to host `/var/lib/hyperhive/agents//state/...`) and delivers a short pointer message in its place. Path-traversal + foreign-agent-state writes are rejected; on rejection or write failure the body falls back to inline delivery with a noted warning. New module `hive-c0re/src/reminder_scheduler.rs` (extracted from main.rs). - ~~**Orphan reminders**~~ ✓ fixed — `Broker::deliver_reminder` wraps the inbox INSERT + reminders UPDATE in one sqlite transaction; partial failure can no longer cause duplicate delivery on the next tick. - ~~**Unbounded batches**~~ ✓ fixed — scheduler now calls `get_due_reminders(REMINDER_BATCH_LIMIT)` (cap = 100/tick); overflow stays due and gets picked up next cycle. - **Scheduler shutdown**: add graceful shutdown signal when coordinator is destroyed (currently runs forever) diff --git a/hive-c0re/src/main.rs b/hive-c0re/src/main.rs index ea84f8c..8918e68 100644 --- a/hive-c0re/src/main.rs +++ b/hive-c0re/src/main.rs @@ -21,6 +21,7 @@ mod manager_server; mod meta; mod migrate; mod operator_questions; +mod reminder_scheduler; mod server; use coordinator::Coordinator; @@ -86,12 +87,6 @@ enum Cmd { Deny { id: i64 }, } -/// Per-tick cap on reminders the scheduler delivers. Anything over this -/// stays due in the table and gets picked up on the next 5s tick — keeps -/// a 10k-deep backlog from flooding the broker (or hogging its mutex) in -/// one shot. -const REMINDER_BATCH_LIMIT: u64 = 100; - #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt() @@ -171,41 +166,9 @@ async fn main() -> Result<()> { // when a previously-running container goes away without an // operator-initiated transient state. 
crash_watch::spawn(coord.clone()); - // Reminder scheduler: checks for due reminders every 5 seconds, - // delivers them atomically (insert inbox + mark sent in one - // sqlite transaction so a transient failure on the second step - // can never produce a duplicate next tick). Per-cycle batch - // limit caps the burst — leftover reminders stay due and get - // picked up on the next tick instead of monopolising the broker - // mutex. - let reminder_coord = coord.clone(); - tokio::spawn(async move { - loop { - match reminder_coord - .broker - .get_due_reminders(REMINDER_BATCH_LIMIT) - { - Ok(reminders) => { - for (agent, id, message, _file_path) in reminders { - if let Err(e) = - reminder_coord.broker.deliver_reminder(id, &agent, &message) - { - tracing::warn!( - reminder_id = id, - %agent, - error = ?e, - "failed to deliver reminder" - ); - } - } - } - Err(e) => { - tracing::warn!(error = ?e, "failed to query due reminders"); - } - } - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - } - }); + // Reminder scheduler: drains due reminders + handles + // file_path payload persistence. See reminder_scheduler.rs. + reminder_scheduler::spawn(coord.clone()); let dash_coord = coord.clone(); tokio::spawn(async move { if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await { diff --git a/hive-c0re/src/reminder_scheduler.rs b/hive-c0re/src/reminder_scheduler.rs new file mode 100644 index 0000000..c21b752 --- /dev/null +++ b/hive-c0re/src/reminder_scheduler.rs @@ -0,0 +1,173 @@ +//! Background loop that drains due reminders out of the broker and +//! delivers them as inbox messages. Mirrors the `events_vacuum` / +//! `crash_watch` shape — a single `spawn(coord)` entry point started +//! from `main.rs`. +//! +//! File-path semantics: a reminder may carry a `file_path` (the +//! agent-visible path inside its container). On delivery we: +//! +//! - Translate the container path (`/agents//state/foo.md`) to +//! 
the host path (`/var/lib/hyperhive/agents/<agent>/state/foo.md`)
+//!   so hive-c0re can write to it from outside the container.
+//! - Reject anything that isn't under the agent's own state subtree,
+//!   or that contains `..` (path traversal). Falling outside the
+//!   allowed prefix means the file write is skipped and the original
+//!   message is delivered inline (with a noted warning) — the
+//!   reminder still fires, just without the payload split.
+//! - Write the reminder body to disk and deliver a short pointer
+//!   message in its place, so the agent's inbox/wake-prompt stays
+//!   small and the bulky payload can be read out of band.
+//!
+//! Atomicity of the inbox INSERT + `reminders.sent_at` UPDATE is handled
+//! inside `Broker::deliver_reminder`; this module only computes the
+//! body string before calling it.
+
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::coordinator::Coordinator;
+
+/// Per-tick cap on reminders delivered. Anything over this stays due
+/// in the table and gets picked up on the next tick — keeps a
+/// 10k-deep backlog from flooding the broker (or hogging the broker
+/// mutex) in one shot.
+const REMINDER_BATCH_LIMIT: u64 = 100;
+
+/// Poll interval. Trade-off between latency on a freshly due reminder
+/// and CPU spent on empty sweeps; 5s matches the original inline
+/// scheduler.
+const POLL_INTERVAL: Duration = Duration::from_secs(5);
+
+/// Start the reminder scheduler as a detached background task.
+///
+/// The task loops forever: run one `tick`, sleep `POLL_INTERVAL`,
+/// repeat. The `JoinHandle` returned by `tokio::spawn` is dropped, so
+/// there is no way to stop or await the loop from the caller's side.
+///
+/// Must be called from inside a Tokio runtime, as `tokio::spawn`
+/// requires.
+pub fn spawn(coord: Arc<Coordinator>) {
+    tokio::spawn(async move {
+        loop {
+            // `tick` is synchronous: it calls into the broker and, via
+            // `prepare_body`, may perform `std::fs` writes. Run it on the
+            // blocking pool so none of that stalls an async worker thread.
+            let tick_coord = Arc::clone(&coord);
+            if let Err(e) = tokio::task::spawn_blocking(move || tick(&tick_coord)).await {
+                // A panic inside a tick surfaces here as a `JoinError`.
+                // Log it and keep polling rather than losing the whole
+                // scheduler to one bad pass.
+                tracing::error!(error = ?e, "reminder tick did not complete");
+            }
+            tokio::time::sleep(POLL_INTERVAL).await;
+        }
+    });
+}
+
+/// Run one scheduler pass: fetch at most `REMINDER_BATCH_LIMIT` due
+/// reminders from the broker and deliver each of them.
+///
+/// Nothing is propagated to the caller. A failed query is logged and
+/// ends the pass; a failed delivery is logged and the loop moves on to
+/// the next reminder.
+fn tick(coord: &Arc<Coordinator>) {
+    let due = match coord.broker.get_due_reminders(REMINDER_BATCH_LIMIT) {
+        Ok(rows) => rows,
+        Err(e) => {
+            tracing::warn!(error = ?e, "failed to query due reminders");
+            return;
+        }
+    };
+    for (agent, id, message, file_path) in due {
+        // The body is computed (and any file payload written) before the
+        // broker call; `deliver_reminder` only sees the final string.
+        let body = prepare_body(&agent, &message, file_path.as_deref());
+        if let Err(e) = coord.broker.deliver_reminder(id, &agent, &body) {
+            tracing::warn!(
+                reminder_id = id,
+                %agent,
+                error = ?e,
+                "failed to deliver reminder"
+            );
+        }
+    }
+}
+
+/// Build the inbox body for a due reminder. When `file_path` is None
+/// the body is the original message verbatim. When set, we attempt to
+/// persist the message body to the requested file and return a short
+/// pointer string instead. Failures (bad prefix, write error, missing
+/// parent) fall back to inline delivery with a noted warning so the
+/// reminder still fires.
+fn prepare_body(agent: &str, message: &str, file_path: Option<&str>) -> String {
+    let Some(req_path) = file_path else {
+        return message.to_owned();
+    };
+    // Two-stage validation. `resolve_host_path` is purely lexical; the
+    // symlink guard then inspects what is actually on disk, because a
+    // lexically clean path can still leave the state directory if one of
+    // its components is a symlink that the host-side write would follow.
+    // NOTE(review): assumes the agent can create entries (including
+    // symlinks) inside its own state directory — confirm against the
+    // container mount setup.
+    let resolved = resolve_host_path(agent, req_path).and_then(|path| {
+        let state_root = Coordinator::agent_notes_dir(agent);
+        ensure_no_symlinks(&state_root, &path).map(|()| path)
+    });
+    let host_path = match resolved {
+        Ok(p) => p,
+        Err(reason) => {
+            tracing::warn!(
+                %agent,
+                %req_path,
+                %reason,
+                "reminder file_path rejected; delivering inline"
+            );
+            return format!(
+                "[reminder file_path '{req_path}' rejected: {reason}; delivering body inline]\n\n{message}"
+            );
+        }
+    };
+    if let Some(parent) = host_path.parent()
+        && let Err(e) = std::fs::create_dir_all(parent)
+    {
+        tracing::warn!(
+            %agent,
+            path = %host_path.display(),
+            error = ?e,
+            "reminder file_path parent mkdir failed; delivering inline"
+        );
+        return format!(
+            "[reminder file_path '{req_path}' parent dir create failed: {e}; delivering body inline]\n\n{message}"
+        );
+    }
+    if let Err(e) = std::fs::write(&host_path, message) {
+        tracing::warn!(
+            %agent,
+            path = %host_path.display(),
+            error = ?e,
+            "reminder file_path write failed; delivering inline"
+        );
+        return format!(
+            "[reminder file_path '{req_path}' write failed: {e}; delivering body inline]\n\n{message}"
+        );
+    }
+    let bytes = message.len();
+    tracing::info!(%agent, path = %host_path.display(), bytes, "reminder body written to file");
+    format!("reminder body persisted to `{req_path}` ({bytes} bytes); read with your filesystem tools")
+}
+
+/// Check that no existing entry between `root` (exclusive) and `target`
+/// (inclusive) is a symlink.
+///
+/// Walks `target` one component at a time below `root`, using
+/// `symlink_metadata` so the links themselves are inspected rather than
+/// followed. The walk stops with `Ok(())` at the first entry that does
+/// not exist yet, since nothing deeper can exist either. Any other
+/// inspection error, and a `target` that is not under `root`, is
+/// reported as a rejection. The `Err` string only ever names the path
+/// relative to `root`, never the full host path.
+///
+/// This is check-then-use: an entry swapped for a symlink between this
+/// check and the later write is not caught here.
+fn ensure_no_symlinks(root: &Path, target: &Path) -> Result<(), String> {
+    let rel = target
+        .strip_prefix(root)
+        .map_err(|_| "resolved path is outside the agent state directory".to_owned())?;
+    let mut host = root.to_path_buf();
+    let mut seen = PathBuf::new();
+    for comp in rel.components() {
+        host.push(comp);
+        seen.push(comp);
+        match std::fs::symlink_metadata(&host) {
+            Ok(meta) if meta.file_type().is_symlink() => {
+                return Err(format!(
+                    "`{}` is a symlink; symlinks are not followed",
+                    seen.display()
+                ));
+            }
+            Ok(_) => {}
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
+            Err(e) => return Err(format!("cannot inspect `{}`: {e}", seen.display())),
+        }
+    }
+    Ok(())
+}
+
+/// Map an agent-visible container path to the matching host path,
+/// validating that it lives under the agent's own state subtree and
+/// doesn't try to traverse out via `..`. Returns the host `PathBuf` on
+/// success, or a human-readable reason string on rejection.
+fn resolve_host_path(agent: &str, req_path: &str) -> Result<PathBuf, String> {
+    let prefix = format!("/agents/{agent}/state/");
+    let Some(rel) = req_path.strip_prefix(&prefix) else {
+        return Err(format!(
+            "must be absolute and under `{prefix}` (got `{req_path}`)"
+        ));
+    };
+    // An empty remainder (the state directory itself) or a trailing slash
+    // cannot name a file. Reject here with a clear reason instead of
+    // letting the later write fail with an opaque OS error.
+    if rel.is_empty() || rel.ends_with('/') {
+        return Err(format!(
+            "must name a file, not a directory (got `{req_path}`)"
+        ));
+    }
+    let rel_path = Path::new(rel);
+    // Only plain name components are accepted: `..`, a leading `.`, and a
+    // leading `/` all show up as non-`Normal` components and are refused.
+    for comp in rel_path.components() {
+        match comp {
+            std::path::Component::Normal(_) => {}
+            other => {
+                return Err(format!(
+                    "path component `{other:?}` not allowed (no traversal / absolute / root)"
+                ));
+            }
+        }
+    }
+    Ok(Coordinator::agent_notes_dir(agent).join(rel_path))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn rejects_paths_outside_agent_state() {
+        assert!(resolve_host_path("foo", "/etc/passwd").is_err());
+        assert!(resolve_host_path("foo", "/agents/bar/state/x.md").is_err());
+        assert!(resolve_host_path("foo", "relative.md").is_err());
+    }
+
+    #[test]
+    fn rejects_traversal() {
+        assert!(resolve_host_path("foo", "/agents/foo/state/../../etc/passwd").is_err());
+        assert!(resolve_host_path("foo", "/agents/foo/state/./x.md").is_err());
+    }
+
+    #[test]
+    fn rejects_directory_like_targets() {
+        assert!(resolve_host_path("foo", "/agents/foo/state/").is_err());
+        assert!(resolve_host_path("foo", "/agents/foo/state/reminders/").is_err());
+    }
+
+    #[test]
+    fn accepts_well_formed_path() {
+        let p = resolve_host_path("foo", "/agents/foo/state/reminders/123.md").unwrap();
+        assert_eq!(
+            p,
+            PathBuf::from("/var/lib/hyperhive/agents/foo/state/reminders/123.md")
+        );
+    }
+
+    #[test]
+    fn prepare_body_passthrough_when_no_file_path() {
+        let s = prepare_body("foo", "hello world", None);
+        assert_eq!(s, "hello world");
+    }
+
+    #[test]
+    fn prepare_body_falls_back_inline_on_bad_path() {
+        let s = prepare_body("foo", "payload", Some("/etc/passwd"));
+        assert!(s.starts_with("[reminder file_path '/etc/passwd' rejected:"));
+        assert!(s.contains("payload"));
+    }
+}