turn loop: --continue, disable claude auto-compact, /compact on overflow
This commit is contained in:
parent
409263f1c9
commit
70af56e050
3 changed files with 183 additions and 49 deletions
|
|
@ -153,24 +153,8 @@ async fn serve(
|
||||||
});
|
});
|
||||||
let prompt = format_wake_prompt(&label, &from, &body);
|
let prompt = format_wake_prompt(&label, &from, &body);
|
||||||
let outcome =
|
let outcome =
|
||||||
turn::run_turn(&prompt, &mcp_config, &bus, mcp::Flavor::Agent).await;
|
drive_turn(&prompt, &mcp_config, &bus, mcp::Flavor::Agent).await;
|
||||||
match outcome {
|
emit_turn_end(&bus, &outcome);
|
||||||
Ok(()) => {
|
|
||||||
bus.emit(LiveEvent::TurnEnd {
|
|
||||||
ok: true,
|
|
||||||
note: None,
|
|
||||||
});
|
|
||||||
tracing::info!("claude turn finished");
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
let note = format!("{e:#}");
|
|
||||||
bus.emit(LiveEvent::TurnEnd {
|
|
||||||
ok: false,
|
|
||||||
note: Some(note.clone()),
|
|
||||||
});
|
|
||||||
tracing::warn!(error = %note, "claude turn failed");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Ok(AgentResponse::Empty) => {}
|
Ok(AgentResponse::Empty) => {}
|
||||||
Ok(AgentResponse::Ok | AgentResponse::Status { .. }) => {
|
Ok(AgentResponse::Ok | AgentResponse::Status { .. }) => {
|
||||||
|
|
@ -187,6 +171,47 @@ async fn serve(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Drive one turn end-to-end. If claude hits `Prompt is too long`, run
|
||||||
|
/// `/compact` against the persistent session and retry once. Returns the
|
||||||
|
/// final `TurnOutcome` to drive the `TurnEnd` live event.
|
||||||
|
async fn drive_turn(
|
||||||
|
prompt: &str,
|
||||||
|
mcp_config: &Path,
|
||||||
|
bus: &Bus,
|
||||||
|
flavor: mcp::Flavor,
|
||||||
|
) -> turn::TurnOutcome {
|
||||||
|
match turn::run_turn(prompt, mcp_config, bus, flavor).await {
|
||||||
|
turn::TurnOutcome::PromptTooLong => {
|
||||||
|
if let Err(e) = turn::compact_session(bus).await {
|
||||||
|
tracing::warn!(error = %format!("{e:#}"), "compact failed");
|
||||||
|
return turn::TurnOutcome::Failed(e);
|
||||||
|
}
|
||||||
|
turn::run_turn(prompt, mcp_config, bus, flavor).await
|
||||||
|
}
|
||||||
|
other => other,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn emit_turn_end(bus: &Bus, outcome: &turn::TurnOutcome) {
|
||||||
|
match outcome {
|
||||||
|
turn::TurnOutcome::Ok | turn::TurnOutcome::PromptTooLong => {
|
||||||
|
bus.emit(LiveEvent::TurnEnd {
|
||||||
|
ok: true,
|
||||||
|
note: None,
|
||||||
|
});
|
||||||
|
tracing::info!("claude turn finished");
|
||||||
|
}
|
||||||
|
turn::TurnOutcome::Failed(e) => {
|
||||||
|
let note = format!("{e:#}");
|
||||||
|
bus.emit(LiveEvent::TurnEnd {
|
||||||
|
ok: false,
|
||||||
|
note: Some(note.clone()),
|
||||||
|
});
|
||||||
|
tracing::warn!(error = %note, "claude turn failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// System prompt handed to claude on each turn. The harness has already
|
/// System prompt handed to claude on each turn. The harness has already
|
||||||
/// popped one message off the inbox (the wake signal); claude is told
|
/// popped one message off the inbox (the wake signal); claude is told
|
||||||
/// about it and the MCP tools, and is expected to drive any further
|
/// about it and the MCP tools, and is expected to drive any further
|
||||||
|
|
|
||||||
|
|
@ -175,24 +175,8 @@ async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> {
|
||||||
});
|
});
|
||||||
let prompt = format_wake_prompt(&label, &from, &body);
|
let prompt = format_wake_prompt(&label, &from, &body);
|
||||||
let outcome =
|
let outcome =
|
||||||
turn::run_turn(&prompt, &mcp_config, &bus, mcp::Flavor::Manager).await;
|
drive_turn(&prompt, &mcp_config, &bus, mcp::Flavor::Manager).await;
|
||||||
match outcome {
|
emit_turn_end(&bus, &outcome);
|
||||||
Ok(()) => {
|
|
||||||
bus.emit(LiveEvent::TurnEnd {
|
|
||||||
ok: true,
|
|
||||||
note: None,
|
|
||||||
});
|
|
||||||
tracing::info!("manager turn finished");
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
let note = format!("{e:#}");
|
|
||||||
bus.emit(LiveEvent::TurnEnd {
|
|
||||||
ok: false,
|
|
||||||
note: Some(note.clone()),
|
|
||||||
});
|
|
||||||
tracing::warn!(error = %note, "manager turn failed");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Ok(ManagerResponse::Empty) => {}
|
Ok(ManagerResponse::Empty) => {}
|
||||||
Ok(ManagerResponse::Ok | ManagerResponse::Status { .. }) => {
|
Ok(ManagerResponse::Ok | ManagerResponse::Status { .. }) => {
|
||||||
|
|
@ -209,6 +193,46 @@ async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Drive one manager turn end-to-end with the same overflow-then-compact
|
||||||
|
/// retry as sub-agents.
|
||||||
|
async fn drive_turn(
|
||||||
|
prompt: &str,
|
||||||
|
mcp_config: &Path,
|
||||||
|
bus: &Bus,
|
||||||
|
flavor: mcp::Flavor,
|
||||||
|
) -> turn::TurnOutcome {
|
||||||
|
match turn::run_turn(prompt, mcp_config, bus, flavor).await {
|
||||||
|
turn::TurnOutcome::PromptTooLong => {
|
||||||
|
if let Err(e) = turn::compact_session(bus).await {
|
||||||
|
tracing::warn!(error = %format!("{e:#}"), "compact failed");
|
||||||
|
return turn::TurnOutcome::Failed(e);
|
||||||
|
}
|
||||||
|
turn::run_turn(prompt, mcp_config, bus, flavor).await
|
||||||
|
}
|
||||||
|
other => other,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn emit_turn_end(bus: &Bus, outcome: &turn::TurnOutcome) {
|
||||||
|
match outcome {
|
||||||
|
turn::TurnOutcome::Ok | turn::TurnOutcome::PromptTooLong => {
|
||||||
|
bus.emit(LiveEvent::TurnEnd {
|
||||||
|
ok: true,
|
||||||
|
note: None,
|
||||||
|
});
|
||||||
|
tracing::info!("manager turn finished");
|
||||||
|
}
|
||||||
|
turn::TurnOutcome::Failed(e) => {
|
||||||
|
let note = format!("{e:#}");
|
||||||
|
bus.emit(LiveEvent::TurnEnd {
|
||||||
|
ok: false,
|
||||||
|
note: Some(note.clone()),
|
||||||
|
});
|
||||||
|
tracing::warn!(error = %note, "manager turn failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Manager-flavored wake prompt. Mentions the privileged tools the sub-agent
|
/// Manager-flavored wake prompt. Mentions the privileged tools the sub-agent
|
||||||
/// prompt doesn't have access to, and points the manager at its own
|
/// prompt doesn't have access to, and points the manager at its own
|
||||||
/// editable config repo for self-modification.
|
/// editable config repo for self-modification.
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,8 @@
|
||||||
|
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::process::Stdio;
|
use std::process::Stdio;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
|
||||||
use anyhow::{Result, bail};
|
use anyhow::{Result, bail};
|
||||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||||
|
|
@ -13,6 +15,21 @@ use tokio::process::Command;
|
||||||
use crate::events::{Bus, LiveEvent};
|
use crate::events::{Bus, LiveEvent};
|
||||||
use crate::mcp;
|
use crate::mcp;
|
||||||
|
|
||||||
|
/// Inline `--settings` JSON applied to every claude invocation. We turn off
|
||||||
|
/// claude's in-session auto-compaction and its cross-session auto-memory
|
||||||
|
/// because hyperhive owns those concerns: compaction is operator/harness-
|
||||||
|
/// driven (`/compact` on overflow), notes persistence is a hyperhive
|
||||||
|
/// concern (planned, not yet wired). Unknown keys are silently ignored by
|
||||||
|
/// claude-code; if the key names ever rename, we'll spot it because
|
||||||
|
/// auto-compact will start firing mid-turn again.
|
||||||
|
const CLAUDE_SETTINGS: &str = r#"{"autoCompactEnabled":false,"autoMemoryEnabled":false}"#;
|
||||||
|
|
||||||
|
/// Plain substring marker claude-code emits when context overflows. Same string
|
||||||
|
/// bitburner-agent watches for. Empirically reliable across claude-code
|
||||||
|
/// versions; if it ever changes, compaction won't fire and we'll see a
|
||||||
|
/// claude exit with a useful error in the live view.
|
||||||
|
const PROMPT_TOO_LONG_MARKER: &str = "Prompt is too long";
|
||||||
|
|
||||||
/// Drop the MCP config blob claude reads from `--mcp-config <path>`.
|
/// Drop the MCP config blob claude reads from `--mcp-config <path>`.
|
||||||
/// `socket` is the hyperhive per-container socket (forwarded to the child
|
/// `socket` is the hyperhive per-container socket (forwarded to the child
|
||||||
/// as `--socket <path>`); `binary_subcommand` is e.g. `"mcp"` for sub-agents
|
/// as `--socket <path>`); `binary_subcommand` is e.g. `"mcp"` for sub-agents
|
||||||
|
|
@ -31,30 +48,88 @@ pub async fn write_mcp_config(socket: &Path) -> Result<PathBuf> {
|
||||||
Ok(path)
|
Ok(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// One claude turn's outcome. The harness uses this to decide whether to
|
||||||
|
/// transparently kick off a compaction and retry.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum TurnOutcome {
|
||||||
|
Ok,
|
||||||
|
/// claude saw "Prompt is too long" — the session needs compacting.
|
||||||
|
/// Run `compact_session()` then retry the same wake-up prompt.
|
||||||
|
PromptTooLong,
|
||||||
|
Failed(anyhow::Error),
|
||||||
|
}
|
||||||
|
|
||||||
/// Spawn `claude` for one turn and pump `stream-json` stdout into the
|
/// Spawn `claude` for one turn and pump `stream-json` stdout into the
|
||||||
/// live event bus. Prompt goes over stdin (variadic
|
/// live event bus. Prompt goes over stdin (variadic
|
||||||
/// `--allowedTools`/`--tools` would otherwise eat a trailing positional
|
/// `--allowedTools`/`--tools` would otherwise eat a trailing positional
|
||||||
/// prompt). On non-zero exit returns an error; the caller emits the
|
/// prompt). The session is persistent across turns via `--continue` and
|
||||||
/// `TurnEnd` event.
|
/// claude's in-session auto-compact is disabled via `--settings` so it
|
||||||
|
/// doesn't stall mid-turn — hyperhive owns compaction.
|
||||||
pub async fn run_turn(
|
pub async fn run_turn(
|
||||||
prompt: &str,
|
prompt: &str,
|
||||||
mcp_config: &Path,
|
mcp_config: &Path,
|
||||||
bus: &Bus,
|
bus: &Bus,
|
||||||
flavor: mcp::Flavor,
|
flavor: mcp::Flavor,
|
||||||
) -> Result<()> {
|
) -> TurnOutcome {
|
||||||
let mut child = Command::new("claude")
|
match run_claude(prompt, mcp_config, bus, flavor, ClaudeMode::Turn).await {
|
||||||
.arg("--print")
|
Ok(too_long) if too_long => TurnOutcome::PromptTooLong,
|
||||||
|
Ok(_) => TurnOutcome::Ok,
|
||||||
|
Err(e) => TurnOutcome::Failed(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run claude's built-in `/compact` slash command on the persistent
|
||||||
|
/// session so the next turn can fit. No MCP tools needed; we just feed
|
||||||
|
/// `/compact` over stdin and let claude rewrite its own history.
|
||||||
|
pub async fn compact_session(bus: &Bus) -> Result<()> {
|
||||||
|
bus.emit(LiveEvent::Note(
|
||||||
|
"context overflow — running /compact on the persistent session".into(),
|
||||||
|
));
|
||||||
|
let _ = run_claude(
|
||||||
|
"/compact",
|
||||||
|
Path::new("/dev/null"),
|
||||||
|
bus,
|
||||||
|
mcp::Flavor::Agent, // tool surface unused for /compact
|
||||||
|
ClaudeMode::Compact,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
bus.emit(LiveEvent::Note("/compact done".into()));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Selects how `run_claude` assembles the claude command line.
#[derive(Clone, Copy)]
enum ClaudeMode {
    /// A regular turn: the MCP config and the tool / allowed-tool flags
    /// are added to the command.
    Turn,
    /// A `/compact` run: the MCP config and tool flags are left off.
    Compact,
}
|
||||||
|
|
||||||
|
async fn run_claude(
|
||||||
|
prompt: &str,
|
||||||
|
mcp_config: &Path,
|
||||||
|
bus: &Bus,
|
||||||
|
flavor: mcp::Flavor,
|
||||||
|
mode: ClaudeMode,
|
||||||
|
) -> Result<bool> {
|
||||||
|
let mut cmd = Command::new("claude");
|
||||||
|
cmd.arg("--print")
|
||||||
.arg("--verbose")
|
.arg("--verbose")
|
||||||
.arg("--output-format")
|
.arg("--output-format")
|
||||||
.arg("stream-json")
|
.arg("stream-json")
|
||||||
.arg("--model")
|
.arg("--model")
|
||||||
.arg("haiku")
|
.arg("haiku")
|
||||||
.arg("--mcp-config")
|
.arg("--continue")
|
||||||
.arg(mcp_config)
|
.arg("--settings")
|
||||||
.arg("--tools")
|
.arg(CLAUDE_SETTINGS);
|
||||||
.arg(mcp::builtin_tools_arg())
|
if let ClaudeMode::Turn = mode {
|
||||||
.arg("--allowedTools")
|
cmd.arg("--mcp-config")
|
||||||
.arg(mcp::allowed_tools_arg(flavor))
|
.arg(mcp_config)
|
||||||
|
.arg("--strict-mcp-config")
|
||||||
|
.arg("--tools")
|
||||||
|
.arg(mcp::builtin_tools_arg())
|
||||||
|
.arg("--allowedTools")
|
||||||
|
.arg(mcp::allowed_tools_arg(flavor));
|
||||||
|
}
|
||||||
|
let mut child = cmd
|
||||||
.stdin(Stdio::piped())
|
.stdin(Stdio::piped())
|
||||||
.stdout(Stdio::piped())
|
.stdout(Stdio::piped())
|
||||||
.stderr(Stdio::piped())
|
.stderr(Stdio::piped())
|
||||||
|
|
@ -68,11 +143,17 @@ pub async fn run_turn(
|
||||||
let stdout = child.stdout.take().expect("piped stdout");
|
let stdout = child.stdout.take().expect("piped stdout");
|
||||||
let stderr = child.stderr.take().expect("piped stderr");
|
let stderr = child.stderr.take().expect("piped stderr");
|
||||||
|
|
||||||
|
let prompt_too_long = Arc::new(AtomicBool::new(false));
|
||||||
|
let flag_out = prompt_too_long.clone();
|
||||||
|
let flag_err = prompt_too_long.clone();
|
||||||
let bus_out = bus.clone();
|
let bus_out = bus.clone();
|
||||||
let bus_err = bus.clone();
|
let bus_err = bus.clone();
|
||||||
let pump_stdout = tokio::spawn(async move {
|
let pump_stdout = tokio::spawn(async move {
|
||||||
let mut reader = BufReader::new(stdout).lines();
|
let mut reader = BufReader::new(stdout).lines();
|
||||||
while let Ok(Some(line)) = reader.next_line().await {
|
while let Ok(Some(line)) = reader.next_line().await {
|
||||||
|
if line.contains(PROMPT_TOO_LONG_MARKER) {
|
||||||
|
flag_out.store(true, Ordering::Relaxed);
|
||||||
|
}
|
||||||
match serde_json::from_str::<serde_json::Value>(&line) {
|
match serde_json::from_str::<serde_json::Value>(&line) {
|
||||||
Ok(v) => bus_out.emit(LiveEvent::Stream(v)),
|
Ok(v) => bus_out.emit(LiveEvent::Stream(v)),
|
||||||
Err(_) => bus_out.emit(LiveEvent::Note(format!("(non-json) {line}"))),
|
Err(_) => bus_out.emit(LiveEvent::Note(format!("(non-json) {line}"))),
|
||||||
|
|
@ -82,6 +163,9 @@ pub async fn run_turn(
|
||||||
let pump_stderr = tokio::spawn(async move {
|
let pump_stderr = tokio::spawn(async move {
|
||||||
let mut reader = BufReader::new(stderr).lines();
|
let mut reader = BufReader::new(stderr).lines();
|
||||||
while let Ok(Some(line)) = reader.next_line().await {
|
while let Ok(Some(line)) = reader.next_line().await {
|
||||||
|
if line.contains(PROMPT_TOO_LONG_MARKER) {
|
||||||
|
flag_err.store(true, Ordering::Relaxed);
|
||||||
|
}
|
||||||
bus_err.emit(LiveEvent::Note(format!("stderr: {line}")));
|
bus_err.emit(LiveEvent::Note(format!("stderr: {line}")));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
@ -89,8 +173,9 @@ pub async fn run_turn(
|
||||||
let status = child.wait().await?;
|
let status = child.wait().await?;
|
||||||
let _ = pump_stdout.await;
|
let _ = pump_stdout.await;
|
||||||
let _ = pump_stderr.await;
|
let _ = pump_stderr.await;
|
||||||
if !status.success() {
|
let too_long = prompt_too_long.load(Ordering::Relaxed);
|
||||||
|
if !status.success() && !too_long {
|
||||||
bail!("claude exited {status}");
|
bail!("claude exited {status}");
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(too_long)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue