From 8f5752980fc28aca3d55ba4db2bc764c3f78cce8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?m=C3=BCde?= Date: Sun, 17 May 2026 23:00:41 +0200 Subject: [PATCH] turn_stats: per-turn analytics sink MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit new sqlite table at /state/hyperhive-turn-stats.sqlite on each agent's state dir. one row per claude turn captures identity (model, wake_from, result_kind), timing (started/ended_at, duration_ms), cost (input/output/cache_read/cache_creation token counts), behaviour (tool_call_count + per-tool breakdown JSON), and post-turn snapshot metrics (open_threads_count, open_reminders_count). wire additions: - AgentRequest/ManagerRequest::CountPendingReminders + Broker::count_pending_reminders_for(agent) - Bus::observe_stream + take_tool_calls — pumps the existing stdout stream-json, picks out tool_use blocks, accumulates per turn. bin loops fold the breakdown into each row. - TurnStats::open_default + TurnStatRow + record() — best-effort inserts; failures log + don't block the harness. both ag3nt and m1nd bins capture started_at + duration via Instant::elapsed, fetch open-thread + reminder counts from hive-c0re via the existing socket (post-turn, best-effort), and record one row at turn_end. record_kind splits ok / failed / prompt_too_long; failures carry the error message in note. todo entries for host-side vacuum sweep + reading the table back into agent/dashboard badges. --- TODO.md | 4 +- hive-ag3nt/src/bin/hive-ag3nt.rs | 110 ++++++++++++++++++++- hive-ag3nt/src/bin/hive-m1nd.rs | 107 +++++++++++++++++++- hive-ag3nt/src/events.rs | 44 +++++++++ hive-ag3nt/src/lib.rs | 1 + hive-ag3nt/src/mcp.rs | 7 ++ hive-ag3nt/src/turn.rs | 1 + hive-ag3nt/src/turn_stats.rs | 163 +++++++++++++++++++++++++++++++ hive-c0re/src/agent_server.rs | 8 ++ hive-c0re/src/broker.rs | 13 +++ hive-c0re/src/manager_server.rs | 8 ++ hive-sh4re/src/lib.rs | 13 +++ 12 files changed, 476 insertions(+), 3 deletions(-) create mode 100644 hive-ag3nt/src/turn_stats.rs diff --git a/TODO.md b/TODO.md index f5704cf..bbcb778 100644 --- a/TODO.md +++ b/TODO.md @@ -75,7 +75,9 @@ how often the friction bites in normal use. ## Telemetry -- **Per-turn stats log**: persist one row per claude turn in a new sqlite table on the per-agent state dir (or the host broker DB, indexed by agent). Columns: `started_at`, `ended_at`, `duration_ms`, `model`, `input_tokens`, `output_tokens`, `cache_read_input_tokens`, `cache_creation_input_tokens`, `tool_call_count`, `tool_call_breakdown` (JSON: `{Read: 12, Bash: 3, ...}`), `bytes_streamed`, `wake_reason` (recv'd message / reminder / operator-kick / manual), `result_kind` (ok / cancelled / failed-mid-turn / compacted), `note` (e.g. failure reason). Powers: per-agent dashboards (avg turn time over time, tool-usage histogram, cost projections from token counts × model rate), debugging stuck loops (look for repeated identical wake_reason + zero tool calls), and operator-visible "this is what your spend looked like this week" rollups. Source data is already mostly in the harness's `TurnState` + the per-event bus; just needs a sink. Keep a retention sweep (host-side) so the table doesn't grow forever. +- **Per-turn stats: host-side vacuum sweep**: the sink writes to `/state/hyperhive-turn-stats.sqlite` on each agent's state dir; needs a periodic retention sweep mirroring `events_vacuum.rs` so the table doesn't grow forever. Default keep-window: 90 days (turn-stats are denser than events but smaller per-row, ~200B each). +- **Surface per-turn stats on the agent web UI**: badges sourced from the new sink — `open_threads` count chip, `open_reminders` count chip, "N turns today" chip, rolling tool-call histogram tooltip on the model chip. Both `open_threads` and `open_reminders` are already columns on every row; the badge just reads the latest. The richer histograms read across rows. +- **Stats UI on the main dashboard**: per-agent rollups (avg turn duration, tokens-since-boot, top 5 tools) on the container row. Same data source, host-side aggregation query. ## Bugs diff --git a/hive-ag3nt/src/bin/hive-ag3nt.rs b/hive-ag3nt/src/bin/hive-ag3nt.rs index ed2495f..43b76b7 100644 --- a/hive-ag3nt/src/bin/hive-ag3nt.rs +++ b/hive-ag3nt/src/bin/hive-ag3nt.rs @@ -8,6 +8,7 @@ use anyhow::Result; use clap::{Parser, Subcommand}; use hive_ag3nt::events::{Bus, LiveEvent, TurnState}; use hive_ag3nt::login::{self, LoginState}; +use hive_ag3nt::turn_stats::{TurnStatRow, TurnStats}; use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, plugins, turn, web_ui}; use hive_sh4re::{AgentRequest, AgentResponse}; @@ -72,6 +73,7 @@ async fn main() -> Result<()> { tracing::info!(state = ?initial, claude_dir = %claude_dir.display(), "harness boot"); let login_state = Arc::new(Mutex::new(initial)); let bus = Bus::new(); + let stats = TurnStats::open_default(); let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Agent).await?; let turn_lock: TurnLock = Arc::new(tokio::sync::Mutex::new(())); plugins::install_configured(&cli.socket, Some("manager")).await; @@ -91,6 +93,7 @@ async fn main() -> Result<()> { Duration::from_millis(poll_ms), login_state, bus, + stats, &files, turn_lock, &label, @@ -107,6 +110,7 @@ async fn main() -> Result<()> { Duration::from_millis(poll_ms), login_state, bus, + stats, &files, turn_lock, &label, @@ -143,6 +147,7 @@ async fn serve( interval: Duration, state: Arc>, bus: Bus, + stats: Option, files: &turn::TurnFiles, turn_lock: TurnLock, label: &str, @@ -172,6 +177,9 @@ async fn serve( unread, }); bus.set_state(TurnState::Thinking); + let started_at = now_unix(); + let started_instant = std::time::Instant::now(); + let model_at_start = bus.model(); let prompt = format_wake_prompt(&from, &body, unread); let outcome = { let _guard = turn_lock.lock().await; @@ -187,6 +195,24 @@ async fn serve( if let turn::TurnOutcome::Failed(e) = &outcome { notify_manager_of_failure(socket, label, e).await; } + if let Some(s) = &stats { + let ended_at = now_unix(); + let duration_ms = + i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX); + let (open_threads, open_reminders) = fetch_agent_post_turn_counts(socket).await; + let row = build_row( + started_at, + ended_at, + duration_ms, + model_at_start, + from.clone(), + &outcome, + &bus, + open_threads, + open_reminders, + ); + s.record(&row); + } // After turn completes, check if there are pending messages waiting. // If so, immediately process them instead of blocking on recv(). @@ -209,7 +235,8 @@ async fn serve( | AgentResponse::Status { .. } | AgentResponse::Recent { .. } | AgentResponse::QuestionQueued { .. } - | AgentResponse::OpenThreads { .. }, + | AgentResponse::OpenThreads { .. } + | AgentResponse::PendingRemindersCount { .. }, ) => { tracing::warn!("recv produced unexpected response kind"); } @@ -271,3 +298,84 @@ async fn inbox_unread(socket: &Path) -> u64 { _ => 0, } } + +fn now_unix() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .ok() + .and_then(|d| i64::try_from(d.as_secs()).ok()) + .unwrap_or(0) +} + +/// Best-effort: ask hive-c0re for this agent's open thread count + pending +/// reminder count, after the turn finishes. Either roundtrip can fail +/// (transport hiccup, race with hive-c0re restart) — in those cases we +/// just drop a `None` into the stats row rather than blocking the loop. +async fn fetch_agent_post_turn_counts(socket: &Path) -> (Option, Option) { + let threads = match client::request::<_, AgentResponse>( + socket, + &AgentRequest::GetOpenThreads, + ) + .await + { + Ok(AgentResponse::OpenThreads { threads }) => u64::try_from(threads.len()).ok(), + _ => None, + }; + let reminders = match client::request::<_, AgentResponse>( + socket, + &AgentRequest::CountPendingReminders, + ) + .await + { + Ok(AgentResponse::PendingRemindersCount { count }) => Some(count), + _ => None, + }; + (threads, reminders) +} + +/// Assemble a TurnStatRow from the harness's per-turn state. Shared +/// shape between the agent + manager bin loops (each lives in its own +/// crate root so this helper is duplicated; the savings of a shared +/// module aren't worth the cross-crate ceremony at this size). +fn build_row( + started_at: i64, + ended_at: i64, + duration_ms: i64, + model: String, + wake_from: String, + outcome: &turn::TurnOutcome, + bus: &Bus, + open_threads_count: Option, + open_reminders_count: Option, +) -> TurnStatRow { + let usage = bus.last_usage().unwrap_or_default(); + let tool_calls = bus.take_tool_calls(); + let tool_call_count: u64 = tool_calls.values().copied().sum(); + let tool_call_breakdown_json = if tool_calls.is_empty() { + None + } else { + serde_json::to_string(&tool_calls).ok() + }; + let (result_kind, note) = match outcome { + turn::TurnOutcome::Ok => ("ok", None), + turn::TurnOutcome::PromptTooLong => ("prompt_too_long", None), + turn::TurnOutcome::Failed(e) => ("failed", Some(format!("{e:#}"))), + }; + TurnStatRow { + started_at, + ended_at, + duration_ms, + model, + wake_from, + input_tokens: usage.input_tokens, + output_tokens: usage.output_tokens, + cache_read_input_tokens: usage.cache_read_input_tokens, + cache_creation_input_tokens: usage.cache_creation_input_tokens, + tool_call_count, + tool_call_breakdown_json, + open_threads_count, + open_reminders_count, + result_kind, + note, + } +} diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index 79e5614..4d8591c 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -12,6 +12,7 @@ use anyhow::Result; use clap::{Parser, Subcommand}; use hive_ag3nt::events::{Bus, LiveEvent, TurnState}; use hive_ag3nt::login::{self, LoginState}; +use hive_ag3nt::turn_stats::{TurnStatRow, TurnStats}; use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, plugins, turn, web_ui}; use hive_sh4re::{HelperEvent, ManagerRequest, ManagerResponse, SYSTEM_SENDER}; @@ -62,6 +63,7 @@ async fn main() -> Result<()> { tracing::info!(state = ?initial, claude_dir = %claude_dir.display(), "hm1nd boot"); let login_state = Arc::new(Mutex::new(initial)); let bus = Bus::new(); + let stats = TurnStats::open_default(); let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Manager).await?; let turn_lock: TurnLock = Arc::new(tokio::sync::Mutex::new(())); plugins::install_configured(&cli.socket, None).await; @@ -80,6 +82,7 @@ async fn main() -> Result<()> { &cli.socket, Duration::from_millis(poll_ms), bus, + stats, &files, turn_lock, ) @@ -91,6 +94,7 @@ async fn main() -> Result<()> { &cli.socket, Duration::from_millis(poll_ms), bus, + stats, &files, turn_lock, ) @@ -106,6 +110,7 @@ async fn serve( socket: &Path, interval: Duration, bus: Bus, + stats: Option, files: &turn::TurnFiles, turn_lock: TurnLock, ) -> Result<()> { @@ -152,12 +157,34 @@ async fn serve( }); let prompt = format_wake_prompt(&from, &body, unread); bus.set_state(TurnState::Thinking); + let started_at = now_unix(); + let started_instant = std::time::Instant::now(); + let model_at_start = bus.model(); let outcome = { let _guard = turn_lock.lock().await; turn::drive_turn(&prompt, files, &bus).await }; turn::emit_turn_end(&bus, &outcome); bus.set_state(TurnState::Idle); + if let Some(s) = &stats { + let ended_at = now_unix(); + let duration_ms = + i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX); + let (open_threads, open_reminders) = + fetch_manager_post_turn_counts(socket).await; + let row = build_row( + started_at, + ended_at, + duration_ms, + model_at_start, + from.clone(), + &outcome, + &bus, + open_threads, + open_reminders, + ); + s.record(&row); + } // Check for messages that arrived during the turn and loop // immediately if any are waiting — mirrors hive-ag3nt behaviour. let pending = inbox_unread(socket).await; @@ -176,7 +203,8 @@ async fn serve( | ManagerResponse::QuestionQueued { .. } | ManagerResponse::Recent { .. } | ManagerResponse::Logs { .. } - | ManagerResponse::OpenThreads { .. }, + | ManagerResponse::OpenThreads { .. } + | ManagerResponse::PendingRemindersCount { .. }, ) => { tracing::warn!("recv produced unexpected response kind"); } @@ -211,3 +239,80 @@ async fn inbox_unread(socket: &Path) -> u64 { _ => 0, } } + +fn now_unix() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .ok() + .and_then(|d| i64::try_from(d.as_secs()).ok()) + .unwrap_or(0) +} + +/// Manager-flavour equivalent of the agent helper. Mirror shape, just +/// uses ManagerRequest/ManagerResponse instead of the agent variants. +async fn fetch_manager_post_turn_counts(socket: &Path) -> (Option, Option) { + let threads = match client::request::<_, ManagerResponse>( + socket, + &ManagerRequest::GetOpenThreads, + ) + .await + { + Ok(ManagerResponse::OpenThreads { threads }) => u64::try_from(threads.len()).ok(), + _ => None, + }; + let reminders = match client::request::<_, ManagerResponse>( + socket, + &ManagerRequest::CountPendingReminders, + ) + .await + { + Ok(ManagerResponse::PendingRemindersCount { count }) => Some(count), + _ => None, + }; + (threads, reminders) +} + +/// Manager flavour of the agent's build_row helper. Duplicated rather +/// than shared to keep each bin self-contained at this size. +fn build_row( + started_at: i64, + ended_at: i64, + duration_ms: i64, + model: String, + wake_from: String, + outcome: &turn::TurnOutcome, + bus: &Bus, + open_threads_count: Option, + open_reminders_count: Option, +) -> TurnStatRow { + let usage = bus.last_usage().unwrap_or_default(); + let tool_calls = bus.take_tool_calls(); + let tool_call_count: u64 = tool_calls.values().copied().sum(); + let tool_call_breakdown_json = if tool_calls.is_empty() { + None + } else { + serde_json::to_string(&tool_calls).ok() + }; + let (result_kind, note) = match outcome { + turn::TurnOutcome::Ok => ("ok", None), + turn::TurnOutcome::PromptTooLong => ("prompt_too_long", None), + turn::TurnOutcome::Failed(e) => ("failed", Some(format!("{e:#}"))), + }; + TurnStatRow { + started_at, + ended_at, + duration_ms, + model, + wake_from, + input_tokens: usage.input_tokens, + output_tokens: usage.output_tokens, + cache_read_input_tokens: usage.cache_read_input_tokens, + cache_creation_input_tokens: usage.cache_creation_input_tokens, + tool_call_count, + tool_call_breakdown_json, + open_threads_count, + open_reminders_count, + result_kind, + note, + } +} diff --git a/hive-ag3nt/src/events.rs b/hive-ag3nt/src/events.rs index a99f49c..df1ada9 100644 --- a/hive-ag3nt/src/events.rs +++ b/hive-ag3nt/src/events.rs @@ -294,6 +294,12 @@ pub struct Bus { /// behavior. Atomic so the consumer can take-and-clear without a /// lock. skip_continue_once: Arc, + /// Per-turn tool-call counter. Reset by the bin loop between + /// turns via `take_tool_calls`. Populated by `observe_stream` as + /// the stdout pump parses each stream-json line. Powers the + /// `tool_call_count` + `tool_call_breakdown_json` columns on the + /// per-turn stats sink. + tool_calls: Arc>>, } impl Bus { @@ -319,6 +325,7 @@ impl Bus { model: Arc::new(Mutex::new(initial_model)), last_usage: Arc::new(Mutex::new(None)), skip_continue_once: Arc::new(AtomicBool::new(false)), + tool_calls: Arc::new(Mutex::new(std::collections::HashMap::new())), } } @@ -377,6 +384,43 @@ impl Bus { self.emit(LiveEvent::TokenUsageChanged { usage }); } + /// Walk a stream-json value for `tool_use` blocks and bump the + /// per-turn counter for each one we find. Called by the stdout + /// pump on every parsed line. Cheap when the line isn't an + /// assistant message — the field-check short-circuits. + pub fn observe_stream(&self, v: &serde_json::Value) { + if v.get("type").and_then(|t| t.as_str()) != Some("assistant") { + return; + } + let Some(content) = v + .get("message") + .and_then(|m| m.get("content")) + .and_then(|c| c.as_array()) + else { + return; + }; + let mut counts = self.tool_calls.lock().unwrap(); + for block in content { + if block.get("type").and_then(|t| t.as_str()) != Some("tool_use") { + continue; + } + let name = block + .get("name") + .and_then(|n| n.as_str()) + .unwrap_or("") + .to_owned(); + *counts.entry(name).or_insert(0) += 1; + } + } + + /// Snapshot + clear the per-turn tool-call counter. The harness + /// calls this between turns to fold the breakdown into a + /// `turn_stats` row, then start the next turn with an empty map. + #[must_use] + pub fn take_tool_calls(&self) -> std::collections::HashMap { + std::mem::take(&mut *self.tool_calls.lock().unwrap()) + } + /// Last known token usage, or `None` if no turn has completed yet. #[must_use] pub fn last_usage(&self) -> Option { diff --git a/hive-ag3nt/src/lib.rs b/hive-ag3nt/src/lib.rs index 22ced06..219e644 100644 --- a/hive-ag3nt/src/lib.rs +++ b/hive-ag3nt/src/lib.rs @@ -9,6 +9,7 @@ pub mod mcp; pub mod paths; pub mod plugins; pub mod turn; +pub mod turn_stats; pub mod web_ui; /// Default socket path inside the container — bind-mounted by `hive-c0re`. diff --git a/hive-ag3nt/src/mcp.rs b/hive-ag3nt/src/mcp.rs index 082f22e..b32f884 100644 --- a/hive-ag3nt/src/mcp.rs +++ b/hive-ag3nt/src/mcp.rs @@ -41,6 +41,7 @@ pub enum SocketReply { Recent(Vec), Logs(String), OpenThreads(Vec), + PendingRemindersCount(u64), } impl From for SocketReply { @@ -54,6 +55,9 @@ impl From for SocketReply { hive_sh4re::AgentResponse::Recent { rows } => Self::Recent(rows), hive_sh4re::AgentResponse::QuestionQueued { id } => Self::QuestionQueued(id), hive_sh4re::AgentResponse::OpenThreads { threads } => Self::OpenThreads(threads), + hive_sh4re::AgentResponse::PendingRemindersCount { count } => { + Self::PendingRemindersCount(count) + } } } } @@ -70,6 +74,9 @@ impl From for SocketReply { hive_sh4re::ManagerResponse::Recent { rows } => Self::Recent(rows), hive_sh4re::ManagerResponse::Logs { content } => Self::Logs(content), hive_sh4re::ManagerResponse::OpenThreads { threads } => Self::OpenThreads(threads), + hive_sh4re::ManagerResponse::PendingRemindersCount { count } => { + Self::PendingRemindersCount(count) + } } } } diff --git a/hive-ag3nt/src/turn.rs b/hive-ag3nt/src/turn.rs index a2f73ca..901effd 100644 --- a/hive-ag3nt/src/turn.rs +++ b/hive-ag3nt/src/turn.rs @@ -288,6 +288,7 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result if let Some(usage) = crate::events::TokenUsage::from_stream_event(&v) { bus_out.record_usage(usage); } + bus_out.observe_stream(&v); bus_out.emit(LiveEvent::Stream(v)); } Err(_) => bus_out.emit(LiveEvent::Note { diff --git a/hive-ag3nt/src/turn_stats.rs b/hive-ag3nt/src/turn_stats.rs new file mode 100644 index 0000000..66edfc2 --- /dev/null +++ b/hive-ag3nt/src/turn_stats.rs @@ -0,0 +1,163 @@ +//! Per-turn analytics sink. One sqlite row per claude turn captures: +//! identity (model, wake_from, result_kind), timing (started_at, +//! ended_at, duration_ms), cost (token counts), behaviour (tool-call +//! count + per-tool breakdown), and post-turn snapshot metrics +//! (open_threads_count, open_reminders_count). +//! +//! Lives next to `hyperhive-events.sqlite` in the agent's state dir +//! so the host-side state vacuum sweep can reach both. Schema is +//! intentionally append-only — every column has a default so future +//! additions don't break old readers; new columns land via +//! `ALTER TABLE ... ADD COLUMN ... DEFAULT ...` in the migration +//! block. +//! +//! Writes are best-effort: a failed insert logs a warning and lets +//! the turn loop continue. The next turn either succeeds or the +//! operator sees the journal trail. + +use std::path::{Path, PathBuf}; +use std::sync::Mutex; + +use anyhow::{Context, Result}; +use rusqlite::{Connection, params}; + +/// SQL bootstrap. CREATE TABLE IF NOT EXISTS so first-boot agents +/// and existing ones converge on the same shape; ALTER-style +/// migrations land here as additional statements once we have any. +const SCHEMA: &str = " +CREATE TABLE IF NOT EXISTS turn_stats ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + started_at INTEGER NOT NULL, + ended_at INTEGER NOT NULL, + duration_ms INTEGER NOT NULL, + model TEXT NOT NULL, + wake_from TEXT NOT NULL, + input_tokens INTEGER NOT NULL DEFAULT 0, + output_tokens INTEGER NOT NULL DEFAULT 0, + cache_read_input_tokens INTEGER NOT NULL DEFAULT 0, + cache_creation_input_tokens INTEGER NOT NULL DEFAULT 0, + tool_call_count INTEGER NOT NULL DEFAULT 0, + tool_call_breakdown_json TEXT, + open_threads_count INTEGER, + open_reminders_count INTEGER, + result_kind TEXT NOT NULL, + note TEXT +); +CREATE INDEX IF NOT EXISTS idx_turn_stats_started + ON turn_stats (started_at DESC); +"; + +/// One row to be inserted. `Option`-wrapped fields default to NULL +/// when the harness couldn't gather them (e.g. socket roundtrip for +/// open_threads failed) so a partial row beats no row. +#[derive(Debug, Clone)] +pub struct TurnStatRow { + pub started_at: i64, + pub ended_at: i64, + pub duration_ms: i64, + pub model: String, + pub wake_from: String, + pub input_tokens: u64, + pub output_tokens: u64, + pub cache_read_input_tokens: u64, + pub cache_creation_input_tokens: u64, + pub tool_call_count: u64, + /// Per-tool breakdown as JSON: `{"Read":12,"Bash":3,...}`. None + /// when no tools were called (saves a sqlite write of `"{}"`). + pub tool_call_breakdown_json: Option, + pub open_threads_count: Option, + pub open_reminders_count: Option, + /// `"ok" | "failed" | "prompt_too_long"`. + pub result_kind: &'static str, + pub note: Option, +} + +/// Thin sqlite wrapper. Cloning is cheap (Arc-shared connection). +#[derive(Clone)] +pub struct TurnStats { + inner: std::sync::Arc>, +} + +impl TurnStats { + /// Open the per-agent stats db, creating the file + schema if + /// missing. Returns `None` when the db can't be opened (read-only + /// fs in tests, missing state dir) — the harness logs and + /// continues without a sink rather than failing the turn loop. + #[must_use] + pub fn open_default() -> Option { + let path = default_path(); + match Self::open(&path) { + Ok(s) => Some(s), + Err(e) => { + tracing::warn!( + error = ?e, + path = %path.display(), + "turn_stats: open failed; per-turn analytics disabled" + ); + None + } + } + } + + fn open(path: &Path) -> Result { + if let Some(parent) = path.parent() { + let _ = std::fs::create_dir_all(parent); + } + let conn = Connection::open(path) + .with_context(|| format!("open turn_stats db {}", path.display()))?; + conn.execute_batch(SCHEMA) + .context("apply turn_stats schema")?; + Ok(Self { + inner: std::sync::Arc::new(Mutex::new(conn)), + }) + } + + /// Insert a row. Best-effort — logs + swallows errors so a sqlite + /// hiccup (locked db, full disk) doesn't crash the harness. + pub fn record(&self, row: &TurnStatRow) { + let conn = self.inner.lock().unwrap(); + let res = conn.execute( + "INSERT INTO turn_stats ( + started_at, ended_at, duration_ms, model, wake_from, + input_tokens, output_tokens, + cache_read_input_tokens, cache_creation_input_tokens, + tool_call_count, tool_call_breakdown_json, + open_threads_count, open_reminders_count, + result_kind, note + ) VALUES ( + ?1, ?2, ?3, ?4, ?5, + ?6, ?7, + ?8, ?9, + ?10, ?11, + ?12, ?13, + ?14, ?15 + )", + params![ + row.started_at, + row.ended_at, + row.duration_ms, + row.model, + row.wake_from, + i64::try_from(row.input_tokens).unwrap_or(i64::MAX), + i64::try_from(row.output_tokens).unwrap_or(i64::MAX), + i64::try_from(row.cache_read_input_tokens).unwrap_or(i64::MAX), + i64::try_from(row.cache_creation_input_tokens).unwrap_or(i64::MAX), + i64::try_from(row.tool_call_count).unwrap_or(i64::MAX), + row.tool_call_breakdown_json, + row.open_threads_count + .map(|n| i64::try_from(n).unwrap_or(i64::MAX)), + row.open_reminders_count + .map(|n| i64::try_from(n).unwrap_or(i64::MAX)), + row.result_kind, + row.note, + ], + ); + if let Err(e) = res { + tracing::warn!(error = ?e, "turn_stats: insert failed"); + } + } +} + +fn default_path() -> PathBuf { + crate::paths::state_dir().join("hyperhive-turn-stats.sqlite") +} diff --git a/hive-c0re/src/agent_server.rs b/hive-c0re/src/agent_server.rs index 4bf0a18..1d2cd00 100644 --- a/hive-c0re/src/agent_server.rs +++ b/hive-c0re/src/agent_server.rs @@ -180,6 +180,14 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc) -> message: format!("{e:#}"), }, }, + AgentRequest::CountPendingReminders => { + match coord.broker.count_pending_reminders_for(agent) { + Ok(count) => AgentResponse::PendingRemindersCount { count }, + Err(e) => AgentResponse::Err { + message: format!("{e:#}"), + }, + } + } } } diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs index e20e11d..afe14f7 100644 --- a/hive-c0re/src/broker.rs +++ b/hive-c0re/src/broker.rs @@ -324,6 +324,19 @@ impl Broker { .context("list pending reminders") } + /// Count this agent's still-pending (un-delivered) reminders. + /// Used by the per-turn stats sink for a cheap "what was queued + /// at turn-end" snapshot. + pub fn count_pending_reminders_for(&self, agent: &str) -> Result { + let conn = self.conn.lock().unwrap(); + let n: i64 = conn.query_row( + "SELECT COUNT(*) FROM reminders WHERE agent = ?1 AND sent_at IS NULL", + params![agent], + |row| row.get(0), + )?; + Ok(u64::try_from(n).unwrap_or(0)) + } + /// Delete a reminder by id. Returns the number of rows removed (0 /// when the id never existed or was already delivered). Hard /// delete rather than soft so the row doesn't linger and confuse a diff --git a/hive-c0re/src/manager_server.rs b/hive-c0re/src/manager_server.rs index ead1fe5..b707032 100644 --- a/hive-c0re/src/manager_server.rs +++ b/hive-c0re/src/manager_server.rs @@ -335,6 +335,14 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc) -> ManagerResp message: format!("{e:#}"), }, }, + ManagerRequest::CountPendingReminders => { + match coord.broker.count_pending_reminders_for(MANAGER_AGENT) { + Ok(count) => ManagerResponse::PendingRemindersCount { count }, + Err(e) => ManagerResponse::Err { + message: format!("{e:#}"), + }, + } + } } } diff --git a/hive-sh4re/src/lib.rs b/hive-sh4re/src/lib.rs index 9da4346..2a9e895 100644 --- a/hive-sh4re/src/lib.rs +++ b/hive-sh4re/src/lib.rs @@ -309,6 +309,10 @@ pub enum AgentRequest { /// manager); questions surface where the agent is `asker` or /// `target`. Cheap O(n) sweep server-side — no caching. GetOpenThreads, + /// Count of this agent's pending (un-delivered) reminders. Used + /// by the harness's per-turn stats sink to snapshot "what was + /// queued at turn-end time" without paying for a full list. + CountPendingReminders, } /// Responses on a per-agent socket. @@ -333,6 +337,8 @@ pub enum AgentResponse { /// `GetOpenThreads` result: list of loose ends pending against /// this agent. Ordered newest-first within each kind. OpenThreads { threads: Vec }, + /// `CountPendingReminders` result. + PendingRemindersCount { count: u64 }, } // ----------------------------------------------------------------------------- @@ -596,6 +602,9 @@ pub enum ManagerRequest { /// sub-agent surface is `AgentRequest::GetOpenThreads` which /// only returns rows where the agent itself is asker / target. GetOpenThreads, + /// Count of the manager's own pending reminders. Mirror of + /// `AgentRequest::CountPendingReminders` on the manager surface. + CountPendingReminders, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -634,4 +643,8 @@ pub enum ManagerResponse { OpenThreads { threads: Vec, }, + /// `CountPendingReminders` result. + PendingRemindersCount { + count: u64, + }, }