agent_server: reminder body size cap + extract Remind/AskOperator handlers

This commit is contained in:
damocles 2026-05-17 02:55:08 +02:00
parent dba3badeae
commit 271c524e66
2 changed files with 106 additions and 82 deletions

View file

@ -11,9 +11,10 @@
## Reminder Tool ## Reminder Tool
- Handle text overflow → suggest file_path option for long messages - ~~Handle text overflow → suggest file_path option for long messages~~ ✓ fixed — Remind dispatch rejects `message.len() > 4096` (when no `file_path` was supplied) with an error pointing at the `file_path` escape hatch.
- Per-agent reminder limits (burst capacity, rate limiting) - Per-agent reminder limits (burst capacity, rate limiting)
- **File path delivery**: currently unused in scheduler delivery loop — implement file write/delivery to /state/<agent>/reminders/ or similar - **Expose `remind` MCP tool**: wire protocol exists (`AgentRequest::Remind`) and the broker handles it, but no `#[tool]` method on `AgentServer` actually surfaces it to claude. Until that lands, the Remind path is unreachable from agent turns.
- **File path delivery**: currently unused in scheduler delivery loop — implement file write/delivery to /state/<agent>/reminders/ or similar (also needed for the overflow-check escape hatch above to actually do anything useful).
- ~~**Orphan reminders**~~ ✓ fixed — `Broker::deliver_reminder` wraps the inbox INSERT + reminders UPDATE in one sqlite transaction; partial failure can no longer cause duplicate delivery on the next tick. - ~~**Orphan reminders**~~ ✓ fixed — `Broker::deliver_reminder` wraps the inbox INSERT + reminders UPDATE in one sqlite transaction; partial failure can no longer cause duplicate delivery on the next tick.
- ~~**Unbounded batches**~~ ✓ fixed — scheduler now calls `get_due_reminders(REMINDER_BATCH_LIMIT)` (cap = 100/tick); overflow stays due and gets picked up next cycle. - ~~**Unbounded batches**~~ ✓ fixed — scheduler now calls `get_due_reminders(REMINDER_BATCH_LIMIT)` (cap = 100/tick); overflow stays due and gets picked up next cycle.
- **Scheduler shutdown**: add graceful shutdown signal when coordinator is destroyed (currently runs forever) - **Scheduler shutdown**: add graceful shutdown signal when coordinator is destroyed (currently runs forever)

View file

@ -172,7 +172,23 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc<Coordinator>) ->
options, options,
multi, multi,
ttl_seconds, ttl_seconds,
} => { } => handle_ask_operator(coord, agent, question, options, *multi, *ttl_seconds),
AgentRequest::Remind {
message,
timing,
file_path,
} => handle_remind(broker, agent, message, timing, file_path.as_deref()),
}
}
fn handle_ask_operator(
coord: &Arc<Coordinator>,
agent: &str,
question: &str,
options: &[String],
multi: bool,
ttl_seconds: Option<u64>,
) -> AgentResponse {
let deadline_at = ttl_seconds.and_then(|s| { let deadline_at = ttl_seconds.and_then(|s| {
let now = std::time::SystemTime::now() let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)
@ -183,11 +199,11 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc<Coordinator>) ->
}); });
match coord match coord
.questions .questions
.submit(agent, question, options, *multi, deadline_at) .submit(agent, question, options, multi, deadline_at)
{ {
Ok(id) => { Ok(id) => {
tracing::info!(%id, %agent, ?deadline_at, "agent question queued"); tracing::info!(%id, %agent, ?deadline_at, "agent question queued");
if let Some(ttl) = *ttl_seconds { if let Some(ttl) = ttl_seconds {
crate::manager_server::spawn_question_watchdog(coord, id, ttl); crate::manager_server::spawn_question_watchdog(coord, id, ttl);
} }
AgentResponse::QuestionQueued { id } AgentResponse::QuestionQueued { id }
@ -196,52 +212,42 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc<Coordinator>) ->
message: format!("{e:#}"), message: format!("{e:#}"),
}, },
} }
} }
AgentRequest::Remind {
message,
timing,
file_path,
} => {
use hive_sh4re::ReminderTiming;
// Calculate the due_at timestamp, propagating errors instead of silently /// Cap on the inline `message` byte length the Remind request accepts.
// defaulting to epoch 1970 on overflow/conversion failure. /// Reminders land in the agent's inbox and feed the next wake prompt — a
let due_at_result: Result<i64> = match timing { /// multi-kilobyte body bloats every subsequent turn's context. Anything
ReminderTiming::InSeconds { seconds } => { /// bigger should be persisted to disk by the caller and pointed at via
let now = std::time::SystemTime::now(); /// `file_path` (which the scheduler will deliver as a path reference rather
let future = match now.checked_add(std::time::Duration::from_secs(*seconds)) { /// than the full body).
Some(t) => t, const REMIND_MESSAGE_MAX: usize = 4096;
None => {
fn handle_remind(
broker: &crate::broker::Broker,
agent: &str,
message: &str,
timing: &hive_sh4re::ReminderTiming,
file_path: Option<&str>,
) -> AgentResponse {
if file_path.is_none() && message.len() > REMIND_MESSAGE_MAX {
return AgentResponse::Err { return AgentResponse::Err {
message: format!( message: format!(
"InSeconds overflow: {seconds}s exceeds system time range" "reminder body too long ({} bytes, max {REMIND_MESSAGE_MAX}); write the \
payload to a file under your /state/ dir and pass its path as \
`file_path` so the reminder delivers a pointer instead of the full body",
message.len()
), ),
}; };
} }
}; let due_at = match resolve_due_at(timing) {
let duration = match future.duration_since(std::time::UNIX_EPOCH) { Ok(t) => t,
Ok(d) => d,
Err(e) => { Err(e) => {
return AgentResponse::Err { return AgentResponse::Err {
message: format!("system time before UNIX_EPOCH: {e}"), message: format!("invalid reminder timing: {e:#}"),
}; };
} }
}; };
match i64::try_from(duration.as_secs()) { match broker.store_reminder(agent, message, file_path, due_at) {
Ok(ts) => Ok(ts),
Err(e) => {
return AgentResponse::Err {
message: format!("unix timestamp exceeds i64 range: {e}"),
};
}
}
}
ReminderTiming::At { unix_timestamp } => Ok(*unix_timestamp),
};
match due_at_result {
Ok(due_at) => {
match broker.store_reminder(agent, message, file_path.as_deref(), due_at) {
Ok(id) => { Ok(id) => {
tracing::info!(%id, %agent, %due_at, "reminder scheduled"); tracing::info!(%id, %agent, %due_at, "reminder scheduled");
AgentResponse::Ok AgentResponse::Ok
@ -250,11 +256,28 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc<Coordinator>) ->
message: format!("failed to store reminder: {e:#}"), message: format!("failed to store reminder: {e:#}"),
}, },
} }
}
/// Resolve the `due_at` unix timestamp for a Remind request. Returns
/// distinct error messages for each failure mode (overflow on
/// `InSeconds`, pre-epoch clock, `i64` cast wrap) so the caller can tell
/// what went wrong without inspecting the chain.
fn resolve_due_at(timing: &hive_sh4re::ReminderTiming) -> anyhow::Result<i64> {
use hive_sh4re::ReminderTiming;
match timing {
ReminderTiming::InSeconds { seconds } => {
let now = std::time::SystemTime::now();
let future = now
.checked_add(std::time::Duration::from_secs(*seconds))
.ok_or_else(|| {
anyhow::anyhow!("InSeconds overflow: {seconds}s exceeds system time range")
})?;
let duration = future
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| anyhow::anyhow!("system time before UNIX_EPOCH: {e}"))?;
i64::try_from(duration.as_secs())
.map_err(|e| anyhow::anyhow!("unix timestamp exceeds i64 range: {e}"))
} }
Err(e) => AgentResponse::Err { ReminderTiming::At { unix_timestamp } => Ok(*unix_timestamp),
message: format!("invalid reminder timing: {e:#}"),
},
}
}
} }
} }