//! Live event stream for the per-agent web UI. The harness emits one
//! `LiveEvent` per interesting thing that happens during a turn — wake-up
//! (the popped inbox message), every line claude prints on stdout
//! (parsed from `--output-format stream-json`), and the turn-end summary.
//! The web UI subscribes via SSE and renders rows live.
//!
//! Channel type is `tokio::sync::broadcast`. New subscribers see only
//! future events; the dashboard JS deals with the cold-start case by
//! showing "connecting…" until the first event arrives.
|
|
|
|
use std::path::{Path, PathBuf};
|
|
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use rusqlite::{Connection, params};
|
|
use serde::{Deserialize, Serialize};
|
|
use tokio::sync::broadcast;
|
|
|
|
/// Capacity handed to `broadcast::channel` when the `Bus` is built.
const CHANNEL_CAPACITY: usize = 256;

/// Upper bound on the number of `LiveEvent`s `Bus::history()` returns.
///
/// Also intended as the sqlite retention cap; the sweep that prunes older
/// rows is not part of this section — NOTE(review): confirm where it lives.
const HISTORY_CAPACITY: usize = 2000;
|
|
/// Path to the persisted event db. Overridable via `HYPERHIVE_EVENTS_DB`
|
|
/// for dev / tests; otherwise derived from the agent's state dir.
|
|
fn events_db_path() -> PathBuf {
|
|
std::env::var_os("HYPERHIVE_EVENTS_DB").map_or_else(
|
|
|| crate::paths::state_dir().join("hyperhive-events.sqlite"),
|
|
PathBuf::from,
|
|
)
|
|
}
|
|
|
|
/// Path to the persisted model file. Overridable via `HYPERHIVE_MODEL_FILE`
|
|
/// for dev / tests; otherwise derived from the agent's state dir.
|
|
fn model_file_path() -> PathBuf {
|
|
std::env::var_os("HYPERHIVE_MODEL_FILE").map_or_else(
|
|
|| crate::paths::state_dir().join("hyperhive-model"),
|
|
PathBuf::from,
|
|
)
|
|
}
|
|
|
|
fn load_model() -> Option<String> {
|
|
let s = std::fs::read_to_string(model_file_path()).ok()?;
|
|
let name = s.trim();
|
|
if name.is_empty() {
|
|
None
|
|
} else {
|
|
Some(name.to_owned())
|
|
}
|
|
}
|
|
|
|
fn persist_model(name: &str) -> std::io::Result<()> {
|
|
let path = model_file_path();
|
|
if let Some(parent) = path.parent() {
|
|
let _ = std::fs::create_dir_all(parent);
|
|
}
|
|
std::fs::write(path, format!("{name}\n"))
|
|
}
|
|
|
|
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to `0` if the clock reads earlier than the epoch or the
/// second count does not fit in an `i64`.
fn now_unix() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_secs()).unwrap_or(0),
        Err(_) => 0,
    }
}
|
|
|
|
/// DDL executed by `EventStore::open` on every start. Both statements are
/// `IF NOT EXISTS`, so re-running them against an existing db is a no-op.
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
kind TEXT NOT NULL,
payload_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_ts ON events (ts);
";
|
|
|
|
/// Envelope carried over the broadcast channel: the `LiveEvent` itself
|
|
/// plus a monotonic per-process seq stamped by `Bus::emit`. SSE consumers
|
|
/// serialize this directly (seq becomes a sibling of the `kind` tag);
|
|
/// clients use seq to dedupe their buffered live traffic against the
|
|
/// snapshot/history responses (drop anything with `seq <= snapshot.seq`).
|
|
#[derive(Debug, Clone, Serialize)]
|
|
pub struct BusEvent {
|
|
pub seq: u64,
|
|
#[serde(flatten)]
|
|
pub event: LiveEvent,
|
|
}
|
|
|
|
/// One row of the agent's live stream. Serialised to JSON for SSE delivery.
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
#[serde(tag = "kind", rename_all = "snake_case")]
|
|
pub enum LiveEvent {
|
|
/// Harness popped a wake-up message and is about to invoke claude.
|
|
/// `unread` is the count of *other* messages still in the inbox at
|
|
/// that moment — surfaced as a badge in the live panel header.
|
|
TurnStart {
|
|
from: String,
|
|
body: String,
|
|
unread: u64,
|
|
},
|
|
/// One line of claude's `--output-format stream-json` stdout, parsed as
|
|
/// a generic JSON value (so we don't have to track every claude-code
|
|
/// event variant). The frontend pretty-prints by `type` field.
|
|
Stream(serde_json::Value),
|
|
/// Free-form note from the harness (e.g. "claude exited 0",
|
|
/// "stream-json parse error: ..."). Useful when stream-json itself
|
|
/// fails so the UI doesn't just go silent.
|
|
///
|
|
/// Must be a struct variant (not `Note(String)`): internally-tagged
|
|
/// enums can't flatten a tag onto a primitive newtype, and serde
|
|
/// fails serialization at runtime — silently, because the SSE
|
|
/// handler's `filter_map(... .ok()? ...)` swallows the error. From
|
|
/// 2025-08 through 2026-05 every `Note` emission was a no-op + the
|
|
/// sqlite history persisted them as the literal string `"null"`.
|
|
/// The web UI's `note` renderer already reads `ev.text`, so the
|
|
/// wire shape matches without a JS change.
|
|
Note { text: String },
|
|
/// Turn finished. `ok=false` means claude exited non-zero or the
|
|
/// harness hit a transport error.
|
|
TurnEnd { ok: bool, note: Option<String> },
|
|
/// Harness reachability flipped: `"online"` /
|
|
/// `"needs_login_idle"` / `"needs_login_in_progress"`. The web UI
|
|
/// drives the alive badge from this so the operator sees a login
|
|
/// land (or get revoked) without polling. Session detail
|
|
/// (`url`/`output`/`finished`) is still served by `/api/state`
|
|
/// during the short-lived in-progress window — the client
|
|
/// re-fetches only while that flow is active.
|
|
StatusChanged { status: String },
|
|
/// `/api/model` switched the active claude model. The web UI
|
|
/// updates the chip + the per-turn stats sink will key off this
|
|
/// to mark the boundary in its log.
|
|
ModelChanged { model: String },
|
|
/// Token usage for the turn just ended. Carries two snapshots:
|
|
/// - `ctx` is the LAST inference's usage block (the actual context
|
|
/// window in use right now — what the operator needs to decide
|
|
/// whether to compact / reset).
|
|
/// - `cost` is the cumulative usage across every inference in the
|
|
/// turn (sum of per-call billed tokens — the cost signal). For
|
|
/// tool-heavy turns the cumulative blows past the model's window
|
|
/// because each tool call's prompt is rebilled.
|
|
TokenUsageChanged { ctx: TokenUsage, cost: TokenUsage },
|
|
/// Harness's `TurnState` transitioned (idle / thinking /
|
|
/// compacting). `since_unix` matches `Bus::state_snapshot().1`
|
|
/// so the client's elapsed-time ticker keeps progressing across
|
|
/// SSE reconnects without drift.
|
|
TurnStateChanged {
|
|
state: TurnState,
|
|
since_unix: i64,
|
|
},
|
|
}
|
|
|
|
/// sqlite-backed event log. Wraps a `Connection` behind a `Mutex` so the
|
|
/// `Bus` (which clones cheaply) shares one writer.
|
|
struct EventStore {
|
|
conn: Mutex<Connection>,
|
|
}
|
|
|
|
impl EventStore {
|
|
fn open(path: &Path) -> rusqlite::Result<Self> {
|
|
if let Some(parent) = path.parent() {
|
|
let _ = std::fs::create_dir_all(parent);
|
|
}
|
|
let conn = Connection::open(path)?;
|
|
conn.execute_batch(SCHEMA)?;
|
|
Ok(Self {
|
|
conn: Mutex::new(conn),
|
|
})
|
|
}
|
|
|
|
fn append(&self, event: &LiveEvent) -> rusqlite::Result<()> {
|
|
let ts = std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.ok()
|
|
.and_then(|d| i64::try_from(d.as_secs()).ok())
|
|
.unwrap_or(0);
|
|
let kind = match event {
|
|
LiveEvent::TurnStart { .. } => "turn_start",
|
|
LiveEvent::Stream(_) => "stream",
|
|
LiveEvent::Note { .. } => "note",
|
|
LiveEvent::TurnEnd { .. } => "turn_end",
|
|
LiveEvent::StatusChanged { .. } => "status_changed",
|
|
LiveEvent::ModelChanged { .. } => "model_changed",
|
|
LiveEvent::TokenUsageChanged { .. } => "token_usage_changed",
|
|
LiveEvent::TurnStateChanged { .. } => "turn_state_changed",
|
|
};
|
|
let payload = serde_json::to_string(event).unwrap_or_else(|_| "null".into());
|
|
let conn = self.conn.lock().unwrap();
|
|
conn.execute(
|
|
"INSERT INTO events (ts, kind, payload_json) VALUES (?1, ?2, ?3)",
|
|
params![ts, kind, payload],
|
|
)?;
|
|
Ok(())
|
|
}
|
|
|
|
fn recent(&self, limit: usize) -> rusqlite::Result<Vec<LiveEvent>> {
|
|
let limit_i = i64::try_from(limit).unwrap_or(i64::MAX);
|
|
let conn = self.conn.lock().unwrap();
|
|
let mut stmt = conn.prepare(
|
|
"SELECT payload_json FROM events
|
|
ORDER BY id DESC
|
|
LIMIT ?1",
|
|
)?;
|
|
let rows = stmt.query_map(params![limit_i], |row| {
|
|
let s: String = row.get(0)?;
|
|
Ok(serde_json::from_str::<LiveEvent>(&s).ok())
|
|
})?;
|
|
let mut out: Vec<LiveEvent> = rows.flatten().flatten().collect();
|
|
out.reverse();
|
|
Ok(out)
|
|
}
|
|
}
|
|
|
|
/// Token usage emitted by claude in the final `result` stream-json event.
|
|
/// All counts are in tokens. `None` fields mean the server didn't report them.
|
|
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
|
|
pub struct TokenUsage {
|
|
pub input_tokens: u64,
|
|
pub output_tokens: u64,
|
|
pub cache_read_input_tokens: u64,
|
|
pub cache_creation_input_tokens: u64,
|
|
}
|
|
|
|
impl TokenUsage {
|
|
/// Total context consumed this turn (input + cache reads + cache writes).
|
|
/// This is the per-inference context footprint that counts against the
|
|
/// model's `contextWindow` limit. Tracked from the last `assistant` event
|
|
/// in the stream-json (per-inference usage, not the cumulative `result`
|
|
/// event which sums across all inferences in a tool-heavy turn and can
|
|
/// far exceed the per-inference window).
|
|
#[must_use]
|
|
pub fn context_tokens(&self) -> u64 {
|
|
self.input_tokens + self.cache_read_input_tokens + self.cache_creation_input_tokens
|
|
}
|
|
|
|
/// Parse usage from the terminal `result` stream-json event. This is the
|
|
/// **cumulative** sum across every inference in the turn — useful as a
|
|
/// cost signal, but NOT the current context size (a tool-heavy turn
|
|
/// sums per-call cached prompts and easily exceeds the model window).
|
|
#[must_use]
|
|
pub fn from_stream_event(v: &serde_json::Value) -> Option<Self> {
|
|
if v.get("type").and_then(|t| t.as_str()) != Some("result") {
|
|
return None;
|
|
}
|
|
Some(Self::from_usage_obj(v.get("usage")?))
|
|
}
|
|
|
|
/// Parse usage from a per-inference `assistant` event's
|
|
/// `.message.usage` block. Each turn fires one of these for every
|
|
/// model call; tracking the LAST one over the turn gives the actual
|
|
/// conversation context size — the number to watch for compaction.
|
|
#[must_use]
|
|
pub fn from_assistant_event(v: &serde_json::Value) -> Option<Self> {
|
|
if v.get("type").and_then(|t| t.as_str()) != Some("assistant") {
|
|
return None;
|
|
}
|
|
Some(Self::from_usage_obj(v.get("message")?.get("usage")?))
|
|
}
|
|
|
|
fn from_usage_obj(u: &serde_json::Value) -> Self {
|
|
let field = |k: &str| u.get(k).and_then(serde_json::Value::as_u64).unwrap_or(0);
|
|
Self {
|
|
input_tokens: field("input_tokens"),
|
|
output_tokens: field("output_tokens"),
|
|
cache_read_input_tokens: field("cache_read_input_tokens"),
|
|
cache_creation_input_tokens: field("cache_creation_input_tokens"),
|
|
}
|
|
}
|
|
|
|
/// Extract the per-inference context-window limit from a `result`
|
|
/// stream-json event's `modelUsage` map. The API reports this as
|
|
/// `modelUsage.<model-name>.contextWindow`; we take the first non-zero
|
|
/// value across all model keys.
|
|
///
|
|
/// Returns `None` if the event is not a `result` type or has no
|
|
/// `contextWindow` field. The returned value is the authoritative
|
|
/// per-inference active window (e.g. 200 000 for `claude-sonnet-4-6`).
|
|
/// It may be smaller than the full prompt-cache capacity (which can
|
|
/// be several million tokens via cache reads).
|
|
#[must_use]
|
|
pub fn context_window_from_result_event(v: &serde_json::Value) -> Option<u64> {
|
|
if v.get("type").and_then(|t| t.as_str()) != Some("result") {
|
|
return None;
|
|
}
|
|
let model_usage = v.get("modelUsage")?;
|
|
let map = model_usage.as_object()?;
|
|
for (_model, stats) in map {
|
|
if let Some(w) = stats.get("contextWindow").and_then(serde_json::Value::as_u64) {
|
|
if w > 0 {
|
|
return Some(w);
|
|
}
|
|
}
|
|
}
|
|
None
|
|
}
|
|
}
|
|
|
|
/// Authoritative turn-loop state. The harness owns it; the web UI
|
|
/// reads via `/api/state` and renders. Lives alongside the bus
|
|
/// because everyone who has a `Bus` already has the right handle to
|
|
/// poke the state on transitions.
|
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
|
|
#[serde(rename_all = "snake_case")]
|
|
pub enum TurnState {
|
|
/// Inbox is empty / waiting on `Recv`.
|
|
Idle,
|
|
/// `claude --print` is running for a turn.
|
|
Thinking,
|
|
/// Operator-triggered `/compact` is running on the persistent
|
|
/// session.
|
|
Compacting,
|
|
}
|
|
|
|
/// Model of last resort, baked into the binary. Used only when
/// `HIVE_DEFAULT_MODEL` is unset/empty and no runtime override has been
/// persisted.
pub const DEFAULT_MODEL: &str = "haiku";
|
|
|
|
/// Return the model declared in `HIVE_DEFAULT_MODEL` (set from
/// `hyperhive.model` in `agent.nix`), or `None` if the variable is
/// absent, not valid Unicode, or blank.
///
/// When `Some`, this takes precedence over any persisted runtime
/// override so that nix config changes always take effect on rebuild.
///
/// The value is returned with surrounding whitespace trimmed. The
/// environment is re-read on every call, but each distinct value is
/// leaked at most once: the `'static` strings are interned, so repeated
/// calls do not grow memory.
#[must_use]
pub fn configured_model() -> Option<&'static str> {
    // Every distinct value handed out so far. Expected to hold one entry
    // in practice, so a linear scan is fine.
    static INTERNED: std::sync::Mutex<Vec<&'static str>> = std::sync::Mutex::new(Vec::new());

    let raw = std::env::var("HIVE_DEFAULT_MODEL").ok()?;
    let name = raw.trim();
    if name.is_empty() {
        return None;
    }

    // A poisoned lock still guards a valid Vec; keep serving from it.
    let mut seen = INTERNED
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    if let Some(hit) = seen.iter().copied().find(|known| *known == name) {
        return Some(hit);
    }
    let leaked: &'static str = Box::leak(name.to_owned().into_boxed_str());
    seen.push(leaked);
    Some(leaked)
}
|
|
|
|
/// Return the model to use when no config and no persisted override exist.
|
|
#[must_use]
|
|
pub fn default_model() -> &'static str {
|
|
configured_model().unwrap_or(DEFAULT_MODEL)
|
|
}
|
|
|
|
/// Context-window size in tokens for a given model name.
///
/// Canonical per-model sizes are declared in `harness-base.nix` as
/// `hyperhive.contextWindowTokens` and injected as
/// `HIVE_CONTEXT_WINDOW_TOKENS_<KEY_UPPER>` env vars — so this function
/// normally just reads them. The Rust code carries no model knowledge;
/// updating model families only requires a Nix change.
///
/// Resolution order (first match wins):
/// 1. `HIVE_CONTEXT_WINDOW_TOKENS_<KEY>` — `<KEY>`, lowercased, is a
///    substring of the lowercased model name, and the value parses to a
///    positive integer. Entries are visited in environment iteration
///    order.
/// 2. `HIVE_CONTEXT_WINDOW_TOKENS` — single global override (any model).
/// 3. Hard fallback: `200_000` (conservative; only hit outside NixOS).
///
/// Environment entries whose name or value is not valid Unicode are
/// skipped; unrelated non-Unicode variables never cause a panic.
#[must_use]
pub fn context_window_tokens(model: &str) -> u64 {
    const PER_MODEL_PREFIX: &str = "HIVE_CONTEXT_WINDOW_TOKENS_";
    const GLOBAL_VAR: &str = "HIVE_CONTEXT_WINDOW_TOKENS";
    const FALLBACK: u64 = 200_000;

    /// Parse a trimmed, strictly positive token count.
    fn positive(raw: &str) -> Option<u64> {
        raw.trim().parse::<u64>().ok().filter(|v| *v > 0)
    }

    let wanted = model.to_ascii_lowercase();

    // `vars_os` rather than `vars`: the latter panics mid-iteration on any
    // non-Unicode entry in the process environment.
    let per_model = std::env::vars_os().find_map(|(key, val)| {
        let key = key.to_str()?;
        let suffix = key.strip_prefix(PER_MODEL_PREFIX)?;
        if suffix.is_empty() || !wanted.contains(&suffix.to_ascii_lowercase()) {
            return None;
        }
        positive(val.to_str()?)
    });

    per_model
        .or_else(|| std::env::var(GLOBAL_VAR).ok().as_deref().and_then(positive))
        .unwrap_or(FALLBACK)
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct Bus {
|
|
tx: Arc<broadcast::Sender<BusEvent>>,
|
|
/// Monotonic per-process counter stamped onto every `BusEvent`.
|
|
/// Persisted nowhere — a harness restart resets seq to 0; clients
|
|
/// always treat reconnect as "fresh state, fresh stream of seqs."
|
|
/// Historical events served from sqlite carry no seq (they predate
|
|
/// the live channel the seq is meant to dedupe against).
|
|
event_seq: Arc<AtomicU64>,
|
|
/// Persistent event log. `None` only if opening the sqlite db failed
|
|
/// at construction — we keep going so the harness doesn't die on a
|
|
/// missing state dir mount in dev / test scenarios.
|
|
store: Option<Arc<EventStore>>,
|
|
/// Current turn-loop state + since-when (unix seconds).
|
|
state: Arc<Mutex<(TurnState, i64)>>,
|
|
/// Model name passed to `claude --model`. Default `haiku`; the
|
|
/// operator can override at runtime via `POST /api/model`.
|
|
model: Arc<Mutex<String>>,
|
|
/// Last-inference token usage from the most recent turn's final
|
|
/// `assistant` event. Represents the actual context window size at
|
|
/// turn-end — the number the operator watches to decide whether to
|
|
/// compact. `None` until the first turn completes.
|
|
last_ctx_usage: Arc<Mutex<Option<TokenUsage>>>,
|
|
/// Cumulative token usage from the most recent turn's `result`
|
|
/// event (sum across every inference in the turn). This is the cost
|
|
/// signal — tool-heavy turns rebill the cached prompt per call and
|
|
/// blow past the model window. `None` until the first turn completes.
|
|
last_cost_usage: Arc<Mutex<Option<TokenUsage>>>,
|
|
/// True while the harness is parked after a rate-limit response.
|
|
/// Set by `emit_status("rate_limited")`, cleared by
|
|
/// `emit_status("online")`. Also mirrored to a sentinel file at
|
|
/// `{state_dir}/hyperhive-rate-limited` so the host-side
|
|
/// `container_view` can surface the status on the dashboard without
|
|
/// a live socket call.
|
|
rate_limited: Arc<AtomicBool>,
|
|
/// One-shot: next `run_claude` call drops `--continue`, starting
|
|
/// a fresh claude session. Set by `POST /api/new-session` from
|
|
/// the per-agent web UI; consumed (cleared back to false) by the
|
|
/// next turn. Subsequent turns resume normal `--continue`
|
|
/// behavior. Atomic so the consumer can take-and-clear without a
|
|
/// lock.
|
|
skip_continue_once: Arc<AtomicBool>,
|
|
/// Per-turn tool-call counter. Reset by the bin loop between
|
|
/// turns via `take_tool_calls`. Populated by `observe_stream` as
|
|
/// the stdout pump parses each stream-json line. Powers the
|
|
/// `tool_call_count` + `tool_call_breakdown_json` columns on the
|
|
/// per-turn stats sink.
|
|
tool_calls: Arc<Mutex<std::collections::HashMap<String, u64>>>,
|
|
/// Unix timestamp of the most recent completed turn (set by
|
|
/// `record_turn_usage`). Used by the auto-reset heuristic in
|
|
/// `turn.rs` to compute how long the session has been idle and
|
|
/// whether the prompt cache has gone cold. `0` = no turn yet.
|
|
last_turn_ended_unix: Arc<AtomicI64>,
|
|
/// Per-inference context-window size as reported by the Anthropic API
|
|
/// in the stream-json `result` event (`modelUsage.*.contextWindow`).
|
|
/// Set by the stdout pump on every completed turn. Takes precedence
|
|
/// over the Nix-configured `HIVE_CONTEXT_WINDOW_TOKENS_*` env vars
|
|
/// for compaction watermark calculations — it reflects the actual
|
|
/// limit the model enforces, which may differ from what the operator
|
|
/// configured (e.g. 200 k active window on a 1 M cache-enabled model).
|
|
api_context_window: Arc<Mutex<Option<u64>>>,
|
|
}
|
|
|
|
impl Bus {
|
|
/// Open the events db (path from `events_db_path()`). On failure, fall back
|
|
/// to a no-store bus — the harness still works, just without persistent history.
|
|
#[must_use]
|
|
pub fn new() -> Self {
|
|
let path = events_db_path();
|
|
let store = match EventStore::open(&path) {
|
|
Ok(s) => Some(Arc::new(s)),
|
|
Err(e) => {
|
|
tracing::warn!(error = ?e, path = %path.display(), "events db open failed; running without history");
|
|
None
|
|
}
|
|
};
|
|
let (tx, _) = broadcast::channel(CHANNEL_CAPACITY);
|
|
// Priority: HIVE_DEFAULT_MODEL (from hyperhive.model in agent.nix) >
|
|
// persisted runtime override > compiled-in DEFAULT_MODEL.
|
|
// The nix config always wins on rebuild; the persisted file is kept
|
|
// for within-session tracking only (see persist_model / set_model).
|
|
let initial_model = configured_model()
|
|
.map(str::to_owned)
|
|
.unwrap_or_else(|| load_model().unwrap_or_else(|| DEFAULT_MODEL.to_owned()));
|
|
// Restore rate_limited from the sentinel file — if the harness
|
|
// crashed while parked, we should still show the right status on
|
|
// cold load until the next turn clears it.
|
|
let sentinel = crate::paths::state_dir().join("hyperhive-rate-limited");
|
|
let was_rate_limited = sentinel.exists();
|
|
Self {
|
|
tx: Arc::new(tx),
|
|
event_seq: Arc::new(AtomicU64::new(0)),
|
|
store,
|
|
state: Arc::new(Mutex::new((TurnState::Idle, now_unix()))),
|
|
model: Arc::new(Mutex::new(initial_model)),
|
|
last_ctx_usage: Arc::new(Mutex::new(None)),
|
|
last_cost_usage: Arc::new(Mutex::new(None)),
|
|
rate_limited: Arc::new(AtomicBool::new(was_rate_limited)),
|
|
skip_continue_once: Arc::new(AtomicBool::new(false)),
|
|
tool_calls: Arc::new(Mutex::new(std::collections::HashMap::new())),
|
|
last_turn_ended_unix: Arc::new(AtomicI64::new(0)),
|
|
api_context_window: Arc::new(Mutex::new(None)),
|
|
}
|
|
}
|
|
|
|
/// Current high-water seq. Snapshot endpoints read this before
|
|
/// gathering state so the resulting (snapshot.seq, snapshot) pair
|
|
/// satisfies: any live event with seq > snapshot.seq is post-snapshot
|
|
/// (not yet reflected). Clients dedupe buffered SSE traffic against
|
|
/// this value.
|
|
#[must_use]
|
|
pub fn current_seq(&self) -> u64 {
|
|
self.event_seq.load(Ordering::SeqCst)
|
|
}
|
|
|
|
fn next_seq(&self) -> u64 {
|
|
self.event_seq.fetch_add(1, Ordering::SeqCst) + 1
|
|
}
|
|
|
|
/// Arm the one-shot: the next claude invocation will run without
|
|
/// `--continue`, dropping any prior session context. Idempotent
|
|
/// — calling twice in a row before the next turn still consumes
|
|
/// to a single fresh-start.
|
|
pub fn request_new_session(&self) {
|
|
self.skip_continue_once.store(true, Ordering::SeqCst);
|
|
}
|
|
|
|
/// Take + clear the one-shot. Returns true iff the caller should
|
|
/// run claude without `--continue` for this turn.
|
|
#[must_use]
|
|
pub fn take_skip_continue(&self) -> bool {
|
|
self.skip_continue_once.swap(false, Ordering::SeqCst)
|
|
}
|
|
|
|
/// Currently-selected claude model name. Read on every turn so a
|
|
/// `/model <name>` flip takes effect on the next turn.
|
|
///
|
|
/// # Panics
|
|
///
|
|
/// Panics if the internal lock is poisoned.
|
|
#[must_use]
|
|
pub fn model(&self) -> String {
|
|
self.model.lock().unwrap().clone()
|
|
}
|
|
|
|
/// Switch the model for future turns. The current turn (if any)
|
|
/// keeps the model it was already running. Persisted to the agent's
|
|
/// state dir (`hyperhive-model`) so the override survives harness
|
|
/// restart and container rebuild (gone on `--purge`, matching
|
|
/// every other piece of agent state).
|
|
///
|
|
/// # Panics
|
|
///
|
|
/// Panics if the internal lock is poisoned.
|
|
pub fn set_model(&self, name: impl Into<String>) {
|
|
let value: String = name.into();
|
|
self.model.lock().unwrap().clone_from(&value);
|
|
if let Err(e) = persist_model(&value) {
|
|
tracing::warn!(error = ?e, "model: persist failed");
|
|
}
|
|
self.emit(LiveEvent::ModelChanged { model: value });
|
|
}
|
|
|
|
/// Seed `last_ctx_usage` + `last_cost_usage` at startup without
|
|
/// emitting a SSE event. Used by the bin entrypoints to backfill
|
|
/// from the most recent `turn_stats` row so the per-agent web UI's
|
|
/// ctx + cost badges paint real numbers on cold load.
|
|
///
|
|
/// # Panics
|
|
///
|
|
/// Panics if an internal lock is poisoned.
|
|
pub fn seed_usage(&self, ctx: Option<TokenUsage>, cost: Option<TokenUsage>) {
|
|
if ctx.is_some() {
|
|
*self.last_ctx_usage.lock().unwrap() = ctx;
|
|
}
|
|
if cost.is_some() {
|
|
*self.last_cost_usage.lock().unwrap() = cost;
|
|
}
|
|
}
|
|
|
|
/// Record the just-ended turn's usage. `ctx` is the last inference's
|
|
/// usage (current context size); `cost` is the cumulative across
|
|
/// every inference in the turn (cost signal). One SSE event fires
|
|
/// per turn carrying both.
|
|
///
|
|
/// # Panics
|
|
///
|
|
/// Panics if an internal lock is poisoned.
|
|
pub fn record_turn_usage(&self, ctx: TokenUsage, cost: TokenUsage) {
|
|
*self.last_ctx_usage.lock().unwrap() = Some(ctx);
|
|
*self.last_cost_usage.lock().unwrap() = Some(cost);
|
|
self.last_turn_ended_unix.store(now_unix(), Ordering::Relaxed);
|
|
self.emit(LiveEvent::TokenUsageChanged { ctx, cost });
|
|
}
|
|
|
|
/// Unix timestamp of the most recent completed turn (`record_turn_usage`
|
|
/// call), or `0` if no turn has finished yet.
|
|
#[must_use]
|
|
pub fn last_turn_ended_unix(&self) -> i64 {
|
|
self.last_turn_ended_unix.load(Ordering::Relaxed)
|
|
}
|
|
|
|
/// Update the API-reported context-window size from the stream-json
|
|
/// `result` event's `modelUsage.*.contextWindow` field. Called by the
|
|
/// stdout pump once per completed turn. `0` is ignored (sentinel for
|
|
/// "not reported").
|
|
pub fn set_api_context_window(&self, window: u64) {
|
|
if window > 0 {
|
|
*self.api_context_window.lock().unwrap() = Some(window);
|
|
}
|
|
}
|
|
|
|
/// Return the API-reported per-inference context-window size, if the
|
|
/// harness has seen at least one completed turn for this session.
|
|
/// `None` until the first result event is processed.
|
|
#[must_use]
|
|
pub fn api_context_window(&self) -> Option<u64> {
|
|
*self.api_context_window.lock().unwrap()
|
|
}
|
|
|
|
/// Walk a stream-json value for `tool_use` blocks and bump the
|
|
/// per-turn counter for each one we find. Called by the stdout
|
|
/// pump on every parsed line. Cheap when the line isn't an
|
|
/// assistant message — the field-check short-circuits.
|
|
///
|
|
/// # Panics
|
|
///
|
|
/// Panics if the internal lock is poisoned.
|
|
pub fn observe_stream(&self, v: &serde_json::Value) {
|
|
if v.get("type").and_then(|t| t.as_str()) != Some("assistant") {
|
|
return;
|
|
}
|
|
let Some(content) = v
|
|
.get("message")
|
|
.and_then(|m| m.get("content"))
|
|
.and_then(|c| c.as_array())
|
|
else {
|
|
return;
|
|
};
|
|
let mut counts = self.tool_calls.lock().unwrap();
|
|
for block in content {
|
|
if block.get("type").and_then(|t| t.as_str()) != Some("tool_use") {
|
|
continue;
|
|
}
|
|
let name = block
|
|
.get("name")
|
|
.and_then(|n| n.as_str())
|
|
.unwrap_or("<unnamed>")
|
|
.to_owned();
|
|
*counts.entry(name).or_insert(0) += 1;
|
|
}
|
|
}
|
|
|
|
/// Snapshot + clear the per-turn tool-call counter. The harness
|
|
/// calls this between turns to fold the breakdown into a
|
|
/// `turn_stats` row, then start the next turn with an empty map.
|
|
///
|
|
/// # Panics
|
|
///
|
|
/// Panics if the internal lock is poisoned.
|
|
#[must_use]
|
|
pub fn take_tool_calls(&self) -> std::collections::HashMap<String, u64> {
|
|
std::mem::take(&mut *self.tool_calls.lock().unwrap())
|
|
}
|
|
|
|
/// Last context-size snapshot (last inference of the most recent
|
|
/// turn), or `None` if no turn has completed yet.
|
|
///
|
|
/// # Panics
|
|
///
|
|
/// Panics if the internal lock is poisoned.
|
|
#[must_use]
|
|
pub fn last_ctx_usage(&self) -> Option<TokenUsage> {
|
|
*self.last_ctx_usage.lock().unwrap()
|
|
}
|
|
|
|
/// Last cumulative cost snapshot (sum across the most recent turn's
|
|
/// inferences), or `None` if no turn has completed yet.
|
|
///
|
|
/// # Panics
|
|
///
|
|
/// Panics if the internal lock is poisoned.
|
|
#[must_use]
|
|
pub fn last_cost_usage(&self) -> Option<TokenUsage> {
|
|
*self.last_cost_usage.lock().unwrap()
|
|
}
|
|
|
|
/// Update the harness's authoritative turn-loop state. Records
|
|
/// the transition time so `state_snapshot` can return a since-age.
|
|
///
|
|
/// # Panics
|
|
///
|
|
/// Panics if the internal lock is poisoned.
|
|
pub fn set_state(&self, next: TurnState) {
|
|
let since;
|
|
{
|
|
let mut guard = self.state.lock().unwrap();
|
|
if guard.0 == next {
|
|
return;
|
|
}
|
|
*guard = (next, now_unix());
|
|
since = guard.1;
|
|
}
|
|
self.emit(LiveEvent::TurnStateChanged {
|
|
state: next,
|
|
since_unix: since,
|
|
});
|
|
}
|
|
|
|
/// Broadcast a status flip (online / `needs_login_*` / `rate_limited`).
|
|
/// Called by the bin entry points + `turn::wait_for_login` + the
|
|
/// `post_login_*` handlers — every site that mutates the
|
|
/// `Arc<Mutex<LoginState>>` should also call this so the web UI
|
|
/// drops its periodic /api/state poll while a turn loop is running.
|
|
///
|
|
/// `"rate_limited"` sets the rate-limited flag and writes a sentinel
|
|
/// file at `{state_dir}/hyperhive-rate-limited` so the host-side
|
|
/// dashboard can show the status without a live socket call.
|
|
/// Any other status clears the flag and removes the sentinel.
|
|
pub fn emit_status(&self, status: impl Into<String>) {
|
|
let status = status.into();
|
|
let sentinel = crate::paths::state_dir().join("hyperhive-rate-limited");
|
|
if status == "rate_limited" {
|
|
self.rate_limited.store(true, Ordering::Relaxed);
|
|
let _ = std::fs::write(&sentinel, b"");
|
|
} else {
|
|
self.rate_limited.store(false, Ordering::Relaxed);
|
|
let _ = std::fs::remove_file(&sentinel);
|
|
}
|
|
self.emit(LiveEvent::StatusChanged { status });
|
|
}
|
|
|
|
/// Returns true while the harness is parked after a rate-limit response.
|
|
#[must_use]
|
|
pub fn is_rate_limited(&self) -> bool {
|
|
self.rate_limited.load(Ordering::Relaxed)
|
|
}
|
|
|
|
/// Current state + since-when (unix seconds). Snapshot copy, no lock held.
|
|
///
|
|
/// # Panics
|
|
///
|
|
/// Panics if the internal lock is poisoned.
|
|
#[must_use]
|
|
pub fn state_snapshot(&self) -> (TurnState, i64) {
|
|
*self.state.lock().unwrap()
|
|
}
|
|
|
|
pub fn emit(&self, event: LiveEvent) {
|
|
if let Some(store) = &self.store
|
|
&& let Err(e) = store.append(&event)
|
|
{
|
|
tracing::warn!(error = ?e, "events: append failed");
|
|
}
|
|
let envelope = BusEvent {
|
|
seq: self.next_seq(),
|
|
event,
|
|
};
|
|
// Lagged subscribers drop events — fine; the UI is a tail, not a log.
|
|
let _ = self.tx.send(envelope);
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn subscribe(&self) -> broadcast::Receiver<BusEvent> {
|
|
self.tx.subscribe()
|
|
}
|
|
|
|
/// Most recent events, oldest first, capped at `HISTORY_CAPACITY`.
|
|
/// Drives the terminal pre-fill when the operator opens the agent
|
|
/// page; without a store (db open failed) this is empty.
|
|
#[must_use]
|
|
pub fn history(&self) -> Vec<LiveEvent> {
|
|
let Some(store) = &self.store else {
|
|
return Vec::new();
|
|
};
|
|
store.recent(HISTORY_CAPACITY).unwrap_or_default()
|
|
}
|
|
}
|
|
|
|
impl Default for Bus {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|