fix reminder tool issues: error on time overflow, optimize scheduler query
This commit is contained in:
parent
bc27113967
commit
24eec69418
4 changed files with 90 additions and 48 deletions
6
TODO.md
6
TODO.md
|
|
@ -10,8 +10,14 @@
|
||||||
|
|
||||||
- Handle text overflow → suggest file_path option for long messages
|
- Handle text overflow → suggest file_path option for long messages
|
||||||
- Per-agent reminder limits (burst capacity, rate limiting)
|
- Per-agent reminder limits (burst capacity, rate limiting)
|
||||||
|
- **File path delivery**: currently unused in scheduler delivery loop — implement file write/delivery to /state/<agent>/reminders/ or similar
|
||||||
|
- **Orphan reminders**: handle partial failures (e.g. delivery succeeds but mark_reminder_sent fails) to avoid resending
|
||||||
|
- **Unbounded batches**: implement per-cycle delivery limit so burst of 10k reminders doesn't flood the broker in one cycle
|
||||||
|
- **Scheduler shutdown**: add graceful shutdown signal when coordinator is destroyed (currently runs forever)
|
||||||
|
- **DB lock contention**: under high reminder volume, many concurrent mark_reminder_sent calls may serialize behind the Mutex lock — consider batch updates
|
||||||
|
|
||||||
## Dashboard
|
## Dashboard
|
||||||
|
|
||||||
- Per-agent reminder status (pending, delivered)
|
- Per-agent reminder status (pending, delivered)
|
||||||
- Reminder query interface for debugging
|
- Reminder query interface for debugging
|
||||||
|
- Display reminder delivery errors (failed sends, mark failures)
|
||||||
|
|
|
||||||
|
|
@ -194,26 +194,48 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc<Coordinator>) ->
|
||||||
file_path,
|
file_path,
|
||||||
} => {
|
} => {
|
||||||
use hive_sh4re::ReminderTiming;
|
use hive_sh4re::ReminderTiming;
|
||||||
let due_at = match timing {
|
|
||||||
|
// Calculate the due_at timestamp, propagating errors instead of silently
|
||||||
|
// defaulting to epoch 1970 on overflow/conversion failure.
|
||||||
|
let due_at_result: Result<i64> = match timing {
|
||||||
ReminderTiming::InSeconds { seconds } => {
|
ReminderTiming::InSeconds { seconds } => {
|
||||||
std::time::SystemTime::now()
|
let now = std::time::SystemTime::now();
|
||||||
.checked_add(std::time::Duration::from_secs(*seconds))
|
let future = match now.checked_add(std::time::Duration::from_secs(*seconds)) {
|
||||||
.and_then(|t| {
|
Some(t) => t,
|
||||||
t.duration_since(std::time::UNIX_EPOCH)
|
None => return AgentResponse::Err {
|
||||||
.ok()
|
message: format!("InSeconds overflow: {seconds}s exceeds system time range"),
|
||||||
.and_then(|d| i64::try_from(d.as_secs()).ok())
|
},
|
||||||
})
|
};
|
||||||
.unwrap_or(0)
|
let duration = match future.duration_since(std::time::UNIX_EPOCH) {
|
||||||
|
Ok(d) => d,
|
||||||
|
Err(e) => return AgentResponse::Err {
|
||||||
|
message: format!("system time before UNIX_EPOCH: {e}"),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
match i64::try_from(duration.as_secs()) {
|
||||||
|
Ok(ts) => Ok(ts),
|
||||||
|
Err(e) => return AgentResponse::Err {
|
||||||
|
message: format!("unix timestamp exceeds i64 range: {e}"),
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
ReminderTiming::At { unix_timestamp } => *unix_timestamp,
|
ReminderTiming::At { unix_timestamp } => Ok(*unix_timestamp),
|
||||||
};
|
};
|
||||||
match broker.store_reminder(agent, message, file_path.as_deref(), due_at) {
|
|
||||||
Ok(id) => {
|
match due_at_result {
|
||||||
tracing::info!(%id, %agent, %due_at, "reminder scheduled");
|
Ok(due_at) => {
|
||||||
AgentResponse::Ok
|
match broker.store_reminder(agent, message, file_path.as_deref(), due_at) {
|
||||||
|
Ok(id) => {
|
||||||
|
tracing::info!(%id, %agent, %due_at, "reminder scheduled");
|
||||||
|
AgentResponse::Ok
|
||||||
|
}
|
||||||
|
Err(e) => AgentResponse::Err {
|
||||||
|
message: format!("failed to store reminder: {e:#}"),
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Err(e) => AgentResponse::Err {
|
Err(e) => AgentResponse::Err {
|
||||||
message: format!("failed to store reminder: {e:#}"),
|
message: format!("invalid reminder timing: {e:#}"),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -253,6 +253,25 @@ impl Broker {
|
||||||
.context("query reminders")
|
.context("query reminders")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get all due reminders across all agents in a single query.
|
||||||
|
/// Returns a vec of (agent, id, message, file_path) tuples.
|
||||||
|
pub fn get_all_due_reminders(&self) -> Result<Vec<(String, i64, String, Option<String>)>> {
|
||||||
|
let conn = self.conn.lock().unwrap();
|
||||||
|
let mut stmt = conn.prepare(
|
||||||
|
"SELECT agent, id, message, file_path FROM reminders WHERE due_at <= ?1 AND sent_at IS NULL ORDER BY agent, due_at ASC"
|
||||||
|
)?;
|
||||||
|
let rows = stmt.query_map(params![now_unix()], |row| {
|
||||||
|
Ok((
|
||||||
|
row.get::<_, String>(0)?,
|
||||||
|
row.get::<_, i64>(1)?,
|
||||||
|
row.get::<_, String>(2)?,
|
||||||
|
row.get::<_, Option<String>>(3)?,
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
rows.collect::<rusqlite::Result<Vec<_>>>()
|
||||||
|
.context("query all due reminders")
|
||||||
|
}
|
||||||
|
|
||||||
/// Mark a reminder as sent (delivered).
|
/// Mark a reminder as sent (delivered).
|
||||||
pub fn mark_reminder_sent(&self, id: i64) -> Result<()> {
|
pub fn mark_reminder_sent(&self, id: i64) -> Result<()> {
|
||||||
let conn = self.conn.lock().unwrap();
|
let conn = self.conn.lock().unwrap();
|
||||||
|
|
|
||||||
|
|
@ -163,41 +163,36 @@ async fn main() -> Result<()> {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
use hive_sh4re::Message;
|
use hive_sh4re::Message;
|
||||||
loop {
|
loop {
|
||||||
// Get all agents currently registered
|
// Query all due reminders in a single DB call
|
||||||
let agents = reminder_coord.list_agents();
|
match reminder_coord.broker.get_all_due_reminders() {
|
||||||
for agent in agents {
|
Ok(reminders) => {
|
||||||
match reminder_coord.broker.get_due_reminders(&agent) {
|
for (agent, id, message, _file_path) in reminders {
|
||||||
Ok(reminders) => {
|
// Deliver as inbox message from "reminder"
|
||||||
for (id, message, _file_path) in reminders {
|
if let Err(e) = reminder_coord.broker.send(&Message {
|
||||||
// Deliver as inbox message from "reminder"
|
from: "reminder".to_owned(),
|
||||||
if let Err(e) = reminder_coord.broker.send(&Message {
|
to: agent.clone(),
|
||||||
from: "reminder".to_owned(),
|
body: message.clone(),
|
||||||
to: agent.clone(),
|
}) {
|
||||||
body: message.clone(),
|
tracing::warn!(
|
||||||
}) {
|
reminder_id = id,
|
||||||
tracing::warn!(
|
%agent,
|
||||||
reminder_id = id,
|
error = ?e,
|
||||||
%agent,
|
"failed to deliver reminder"
|
||||||
error = ?e,
|
);
|
||||||
"failed to deliver reminder"
|
continue;
|
||||||
);
|
}
|
||||||
continue;
|
// Mark as sent
|
||||||
}
|
if let Err(e) = reminder_coord.broker.mark_reminder_sent(id) {
|
||||||
// Mark as sent
|
tracing::warn!(
|
||||||
if let Err(e) = reminder_coord.broker.mark_reminder_sent(id) {
|
reminder_id = id,
|
||||||
tracing::warn!(
|
error = ?e,
|
||||||
reminder_id = id,
|
"failed to mark reminder sent"
|
||||||
error = ?e,
|
);
|
||||||
"failed to mark reminder sent"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => tracing::warn!(
|
}
|
||||||
%agent,
|
Err(e) => {
|
||||||
error = ?e,
|
tracing::warn!(error = ?e, "failed to query due reminders");
|
||||||
"failed to query due reminders"
|
|
||||||
),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue