//! Background loop that drains due reminders out of the broker and
//! delivers them as inbox messages. Mirrors the `events_vacuum` /
//! `crash_watch` shape — a single `spawn(coord)` entry point started
//! from `main.rs`.
//!
//! File-path semantics: a reminder may carry a `file_path` (the
//! agent-visible path inside its container). On delivery we:
//!
//! - Translate the container path (`/agents/<name>/state/foo.md`) to
//!   the host path (`/var/lib/hyperhive/agents/<name>/state/foo.md`)
//!   so hive-c0re can write to it from outside the container.
//! - Reject anything that isn't under the agent's own state subtree,
//!   contains `..` (path traversal), or has an empty relative tail.
//!   Falling outside the allowed prefix means the file write is
//!   skipped and the original message is delivered inline (with a
//!   noted warning) — the reminder still fires, just without the
//!   payload split.
//! - Defend against symlink escape: after `create_dir_all`, the
//!   parent dir is canonicalized and re-verified to live under the
//!   agent's host state root. Then we open the final file with
//!   `O_NOFOLLOW | O_CREAT | O_TRUNC` so an existing-symlink basename
//!   can't redirect the write either. Without this an agent could
//!   `ln -s /etc /agents/foo/state/escape` and bounce a write to an
//!   arbitrary host path.
//! - Write the reminder body to disk and deliver a short pointer
//!   message in its place, so the agent's inbox/wake-prompt stays
//!   small and the bulky payload can be read out of band.
//!
//! Atomicity of the inbox INSERT + `reminders.sent_at` UPDATE is handled
//! inside `Broker::deliver_reminder`; this module only computes the
//! body string before calling it.

use std::io::Write;
use std::os::unix::fs::OpenOptionsExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use crate::coordinator::Coordinator;

/// Per-tick cap on reminders delivered.
Anything over this stays due /// in the table and gets picked up on the next tick — keeps a /// 10k-deep backlog from flooding the broker (or hogging the broker /// mutex) in one shot. 100/tick × 5s tick = sustained throughput cap /// of ~20 reminders/sec; bump together if the loose-ends tracker /// starts firing higher rates. const REMINDER_BATCH_LIMIT: u64 = 100; /// Poll interval. Trade-off between latency on a freshly due reminder /// and CPU spent on empty sweeps; 5s matches the original inline /// scheduler. const POLL_INTERVAL: Duration = Duration::from_secs(5); pub fn spawn(coord: Arc) { let mut shutdown = coord.shutdown_rx(); tokio::spawn(async move { loop { tick(&coord); tokio::select! { _ = tokio::time::sleep(POLL_INTERVAL) => {} _ = shutdown.changed() => { tracing::info!("reminder scheduler: shutdown signal received"); break; } } } }); } fn tick(coord: &Arc) { let due = match coord.broker.get_due_reminders(REMINDER_BATCH_LIMIT) { Ok(rows) => rows, Err(e) => { tracing::warn!(error = ?e, "failed to query due reminders"); return; } }; for (agent, id, message, file_path) in due { let body = prepare_body(&agent, &message, file_path.as_deref()); if let Err(e) = coord.broker.deliver_reminder(id, &agent, &body) { let reason = format!("{e:#}"); tracing::warn!( reminder_id = id, %agent, error = %reason, "failed to deliver reminder" ); // Persist the failure so the dashboard can surface it + // bump attempt_count. After MAX_REMINDER_ATTEMPTS the // row drops out of `get_due_reminders` and waits for // operator retry / cancel. if let Err(persist_err) = coord.broker.record_reminder_failure(id, &reason) { tracing::warn!( reminder_id = id, error = ?persist_err, "failed to persist reminder failure" ); } } } } /// Build the inbox body for a due reminder. When `file_path` is None /// the body is the original message verbatim. When set, we attempt to /// persist the message body to the requested file and return a short /// pointer string instead. 
Failures (bad prefix, symlink escape, /// write error, missing parent) fall back to inline delivery with a /// noted warning so the reminder still fires. fn prepare_body(agent: &str, message: &str, file_path: Option<&str>) -> String { let Some(req_path) = file_path else { return message.to_owned(); }; let host_path = match resolve_host_path(agent, req_path) { Ok(p) => p, Err(reason) => { tracing::warn!(%agent, %req_path, %reason, "reminder file_path rejected; delivering inline"); return inline_fallback(req_path, &format!("rejected: {reason}"), message); } }; match write_payload(agent, &host_path, message) { Ok(()) => { let bytes = message.len(); // debug! not info! — under load this would dominate the log. tracing::debug!(%agent, path = %host_path.display(), bytes, "reminder body written to file"); format!( "reminder body persisted to `{req_path}` ({bytes} bytes); read with your filesystem tools" ) } Err(reason) => { tracing::warn!(%agent, path = %host_path.display(), %reason, "reminder file_path write failed; delivering inline"); inline_fallback(req_path, &reason, message) } } } fn inline_fallback(req_path: &str, reason: &str, message: &str) -> String { format!("[reminder file_path '{req_path}' {reason}; delivering body inline]\n\n{message}") } /// Persist `message` to `host_path` with the symlink-escape defenses /// described in the module docs. Returns `Ok(())` on success, or a /// human-readable reason string on any failure (caller logs + /// inline-falls-back). `pub` because `agent_server::handle_remind` /// reuses it for the at-remind-time auto-file path. 
pub fn write_payload(agent: &str, host_path: &Path, message: &str) -> Result<(), String> { let Some(parent) = host_path.parent() else { return Err("internal: host path has no parent".to_owned()); }; std::fs::create_dir_all(parent) .map_err(|e| format!("parent dir create failed: {e}"))?; // Resolve symlinks in the parent chain, then re-verify the // canonical form still lives under the agent's host state root — // catches `ln -s /etc state/escape` style attacks. let parent_canonical = parent .canonicalize() .map_err(|e| format!("parent canonicalize failed: {e}"))?; let agent_root = Coordinator::agent_notes_dir(agent) .canonicalize() .map_err(|e| format!("agent state root canonicalize failed: {e}"))?; if !parent_canonical.starts_with(&agent_root) { return Err(format!( "symlink escape: canonical parent `{}` outside agent root `{}`", parent_canonical.display(), agent_root.display() )); } let basename = host_path .file_name() .ok_or_else(|| "missing basename".to_owned())?; let target = parent_canonical.join(basename); // O_NOFOLLOW on the final component refuses to open if the // basename is itself an existing symlink. Combined with the // canonicalize-parent check above, no symlink anywhere in the // path can redirect the write. let mut file = std::fs::OpenOptions::new() .write(true) .create(true) .truncate(true) .custom_flags(libc::O_NOFOLLOW) .open(&target) .map_err(|e| format!("open failed: {e}"))?; file.write_all(message.as_bytes()) .map_err(|e| format!("write failed: {e}"))?; Ok(()) } /// Container-visible state prefix the caller's `file_path` must live /// under. Sub-agents see their state at `/agents//state/`; /// the manager keeps the legacy `/state/` mount (see /// `lifecycle::set_nspawn_flags`). Auto-file paths use the same /// prefix so the round-trip is symmetric. 
#[must_use] pub fn container_state_prefix(agent: &str) -> String { if agent == hive_sh4re::MANAGER_AGENT { "/state/".to_owned() } else { format!("/agents/{agent}/state/") } } /// Map an agent-visible container path to the matching host path, /// validating that it lives under the agent's own state subtree, has /// a non-empty relative tail, and doesn't try to traverse out via /// `..`. Returns the host `PathBuf` on success, or a human-readable /// reason string on rejection. `pub` so `agent_server::handle_remind` /// can reuse it for the at-remind-time auto-file path. pub fn resolve_host_path(agent: &str, req_path: &str) -> Result { let prefix = container_state_prefix(agent); let Some(rel) = req_path.strip_prefix(&prefix) else { return Err(format!( "must be absolute and under `{prefix}` (got `{req_path}`)" )); }; if rel.is_empty() { return Err("file_path must include a filename, not just the state dir".to_owned()); } let rel_path = Path::new(rel); for comp in rel_path.components() { match comp { std::path::Component::Normal(_) => {} other => { return Err(format!( "path component `{other:?}` not allowed (no traversal / absolute / root)" )); } } } Ok(Coordinator::agent_notes_dir(agent).join(rel_path)) } #[cfg(test)] mod tests { use super::*; #[test] fn rejects_paths_outside_agent_state() { assert!(resolve_host_path("foo", "/etc/passwd").is_err()); assert!(resolve_host_path("foo", "/agents/bar/state/x.md").is_err()); assert!(resolve_host_path("foo", "relative.md").is_err()); } #[test] fn rejects_traversal() { assert!(resolve_host_path("foo", "/agents/foo/state/../../etc/passwd").is_err()); assert!(resolve_host_path("foo", "/agents/foo/state/./x.md").is_err()); } #[test] fn rejects_empty_relative_tail() { // Trailing slash → empty tail. Used to fall through to // create_dir_all + write-to-dir → confusing inline fallback; // explicit reject gives a cleaner log. 
let err = resolve_host_path("foo", "/agents/foo/state/").unwrap_err(); assert!(err.contains("must include a filename"), "got: {err}"); } #[test] fn accepts_well_formed_path() { let p = resolve_host_path("foo", "/agents/foo/state/reminders/123.md").unwrap(); assert_eq!( p, PathBuf::from("/var/lib/hyperhive/agents/foo/state/reminders/123.md") ); } #[test] fn manager_uses_legacy_state_prefix() { // The manager container mounts its state at `/state/` (legacy), // not `/agents/manager/state/`. Same host path; different // container-visible path. resolve_host_path needs to know. assert_eq!(container_state_prefix("manager"), "/state/"); let p = resolve_host_path("manager", "/state/reminders/x.md").unwrap(); assert_eq!( p, PathBuf::from("/var/lib/hyperhive/agents/manager/state/reminders/x.md") ); // And the sub-agent prefix must NOT be accepted for the manager. assert!(resolve_host_path("manager", "/agents/manager/state/x.md").is_err()); } #[test] fn prepare_body_passthrough_when_no_file_path() { let s = prepare_body("foo", "hello world", None); assert_eq!(s, "hello world"); } #[test] fn prepare_body_falls_back_inline_on_bad_path() { let s = prepare_body("foo", "payload", Some("/etc/passwd")); assert!(s.starts_with("[reminder file_path '/etc/passwd' rejected:")); assert!(s.contains("payload")); } }