diff --git a/hive-c0re/assets/app.js b/hive-c0re/assets/app.js index 012aeef..0c079f7 100644 --- a/hive-c0re/assets/app.js +++ b/hive-c0re/assets/app.js @@ -1349,7 +1349,8 @@ } const ul = el('ul', { class: 'reminders' }); for (const r of rows) { - const li = el('li', { class: 'reminder-row' }); + const failed = (r.attempt_count || 0) > 0; + const li = el('li', { class: 'reminder-row' + (failed ? ' reminder-failed' : '') }); const dueIn = r.due_at - Math.floor(Date.now() / 1000); const dueLabel = dueIn <= 0 ? `overdue ${fmtAgo(r.due_at)}` @@ -1364,19 +1365,45 @@ head.append(' ', el('span', { class: 'meta' }, '· payload → ')); appendLinkified(head, r.file_path); } + if (failed) { + head.append(' ', el('span', + { + class: 'badge badge-warn', + title: 'consecutive failed delivery attempts (capped at 5; over the cap the scheduler stops retrying until you click R3TRY or cancel)', + }, + `⚠ ${r.attempt_count} failed`)); + } const body = el('div', { class: 'reminder-body' }); const previews = appendLinkified(body, r.message); li.append(head, body); for (const d of previews) li.appendChild(d); - // Cancel form omits `data-no-refresh` — the resulting refreshState - // re-fires refreshReminders so the row drops on its own. + if (r.last_error) { + li.append(el('div', { class: 'reminder-error' }, + el('span', { class: 'msg-sep' }, 'error: '), + r.last_error, + )); + } + const actions = el('div', { class: 'reminder-actions' }); + if (failed) { + // Retry resets the failure counters so the scheduler picks + // the row up again on its next 5s tick. No data-no-refresh + // — the resulting refreshState re-fires refreshReminders. + const retryForm = el('form', { + method: 'POST', action: '/retry-reminder/' + r.id, + class: 'inline', 'data-async': '', + }); + retryForm.append(el('button', + { type: 'submit', class: 'btn btn-restart' }, '↻ R3TRY')); + actions.append(retryForm); + } const cancelForm = el('form', { method: 'POST', action: '/cancel-reminder/' + r.id, class: 'inline', 'data-async': '', 'data-confirm': `cancel reminder ${r.id} for ${r.agent}? this drops the queued delivery; no undo.`, }); cancelForm.append(el('button', { type: 'submit', class: 'btn btn-deny' }, '✗ C4NC3L')); - li.append(cancelForm); + actions.append(cancelForm); + li.append(actions); ul.append(li); } root.append(ul); diff --git a/hive-c0re/assets/dashboard.css b/hive-c0re/assets/dashboard.css index 46eeed6..ea62fe3 100644 --- a/hive-c0re/assets/dashboard.css +++ b/hive-c0re/assets/dashboard.css @@ -474,6 +474,25 @@ summary:hover { color: var(--purple); } word-break: break-word; margin: 0.3em 0; } +.reminder-row.reminder-failed { + border-left: 2px solid var(--red, #f38ba8); + padding-left: 0.5em; +} +.reminder-error { + color: var(--red, #f38ba8); + background: rgba(243, 139, 168, 0.06); + border: 1px solid rgba(243, 139, 168, 0.25); + padding: 0.3em 0.5em; + font-size: 0.85em; + white-space: pre-wrap; + word-break: break-word; + margin: 0.2em 0; +} +.reminder-actions { + display: flex; + gap: 0.4em; + margin-top: 0.3em; +} /* Path linkification — agents drop pointer strings into messages constantly; clicking the anchor expands a sibling
that diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs index afe14f7..fbbb2d6 100644 --- a/hive-c0re/src/broker.rs +++ b/hive-c0re/src/broker.rs @@ -57,8 +57,26 @@ pub struct PendingReminder { pub file_path: Option, pub due_at: i64, pub created_at: i64, + /// Most recent delivery failure for this row, if any. Cleared + /// to NULL on operator retry. Surfaced inline in the dashboard + /// so a stuck reminder doesn't just silently retry forever. + #[serde(skip_serializing_if = "Option::is_none")] + pub last_error: Option, + /// Number of failed delivery attempts since the row was + /// created or last retried. After `MAX_REMINDER_ATTEMPTS` the + /// scheduler stops trying (the row stays in `pending` with the + /// error so the operator can decide between retry + cancel). + #[serde(default)] + pub attempt_count: u32, } +/// Stop retrying a row after this many consecutive failures. The +/// scheduler quits scheduling it until an operator explicitly +/// retries (which resets the counter) or cancels (which deletes +/// the row). Below the cap the existing 5s tick re-attempts each +/// time the row is due. +pub const MAX_REMINDER_ATTEMPTS: u32 = 5; + /// Intra-process broker event. `recv_blocking` listens on the same /// channel as the dashboard forwarder; the forwarder re-emits each /// event as a `DashboardEvent` with a freshly-stamped seq from the @@ -95,6 +113,7 @@ impl Broker { let conn = Connection::open(path).with_context(|| format!("open broker db {}", path.display()))?; conn.execute_batch(SCHEMA).context("apply broker schema")?; + ensure_reminder_columns(&conn).context("migrate reminders columns")?; let (events, _) = broadcast::channel(EVENT_CHANNEL); Ok(Self { conn: Mutex::new(conn), @@ -305,12 +324,14 @@ impl Broker { pub fn list_pending_reminders(&self) -> Result> { let conn = self.conn.lock().unwrap(); let mut stmt = conn.prepare( - "SELECT id, agent, message, file_path, due_at, created_at \ + "SELECT id, agent, message, file_path, due_at, created_at, \ + last_error, attempt_count \ FROM reminders \ WHERE sent_at IS NULL \ ORDER BY due_at ASC", )?; let rows = stmt.query_map([], |row| { + let attempts: i64 = row.get(7)?; Ok(PendingReminder { id: row.get(0)?, agent: row.get(1)?, @@ -318,12 +339,46 @@ impl Broker { file_path: row.get(3)?, due_at: row.get(4)?, created_at: row.get(5)?, + last_error: row.get(6)?, + attempt_count: u32::try_from(attempts).unwrap_or(0), }) })?; rows.collect::>>() .context("list pending reminders") } + /// Mark a delivery attempt as failed: bump `attempt_count` and + /// stash the error string. Called by `reminder_scheduler::tick` + /// when `deliver_reminder` returns Err. Soft-cap behaviour + /// lives in `get_due_reminders` (rows over the cap drop out + /// of the due-list and stop being attempted until retry). + pub fn record_reminder_failure(&self, id: i64, reason: &str) -> Result<()> { + let conn = self.conn.lock().unwrap(); + conn.execute( + "UPDATE reminders \ + SET attempt_count = attempt_count + 1, last_error = ?1 \ + WHERE id = ?2 AND sent_at IS NULL", + params![reason, id], + )?; + Ok(()) + } + + /// Clear the failure state on a pending reminder so the + /// scheduler picks it up again. No-op when the row is already + /// fresh (attempt_count == 0). Returns the number of rows + /// affected so callers can distinguish "retried" from "no + /// such pending reminder" (already delivered, or wrong id). + pub fn reset_reminder_failure(&self, id: i64) -> Result { + let conn = self.conn.lock().unwrap(); + let n = conn.execute( + "UPDATE reminders \ + SET attempt_count = 0, last_error = NULL \ + WHERE id = ?1 AND sent_at IS NULL", + params![id], + )?; + Ok(n) + } + /// Count this agent's still-pending (un-delivered) reminders. /// Used by the per-turn stats sink for a cheap "what was queued /// at turn-end" snapshot. @@ -357,13 +412,16 @@ impl Broker { pub fn get_due_reminders(&self, limit: u64) -> Result> { let conn = self.conn.lock().unwrap(); let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX); + let max_attempts = i64::from(MAX_REMINDER_ATTEMPTS); + // attempt_count >= cap = give up; row stays pending so the + // operator sees + can retry/cancel via the dashboard. let mut stmt = conn.prepare( "SELECT agent, id, message, file_path FROM reminders \ - WHERE due_at <= ?1 AND sent_at IS NULL \ + WHERE due_at <= ?1 AND sent_at IS NULL AND attempt_count < ?3 \ ORDER BY agent, due_at ASC \ LIMIT ?2", )?; - let rows = stmt.query_map(params![now_unix(), limit_i], |row| { + let rows = stmt.query_map(params![now_unix(), limit_i, max_attempts], |row| { Ok(( row.get::<_, String>(0)?, row.get::<_, i64>(1)?, @@ -410,6 +468,35 @@ impl Broker { } } +/// Idempotent reminder-table migrations. `ALTER TABLE ADD COLUMN` +/// has no `IF NOT EXISTS` form in sqlite, so we probe +/// `pragma_table_info` per column. New deploys (table created by +/// SCHEMA in this commit cycle) skip the ALTER; pre-existing +/// broker.sqlite files get the columns added on next boot. +fn ensure_reminder_columns(conn: &Connection) -> Result<()> { + for (name, sql) in [ + ( + "attempt_count", + "ALTER TABLE reminders ADD COLUMN attempt_count INTEGER NOT NULL DEFAULT 0;", + ), + ( + "last_error", + "ALTER TABLE reminders ADD COLUMN last_error TEXT;", + ), + ] { + let has: bool = conn + .prepare(&format!( + "SELECT 1 FROM pragma_table_info('reminders') WHERE name = '{name}'" + ))? + .exists([])?; + if !has { + conn.execute_batch(sql) + .with_context(|| format!("add reminders.{name} column"))?; + } + } + Ok(()) +} + fn now_unix() -> i64 { SystemTime::now() .duration_since(UNIX_EPOCH) diff --git a/hive-c0re/src/dashboard.rs b/hive-c0re/src/dashboard.rs index da25b15..6badad4 100644 --- a/hive-c0re/src/dashboard.rs +++ b/hive-c0re/src/dashboard.rs @@ -57,6 +57,7 @@ pub async fn serve(port: u16, coord: Arc) -> Result<()> { .route("/api/state-file", get(get_state_file)) .route("/api/reminders", get(api_reminders)) .route("/cancel-reminder/{id}", post(post_cancel_reminder)) + .route("/retry-reminder/{id}", post(post_retry_reminder)) .route("/api/agent-config/{name}", get(get_agent_config)) .route("/request-spawn", post(post_request_spawn)) .route("/op-send", post(post_op_send)) @@ -1126,6 +1127,25 @@ async fn post_cancel_reminder( } } +/// Reset a pending reminder's failure state so the scheduler +/// retries it on the next tick. Useful when the failure was +/// transient (sqlite lock contention, disk full → freed up) and +/// the operator wants delivery to resume immediately instead of +/// the row sitting in attempt-count-capped purgatory. +async fn post_retry_reminder( + State(state): State, + AxumPath(id): AxumPath, +) -> Response { + match state.coord.broker.reset_reminder_failure(id) { + Ok(0) => error_response(&format!("reminder {id} not pending (already delivered?)")), + Ok(_) => { + tracing::info!(%id, "operator reset reminder failure for retry"); + (StatusCode::OK, "ok").into_response() + } + Err(e) => error_response(&format!("retry reminder {id} failed: {e:#}")), + } +} + async fn post_purge_tombstone( State(state): State, AxumPath(name): AxumPath, diff --git a/hive-c0re/src/reminder_scheduler.rs b/hive-c0re/src/reminder_scheduler.rs index c7d83f0..92ce426 100644 --- a/hive-c0re/src/reminder_scheduler.rs +++ b/hive-c0re/src/reminder_scheduler.rs @@ -71,12 +71,24 @@ fn tick(coord: &Arc) { for (agent, id, message, file_path) in due { let body = prepare_body(&agent, &message, file_path.as_deref()); if let Err(e) = coord.broker.deliver_reminder(id, &agent, &body) { + let reason = format!("{e:#}"); tracing::warn!( reminder_id = id, %agent, - error = ?e, + error = %reason, "failed to deliver reminder" ); + // Persist the failure so the dashboard can surface it + + // bump attempt_count. After MAX_REMINDER_ATTEMPTS the + // row drops out of `get_due_reminders` and waits for + // operator retry / cancel. + if let Err(persist_err) = coord.broker.record_reminder_failure(id, &reason) { + tracing::warn!( + reminder_id = id, + error = ?persist_err, + "failed to persist reminder failure" + ); + } } } }