//! Per-turn claude invocation shared by `hive-ag3nt` and `hive-m1nd`. The //! two binaries differ only in their MCP `Flavor` (agent surface vs. //! manager surface) and their wake-prompt wording; the spawn shape, //! arg-vector, stdin plumbing, and stream-json pumping are identical. use std::collections::VecDeque; use std::path::{Path, PathBuf}; use std::process::Stdio; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; use anyhow::{Result, bail}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::Command; use crate::events::{Bus, LiveEvent}; use crate::login::{self, LoginState}; use crate::mcp; /// `--settings` JSON applied to every claude invocation. Lives as a /// properly-formatted file in `prompts/claude-settings.json` so it's easy /// to read and edit; we ship it via `include_str!`. We turn off claude's /// in-session auto-compaction and its cross-session auto-memory because /// hyperhive owns those concerns (`/compact` on overflow, notes /// persistence under `/state`). Unknown keys are silently ignored by /// claude-code; if a key gets renamed we'll spot it because the /// corresponding behavior will start firing mid-turn again. const CLAUDE_SETTINGS: &str = include_str!("../prompts/claude-settings.json"); /// Regex-ish marker claude-code emits when context overflows. Same string /// bitburner-agent watches for. Empirically reliable across claude-code /// versions; if it ever changes, compaction won't fire and we'll see a /// claude exit with a useful error in the live view. const PROMPT_TOO_LONG_MARKER: &str = "Prompt is too long"; /// Substrings that indicate the Anthropic API is refusing the request due /// to a rate limit, per-account usage cap, or exhausted credit balance. /// Matched against both stdout and stderr; any hit returns /// `TurnOutcome::RateLimited` so the serve loop can park + retry instead /// of propagating a hard failure that looks identical to a crash. 
const RATE_LIMIT_MARKERS: &[&str] = &[ "rate_limit_error", "overloaded_error", "Credit balance is too low", "Usage limit reached", "Request rate limit exceeded", ]; /// How long to sleep after detecting a rate-limit before re-entering the /// serve loop. Overridable via `HIVE_RATE_LIMIT_SLEEP_SECS`. Default is /// 5 minutes — enough for most short-lived throttles; the operator can /// tune down for tight retry scenarios or up if they're hitting sustained /// capacity limits. const DEFAULT_RATE_LIMIT_SLEEP_SECS: u64 = 300; /// Token watermark for *auto session-reset*. When context is at or above this /// many tokens AND the prompt cache has gone cold (idle time ≥ `CACHE_TTL_SECS`), /// the harness runs a notes-checkpoint turn then drops `--continue` so the /// next turn starts fresh. Sending a large context uncached costs the same as /// a fresh start; the reset avoids paying for a long history the model won't /// benefit from. Default is ~50% of a 200k-token window; override via /// `HIVE_AUTO_RESET_WATERMARK_TOKENS`, or set to `0` to disable. const DEFAULT_AUTO_RESET_WATERMARK_TOKENS: u64 = 100_000; /// Assumed prompt-cache TTL. Claude's API caches prompt prefixes for ~5 /// minutes; after that the prefix must be re-uploaded on the next turn. /// When the idle gap exceeds this, a large context costs the same whether /// we resume or start fresh — so we start fresh. Override via /// `HIVE_CACHE_TTL_SECS`. const DEFAULT_CACHE_TTL_SECS: u64 = 300; /// Synthetic checkpoint prompt injected before an auto-reset turn. Unlike the /// proactive-compaction checkpoint (which is followed by `/compact`), this one /// is followed by a fresh-session turn, so the wording focuses on "write now /// so you can read it back cleanly" rather than "/compact is coming". const AUTO_RESET_CHECKPOINT_PROMPT: &str = "[system] Context checkpoint before session auto-reset — no inbox message to handle.\n\n\ The harness detected that the prompt-cache TTL has elapsed while your context is large. 
\ Resuming with --continue would re-upload the full transcript uncached; starting a fresh \ session is cheaper and equally effective since the cache is already cold.\n\n\ Use THIS turn to flush anything worth keeping into your durable state files: update \ your notes / TODO.md with in-flight task state, open questions, important paths, and \ anything else you would need to resume cleanly from a fresh session. Do not start \ new work or reply to anyone — just write your notes and end the turn."; /// Token watermark for *proactive* compaction. Once a turn finishes with /// the last inference's context size at or above this many tokens, /// `drive_turn` runs one dedicated notes-checkpoint turn (so the agent /// can flush durable state into `/state`) and then `/compact` — while the /// session is still healthy enough to run a turn at all. This is distinct /// from the reactive `PROMPT_TOO_LONG_MARKER` path, which only fires once /// the session is *already* past the window: at that point no turn can /// run on it, so the reactive path just compacts + retries with no /// checkpoint. Default is ~75% of a 200k-token window; override via /// `HIVE_COMPACT_WATERMARK_TOKENS`, or set that to `0` to disable /// proactive compaction entirely (the reactive path always applies). const DEFAULT_COMPACT_WATERMARK_TOKENS: u64 = 150_000; /// Synthetic wake prompt for the proactive notes-checkpoint turn. Not an /// inbox message — the harness injects it directly so the agent gets one /// turn to persist durable state before `/compact` collapses the /// turn-by-turn history into a summary. const CHECKPOINT_PROMPT: &str = "[system] Context checkpoint — no inbox message to handle.\n\n\ Your conversation context has grown large and the harness is about to run `/compact`, \ which collapses the detailed turn-by-turn history into a short summary. 
Anything you \ do not persist now is effectively lost after the next turn.\n\n\ Use THIS turn to flush anything worth keeping into your durable `/state` files: update \ your notes / CLAUDE.md / TODO.md with in-flight task state, decisions made, important \ file paths, and whatever you would need to resume cleanly with only a summary of this \ conversation to go on. Do not start new work or reply to anyone — just write your notes \ and end the turn."; /// The set of files claude reads on every invocation: the MCP server /// config (`--mcp-config`), static settings (`--settings`), and the /// pre-rendered role/tools system prompt (`--system-prompt-file`). /// Materialised once at harness startup; shared between the turn loop /// and the operator-driven `/compact` path so both invocations look /// identical to claude (same MCP surface, same allowed tools, same /// role prompt — only the stdin payload differs). #[derive(Clone)] pub struct TurnFiles { pub mcp_config: PathBuf, pub settings: PathBuf, pub system_prompt: PathBuf, pub flavor: mcp::Flavor, } impl TurnFiles { /// Write all three files into the per-agent runtime dir alongside /// `socket`. Idempotent — overwrites whatever was there. pub async fn prepare(socket: &Path, label: &str, flavor: mcp::Flavor) -> Result { Ok(Self { mcp_config: write_mcp_config(socket).await?, settings: write_settings(socket).await?, system_prompt: write_system_prompt(socket, label, flavor).await?, flavor, }) } } /// Drop the MCP config blob claude reads from `--mcp-config `. /// `socket` is the hyperhive per-container socket (forwarded to the child /// as `--socket `); `binary_subcommand` is e.g. `"mcp"` for sub-agents /// or `"mcp"` for the manager (both binaries name their MCP subcommand the /// same — the differentiator is which binary `/proc/self/exe` resolves to). 
pub async fn write_mcp_config(socket: &Path) -> Result<PathBuf> {
    let parent = socket.parent().unwrap_or_else(|| Path::new("/run/hive"));
    // Best-effort: if the directory cannot be created, the write below fails
    // and reports the path, so nothing is lost by ignoring this error.
    tokio::fs::create_dir_all(parent).await.ok();
    let path = parent.join("claude-mcp-config.json");
    // Point claude at the binary that is running right now; fall back to the
    // agent binary's name when the executable path cannot be resolved.
    let exe = std::env::current_exe()
        .ok()
        .map_or_else(|| "hive-ag3nt".into(), |p| p.display().to_string());
    let body = mcp::render_claude_config(&exe, socket);
    tokio::fs::write(&path, body).await.map_err(|e| {
        anyhow::Error::new(e).context(format!("writing claude MCP config {}", path.display()))
    })?;
    tracing::info!(path = %path.display(), "wrote claude MCP config");
    Ok(path)
}

/// Drop the static `--settings` JSON next to the MCP config so we can
/// pass a path (`--settings <path>`) instead of an ever-growing inline
/// blob — the CLI argv has a finite length budget.
///
/// # Errors
///
/// Fails when the settings file cannot be written; the error names the path.
pub async fn write_settings(socket: &Path) -> Result<PathBuf> {
    let parent = socket.parent().unwrap_or_else(|| Path::new("/run/hive"));
    // Best-effort, see `write_mcp_config`: a failure resurfaces in the write.
    tokio::fs::create_dir_all(parent).await.ok();
    let path = parent.join("claude-settings.json");
    tokio::fs::write(&path, CLAUDE_SETTINGS).await.map_err(|e| {
        anyhow::Error::new(e).context(format!("writing claude settings {}", path.display()))
    })?;
    tracing::info!(path = %path.display(), "wrote claude settings");
    Ok(path)
}

/// Write the agent's / manager's static system prompt to a file next to
/// the MCP config and return the path. Passed to claude via
/// `--system-prompt-file`, replacing claude's default system prompt with
/// the role + tools instructions. Per-turn prompts become much smaller
/// (just the wake message body).
pub async fn write_system_prompt(
    socket: &Path,
    label: &str,
    flavor: mcp::Flavor,
) -> Result<PathBuf> {
    let parent = socket.parent().unwrap_or_else(|| Path::new("/run/hive"));
    // Best-effort: if the directory cannot be created, the write below fails
    // and reports the path.
    tokio::fs::create_dir_all(parent).await.ok();
    let template = match flavor {
        mcp::Flavor::Agent => include_str!("../prompts/agent.md"),
        mcp::Flavor::Manager => include_str!("../prompts/manager.md"),
    };
    let pronouns =
        std::env::var("HIVE_OPERATOR_PRONOUNS").unwrap_or_else(|_| "she/her".to_owned());
    // Fill in the two placeholders the templates use.
    let body = template
        .replace("{label}", label)
        .replace("{operator_pronouns}", &pronouns);
    let path = parent.join("claude-system-prompt.md");
    tokio::fs::write(&path, body).await.map_err(|e| {
        anyhow::Error::new(e).context(format!("writing claude system prompt {}", path.display()))
    })?;
    tracing::info!(path = %path.display(), "wrote claude system prompt");
    Ok(path)
}

/// One claude turn's outcome. The harness uses this to decide whether to
/// transparently kick off a compaction and retry.
#[derive(Debug)]
pub enum TurnOutcome {
    /// The turn ran to completion without tripping any marker.
    Ok,
    /// claude saw "Prompt is too long" — the session needs compacting.
    /// Run `compact_session()` then retry the same wake-up prompt.
    PromptTooLong,
    /// The Anthropic API refused the request due to a rate limit, per-account
    /// usage cap, or exhausted credit balance. The serve loop should park for
    /// `rate_limit_sleep_secs()` and retry — NOT bubble up as a crash.
    RateLimited,
    /// claude could not be run, or exited unsuccessfully for any other reason.
    Failed(anyhow::Error),
}

/// How long to sleep after a rate-limit before re-entering the serve loop.
/// Reads `HIVE_RATE_LIMIT_SLEEP_SECS` if set to a valid positive integer;
/// anything else (unset, unparsable, `0`) yields `DEFAULT_RATE_LIMIT_SLEEP_SECS`.
#[must_use]
pub fn rate_limit_sleep_secs() -> u64 {
    std::env::var("HIVE_RATE_LIMIT_SLEEP_SECS")
        .ok()
        .and_then(|s| s.trim().parse::<u64>().ok())
        .filter(|&v| v > 0)
        .unwrap_or(DEFAULT_RATE_LIMIT_SLEEP_SECS)
}

/// Resolve the auto-reset watermark: `HIVE_AUTO_RESET_WATERMARK_TOKENS` if
/// set to a valid integer, else `DEFAULT_AUTO_RESET_WATERMARK_TOKENS`. `0`
/// disables auto-reset entirely.
fn auto_reset_watermark_tokens() -> u64 { std::env::var("HIVE_AUTO_RESET_WATERMARK_TOKENS") .ok() .and_then(|s| s.trim().parse::().ok()) .unwrap_or(DEFAULT_AUTO_RESET_WATERMARK_TOKENS) } /// Resolve the assumed cache TTL: `HIVE_CACHE_TTL_SECS` if set, else /// `DEFAULT_CACHE_TTL_SECS`. fn cache_ttl_secs() -> u64 { std::env::var("HIVE_CACHE_TTL_SECS") .ok() .and_then(|s| s.trim().parse::().ok()) .filter(|&v| v > 0) .unwrap_or(DEFAULT_CACHE_TTL_SECS) } /// Resolve the proactive-compaction watermark: `HIVE_COMPACT_WATERMARK_TOKENS` /// if set to a valid integer, else `DEFAULT_COMPACT_WATERMARK_TOKENS`. A /// value of `0` disables proactive compaction. fn compact_watermark_tokens() -> u64 { std::env::var("HIVE_COMPACT_WATERMARK_TOKENS") .ok() .and_then(|s| s.trim().parse::().ok()) .unwrap_or(DEFAULT_COMPACT_WATERMARK_TOKENS) } /// Drive one turn end-to-end. Three paths layer on top of the raw `run_turn`: /// /// - **Auto-reset (pre-turn)** — context is large AND the prompt cache has /// gone cold (idle gap ≥ cache TTL). Resuming would re-upload the full /// transcript uncached at the same cost as a fresh start. The harness runs /// one checkpoint turn (agent flushes state), then arms a one-shot /// `request_new_session` so the actual turn starts fresh. /// - **Reactive (on overflow)** — `run_turn` returns `PromptTooLong`: the /// session is already past the context window and *no* turn can run on it, /// so we compact immediately and retry the same wake-up prompt once. No /// notes-checkpoint turn is possible here — the detail is gone. /// - **Proactive (post-turn)** — the turn finished cleanly but its context /// size has crept past the watermark: while the session is still healthy we /// give the agent one dedicated turn to checkpoint its `/state` notes, then /// compact. This keeps a later turn from hitting the reactive path (where /// there is no chance to save anything first). /// /// Both the sub-agent and manager loops call this. 
pub async fn drive_turn(prompt: &str, files: &TurnFiles, bus: &Bus) -> TurnOutcome { maybe_auto_reset(files, bus).await; let outcome = match run_turn(prompt, files, bus).await { TurnOutcome::PromptTooLong => { if let Err(e) = compact_session(files, bus).await { tracing::warn!(error = %format!("{e:#}"), "compact failed"); return TurnOutcome::Failed(e); } run_turn(prompt, files, bus).await } // Rate-limited: no point retrying immediately — bubble up so the // serve loop can park + emit status before the next attempt. TurnOutcome::RateLimited => return TurnOutcome::RateLimited, other => other, }; // Proactive: a turn just completed on a still-healthy session. If its // context crossed the watermark, checkpoint + compact before a later // turn overflows into the reactive path. Best-effort — never changes // the outcome of the turn that already succeeded. if matches!(outcome, TurnOutcome::Ok) { maybe_checkpoint_and_compact(files, bus).await; } outcome } /// Proactive post-turn compaction. If the last inference's context size /// has crossed the watermark, run one notes-checkpoint turn so the agent /// can persist durable state, then `/compact`. Best-effort: a failed /// checkpoint or compaction is logged + surfaced as a Note but never /// fails the turn that already succeeded. async fn maybe_checkpoint_and_compact(files: &TurnFiles, bus: &Bus) { let watermark = compact_watermark_tokens(); if watermark == 0 { return; // proactive compaction disabled } let Some(used) = bus.last_ctx_usage().map(|u| u.context_tokens()) else { return; // no usage reading yet — nothing to compare against }; if used < watermark { return; } bus.emit(LiveEvent::Note { text: format!( "context at {used} tokens (watermark {watermark}) — running a \ notes-checkpoint turn before /compact" ), }); // Give the agent one turn to flush durable state into /state. If the // session is somehow already too far gone to run even this, fall // through to compaction anyway — the checkpoint is best-effort. 
match run_turn(CHECKPOINT_PROMPT, files, bus).await { TurnOutcome::Ok => {} TurnOutcome::PromptTooLong => bus.emit(LiveEvent::Note { text: "checkpoint turn overflowed the window — compacting without it".into(), }), TurnOutcome::RateLimited => bus.emit(LiveEvent::Note { text: "checkpoint turn was rate-limited — compacting anyway".into(), }), TurnOutcome::Failed(e) => bus.emit(LiveEvent::Note { text: format!("checkpoint turn failed ({e:#}) — compacting anyway"), }), } if let Err(e) = compact_session(files, bus).await { tracing::warn!(error = %format!("{e:#}"), "post-checkpoint compact failed"); bus.emit(LiveEvent::Note { text: format!("/compact after checkpoint failed: {e:#}"), }); } } /// Pre-turn auto-reset check. If context is large AND the prompt cache has /// gone cold (idle time ≥ cache TTL), run a notes-checkpoint turn so the agent /// can persist durable state, then arm `request_new_session` so the actual /// wake-up turn starts fresh. Best-effort — a failed checkpoint is noted and /// the reset proceeds anyway (agent still gets a clean session). async fn maybe_auto_reset(files: &TurnFiles, bus: &Bus) { let watermark = auto_reset_watermark_tokens(); if watermark == 0 { return; // auto-reset disabled } let Some(ctx_tokens) = bus.last_ctx_usage().map(|u| u.context_tokens()) else { return; // no usage reading yet — first turn, nothing to reset }; if ctx_tokens < watermark { return; } let last_ended = bus.last_turn_ended_unix(); if last_ended == 0 { return; // no completed turn yet } // Compute idle seconds using the same clock as now_unix (unix epoch, i64). 
let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map(|d| d.as_secs()) .unwrap_or(0); let idle_secs = now.saturating_sub(u64::try_from(last_ended).unwrap_or(0)); let ttl = cache_ttl_secs(); if idle_secs < ttl { return; } bus.emit(LiveEvent::Note { text: format!( "context {ctx_tokens} tokens, idle {idle_secs}s ≥ cache TTL {ttl}s \ — running checkpoint then auto-resetting session" ), }); match run_turn(AUTO_RESET_CHECKPOINT_PROMPT, files, bus).await { TurnOutcome::Ok => {} TurnOutcome::PromptTooLong => bus.emit(LiveEvent::Note { text: "auto-reset checkpoint overflowed — resetting without checkpoint".into(), }), TurnOutcome::RateLimited => bus.emit(LiveEvent::Note { text: "auto-reset checkpoint was rate-limited — resetting anyway".into(), }), TurnOutcome::Failed(e) => bus.emit(LiveEvent::Note { text: format!("auto-reset checkpoint failed ({e:#}) — resetting anyway"), }), } bus.request_new_session(); } /// Emit the per-turn `TurnEnd` event + log line. Single owner so the /// agent and manager loops agree on outcome semantics. pub fn emit_turn_end(bus: &Bus, outcome: &TurnOutcome) { match outcome { TurnOutcome::Ok | TurnOutcome::PromptTooLong => { bus.emit(LiveEvent::TurnEnd { ok: true, note: None, }); tracing::info!("turn finished"); } TurnOutcome::RateLimited => { bus.emit(LiveEvent::TurnEnd { ok: false, note: Some("rate limited — parking until quota resets".into()), }); tracing::warn!("turn rate-limited"); } TurnOutcome::Failed(e) => { let note = format!("{e:#}"); bus.emit(LiveEvent::TurnEnd { ok: false, note: Some(note.clone()), }); tracing::warn!(error = %note, "turn failed"); } } } /// Block until the bound `~/.claude/` dir contains a session, polling /// `claude_dir` on a `poll_ms` interval (min 2s). Flips `state` to /// `Online` when login lands; caller resumes its serve loop. 
pub async fn wait_for_login( claude_dir: &Path, state: Arc>, bus: &Bus, poll_ms: u64, ) { tracing::warn!( claude_dir = %claude_dir.display(), "no claude session — staying in partial-run mode (web UI only)" ); let probe = Duration::from_millis(poll_ms.max(2000)); loop { tokio::time::sleep(probe).await; if login::has_session(claude_dir) { tracing::info!("claude session detected — entering turn loop"); *state.lock().unwrap() = LoginState::Online; bus.emit_status("online"); return; } } } /// Spawn `claude` for one turn and pump `stream-json` stdout into the /// live event bus. Prompt goes over stdin (variadic /// `--allowedTools`/`--tools` would otherwise eat a trailing positional /// prompt). The session is persistent across turns via `--continue` and /// claude's in-session auto-compact is disabled via `--settings` so it /// doesn't stall mid-turn — hyperhive owns compaction. pub async fn run_turn(prompt: &str, files: &TurnFiles, bus: &Bus) -> TurnOutcome { match run_claude(prompt, files, bus).await { Ok((too_long, _)) if too_long => TurnOutcome::PromptTooLong, Ok((_, rate_limited)) if rate_limited => TurnOutcome::RateLimited, Ok(_) => TurnOutcome::Ok, Err(e) => TurnOutcome::Failed(e), } } /// Run claude's built-in `/compact` slash command on the persistent /// session. Takes the *same* params as `run_turn` because compact /// re-initialises claude with the full session shape — same MCP /// surface, same system prompt, same allowed-tools — so the post- /// compact state matches a normal turn's. Only the prompt over stdin /// differs (`/compact` vs the wake-up payload). 
pub async fn compact_session(files: &TurnFiles, bus: &Bus) -> Result<()> { bus.emit(LiveEvent::Note { text: "context overflow — running /compact on the persistent session".into(), }); let (_, _) = run_claude("/compact", files, bus).await?; bus.emit(LiveEvent::Note { text: "/compact done".into(), }); Ok(()) } #[allow(clippy::too_many_lines)] async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<(bool, bool)> { // Keep the last STDERR_TAIL_LINES of stderr so a non-zero exit can // include real context in the bail message (and downstream in the // failure notification to the manager) instead of just "exit 1". const STDERR_TAIL_LINES: usize = 20; let model = bus.model(); let resume = !bus.take_skip_continue(); if !resume { bus.emit(LiveEvent::Note { text: "fresh session (--continue suppressed for this turn)".into(), }); } let mut cmd = Command::new("claude"); // Spawn inside the agent's state dir so relative paths in tool calls // (Read foo.md, Bash ls, Write notes.md) land in the durable dir // instead of wherever the harness systemd unit started. Falls back // silently if the dir is missing (dev / test without the bind mount). 
let state_dir = crate::paths::state_dir(); if state_dir.is_dir() { cmd.current_dir(&state_dir); } cmd.arg("--print") .arg("--verbose") .arg("--output-format") .arg("stream-json") .arg("--model") .arg(&model) .arg("--settings") .arg(&files.settings); if resume { cmd.arg("--continue"); } cmd.arg("--system-prompt-file").arg(&files.system_prompt); cmd.arg("--mcp-config") .arg(&files.mcp_config) .arg("--strict-mcp-config") .arg("--tools") .arg(mcp::builtin_tools_arg()) .arg("--allowedTools") .arg(mcp::allowed_tools_arg(files.flavor)); let mut child = cmd .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn()?; if let Some(mut stdin) = child.stdin.take() { stdin.write_all(prompt.as_bytes()).await?; stdin.shutdown().await.ok(); drop(stdin); } let stdout = child.stdout.take().expect("piped stdout"); let stderr = child.stderr.take().expect("piped stderr"); let prompt_too_long = Arc::new(AtomicBool::new(false)); let rate_limited = Arc::new(AtomicBool::new(false)); let flag_out = prompt_too_long.clone(); let flag_err = prompt_too_long.clone(); let rate_out = rate_limited.clone(); let rate_err = rate_limited.clone(); let bus_out = bus.clone(); let bus_err = bus.clone(); let pump_stdout = tokio::spawn(async move { let mut reader = BufReader::new(stdout).lines(); // Track usage as the turn unfolds. `last_inference` overwrites on // every assistant event so at result-time it holds the most recent // model call's usage — the actual context size. The `result` event // carries the cumulative-across-the-turn usage (cost signal). Both // get handed to `record_turn_usage` together so a single SSE // event updates both badges. 
let mut last_inference: Option = None; while let Ok(Some(line)) = reader.next_line().await { if line.contains(PROMPT_TOO_LONG_MARKER) { flag_out.store(true, Ordering::Relaxed); } if RATE_LIMIT_MARKERS.iter().any(|m| line.contains(m)) { rate_out.store(true, Ordering::Relaxed); } match serde_json::from_str::(&line) { Ok(v) => { if let Some(u) = crate::events::TokenUsage::from_assistant_event(&v) { last_inference = Some(u); } if let Some(cost) = crate::events::TokenUsage::from_stream_event(&v) { // Fallback to `cost` if the turn somehow produced // a result without any assistant event — keeps the // ctx badge from going stale on a degenerate turn. let ctx = last_inference.unwrap_or(cost); bus_out.record_turn_usage(ctx, cost); } bus_out.observe_stream(&v); bus_out.emit(LiveEvent::Stream(v)); } Err(_) => bus_out.emit(LiveEvent::Note { text: format!("(non-json) {line}"), }), } } }); let stderr_tail: Arc>> = Arc::new(Mutex::new(VecDeque::with_capacity(STDERR_TAIL_LINES))); let tail_clone = stderr_tail.clone(); let pump_stderr = tokio::spawn(async move { let mut reader = BufReader::new(stderr).lines(); while let Ok(Some(line)) = reader.next_line().await { if line.contains(PROMPT_TOO_LONG_MARKER) { flag_err.store(true, Ordering::Relaxed); } if RATE_LIMIT_MARKERS.iter().any(|m| line.contains(m)) { rate_err.store(true, Ordering::Relaxed); } // Mirror to journald so post-mortems work without the web UI // or the events sqlite. The bus event is what the dashboard // renders; the tracing line is what `journalctl -M -b` // surfaces when claude exits non-zero. 
tracing::warn!(line = %line, "claude stderr"); bus_err.emit(LiveEvent::Note { text: format!("stderr: {line}"), }); let mut t = tail_clone.lock().unwrap(); if t.len() >= STDERR_TAIL_LINES { t.pop_front(); } t.push_back(line); } }); let status = child.wait().await?; let _ = pump_stdout.await; let _ = pump_stderr.await; let too_long = prompt_too_long.load(Ordering::Relaxed); let is_rate_limited = rate_limited.load(Ordering::Relaxed); if !status.success() && !too_long && !is_rate_limited { let tail = stderr_tail.lock().unwrap(); if tail.is_empty() { bail!("claude exited {status} (no stderr)"); } let tail_str = tail.iter().cloned().collect::>().join("\n"); bail!("claude exited {status}\nstderr tail:\n{tail_str}"); } Ok((too_long, is_rate_limited)) }