turn_stats: per-turn analytics sink
new sqlite table at /state/hyperhive-turn-stats.sqlite on each agent's state dir. one row per claude turn captures identity (model, wake_from, result_kind), timing (started_at/ended_at, duration_ms), cost (input/output/cache_read/cache_creation token counts), behaviour (tool_call_count + per-tool breakdown JSON), and post-turn snapshot metrics (open_threads_count, open_reminders_count).

wire additions:
- AgentRequest/ManagerRequest::CountPendingReminders + Broker::count_pending_reminders_for(agent)
- Bus::observe_stream + take_tool_calls: pumps the existing stdout stream-json, picks out tool_use blocks, accumulates per turn. bin loops fold the breakdown into each row.
- TurnStats::open_default + TurnStatRow + record(): best-effort inserts; failures log + don't block the harness.

both ag3nt and m1nd bins capture started_at (wall clock) + duration_ms via Instant::elapsed, fetch open-thread + reminder counts from hive-c0re via the existing socket (post-turn, best-effort), and record one row at turn_end. result_kind splits ok / failed / prompt_too_long; failures carry the error message in note.

todo entries for host-side vacuum sweep + reading the table back into agent/dashboard badges.
parent dc1ce1f236
commit 8f5752980f
12 changed files with 476 additions and 3 deletions
TODO.md (4 lines changed)
@@ -75,7 +75,9 @@ how often the friction bites in normal use.
 
 ## Telemetry
 
-- **Per-turn stats log**: persist one row per claude turn in a new sqlite table on the per-agent state dir (or the host broker DB, indexed by agent). Columns: `started_at`, `ended_at`, `duration_ms`, `model`, `input_tokens`, `output_tokens`, `cache_read_input_tokens`, `cache_creation_input_tokens`, `tool_call_count`, `tool_call_breakdown` (JSON: `{Read: 12, Bash: 3, ...}`), `bytes_streamed`, `wake_reason` (recv'd message / reminder / operator-kick / manual), `result_kind` (ok / cancelled / failed-mid-turn / compacted), `note` (e.g. failure reason). Powers: per-agent dashboards (avg turn time over time, tool-usage histogram, cost projections from token counts × model rate), debugging stuck loops (look for repeated identical wake_reason + zero tool calls), and operator-visible "this is what your spend looked like this week" rollups. Source data is already mostly in the harness's `TurnState` + the per-event bus; just needs a sink. Keep a retention sweep (host-side) so the table doesn't grow forever.
+- **Per-turn stats: host-side vacuum sweep**: the sink writes to `/state/hyperhive-turn-stats.sqlite` on each agent's state dir; needs a periodic retention sweep mirroring `events_vacuum.rs` so the table doesn't grow forever. Default keep-window: 90 days (turn-stats are denser than events but smaller per-row, ~200B each).
+- **Surface per-turn stats on the agent web UI**: badges sourced from the new sink — `open_threads` count chip, `open_reminders` count chip, "N turns today" chip, rolling tool-call histogram tooltip on the model chip. Both `open_threads` and `open_reminders` are already columns on every row; the badge just reads the latest. The richer histograms read across rows.
+- **Stats UI on the main dashboard**: per-agent rollups (avg turn duration, tokens-since-boot, top 5 tools) on the container row. Same data source, host-side aggregation query.
 
 ## Bugs
 
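The retention sweep named in the Telemetry TODO entry is not part of this commit. As a rough sketch of the cutoff arithmetic only, assuming the 90-day keep-window is applied to `started_at` (Unix seconds): the `retention_cutoff` helper, the `SWEEP_SQL` statement, and the in-memory `sweep` stand-in are hypothetical names for illustration and are not taken from `events_vacuum.rs`.

```rust
/// Hypothetical constant: the 90-day keep-window proposed in the TODO entry.
const KEEP_DAYS: i64 = 90;
const SECS_PER_DAY: i64 = 86_400;

/// Assumed shape of the sweep statement; `turn_stats` and `started_at`
/// are the table and column names from the schema added in this commit.
const SWEEP_SQL: &str = "DELETE FROM turn_stats WHERE started_at < ?1";

/// Oldest `started_at` (Unix seconds) that survives the sweep.
fn retention_cutoff(now_unix: i64, keep_days: i64) -> i64 {
    now_unix.saturating_sub(keep_days.saturating_mul(SECS_PER_DAY))
}

/// In-memory stand-in for the DELETE: keep only rows at or after the cutoff.
fn sweep(started_ats: &mut Vec<i64>, cutoff: i64) {
    started_ats.retain(|&ts| ts >= cutoff);
}
```

A host-side job would presumably bind `retention_cutoff(now, KEEP_DAYS)` as `?1` in `SWEEP_SQL`; the existing `idx_turn_stats_started` index on `started_at` should keep that range delete cheap.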
@@ -8,6 +8,7 @@ use anyhow::Result;
 use clap::{Parser, Subcommand};
 use hive_ag3nt::events::{Bus, LiveEvent, TurnState};
 use hive_ag3nt::login::{self, LoginState};
+use hive_ag3nt::turn_stats::{TurnStatRow, TurnStats};
 use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, plugins, turn, web_ui};
 use hive_sh4re::{AgentRequest, AgentResponse};
 
@@ -72,6 +73,7 @@ async fn main() -> Result<()> {
             tracing::info!(state = ?initial, claude_dir = %claude_dir.display(), "harness boot");
             let login_state = Arc::new(Mutex::new(initial));
             let bus = Bus::new();
+            let stats = TurnStats::open_default();
             let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Agent).await?;
             let turn_lock: TurnLock = Arc::new(tokio::sync::Mutex::new(()));
             plugins::install_configured(&cli.socket, Some("manager")).await;
@@ -91,6 +93,7 @@ async fn main() -> Result<()> {
                     Duration::from_millis(poll_ms),
                     login_state,
                     bus,
+                    stats,
                     &files,
                     turn_lock,
                     &label,
@@ -107,6 +110,7 @@ async fn main() -> Result<()> {
                     Duration::from_millis(poll_ms),
                     login_state,
                     bus,
+                    stats,
                     &files,
                     turn_lock,
                     &label,
@@ -143,6 +147,7 @@ async fn serve(
     interval: Duration,
     state: Arc<Mutex<LoginState>>,
     bus: Bus,
+    stats: Option<TurnStats>,
     files: &turn::TurnFiles,
     turn_lock: TurnLock,
     label: &str,
@@ -172,6 +177,9 @@ async fn serve(
                     unread,
                 });
                 bus.set_state(TurnState::Thinking);
+                let started_at = now_unix();
+                let started_instant = std::time::Instant::now();
+                let model_at_start = bus.model();
                 let prompt = format_wake_prompt(&from, &body, unread);
                 let outcome = {
                     let _guard = turn_lock.lock().await;
@@ -187,6 +195,24 @@ async fn serve(
                 if let turn::TurnOutcome::Failed(e) = &outcome {
                     notify_manager_of_failure(socket, label, e).await;
                 }
+                if let Some(s) = &stats {
+                    let ended_at = now_unix();
+                    let duration_ms =
+                        i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX);
+                    let (open_threads, open_reminders) = fetch_agent_post_turn_counts(socket).await;
+                    let row = build_row(
+                        started_at,
+                        ended_at,
+                        duration_ms,
+                        model_at_start,
+                        from.clone(),
+                        &outcome,
+                        &bus,
+                        open_threads,
+                        open_reminders,
+                    );
+                    s.record(&row);
+                }
 
                 // After turn completes, check if there are pending messages waiting.
                 // If so, immediately process them instead of blocking on recv().
@@ -209,7 +235,8 @@ async fn serve(
                 | AgentResponse::Status { .. }
                 | AgentResponse::Recent { .. }
                 | AgentResponse::QuestionQueued { .. }
-                | AgentResponse::OpenThreads { .. },
+                | AgentResponse::OpenThreads { .. }
+                | AgentResponse::PendingRemindersCount { .. },
             ) => {
                 tracing::warn!("recv produced unexpected response kind");
             }
@@ -271,3 +298,84 @@ async fn inbox_unread(socket: &Path) -> u64 {
         _ => 0,
     }
 }
+
+fn now_unix() -> i64 {
+    std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .ok()
+        .and_then(|d| i64::try_from(d.as_secs()).ok())
+        .unwrap_or(0)
+}
+
+/// Best-effort: ask hive-c0re for this agent's open thread count + pending
+/// reminder count, after the turn finishes. Either roundtrip can fail
+/// (transport hiccup, race with hive-c0re restart) — in those cases we
+/// just drop a `None` into the stats row rather than blocking the loop.
+async fn fetch_agent_post_turn_counts(socket: &Path) -> (Option<u64>, Option<u64>) {
+    let threads = match client::request::<_, AgentResponse>(
+        socket,
+        &AgentRequest::GetOpenThreads,
+    )
+    .await
+    {
+        Ok(AgentResponse::OpenThreads { threads }) => u64::try_from(threads.len()).ok(),
+        _ => None,
+    };
+    let reminders = match client::request::<_, AgentResponse>(
+        socket,
+        &AgentRequest::CountPendingReminders,
+    )
+    .await
+    {
+        Ok(AgentResponse::PendingRemindersCount { count }) => Some(count),
+        _ => None,
+    };
+    (threads, reminders)
+}
+
+/// Assemble a TurnStatRow from the harness's per-turn state. Shared
+/// shape between the agent + manager bin loops (each lives in its own
+/// crate root so this helper is duplicated; the savings of a shared
+/// module aren't worth the cross-crate ceremony at this size).
+fn build_row(
+    started_at: i64,
+    ended_at: i64,
+    duration_ms: i64,
+    model: String,
+    wake_from: String,
+    outcome: &turn::TurnOutcome,
+    bus: &Bus,
+    open_threads_count: Option<u64>,
+    open_reminders_count: Option<u64>,
+) -> TurnStatRow {
+    let usage = bus.last_usage().unwrap_or_default();
+    let tool_calls = bus.take_tool_calls();
+    let tool_call_count: u64 = tool_calls.values().copied().sum();
+    let tool_call_breakdown_json = if tool_calls.is_empty() {
+        None
+    } else {
+        serde_json::to_string(&tool_calls).ok()
+    };
+    let (result_kind, note) = match outcome {
+        turn::TurnOutcome::Ok => ("ok", None),
+        turn::TurnOutcome::PromptTooLong => ("prompt_too_long", None),
+        turn::TurnOutcome::Failed(e) => ("failed", Some(format!("{e:#}"))),
+    };
+    TurnStatRow {
+        started_at,
+        ended_at,
+        duration_ms,
+        model,
+        wake_from,
+        input_tokens: usage.input_tokens,
+        output_tokens: usage.output_tokens,
+        cache_read_input_tokens: usage.cache_read_input_tokens,
+        cache_creation_input_tokens: usage.cache_creation_input_tokens,
+        tool_call_count,
+        tool_call_breakdown_json,
+        open_threads_count,
+        open_reminders_count,
+        result_kind,
+        note,
+    }
+}
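The bin loop above takes two clock readings per turn: a wall-clock `started_at` for the row and a monotonic `Instant` for `duration_ms`. A minimal standalone sketch of that pairing follows; the `TurnTimer` type and the `saturating_millis` helper are hypothetical names introduced here, not part of the commit.

```rust
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

/// Unix seconds, or 0 if the clock reads before the epoch (same shape as `now_unix`).
fn now_unix() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .ok()
        .and_then(|d| i64::try_from(d.as_secs()).ok())
        .unwrap_or(0)
}

/// Saturating conversion: durations too large for i64 clamp to i64::MAX.
fn saturating_millis(d: Duration) -> i64 {
    i64::try_from(d.as_millis()).unwrap_or(i64::MAX)
}

/// Hypothetical helper bundling both clock readings for one turn.
struct TurnTimer {
    started_at: i64,
    started_instant: Instant,
}

impl TurnTimer {
    fn start() -> Self {
        Self {
            started_at: now_unix(),
            started_instant: Instant::now(),
        }
    }

    /// Returns (started_at, ended_at, duration_ms).
    fn finish(&self) -> (i64, i64, i64) {
        (
            self.started_at,
            now_unix(),
            saturating_millis(self.started_instant.elapsed()),
        )
    }
}
```

Because `Instant` is monotonic, `duration_ms` stays meaningful even if the wall clock is adjusted mid-turn, which is why it is not derived from `ended_at - started_at`.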
@@ -12,6 +12,7 @@ use anyhow::Result;
 use clap::{Parser, Subcommand};
 use hive_ag3nt::events::{Bus, LiveEvent, TurnState};
 use hive_ag3nt::login::{self, LoginState};
+use hive_ag3nt::turn_stats::{TurnStatRow, TurnStats};
 use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, plugins, turn, web_ui};
 use hive_sh4re::{HelperEvent, ManagerRequest, ManagerResponse, SYSTEM_SENDER};
 
@@ -62,6 +63,7 @@ async fn main() -> Result<()> {
             tracing::info!(state = ?initial, claude_dir = %claude_dir.display(), "hm1nd boot");
             let login_state = Arc::new(Mutex::new(initial));
             let bus = Bus::new();
+            let stats = TurnStats::open_default();
             let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Manager).await?;
             let turn_lock: TurnLock = Arc::new(tokio::sync::Mutex::new(()));
             plugins::install_configured(&cli.socket, None).await;
@@ -80,6 +82,7 @@ async fn main() -> Result<()> {
                     &cli.socket,
                     Duration::from_millis(poll_ms),
                     bus,
+                    stats,
                     &files,
                     turn_lock,
                 )
@@ -91,6 +94,7 @@ async fn main() -> Result<()> {
                     &cli.socket,
                     Duration::from_millis(poll_ms),
                     bus,
+                    stats,
                     &files,
                     turn_lock,
                 )
@@ -106,6 +110,7 @@ async fn serve(
     socket: &Path,
     interval: Duration,
     bus: Bus,
+    stats: Option<TurnStats>,
     files: &turn::TurnFiles,
     turn_lock: TurnLock,
 ) -> Result<()> {
@@ -152,12 +157,34 @@ async fn serve(
                 });
                 let prompt = format_wake_prompt(&from, &body, unread);
                 bus.set_state(TurnState::Thinking);
+                let started_at = now_unix();
+                let started_instant = std::time::Instant::now();
+                let model_at_start = bus.model();
                 let outcome = {
                     let _guard = turn_lock.lock().await;
                     turn::drive_turn(&prompt, files, &bus).await
                 };
                 turn::emit_turn_end(&bus, &outcome);
                 bus.set_state(TurnState::Idle);
+                if let Some(s) = &stats {
+                    let ended_at = now_unix();
+                    let duration_ms =
+                        i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX);
+                    let (open_threads, open_reminders) =
+                        fetch_manager_post_turn_counts(socket).await;
+                    let row = build_row(
+                        started_at,
+                        ended_at,
+                        duration_ms,
+                        model_at_start,
+                        from.clone(),
+                        &outcome,
+                        &bus,
+                        open_threads,
+                        open_reminders,
+                    );
+                    s.record(&row);
+                }
                 // Check for messages that arrived during the turn and loop
                 // immediately if any are waiting — mirrors hive-ag3nt behaviour.
                 let pending = inbox_unread(socket).await;
@@ -176,7 +203,8 @@ async fn serve(
                 | ManagerResponse::QuestionQueued { .. }
                 | ManagerResponse::Recent { .. }
                 | ManagerResponse::Logs { .. }
-                | ManagerResponse::OpenThreads { .. },
+                | ManagerResponse::OpenThreads { .. }
+                | ManagerResponse::PendingRemindersCount { .. },
             ) => {
                 tracing::warn!("recv produced unexpected response kind");
             }
@@ -211,3 +239,80 @@ async fn inbox_unread(socket: &Path) -> u64 {
         _ => 0,
     }
 }
+
+fn now_unix() -> i64 {
+    std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .ok()
+        .and_then(|d| i64::try_from(d.as_secs()).ok())
+        .unwrap_or(0)
+}
+
+/// Manager-flavour equivalent of the agent helper. Mirror shape, just
+/// uses ManagerRequest/ManagerResponse instead of the agent variants.
+async fn fetch_manager_post_turn_counts(socket: &Path) -> (Option<u64>, Option<u64>) {
+    let threads = match client::request::<_, ManagerResponse>(
+        socket,
+        &ManagerRequest::GetOpenThreads,
+    )
+    .await
+    {
+        Ok(ManagerResponse::OpenThreads { threads }) => u64::try_from(threads.len()).ok(),
+        _ => None,
+    };
+    let reminders = match client::request::<_, ManagerResponse>(
+        socket,
+        &ManagerRequest::CountPendingReminders,
+    )
+    .await
+    {
+        Ok(ManagerResponse::PendingRemindersCount { count }) => Some(count),
+        _ => None,
+    };
+    (threads, reminders)
+}
+
+/// Manager flavour of the agent's build_row helper. Duplicated rather
+/// than shared to keep each bin self-contained at this size.
+fn build_row(
+    started_at: i64,
+    ended_at: i64,
+    duration_ms: i64,
+    model: String,
+    wake_from: String,
+    outcome: &turn::TurnOutcome,
+    bus: &Bus,
+    open_threads_count: Option<u64>,
+    open_reminders_count: Option<u64>,
+) -> TurnStatRow {
+    let usage = bus.last_usage().unwrap_or_default();
+    let tool_calls = bus.take_tool_calls();
+    let tool_call_count: u64 = tool_calls.values().copied().sum();
+    let tool_call_breakdown_json = if tool_calls.is_empty() {
+        None
+    } else {
+        serde_json::to_string(&tool_calls).ok()
+    };
+    let (result_kind, note) = match outcome {
+        turn::TurnOutcome::Ok => ("ok", None),
+        turn::TurnOutcome::PromptTooLong => ("prompt_too_long", None),
+        turn::TurnOutcome::Failed(e) => ("failed", Some(format!("{e:#}"))),
+    };
+    TurnStatRow {
+        started_at,
+        ended_at,
+        duration_ms,
+        model,
+        wake_from,
+        input_tokens: usage.input_tokens,
+        output_tokens: usage.output_tokens,
+        cache_read_input_tokens: usage.cache_read_input_tokens,
+        cache_creation_input_tokens: usage.cache_creation_input_tokens,
+        tool_call_count,
+        tool_call_breakdown_json,
+        open_threads_count,
+        open_reminders_count,
+        result_kind,
+        note,
+    }
+}
@@ -294,6 +294,12 @@ pub struct Bus {
     /// behavior. Atomic so the consumer can take-and-clear without a
     /// lock.
     skip_continue_once: Arc<AtomicBool>,
+    /// Per-turn tool-call counter. Reset by the bin loop between
+    /// turns via `take_tool_calls`. Populated by `observe_stream` as
+    /// the stdout pump parses each stream-json line. Powers the
+    /// `tool_call_count` + `tool_call_breakdown_json` columns on the
+    /// per-turn stats sink.
+    tool_calls: Arc<Mutex<std::collections::HashMap<String, u64>>>,
 }
 
 impl Bus {
@@ -319,6 +325,7 @@ impl Bus {
             model: Arc::new(Mutex::new(initial_model)),
             last_usage: Arc::new(Mutex::new(None)),
             skip_continue_once: Arc::new(AtomicBool::new(false)),
+            tool_calls: Arc::new(Mutex::new(std::collections::HashMap::new())),
         }
     }
 
@@ -377,6 +384,43 @@ impl Bus {
         self.emit(LiveEvent::TokenUsageChanged { usage });
     }
 
+    /// Walk a stream-json value for `tool_use` blocks and bump the
+    /// per-turn counter for each one we find. Called by the stdout
+    /// pump on every parsed line. Cheap when the line isn't an
+    /// assistant message — the field-check short-circuits.
+    pub fn observe_stream(&self, v: &serde_json::Value) {
+        if v.get("type").and_then(|t| t.as_str()) != Some("assistant") {
+            return;
+        }
+        let Some(content) = v
+            .get("message")
+            .and_then(|m| m.get("content"))
+            .and_then(|c| c.as_array())
+        else {
+            return;
+        };
+        let mut counts = self.tool_calls.lock().unwrap();
+        for block in content {
+            if block.get("type").and_then(|t| t.as_str()) != Some("tool_use") {
+                continue;
+            }
+            let name = block
+                .get("name")
+                .and_then(|n| n.as_str())
+                .unwrap_or("<unnamed>")
+                .to_owned();
+            *counts.entry(name).or_insert(0) += 1;
+        }
+    }
+
+    /// Snapshot + clear the per-turn tool-call counter. The harness
+    /// calls this between turns to fold the breakdown into a
+    /// `turn_stats` row, then start the next turn with an empty map.
+    #[must_use]
+    pub fn take_tool_calls(&self) -> std::collections::HashMap<String, u64> {
+        std::mem::take(&mut *self.tool_calls.lock().unwrap())
+    }
+
     /// Last known token usage, or `None` if no turn has completed yet.
     #[must_use]
     pub fn last_usage(&self) -> Option<TokenUsage> {
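The take-and-clear behaviour of `take_tool_calls` can be seen in isolation with a std-only sketch. `ToolCallCounter` and `bump` are hypothetical stand-ins for the `tool_calls` field on `Bus` and the counting done inside `observe_stream`; the JSON walking is left out on purpose.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical stand-in for the `tool_calls` field on `Bus`.
#[derive(Default)]
struct ToolCallCounter {
    counts: Mutex<HashMap<String, u64>>,
}

impl ToolCallCounter {
    /// Bump the counter for one observed `tool_use` block.
    fn bump(&self, name: &str) {
        let mut counts = self.counts.lock().unwrap();
        *counts.entry(name.to_owned()).or_insert(0) += 1;
    }

    /// Swap the map for an empty one and return the old contents.
    fn take(&self) -> HashMap<String, u64> {
        std::mem::take(&mut *self.counts.lock().unwrap())
    }
}
```

`std::mem::take` leaves `HashMap::default()` behind, so the snapshot and the reset happen under a single lock acquisition, and a second `take` with no new calls in between returns an empty map.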
@@ -9,6 +9,7 @@ pub mod mcp;
 pub mod paths;
 pub mod plugins;
 pub mod turn;
+pub mod turn_stats;
 pub mod web_ui;
 
 /// Default socket path inside the container — bind-mounted by `hive-c0re`.
@@ -41,6 +41,7 @@ pub enum SocketReply {
     Recent(Vec<hive_sh4re::InboxRow>),
     Logs(String),
     OpenThreads(Vec<hive_sh4re::OpenThread>),
+    PendingRemindersCount(u64),
 }
 
 impl From<hive_sh4re::AgentResponse> for SocketReply {
@@ -54,6 +55,9 @@ impl From<hive_sh4re::AgentResponse> for SocketReply {
             hive_sh4re::AgentResponse::Recent { rows } => Self::Recent(rows),
             hive_sh4re::AgentResponse::QuestionQueued { id } => Self::QuestionQueued(id),
             hive_sh4re::AgentResponse::OpenThreads { threads } => Self::OpenThreads(threads),
+            hive_sh4re::AgentResponse::PendingRemindersCount { count } => {
+                Self::PendingRemindersCount(count)
+            }
         }
     }
 }
@@ -70,6 +74,9 @@ impl From<hive_sh4re::ManagerResponse> for SocketReply {
             hive_sh4re::ManagerResponse::Recent { rows } => Self::Recent(rows),
             hive_sh4re::ManagerResponse::Logs { content } => Self::Logs(content),
             hive_sh4re::ManagerResponse::OpenThreads { threads } => Self::OpenThreads(threads),
+            hive_sh4re::ManagerResponse::PendingRemindersCount { count } => {
+                Self::PendingRemindersCount(count)
+            }
         }
     }
 }
@@ -288,6 +288,7 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<bool>
                         if let Some(usage) = crate::events::TokenUsage::from_stream_event(&v) {
                             bus_out.record_usage(usage);
                         }
+                        bus_out.observe_stream(&v);
                         bus_out.emit(LiveEvent::Stream(v));
                     }
                     Err(_) => bus_out.emit(LiveEvent::Note {
163
hive-ag3nt/src/turn_stats.rs
Normal file
163
hive-ag3nt/src/turn_stats.rs
Normal file
|
|
@ -0,0 +1,163 @@
|
||||||
|
//! Per-turn analytics sink. One sqlite row per claude turn captures:
|
||||||
|
//! identity (model, wake_from, result_kind), timing (started_at,
|
||||||
|
//! ended_at, duration_ms), cost (token counts), behaviour (tool-call
|
||||||
|
//! count + per-tool breakdown), and post-turn snapshot metrics
|
||||||
|
//! (open_threads_count, open_reminders_count).
|
||||||
|
//!
|
||||||
|
//! Lives next to `hyperhive-events.sqlite` in the agent's state dir
|
||||||
|
//! so the host-side state vacuum sweep can reach both. Schema is
|
||||||
|
//! intentionally append-only — every column has a default so future
|
||||||
|
//! additions don't break old readers; new columns land via
|
||||||
|
//! `ALTER TABLE ... ADD COLUMN ... DEFAULT ...` in the migration
|
||||||
|
//! block.
|
||||||
|
//!
|
||||||
|
//! Writes are best-effort: a failed insert logs a warning and lets
|
||||||
|
//! the turn loop continue. The next turn either succeeds or the
|
||||||
|
//! operator sees the journal trail.
|
||||||
|
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::sync::Mutex;
|
||||||
|
|
||||||
|
use anyhow::{Context, Result};
|
||||||
|
use rusqlite::{Connection, params};
|
||||||
|
|
||||||
|
/// SQL bootstrap. CREATE TABLE IF NOT EXISTS so first-boot agents
|
||||||
|
/// and existing ones converge on the same shape; ALTER-style
|
||||||
|
/// migrations land here as additional statements once we have any.
|
||||||
|
const SCHEMA: &str = "
|
||||||
|
CREATE TABLE IF NOT EXISTS turn_stats (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
started_at INTEGER NOT NULL,
|
||||||
|
ended_at INTEGER NOT NULL,
|
||||||
|
duration_ms INTEGER NOT NULL,
|
||||||
|
model TEXT NOT NULL,
|
||||||
|
wake_from TEXT NOT NULL,
|
||||||
|
input_tokens INTEGER NOT NULL DEFAULT 0,
|
||||||
|
output_tokens INTEGER NOT NULL DEFAULT 0,
|
||||||
|
cache_read_input_tokens INTEGER NOT NULL DEFAULT 0,
|
||||||
|
cache_creation_input_tokens INTEGER NOT NULL DEFAULT 0,
|
||||||
|
tool_call_count INTEGER NOT NULL DEFAULT 0,
|
||||||
|
tool_call_breakdown_json TEXT,
|
||||||
|
open_threads_count INTEGER,
|
||||||
|
open_reminders_count INTEGER,
|
||||||
|
result_kind TEXT NOT NULL,
|
||||||
|
note TEXT
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_turn_stats_started
|
||||||
|
ON turn_stats (started_at DESC);
|
||||||
|
";
|
||||||
|
|
||||||
|
/// One row to be inserted. `Option`-wrapped fields default to NULL
|
||||||
|
/// when the harness couldn't gather them (e.g. socket roundtrip for
|
||||||
|
/// open_threads failed) so a partial row beats no row.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct TurnStatRow {
|
||||||
|
pub started_at: i64,
|
||||||
|
pub ended_at: i64,
|
||||||
|
pub duration_ms: i64,
|
||||||
|
pub model: String,
|
||||||
|
pub wake_from: String,
|
||||||
|
pub input_tokens: u64,
|
||||||
|
pub output_tokens: u64,
|
||||||
|
pub cache_read_input_tokens: u64,
|
||||||
|
pub cache_creation_input_tokens: u64,
|
||||||
|
pub tool_call_count: u64,
|
||||||
|
/// Per-tool breakdown as JSON: `{"Read":12,"Bash":3,...}`. None
|
||||||
|
/// when no tools were called (saves a sqlite write of `"{}"`).
|
||||||
|
pub tool_call_breakdown_json: Option<String>,
|
||||||
|
pub open_threads_count: Option<u64>,
|
||||||
|
pub open_reminders_count: Option<u64>,
|
||||||
|
/// `"ok" | "failed" | "prompt_too_long"`.
|
||||||
|
pub result_kind: &'static str,
|
||||||
|
pub note: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Thin sqlite wrapper. Cloning is cheap (Arc-shared connection).
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct TurnStats {
|
||||||
|
inner: std::sync::Arc<Mutex<Connection>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TurnStats {
|
||||||
|
/// Open the per-agent stats db, creating the file + schema if
|
||||||
|
/// missing. Returns `None` when the db can't be opened (read-only
|
||||||
|
/// fs in tests, missing state dir) — the harness logs and
|
||||||
|
/// continues without a sink rather than failing the turn loop.
|
||||||
|
#[must_use]
|
||||||
|
pub fn open_default() -> Option<Self> {
|
||||||
|
let path = default_path();
|
||||||
|
match Self::open(&path) {
|
||||||
|
Ok(s) => Some(s),
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(
|
||||||
|
error = ?e,
|
||||||
|
path = %path.display(),
|
||||||
|
"turn_stats: open failed; per-turn analytics disabled"
|
||||||
|
);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn open(path: &Path) -> Result<Self> {
|
||||||
|
if let Some(parent) = path.parent() {
|
||||||
|
let _ = std::fs::create_dir_all(parent);
|
||||||
|
}
|
||||||
|
let conn = Connection::open(path)
|
||||||
|
.with_context(|| format!("open turn_stats db {}", path.display()))?;
|
||||||
|
conn.execute_batch(SCHEMA)
|
||||||
|
.context("apply turn_stats schema")?;
|
||||||
|
Ok(Self {
|
||||||
|
inner: std::sync::Arc::new(Mutex::new(conn)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
    /// Insert a row. Best-effort — logs + swallows errors so a sqlite
    /// hiccup (locked db, full disk) doesn't crash the harness.
    pub fn record(&self, row: &TurnStatRow) {
        let conn = self.inner.lock().unwrap();
        let res = conn.execute(
            "INSERT INTO turn_stats (
                started_at, ended_at, duration_ms, model, wake_from,
                input_tokens, output_tokens,
                cache_read_input_tokens, cache_creation_input_tokens,
                tool_call_count, tool_call_breakdown_json,
                open_threads_count, open_reminders_count,
                result_kind, note
            ) VALUES (
                ?1, ?2, ?3, ?4, ?5,
                ?6, ?7,
                ?8, ?9,
                ?10, ?11,
                ?12, ?13,
                ?14, ?15
            )",
            params![
                row.started_at,
                row.ended_at,
                row.duration_ms,
                row.model,
                row.wake_from,
                i64::try_from(row.input_tokens).unwrap_or(i64::MAX),
                i64::try_from(row.output_tokens).unwrap_or(i64::MAX),
                i64::try_from(row.cache_read_input_tokens).unwrap_or(i64::MAX),
                i64::try_from(row.cache_creation_input_tokens).unwrap_or(i64::MAX),
                i64::try_from(row.tool_call_count).unwrap_or(i64::MAX),
                row.tool_call_breakdown_json,
                row.open_threads_count
                    .map(|n| i64::try_from(n).unwrap_or(i64::MAX)),
                row.open_reminders_count
                    .map(|n| i64::try_from(n).unwrap_or(i64::MAX)),
                row.result_kind,
                row.note,
            ],
        );
        if let Err(e) = res {
            tracing::warn!(error = ?e, "turn_stats: insert failed");
        }
    }
}

fn default_path() -> PathBuf {
    crate::paths::state_dir().join("hyperhive-turn-stats.sqlite")
}
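The `record` method above stores unsigned counters in SQLite's signed 64-bit integer columns by saturating rather than failing. The following is a minimal std-only sketch of that conversion, assuming the counters are `u64` (the `TurnStatRow` field types are not shown in this hunk). `clamp_to_i64` and `clamp_opt` are hypothetical helper names used for illustration only; the commit inlines the conversions.

```rust
/// Hypothetical helper (not in the commit): saturate an unsigned counter
/// into i64 range, mirroring the inline
/// `i64::try_from(..).unwrap_or(i64::MAX)` calls in `record`.
fn clamp_to_i64(n: u64) -> i64 {
    i64::try_from(n).unwrap_or(i64::MAX)
}

/// Hypothetical helper: optional snapshot counts stay `None` when no value
/// was captured; present values are clamped the same way as above.
fn clamp_opt(n: Option<u64>) -> Option<i64> {
    n.map(clamp_to_i64)
}
```

Saturating keeps the insert infallible for out-of-range values, which fits the best-effort contract: an implausibly large counter is written as `i64::MAX` instead of dropping the whole row.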
@ -180,6 +180,14 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc<Coordinator>) ->
                message: format!("{e:#}"),
            },
        },
        AgentRequest::CountPendingReminders => {
            match coord.broker.count_pending_reminders_for(agent) {
                Ok(count) => AgentResponse::PendingRemindersCount { count },
                Err(e) => AgentResponse::Err {
                    message: format!("{e:#}"),
                },
            }
        }
    }
}
@ -324,6 +324,19 @@ impl Broker {
            .context("list pending reminders")
    }

    /// Count this agent's still-pending (un-delivered) reminders.
    /// Used by the per-turn stats sink for a cheap "what was queued
    /// at turn-end" snapshot.
    pub fn count_pending_reminders_for(&self, agent: &str) -> Result<u64> {
        let conn = self.conn.lock().unwrap();
        let n: i64 = conn.query_row(
            "SELECT COUNT(*) FROM reminders WHERE agent = ?1 AND sent_at IS NULL",
            params![agent],
            |row| row.get(0),
        )?;
        Ok(u64::try_from(n).unwrap_or(0))
    }

    /// Delete a reminder by id. Returns the number of rows removed (0
    /// when the id never existed or was already delivered). Hard
    /// delete rather than soft so the row doesn't linger and confuse a
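The filter in `count_pending_reminders_for` (same agent, `sent_at IS NULL`) can be illustrated without a database. This is a rough in-memory sketch, not the commit's implementation: the `Reminder` struct and `count_pending_for` function are hypothetical, and only the two columns named in the query are modelled.

```rust
/// Hypothetical in-memory stand-in for a row of the `reminders` table.
/// Only the two columns the COUNT query filters on are modelled.
struct Reminder {
    agent: String,
    /// `None` models `sent_at IS NULL`, i.e. the reminder is not yet delivered.
    sent_at: Option<i64>,
}

/// Count reminders for `agent` that have not been delivered, the in-memory
/// equivalent of
/// `SELECT COUNT(*) FROM reminders WHERE agent = ?1 AND sent_at IS NULL`.
fn count_pending_for(reminders: &[Reminder], agent: &str) -> u64 {
    let n = reminders
        .iter()
        .filter(|r| r.agent == agent && r.sent_at.is_none())
        .count();
    // usize -> u64 cannot fail on current targets; saturate defensively anyway.
    u64::try_from(n).unwrap_or(u64::MAX)
}
```

Returning only a count, rather than the full list, is what makes this cheap enough to call once per turn.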
@ -335,6 +335,14 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc<Coordinator>) -> ManagerResp
                message: format!("{e:#}"),
            },
        },
        ManagerRequest::CountPendingReminders => {
            match coord.broker.count_pending_reminders_for(MANAGER_AGENT) {
                Ok(count) => ManagerResponse::PendingRemindersCount { count },
                Err(e) => ManagerResponse::Err {
                    message: format!("{e:#}"),
                },
            }
        }
    }
}
@ -309,6 +309,10 @@ pub enum AgentRequest {
    /// manager); questions surface where the agent is `asker` or
    /// `target`. Cheap O(n) sweep server-side — no caching.
    GetOpenThreads,
    /// Count of this agent's pending (un-delivered) reminders. Used
    /// by the harness's per-turn stats sink to snapshot "what was
    /// queued at turn-end time" without paying for a full list.
    CountPendingReminders,
}

/// Responses on a per-agent socket.
@ -333,6 +337,8 @@ pub enum AgentResponse {
    /// `GetOpenThreads` result: list of loose ends pending against
    /// this agent. Ordered newest-first within each kind.
    OpenThreads { threads: Vec<OpenThread> },
    /// `CountPendingReminders` result.
    PendingRemindersCount { count: u64 },
}

// -----------------------------------------------------------------------------
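On the harness side, the commit message describes the post-turn count fetch as best-effort, and `record` takes the snapshot counts as optional values. One way the `PendingRemindersCount` response could be folded into such an optional value is sketched below. This is an assumption about the call site, which this hunk does not show: `reminders_snapshot` is a hypothetical helper, the enum is a trimmed-down copy with only two variants, the transport error is modelled as a plain `String`, and `eprintln!` stands in for the `tracing::warn!` calls used elsewhere in the commit.

```rust
/// Trimmed-down copy of the response variants relevant here; the real
/// enum in the commit has more variants.
#[derive(Debug)]
enum AgentResponse {
    PendingRemindersCount { count: u64 },
    Err { message: String },
}

/// Hypothetical helper: turn the outcome of a `CountPendingReminders`
/// round-trip into an optional snapshot value. Any failure (transport
/// error or server-side `Err`) is logged and mapped to `None`, so the
/// turn row can still be recorded without the count.
fn reminders_snapshot(resp: Result<AgentResponse, String>) -> Option<u64> {
    match resp {
        Ok(AgentResponse::PendingRemindersCount { count }) => Some(count),
        Ok(AgentResponse::Err { message }) => {
            eprintln!("turn_stats: reminder count failed: {message}");
            None
        }
        Err(e) => {
            eprintln!("turn_stats: reminder count request failed: {e}");
            None
        }
    }
}
```

Mapping every failure to `None` keeps the stats path from ever blocking or failing the turn loop, at the cost of an occasional missing column value.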
@ -596,6 +602,9 @@ pub enum ManagerRequest {
    /// sub-agent surface is `AgentRequest::GetOpenThreads` which
    /// only returns rows where the agent itself is asker / target.
    GetOpenThreads,
    /// Count of the manager's own pending reminders. Mirror of
    /// `AgentRequest::CountPendingReminders` on the manager surface.
    CountPendingReminders,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
@ -634,4 +643,8 @@ pub enum ManagerResponse {
    OpenThreads {
        threads: Vec<OpenThread>,
    },
    /// `CountPendingReminders` result.
    PendingRemindersCount {
        count: u64,
    },
}