diff --git a/hive-ag3nt/src/bin/hive-ag3nt.rs b/hive-ag3nt/src/bin/hive-ag3nt.rs index 0b1cf10..5073497 100644 --- a/hive-ag3nt/src/bin/hive-ag3nt.rs +++ b/hive-ag3nt/src/bin/hive-ag3nt.rs @@ -222,6 +222,21 @@ async fn serve( if matches!(outcome, turn::TurnOutcome::Ok) { ack_turn(socket).await; } + // Rate-limited: park until the quota resets, then requeue + // the unacked message so it resurfaces in the same session. + if matches!(outcome, turn::TurnOutcome::RateLimited) { + let secs = turn::rate_limit_sleep_secs(); + bus.emit_status("rate_limited"); + bus.emit(LiveEvent::Note { + text: format!( + "API rate-limited — sleeping {secs}s before retry" + ), + }); + tracing::warn!(sleep_secs = secs, "rate-limited; parking"); + tokio::time::sleep(Duration::from_secs(secs)).await; + requeue_inflight(socket).await; + bus.emit_status("online"); + } // Failures are unhandled by definition — PromptTooLong is // absorbed inside drive_turn via compaction, so anything // that reaches Failed here is a real crash. Notify the @@ -442,6 +457,7 @@ fn build_row( let (result_kind, note) = match outcome { turn::TurnOutcome::Ok => ("ok", None), turn::TurnOutcome::PromptTooLong => ("prompt_too_long", None), + turn::TurnOutcome::RateLimited => ("rate_limited", None), turn::TurnOutcome::Failed(e) => ("failed", Some(format!("{e:#}"))), }; TurnStatRow { diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index 213b050..74cc488 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -183,12 +183,27 @@ async fn serve( }; turn::emit_turn_end(&bus, &outcome); bus.set_state(TurnState::Idle); - // Ack only on a clean turn-end; Failed leaves the - // popped ids in-flight for the next boot's requeue. + // Ack only on a clean turn-end; Failed leaves the popped ids in-flight + // for the next boot's requeue, RateLimited requeues them after parking. // Mirrors hive-ag3nt; see that loop for full rationale. 
if matches!(outcome, turn::TurnOutcome::Ok) { ack_turn(socket).await; } + // Rate-limited: park until the quota resets, then requeue + // the unacked message so it resurfaces in the same session. + if matches!(outcome, turn::TurnOutcome::RateLimited) { + let secs = turn::rate_limit_sleep_secs(); + bus.emit_status("rate_limited"); + bus.emit(LiveEvent::Note { + text: format!( + "API rate-limited — sleeping {secs}s before retry" + ), + }); + tracing::warn!(sleep_secs = secs, "rate-limited; parking"); + tokio::time::sleep(Duration::from_secs(secs)).await; + requeue_inflight(socket).await; + bus.emit_status("online"); + } if let Some(s) = &stats { let ended_at = now_unix(); let duration_ms = @@ -364,6 +379,7 @@ fn build_row( let (result_kind, note) = match outcome { turn::TurnOutcome::Ok => ("ok", None), turn::TurnOutcome::PromptTooLong => ("prompt_too_long", None), + turn::TurnOutcome::RateLimited => ("rate_limited", None), turn::TurnOutcome::Failed(e) => ("failed", Some(format!("{e:#}"))), }; TurnStatRow { diff --git a/hive-ag3nt/src/turn.rs b/hive-ag3nt/src/turn.rs index 3cb28fc..6eac490 100644 --- a/hive-ag3nt/src/turn.rs +++ b/hive-ag3nt/src/turn.rs @@ -34,6 +34,26 @@ const CLAUDE_SETTINGS: &str = include_str!("../prompts/claude-settings.json"); /// claude exit with a useful error in the live view. const PROMPT_TOO_LONG_MARKER: &str = "Prompt is too long"; +/// Substrings that indicate the Anthropic API is refusing the request due +/// to a rate limit, per-account usage cap, or exhausted credit balance. +/// Matched against both stdout and stderr; any hit returns +/// `TurnOutcome::RateLimited` so the serve loop can park + retry instead +/// of propagating a hard failure that looks identical to a crash. 
+const RATE_LIMIT_MARKERS: &[&str] = &[ + "rate_limit_error", + "overloaded_error", + "Credit balance is too low", + "Usage limit reached", + "Request rate limit exceeded", +]; + +/// How long to sleep after detecting a rate-limit before re-entering the +/// serve loop. Overridable via `HIVE_RATE_LIMIT_SLEEP_SECS`. Default is +/// 5 minutes — enough for most short-lived throttles; the operator can +/// tune down for tight retry scenarios or up if they're hitting sustained +/// capacity limits. +const DEFAULT_RATE_LIMIT_SLEEP_SECS: u64 = 300; + /// Token watermark for *proactive* compaction. Once a turn finishes with /// the last inference's context size at or above this many tokens, /// `drive_turn` runs one dedicated notes-checkpoint turn (so the agent @@ -153,9 +173,24 @@ pub enum TurnOutcome { /// claude saw "Prompt is too long" — the session needs compacting. /// Run `compact_session()` then retry the same wake-up prompt. PromptTooLong, + /// The Anthropic API refused the request due to a rate limit, per-account + /// usage cap, or exhausted credit balance. The serve loop should park for + /// `rate_limit_sleep_secs()` and retry — NOT bubble up as a crash. + RateLimited, Failed(anyhow::Error), } +/// How long to sleep after a rate-limit before re-entering the serve loop. +/// Reads `HIVE_RATE_LIMIT_SLEEP_SECS` if set to a valid positive integer. +#[must_use] +pub fn rate_limit_sleep_secs() -> u64 { + std::env::var("HIVE_RATE_LIMIT_SLEEP_SECS") + .ok() + .and_then(|s| s.trim().parse::<u64>().ok()) + .filter(|&v| v > 0) + .unwrap_or(DEFAULT_RATE_LIMIT_SLEEP_SECS) +} + /// Resolve the proactive-compaction watermark: `HIVE_COMPACT_WATERMARK_TOKENS` /// if set to a valid integer, else `DEFAULT_COMPACT_WATERMARK_TOKENS`. A /// value of `0` disables proactive compaction. 
@@ -189,6 +224,9 @@ pub async fn drive_turn(prompt: &str, files: &TurnFiles, bus: &Bus) -> TurnOutco } run_turn(prompt, files, bus).await } + // Rate-limited: no point retrying immediately — bubble up so the + // serve loop can park + emit status before the next attempt. + TurnOutcome::RateLimited => return TurnOutcome::RateLimited, other => other, }; // Proactive: a turn just completed on a still-healthy session. If its @@ -231,6 +269,9 @@ async fn maybe_checkpoint_and_compact(files: &TurnFiles, bus: &Bus) { TurnOutcome::PromptTooLong => bus.emit(LiveEvent::Note { text: "checkpoint turn overflowed the window — compacting without it".into(), }), + TurnOutcome::RateLimited => bus.emit(LiveEvent::Note { + text: "checkpoint turn was rate-limited — compacting anyway".into(), + }), TurnOutcome::Failed(e) => bus.emit(LiveEvent::Note { text: format!("checkpoint turn failed ({e:#}) — compacting anyway"), }), @@ -254,6 +295,13 @@ pub fn emit_turn_end(bus: &Bus, outcome: &TurnOutcome) { }); tracing::info!("turn finished"); } + TurnOutcome::RateLimited => { + bus.emit(LiveEvent::TurnEnd { + ok: false, + note: Some("rate limited — parking until quota resets".into()), + }); + tracing::warn!("turn rate-limited"); + } TurnOutcome::Failed(e) => { let note = format!("{e:#}"); bus.emit(LiveEvent::TurnEnd { @@ -298,7 +346,8 @@ pub async fn wait_for_login( /// doesn't stall mid-turn — hyperhive owns compaction. 
pub async fn run_turn(prompt: &str, files: &TurnFiles, bus: &Bus) -> TurnOutcome { match run_claude(prompt, files, bus).await { - Ok(too_long) if too_long => TurnOutcome::PromptTooLong, + Ok((too_long, _)) if too_long => TurnOutcome::PromptTooLong, + Ok((_, rate_limited)) if rate_limited => TurnOutcome::RateLimited, Ok(_) => TurnOutcome::Ok, Err(e) => TurnOutcome::Failed(e), } @@ -314,7 +363,7 @@ pub async fn compact_session(files: &TurnFiles, bus: &Bus) -> Result<()> { bus.emit(LiveEvent::Note { text: "context overflow — running /compact on the persistent session".into(), }); - let _ = run_claude("/compact", files, bus).await?; + let (_, _) = run_claude("/compact", files, bus).await?; bus.emit(LiveEvent::Note { text: "/compact done".into(), }); @@ -322,7 +371,7 @@ pub async fn compact_session(files: &TurnFiles, bus: &Bus) -> Result<()> { } #[allow(clippy::too_many_lines)] -async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<bool> { +async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<(bool, bool)> { // Keep the last STDERR_TAIL_LINES of stderr so a non-zero exit can // include real context in the bail message (and downstream in the // failure notification to the manager) instead of just "exit 1". 
@@ -377,8 +426,11 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<bool> let stderr = child.stderr.take().expect("piped stderr"); let prompt_too_long = Arc::new(AtomicBool::new(false)); + let rate_limited = Arc::new(AtomicBool::new(false)); let flag_out = prompt_too_long.clone(); let flag_err = prompt_too_long.clone(); + let rate_out = rate_limited.clone(); + let rate_err = rate_limited.clone(); let bus_out = bus.clone(); let bus_err = bus.clone(); let pump_stdout = tokio::spawn(async move { @@ -394,6 +446,9 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<bool> if line.contains(PROMPT_TOO_LONG_MARKER) { flag_out.store(true, Ordering::Relaxed); } + if RATE_LIMIT_MARKERS.iter().any(|m| line.contains(m)) { + rate_out.store(true, Ordering::Relaxed); + } match serde_json::from_str::<serde_json::Value>(&line) { Ok(v) => { if let Some(u) = crate::events::TokenUsage::from_assistant_event(&v) { @@ -424,6 +479,9 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<bool> if line.contains(PROMPT_TOO_LONG_MARKER) { flag_err.store(true, Ordering::Relaxed); } + if RATE_LIMIT_MARKERS.iter().any(|m| line.contains(m)) { + rate_err.store(true, Ordering::Relaxed); + } // Mirror to journald so post-mortems work without the web UI // or the events sqlite. 
The bus event is what the dashboard // renders; the tracing line is what `journalctl -M -b` @@ -444,7 +502,8 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<bool> let _ = pump_stdout.await; let _ = pump_stderr.await; let too_long = prompt_too_long.load(Ordering::Relaxed); - if !status.success() && !too_long { + let is_rate_limited = rate_limited.load(Ordering::Relaxed); + if !status.success() && !too_long && !is_rate_limited { let tail = stderr_tail.lock().unwrap(); if tail.is_empty() { bail!("claude exited {status} (no stderr)"); @@ -452,5 +511,5 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<bool> let tail_str = tail.iter().cloned().collect::<Vec<_>>().join("\n"); bail!("claude exited {status}\nstderr tail:\n{tail_str}"); } - Ok(too_long) + Ok((too_long, is_rate_limited)) }