//! Read-side aggregations over the per-agent turn-stats sqlite db
//! (`hyperhive-turn-stats.sqlite`, table `turn_stats`) for the agent's
//! `/stats` web page. Owned by the agent (the same process that writes
//! the sink) so per-MCP extensions can register more providers without
//! the host needing to know their schemas.
//!
//! Best-effort: any sqlite error returns an empty snapshot rather than
//! propagating — the stats page is decorative, not authoritative, and
//! a missing db on a brand-new agent shouldn't 500 the route.
|
|
|
|
use std::collections::{HashMap, HashSet};
|
|
use std::path::{Path, PathBuf};
|
|
|
|
use anyhow::{Context, Result};
|
|
use rusqlite::{Connection, OpenFlags};
|
|
use serde::Serialize;
|
|
|
|
use hive_sh4re::ReminderStats;
|
|
|
|
/// Window param accepted by `/api/stats?window=`. Each maps to a
/// total span + the bucket width used to roll up trend series.
///
/// Unrecognised query values fall back to [`Window::Day`]; see
/// [`Window::parse`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Window {
    /// `1h` span, 5-minute buckets.
    Hour,
    /// `4h` span, 15-minute buckets.
    FourHour,
    /// `24h` span (the fallback), hourly buckets.
    Day,
    /// `3d` span, hourly buckets.
    ThreeDay,
    /// `7d` span, daily buckets.
    Week,
    /// `30d` span, daily buckets.
    Month,
}

impl Window {
    /// Parses a `?window=` query value.
    ///
    /// Accepts `1h`, `4h`, `24h`, `3d`, `7d` and `30d`. Anything else
    /// (including an empty string) yields [`Window::Day`], so a bad param
    /// degrades to the default view instead of erroring.
    #[must_use]
    pub fn parse(s: &str) -> Self {
        match s {
            "1h" => Self::Hour,
            "4h" => Self::FourHour,
            "3d" => Self::ThreeDay,
            "7d" => Self::Week,
            "30d" => Self::Month,
            // "24h" and every unrecognised value land here.
            _ => Self::Day,
        }
    }

    /// Short label echoed back in the snapshot; `parse(label())` round-trips.
    fn label(self) -> &'static str {
        match self {
            Self::Hour => "1h",
            Self::FourHour => "4h",
            Self::Day => "24h",
            Self::ThreeDay => "3d",
            Self::Week => "7d",
            Self::Month => "30d",
        }
    }

    /// Total length of the window in seconds.
    #[must_use]
    pub fn span_secs(self) -> i64 {
        match self {
            Self::Hour => 3600,
            Self::FourHour => 4 * 3600,
            Self::Day => 24 * 3600,
            Self::ThreeDay => 3 * 24 * 3600,
            Self::Week => 7 * 24 * 3600,
            Self::Month => 30 * 24 * 3600,
        }
    }

    /// Width of one trend bucket in seconds. Every span is an exact
    /// multiple of its bucket width.
    fn bucket_secs(self) -> i64 {
        match self {
            // 5-min buckets for 1h (12 per span), 15-min for 4h (16 per
            // span), hourly for 24h + 3d, daily for 7d + 30d. The painted
            // series has one more entry than "per span" because the window
            // start and end rarely fall on bucket boundaries.
            Self::Hour => 300,
            Self::FourHour => 900,
            Self::Day | Self::ThreeDay => 3600,
            Self::Week | Self::Month => 24 * 3600,
        }
    }
}
|
|
|
|
/// Aggregated turn stats for one [`Window`], serialized as the `/stats`
/// page's data payload.
#[derive(Debug, Serialize)]
pub struct Snapshot {
    /// Label of the requested window (e.g. `"24h"`).
    pub window: &'static str,
    /// Width of each entry in `buckets`, in seconds.
    pub bucket_seconds: i64,
    /// Unix time (seconds) at which the snapshot was computed.
    pub now: i64,
    /// Start of the window in Unix seconds (`now` minus the window span).
    pub from: i64,
    /// Total turns in the window.
    pub turn_count: u64,
    /// Time-bucketed trend series, oldest first. Always covers the
    /// full window even for empty buckets (so charts paint a stable
    /// x-axis instead of skipping gaps).
    pub buckets: Vec<Bucket>,
    /// Top tools by call count across the window. Capped to 10.
    pub tool_breakdown: Vec<KeyCount>,
    /// Turn counts per `wake_from` value, highest first (at most 20).
    pub wake_mix: Vec<KeyCount>,
    /// Turn counts per `result_kind` value, highest first (at most 20).
    pub result_mix: Vec<KeyCount>,
    /// Distinct models seen in the window, sorted. Each bucket's
    /// `model_counts` keys into this set; the stats page uses it as
    /// the stacked-bar series list (stable order + colours).
    pub models: Vec<String>,
    /// Across-window p50 / p95 / avg of `duration_ms`. Same numbers
    /// as the per-bucket fields but aggregated over the whole window
    /// for the headline summary chips.
    pub duration_summary: DurationSummary,
    /// Reminder activity stats: counts of scheduled, delivered, and
    /// pending reminders over the window (fetched from the broker RPC).
    /// None if the RPC call failed or hasn't been integrated yet; it is
    /// omitted from the JSON entirely when None.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reminder_stats: Option<ReminderStats>,
}
|
|
|
|
/// One fixed-width time slot of the trend series in [`Snapshot::buckets`].
/// Every numeric field is 0 when no turns started in the slot.
#[derive(Debug, Serialize)]
pub struct Bucket {
    /// Unix timestamp of the bucket start.
    pub ts: i64,
    /// Turns whose `started_at` falls in this bucket.
    pub turn_count: u64,
    /// Mean `duration_ms` of those turns.
    pub avg_duration_ms: f64,
    /// Median `duration_ms` of those turns.
    pub p50_duration_ms: f64,
    /// 95th-percentile `duration_ms` of those turns.
    pub p95_duration_ms: f64,
    /// Sums across the bucket. JS picks how to combine them
    /// (input + output for cost, etc.) so we don't bake a policy in.
    pub input_tokens: u64,
    pub output_tokens: u64,
    pub cache_read_input_tokens: u64,
    pub cache_creation_input_tokens: u64,
    /// Mean of `last_input_tokens` across the bucket (the context
    /// size at turn-end — useful for spotting drift toward compaction).
    pub avg_ctx_tokens: f64,
    /// Largest `last_input_tokens` seen in the bucket.
    pub max_ctx_tokens: u64,
    /// Turn count per model in this bucket. Model choice greatly
    /// affects token cost, so this lets the operator line model usage
    /// up against the cost series over time.
    pub model_counts: HashMap<String, u64>,
}
|
|
|
|
/// One entry of a ranked breakdown (tool calls, wake sources, result
/// kinds): the key and how many times it occurred in the window.
#[derive(Debug, Serialize)]
pub struct KeyCount {
    pub key: String,
    pub count: u64,
}
|
|
|
|
/// Mean, median and 95th-percentile turn duration, all in milliseconds.
/// The `Default` value (all zeros) is what an empty window reports.
#[derive(Debug, Default, Serialize)]
pub struct DurationSummary {
    pub avg_ms: f64,
    pub p50_ms: f64,
    pub p95_ms: f64,
}
|
|
|
|
#[must_use]
|
|
pub fn snapshot_default(window: Window) -> Snapshot {
|
|
let path = default_path();
|
|
match snapshot(&path, window) {
|
|
Ok(s) => s,
|
|
Err(e) => {
|
|
tracing::warn!(error = ?e, path = %path.display(), "stats: snapshot failed");
|
|
empty_snapshot(window)
|
|
}
|
|
}
|
|
}
|
|
|
|
fn default_path() -> PathBuf {
|
|
crate::paths::state_dir().join("hyperhive-turn-stats.sqlite")
|
|
}
|
|
|
|
fn empty_snapshot(window: Window) -> Snapshot {
|
|
let now = now_secs();
|
|
let from = now - window.span_secs();
|
|
let buckets = fill_buckets(from, now, window.bucket_secs(), &HashMap::new());
|
|
Snapshot {
|
|
window: window.label(),
|
|
bucket_seconds: window.bucket_secs(),
|
|
now,
|
|
from,
|
|
turn_count: 0,
|
|
buckets,
|
|
tool_breakdown: Vec::new(),
|
|
wake_mix: Vec::new(),
|
|
result_mix: Vec::new(),
|
|
models: Vec::new(),
|
|
duration_summary: DurationSummary::default(),
|
|
reminder_stats: None,
|
|
}
|
|
}
|
|
|
|
fn snapshot(path: &Path, window: Window) -> Result<Snapshot> {
|
|
// Read-only open so an in-flight writer (the harness's own
|
|
// turn_stats sink) never blocks us and we can't corrupt the db
|
|
// via a query bug.
|
|
let conn = Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_ONLY)
|
|
.with_context(|| format!("open {} read-only", path.display()))?;
|
|
let now = now_secs();
|
|
let from = now - window.span_secs();
|
|
let bucket_secs = window.bucket_secs();
|
|
|
|
let mut stmt = conn.prepare(
|
|
"SELECT started_at, duration_ms,
|
|
input_tokens, output_tokens,
|
|
cache_read_input_tokens, cache_creation_input_tokens,
|
|
last_input_tokens,
|
|
tool_call_breakdown_json,
|
|
wake_from, result_kind, model
|
|
FROM turn_stats
|
|
WHERE started_at >= ?1
|
|
ORDER BY started_at ASC",
|
|
)?;
|
|
let rows = stmt.query_map([from], |row| {
|
|
Ok(Row {
|
|
started_at: row.get(0)?,
|
|
duration_ms: row.get::<_, i64>(1)?,
|
|
input_tokens: u64_from_i64(row.get::<_, i64>(2)?),
|
|
output_tokens: u64_from_i64(row.get::<_, i64>(3)?),
|
|
cache_read_input_tokens: u64_from_i64(row.get::<_, i64>(4)?),
|
|
cache_creation_input_tokens: u64_from_i64(row.get::<_, i64>(5)?),
|
|
last_input_tokens: u64_from_i64(row.get::<_, i64>(6)?),
|
|
tool_breakdown_json: row.get::<_, Option<String>>(7)?,
|
|
wake_from: row.get::<_, String>(8)?,
|
|
result_kind: row.get::<_, String>(9)?,
|
|
model: row.get::<_, String>(10)?,
|
|
})
|
|
})?;
|
|
|
|
let mut by_bucket: HashMap<i64, BucketAcc> = HashMap::new();
|
|
let mut tool_totals: HashMap<String, u64> = HashMap::new();
|
|
let mut wake_totals: HashMap<String, u64> = HashMap::new();
|
|
let mut result_totals: HashMap<String, u64> = HashMap::new();
|
|
let mut model_set: HashSet<String> = HashSet::new();
|
|
let mut all_durations: Vec<i64> = Vec::new();
|
|
let mut turn_count: u64 = 0;
|
|
|
|
for r in rows {
|
|
let r = r?;
|
|
turn_count += 1;
|
|
let bucket_ts = (r.started_at / bucket_secs) * bucket_secs;
|
|
let acc = by_bucket.entry(bucket_ts).or_default();
|
|
acc.turn_count += 1;
|
|
acc.durations.push(r.duration_ms.max(0));
|
|
acc.input_tokens = acc.input_tokens.saturating_add(r.input_tokens);
|
|
acc.output_tokens = acc.output_tokens.saturating_add(r.output_tokens);
|
|
acc.cache_read_input_tokens = acc
|
|
.cache_read_input_tokens
|
|
.saturating_add(r.cache_read_input_tokens);
|
|
acc.cache_creation_input_tokens = acc
|
|
.cache_creation_input_tokens
|
|
.saturating_add(r.cache_creation_input_tokens);
|
|
acc.ctx_sum = acc.ctx_sum.saturating_add(r.last_input_tokens);
|
|
acc.ctx_max = acc.ctx_max.max(r.last_input_tokens);
|
|
*acc.model_counts.entry(r.model.clone()).or_insert(0) += 1;
|
|
|
|
all_durations.push(r.duration_ms.max(0));
|
|
*wake_totals.entry(r.wake_from).or_insert(0) += 1;
|
|
*result_totals.entry(r.result_kind).or_insert(0) += 1;
|
|
model_set.insert(r.model);
|
|
|
|
if let Some(json) = r.tool_breakdown_json
|
|
&& let Ok(map) = serde_json::from_str::<HashMap<String, u64>>(&json)
|
|
{
|
|
for (k, v) in map {
|
|
*tool_totals.entry(k).or_insert(0) += v;
|
|
}
|
|
}
|
|
}
|
|
|
|
let buckets = fill_buckets(from, now, bucket_secs, &by_bucket);
|
|
let duration_summary = summarize_durations(&mut all_durations);
|
|
let mut models: Vec<String> = model_set.into_iter().collect();
|
|
models.sort_unstable();
|
|
|
|
Ok(Snapshot {
|
|
window: window.label(),
|
|
bucket_seconds: bucket_secs,
|
|
now,
|
|
from,
|
|
turn_count,
|
|
buckets,
|
|
tool_breakdown: top_n(tool_totals, 10),
|
|
wake_mix: top_n(wake_totals, 20),
|
|
result_mix: top_n(result_totals, 20),
|
|
models,
|
|
duration_summary,
|
|
reminder_stats: None, // TODO: fetch via ReminderRollup RPC
|
|
})
|
|
}
|
|
|
|
/// One decoded `turn_stats` row as read by `snapshot`. The token columns
/// have already been passed through `u64_from_i64`, so negative stored
/// values arrive here as 0.
struct Row {
    /// Unix seconds; used to pick the trend bucket.
    started_at: i64,
    /// Raw stored duration in ms (may be negative; clamped by the caller).
    duration_ms: i64,
    input_tokens: u64,
    output_tokens: u64,
    cache_read_input_tokens: u64,
    cache_creation_input_tokens: u64,
    /// Context size at turn end; feeds the avg/max ctx series.
    last_input_tokens: u64,
    /// Nullable JSON object of tool name -> call count.
    tool_breakdown_json: Option<String>,
    wake_from: String,
    result_kind: String,
    model: String,
}
|
|
|
|
/// Running totals for one trend bucket while rows are scanned; converted
/// into a serialized `Bucket` once all rows have been read.
#[derive(Default)]
struct BucketAcc {
    turn_count: u64,
    /// Per-turn `duration_ms` values (already clamped to >= 0), unsorted.
    durations: Vec<i64>,
    input_tokens: u64,
    output_tokens: u64,
    cache_read_input_tokens: u64,
    cache_creation_input_tokens: u64,
    /// Saturating sum of `last_input_tokens`; divided by `turn_count` for
    /// the average context size.
    ctx_sum: u64,
    /// Largest `last_input_tokens` seen.
    ctx_max: u64,
    /// Turns per model name.
    model_counts: HashMap<String, u64>,
}
|
|
|
|
fn fill_buckets(
|
|
from: i64,
|
|
now: i64,
|
|
bucket_secs: i64,
|
|
by_bucket: &HashMap<i64, BucketAcc>,
|
|
) -> Vec<Bucket> {
|
|
let start = (from / bucket_secs) * bucket_secs;
|
|
let mut out = Vec::new();
|
|
let mut ts = start;
|
|
while ts <= now {
|
|
let bucket = if let Some(acc) = by_bucket.get(&ts) {
|
|
let mut sorted = acc.durations.clone();
|
|
sorted.sort_unstable();
|
|
let avg = if sorted.is_empty() {
|
|
0.0
|
|
} else {
|
|
#[allow(clippy::cast_precision_loss)]
|
|
let sum_f = sorted.iter().sum::<i64>() as f64;
|
|
#[allow(clippy::cast_precision_loss)]
|
|
let len_f = sorted.len() as f64;
|
|
sum_f / len_f
|
|
};
|
|
let p50 = percentile(&sorted, 50);
|
|
let p95 = percentile(&sorted, 95);
|
|
let avg_ctx = if acc.turn_count == 0 {
|
|
0.0
|
|
} else {
|
|
#[allow(clippy::cast_precision_loss)]
|
|
let sum_f = acc.ctx_sum as f64;
|
|
#[allow(clippy::cast_precision_loss)]
|
|
let cnt_f = acc.turn_count as f64;
|
|
sum_f / cnt_f
|
|
};
|
|
Bucket {
|
|
ts,
|
|
turn_count: acc.turn_count,
|
|
avg_duration_ms: avg,
|
|
p50_duration_ms: p50,
|
|
p95_duration_ms: p95,
|
|
input_tokens: acc.input_tokens,
|
|
output_tokens: acc.output_tokens,
|
|
cache_read_input_tokens: acc.cache_read_input_tokens,
|
|
cache_creation_input_tokens: acc.cache_creation_input_tokens,
|
|
avg_ctx_tokens: avg_ctx,
|
|
max_ctx_tokens: acc.ctx_max,
|
|
model_counts: acc.model_counts.clone(),
|
|
}
|
|
} else {
|
|
Bucket {
|
|
ts,
|
|
turn_count: 0,
|
|
avg_duration_ms: 0.0,
|
|
p50_duration_ms: 0.0,
|
|
p95_duration_ms: 0.0,
|
|
input_tokens: 0,
|
|
output_tokens: 0,
|
|
cache_read_input_tokens: 0,
|
|
cache_creation_input_tokens: 0,
|
|
avg_ctx_tokens: 0.0,
|
|
max_ctx_tokens: 0,
|
|
model_counts: HashMap::new(),
|
|
}
|
|
};
|
|
out.push(bucket);
|
|
ts += bucket_secs;
|
|
}
|
|
out
|
|
}
|
|
|
|
fn summarize_durations(all: &mut [i64]) -> DurationSummary {
|
|
if all.is_empty() {
|
|
return DurationSummary::default();
|
|
}
|
|
all.sort_unstable();
|
|
#[allow(clippy::cast_precision_loss)]
|
|
let sum_f = all.iter().sum::<i64>() as f64;
|
|
#[allow(clippy::cast_precision_loss)]
|
|
let len_f = all.len() as f64;
|
|
DurationSummary {
|
|
avg_ms: sum_f / len_f,
|
|
p50_ms: percentile(all, 50),
|
|
p95_ms: percentile(all, 95),
|
|
}
|
|
}
|
|
|
|
/// Returns the `pct`-th percentile of an ascending-sorted slice, or `0.0`
/// for an empty slice.
///
/// This is *not* the textbook nearest-rank method (`ceil(pct/100 * len)`).
/// It picks the element at index `round(pct/100 * (len - 1))`, i.e. the
/// closest point on a `0..=len-1` index scale, without interpolating
/// between neighbours. For example, the p50 of `[1, 2, 3, 4]` is `3` here.
/// `pct` values above 100 clamp to the largest element.
#[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation, clippy::cast_sign_loss)]
fn percentile(sorted: &[i64], pct: u8) -> f64 {
    let Some(last_index) = sorted.len().checked_sub(1) else {
        return 0.0;
    };
    // With a single element `last_index` is 0, so every pct selects it.
    let rank = ((f64::from(pct) / 100.0) * last_index as f64).round() as usize;
    sorted[rank.min(last_index)] as f64
}
|
|
|
|
fn top_n(map: HashMap<String, u64>, n: usize) -> Vec<KeyCount> {
|
|
let mut v: Vec<KeyCount> = map
|
|
.into_iter()
|
|
.map(|(key, count)| KeyCount { key, count })
|
|
.collect();
|
|
v.sort_unstable_by(|a, b| b.count.cmp(&a.count).then_with(|| a.key.cmp(&b.key)));
|
|
v.truncate(n);
|
|
v
|
|
}
|
|
|
|
/// Converts a sqlite INTEGER to `u64`, mapping any negative value to 0.
fn u64_from_i64(v: i64) -> u64 {
    // After clamping to >= 0, the absolute value is the value itself.
    v.max(0).unsigned_abs()
}
|
|
|
|
/// Current Unix time in whole seconds.
///
/// Returns 0 if the system clock reports a time before the epoch, and
/// saturates at `i64::MAX` instead of wrapping.
fn now_secs() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::params;
    use std::sync::atomic::{AtomicU32, Ordering};

    static SEQ: AtomicU32 = AtomicU32::new(0);

    /// A uniquely named sqlite path in the system temp dir that is deleted
    /// on drop. Test runs therefore don't leave
    /// `hyperhive-stats-test-*.sqlite` files behind, even when an assertion
    /// panics.
    struct TmpDb(PathBuf);

    impl TmpDb {
        fn new() -> Self {
            let n = SEQ.fetch_add(1, Ordering::SeqCst);
            let pid = std::process::id();
            let path =
                std::env::temp_dir().join(format!("hyperhive-stats-test-{pid}-{n}.sqlite"));
            // Clear any leftover from an earlier run that reused this pid.
            let _ = std::fs::remove_file(&path);
            Self(path)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TmpDb {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
        }
    }

    /// Creates the `turn_stats` table at `path` and inserts one row per
    /// `(started_at, duration_ms, model, wake_from, result_kind, tool_json)`
    /// tuple, with `last_input_tokens` fixed at 1000.
    fn seed_db(path: &Path, rows: &[(i64, i64, &str, &str, &str, &str)]) {
        let conn = Connection::open(path).unwrap();
        conn.execute_batch(
            "CREATE TABLE turn_stats (
                id INTEGER PRIMARY KEY,
                started_at INTEGER NOT NULL,
                ended_at INTEGER NOT NULL,
                duration_ms INTEGER NOT NULL,
                model TEXT NOT NULL,
                wake_from TEXT NOT NULL,
                input_tokens INTEGER NOT NULL DEFAULT 0,
                output_tokens INTEGER NOT NULL DEFAULT 0,
                cache_read_input_tokens INTEGER NOT NULL DEFAULT 0,
                cache_creation_input_tokens INTEGER NOT NULL DEFAULT 0,
                last_input_tokens INTEGER NOT NULL DEFAULT 0,
                last_output_tokens INTEGER NOT NULL DEFAULT 0,
                last_cache_read_input_tokens INTEGER NOT NULL DEFAULT 0,
                last_cache_creation_input_tokens INTEGER NOT NULL DEFAULT 0,
                tool_call_count INTEGER NOT NULL DEFAULT 0,
                tool_call_breakdown_json TEXT,
                open_threads_count INTEGER,
                open_reminders_count INTEGER,
                result_kind TEXT NOT NULL,
                note TEXT
            );",
        )
        .unwrap();
        for (started, dur, model, wake, result, tools_json) in rows {
            conn.execute(
                "INSERT INTO turn_stats
                    (started_at, ended_at, duration_ms, model, wake_from,
                     last_input_tokens, tool_call_breakdown_json, result_kind)
                 VALUES (?1, ?2, ?3, ?4, ?5, 1000, ?6, ?7)",
                params![started, started + dur / 1000, dur, model, wake, tools_json, result],
            )
            .unwrap();
        }
    }

    /// Collects a ranked breakdown into a map for order-independent asserts.
    fn as_map(kcs: &[KeyCount]) -> HashMap<String, u64> {
        kcs.iter().map(|kc| (kc.key.clone(), kc.count)).collect()
    }

    #[test]
    fn snapshot_aggregates_rows() {
        let db = TmpDb::new();
        let now = now_secs();
        seed_db(
            db.path(),
            &[
                (now - 600, 5_000, "opus", "recv", "ok", r#"{"Read":2,"Bash":1}"#),
                (now - 300, 10_000, "opus", "recv", "ok", r#"{"Read":3}"#),
                (now - 100, 20_000, "sonnet", "operator", "failed", "{}"),
            ],
        );
        let s = snapshot(db.path(), Window::Day).unwrap();
        assert_eq!(s.turn_count, 3);
        assert_eq!(s.window, "24h");
        assert_eq!(s.bucket_seconds, 3600);

        let tool_map = as_map(&s.tool_breakdown);
        assert_eq!(tool_map.get("Read").copied(), Some(5));
        assert_eq!(tool_map.get("Bash").copied(), Some(1));

        let wake_map = as_map(&s.wake_mix);
        assert_eq!(wake_map.get("recv").copied(), Some(2));
        assert_eq!(wake_map.get("operator").copied(), Some(1));

        let result_map = as_map(&s.result_mix);
        assert_eq!(result_map.get("ok").copied(), Some(2));
        assert_eq!(result_map.get("failed").copied(), Some(1));

        // Model breakdown: 2 opus + 1 sonnet. The three rows can straddle an
        // hour boundary, so sum the per-model counts across all buckets.
        assert_eq!(s.models, vec!["opus".to_string(), "sonnet".to_string()]);
        let mut model_totals: HashMap<String, u64> = HashMap::new();
        for b in &s.buckets {
            for (k, v) in &b.model_counts {
                *model_totals.entry(k.clone()).or_insert(0) += v;
            }
        }
        assert_eq!(model_totals.get("opus").copied(), Some(2));
        assert_eq!(model_totals.get("sonnet").copied(), Some(1));

        // Durations: [5000, 10000, 20000] → avg ≈ 11666.67, p50 = 10000, p95 = 20000
        assert!((s.duration_summary.avg_ms - 11_666.666_666_666_666).abs() < 1.0);
        assert!((s.duration_summary.p50_ms - 10_000.0).abs() < 1.0);
        assert!((s.duration_summary.p95_ms - 20_000.0).abs() < 1.0);
    }

    #[test]
    fn empty_window_still_paints_buckets() {
        let db = TmpDb::new();
        seed_db(db.path(), &[]);
        let s = snapshot(db.path(), Window::Day).unwrap();
        assert_eq!(s.turn_count, 0);
        // 24h / 1h buckets = ~24-25 buckets covering the window.
        assert!(s.buckets.len() >= 24);
        assert!(s.buckets.iter().all(|b| b.turn_count == 0));
    }

    #[test]
    fn week_uses_daily_buckets() {
        let db = TmpDb::new();
        seed_db(db.path(), &[]);
        let s = snapshot(db.path(), Window::Week).unwrap();
        assert_eq!(s.window, "7d");
        assert_eq!(s.bucket_seconds, 86_400);
        assert!(s.buckets.len() >= 7);
    }

    #[test]
    fn window_parse_falls_back_to_day() {
        assert_eq!(Window::parse("1h").span_secs(), 3600);
        assert_eq!(Window::parse("30d").span_secs(), 30 * 24 * 3600);
        assert_eq!(Window::parse("nonsense").span_secs(), 24 * 3600);
        assert_eq!(Window::parse("").span_secs(), 24 * 3600);
    }

    #[test]
    fn top_n_orders_by_count_then_key_and_truncates() {
        let map: HashMap<String, u64> = [("b", 2), ("a", 2), ("c", 5), ("d", 1)]
            .into_iter()
            .map(|(k, v)| (k.to_string(), v))
            .collect();
        let top = top_n(map, 3);
        let keys: Vec<&str> = top.iter().map(|kc| kc.key.as_str()).collect();
        assert_eq!(keys, vec!["c", "a", "b"]);
    }

    #[test]
    fn percentile_edges() {
        assert!(percentile(&[], 50).abs() < 1e-9);
        assert!((percentile(&[7], 95) - 7.0).abs() < 1e-9);
        assert!((percentile(&[1, 2, 3, 4], 50) - 3.0).abs() < 1e-9);
        assert!((percentile(&[1, 2, 3], 200) - 3.0).abs() < 1e-9);
    }
}
|