Compare commits

..

3 commits

Author SHA1 Message Date
iris
f2015954d9 add reminder_stats field to stats Snapshot
Add Optional<ReminderStats> field to the per-agent stats page response,
placeholder for future ReminderRollup RPC integration to fetch reminder
activity metrics from the broker.
2026-05-20 13:24:17 +02:00
iris
8fe0725e1d fix: handle ReminderRollup in agent/manager response patterns
Add the ReminderRollup variant to exhaustive pattern matches
in both hive-ag3nt and hive-m1nd binaries.
2026-05-20 13:24:17 +02:00
iris
91bfa269fd add reminder rollup RPC and broker query
Surface reminder activity statistics (scheduled, delivered, pending counts)
for each agent over configurable time windows. Needed by the per-agent
stats page to display reminder metrics.

Adds:
- ReminderStats struct and ReminderRollup request/response variants
- Broker::reminder_rollup_for(agent, since_secs) method
- Agent and manager socket handlers for the new RPC
- SocketReply mapping for response conversion
2026-05-20 13:24:17 +02:00
8 changed files with 100 additions and 0 deletions

View file

@ -271,6 +271,7 @@ async fn serve(
| AgentResponse::QuestionQueued { .. } | AgentResponse::QuestionQueued { .. }
| AgentResponse::LooseEnds { .. } | AgentResponse::LooseEnds { .. }
| AgentResponse::PendingRemindersCount { .. } | AgentResponse::PendingRemindersCount { .. }
| AgentResponse::ReminderRollup { .. }
| AgentResponse::Whoami { .. }, | AgentResponse::Whoami { .. },
) => { ) => {
tracing::warn!("recv produced unexpected response kind"); tracing::warn!("recv produced unexpected response kind");

View file

@ -229,6 +229,7 @@ async fn serve(
| ManagerResponse::Logs { .. } | ManagerResponse::Logs { .. }
| ManagerResponse::LooseEnds { .. } | ManagerResponse::LooseEnds { .. }
| ManagerResponse::PendingRemindersCount { .. } | ManagerResponse::PendingRemindersCount { .. }
| ManagerResponse::ReminderRollup { .. }
| ManagerResponse::Whoami { .. }, | ManagerResponse::Whoami { .. },
) => { ) => {
tracing::warn!("recv produced unexpected response kind"); tracing::warn!("recv produced unexpected response kind");

View file

@ -48,6 +48,7 @@ pub enum SocketReply {
Logs(String), Logs(String),
LooseEnds(Vec<hive_sh4re::LooseEnd>), LooseEnds(Vec<hive_sh4re::LooseEnd>),
PendingRemindersCount(u64), PendingRemindersCount(u64),
ReminderRollup(hive_sh4re::ReminderStats),
Whoami { Whoami {
name: String, name: String,
role: String, role: String,
@ -68,6 +69,7 @@ impl From<hive_sh4re::AgentResponse> for SocketReply {
hive_sh4re::AgentResponse::PendingRemindersCount { count } => { hive_sh4re::AgentResponse::PendingRemindersCount { count } => {
Self::PendingRemindersCount(count) Self::PendingRemindersCount(count)
} }
hive_sh4re::AgentResponse::ReminderRollup(stats) => Self::ReminderRollup(stats),
hive_sh4re::AgentResponse::Whoami { hive_sh4re::AgentResponse::Whoami {
name, name,
role, role,
@ -95,6 +97,7 @@ impl From<hive_sh4re::ManagerResponse> for SocketReply {
hive_sh4re::ManagerResponse::PendingRemindersCount { count } => { hive_sh4re::ManagerResponse::PendingRemindersCount { count } => {
Self::PendingRemindersCount(count) Self::PendingRemindersCount(count)
} }
hive_sh4re::ManagerResponse::ReminderRollup(stats) => Self::ReminderRollup(stats),
hive_sh4re::ManagerResponse::Whoami { hive_sh4re::ManagerResponse::Whoami {
name, name,
role, role,

View file

@ -14,6 +14,8 @@ use anyhow::{Context, Result};
use rusqlite::{Connection, OpenFlags}; use rusqlite::{Connection, OpenFlags};
use serde::Serialize; use serde::Serialize;
use hive_sh4re::ReminderStats;
/// Window param accepted by `/api/stats?window=`. Each maps to a /// Window param accepted by `/api/stats?window=`. Each maps to a
/// total span + the bucket width used to roll up trend series. /// total span + the bucket width used to roll up trend series.
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
@ -81,6 +83,11 @@ pub struct Snapshot {
/// as the per-bucket fields but aggregated over the whole window /// as the per-bucket fields but aggregated over the whole window
/// for the headline summary chips. /// for the headline summary chips.
pub duration_summary: DurationSummary, pub duration_summary: DurationSummary,
/// Reminder activity stats: counts of scheduled, delivered, and
/// pending reminders over the window (fetched from the broker RPC).
/// None if the RPC call failed or hasn't been integrated yet.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reminder_stats: Option<ReminderStats>,
} }
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
@ -152,6 +159,7 @@ fn empty_snapshot(window: Window) -> Snapshot {
result_mix: Vec::new(), result_mix: Vec::new(),
models: Vec::new(), models: Vec::new(),
duration_summary: DurationSummary::default(), duration_summary: DurationSummary::default(),
reminder_stats: None,
} }
} }
@ -250,6 +258,7 @@ fn snapshot(path: &Path, window: Window) -> Result<Snapshot> {
result_mix: top_n(result_totals, 20), result_mix: top_n(result_totals, 20),
models, models,
duration_summary, duration_summary,
reminder_stats: None, // TODO: fetch via ReminderRollup RPC
}) })
} }

View file

@ -207,6 +207,14 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc<Coordinator>) ->
}, },
} }
} }
AgentRequest::ReminderRollup { since_secs } => {
match coord.broker.reminder_rollup_for(agent, *since_secs) {
Ok(stats) => AgentResponse::ReminderRollup(stats),
Err(e) => AgentResponse::Err {
message: format!("{e:#}"),
},
}
}
AgentRequest::Whoami => AgentResponse::Whoami { AgentRequest::Whoami => AgentResponse::Whoami {
name: agent.to_owned(), name: agent.to_owned(),
role: "agent".to_owned(), role: "agent".to_owned(),

View file

@ -583,6 +583,43 @@ impl Broker {
Ok(u64::try_from(n).unwrap_or(0)) Ok(u64::try_from(n).unwrap_or(0))
} }
/// Reminder rollup stats for an agent over a time window. Returns
/// counts of scheduled, delivered, and pending reminders created
/// in the last `since_secs` seconds (0 = all reminders).
pub fn reminder_rollup_for(&self, agent: &str, since_secs: u64) -> Result<hive_sh4re::ReminderStats> {
let conn = self.conn.lock().unwrap();
let cutoff_time = if since_secs > 0 {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.ok()
.map(|d| d.as_secs() as i64)
.unwrap_or(0);
now - since_secs as i64
} else {
i64::MIN
};
let scheduled: i64 = conn.query_row(
"SELECT COUNT(*) FROM reminders WHERE agent = ?1 AND created_at >= ?2",
params![agent, cutoff_time],
|row| row.get(0),
)?;
let delivered: i64 = conn.query_row(
"SELECT COUNT(*) FROM reminders WHERE agent = ?1 AND created_at >= ?2 AND sent_at IS NOT NULL",
params![agent, cutoff_time],
|row| row.get(0),
)?;
let pending: i64 = conn.query_row(
"SELECT COUNT(*) FROM reminders WHERE agent = ?1 AND created_at >= ?2 AND sent_at IS NULL",
params![agent, cutoff_time],
|row| row.get(0),
)?;
Ok(hive_sh4re::ReminderStats {
scheduled: u64::try_from(scheduled).unwrap_or(0),
delivered: u64::try_from(delivered).unwrap_or(0),
pending: u64::try_from(pending).unwrap_or(0),
})
}
/// Delete a reminder by id. Returns the number of rows removed (0 /// Delete a reminder by id. Returns the number of rows removed (0
/// when the id never existed or was already delivered). Hard /// when the id never existed or was already delivered). Hard
/// delete rather than soft so the row doesn't linger and confuse a /// delete rather than soft so the row doesn't linger and confuse a

View file

@ -369,6 +369,14 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc<Coordinator>) -> ManagerResp
}, },
} }
} }
ManagerRequest::ReminderRollup { since_secs } => {
match coord.broker.reminder_rollup_for(MANAGER_AGENT, *since_secs) {
Ok(stats) => ManagerResponse::ReminderRollup(stats),
Err(e) => ManagerResponse::Err {
message: format!("{e:#}"),
},
}
}
ManagerRequest::Whoami => ManagerResponse::Whoami { ManagerRequest::Whoami => ManagerResponse::Whoami {
name: MANAGER_AGENT.to_owned(), name: MANAGER_AGENT.to_owned(),
role: "manager".to_owned(), role: "manager".to_owned(),

View file

@ -110,6 +110,17 @@ pub enum ApprovalStatus {
Failed, Failed,
} }
/// Reminder activity statistics for an agent over a time window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReminderStats {
/// Total reminders scheduled in the window (created_at >= cutoff).
pub scheduled: u64,
/// Reminders that have been delivered in the window (sent_at IS NOT NULL).
pub delivered: u64,
/// Reminders still pending in the window (sent_at IS NULL).
pub pending: u64,
}
impl HostResponse { impl HostResponse {
pub fn success() -> Self { pub fn success() -> Self {
Self { Self {
@ -381,6 +392,16 @@ pub enum AgentRequest {
/// by the harness's per-turn stats sink to snapshot "what was /// by the harness's per-turn stats sink to snapshot "what was
/// queued at turn-end time" without paying for a full list. /// queued at turn-end time" without paying for a full list.
CountPendingReminders, CountPendingReminders,
/// Reminder statistics for this agent: counts of scheduled, delivered,
/// and pending reminders over a time window. Used by the stats page
/// to display reminder activity. `since_secs` filters to reminders
/// created in the last N seconds (0 = all reminders).
ReminderRollup {
/// Only count reminders created in the last N seconds from now.
/// Pass 0 to include all reminders.
#[serde(default)]
since_secs: u64,
},
/// Self-introspection: who am I, what role, what rev. All values /// Self-introspection: who am I, what role, what rev. All values
/// derive from coord state (no env access required); useful for /// derive from coord state (no env access required); useful for
/// agents to stamp notes / commits / messages with a trustworthy /// agents to stamp notes / commits / messages with a trustworthy
@ -445,6 +466,8 @@ pub enum AgentResponse {
LooseEnds { loose_ends: Vec<LooseEnd> }, LooseEnds { loose_ends: Vec<LooseEnd> },
/// `CountPendingReminders` result. /// `CountPendingReminders` result.
PendingRemindersCount { count: u64 }, PendingRemindersCount { count: u64 },
/// `ReminderRollup` result: reminder activity stats for the agent.
ReminderRollup(ReminderStats),
/// `Whoami` result: identity + role + the current hyperhive rev /// `Whoami` result: identity + role + the current hyperhive rev
/// hive-c0re is running against. `role` is `"agent"` for /// hive-c0re is running against. `role` is `"agent"` for
/// sub-agents (the only path that reaches this variant of the /// sub-agents (the only path that reaches this variant of the
@ -730,6 +753,14 @@ pub enum ManagerRequest {
/// Count of the manager's own pending reminders. Mirror of /// Count of the manager's own pending reminders. Mirror of
/// `AgentRequest::CountPendingReminders` on the manager surface. /// `AgentRequest::CountPendingReminders` on the manager surface.
CountPendingReminders, CountPendingReminders,
/// Reminder statistics: counts of scheduled, delivered, and pending
/// reminders (manager-flavour). Mirror of `AgentRequest::ReminderRollup`.
ReminderRollup {
/// Only count reminders created in the last N seconds from now.
/// Pass 0 to include all reminders.
#[serde(default)]
since_secs: u64,
},
/// Manager-flavour self-introspection. Same wire shape as /// Manager-flavour self-introspection. Same wire shape as
/// `AgentRequest::Whoami`, but `role` is always `"manager"`. /// `AgentRequest::Whoami`, but `role` is always `"manager"`.
Whoami, Whoami,
@ -788,6 +819,8 @@ pub enum ManagerResponse {
PendingRemindersCount { PendingRemindersCount {
count: u64, count: u64,
}, },
/// `ReminderRollup` result: reminder activity stats for the manager.
ReminderRollup(ReminderStats),
/// `Whoami` result: manager identity. `role` is always /// `Whoami` result: manager identity. `role` is always
/// `"manager"`. Mirror of `AgentResponse::Whoami`. /// `"manager"`. Mirror of `AgentResponse::Whoami`.
Whoami { Whoami {