dashboard: queued reminders surface

new 'qu3u3d r3m1nd3rs' section between approvals and operator
inbox. lists every pending reminder with agent, due-relative
timestamp, body, payload path (path-linkified), and a cancel
button. drives off a new /api/reminders endpoint and a
POST /cancel-reminder/{id} that hard-deletes the row.

failure surface (last_error / attempt_count + retry) deferred —
needs a sqlite migration; tracked in TODO.md.
This commit is contained in:
müde 2026-05-17 22:10:02 +02:00
parent cb71a07300
commit 1db6b8ffed
6 changed files with 183 additions and 4 deletions

View file

@ -46,6 +46,19 @@ const EVENT_CHANNEL: usize = 256;
/// self-documenting.
pub type DueReminder = (String, i64, String, Option<String>);
/// Row shape for [`Broker::list_pending_reminders`], shipped on the
/// dashboard `/api/reminders` response.
#[derive(Debug, Clone, Serialize)]
pub struct PendingReminder {
pub id: i64,
pub agent: String,
pub message: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub file_path: Option<String>,
pub due_at: i64,
pub created_at: i64,
}
/// Intra-process broker event. `recv_blocking` listens on the same
/// channel as the dashboard forwarder; the forwarder re-emits each
/// event as a `DashboardEvent` with a freshly-stamped seq from the
@ -286,6 +299,44 @@ impl Broker {
Ok(id)
}
/// Every reminder still pending delivery, newest-first. Used by the
/// dashboard's reminders pane so the operator can see what's queued
/// + cancel rows that are no longer wanted.
pub fn list_pending_reminders(&self) -> Result<Vec<PendingReminder>> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT id, agent, message, file_path, due_at, created_at \
FROM reminders \
WHERE sent_at IS NULL \
ORDER BY due_at ASC",
)?;
let rows = stmt.query_map([], |row| {
Ok(PendingReminder {
id: row.get(0)?,
agent: row.get(1)?,
message: row.get(2)?,
file_path: row.get(3)?,
due_at: row.get(4)?,
created_at: row.get(5)?,
})
})?;
rows.collect::<rusqlite::Result<Vec<_>>>()
.context("list pending reminders")
}
/// Delete a reminder by id. Returns the number of rows removed (0
/// when the id never existed or was already delivered). Hard
/// delete rather than soft so the row doesn't linger and confuse a
/// re-creation under the same id.
pub fn cancel_reminder(&self, id: i64) -> Result<usize> {
let conn = self.conn.lock().unwrap();
let n = conn.execute(
"DELETE FROM reminders WHERE id = ?1 AND sent_at IS NULL",
params![id],
)?;
Ok(n)
}
/// Get up to `limit` due reminders across all agents in a single query.
/// Returns `(agent, id, message, file_path)` tuples. Pass a small limit
/// (e.g. 100) so a burst of overdue reminders doesn't flood the broker

View file

@ -55,6 +55,8 @@ pub async fn serve(port: u16, coord: Arc<Coordinator>) -> Result<()> {
.route("/purge-tombstone/{name}", post(post_purge_tombstone))
.route("/api/journal/{name}", get(get_journal))
.route("/api/state-file", get(get_state_file))
.route("/api/reminders", get(api_reminders))
.route("/cancel-reminder/{id}", post(post_cancel_reminder))
.route("/api/agent-config/{name}", get(get_agent_config))
.route("/request-spawn", post(post_request_spawn))
.route("/op-send", post(post_op_send))
@ -983,6 +985,27 @@ async fn get_state_file(
([("content-type", "text/plain; charset=utf-8")], body).into_response()
}
async fn api_reminders(State(state): State<AppState>) -> Response {
match state.coord.broker.list_pending_reminders() {
Ok(rows) => axum::Json(rows).into_response(),
Err(e) => error_response(&format!("reminders: {e:#}")),
}
}
async fn post_cancel_reminder(
State(state): State<AppState>,
AxumPath(id): AxumPath<i64>,
) -> Response {
match state.coord.broker.cancel_reminder(id) {
Ok(0) => error_response(&format!("reminder {id} not pending (already delivered?)")),
Ok(_) => {
tracing::info!(%id, "operator cancelled reminder");
(StatusCode::OK, "ok").into_response()
}
Err(e) => error_response(&format!("cancel reminder {id} failed: {e:#}")),
}
}
async fn post_purge_tombstone(
State(state): State<AppState>,
AxumPath(name): AxumPath<String>,