diff --git a/hive-ag3nt/src/events.rs b/hive-ag3nt/src/events.rs index 7c0dff6..e16e3b7 100644 --- a/hive-ag3nt/src/events.rs +++ b/hive-ag3nt/src/events.rs @@ -222,6 +222,11 @@ pub struct TokenUsage { impl TokenUsage { /// Total context consumed this turn (input + cache reads + cache writes). + /// This is the per-inference context footprint that counts against the + /// model's `contextWindow` limit. Tracked from the last `assistant` event + /// in the stream-json (per-inference usage, not the cumulative `result` + /// event which sums across all inferences in a tool-heavy turn and can + /// far exceed the per-inference window). #[must_use] pub fn context_tokens(&self) -> u64 { self.input_tokens + self.cache_read_input_tokens + self.cache_creation_input_tokens @@ -260,6 +265,33 @@ impl TokenUsage { cache_creation_input_tokens: field("cache_creation_input_tokens"), } } + + /// Extract the per-inference context-window limit from a `result` + /// stream-json event's `modelUsage` map. The API reports this as + /// `modelUsage..contextWindow`; we take the first non-zero + /// value across all model keys. + /// + /// Returns `None` if the event is not a `result` type or has no + /// `contextWindow` field. The returned value is the authoritative + /// per-inference active window (e.g. 200 000 for `claude-sonnet-4-6`). + /// It may be smaller than the full prompt-cache capacity (which can + /// be several million tokens via cache reads). + #[must_use] + pub fn context_window_from_result_event(v: &serde_json::Value) -> Option { + if v.get("type").and_then(|t| t.as_str()) != Some("result") { + return None; + } + let model_usage = v.get("modelUsage")?; + let map = model_usage.as_object()?; + for (_model, stats) in map { + if let Some(w) = stats.get("contextWindow").and_then(serde_json::Value::as_u64) { + if w > 0 { + return Some(w); + } + } + } + None + } } /// Authoritative turn-loop state. The harness owns it; the web UI @@ -385,6 +417,14 @@ pub struct Bus { /// `turn.rs` to compute how long the session has been idle and /// whether the prompt cache has gone cold. `0` = no turn yet. last_turn_ended_unix: Arc, + /// Per-inference context-window size as reported by the Anthropic API + /// in the stream-json `result` event (`modelUsage.*.contextWindow`). + /// Set by the stdout pump on every completed turn. Takes precedence + /// over the Nix-configured `HIVE_CONTEXT_WINDOW_TOKENS_*` env vars + /// for compaction watermark calculations — it reflects the actual + /// limit the model enforces, which may differ from what the operator + /// configured (e.g. 200 k active window on a 1 M cache-enabled model). + api_context_window: Arc>>, } impl Bus { @@ -419,6 +459,7 @@ impl Bus { skip_continue_once: Arc::new(AtomicBool::new(false)), tool_calls: Arc::new(Mutex::new(std::collections::HashMap::new())), last_turn_ended_unix: Arc::new(AtomicI64::new(0)), + api_context_window: Arc::new(Mutex::new(None)), } } @@ -519,6 +560,24 @@ impl Bus { self.last_turn_ended_unix.load(Ordering::Relaxed) } + /// Update the API-reported context-window size from the stream-json + /// `result` event's `modelUsage.*.contextWindow` field. Called by the + /// stdout pump once per completed turn. `0` is ignored (sentinel for + /// "not reported"). + pub fn set_api_context_window(&self, window: u64) { + if window > 0 { + *self.api_context_window.lock().unwrap() = Some(window); + } + } + + /// Return the API-reported per-inference context-window size, if the + /// harness has seen at least one completed turn for this session. + /// `None` until the first result event is processed. + #[must_use] + pub fn api_context_window(&self) -> Option { + *self.api_context_window.lock().unwrap() + } + /// Walk a stream-json value for `tool_use` blocks and bump the /// per-turn counter for each one we find. Called by the stdout /// pump on every parsed line. Cheap when the line isn't an diff --git a/hive-ag3nt/src/turn.rs b/hive-ag3nt/src/turn.rs index 9301479..12a7e13 100644 --- a/hive-ag3nt/src/turn.rs +++ b/hive-ag3nt/src/turn.rs @@ -210,11 +210,25 @@ pub fn rate_limit_sleep_secs() -> u64 { .unwrap_or(DEFAULT_RATE_LIMIT_SLEEP_SECS) } +/// Resolve the effective context-window size for watermark calculations. +/// Priority order (first wins): +/// 1. API-reported window from the last `result` event's `modelUsage.*.contextWindow`. +/// 2. `HIVE_CONTEXT_WINDOW_TOKENS_*` env vars (Nix-configured per-model defaults). +/// 3. Hard fallback: 200 000. +/// +/// The API-reported window is the authoritative per-inference active +/// context limit. It reflects what the model actually enforces — which +/// for models with large prompt caches (e.g. 1 M total cache) may be +/// significantly smaller than the cache capacity (e.g. 200 k active window +/// for `claude-sonnet-4-6`). +fn effective_context_window(bus: &Bus) -> u64 { + bus.api_context_window() + .unwrap_or_else(|| crate::events::context_window_tokens(&bus.model())) +} + /// Resolve the auto-reset watermark. Priority order: /// 1. `HIVE_AUTO_RESET_WATERMARK_TOKENS` env var (explicit override). -/// 2. 50% of the model's context window (derived from `bus.model()` + -/// `events::context_window_tokens`). -/// +/// 2. 50% of `effective_context_window(bus)`. /// `0` disables auto-reset entirely. fn auto_reset_watermark_tokens(bus: &Bus) -> u64 { if let Some(v) = std::env::var("HIVE_AUTO_RESET_WATERMARK_TOKENS") @@ -223,7 +237,7 @@ fn auto_reset_watermark_tokens(bus: &Bus) -> u64 { { return v; } - crate::events::context_window_tokens(&bus.model()) / 2 + effective_context_window(bus) / 2 } /// Resolve the assumed cache TTL: `HIVE_CACHE_TTL_SECS` if set, else @@ -238,9 +252,7 @@ fn cache_ttl_secs() -> u64 { /// Resolve the proactive-compaction watermark. Priority order: /// 1. `HIVE_COMPACT_WATERMARK_TOKENS` env var (explicit override). -/// 2. 75% of the model's context window (derived from `bus.model()` + -/// `events::context_window_tokens`). -/// +/// 2. 75% of `effective_context_window(bus)`. /// `0` disables proactive compaction (reactive path still applies). fn compact_watermark_tokens(bus: &Bus) -> u64 { if let Some(v) = std::env::var("HIVE_COMPACT_WATERMARK_TOKENS") @@ -249,7 +261,7 @@ fn compact_watermark_tokens(bus: &Bus) -> u64 { { return v; } - crate::events::context_window_tokens(&bus.model()) * 3 / 4 + effective_context_window(bus) * 3 / 4 } /// Drive one turn end-to-end. Three paths layer on top of the raw `run_turn`: @@ -554,39 +566,53 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<(bool, if line.contains(PROMPT_TOO_LONG_MARKER) { flag_out.store(true, Ordering::Relaxed); } - if let Ok(v) = serde_json::from_str::(&line) { - // Rate-limit detection: only fire on JSON `error` events, - // not on arbitrary text content. An agent discussing a past - // rate limit in its response would otherwise trigger a false - // positive (the full conversation flows through stdout as - // stream-json, so any text the model outputs is visible here). - if v.get("type").and_then(|t| t.as_str()) == Some("error") { - let raw = v.to_string(); - if RATE_LIMIT_MARKERS.iter().any(|m| raw.contains(m)) { + match serde_json::from_str::(&line) { + Ok(v) => { + // Rate-limit detection: only fire on JSON `error` events, + // not on arbitrary text content. An agent discussing a past + // rate limit in its response would otherwise trigger a false + // positive (the full conversation flows through stdout as + // stream-json, so any text the model outputs is visible here). + if v.get("type").and_then(|t| t.as_str()) == Some("error") { + let raw = v.to_string(); + if RATE_LIMIT_MARKERS.iter().any(|m| raw.contains(m)) { + rate_out.store(true, Ordering::Relaxed); + } + } + if let Some(u) = crate::events::TokenUsage::from_assistant_event(&v) { + last_inference = Some(u); + } + if let Some(cost) = crate::events::TokenUsage::from_stream_event(&v) { + // Fallback to `cost` if the turn somehow produced + // a result without any assistant event — keeps the + // ctx badge from going stale on a degenerate turn. + let ctx = last_inference.unwrap_or(cost); + bus_out.record_turn_usage(ctx, cost); + } + // Seed the API-reported context-window from the result + // event's `modelUsage.*.contextWindow` field. This is + // the authoritative per-inference active window used for + // compaction watermarks — it reflects what the model + // actually enforces, which may differ from the Nix + // config (e.g. 200k active window on a 1M cache model). + if let Some(w) = + crate::events::TokenUsage::context_window_from_result_event(&v) + { + bus_out.set_api_context_window(w); + } + bus_out.observe_stream(&v); + bus_out.emit(LiveEvent::Stream(v)); + } + Err(_) => { + // Non-JSON stdout: raw text check is fine here since these + // are claude CLI messages, not conversation content. + if RATE_LIMIT_MARKERS.iter().any(|m| line.contains(m)) { rate_out.store(true, Ordering::Relaxed); } + bus_out.emit(LiveEvent::Note { + text: format!("(non-json) {line}"), + }); } - if let Some(u) = crate::events::TokenUsage::from_assistant_event(&v) { - last_inference = Some(u); - } - if let Some(cost) = crate::events::TokenUsage::from_stream_event(&v) { - // Fallback to `cost` if the turn somehow produced - // a result without any assistant event — keeps the - // ctx badge from going stale on a degenerate turn. - let ctx = last_inference.unwrap_or(cost); - bus_out.record_turn_usage(ctx, cost); - } - bus_out.observe_stream(&v); - bus_out.emit(LiveEvent::Stream(v)); - } else { - // Non-JSON stdout: raw text check is fine here since these - // are claude CLI messages, not conversation content. - if RATE_LIMIT_MARKERS.iter().any(|m| line.contains(m)) { - rate_out.store(true, Ordering::Relaxed); - } - bus_out.emit(LiveEvent::Note { - text: format!("(non-json) {line}"), - }); } } }); diff --git a/hive-ag3nt/src/web_ui.rs b/hive-ag3nt/src/web_ui.rs index 644844c..9c4594c 100644 --- a/hive-ag3nt/src/web_ui.rs +++ b/hive-ag3nt/src/web_ui.rs @@ -371,10 +371,10 @@ struct StateSnapshot { /// in flight). Mutable at runtime via `POST /api/model`. model: String, /// Effective context-window token budget for the current model. - /// Derived from `events::context_window_tokens(&model)` — respects - /// per-model and global `HIVE_CONTEXT_WINDOW_TOKENS_*` overrides then - /// falls back to model-family heuristic. Consumers (e.g. dashboard - /// badge) use this to render the ctx-usage percentage. + /// Primary source: API-reported `modelUsage.*.contextWindow` from + /// the last result event (authoritative per-inference active window). + /// Falls back to `HIVE_CONTEXT_WINDOW_TOKENS_*` env vars, then 200 000. + /// Consumers (e.g. dashboard badge) use this to render ctx-usage %. context_window_tokens: u64, /// Last-inference token usage from the most recent completed /// turn — represents the current context-window size at turn-end. @@ -483,7 +483,10 @@ async fn api_state(headers: HeaderMap, State(state): State) -> axum::J let inbox = recent_inbox(&state.socket, state.flavor()).await; let (turn_state, turn_state_since) = state.bus.state_snapshot(); let model = state.bus.model(); - let context_window_tokens = crate::events::context_window_tokens(&model); + let context_window_tokens = state + .bus + .api_context_window() + .unwrap_or_else(|| crate::events::context_window_tokens(&model)); let ctx_usage = state.bus.last_ctx_usage(); let cost_usage = state.bus.last_cost_usage(); axum::Json(StateSnapshot {