turn: unify run_turn / compact_session via TurnFiles

new TurnFiles bundle (mcp_config + settings + system_prompt +
flavor) materialised once per harness boot, passed to drive_turn
and compact_session alike. operator-initiated /compact now uses
the exact same session shape as a normal turn — same MCP
surface, same allowed tools, same role prompt — only the stdin
payload differs (/compact vs the wake-up body). web_ui's
AppState carries the TurnFiles instead of (label + socket +
flavor + ad-hoc file writes per click). bin/hive-ag3nt and
bin/hive-m1nd prepare TurnFiles before spawning the web UI and
pass them to both surfaces. web_ui::Flavor folds into a type
alias for mcp::Flavor — no two-stage enum mapping.

removes ClaudeMode + the run_claude variant fork (system prompt
was Option, mcp args were skipped on Compact). dead 'mode'
plumbing gone.
This commit is contained in:
müde 2026-05-16 00:57:58 +02:00
parent 87c7b05b05
commit d94712bde8
4 changed files with 109 additions and 163 deletions

View file

@ -56,24 +56,16 @@ async fn main() -> Result<()> {
let initial = LoginState::from_dir(&claude_dir); let initial = LoginState::from_dir(&claude_dir);
tracing::info!(state = ?initial, claude_dir = %claude_dir.display(), "harness boot"); tracing::info!(state = ?initial, claude_dir = %claude_dir.display(), "harness boot");
let login_state = Arc::new(Mutex::new(initial)); let login_state = Arc::new(Mutex::new(initial));
let ui_state = login_state.clone();
let bus = Bus::new(); let bus = Bus::new();
let ui_bus = bus.clone(); let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Agent).await?;
let ui_socket = cli.socket.clone(); tokio::spawn(web_ui::serve(
tokio::spawn(async move {
if let Err(e) = web_ui::serve(
label, label,
port, port,
ui_state, login_state.clone(),
ui_bus, bus.clone(),
ui_socket, cli.socket.clone(),
web_ui::Flavor::Agent, files.clone(),
) ));
.await
{
tracing::error!(error = ?e, "web ui failed");
}
});
match initial { match initial {
LoginState::Online => { LoginState::Online => {
serve( serve(
@ -81,6 +73,7 @@ async fn main() -> Result<()> {
Duration::from_millis(poll_ms), Duration::from_millis(poll_ms),
login_state, login_state,
bus, bus,
&files,
) )
.await .await
} }
@ -94,6 +87,7 @@ async fn main() -> Result<()> {
Duration::from_millis(poll_ms), Duration::from_millis(poll_ms),
login_state, login_state,
bus, bus,
&files,
) )
.await .await
} }
@ -108,13 +102,10 @@ async fn serve(
interval: Duration, interval: Duration,
state: Arc<Mutex<LoginState>>, state: Arc<Mutex<LoginState>>,
bus: Bus, bus: Bus,
files: &turn::TurnFiles,
) -> Result<()> { ) -> Result<()> {
tracing::info!(socket = %socket.display(), "hive-ag3nt serve"); tracing::info!(socket = %socket.display(), "hive-ag3nt serve");
let _ = state; // reserved for future state transitions (turn-loop -> needs-login) let _ = state; // reserved for future state transitions (turn-loop -> needs-login)
let mcp_config = turn::write_mcp_config(socket).await?;
let settings = turn::write_settings(socket).await?;
let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hive-ag3nt".into());
let system_prompt = turn::write_system_prompt(socket, &label, mcp::Flavor::Agent).await?;
loop { loop {
let recv: Result<AgentResponse> = let recv: Result<AgentResponse> =
client::request(socket, &AgentRequest::Recv { wait_seconds: None }).await; client::request(socket, &AgentRequest::Recv { wait_seconds: None }).await;
@ -129,15 +120,7 @@ async fn serve(
}); });
bus.set_state(TurnState::Thinking); bus.set_state(TurnState::Thinking);
let prompt = format_wake_prompt(&from, &body, unread); let prompt = format_wake_prompt(&from, &body, unread);
let outcome = turn::drive_turn( let outcome = turn::drive_turn(&prompt, files, &bus).await;
&prompt,
&mcp_config,
&system_prompt,
&settings,
&bus,
mcp::Flavor::Agent,
)
.await;
turn::emit_turn_end(&bus, &outcome); turn::emit_turn_end(&bus, &outcome);
bus.set_state(TurnState::Idle); bus.set_state(TurnState::Idle);
} }

View file

@ -59,29 +59,23 @@ async fn main() -> Result<()> {
let initial = LoginState::from_dir(&claude_dir); let initial = LoginState::from_dir(&claude_dir);
tracing::info!(state = ?initial, claude_dir = %claude_dir.display(), "hm1nd boot"); tracing::info!(state = ?initial, claude_dir = %claude_dir.display(), "hm1nd boot");
let login_state = Arc::new(Mutex::new(initial)); let login_state = Arc::new(Mutex::new(initial));
let ui_state = login_state.clone();
let bus = Bus::new(); let bus = Bus::new();
let ui_bus = bus.clone(); let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Manager).await?;
let ui_socket = cli.socket.clone(); tokio::spawn(web_ui::serve(
tokio::spawn(async move {
if let Err(e) = web_ui::serve(
label, label,
port, port,
ui_state, login_state.clone(),
ui_bus, bus.clone(),
ui_socket, cli.socket.clone(),
web_ui::Flavor::Manager, files.clone(),
) ));
.await
{
tracing::error!(error = ?e, "web ui failed");
}
});
match initial { match initial {
LoginState::Online => serve(&cli.socket, Duration::from_millis(poll_ms), bus).await, LoginState::Online => {
serve(&cli.socket, Duration::from_millis(poll_ms), bus, &files).await
}
LoginState::NeedsLogin => { LoginState::NeedsLogin => {
turn::wait_for_login(&claude_dir, login_state, poll_ms).await; turn::wait_for_login(&claude_dir, login_state, poll_ms).await;
serve(&cli.socket, Duration::from_millis(poll_ms), bus).await serve(&cli.socket, Duration::from_millis(poll_ms), bus, &files).await
} }
} }
} }
@ -89,12 +83,13 @@ async fn main() -> Result<()> {
} }
} }
async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> { async fn serve(
socket: &Path,
interval: Duration,
bus: Bus,
files: &turn::TurnFiles,
) -> Result<()> {
tracing::info!(socket = %socket.display(), "hive-m1nd serve"); tracing::info!(socket = %socket.display(), "hive-m1nd serve");
let mcp_config = turn::write_mcp_config(socket).await?;
let settings = turn::write_settings(socket).await?;
let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hm1nd".into());
let system_prompt = turn::write_system_prompt(socket, &label, mcp::Flavor::Manager).await?;
loop { loop {
let recv: Result<ManagerResponse> = let recv: Result<ManagerResponse> =
client::request(socket, &ManagerRequest::Recv { wait_seconds: None }).await; client::request(socket, &ManagerRequest::Recv { wait_seconds: None }).await;
@ -126,15 +121,7 @@ async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> {
}); });
let prompt = format_wake_prompt(&from, &body, unread); let prompt = format_wake_prompt(&from, &body, unread);
bus.set_state(TurnState::Thinking); bus.set_state(TurnState::Thinking);
let outcome = turn::drive_turn( let outcome = turn::drive_turn(&prompt, files, &bus).await;
&prompt,
&mcp_config,
&system_prompt,
&settings,
&bus,
mcp::Flavor::Manager,
)
.await;
turn::emit_turn_end(&bus, &outcome); turn::emit_turn_end(&bus, &outcome);
bus.set_state(TurnState::Idle); bus.set_state(TurnState::Idle);
} }

View file

@ -33,6 +33,34 @@ const CLAUDE_SETTINGS: &str = include_str!("../prompts/claude-settings.json");
/// claude exit with a useful error in the live view. /// claude exit with a useful error in the live view.
const PROMPT_TOO_LONG_MARKER: &str = "Prompt is too long"; const PROMPT_TOO_LONG_MARKER: &str = "Prompt is too long";
/// The set of files claude reads on every invocation: the MCP server
/// config (`--mcp-config`), static settings (`--settings`), and the
/// pre-rendered role/tools system prompt (`--system-prompt-file`).
/// Materialised once at harness startup; shared between the turn loop
/// and the operator-driven `/compact` path so both invocations look
/// identical to claude (same MCP surface, same allowed tools, same
/// role prompt — only the stdin payload differs).
#[derive(Clone)]
pub struct TurnFiles {
pub mcp_config: PathBuf,
pub settings: PathBuf,
pub system_prompt: PathBuf,
pub flavor: mcp::Flavor,
}
impl TurnFiles {
/// Write all three files into the per-agent runtime dir alongside
/// `socket`. Idempotent — overwrites whatever was there.
pub async fn prepare(socket: &Path, label: &str, flavor: mcp::Flavor) -> Result<Self> {
Ok(Self {
mcp_config: write_mcp_config(socket).await?,
settings: write_settings(socket).await?,
system_prompt: write_system_prompt(socket, label, flavor).await?,
flavor,
})
}
}
/// Drop the MCP config blob claude reads from `--mcp-config <path>`. /// Drop the MCP config blob claude reads from `--mcp-config <path>`.
/// `socket` is the hyperhive per-container socket (forwarded to the child /// `socket` is the hyperhive per-container socket (forwarded to the child
/// as `--socket <path>`); `binary_subcommand` is e.g. `"mcp"` for sub-agents /// as `--socket <path>`); `binary_subcommand` is e.g. `"mcp"` for sub-agents
@ -99,21 +127,14 @@ pub enum TurnOutcome {
/// Drive one turn end-to-end, transparently compacting + retrying once on /// Drive one turn end-to-end, transparently compacting + retrying once on
/// `Prompt is too long`. Both the sub-agent and manager loops call this. /// `Prompt is too long`. Both the sub-agent and manager loops call this.
pub async fn drive_turn( pub async fn drive_turn(prompt: &str, files: &TurnFiles, bus: &Bus) -> TurnOutcome {
prompt: &str, match run_turn(prompt, files, bus).await {
mcp_config: &Path,
system_prompt: &Path,
settings: &Path,
bus: &Bus,
flavor: mcp::Flavor,
) -> TurnOutcome {
match run_turn(prompt, mcp_config, system_prompt, settings, bus, flavor).await {
TurnOutcome::PromptTooLong => { TurnOutcome::PromptTooLong => {
if let Err(e) = compact_session(settings, bus).await { if let Err(e) = compact_session(files, bus).await {
tracing::warn!(error = %format!("{e:#}"), "compact failed"); tracing::warn!(error = %format!("{e:#}"), "compact failed");
return TurnOutcome::Failed(e); return TurnOutcome::Failed(e);
} }
run_turn(prompt, mcp_config, system_prompt, settings, bus, flavor).await run_turn(prompt, files, bus).await
} }
other => other, other => other,
} }
@ -166,25 +187,8 @@ pub async fn wait_for_login(claude_dir: &Path, state: Arc<Mutex<LoginState>>, po
/// prompt). The session is persistent across turns via `--continue` and /// prompt). The session is persistent across turns via `--continue` and
/// claude's in-session auto-compact is disabled via `--settings` so it /// claude's in-session auto-compact is disabled via `--settings` so it
/// doesn't stall mid-turn — hyperhive owns compaction. /// doesn't stall mid-turn — hyperhive owns compaction.
pub async fn run_turn( pub async fn run_turn(prompt: &str, files: &TurnFiles, bus: &Bus) -> TurnOutcome {
prompt: &str, match run_claude(prompt, files, bus).await {
mcp_config: &Path,
system_prompt: &Path,
settings: &Path,
bus: &Bus,
flavor: mcp::Flavor,
) -> TurnOutcome {
match run_claude(
prompt,
mcp_config,
Some(system_prompt),
settings,
bus,
flavor,
ClaudeMode::Turn,
)
.await
{
Ok(too_long) if too_long => TurnOutcome::PromptTooLong, Ok(too_long) if too_long => TurnOutcome::PromptTooLong,
Ok(_) => TurnOutcome::Ok, Ok(_) => TurnOutcome::Ok,
Err(e) => TurnOutcome::Failed(e), Err(e) => TurnOutcome::Failed(e),
@ -192,49 +196,23 @@ pub async fn run_turn(
} }
/// Run claude's built-in `/compact` slash command on the persistent /// Run claude's built-in `/compact` slash command on the persistent
/// session so the next turn can fit. No MCP tools needed; we just feed /// session. Takes the *same* params as `run_turn` because compact
/// `/compact` over stdin and let claude rewrite its own history. /// re-initialises claude with the full session shape — same MCP
pub async fn compact_session(settings: &Path, bus: &Bus) -> Result<()> { /// surface, same system prompt, same allowed-tools — so the post-
/// compact state matches a normal turn's. Only the prompt over stdin
/// differs (`/compact` vs the wake-up payload).
pub async fn compact_session(files: &TurnFiles, bus: &Bus) -> Result<()> {
bus.emit(LiveEvent::Note( bus.emit(LiveEvent::Note(
"context overflow — running /compact on the persistent session".into(), "context overflow — running /compact on the persistent session".into(),
)); ));
let _ = run_claude( let _ = run_claude("/compact", files, bus).await?;
"/compact",
Path::new("/dev/null"),
None,
settings,
bus,
mcp::Flavor::Agent, // tool surface unused for /compact
ClaudeMode::Compact,
)
.await?;
bus.emit(LiveEvent::Note("/compact done".into())); bus.emit(LiveEvent::Note("/compact done".into()));
Ok(()) Ok(())
} }
#[derive(Clone, Copy)] async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<bool> {
enum ClaudeMode {
Turn,
Compact,
}
async fn run_claude(
prompt: &str,
mcp_config: &Path,
system_prompt: Option<&Path>,
settings: &Path,
bus: &Bus,
flavor: mcp::Flavor,
mode: ClaudeMode,
) -> Result<bool> {
let model = bus.model(); let model = bus.model();
// /compact must always run against the existing session — otherwise let resume = !bus.take_skip_continue();
// there's nothing to compact. Only normal turns honor the
// operator's "new session" one-shot flag.
let resume = match mode {
ClaudeMode::Turn => !bus.take_skip_continue(),
ClaudeMode::Compact => true,
};
if !resume { if !resume {
bus.emit(LiveEvent::Note( bus.emit(LiveEvent::Note(
"fresh session (--continue suppressed for this turn)".into(), "fresh session (--continue suppressed for this turn)".into(),
@ -258,22 +236,18 @@ async fn run_claude(
.arg("--model") .arg("--model")
.arg(&model) .arg(&model)
.arg("--settings") .arg("--settings")
.arg(settings); .arg(&files.settings);
if resume { if resume {
cmd.arg("--continue"); cmd.arg("--continue");
} }
if let Some(p) = system_prompt { cmd.arg("--system-prompt-file").arg(&files.system_prompt);
cmd.arg("--system-prompt-file").arg(p);
}
if let ClaudeMode::Turn = mode {
cmd.arg("--mcp-config") cmd.arg("--mcp-config")
.arg(mcp_config) .arg(&files.mcp_config)
.arg("--strict-mcp-config") .arg("--strict-mcp-config")
.arg("--tools") .arg("--tools")
.arg(mcp::builtin_tools_arg()) .arg(mcp::builtin_tools_arg())
.arg("--allowedTools") .arg("--allowedTools")
.arg(mcp::allowed_tools_arg(flavor)); .arg(mcp::allowed_tools_arg(files.flavor));
}
let mut child = cmd let mut child = cmd
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.stdout(Stdio::piped()) .stdout(Stdio::piped())

View file

@ -28,6 +28,8 @@ use crate::client;
use crate::events::Bus; use crate::events::Bus;
use crate::login::LoginState; use crate::login::LoginState;
use crate::login_session::{LoginSession, drop_if_finished}; use crate::login_session::{LoginSession, drop_if_finished};
use crate::mcp;
use crate::turn::TurnFiles;
/// Live login state for the web UI. The harness updates this in place as it /// Live login state for the web UI. The harness updates this in place as it
/// transitions between `NeedsLogin` and `Online`; the UI reads on each /// transitions between `NeedsLogin` and `Online`; the UI reads on each
@ -41,16 +43,25 @@ struct AppState {
session: Arc<Mutex<Option<Arc<LoginSession>>>>, session: Arc<Mutex<Option<Arc<LoginSession>>>>,
bus: Bus, bus: Bus,
socket: PathBuf, socket: PathBuf,
flavor: Flavor, /// Same `TurnFiles` the harness's turn loop uses. Shared so
/// `/api/compact` re-uses the exact MCP config / system prompt /
/// settings claude saw on the last regular turn — keeps the
/// session shape identical across compact + normal turns.
files: TurnFiles,
}
impl AppState {
fn flavor(&self) -> Flavor {
self.files.flavor
}
} }
/// Which wire protocol the per-agent UI's `/send` handler should speak. /// Which wire protocol the per-agent UI's `/send` handler should speak.
/// Sub-agent → `AgentRequest::OperatorMsg`; manager → `ManagerRequest::OperatorMsg`. /// Sub-agent → `AgentRequest::OperatorMsg`; manager →
#[derive(Debug, Clone, Copy)] /// `ManagerRequest::OperatorMsg`. Reuses the MCP-side enum so a
pub enum Flavor { /// single value drives both the send protocol and (in
Agent, /// `post_compact`) the allowed-tools surface claude sees.
Manager, pub type Flavor = mcp::Flavor;
}
pub async fn serve( pub async fn serve(
label: String, label: String,
@ -58,7 +69,7 @@ pub async fn serve(
login: LoginStateCell, login: LoginStateCell,
bus: Bus, bus: Bus,
socket: PathBuf, socket: PathBuf,
flavor: Flavor, files: TurnFiles,
) -> Result<()> { ) -> Result<()> {
let state = AppState { let state = AppState {
label, label,
@ -66,7 +77,7 @@ pub async fn serve(
session: Arc::new(Mutex::new(None)), session: Arc::new(Mutex::new(None)),
bus, bus,
socket, socket,
flavor, files,
}; };
let app = Router::new() let app = Router::new()
.route("/", get(serve_index)) .route("/", get(serve_index))
@ -208,7 +219,7 @@ async fn api_state(State(state): State<AppState>) -> axum::Json<StateSnapshot> {
.ok() .ok()
.and_then(|s| s.parse::<u16>().ok()) .and_then(|s| s.parse::<u16>().ok())
.unwrap_or(7000); .unwrap_or(7000);
let inbox = recent_inbox(&state.socket, state.flavor).await; let inbox = recent_inbox(&state.socket, state.flavor()).await;
let (turn_state, turn_state_since) = state.bus.state_snapshot(); let (turn_state, turn_state_since) = state.bus.state_snapshot();
let model = state.bus.model(); let model = state.bus.model();
axum::Json(StateSnapshot { axum::Json(StateSnapshot {
@ -268,7 +279,7 @@ async fn post_send(State(state): State<AppState>, Form(form): Form<SendForm>) ->
if body.is_empty() { if body.is_empty() {
return error_response("send: `body` required"); return error_response("send: `body` required");
} }
let result = match state.flavor { let result = match state.flavor() {
Flavor::Agent => match client::request::<_, hive_sh4re::AgentResponse>( Flavor::Agent => match client::request::<_, hive_sh4re::AgentResponse>(
&state.socket, &state.socket,
&hive_sh4re::AgentRequest::OperatorMsg { body }, &hive_sh4re::AgentRequest::OperatorMsg { body },
@ -396,22 +407,13 @@ async fn post_set_model(State(state): State<AppState>, Form(form): Form<ModelFor
async fn post_compact(State(state): State<AppState>) -> Response { async fn post_compact(State(state): State<AppState>) -> Response {
let bus = state.bus.clone(); let bus = state.bus.clone();
let socket = state.socket.clone(); let files = state.files.clone();
tokio::spawn(async move { tokio::spawn(async move {
bus.emit(crate::events::LiveEvent::Note( bus.emit(crate::events::LiveEvent::Note(
"operator: /compact — running on persistent session".into(), "operator: /compact — running on persistent session".into(),
)); ));
let settings = match crate::turn::write_settings(&socket).await {
Ok(p) => p,
Err(e) => {
bus.emit(crate::events::LiveEvent::Note(format!(
"/compact failed: settings write — {e:#}"
)));
return;
}
};
bus.set_state(crate::events::TurnState::Compacting); bus.set_state(crate::events::TurnState::Compacting);
let r = crate::turn::compact_session(&settings, &bus).await; let r = crate::turn::compact_session(&files, &bus).await;
bus.set_state(crate::events::TurnState::Idle); bus.set_state(crate::events::TurnState::Idle);
if let Err(e) = r { if let Err(e) = r {
bus.emit(crate::events::LiveEvent::Note(format!( bus.emit(crate::events::LiveEvent::Note(format!(