From 44c903f265f16dda667b9d6535cbebe4b5884b6a Mon Sep 17 00:00:00 2001 From: damocles Date: Wed, 20 May 2026 14:08:32 +0200 Subject: [PATCH] auto session-reset when context large and cache is cold --- hive-ag3nt/src/events.rs | 16 ++++- hive-ag3nt/src/turn.rs | 126 +++++++++++++++++++++++++++++++++++---- 2 files changed, 131 insertions(+), 11 deletions(-) diff --git a/hive-ag3nt/src/events.rs b/hive-ag3nt/src/events.rs index 0469d2f..2fdda22 100644 --- a/hive-ag3nt/src/events.rs +++ b/hive-ag3nt/src/events.rs @@ -9,7 +9,7 @@ //! showing "connecting…" until the first event arrives. use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use rusqlite::{Connection, params}; @@ -323,6 +323,11 @@ pub struct Bus { /// `tool_call_count` + `tool_call_breakdown_json` columns on the /// per-turn stats sink. tool_calls: Arc>>, + /// Unix timestamp of the most recent completed turn (set by + /// `record_turn_usage`). Used by the auto-reset heuristic in + /// `turn.rs` to compute how long the session has been idle and + /// whether the prompt cache has gone cold. `0` = no turn yet. + last_turn_ended_unix: Arc, } impl Bus { @@ -350,6 +355,7 @@ impl Bus { last_cost_usage: Arc::new(Mutex::new(None)), skip_continue_once: Arc::new(AtomicBool::new(false)), tool_calls: Arc::new(Mutex::new(std::collections::HashMap::new())), + last_turn_ended_unix: Arc::new(AtomicI64::new(0)), } } @@ -422,9 +428,17 @@ impl Bus { pub fn record_turn_usage(&self, ctx: TokenUsage, cost: TokenUsage) { *self.last_ctx_usage.lock().unwrap() = Some(ctx); *self.last_cost_usage.lock().unwrap() = Some(cost); + self.last_turn_ended_unix.store(now_unix(), Ordering::Relaxed); self.emit(LiveEvent::TokenUsageChanged { ctx, cost }); } + /// Unix timestamp of the most recent completed turn (`record_turn_usage` + /// call), or `0` if no turn has finished yet. + #[must_use] + pub fn last_turn_ended_unix(&self) -> i64 { + self.last_turn_ended_unix.load(Ordering::Relaxed) + } + /// Walk a stream-json value for `tool_use` blocks and bump the /// per-turn counter for each one we find. Called by the stdout /// pump on every parsed line. Cheap when the line isn't an diff --git a/hive-ag3nt/src/turn.rs b/hive-ag3nt/src/turn.rs index 6eac490..4ae1fd3 100644 --- a/hive-ag3nt/src/turn.rs +++ b/hive-ag3nt/src/turn.rs @@ -54,6 +54,36 @@ const RATE_LIMIT_MARKERS: &[&str] = &[ /// capacity limits. const DEFAULT_RATE_LIMIT_SLEEP_SECS: u64 = 300; +/// Token watermark for *auto session-reset*. When context is at or above this +/// many tokens AND the prompt cache has gone cold (idle time ≥ `CACHE_TTL_SECS`), +/// the harness runs a notes-checkpoint turn then drops `--continue` so the +/// next turn starts fresh. Sending a large context uncached costs the same as +/// a fresh start; the reset avoids paying for a long history the model won't +/// benefit from. Default is ~50% of a 200k-token window; override via +/// `HIVE_AUTO_RESET_WATERMARK_TOKENS`, or set to `0` to disable. +const DEFAULT_AUTO_RESET_WATERMARK_TOKENS: u64 = 100_000; + +/// Assumed prompt-cache TTL. Claude's API caches prompt prefixes for ~5 +/// minutes; after that the prefix must be re-uploaded on the next turn. +/// When the idle gap exceeds this, a large context costs the same whether +/// we resume or start fresh — so we start fresh. Override via +/// `HIVE_CACHE_TTL_SECS`. +const DEFAULT_CACHE_TTL_SECS: u64 = 300; + +/// Synthetic checkpoint prompt injected before an auto-reset turn. Unlike the +/// proactive-compaction checkpoint (which is followed by `/compact`), this one +/// is followed by a fresh-session turn, so the wording focuses on "write now +/// so you can read it back cleanly" rather than "/compact is coming". +const AUTO_RESET_CHECKPOINT_PROMPT: &str = + "[system] Context checkpoint before session auto-reset — no inbox message to handle.\n\n\ +The harness detected that the prompt-cache TTL has elapsed while your context is large. \ +Resuming with --continue would re-upload the full transcript uncached; starting a fresh \ +session is cheaper and equally effective since the cache is already cold.\n\n\ +Use THIS turn to flush anything worth keeping into your durable state files: update \ +your notes / TODO.md with in-flight task state, open questions, important paths, and \ +anything else you would need to resume cleanly from a fresh session. Do not start \ +new work or reply to anyone — just write your notes and end the turn."; + /// Token watermark for *proactive* compaction. Once a turn finishes with /// the last inference's context size at or above this many tokens, /// `drive_turn` runs one dedicated notes-checkpoint turn (so the agent @@ -191,6 +221,26 @@ pub fn rate_limit_sleep_secs() -> u64 { .unwrap_or(DEFAULT_RATE_LIMIT_SLEEP_SECS) } +/// Resolve the auto-reset watermark: `HIVE_AUTO_RESET_WATERMARK_TOKENS` if +/// set to a valid integer, else `DEFAULT_AUTO_RESET_WATERMARK_TOKENS`. `0` +/// disables auto-reset entirely. +fn auto_reset_watermark_tokens() -> u64 { + std::env::var("HIVE_AUTO_RESET_WATERMARK_TOKENS") + .ok() + .and_then(|s| s.trim().parse::().ok()) + .unwrap_or(DEFAULT_AUTO_RESET_WATERMARK_TOKENS) +} + +/// Resolve the assumed cache TTL: `HIVE_CACHE_TTL_SECS` if set, else +/// `DEFAULT_CACHE_TTL_SECS`. +fn cache_ttl_secs() -> u64 { + std::env::var("HIVE_CACHE_TTL_SECS") + .ok() + .and_then(|s| s.trim().parse::().ok()) + .filter(|&v| v > 0) + .unwrap_or(DEFAULT_CACHE_TTL_SECS) +} + /// Resolve the proactive-compaction watermark: `HIVE_COMPACT_WATERMARK_TOKENS` /// if set to a valid integer, else `DEFAULT_COMPACT_WATERMARK_TOKENS`. A /// value of `0` disables proactive compaction. @@ -201,21 +251,26 @@ fn compact_watermark_tokens() -> u64 { .unwrap_or(DEFAULT_COMPACT_WATERMARK_TOKENS) } -/// Drive one turn end-to-end. Two compaction paths layer on top of the -/// raw `run_turn`: +/// Drive one turn end-to-end. Three paths layer on top of the raw `run_turn`: /// -/// - **Reactive** — `run_turn` returns `PromptTooLong`: the session is -/// already past the context window and *no* turn can run on it, so we -/// compact immediately and retry the same wake-up prompt once. No +/// - **Auto-reset (pre-turn)** — context is large AND the prompt cache has +/// gone cold (idle gap ≥ cache TTL). Resuming would re-upload the full +/// transcript uncached at the same cost as a fresh start. The harness runs +/// one checkpoint turn (agent flushes state), then arms a one-shot +/// `request_new_session` so the actual turn starts fresh. +/// - **Reactive (on overflow)** — `run_turn` returns `PromptTooLong`: the +/// session is already past the context window and *no* turn can run on it, +/// so we compact immediately and retry the same wake-up prompt once. No /// notes-checkpoint turn is possible here — the detail is gone. -/// - **Proactive** — the turn finished cleanly but its context size has -/// crept past the watermark: while the session is still healthy we -/// give the agent one dedicated turn to checkpoint its `/state` notes, -/// then compact. This keeps a later turn from hitting the reactive -/// path (where there is no chance to save anything first). +/// - **Proactive (post-turn)** — the turn finished cleanly but its context +/// size has crept past the watermark: while the session is still healthy we +/// give the agent one dedicated turn to checkpoint its `/state` notes, then +/// compact. This keeps a later turn from hitting the reactive path (where +/// there is no chance to save anything first). /// /// Both the sub-agent and manager loops call this. pub async fn drive_turn(prompt: &str, files: &TurnFiles, bus: &Bus) -> TurnOutcome { + maybe_auto_reset(files, bus).await; let outcome = match run_turn(prompt, files, bus).await { TurnOutcome::PromptTooLong => { if let Err(e) = compact_session(files, bus).await { @@ -284,6 +339,57 @@ async fn maybe_checkpoint_and_compact(files: &TurnFiles, bus: &Bus) { } } +/// Pre-turn auto-reset check. If context is large AND the prompt cache has +/// gone cold (idle time ≥ cache TTL), run a notes-checkpoint turn so the agent +/// can persist durable state, then arm `request_new_session` so the actual +/// wake-up turn starts fresh. Best-effort — a failed checkpoint is noted and +/// the reset proceeds anyway (agent still gets a clean session). +async fn maybe_auto_reset(files: &TurnFiles, bus: &Bus) { + let watermark = auto_reset_watermark_tokens(); + if watermark == 0 { + return; // auto-reset disabled + } + let Some(ctx_tokens) = bus.last_ctx_usage().map(|u| u.context_tokens()) else { + return; // no usage reading yet — first turn, nothing to reset + }; + if ctx_tokens < watermark { + return; + } + let last_ended = bus.last_turn_ended_unix(); + if last_ended == 0 { + return; // no completed turn yet + } + // Compute idle seconds using the same clock as now_unix (unix epoch, i64). + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + let idle_secs = now.saturating_sub(u64::try_from(last_ended).unwrap_or(0)); + let ttl = cache_ttl_secs(); + if idle_secs < ttl { + return; + } + bus.emit(LiveEvent::Note { + text: format!( + "context {ctx_tokens} tokens, idle {idle_secs}s ≥ cache TTL {ttl}s \ + — running checkpoint then auto-resetting session" + ), + }); + match run_turn(AUTO_RESET_CHECKPOINT_PROMPT, files, bus).await { + TurnOutcome::Ok => {} + TurnOutcome::PromptTooLong => bus.emit(LiveEvent::Note { + text: "auto-reset checkpoint overflowed — resetting without checkpoint".into(), + }), + TurnOutcome::RateLimited => bus.emit(LiveEvent::Note { + text: "auto-reset checkpoint was rate-limited — resetting anyway".into(), + }), + TurnOutcome::Failed(e) => bus.emit(LiveEvent::Note { + text: format!("auto-reset checkpoint failed ({e:#}) — resetting anyway"), + }), + } + bus.request_new_session(); +} + /// Emit the per-turn `TurnEnd` event + log line. Single owner so the /// agent and manager loops agree on outcome semantics. pub fn emit_turn_end(bus: &Bus, outcome: &TurnOutcome) {