limits: unified 1 KiB cap on send/ask + reminder auto-file on overflow
This commit is contained in:
parent
753409a5ef
commit
0e6bac8388
6 changed files with 180 additions and 42 deletions
|
|
@ -152,9 +152,11 @@ pub struct RecvArgs {
|
||||||
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
|
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
|
||||||
pub struct RemindArgs {
|
pub struct RemindArgs {
|
||||||
/// Body that lands in your inbox when the reminder fires (sender
|
/// Body that lands in your inbox when the reminder fires (sender
|
||||||
/// will appear as `reminder`). Capped at 4096 bytes when
|
/// will appear as `reminder`). Soft cap at 1 KiB inline — anything
|
||||||
/// `file_path` is unset — anything bigger should be persisted to
|
/// larger gets auto-persisted to a file under
|
||||||
/// disk and pointed at via `file_path`.
|
/// `/agents/<you>/state/reminders/auto-<ts>.md` and the inbox
|
||||||
|
/// message becomes a short pointer. Pass `file_path` if you want
|
||||||
|
/// to control the destination yourself.
|
||||||
pub message: String,
|
pub message: String,
|
||||||
/// Fire `delay_seconds` from now (relative). Set this OR
|
/// Fire `delay_seconds` from now (relative). Set this OR
|
||||||
/// `at_unix_timestamp`, not both.
|
/// `at_unix_timestamp`, not both.
|
||||||
|
|
@ -288,10 +290,11 @@ impl AgentServer {
|
||||||
time (sender will appear as `reminder`). Use for self-paced follow-ups: 'check task \
|
time (sender will appear as `reminder`). Use for self-paced follow-ups: 'check task \
|
||||||
status in 60s', 'retry failed deploy at 14:00 UTC', 'nudge me when the operator's \
|
status in 60s', 'retry failed deploy at 14:00 UTC', 'nudge me when the operator's \
|
||||||
deploy window opens'. Set EXACTLY ONE of `delay_seconds` (fire N seconds from now) \
|
deploy window opens'. Set EXACTLY ONE of `delay_seconds` (fire N seconds from now) \
|
||||||
or `at_unix_timestamp` (fire at absolute epoch second). Body is capped at 4096 bytes \
|
or `at_unix_timestamp` (fire at absolute epoch second). Body soft-caps at 1 KiB \
|
||||||
when `file_path` is unset; for larger payloads write them to a file under your \
|
inline — anything larger gets auto-persisted to a file under your \
|
||||||
`/agents/<you>/state/` dir and pass the path in `file_path`. Returns immediately — \
|
`/agents/<you>/state/reminders/` dir and the inbox message becomes a short pointer; \
|
||||||
the reminder lives in the broker until due."
|
pass `file_path` if you want to control the destination yourself. Returns \
|
||||||
|
immediately — the reminder lives in the broker until due."
|
||||||
)]
|
)]
|
||||||
async fn remind(&self, Parameters(args): Parameters<RemindArgs>) -> String {
|
async fn remind(&self, Parameters(args): Parameters<RemindArgs>) -> String {
|
||||||
let log = format!("{args:?}");
|
let log = format!("{args:?}");
|
||||||
|
|
|
||||||
|
|
@ -98,6 +98,9 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc<Coordinator>) ->
|
||||||
let broker = &coord.broker;
|
let broker = &coord.broker;
|
||||||
match req {
|
match req {
|
||||||
AgentRequest::Send { to, body } => {
|
AgentRequest::Send { to, body } => {
|
||||||
|
if let Err(message) = crate::limits::check_size("send", body) {
|
||||||
|
return AgentResponse::Err { message };
|
||||||
|
}
|
||||||
// Handle broadcast sends (recipient = "*")
|
// Handle broadcast sends (recipient = "*")
|
||||||
if to == "*" {
|
if to == "*" {
|
||||||
let errors = coord.broadcast_send(agent, body);
|
let errors = coord.broadcast_send(agent, body);
|
||||||
|
|
@ -189,6 +192,9 @@ fn handle_ask_operator(
|
||||||
multi: bool,
|
multi: bool,
|
||||||
ttl_seconds: Option<u64>,
|
ttl_seconds: Option<u64>,
|
||||||
) -> AgentResponse {
|
) -> AgentResponse {
|
||||||
|
if let Err(message) = crate::limits::check_size("question", question) {
|
||||||
|
return AgentResponse::Err { message };
|
||||||
|
}
|
||||||
let deadline_at = ttl_seconds.and_then(|s| {
|
let deadline_at = ttl_seconds.and_then(|s| {
|
||||||
let now = std::time::SystemTime::now()
|
let now = std::time::SystemTime::now()
|
||||||
.duration_since(std::time::UNIX_EPOCH)
|
.duration_since(std::time::UNIX_EPOCH)
|
||||||
|
|
@ -214,21 +220,6 @@ fn handle_ask_operator(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Cap on the inline `message` byte length when no `file_path` is set.
|
|
||||||
/// Reminders land in the agent's inbox and feed the next wake prompt — a
|
|
||||||
/// multi-kilobyte body bloats every subsequent turn's context. Anything
|
|
||||||
/// bigger should be persisted to disk by the caller and pointed at via
|
|
||||||
/// `file_path` (which the scheduler will deliver as a path reference rather
|
|
||||||
/// than the full body).
|
|
||||||
const REMIND_MESSAGE_MAX: usize = 4096;
|
|
||||||
|
|
||||||
/// Upper cap when `file_path` IS set. The body still lands in the
|
|
||||||
/// reminders sqlite row until delivery, so without an upper bound a
|
|
||||||
/// caller could DOS the broker DB with a single multi-megabyte
|
|
||||||
/// reminder. 64 KiB is generous for any reasonable payload + keeps a
|
|
||||||
/// single row small enough that sqlite won't choke.
|
|
||||||
const REMIND_MESSAGE_MAX_WITH_FILE: usize = 64 * 1024;
|
|
||||||
|
|
||||||
fn handle_remind(
|
fn handle_remind(
|
||||||
coord: &Arc<Coordinator>,
|
coord: &Arc<Coordinator>,
|
||||||
agent: &str,
|
agent: &str,
|
||||||
|
|
@ -236,21 +227,6 @@ fn handle_remind(
|
||||||
timing: &hive_sh4re::ReminderTiming,
|
timing: &hive_sh4re::ReminderTiming,
|
||||||
file_path: Option<&str>,
|
file_path: Option<&str>,
|
||||||
) -> AgentResponse {
|
) -> AgentResponse {
|
||||||
let (cap, hint) = match file_path {
|
|
||||||
None => (
|
|
||||||
REMIND_MESSAGE_MAX,
|
|
||||||
"; set `file_path` to persist a larger payload to a file instead",
|
|
||||||
),
|
|
||||||
Some(_) => (REMIND_MESSAGE_MAX_WITH_FILE, ""),
|
|
||||||
};
|
|
||||||
if message.len() > cap {
|
|
||||||
return AgentResponse::Err {
|
|
||||||
message: format!(
|
|
||||||
"reminder body too long ({} bytes, max {cap}){hint}",
|
|
||||||
message.len()
|
|
||||||
),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
let due_at = match resolve_due_at(timing) {
|
let due_at = match resolve_due_at(timing) {
|
||||||
Ok(t) => t,
|
Ok(t) => t,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|
@ -259,7 +235,14 @@ fn handle_remind(
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
match coord.broker.store_reminder(agent, message, file_path, due_at) {
|
let (stored_message, stored_path) = match prepare_remind_storage(agent, message, file_path) {
|
||||||
|
Ok(pair) => pair,
|
||||||
|
Err(e) => return AgentResponse::Err { message: e },
|
||||||
|
};
|
||||||
|
match coord
|
||||||
|
.broker
|
||||||
|
.store_reminder(agent, &stored_message, stored_path.as_deref(), due_at)
|
||||||
|
{
|
||||||
Ok(id) => {
|
Ok(id) => {
|
||||||
tracing::info!(%id, %agent, %due_at, "reminder scheduled");
|
tracing::info!(%id, %agent, %due_at, "reminder scheduled");
|
||||||
AgentResponse::Ok
|
AgentResponse::Ok
|
||||||
|
|
@ -270,6 +253,61 @@ fn handle_remind(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Decide what we actually store in the reminders row, applying the
|
||||||
|
/// same byte cap as the rest of the wire protocol
|
||||||
|
/// ([`crate::limits::MESSAGE_MAX_BYTES`]). Three outcomes:
|
||||||
|
///
|
||||||
|
/// 1. Body within the cap → stored verbatim, with whatever `file_path`
|
||||||
|
/// the caller passed (None or Some). The scheduler honours
|
||||||
|
/// `file_path` at delivery time as before.
|
||||||
|
/// 2. Body over the cap, no caller `file_path` → auto-generate a path
|
||||||
|
/// under `/agents/<agent>/state/reminders/auto-<ts>.md`, write the
|
||||||
|
/// body to disk now, store a short pointer hint as the message and
|
||||||
|
/// clear `file_path` (so the scheduler doesn't re-write at
|
||||||
|
/// delivery and overwrite the body with the hint).
|
||||||
|
/// 3. Body over the cap, caller provided `file_path` → honour the
|
||||||
|
/// caller's path: write the body to it now, store the same hint
|
||||||
|
/// and clear `file_path` for the same reason as (2).
|
||||||
|
///
|
||||||
|
/// Returns `(stored_message, stored_file_path)` on success, or a
|
||||||
|
/// caller-ready error string on auto-save failure (which is the only
|
||||||
|
/// way a Remind request can be refused for size — the agent never has
|
||||||
|
/// to think about the cap).
|
||||||
|
fn prepare_remind_storage(
|
||||||
|
agent: &str,
|
||||||
|
message: &str,
|
||||||
|
file_path: Option<&str>,
|
||||||
|
) -> Result<(String, Option<String>), String> {
|
||||||
|
if message.len() <= crate::limits::MESSAGE_MAX_BYTES {
|
||||||
|
return Ok((message.to_owned(), file_path.map(str::to_owned)));
|
||||||
|
}
|
||||||
|
let req_path = match file_path {
|
||||||
|
Some(p) => p.to_owned(),
|
||||||
|
None => auto_reminder_path(agent),
|
||||||
|
};
|
||||||
|
let host_path = crate::reminder_scheduler::resolve_host_path(agent, &req_path)
|
||||||
|
.map_err(|reason| format!("auto-save path `{req_path}` rejected: {reason}"))?;
|
||||||
|
crate::reminder_scheduler::write_payload(agent, &host_path, message)
|
||||||
|
.map_err(|reason| format!("auto-save of large reminder body to `{req_path}` failed: {reason}"))?;
|
||||||
|
let hint = format!(
|
||||||
|
"[reminder body of {} bytes auto-saved to `{req_path}`; read with your filesystem tools]",
|
||||||
|
message.len()
|
||||||
|
);
|
||||||
|
Ok((hint, None))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build the destination path for an auto-saved reminder body.
///
/// The path has the form
/// `/agents/<agent>/state/reminders/auto-<unix_nanos>.md`. The
/// nanosecond timestamp, combined with the per-agent directory, is
/// what keeps name collisions unlikely. No hostname is included
/// because hive-c0re is single-host.
///
/// If the system clock reads earlier than the Unix epoch, the
/// timestamp component falls back to `0`.
fn auto_reminder_path(agent: &str) -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    let ts_ns = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        // Pre-epoch clock: use a fixed stamp rather than failing.
        Err(_) => 0,
    };
    format!("/agents/{agent}/state/reminders/auto-{ts_ns}.md")
}
|
||||||
|
|
||||||
/// Resolve the `due_at` unix timestamp for a Remind request. Returns
|
/// Resolve the `due_at` unix timestamp for a Remind request. Returns
|
||||||
/// distinct error messages for each failure mode (overflow on
|
/// distinct error messages for each failure mode (overflow on
|
||||||
/// `InSeconds`, pre-epoch clock, `i64` cast wrap) so the caller can tell
|
/// `InSeconds`, pre-epoch clock, `i64` cast wrap) so the caller can tell
|
||||||
|
|
@ -293,3 +331,30 @@ fn resolve_due_at(timing: &hive_sh4re::ReminderTiming) -> anyhow::Result<i64> {
|
||||||
ReminderTiming::At { unix_timestamp } => Ok(*unix_timestamp),
|
ReminderTiming::At { unix_timestamp } => Ok(*unix_timestamp),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Generated paths sit under the agent's `state/reminders/` dir
    /// and carry the `.md` extension.
    #[test]
    fn auto_reminder_path_format() {
        let generated = auto_reminder_path("damocles");
        assert!(generated.starts_with("/agents/damocles/state/reminders/auto-"));
        assert!(generated.ends_with(".md"));
    }

    /// A body under the cap with no `file_path` comes back untouched.
    #[test]
    fn prepare_remind_storage_passthrough_under_cap() {
        let (stored_body, stored_path) =
            prepare_remind_storage("foo", "small body", None).unwrap();
        assert_eq!(stored_body, "small body");
        assert_eq!(stored_path, None);
    }

    /// A body under the cap keeps the caller-supplied `file_path`.
    #[test]
    fn prepare_remind_storage_passthrough_with_caller_file_path() {
        let caller_path = Some("/agents/foo/state/x.md");
        let (stored_body, stored_path) =
            prepare_remind_storage("foo", "small", caller_path).unwrap();
        assert_eq!(stored_body, "small");
        assert_eq!(stored_path.as_deref(), Some("/agents/foo/state/x.md"));
    }
}
|
||||||
|
|
|
||||||
61
hive-c0re/src/limits.rs
Normal file
61
hive-c0re/src/limits.rs
Normal file
|
|
@ -0,0 +1,61 @@
|
||||||
|
//! Wire-protocol size limits shared across the agent + manager
|
||||||
|
//! sockets. Caps on inline message bodies stop a single chatty agent
|
||||||
|
//! (or a misbehaving extra-MCP server) from flooding the broker
|
||||||
|
//! sqlite with megabyte-sized rows that then bloat every recipient's
|
||||||
|
//! wake-prompt context. Anything genuinely larger should be written
|
||||||
|
//! to a state file and the path sent as the body.
|
||||||
|
//!
|
||||||
|
//! Reminders get a separate auto-file escape hatch (see
|
||||||
|
//! `agent_server::handle_remind`) so callers don't have to think
|
||||||
|
//! about it — oversized reminder bodies get persisted to disk
|
||||||
|
//! transparently and the inbox sees a pointer.
|
||||||
|
|
||||||
|
/// Maximum size, in bytes, of a single inline message body.
///
/// Enforced for `send`, for the question text of `ask_operator`, and
/// for the inline form of a reminder as stored. 1 KiB keeps a backlog
/// of around 100 unread messages from dominating a wake prompt while
/// still leaving room for routine cross-agent chatter.
pub const MESSAGE_MAX_BYTES: usize = 1024;

/// Check that `body` is no longer than [`MESSAGE_MAX_BYTES`] bytes.
///
/// `label` is inserted verbatim at the start of the error text, so
/// pass a short noun such as `"send"`, `"question"` or `"broadcast"`
/// that lets the model see which call was rejected.
///
/// # Errors
///
/// Returns a caller-ready message (to be wrapped in
/// `AgentResponse::Err` / `ManagerResponse::Err` by the caller) when
/// the body's byte length exceeds the cap.
pub fn check_size(label: &str, body: &str) -> Result<(), String> {
    let size = body.len();
    if size <= MESSAGE_MAX_BYTES {
        return Ok(());
    }

    Err(format!(
        "{label} body too long ({} bytes, max {MESSAGE_MAX_BYTES}); write the \
         payload to a file under your `/agents/<you>/state/` dir and send the \
         path as the body instead",
        size
    ))
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Bodies up to and including the cap are accepted.
    #[test]
    fn accepts_short_body() {
        let at_cap = "x".repeat(MESSAGE_MAX_BYTES);
        assert!(check_size("send", "hello").is_ok());
        assert!(check_size("send", at_cap.as_str()).is_ok());
    }

    /// One byte over the cap is rejected, and the error names the cap.
    #[test]
    fn rejects_oversize_body() {
        let too_big = "x".repeat(MESSAGE_MAX_BYTES + 1);
        let rejection = check_size("send", &too_big).unwrap_err();
        assert!(rejection.contains("send body too long"));
        let cap_fragment = format!("max {MESSAGE_MAX_BYTES}");
        assert!(rejection.contains(&cap_fragment));
    }

    /// The caller's label leads the error text.
    #[test]
    fn label_threads_through() {
        let too_big = "x".repeat(MESSAGE_MAX_BYTES + 1);
        let rejection = check_size("question", &too_big).unwrap_err();
        assert!(rejection.starts_with("question body too long"));
    }
}
|
||||||
|
|
@ -17,6 +17,7 @@ mod dashboard;
|
||||||
mod events_vacuum;
|
mod events_vacuum;
|
||||||
mod forge;
|
mod forge;
|
||||||
mod lifecycle;
|
mod lifecycle;
|
||||||
|
mod limits;
|
||||||
mod manager_server;
|
mod manager_server;
|
||||||
mod meta;
|
mod meta;
|
||||||
mod migrate;
|
mod migrate;
|
||||||
|
|
|
||||||
|
|
@ -86,6 +86,9 @@ fn manager_recv_timeout(wait_seconds: Option<u64>) -> std::time::Duration {
|
||||||
async fn dispatch(req: &ManagerRequest, coord: &Arc<Coordinator>) -> ManagerResponse {
|
async fn dispatch(req: &ManagerRequest, coord: &Arc<Coordinator>) -> ManagerResponse {
|
||||||
match req {
|
match req {
|
||||||
ManagerRequest::Send { to, body } => {
|
ManagerRequest::Send { to, body } => {
|
||||||
|
if let Err(message) = crate::limits::check_size("send", body) {
|
||||||
|
return ManagerResponse::Err { message };
|
||||||
|
}
|
||||||
if to == "*" {
|
if to == "*" {
|
||||||
let errors = coord.broadcast_send(MANAGER_AGENT, body);
|
let errors = coord.broadcast_send(MANAGER_AGENT, body);
|
||||||
if errors.is_empty() {
|
if errors.is_empty() {
|
||||||
|
|
@ -247,6 +250,9 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc<Coordinator>) -> ManagerResp
|
||||||
multi,
|
multi,
|
||||||
ttl_seconds,
|
ttl_seconds,
|
||||||
} => {
|
} => {
|
||||||
|
if let Err(message) = crate::limits::check_size("question", question) {
|
||||||
|
return ManagerResponse::Err { message };
|
||||||
|
}
|
||||||
tracing::info!(%question, ?options, multi, ?ttl_seconds, "manager: ask_operator");
|
tracing::info!(%question, ?options, multi, ?ttl_seconds, "manager: ask_operator");
|
||||||
let deadline_at = ttl_seconds.and_then(|s| {
|
let deadline_at = ttl_seconds.and_then(|s| {
|
||||||
let now = std::time::SystemTime::now()
|
let now = std::time::SystemTime::now()
|
||||||
|
|
|
||||||
|
|
@ -121,8 +121,9 @@ fn inline_fallback(req_path: &str, reason: &str, message: &str) -> String {
|
||||||
/// Persist `message` to `host_path` with the symlink-escape defenses
|
/// Persist `message` to `host_path` with the symlink-escape defenses
|
||||||
/// described in the module docs. Returns `Ok(())` on success, or a
|
/// described in the module docs. Returns `Ok(())` on success, or a
|
||||||
/// human-readable reason string on any failure (caller logs +
|
/// human-readable reason string on any failure (caller logs +
|
||||||
/// inline-falls-back).
|
/// inline-falls-back). `pub` because `agent_server::handle_remind`
|
||||||
fn write_payload(agent: &str, host_path: &Path, message: &str) -> Result<(), String> {
|
/// reuses it for the at-remind-time auto-file path.
|
||||||
|
pub fn write_payload(agent: &str, host_path: &Path, message: &str) -> Result<(), String> {
|
||||||
let Some(parent) = host_path.parent() else {
|
let Some(parent) = host_path.parent() else {
|
||||||
return Err("internal: host path has no parent".to_owned());
|
return Err("internal: host path has no parent".to_owned());
|
||||||
};
|
};
|
||||||
|
|
@ -168,8 +169,9 @@ fn write_payload(agent: &str, host_path: &Path, message: &str) -> Result<(), Str
|
||||||
/// validating that it lives under the agent's own state subtree, has
|
/// validating that it lives under the agent's own state subtree, has
|
||||||
/// a non-empty relative tail, and doesn't try to traverse out via
|
/// a non-empty relative tail, and doesn't try to traverse out via
|
||||||
/// `..`. Returns the host `PathBuf` on success, or a human-readable
|
/// `..`. Returns the host `PathBuf` on success, or a human-readable
|
||||||
/// reason string on rejection.
|
/// reason string on rejection. `pub` so `agent_server::handle_remind`
|
||||||
fn resolve_host_path(agent: &str, req_path: &str) -> Result<PathBuf, String> {
|
/// can reuse it for the at-remind-time auto-file path.
|
||||||
|
pub fn resolve_host_path(agent: &str, req_path: &str) -> Result<PathBuf, String> {
|
||||||
let prefix = format!("/agents/{agent}/state/");
|
let prefix = format!("/agents/{agent}/state/");
|
||||||
let Some(rel) = req_path.strip_prefix(&prefix) else {
|
let Some(rel) = req_path.strip_prefix(&prefix) else {
|
||||||
return Err(format!(
|
return Err(format!(
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue