diff --git a/hive-ag3nt/src/events.rs b/hive-ag3nt/src/events.rs index 0469d2f..2fdda22 100644 --- a/hive-ag3nt/src/events.rs +++ b/hive-ag3nt/src/events.rs @@ -9,7 +9,7 @@ //! showing "connecting…" until the first event arrives. use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use rusqlite::{Connection, params}; @@ -323,6 +323,11 @@ pub struct Bus { /// `tool_call_count` + `tool_call_breakdown_json` columns on the /// per-turn stats sink. tool_calls: Arc>>, + /// Unix timestamp of the most recent completed turn (set by + /// `record_turn_usage`). Used by the auto-reset heuristic in + /// `turn.rs` to compute how long the session has been idle and + /// whether the prompt cache has gone cold. `0` = no turn yet. + last_turn_ended_unix: Arc, } impl Bus { @@ -350,6 +355,7 @@ impl Bus { last_cost_usage: Arc::new(Mutex::new(None)), skip_continue_once: Arc::new(AtomicBool::new(false)), tool_calls: Arc::new(Mutex::new(std::collections::HashMap::new())), + last_turn_ended_unix: Arc::new(AtomicI64::new(0)), } } @@ -422,9 +428,17 @@ impl Bus { pub fn record_turn_usage(&self, ctx: TokenUsage, cost: TokenUsage) { *self.last_ctx_usage.lock().unwrap() = Some(ctx); *self.last_cost_usage.lock().unwrap() = Some(cost); + self.last_turn_ended_unix.store(now_unix(), Ordering::Relaxed); self.emit(LiveEvent::TokenUsageChanged { ctx, cost }); } + /// Unix timestamp of the most recent completed turn (`record_turn_usage` + /// call), or `0` if no turn has finished yet. + #[must_use] + pub fn last_turn_ended_unix(&self) -> i64 { + self.last_turn_ended_unix.load(Ordering::Relaxed) + } + /// Walk a stream-json value for `tool_use` blocks and bump the /// per-turn counter for each one we find. Called by the stdout /// pump on every parsed line. Cheap when the line isn't an diff --git a/hive-ag3nt/src/turn.rs b/hive-ag3nt/src/turn.rs index 6eac490..d42bc7c 100644 --- a/hive-ag3nt/src/turn.rs +++ b/hive-ag3nt/src/turn.rs @@ -54,6 +54,27 @@ const RATE_LIMIT_MARKERS: &[&str] = &[ /// capacity limits. const DEFAULT_RATE_LIMIT_SLEEP_SECS: u64 = 300; +/// Token watermark for *auto session-reset*. When context is at or above this +/// many tokens AND the prompt cache has gone cold (idle time >= `CACHE_TTL_SECS`), +/// the harness drops `--continue` so the next turn starts fresh. Running any +/// turn (even a checkpoint) before the reset would re-upload the full context +/// and warm the cache, defeating the cost purpose — so the reset happens +/// immediately with no preceding turn. Default is ~50% of a 200k-token +/// window; override via `HIVE_AUTO_RESET_WATERMARK_TOKENS`, or set to `0` +/// to disable. +const DEFAULT_AUTO_RESET_WATERMARK_TOKENS: u64 = 100_000; + +/// Assumed prompt-cache TTL. Claude caches prompt prefixes — ~5 minutes on +/// the API (pay-per-token), ~1 hour on Claude Max (subscription). When the +/// idle gap exceeds this, the cache prefix has likely expired and the next +/// turn re-uploads the full transcript regardless of whether we resume or +/// start fresh. A fresh session with a small context is therefore equally +/// cheap but gives the model a clean slate. Default is 3600s (1h) matching +/// the subscription TTL; API (pay-per-token) users should set +/// `HIVE_CACHE_TTL_SECS=300`. Override via `HIVE_CACHE_TTL_SECS`; set to +/// `0` to disable (always resume). +const DEFAULT_CACHE_TTL_SECS: u64 = 3600; + /// Token watermark for *proactive* compaction. Once a turn finishes with /// the last inference's context size at or above this many tokens, /// `drive_turn` runs one dedicated notes-checkpoint turn (so the agent @@ -191,6 +212,26 @@ pub fn rate_limit_sleep_secs() -> u64 { .unwrap_or(DEFAULT_RATE_LIMIT_SLEEP_SECS) } +/// Resolve the auto-reset watermark: `HIVE_AUTO_RESET_WATERMARK_TOKENS` if +/// set to a valid integer, else `DEFAULT_AUTO_RESET_WATERMARK_TOKENS`. `0` +/// disables auto-reset entirely. +fn auto_reset_watermark_tokens() -> u64 { + std::env::var("HIVE_AUTO_RESET_WATERMARK_TOKENS") + .ok() + .and_then(|s| s.trim().parse::().ok()) + .unwrap_or(DEFAULT_AUTO_RESET_WATERMARK_TOKENS) +} + +/// Resolve the assumed cache TTL: `HIVE_CACHE_TTL_SECS` if set, else +/// `DEFAULT_CACHE_TTL_SECS`. +fn cache_ttl_secs() -> u64 { + std::env::var("HIVE_CACHE_TTL_SECS") + .ok() + .and_then(|s| s.trim().parse::().ok()) + .filter(|&v| v > 0) + .unwrap_or(DEFAULT_CACHE_TTL_SECS) +} + /// Resolve the proactive-compaction watermark: `HIVE_COMPACT_WATERMARK_TOKENS` /// if set to a valid integer, else `DEFAULT_COMPACT_WATERMARK_TOKENS`. A /// value of `0` disables proactive compaction. @@ -201,21 +242,26 @@ fn compact_watermark_tokens() -> u64 { .unwrap_or(DEFAULT_COMPACT_WATERMARK_TOKENS) } -/// Drive one turn end-to-end. Two compaction paths layer on top of the -/// raw `run_turn`: +/// Drive one turn end-to-end. Three paths layer on top of the raw `run_turn`: /// -/// - **Reactive** — `run_turn` returns `PromptTooLong`: the session is -/// already past the context window and *no* turn can run on it, so we -/// compact immediately and retry the same wake-up prompt once. No +/// - **Auto-reset (pre-turn)** — context is large AND the prompt cache has +/// gone cold (idle gap ≥ cache TTL). Resuming would re-upload the full +/// transcript uncached at the same cost as a fresh start. The harness runs +/// one checkpoint turn (agent flushes state), then arms a one-shot +/// `request_new_session` so the actual turn starts fresh. +/// - **Reactive (on overflow)** — `run_turn` returns `PromptTooLong`: the +/// session is already past the context window and *no* turn can run on it, +/// so we compact immediately and retry the same wake-up prompt once. No /// notes-checkpoint turn is possible here — the detail is gone. -/// - **Proactive** — the turn finished cleanly but its context size has -/// crept past the watermark: while the session is still healthy we -/// give the agent one dedicated turn to checkpoint its `/state` notes, -/// then compact. This keeps a later turn from hitting the reactive -/// path (where there is no chance to save anything first). +/// - **Proactive (post-turn)** — the turn finished cleanly but its context +/// size has crept past the watermark: while the session is still healthy we +/// give the agent one dedicated turn to checkpoint its `/state` notes, then +/// compact. This keeps a later turn from hitting the reactive path (where +/// there is no chance to save anything first). /// /// Both the sub-agent and manager loops call this. pub async fn drive_turn(prompt: &str, files: &TurnFiles, bus: &Bus) -> TurnOutcome { + maybe_auto_reset(bus); let outcome = match run_turn(prompt, files, bus).await { TurnOutcome::PromptTooLong => { if let Err(e) = compact_session(files, bus).await { @@ -284,6 +330,45 @@ async fn maybe_checkpoint_and_compact(files: &TurnFiles, bus: &Bus) { } } +/// Pre-turn auto-reset check. If context is large AND the prompt cache has +/// gone cold (idle time >= cache TTL), arm `request_new_session` so the +/// next wake-up turn starts fresh. No preceding checkpoint turn — running +/// any turn before the reset would re-upload and re-warm the cache, which +/// defeats the cost-optimisation purpose entirely. +fn maybe_auto_reset(bus: &Bus) { + let watermark = auto_reset_watermark_tokens(); + if watermark == 0 { + return; // auto-reset disabled + } + let Some(ctx_tokens) = bus.last_ctx_usage().map(|u| u.context_tokens()) else { + return; // no usage reading yet — first turn, nothing to reset + }; + if ctx_tokens < watermark { + return; + } + let last_ended = bus.last_turn_ended_unix(); + if last_ended == 0 { + return; // no completed turn yet + } + // Compute idle seconds using the same clock as now_unix (unix epoch, i64). + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + let idle_secs = now.saturating_sub(u64::try_from(last_ended).unwrap_or(0)); + let ttl = cache_ttl_secs(); + if idle_secs < ttl { + return; + } + bus.emit(LiveEvent::Note { + text: format!( + "context {ctx_tokens} tokens, idle {idle_secs}s >= cache TTL {ttl}s \ + — dropping session (cache cold, fresh start is equally cheap)" + ), + }); + bus.request_new_session(); +} + /// Emit the per-turn `TurnEnd` event + log line. Single owner so the /// agent and manager loops agree on outcome semantics. pub fn emit_turn_end(bus: &Bus, outcome: &TurnOutcome) {