fix: self-calibrate context window from API result event
the stream-json result event carries modelUsage.<model>.contextWindow which is the actual per-inference active window the model enforces. for claude-sonnet-4-6 this is 200k even though the full prompt cache can hold millions of tokens via accumulated cache reads. with the nix-configured sonnet = 1000000 the proactive compact watermark sat at 750k and was never reached. agents grew context until prompt_too_long at ~170k — reactive compact, no checkpoint turn. changes: - bus gains api_context_window field seeded from modelUsage.*.contextWindow in each turn's result event. authoritative; falls back to env var, then 200k. - new effective_context_window(bus) helper used by both watermark functions - compact_watermark (75%) and auto_reset_watermark (50%) call effective_context_window - context_tokens() docstring clarified: all three token fields (input + cache_read + cache_creation) count against the per-inference contextWindow limit. the large cache_read values seen in the result event are cumulative across all inferences in a turn, not per-inference. - /api/state context_window_tokens now reflects the calibrated window closes #129
This commit is contained in:
parent
3e94914569
commit
b0f6bd8ece
3 changed files with 131 additions and 43 deletions
|
|
@ -222,6 +222,11 @@ pub struct TokenUsage {
|
|||
|
||||
impl TokenUsage {
|
||||
/// Total context consumed this turn (input + cache reads + cache writes).
|
||||
/// This is the per-inference context footprint that counts against the
|
||||
/// model's `contextWindow` limit. Tracked from the last `assistant` event
|
||||
/// in the stream-json (per-inference usage, not the cumulative `result`
|
||||
/// event which sums across all inferences in a tool-heavy turn and can
|
||||
/// far exceed the per-inference window).
|
||||
#[must_use]
|
||||
pub fn context_tokens(&self) -> u64 {
|
||||
self.input_tokens + self.cache_read_input_tokens + self.cache_creation_input_tokens
|
||||
|
|
@ -260,6 +265,33 @@ impl TokenUsage {
|
|||
cache_creation_input_tokens: field("cache_creation_input_tokens"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract the per-inference context-window limit from a `result`
|
||||
/// stream-json event's `modelUsage` map. The API reports this as
|
||||
/// `modelUsage.<model-name>.contextWindow`; we take the first non-zero
|
||||
/// value across all model keys.
|
||||
///
|
||||
/// Returns `None` if the event is not a `result` type or has no
|
||||
/// `contextWindow` field. The returned value is the authoritative
|
||||
/// per-inference active window (e.g. 200 000 for `claude-sonnet-4-6`).
|
||||
/// It may be smaller than the full prompt-cache capacity (which can
|
||||
/// be several million tokens via cache reads).
|
||||
#[must_use]
|
||||
pub fn context_window_from_result_event(v: &serde_json::Value) -> Option<u64> {
|
||||
if v.get("type").and_then(|t| t.as_str()) != Some("result") {
|
||||
return None;
|
||||
}
|
||||
let model_usage = v.get("modelUsage")?;
|
||||
let map = model_usage.as_object()?;
|
||||
for (_model, stats) in map {
|
||||
if let Some(w) = stats.get("contextWindow").and_then(serde_json::Value::as_u64) {
|
||||
if w > 0 {
|
||||
return Some(w);
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Authoritative turn-loop state. The harness owns it; the web UI
|
||||
|
|
@ -385,6 +417,14 @@ pub struct Bus {
|
|||
/// `turn.rs` to compute how long the session has been idle and
|
||||
/// whether the prompt cache has gone cold. `0` = no turn yet.
|
||||
last_turn_ended_unix: Arc<AtomicI64>,
|
||||
/// Per-inference context-window size as reported by the Anthropic API
|
||||
/// in the stream-json `result` event (`modelUsage.*.contextWindow`).
|
||||
/// Set by the stdout pump on every completed turn. Takes precedence
|
||||
/// over the Nix-configured `HIVE_CONTEXT_WINDOW_TOKENS_*` env vars
|
||||
/// for compaction watermark calculations — it reflects the actual
|
||||
/// limit the model enforces, which may differ from what the operator
|
||||
/// configured (e.g. 200 k active window on a 1 M cache-enabled model).
|
||||
api_context_window: Arc<Mutex<Option<u64>>>,
|
||||
}
|
||||
|
||||
impl Bus {
|
||||
|
|
@ -419,6 +459,7 @@ impl Bus {
|
|||
skip_continue_once: Arc::new(AtomicBool::new(false)),
|
||||
tool_calls: Arc::new(Mutex::new(std::collections::HashMap::new())),
|
||||
last_turn_ended_unix: Arc::new(AtomicI64::new(0)),
|
||||
api_context_window: Arc::new(Mutex::new(None)),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -519,6 +560,24 @@ impl Bus {
|
|||
self.last_turn_ended_unix.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Update the API-reported context-window size from the stream-json
|
||||
/// `result` event's `modelUsage.*.contextWindow` field. Called by the
|
||||
/// stdout pump once per completed turn. `0` is ignored (sentinel for
|
||||
/// "not reported").
|
||||
pub fn set_api_context_window(&self, window: u64) {
|
||||
if window > 0 {
|
||||
*self.api_context_window.lock().unwrap() = Some(window);
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the API-reported per-inference context-window size, if the
|
||||
/// harness has seen at least one completed turn for this session.
|
||||
/// `None` until the first result event is processed.
|
||||
#[must_use]
|
||||
pub fn api_context_window(&self) -> Option<u64> {
|
||||
*self.api_context_window.lock().unwrap()
|
||||
}
|
||||
|
||||
/// Walk a stream-json value for `tool_use` blocks and bump the
|
||||
/// per-turn counter for each one we find. Called by the stdout
|
||||
/// pump on every parsed line. Cheap when the line isn't an
|
||||
|
|
|
|||
|
|
@ -210,11 +210,25 @@ pub fn rate_limit_sleep_secs() -> u64 {
|
|||
.unwrap_or(DEFAULT_RATE_LIMIT_SLEEP_SECS)
|
||||
}
|
||||
|
||||
/// Resolve the effective context-window size for watermark calculations.
|
||||
/// Priority order (first wins):
|
||||
/// 1. API-reported window from the last `result` event's `modelUsage.*.contextWindow`.
|
||||
/// 2. `HIVE_CONTEXT_WINDOW_TOKENS_*` env vars (Nix-configured per-model defaults).
|
||||
/// 3. Hard fallback: 200 000.
|
||||
///
|
||||
/// The API-reported window is the authoritative per-inference active
|
||||
/// context limit. It reflects what the model actually enforces — which
|
||||
/// for models with large prompt caches (e.g. 1 M total cache) may be
|
||||
/// significantly smaller than the cache capacity (e.g. 200 k active window
|
||||
/// for `claude-sonnet-4-6`).
|
||||
fn effective_context_window(bus: &Bus) -> u64 {
|
||||
bus.api_context_window()
|
||||
.unwrap_or_else(|| crate::events::context_window_tokens(&bus.model()))
|
||||
}
|
||||
|
||||
/// Resolve the auto-reset watermark. Priority order:
|
||||
/// 1. `HIVE_AUTO_RESET_WATERMARK_TOKENS` env var (explicit override).
|
||||
/// 2. 50% of the model's context window (derived from `bus.model()` +
|
||||
/// `events::context_window_tokens`).
|
||||
///
|
||||
/// 2. 50% of `effective_context_window(bus)`.
|
||||
/// `0` disables auto-reset entirely.
|
||||
fn auto_reset_watermark_tokens(bus: &Bus) -> u64 {
|
||||
if let Some(v) = std::env::var("HIVE_AUTO_RESET_WATERMARK_TOKENS")
|
||||
|
|
@ -223,7 +237,7 @@ fn auto_reset_watermark_tokens(bus: &Bus) -> u64 {
|
|||
{
|
||||
return v;
|
||||
}
|
||||
crate::events::context_window_tokens(&bus.model()) / 2
|
||||
effective_context_window(bus) / 2
|
||||
}
|
||||
|
||||
/// Resolve the assumed cache TTL: `HIVE_CACHE_TTL_SECS` if set, else
|
||||
|
|
@ -238,9 +252,7 @@ fn cache_ttl_secs() -> u64 {
|
|||
|
||||
/// Resolve the proactive-compaction watermark. Priority order:
|
||||
/// 1. `HIVE_COMPACT_WATERMARK_TOKENS` env var (explicit override).
|
||||
/// 2. 75% of the model's context window (derived from `bus.model()` +
|
||||
/// `events::context_window_tokens`).
|
||||
///
|
||||
/// 2. 75% of `effective_context_window(bus)`.
|
||||
/// `0` disables proactive compaction (reactive path still applies).
|
||||
fn compact_watermark_tokens(bus: &Bus) -> u64 {
|
||||
if let Some(v) = std::env::var("HIVE_COMPACT_WATERMARK_TOKENS")
|
||||
|
|
@ -249,7 +261,7 @@ fn compact_watermark_tokens(bus: &Bus) -> u64 {
|
|||
{
|
||||
return v;
|
||||
}
|
||||
crate::events::context_window_tokens(&bus.model()) * 3 / 4
|
||||
effective_context_window(bus) * 3 / 4
|
||||
}
|
||||
|
||||
/// Drive one turn end-to-end. Three paths layer on top of the raw `run_turn`:
|
||||
|
|
@ -554,7 +566,8 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<(bool,
|
|||
if line.contains(PROMPT_TOO_LONG_MARKER) {
|
||||
flag_out.store(true, Ordering::Relaxed);
|
||||
}
|
||||
if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line) {
|
||||
match serde_json::from_str::<serde_json::Value>(&line) {
|
||||
Ok(v) => {
|
||||
// Rate-limit detection: only fire on JSON `error` events,
|
||||
// not on arbitrary text content. An agent discussing a past
|
||||
// rate limit in its response would otherwise trigger a false
|
||||
|
|
@ -576,9 +589,21 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<(bool,
|
|||
let ctx = last_inference.unwrap_or(cost);
|
||||
bus_out.record_turn_usage(ctx, cost);
|
||||
}
|
||||
// Seed the API-reported context-window from the result
|
||||
// event's `modelUsage.*.contextWindow` field. This is
|
||||
// the authoritative per-inference active window used for
|
||||
// compaction watermarks — it reflects what the model
|
||||
// actually enforces, which may differ from the Nix
|
||||
// config (e.g. 200k active window on a 1M cache model).
|
||||
if let Some(w) =
|
||||
crate::events::TokenUsage::context_window_from_result_event(&v)
|
||||
{
|
||||
bus_out.set_api_context_window(w);
|
||||
}
|
||||
bus_out.observe_stream(&v);
|
||||
bus_out.emit(LiveEvent::Stream(v));
|
||||
} else {
|
||||
}
|
||||
Err(_) => {
|
||||
// Non-JSON stdout: raw text check is fine here since these
|
||||
// are claude CLI messages, not conversation content.
|
||||
if RATE_LIMIT_MARKERS.iter().any(|m| line.contains(m)) {
|
||||
|
|
@ -589,6 +614,7 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<(bool,
|
|||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
let stderr_tail: Arc<Mutex<VecDeque<String>>> =
|
||||
Arc::new(Mutex::new(VecDeque::with_capacity(STDERR_TAIL_LINES)));
|
||||
|
|
|
|||
|
|
@ -371,10 +371,10 @@ struct StateSnapshot {
|
|||
/// in flight). Mutable at runtime via `POST /api/model`.
|
||||
model: String,
|
||||
/// Effective context-window token budget for the current model.
|
||||
/// Derived from `events::context_window_tokens(&model)` — respects
|
||||
/// per-model and global `HIVE_CONTEXT_WINDOW_TOKENS_*` overrides then
|
||||
/// falls back to model-family heuristic. Consumers (e.g. dashboard
|
||||
/// badge) use this to render the ctx-usage percentage.
|
||||
/// Primary source: API-reported `modelUsage.*.contextWindow` from
|
||||
/// the last result event (authoritative per-inference active window).
|
||||
/// Falls back to `HIVE_CONTEXT_WINDOW_TOKENS_*` env vars, then 200 000.
|
||||
/// Consumers (e.g. dashboard badge) use this to render ctx-usage %.
|
||||
context_window_tokens: u64,
|
||||
/// Last-inference token usage from the most recent completed
|
||||
/// turn — represents the current context-window size at turn-end.
|
||||
|
|
@ -483,7 +483,10 @@ async fn api_state(headers: HeaderMap, State(state): State<AppState>) -> axum::J
|
|||
let inbox = recent_inbox(&state.socket, state.flavor()).await;
|
||||
let (turn_state, turn_state_since) = state.bus.state_snapshot();
|
||||
let model = state.bus.model();
|
||||
let context_window_tokens = crate::events::context_window_tokens(&model);
|
||||
let context_window_tokens = state
|
||||
.bus
|
||||
.api_context_window()
|
||||
.unwrap_or_else(|| crate::events::context_window_tokens(&model));
|
||||
let ctx_usage = state.bus.last_ctx_usage();
|
||||
let cost_usage = state.bus.last_cost_usage();
|
||||
axum::Json(StateSnapshot {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue