From 271c524e66ddf72192325e153f3726e83ca60cb0 Mon Sep 17 00:00:00 2001 From: damocles Date: Sun, 17 May 2026 02:55:08 +0200 Subject: [PATCH] agent_server: reminder body size cap + extract Remind/AskOperator handlers --- TODO.md | 5 +- hive-c0re/src/agent_server.rs | 183 +++++++++++++++++++--------------- 2 files changed, 106 insertions(+), 82 deletions(-) diff --git a/TODO.md b/TODO.md index 835303e..f7a3c6c 100644 --- a/TODO.md +++ b/TODO.md @@ -11,9 +11,10 @@ ## Reminder Tool -- Handle text overflow → suggest file_path option for long messages +- ~~Handle text overflow → suggest file_path option for long messages~~ ✓ fixed — Remind dispatch rejects `message.len() > 4096` (when no `file_path` was supplied) with an error pointing at the `file_path` escape hatch. - Per-agent reminder limits (burst capacity, rate limiting) -- **File path delivery**: currently unused in scheduler delivery loop — implement file write/delivery to /state//reminders/ or similar +- **Expose `remind` MCP tool**: wire protocol exists (`AgentRequest::Remind`) and the broker handles it, but no `#[tool]` method on `AgentServer` actually surfaces it to claude. Until that lands, the Remind path is unreachable from agent turns. +- **File path delivery**: currently unused in scheduler delivery loop — implement file write/delivery to /state//reminders/ or similar (also needed for the overflow-check escape hatch above to actually do anything useful). - ~~**Orphan reminders**~~ ✓ fixed — `Broker::deliver_reminder` wraps the inbox INSERT + reminders UPDATE in one sqlite transaction; partial failure can no longer cause duplicate delivery on the next tick. - ~~**Unbounded batches**~~ ✓ fixed — scheduler now calls `get_due_reminders(REMINDER_BATCH_LIMIT)` (cap = 100/tick); overflow stays due and gets picked up next cycle. - **Scheduler shutdown**: add graceful shutdown signal when coordinator is destroyed (currently runs forever) diff --git a/hive-c0re/src/agent_server.rs b/hive-c0re/src/agent_server.rs index 8ba71fc..e90da40 100644 --- a/hive-c0re/src/agent_server.rs +++ b/hive-c0re/src/agent_server.rs @@ -172,89 +172,112 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc) -> options, multi, ttl_seconds, - } => { - let deadline_at = ttl_seconds.and_then(|s| { - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .ok() - .and_then(|d| i64::try_from(d.as_secs()).ok()) - .unwrap_or(0); - i64::try_from(s).ok().map(|s| now + s) - }); - match coord - .questions - .submit(agent, question, options, *multi, deadline_at) - { - Ok(id) => { - tracing::info!(%id, %agent, ?deadline_at, "agent question queued"); - if let Some(ttl) = *ttl_seconds { - crate::manager_server::spawn_question_watchdog(coord, id, ttl); - } - AgentResponse::QuestionQueued { id } - } - Err(e) => AgentResponse::Err { - message: format!("{e:#}"), - }, - } - } + } => handle_ask_operator(coord, agent, question, options, *multi, *ttl_seconds), AgentRequest::Remind { message, timing, file_path, - } => { - use hive_sh4re::ReminderTiming; - - // Calculate the due_at timestamp, propagating errors instead of silently - // defaulting to epoch 1970 on overflow/conversion failure. - let due_at_result: Result = match timing { - ReminderTiming::InSeconds { seconds } => { - let now = std::time::SystemTime::now(); - let future = match now.checked_add(std::time::Duration::from_secs(*seconds)) { - Some(t) => t, - None => { - return AgentResponse::Err { - message: format!( - "InSeconds overflow: {seconds}s exceeds system time range" - ), - }; - } - }; - let duration = match future.duration_since(std::time::UNIX_EPOCH) { - Ok(d) => d, - Err(e) => { - return AgentResponse::Err { - message: format!("system time before UNIX_EPOCH: {e}"), - }; - } - }; - match i64::try_from(duration.as_secs()) { - Ok(ts) => Ok(ts), - Err(e) => { - return AgentResponse::Err { - message: format!("unix timestamp exceeds i64 range: {e}"), - }; - } - } - } - ReminderTiming::At { unix_timestamp } => Ok(*unix_timestamp), - }; - - match due_at_result { - Ok(due_at) => { - match broker.store_reminder(agent, message, file_path.as_deref(), due_at) { - Ok(id) => { - tracing::info!(%id, %agent, %due_at, "reminder scheduled"); - AgentResponse::Ok - } - Err(e) => AgentResponse::Err { - message: format!("failed to store reminder: {e:#}"), - }, - } - } - Err(e) => AgentResponse::Err { - message: format!("invalid reminder timing: {e:#}"), - }, - } - } + } => handle_remind(broker, agent, message, timing, file_path.as_deref()), + } +} + +fn handle_ask_operator( + coord: &Arc, + agent: &str, + question: &str, + options: &[String], + multi: bool, + ttl_seconds: Option, +) -> AgentResponse { + let deadline_at = ttl_seconds.and_then(|s| { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .ok() + .and_then(|d| i64::try_from(d.as_secs()).ok()) + .unwrap_or(0); + i64::try_from(s).ok().map(|s| now + s) + }); + match coord + .questions + .submit(agent, question, options, multi, deadline_at) + { + Ok(id) => { + tracing::info!(%id, %agent, ?deadline_at, "agent question queued"); + if let Some(ttl) = ttl_seconds { + crate::manager_server::spawn_question_watchdog(coord, id, ttl); + } + AgentResponse::QuestionQueued { id } + } + Err(e) => AgentResponse::Err { + message: format!("{e:#}"), + }, + } +} + +/// Cap on the inline `message` byte length the Remind request accepts. +/// Reminders land in the agent's inbox and feed the next wake prompt — a +/// multi-kilobyte body bloats every subsequent turn's context. Anything +/// bigger should be persisted to disk by the caller and pointed at via +/// `file_path` (which the scheduler will deliver as a path reference rather +/// than the full body). +const REMIND_MESSAGE_MAX: usize = 4096; + +fn handle_remind( + broker: &crate::broker::Broker, + agent: &str, + message: &str, + timing: &hive_sh4re::ReminderTiming, + file_path: Option<&str>, +) -> AgentResponse { + if file_path.is_none() && message.len() > REMIND_MESSAGE_MAX { + return AgentResponse::Err { + message: format!( + "reminder body too long ({} bytes, max {REMIND_MESSAGE_MAX}); write the \ + payload to a file under your /state/ dir and pass its path as \ + `file_path` so the reminder delivers a pointer instead of the full body", + message.len() + ), + }; + } + let due_at = match resolve_due_at(timing) { + Ok(t) => t, + Err(e) => { + return AgentResponse::Err { + message: format!("invalid reminder timing: {e:#}"), + }; + } + }; + match broker.store_reminder(agent, message, file_path, due_at) { + Ok(id) => { + tracing::info!(%id, %agent, %due_at, "reminder scheduled"); + AgentResponse::Ok + } + Err(e) => AgentResponse::Err { + message: format!("failed to store reminder: {e:#}"), + }, + } +} + +/// Resolve the `due_at` unix timestamp for a Remind request. Returns +/// distinct error messages for each failure mode (overflow on +/// `InSeconds`, pre-epoch clock, `i64` cast wrap) so the caller can tell +/// what went wrong without inspecting the chain. +fn resolve_due_at(timing: &hive_sh4re::ReminderTiming) -> anyhow::Result { + use hive_sh4re::ReminderTiming; + match timing { + ReminderTiming::InSeconds { seconds } => { + let now = std::time::SystemTime::now(); + let future = now + .checked_add(std::time::Duration::from_secs(*seconds)) + .ok_or_else(|| { + anyhow::anyhow!("InSeconds overflow: {seconds}s exceeds system time range") + })?; + let duration = future + .duration_since(std::time::UNIX_EPOCH) + .map_err(|e| anyhow::anyhow!("system time before UNIX_EPOCH: {e}"))?; + i64::try_from(duration.as_secs()) + .map_err(|e| anyhow::anyhow!("unix timestamp exceeds i64 range: {e}")) + } + ReminderTiming::At { unix_timestamp } => Ok(*unix_timestamp), } }