Compare commits
No commits in common. "25659ee9f3e3a094ff8a567fc4ec95d5a8284f4e" and "80dd5bb69e4b792bc2e0535d35aefe1a09c5198b" have entirely different histories.
25659ee9f3
...
80dd5bb69e
2 changed files with 11 additions and 110 deletions
|
|
@ -9,7 +9,7 @@
|
||||||
//! showing "connecting…" until the first event arrives.
|
//! showing "connecting…" until the first event arrives.
|
||||||
|
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use rusqlite::{Connection, params};
|
use rusqlite::{Connection, params};
|
||||||
|
|
@ -323,11 +323,6 @@ pub struct Bus {
|
||||||
/// `tool_call_count` + `tool_call_breakdown_json` columns on the
|
/// `tool_call_count` + `tool_call_breakdown_json` columns on the
|
||||||
/// per-turn stats sink.
|
/// per-turn stats sink.
|
||||||
tool_calls: Arc<Mutex<std::collections::HashMap<String, u64>>>,
|
tool_calls: Arc<Mutex<std::collections::HashMap<String, u64>>>,
|
||||||
/// Unix timestamp of the most recent completed turn (set by
|
|
||||||
/// `record_turn_usage`). Used by the auto-reset heuristic in
|
|
||||||
/// `turn.rs` to compute how long the session has been idle and
|
|
||||||
/// whether the prompt cache has gone cold. `0` = no turn yet.
|
|
||||||
last_turn_ended_unix: Arc<AtomicI64>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Bus {
|
impl Bus {
|
||||||
|
|
@ -355,7 +350,6 @@ impl Bus {
|
||||||
last_cost_usage: Arc::new(Mutex::new(None)),
|
last_cost_usage: Arc::new(Mutex::new(None)),
|
||||||
skip_continue_once: Arc::new(AtomicBool::new(false)),
|
skip_continue_once: Arc::new(AtomicBool::new(false)),
|
||||||
tool_calls: Arc::new(Mutex::new(std::collections::HashMap::new())),
|
tool_calls: Arc::new(Mutex::new(std::collections::HashMap::new())),
|
||||||
last_turn_ended_unix: Arc::new(AtomicI64::new(0)),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -428,17 +422,9 @@ impl Bus {
|
||||||
pub fn record_turn_usage(&self, ctx: TokenUsage, cost: TokenUsage) {
|
pub fn record_turn_usage(&self, ctx: TokenUsage, cost: TokenUsage) {
|
||||||
*self.last_ctx_usage.lock().unwrap() = Some(ctx);
|
*self.last_ctx_usage.lock().unwrap() = Some(ctx);
|
||||||
*self.last_cost_usage.lock().unwrap() = Some(cost);
|
*self.last_cost_usage.lock().unwrap() = Some(cost);
|
||||||
self.last_turn_ended_unix.store(now_unix(), Ordering::Relaxed);
|
|
||||||
self.emit(LiveEvent::TokenUsageChanged { ctx, cost });
|
self.emit(LiveEvent::TokenUsageChanged { ctx, cost });
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Unix timestamp of the most recent completed turn (`record_turn_usage`
|
|
||||||
/// call), or `0` if no turn has finished yet.
|
|
||||||
#[must_use]
|
|
||||||
pub fn last_turn_ended_unix(&self) -> i64 {
|
|
||||||
self.last_turn_ended_unix.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Walk a stream-json value for `tool_use` blocks and bump the
|
/// Walk a stream-json value for `tool_use` blocks and bump the
|
||||||
/// per-turn counter for each one we find. Called by the stdout
|
/// per-turn counter for each one we find. Called by the stdout
|
||||||
/// pump on every parsed line. Cheap when the line isn't an
|
/// pump on every parsed line. Cheap when the line isn't an
|
||||||
|
|
|
||||||
|
|
@ -54,27 +54,6 @@ const RATE_LIMIT_MARKERS: &[&str] = &[
|
||||||
/// capacity limits.
|
/// capacity limits.
|
||||||
const DEFAULT_RATE_LIMIT_SLEEP_SECS: u64 = 300;
|
const DEFAULT_RATE_LIMIT_SLEEP_SECS: u64 = 300;
|
||||||
|
|
||||||
/// Token watermark for *auto session-reset*. When context is at or above this
|
|
||||||
/// many tokens AND the prompt cache has gone cold (idle time >= `CACHE_TTL_SECS`),
|
|
||||||
/// the harness drops `--continue` so the next turn starts fresh. Running any
|
|
||||||
/// turn (even a checkpoint) before the reset would re-upload the full context
|
|
||||||
/// and warm the cache, defeating the cost purpose — so the reset happens
|
|
||||||
/// immediately with no preceding turn. Default is ~50% of a 200k-token
|
|
||||||
/// window; override via `HIVE_AUTO_RESET_WATERMARK_TOKENS`, or set to `0`
|
|
||||||
/// to disable.
|
|
||||||
const DEFAULT_AUTO_RESET_WATERMARK_TOKENS: u64 = 100_000;
|
|
||||||
|
|
||||||
/// Assumed prompt-cache TTL. Claude caches prompt prefixes — ~5 minutes on
|
|
||||||
/// the API (pay-per-token), ~1 hour on Claude Max (subscription). When the
|
|
||||||
/// idle gap exceeds this, the cache prefix has likely expired and the next
|
|
||||||
/// turn re-uploads the full transcript regardless of whether we resume or
|
|
||||||
/// start fresh. A fresh session with a small context is therefore equally
|
|
||||||
/// cheap but gives the model a clean slate. Default is 3600s (1h) matching
|
|
||||||
/// the subscription TTL; API (pay-per-token) users should set
|
|
||||||
/// `HIVE_CACHE_TTL_SECS=300`. Override via `HIVE_CACHE_TTL_SECS`; `0` or an
|
|
||||||
/// unparseable value is ignored and the default is used instead.
|
|
||||||
const DEFAULT_CACHE_TTL_SECS: u64 = 3600;
|
|
||||||
|
|
||||||
/// Token watermark for *proactive* compaction. Once a turn finishes with
|
/// Token watermark for *proactive* compaction. Once a turn finishes with
|
||||||
/// the last inference's context size at or above this many tokens,
|
/// the last inference's context size at or above this many tokens,
|
||||||
/// `drive_turn` runs one dedicated notes-checkpoint turn (so the agent
|
/// `drive_turn` runs one dedicated notes-checkpoint turn (so the agent
|
||||||
|
|
@ -212,26 +191,6 @@ pub fn rate_limit_sleep_secs() -> u64 {
|
||||||
.unwrap_or(DEFAULT_RATE_LIMIT_SLEEP_SECS)
|
.unwrap_or(DEFAULT_RATE_LIMIT_SLEEP_SECS)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Resolve the auto-reset watermark: `HIVE_AUTO_RESET_WATERMARK_TOKENS` if
|
|
||||||
/// set to a valid integer, else `DEFAULT_AUTO_RESET_WATERMARK_TOKENS`. `0`
|
|
||||||
/// disables auto-reset entirely.
|
|
||||||
fn auto_reset_watermark_tokens() -> u64 {
|
|
||||||
std::env::var("HIVE_AUTO_RESET_WATERMARK_TOKENS")
|
|
||||||
.ok()
|
|
||||||
.and_then(|s| s.trim().parse::<u64>().ok())
|
|
||||||
.unwrap_or(DEFAULT_AUTO_RESET_WATERMARK_TOKENS)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Resolve the assumed cache TTL: `HIVE_CACHE_TTL_SECS` if set to a valid
|
|
||||||
/// non-zero integer, else `DEFAULT_CACHE_TTL_SECS`.
|
|
||||||
fn cache_ttl_secs() -> u64 {
|
|
||||||
std::env::var("HIVE_CACHE_TTL_SECS")
|
|
||||||
.ok()
|
|
||||||
.and_then(|s| s.trim().parse::<u64>().ok())
|
|
||||||
.filter(|&v| v > 0)
|
|
||||||
.unwrap_or(DEFAULT_CACHE_TTL_SECS)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Resolve the proactive-compaction watermark: `HIVE_COMPACT_WATERMARK_TOKENS`
|
/// Resolve the proactive-compaction watermark: `HIVE_COMPACT_WATERMARK_TOKENS`
|
||||||
/// if set to a valid integer, else `DEFAULT_COMPACT_WATERMARK_TOKENS`. A
|
/// if set to a valid integer, else `DEFAULT_COMPACT_WATERMARK_TOKENS`. A
|
||||||
/// value of `0` disables proactive compaction.
|
/// value of `0` disables proactive compaction.
|
||||||
|
|
@ -242,26 +201,21 @@ fn compact_watermark_tokens() -> u64 {
|
||||||
.unwrap_or(DEFAULT_COMPACT_WATERMARK_TOKENS)
|
.unwrap_or(DEFAULT_COMPACT_WATERMARK_TOKENS)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Drive one turn end-to-end. Three paths layer on top of the raw `run_turn`:
|
/// Drive one turn end-to-end. Two compaction paths layer on top of the
|
||||||
|
/// raw `run_turn`:
|
||||||
///
|
///
|
||||||
/// - **Auto-reset (pre-turn)** — context is large AND the prompt cache has
|
/// - **Reactive** — `run_turn` returns `PromptTooLong`: the session is
|
||||||
/// gone cold (idle gap ≥ cache TTL). Resuming would re-upload the full
|
/// already past the context window and *no* turn can run on it, so we
|
||||||
/// transcript uncached at the same cost as a fresh start. The harness skips
|
/// compact immediately and retry the same wake-up prompt once. No
|
||||||
/// any checkpoint turn (it would re-warm the cache) and arms a one-shot
|
|
||||||
/// `request_new_session` so the actual turn starts fresh.
|
|
||||||
/// - **Reactive (on overflow)** — `run_turn` returns `PromptTooLong`: the
|
|
||||||
/// session is already past the context window and *no* turn can run on it,
|
|
||||||
/// so we compact immediately and retry the same wake-up prompt once. No
|
|
||||||
/// notes-checkpoint turn is possible here — the detail is gone.
|
/// notes-checkpoint turn is possible here — the detail is gone.
|
||||||
/// - **Proactive (post-turn)** — the turn finished cleanly but its context
|
/// - **Proactive** — the turn finished cleanly but its context size has
|
||||||
/// size has crept past the watermark: while the session is still healthy we
|
/// crept past the watermark: while the session is still healthy we
|
||||||
/// give the agent one dedicated turn to checkpoint its `/state` notes, then
|
/// give the agent one dedicated turn to checkpoint its `/state` notes,
|
||||||
/// compact. This keeps a later turn from hitting the reactive path (where
|
/// then compact. This keeps a later turn from hitting the reactive
|
||||||
/// there is no chance to save anything first).
|
/// path (where there is no chance to save anything first).
|
||||||
///
|
///
|
||||||
/// Both the sub-agent and manager loops call this.
|
/// Both the sub-agent and manager loops call this.
|
||||||
pub async fn drive_turn(prompt: &str, files: &TurnFiles, bus: &Bus) -> TurnOutcome {
|
pub async fn drive_turn(prompt: &str, files: &TurnFiles, bus: &Bus) -> TurnOutcome {
|
||||||
maybe_auto_reset(bus);
|
|
||||||
let outcome = match run_turn(prompt, files, bus).await {
|
let outcome = match run_turn(prompt, files, bus).await {
|
||||||
TurnOutcome::PromptTooLong => {
|
TurnOutcome::PromptTooLong => {
|
||||||
if let Err(e) = compact_session(files, bus).await {
|
if let Err(e) = compact_session(files, bus).await {
|
||||||
|
|
@ -330,45 +284,6 @@ async fn maybe_checkpoint_and_compact(files: &TurnFiles, bus: &Bus) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Pre-turn auto-reset check. If context is large AND the prompt cache has
|
|
||||||
/// gone cold (idle time >= cache TTL), arm `request_new_session` so the
|
|
||||||
/// next wake-up turn starts fresh. No preceding checkpoint turn — running
|
|
||||||
/// any turn before the reset would re-upload and re-warm the cache, which
|
|
||||||
/// defeats the cost-optimisation purpose entirely.
|
|
||||||
fn maybe_auto_reset(bus: &Bus) {
|
|
||||||
let watermark = auto_reset_watermark_tokens();
|
|
||||||
if watermark == 0 {
|
|
||||||
return; // auto-reset disabled
|
|
||||||
}
|
|
||||||
let Some(ctx_tokens) = bus.last_ctx_usage().map(|u| u.context_tokens()) else {
|
|
||||||
return; // no usage reading yet — first turn, nothing to reset
|
|
||||||
};
|
|
||||||
if ctx_tokens < watermark {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
let last_ended = bus.last_turn_ended_unix();
|
|
||||||
if last_ended == 0 {
|
|
||||||
return; // no completed turn yet
|
|
||||||
}
|
|
||||||
// Compute idle seconds using the same clock as now_unix (unix epoch, i64).
|
|
||||||
let now = std::time::SystemTime::now()
|
|
||||||
.duration_since(std::time::UNIX_EPOCH)
|
|
||||||
.map(|d| d.as_secs())
|
|
||||||
.unwrap_or(0);
|
|
||||||
let idle_secs = now.saturating_sub(u64::try_from(last_ended).unwrap_or(0));
|
|
||||||
let ttl = cache_ttl_secs();
|
|
||||||
if idle_secs < ttl {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
bus.emit(LiveEvent::Note {
|
|
||||||
text: format!(
|
|
||||||
"context {ctx_tokens} tokens, idle {idle_secs}s >= cache TTL {ttl}s \
|
|
||||||
— dropping session (cache cold, fresh start is equally cheap)"
|
|
||||||
),
|
|
||||||
});
|
|
||||||
bus.request_new_session();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Emit the per-turn `TurnEnd` event + log line. Single owner so the
|
/// Emit the per-turn `TurnEnd` event + log line. Single owner so the
|
||||||
/// agent and manager loops agree on outcome semantics.
|
/// agent and manager loops agree on outcome semantics.
|
||||||
pub fn emit_turn_end(bus: &Bus, outcome: &TurnOutcome) {
|
pub fn emit_turn_end(bus: &Bus, outcome: &TurnOutcome) {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue