//! Per-turn claude invocation shared by `hive-ag3nt` and `hive-m1nd`. The //! two binaries differ only in their MCP `Flavor` (agent surface vs. //! manager surface) and their wake-prompt wording; the spawn shape, //! arg-vector, stdin plumbing, and stream-json pumping are identical. use std::path::{Path, PathBuf}; use std::process::Stdio; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; use anyhow::{Result, bail}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::Command; use crate::events::{Bus, LiveEvent}; use crate::login::{self, LoginState}; use crate::mcp; /// Inline `--settings` JSON applied to every claude invocation. We turn off /// claude's in-session auto-compaction and its cross-session auto-memory /// because hyperhive owns those concerns: compaction is operator/harness- /// driven (`/compact` on overflow), notes persistence is a hyperhive /// concern (planned, not yet wired). Unknown keys are silently ignored by /// claude-code; if the key names ever rename, we'll spot it because /// auto-compact will start firing mid-turn again. const CLAUDE_SETTINGS: &str = r#"{"autoCompactEnabled":false,"autoMemoryEnabled":false}"#; /// Regex-ish marker claude-code emits when context overflows. Same string /// bitburner-agent watches for. Empirically reliable across claude-code /// versions; if it ever changes, compaction won't fire and we'll see a /// claude exit with a useful error in the live view. const PROMPT_TOO_LONG_MARKER: &str = "Prompt is too long"; /// Drop the MCP config blob claude reads from `--mcp-config `. /// `socket` is the hyperhive per-container socket (forwarded to the child /// as `--socket `); `binary_subcommand` is e.g. `"mcp"` for sub-agents /// or `"mcp"` for the manager (both binaries name their MCP subcommand the /// same — the differentiator is which binary `/proc/self/exe` resolves to). 
pub async fn write_mcp_config(socket: &Path) -> Result<PathBuf> {
    // The config lives next to the socket; a socket path without a parent
    // component falls back to the default runtime directory.
    let parent = socket.parent().unwrap_or_else(|| Path::new("/run/hive"));
    // Best-effort: if the directory cannot be created, the write below
    // fails and surfaces the error.
    tokio::fs::create_dir_all(parent).await.ok();
    let path = parent.join("claude-mcp-config.json");
    // The rendered config references this very executable; fall back to
    // the agent binary's name when the path cannot be resolved.
    let exe = std::env::current_exe()
        .ok()
        .map_or_else(|| "hive-ag3nt".into(), |p| p.display().to_string());
    let body = mcp::render_claude_config(&exe, socket);
    tokio::fs::write(&path, body).await?;
    tracing::info!(path = %path.display(), "wrote claude MCP config");
    Ok(path)
}

/// Write the agent's / manager's static system prompt to a file next to
/// the MCP config and return the path. Passed to claude via
/// `--system-prompt-file`, replacing claude's default system prompt with
/// the role + tools instructions. Per-turn prompts become much smaller
/// (just the wake message body).
///
/// The template is chosen by `flavor` and every `{label}` placeholder in
/// it is replaced with `label`.
///
/// # Errors
///
/// Returns an error if the prompt file cannot be written.
pub async fn write_system_prompt(
    socket: &Path,
    label: &str,
    flavor: mcp::Flavor,
) -> Result<PathBuf> {
    let parent = socket.parent().unwrap_or_else(|| Path::new("/run/hive"));
    // Best-effort, same as for the MCP config: the write below reports
    // any real failure.
    tokio::fs::create_dir_all(parent).await.ok();
    let template = match flavor {
        mcp::Flavor::Agent => include_str!("../prompts/agent.md"),
        mcp::Flavor::Manager => include_str!("../prompts/manager.md"),
    };
    let body = template.replace("{label}", label);
    let path = parent.join("claude-system-prompt.md");
    tokio::fs::write(&path, body).await?;
    tracing::info!(path = %path.display(), "wrote claude system prompt");
    Ok(path)
}

/// One claude turn's outcome. The harness uses this to decide whether to
/// transparently kick off a compaction and retry.
#[derive(Debug)]
pub enum TurnOutcome {
    /// The turn ran and no overflow marker was seen.
    Ok,
    /// claude saw "Prompt is too long" — the session needs compacting.
    /// Run `compact_session()` then retry the same wake-up prompt.
    PromptTooLong,
    /// Spawning or running claude failed; carries the underlying error.
    Failed(anyhow::Error),
}

/// Drive one turn end-to-end, transparently compacting + retrying once on
/// `Prompt is too long`. Both the sub-agent and manager loops call this.
/// A failed compaction is returned as `Failed`; the retry's outcome is
/// returned as-is, so a second overflow still yields `PromptTooLong`.
pub async fn drive_turn(
    prompt: &str,
    mcp_config: &Path,
    system_prompt: &Path,
    bus: &Bus,
    flavor: mcp::Flavor,
) -> TurnOutcome {
    match run_turn(prompt, mcp_config, system_prompt, bus, flavor).await {
        TurnOutcome::PromptTooLong => {
            // Overflow: compact the persistent session, then replay the
            // same prompt exactly once.
            if let Err(e) = compact_session(bus).await {
                tracing::warn!(error = %format!("{e:#}"), "compact failed");
                return TurnOutcome::Failed(e);
            }
            run_turn(prompt, mcp_config, system_prompt, bus, flavor).await
        }
        other => other,
    }
}

/// Emit the per-turn `TurnEnd` event + log line. Single owner so the
/// agent and manager loops agree on outcome semantics.
///
/// `Ok` and `PromptTooLong` are both reported with `ok: true` and no note;
/// `Failed` is reported with `ok: false` and the full error chain as note.
pub fn emit_turn_end(bus: &Bus, outcome: &TurnOutcome) {
    match outcome {
        // NOTE(review): a `PromptTooLong` that survives `drive_turn`'s
        // retry is still reported as a successful turn — confirm that this
        // is intended.
        TurnOutcome::Ok | TurnOutcome::PromptTooLong => {
            bus.emit(LiveEvent::TurnEnd {
                ok: true,
                note: None,
            });
            tracing::info!("turn finished");
        }
        TurnOutcome::Failed(e) => {
            // `{:#}` renders the whole anyhow context chain on one line.
            let note = format!("{e:#}");
            bus.emit(LiveEvent::TurnEnd {
                ok: false,
                note: Some(note.clone()),
            });
            tracing::warn!(error = %note, "turn failed");
        }
    }
}

/// Block until the bound `~/.claude/` dir contains a session, polling
/// `claude_dir` on a `poll_ms` interval (min 2s). Flips `state` to
/// `Online` when login lands; caller resumes its serve loop.
///
/// The first probe happens only after one full interval has elapsed.
///
/// # Panics
///
/// Panics if the `state` mutex is poisoned.
pub async fn wait_for_login(claude_dir: &Path, state: Arc<Mutex<LoginState>>, poll_ms: u64) {
    tracing::warn!(
        claude_dir = %claude_dir.display(),
        "no claude session — staying in partial-run mode (web UI only)"
    );
    // Clamp so a tiny `poll_ms` cannot turn this into a busy loop.
    let probe = Duration::from_millis(poll_ms.max(2000));
    loop {
        tokio::time::sleep(probe).await;
        if login::has_session(claude_dir) {
            tracing::info!("claude session detected — entering turn loop");
            // The guard is a temporary dropped at the end of this
            // statement, so the lock is never held across an `.await`.
            *state.lock().expect("login state mutex poisoned") = LoginState::Online;
            return;
        }
    }
}

/// Spawn `claude` for one turn and pump `stream-json` stdout into the
/// live event bus. Prompt goes over stdin (variadic
/// `--allowedTools`/`--tools` would otherwise eat a trailing positional
/// prompt).
The session is persistent across turns via `--continue` and /// claude's in-session auto-compact is disabled via `--settings` so it /// doesn't stall mid-turn — hyperhive owns compaction. pub async fn run_turn( prompt: &str, mcp_config: &Path, system_prompt: &Path, bus: &Bus, flavor: mcp::Flavor, ) -> TurnOutcome { match run_claude( prompt, mcp_config, Some(system_prompt), bus, flavor, ClaudeMode::Turn, ) .await { Ok(too_long) if too_long => TurnOutcome::PromptTooLong, Ok(_) => TurnOutcome::Ok, Err(e) => TurnOutcome::Failed(e), } } /// Run claude's built-in `/compact` slash command on the persistent /// session so the next turn can fit. No MCP tools needed; we just feed /// `/compact` over stdin and let claude rewrite its own history. pub async fn compact_session(bus: &Bus) -> Result<()> { bus.emit(LiveEvent::Note( "context overflow — running /compact on the persistent session".into(), )); let _ = run_claude( "/compact", Path::new("/dev/null"), None, bus, mcp::Flavor::Agent, // tool surface unused for /compact ClaudeMode::Compact, ) .await?; bus.emit(LiveEvent::Note("/compact done".into())); Ok(()) } #[derive(Clone, Copy)] enum ClaudeMode { Turn, Compact, } async fn run_claude( prompt: &str, mcp_config: &Path, system_prompt: Option<&Path>, bus: &Bus, flavor: mcp::Flavor, mode: ClaudeMode, ) -> Result { let mut cmd = Command::new("claude"); cmd.arg("--print") .arg("--verbose") .arg("--output-format") .arg("stream-json") .arg("--model") .arg("haiku") .arg("--continue") .arg("--settings") .arg(CLAUDE_SETTINGS); if let Some(p) = system_prompt { cmd.arg("--system-prompt-file").arg(p); } if let ClaudeMode::Turn = mode { cmd.arg("--mcp-config") .arg(mcp_config) .arg("--strict-mcp-config") .arg("--tools") .arg(mcp::builtin_tools_arg()) .arg("--allowedTools") .arg(mcp::allowed_tools_arg(flavor)); } let mut child = cmd .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn()?; if let Some(mut stdin) = child.stdin.take() { 
stdin.write_all(prompt.as_bytes()).await?; stdin.shutdown().await.ok(); drop(stdin); } let stdout = child.stdout.take().expect("piped stdout"); let stderr = child.stderr.take().expect("piped stderr"); let prompt_too_long = Arc::new(AtomicBool::new(false)); let flag_out = prompt_too_long.clone(); let flag_err = prompt_too_long.clone(); let bus_out = bus.clone(); let bus_err = bus.clone(); let pump_stdout = tokio::spawn(async move { let mut reader = BufReader::new(stdout).lines(); while let Ok(Some(line)) = reader.next_line().await { if line.contains(PROMPT_TOO_LONG_MARKER) { flag_out.store(true, Ordering::Relaxed); } match serde_json::from_str::(&line) { Ok(v) => bus_out.emit(LiveEvent::Stream(v)), Err(_) => bus_out.emit(LiveEvent::Note(format!("(non-json) {line}"))), } } }); let pump_stderr = tokio::spawn(async move { let mut reader = BufReader::new(stderr).lines(); while let Ok(Some(line)) = reader.next_line().await { if line.contains(PROMPT_TOO_LONG_MARKER) { flag_err.store(true, Ordering::Relaxed); } bus_err.emit(LiveEvent::Note(format!("stderr: {line}"))); } }); let status = child.wait().await?; let _ = pump_stdout.await; let _ = pump_stderr.await; let too_long = prompt_too_long.load(Ordering::Relaxed); if !status.success() && !too_long { bail!("claude exited {status}"); } Ok(too_long) }