//! Read-side aggregations over the per-agent `turn_stats.sqlite` for //! the agent's `/stats` web page. Owned by the agent (same process //! that writes the sink) so per-MCP extensions can register more //! providers without the host needing to know their schemas. //! //! Best-effort: any sqlite error returns an empty snapshot rather than //! propagating — the stats page is decorative, not authoritative, and //! a missing db on a brand-new agent shouldn't 500 the route. use std::collections::{HashMap, HashSet}; use std::path::{Path, PathBuf}; use anyhow::{Context, Result}; use rusqlite::{Connection, OpenFlags}; use serde::Serialize; use hive_sh4re::ReminderStats; /// Window param accepted by `/api/stats?window=`. Each maps to a /// total span + the bucket width used to roll up trend series. #[derive(Debug, Clone, Copy)] pub enum Window { Hour, FourHour, Day, ThreeDay, Week, Month, } impl Window { #[must_use] pub fn parse(s: &str) -> Self { match s { "1h" => Self::Hour, "4h" => Self::FourHour, "3d" => Self::ThreeDay, "7d" => Self::Week, "30d" => Self::Month, _ => Self::Day, } } fn label(self) -> &'static str { match self { Self::Hour => "1h", Self::FourHour => "4h", Self::Day => "24h", Self::ThreeDay => "3d", Self::Week => "7d", Self::Month => "30d", } } #[must_use] pub fn span_secs(self) -> i64 { match self { Self::Hour => 3600, Self::FourHour => 4 * 3600, Self::Day => 24 * 3600, Self::ThreeDay => 3 * 24 * 3600, Self::Week => 7 * 24 * 3600, Self::Month => 30 * 24 * 3600, } } fn bucket_secs(self) -> i64 { match self { // 5-min buckets for 1h (12 buckets), 15-min for 4h (16 buckets), // hourly for 24h + 3d, daily for 7d + 30d. Self::Hour => 300, Self::FourHour => 900, Self::Day | Self::ThreeDay => 3600, Self::Week | Self::Month => 24 * 3600, } } } #[derive(Debug, Serialize)] pub struct Snapshot { pub window: &'static str, pub bucket_seconds: i64, pub now: i64, pub from: i64, /// Total turns in the window. pub turn_count: u64, /// Time-bucketed trend series, oldest first. Always covers the /// full window even for empty buckets (so charts paint a stable /// x-axis instead of skipping gaps). pub buckets: Vec, /// Top tools by call count across the window. Capped to 10. pub tool_breakdown: Vec, pub wake_mix: Vec, pub result_mix: Vec, /// Distinct models seen in the window, sorted. Each bucket's /// `model_counts` keys into this set; the stats page uses it as /// the stacked-bar series list (stable order + colours). pub models: Vec, /// Across-window p50 / p95 / avg of `duration_ms`. Same numbers /// as the per-bucket fields but aggregated over the whole window /// for the headline summary chips. pub duration_summary: DurationSummary, /// Reminder activity stats: counts of scheduled, delivered, and /// pending reminders over the window (fetched from the broker RPC). /// None if the RPC call failed or hasn't been integrated yet. #[serde(default, skip_serializing_if = "Option::is_none")] pub reminder_stats: Option, } #[derive(Debug, Serialize)] pub struct Bucket { /// Unix timestamp of the bucket start. pub ts: i64, pub turn_count: u64, pub avg_duration_ms: f64, pub p50_duration_ms: f64, pub p95_duration_ms: f64, /// Sums across the bucket. JS picks how to combine them /// (input + output for cost, etc.) so we don't bake a policy in. pub input_tokens: u64, pub output_tokens: u64, pub cache_read_input_tokens: u64, pub cache_creation_input_tokens: u64, /// Mean of `last_input_tokens` across the bucket (the context /// size at turn-end — useful for spotting drift toward compaction). pub avg_ctx_tokens: f64, pub max_ctx_tokens: u64, /// Turn count per model in this bucket. Model choice greatly /// affects token cost, so this lets the operator line model usage /// up against the cost series over time. pub model_counts: HashMap, } #[derive(Debug, Serialize)] pub struct KeyCount { pub key: String, pub count: u64, } #[derive(Debug, Default, Serialize)] pub struct DurationSummary { pub avg_ms: f64, pub p50_ms: f64, pub p95_ms: f64, } #[must_use] pub fn snapshot_default(window: Window) -> Snapshot { let path = default_path(); match snapshot(&path, window) { Ok(s) => s, Err(e) => { tracing::warn!(error = ?e, path = %path.display(), "stats: snapshot failed"); empty_snapshot(window) } } } fn default_path() -> PathBuf { crate::paths::state_dir().join("hyperhive-turn-stats.sqlite") } fn empty_snapshot(window: Window) -> Snapshot { let now = now_secs(); let from = now - window.span_secs(); let buckets = fill_buckets(from, now, window.bucket_secs(), &HashMap::new()); Snapshot { window: window.label(), bucket_seconds: window.bucket_secs(), now, from, turn_count: 0, buckets, tool_breakdown: Vec::new(), wake_mix: Vec::new(), result_mix: Vec::new(), models: Vec::new(), duration_summary: DurationSummary::default(), reminder_stats: None, } } fn snapshot(path: &Path, window: Window) -> Result { // Read-only open so an in-flight writer (the harness's own // turn_stats sink) never blocks us and we can't corrupt the db // via a query bug. let conn = Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_ONLY) .with_context(|| format!("open {} read-only", path.display()))?; let now = now_secs(); let from = now - window.span_secs(); let bucket_secs = window.bucket_secs(); let mut stmt = conn.prepare( "SELECT started_at, duration_ms, input_tokens, output_tokens, cache_read_input_tokens, cache_creation_input_tokens, last_input_tokens, tool_call_breakdown_json, wake_from, result_kind, model FROM turn_stats WHERE started_at >= ?1 ORDER BY started_at ASC", )?; let rows = stmt.query_map([from], |row| { Ok(Row { started_at: row.get(0)?, duration_ms: row.get::<_, i64>(1)?, input_tokens: u64_from_i64(row.get::<_, i64>(2)?), output_tokens: u64_from_i64(row.get::<_, i64>(3)?), cache_read_input_tokens: u64_from_i64(row.get::<_, i64>(4)?), cache_creation_input_tokens: u64_from_i64(row.get::<_, i64>(5)?), last_input_tokens: u64_from_i64(row.get::<_, i64>(6)?), tool_breakdown_json: row.get::<_, Option>(7)?, wake_from: row.get::<_, String>(8)?, result_kind: row.get::<_, String>(9)?, model: row.get::<_, String>(10)?, }) })?; let mut by_bucket: HashMap = HashMap::new(); let mut tool_totals: HashMap = HashMap::new(); let mut wake_totals: HashMap = HashMap::new(); let mut result_totals: HashMap = HashMap::new(); let mut model_set: HashSet = HashSet::new(); let mut all_durations: Vec = Vec::new(); let mut turn_count: u64 = 0; for r in rows { let r = r?; turn_count += 1; let bucket_ts = (r.started_at / bucket_secs) * bucket_secs; let acc = by_bucket.entry(bucket_ts).or_default(); acc.turn_count += 1; acc.durations.push(r.duration_ms.max(0)); acc.input_tokens = acc.input_tokens.saturating_add(r.input_tokens); acc.output_tokens = acc.output_tokens.saturating_add(r.output_tokens); acc.cache_read_input_tokens = acc .cache_read_input_tokens .saturating_add(r.cache_read_input_tokens); acc.cache_creation_input_tokens = acc .cache_creation_input_tokens .saturating_add(r.cache_creation_input_tokens); acc.ctx_sum = acc.ctx_sum.saturating_add(r.last_input_tokens); acc.ctx_max = acc.ctx_max.max(r.last_input_tokens); *acc.model_counts.entry(r.model.clone()).or_insert(0) += 1; all_durations.push(r.duration_ms.max(0)); *wake_totals.entry(r.wake_from).or_insert(0) += 1; *result_totals.entry(r.result_kind).or_insert(0) += 1; model_set.insert(r.model); if let Some(json) = r.tool_breakdown_json && let Ok(map) = serde_json::from_str::>(&json) { for (k, v) in map { *tool_totals.entry(k).or_insert(0) += v; } } } let buckets = fill_buckets(from, now, bucket_secs, &by_bucket); let duration_summary = summarize_durations(&mut all_durations); let mut models: Vec = model_set.into_iter().collect(); models.sort_unstable(); Ok(Snapshot { window: window.label(), bucket_seconds: bucket_secs, now, from, turn_count, buckets, tool_breakdown: top_n(tool_totals, 10), wake_mix: top_n(wake_totals, 20), result_mix: top_n(result_totals, 20), models, duration_summary, reminder_stats: None, // TODO: fetch via ReminderRollup RPC }) } struct Row { started_at: i64, duration_ms: i64, input_tokens: u64, output_tokens: u64, cache_read_input_tokens: u64, cache_creation_input_tokens: u64, last_input_tokens: u64, tool_breakdown_json: Option, wake_from: String, result_kind: String, model: String, } #[derive(Default)] struct BucketAcc { turn_count: u64, durations: Vec, input_tokens: u64, output_tokens: u64, cache_read_input_tokens: u64, cache_creation_input_tokens: u64, ctx_sum: u64, ctx_max: u64, model_counts: HashMap, } fn fill_buckets( from: i64, now: i64, bucket_secs: i64, by_bucket: &HashMap, ) -> Vec { let start = (from / bucket_secs) * bucket_secs; let mut out = Vec::new(); let mut ts = start; while ts <= now { let bucket = if let Some(acc) = by_bucket.get(&ts) { let mut sorted = acc.durations.clone(); sorted.sort_unstable(); let avg = if sorted.is_empty() { 0.0 } else { #[allow(clippy::cast_precision_loss)] let sum_f = sorted.iter().sum::() as f64; #[allow(clippy::cast_precision_loss)] let len_f = sorted.len() as f64; sum_f / len_f }; let p50 = percentile(&sorted, 50); let p95 = percentile(&sorted, 95); let avg_ctx = if acc.turn_count == 0 { 0.0 } else { #[allow(clippy::cast_precision_loss)] let sum_f = acc.ctx_sum as f64; #[allow(clippy::cast_precision_loss)] let cnt_f = acc.turn_count as f64; sum_f / cnt_f }; Bucket { ts, turn_count: acc.turn_count, avg_duration_ms: avg, p50_duration_ms: p50, p95_duration_ms: p95, input_tokens: acc.input_tokens, output_tokens: acc.output_tokens, cache_read_input_tokens: acc.cache_read_input_tokens, cache_creation_input_tokens: acc.cache_creation_input_tokens, avg_ctx_tokens: avg_ctx, max_ctx_tokens: acc.ctx_max, model_counts: acc.model_counts.clone(), } } else { Bucket { ts, turn_count: 0, avg_duration_ms: 0.0, p50_duration_ms: 0.0, p95_duration_ms: 0.0, input_tokens: 0, output_tokens: 0, cache_read_input_tokens: 0, cache_creation_input_tokens: 0, avg_ctx_tokens: 0.0, max_ctx_tokens: 0, model_counts: HashMap::new(), } }; out.push(bucket); ts += bucket_secs; } out } fn summarize_durations(all: &mut [i64]) -> DurationSummary { if all.is_empty() { return DurationSummary::default(); } all.sort_unstable(); #[allow(clippy::cast_precision_loss)] let sum_f = all.iter().sum::() as f64; #[allow(clippy::cast_precision_loss)] let len_f = all.len() as f64; DurationSummary { avg_ms: sum_f / len_f, p50_ms: percentile(all, 50), p95_ms: percentile(all, 95), } } #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation, clippy::cast_sign_loss)] fn percentile(sorted: &[i64], pct: u8) -> f64 { if sorted.is_empty() { return 0.0; } if sorted.len() == 1 { return sorted[0] as f64; } // Nearest-rank, clamped. let rank = ((f64::from(pct) / 100.0) * (sorted.len() as f64 - 1.0)).round() as usize; sorted[rank.min(sorted.len() - 1)] as f64 } fn top_n(map: HashMap, n: usize) -> Vec { let mut v: Vec = map .into_iter() .map(|(key, count)| KeyCount { key, count }) .collect(); v.sort_unstable_by(|a, b| b.count.cmp(&a.count).then_with(|| a.key.cmp(&b.key))); v.truncate(n); v } fn u64_from_i64(v: i64) -> u64 { u64::try_from(v).unwrap_or(0) } fn now_secs() -> i64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map_or(0, |d| i64::try_from(d.as_secs()).unwrap_or(i64::MAX)) } #[cfg(test)] mod tests { use super::*; use rusqlite::params; use std::sync::atomic::{AtomicU32, Ordering}; static SEQ: AtomicU32 = AtomicU32::new(0); fn tmp_db() -> PathBuf { let n = SEQ.fetch_add(1, Ordering::SeqCst); let pid = std::process::id(); std::env::temp_dir().join(format!("hyperhive-stats-test-{pid}-{n}.sqlite")) } fn seed_db(path: &Path, rows: &[(i64, i64, &str, &str, &str, &str)]) { let conn = Connection::open(path).unwrap(); conn.execute_batch( "CREATE TABLE turn_stats ( id INTEGER PRIMARY KEY, started_at INTEGER NOT NULL, ended_at INTEGER NOT NULL, duration_ms INTEGER NOT NULL, model TEXT NOT NULL, wake_from TEXT NOT NULL, input_tokens INTEGER NOT NULL DEFAULT 0, output_tokens INTEGER NOT NULL DEFAULT 0, cache_read_input_tokens INTEGER NOT NULL DEFAULT 0, cache_creation_input_tokens INTEGER NOT NULL DEFAULT 0, last_input_tokens INTEGER NOT NULL DEFAULT 0, last_output_tokens INTEGER NOT NULL DEFAULT 0, last_cache_read_input_tokens INTEGER NOT NULL DEFAULT 0, last_cache_creation_input_tokens INTEGER NOT NULL DEFAULT 0, tool_call_count INTEGER NOT NULL DEFAULT 0, tool_call_breakdown_json TEXT, open_threads_count INTEGER, open_reminders_count INTEGER, result_kind TEXT NOT NULL, note TEXT );", ) .unwrap(); for (started, dur, model, wake, result, tools_json) in rows { conn.execute( "INSERT INTO turn_stats (started_at, ended_at, duration_ms, model, wake_from, last_input_tokens, tool_call_breakdown_json, result_kind) VALUES (?1, ?2, ?3, ?4, ?5, 1000, ?6, ?7)", params![started, started + dur / 1000, dur, model, wake, tools_json, result], ) .unwrap(); } } #[test] fn snapshot_aggregates_rows() { let db = tmp_db(); let _ = std::fs::remove_file(&db); let now = now_secs(); seed_db( &db, &[ (now - 600, 5_000, "opus", "recv", "ok", r#"{"Read":2,"Bash":1}"#), (now - 300, 10_000, "opus", "recv", "ok", r#"{"Read":3}"#), (now - 100, 20_000, "sonnet", "operator", "failed", "{}"), ], ); let s = snapshot(&db, Window::Day).unwrap(); assert_eq!(s.turn_count, 3); assert_eq!(s.window, "24h"); assert_eq!(s.bucket_seconds, 3600); let tool_map: HashMap<_, _> = s .tool_breakdown .iter() .map(|kc| (kc.key.clone(), kc.count)) .collect(); assert_eq!(tool_map.get("Read").copied(), Some(5)); assert_eq!(tool_map.get("Bash").copied(), Some(1)); let wake_map: HashMap<_, _> = s .wake_mix .iter() .map(|kc| (kc.key.clone(), kc.count)) .collect(); assert_eq!(wake_map.get("recv").copied(), Some(2)); assert_eq!(wake_map.get("operator").copied(), Some(1)); let result_map: HashMap<_, _> = s .result_mix .iter() .map(|kc| (kc.key.clone(), kc.count)) .collect(); assert_eq!(result_map.get("ok").copied(), Some(2)); assert_eq!(result_map.get("failed").copied(), Some(1)); // Model breakdown: 2 opus + 1 sonnet, all in the same hour // bucket given the 24h window. assert_eq!(s.models, vec!["opus".to_string(), "sonnet".to_string()]); let mut model_totals: HashMap = HashMap::new(); for b in &s.buckets { for (k, v) in &b.model_counts { *model_totals.entry(k.clone()).or_insert(0) += v; } } assert_eq!(model_totals.get("opus").copied(), Some(2)); assert_eq!(model_totals.get("sonnet").copied(), Some(1)); // Durations: [5000, 10000, 20000] → avg ≈ 11666.67, p50 = 10000, p95 ~ 20000 assert!((s.duration_summary.avg_ms - 11_666.666_666_666_666).abs() < 1.0); assert!((s.duration_summary.p50_ms - 10_000.0).abs() < 1.0); assert!((s.duration_summary.p95_ms - 20_000.0).abs() < 1.0); } #[test] fn empty_window_still_paints_buckets() { let db = tmp_db(); let _ = std::fs::remove_file(&db); seed_db(&db, &[]); let s = snapshot(&db, Window::Day).unwrap(); assert_eq!(s.turn_count, 0); // 24h / 1h buckets = ~24-25 buckets covering the window. assert!(s.buckets.len() >= 24); assert!(s.buckets.iter().all(|b| b.turn_count == 0)); } #[test] fn week_uses_daily_buckets() { let db = tmp_db(); let _ = std::fs::remove_file(&db); seed_db(&db, &[]); let s = snapshot(&db, Window::Week).unwrap(); assert_eq!(s.window, "7d"); assert_eq!(s.bucket_seconds, 86_400); assert!(s.buckets.len() >= 7); } }