reminder: file_path delivery + extract scheduler into own module

This commit is contained in:
damocles 2026-05-17 11:05:29 +02:00
parent f2484b5e78
commit 6ce85bd6f2
4 changed files with 181 additions and 42 deletions

View file

@ -23,6 +23,9 @@ hive-c0re/ host daemon + CLI (one binary, subcommand-dispatched)
hourly vacuum of delivered>30d hourly vacuum of delivered>30d
src/approvals.rs sqlite Approval queue + kinds src/approvals.rs sqlite Approval queue + kinds
src/operator_questions.rs sqlite question queue backing `ask_operator` src/operator_questions.rs sqlite question queue backing `ask_operator`
src/reminder_scheduler.rs 5s poll loop: drains due reminders,
resolves file_path container→host, persists
payload + delivers pointer string
src/events_vacuum.rs host-side hourly sweep of every agent's src/events_vacuum.rs host-side hourly sweep of every agent's
/state/hyperhive-events.sqlite /state/hyperhive-events.sqlite
src/crash_watch.rs poll every 10s; fire HelperEvent::ContainerCrash src/crash_watch.rs poll every 10s; fire HelperEvent::ContainerCrash

View file

@ -15,7 +15,7 @@
- Per-agent reminder limits (burst capacity, rate limiting) - Per-agent reminder limits (burst capacity, rate limiting)
- ~~**Expose `remind` MCP tool**~~ ✓ fixed — `mcp__hyperhive__remind` now on `AgentServer`; takes `message`, exactly one of `delay_seconds` / `at_unix_timestamp`, optional `file_path`. Manager surface still missing (no `ManagerRequest::Remind` variant) — separate item below. - ~~**Expose `remind` MCP tool**~~ ✓ fixed — `mcp__hyperhive__remind` now on `AgentServer`; takes `message`, exactly one of `delay_seconds` / `at_unix_timestamp`, optional `file_path`. Manager surface still missing (no `ManagerRequest::Remind` variant) — separate item below.
- **Manager-side `remind`**: mirror of the agent tool but on `ManagerServer`. Needs `ManagerRequest::Remind` variant in hive-sh4re, dispatch in manager_server.rs, MCP tool wiring. - **Manager-side `remind`**: mirror of the agent tool but on `ManagerServer`. Needs `ManagerRequest::Remind` variant in hive-sh4re, dispatch in manager_server.rs, MCP tool wiring.
- **File path delivery**: currently unused in scheduler delivery loop — implement file write/delivery to /state/<agent>/reminders/ or similar (also needed for the overflow-check escape hatch above to actually do anything useful). - ~~**File path delivery**~~ ✓ fixed — scheduler now writes the reminder body to the requested `file_path` (mapped from container `/agents/<agent>/state/...` to host `/var/lib/hyperhive/agents/<agent>/state/...`) and delivers a short pointer message in its place. Path-traversal + foreign-agent-state writes are rejected; on rejection or write failure the body falls back to inline delivery with a noted warning. New module `hive-c0re/src/reminder_scheduler.rs` (extracted from main.rs).
- ~~**Orphan reminders**~~ ✓ fixed — `Broker::deliver_reminder` wraps the inbox INSERT + reminders UPDATE in one sqlite transaction; partial failure can no longer cause duplicate delivery on the next tick. - ~~**Orphan reminders**~~ ✓ fixed — `Broker::deliver_reminder` wraps the inbox INSERT + reminders UPDATE in one sqlite transaction; partial failure can no longer cause duplicate delivery on the next tick.
- ~~**Unbounded batches**~~ ✓ fixed — scheduler now calls `get_due_reminders(REMINDER_BATCH_LIMIT)` (cap = 100/tick); overflow stays due and gets picked up next cycle. - ~~**Unbounded batches**~~ ✓ fixed — scheduler now calls `get_due_reminders(REMINDER_BATCH_LIMIT)` (cap = 100/tick); overflow stays due and gets picked up next cycle.
- **Scheduler shutdown**: add graceful shutdown signal when coordinator is destroyed (currently runs forever) - **Scheduler shutdown**: add graceful shutdown signal when coordinator is destroyed (currently runs forever)

View file

@ -21,6 +21,7 @@ mod manager_server;
mod meta; mod meta;
mod migrate; mod migrate;
mod operator_questions; mod operator_questions;
mod reminder_scheduler;
mod server; mod server;
use coordinator::Coordinator; use coordinator::Coordinator;
@ -86,12 +87,6 @@ enum Cmd {
Deny { id: i64 }, Deny { id: i64 },
} }
/// Per-tick cap on reminders the scheduler delivers. Anything over this
/// stays due in the table and gets picked up on the next 5s tick — keeps
/// a 10k-deep backlog from flooding the broker (or hogging its mutex) in
/// one shot.
const REMINDER_BATCH_LIMIT: u64 = 100;
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
tracing_subscriber::fmt() tracing_subscriber::fmt()
@ -171,41 +166,9 @@ async fn main() -> Result<()> {
// when a previously-running container goes away without an // when a previously-running container goes away without an
// operator-initiated transient state. // operator-initiated transient state.
crash_watch::spawn(coord.clone()); crash_watch::spawn(coord.clone());
// Reminder scheduler: checks for due reminders every 5 seconds, // Reminder scheduler: drains due reminders + handles
// delivers them atomically (insert inbox + mark sent in one // file_path payload persistence. See reminder_scheduler.rs.
// sqlite transaction so a transient failure on the second step reminder_scheduler::spawn(coord.clone());
// can never produce a duplicate next tick). Per-cycle batch
// limit caps the burst — leftover reminders stay due and get
// picked up on the next tick instead of monopolising the broker
// mutex.
let reminder_coord = coord.clone();
tokio::spawn(async move {
loop {
match reminder_coord
.broker
.get_due_reminders(REMINDER_BATCH_LIMIT)
{
Ok(reminders) => {
for (agent, id, message, _file_path) in reminders {
if let Err(e) =
reminder_coord.broker.deliver_reminder(id, &agent, &message)
{
tracing::warn!(
reminder_id = id,
%agent,
error = ?e,
"failed to deliver reminder"
);
}
}
}
Err(e) => {
tracing::warn!(error = ?e, "failed to query due reminders");
}
}
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
});
let dash_coord = coord.clone(); let dash_coord = coord.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await { if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await {

View file

@ -0,0 +1,173 @@
//! Background loop that drains due reminders out of the broker and
//! delivers them as inbox messages. Mirrors the `events_vacuum` /
//! `crash_watch` shape — a single `spawn(coord)` entry point started
//! from `main.rs`.
//!
//! File-path semantics: a reminder may carry a `file_path` (the
//! agent-visible path inside its container). On delivery we:
//!
//! - Translate the container path (`/agents/<agent>/state/foo.md`) to
//! the host path (`/var/lib/hyperhive/agents/<agent>/state/foo.md`)
//! so hive-c0re can write to it from outside the container.
//! - Reject anything that isn't under the agent's own state subtree,
//! or that contains `..` (path traversal). Falling outside the
//! allowed prefix means the file write is skipped and the original
//! message is delivered inline (with a noted warning) — the
//! reminder still fires, just without the payload split.
//! - Write the reminder body to disk and deliver a short pointer
//! message in its place, so the agent's inbox/wake-prompt stays
//! small and the bulky payload can be read out of band.
//!
//! Atomicity of the inbox INSERT + `reminders.sent_at` UPDATE is handled
//! inside `Broker::deliver_reminder`; this module only computes the
//! body string before calling it.
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use crate::coordinator::Coordinator;
/// Per-tick cap on reminders delivered. Anything over this stays due
/// in the table and gets picked up on the next tick — keeps a
/// 10k-deep backlog from flooding the broker (or hogging the broker
/// mutex) in one shot.
const REMINDER_BATCH_LIMIT: u64 = 100;
/// Poll interval. Trade-off between latency on a freshly due reminder
/// and CPU spent on empty sweeps; 5s matches the original inline
/// scheduler.
const POLL_INTERVAL: Duration = Duration::from_secs(5);
pub fn spawn(coord: Arc<Coordinator>) {
tokio::spawn(async move {
loop {
tick(&coord);
tokio::time::sleep(POLL_INTERVAL).await;
}
});
}
fn tick(coord: &Arc<Coordinator>) {
let due = match coord.broker.get_due_reminders(REMINDER_BATCH_LIMIT) {
Ok(rows) => rows,
Err(e) => {
tracing::warn!(error = ?e, "failed to query due reminders");
return;
}
};
for (agent, id, message, file_path) in due {
let body = prepare_body(&agent, &message, file_path.as_deref());
if let Err(e) = coord.broker.deliver_reminder(id, &agent, &body) {
tracing::warn!(
reminder_id = id,
%agent,
error = ?e,
"failed to deliver reminder"
);
}
}
}
/// Build the inbox body for a due reminder. When `file_path` is None
/// the body is the original message verbatim. When set, we attempt to
/// persist the message body to the requested file and return a short
/// pointer string instead. Failures (bad prefix, write error, missing
/// parent) fall back to inline delivery with a noted warning so the
/// reminder still fires.
fn prepare_body(agent: &str, message: &str, file_path: Option<&str>) -> String {
let Some(req_path) = file_path else {
return message.to_owned();
};
let host_path = match resolve_host_path(agent, req_path) {
Ok(p) => p,
Err(reason) => {
tracing::warn!(%agent, %req_path, %reason, "reminder file_path rejected; delivering inline");
return format!(
"[reminder file_path '{req_path}' rejected: {reason}; delivering body inline]\n\n{message}"
);
}
};
if let Some(parent) = host_path.parent()
&& let Err(e) = std::fs::create_dir_all(parent)
{
tracing::warn!(%agent, path = %host_path.display(), error = ?e, "reminder file_path parent mkdir failed; delivering inline");
return format!(
"[reminder file_path '{req_path}' parent dir create failed: {e}; delivering body inline]\n\n{message}"
);
}
if let Err(e) = std::fs::write(&host_path, message) {
tracing::warn!(%agent, path = %host_path.display(), error = ?e, "reminder file_path write failed; delivering inline");
return format!(
"[reminder file_path '{req_path}' write failed: {e}; delivering body inline]\n\n{message}"
);
}
let bytes = message.len();
tracing::info!(%agent, path = %host_path.display(), bytes, "reminder body written to file");
format!("reminder body persisted to `{req_path}` ({bytes} bytes); read with your filesystem tools")
}
/// Map an agent-visible container path to the matching host path,
/// validating that it lives under the agent's own state subtree and
/// doesn't try to traverse out via `..`. Returns the host `PathBuf` on
/// success, or a human-readable reason string on rejection.
fn resolve_host_path(agent: &str, req_path: &str) -> Result<PathBuf, String> {
let prefix = format!("/agents/{agent}/state/");
let Some(rel) = req_path.strip_prefix(&prefix) else {
return Err(format!(
"must be absolute and under `{prefix}` (got `{req_path}`)"
));
};
let rel_path = Path::new(rel);
for comp in rel_path.components() {
match comp {
std::path::Component::Normal(_) => {}
other => {
return Err(format!(
"path component `{other:?}` not allowed (no traversal / absolute / root)"
));
}
}
}
Ok(Coordinator::agent_notes_dir(agent).join(rel_path))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn rejects_paths_outside_agent_state() {
assert!(resolve_host_path("foo", "/etc/passwd").is_err());
assert!(resolve_host_path("foo", "/agents/bar/state/x.md").is_err());
assert!(resolve_host_path("foo", "relative.md").is_err());
}
#[test]
fn rejects_traversal() {
assert!(resolve_host_path("foo", "/agents/foo/state/../../etc/passwd").is_err());
assert!(resolve_host_path("foo", "/agents/foo/state/./x.md").is_err());
}
#[test]
fn accepts_well_formed_path() {
let p = resolve_host_path("foo", "/agents/foo/state/reminders/123.md").unwrap();
assert_eq!(
p,
PathBuf::from("/var/lib/hyperhive/agents/foo/state/reminders/123.md")
);
}
#[test]
fn prepare_body_passthrough_when_no_file_path() {
let s = prepare_body("foo", "hello world", None);
assert_eq!(s, "hello world");
}
#[test]
fn prepare_body_falls_back_inline_on_bad_path() {
let s = prepare_body("foo", "payload", Some("/etc/passwd"));
assert!(s.starts_with("[reminder file_path '/etc/passwd' rejected:"));
assert!(s.contains("payload"));
}
}