From e9b213690eb5854499f2eabc094566aeb724bc43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?m=C3=BCde?= Date: Fri, 15 May 2026 16:27:51 +0200 Subject: [PATCH] dedupe: lift drive_turn/emit_turn_end/wait_for_login into hive_ag3nt::turn --- hive-ag3nt/src/bin/hive-ag3nt.rs | 84 +++++--------------------------- hive-ag3nt/src/bin/hive-m1nd.rs | 72 ++------------------------- hive-ag3nt/src/turn.rs | 65 +++++++++++++++++++++++- 3 files changed, 81 insertions(+), 140 deletions(-) diff --git a/hive-ag3nt/src/bin/hive-ag3nt.rs b/hive-ag3nt/src/bin/hive-ag3nt.rs index e421780..81e05c5 100644 --- a/hive-ag3nt/src/bin/hive-ag3nt.rs +++ b/hive-ag3nt/src/bin/hive-ag3nt.rs @@ -91,11 +91,15 @@ async fn main() -> Result<()> { LoginState::NeedsLogin => { // Partial-run mode: keep the harness alive (so the web UI // stays bound) but don't drive the turn loop. Poll the - // claude dir periodically so a successful login (whether - // from the dashboard PTY path in step 4 or via - // `root-login` + `claude auth login` in the meantime) - // transitions us into the turn loop without a restart. - needs_login_loop(&cli.socket, &claude_dir, login_state, poll_ms, bus).await + // claude dir; once a session lands we enter `serve`. + turn::wait_for_login(&claude_dir, login_state.clone(), poll_ms).await; + serve( + &cli.socket, + Duration::from_millis(poll_ms), + login_state, + bus, + ) + .await } } } @@ -114,30 +118,6 @@ async fn main() -> Result<()> { } } -/// Re-checks `claude_dir` every `poll_ms` ms. As soon as it contains a session -/// (login completed), flips `state` to `Online` and enters the turn loop. -async fn needs_login_loop( - socket: &Path, - claude_dir: &Path, - state: Arc>, - poll_ms: u64, - bus: Bus, -) -> Result<()> { - tracing::warn!( - claude_dir = %claude_dir.display(), - "no claude session — staying in partial-run mode (web UI only)" - ); - let probe = Duration::from_millis(poll_ms.max(2000)); - loop { - tokio::time::sleep(probe).await; - if login::has_session(claude_dir) { - tracing::info!("claude session detected — entering turn loop"); - *state.lock().unwrap() = LoginState::Online; - return serve(socket, Duration::from_millis(poll_ms), state, bus).await; - } - } -} - async fn serve( socket: &Path, interval: Duration, @@ -158,8 +138,9 @@ async fn serve( body: body.clone(), }); let prompt = format_wake_prompt(&label, &from, &body); - let outcome = drive_turn(&prompt, &mcp_config, &bus, mcp::Flavor::Agent).await; - emit_turn_end(&bus, &outcome); + let outcome = + turn::drive_turn(&prompt, &mcp_config, &bus, mcp::Flavor::Agent).await; + turn::emit_turn_end(&bus, &outcome); } Ok(AgentResponse::Empty) => {} Ok(AgentResponse::Ok | AgentResponse::Status { .. }) => { @@ -176,47 +157,6 @@ async fn serve( } } -/// Drive one turn end-to-end. If claude hits `Prompt is too long`, run -/// `/compact` against the persistent session and retry once. Returns the -/// final `TurnOutcome` to drive the `TurnEnd` live event. -async fn drive_turn( - prompt: &str, - mcp_config: &Path, - bus: &Bus, - flavor: mcp::Flavor, -) -> turn::TurnOutcome { - match turn::run_turn(prompt, mcp_config, bus, flavor).await { - turn::TurnOutcome::PromptTooLong => { - if let Err(e) = turn::compact_session(bus).await { - tracing::warn!(error = %format!("{e:#}"), "compact failed"); - return turn::TurnOutcome::Failed(e); - } - turn::run_turn(prompt, mcp_config, bus, flavor).await - } - other => other, - } -} - -fn emit_turn_end(bus: &Bus, outcome: &turn::TurnOutcome) { - match outcome { - turn::TurnOutcome::Ok | turn::TurnOutcome::PromptTooLong => { - bus.emit(LiveEvent::TurnEnd { - ok: true, - note: None, - }); - tracing::info!("claude turn finished"); - } - turn::TurnOutcome::Failed(e) => { - let note = format!("{e:#}"); - bus.emit(LiveEvent::TurnEnd { - ok: false, - note: Some(note.clone()), - }); - tracing::warn!(error = %note, "claude turn failed"); - } - } -} - /// System prompt handed to claude on each turn. The harness has already /// popped one message off the inbox (the wake signal); claude is told /// about it and the MCP tools, and is expected to drive any further diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index a4c2634..f32393f 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -92,11 +92,8 @@ async fn main() -> Result<()> { match initial { LoginState::Online => serve(&cli.socket, Duration::from_millis(poll_ms), bus).await, LoginState::NeedsLogin => { - tracing::warn!( - claude_dir = %claude_dir.display(), - "manager has no claude session — staying in partial-run mode" - ); - needs_login_loop(&cli.socket, &claude_dir, login_state, poll_ms, bus).await + turn::wait_for_login(&claude_dir, login_state, poll_ms).await; + serve(&cli.socket, Duration::from_millis(poll_ms), bus).await } } } @@ -126,26 +123,6 @@ async fn one_shot(socket: &Path, req: ManagerRequest) -> Result<()> { Ok(()) } -/// Manager-side mirror of hive-ag3nt's needs-login loop: keep the web UI -/// alive, poll the claude dir, enter `serve` once login lands. -async fn needs_login_loop( - socket: &Path, - claude_dir: &Path, - state: Arc>, - poll_ms: u64, - bus: Bus, -) -> Result<()> { - let probe = Duration::from_millis(poll_ms.max(2000)); - loop { - tokio::time::sleep(probe).await; - if login::has_session(claude_dir) { - tracing::info!("manager claude session detected — entering inbox loop"); - *state.lock().unwrap() = LoginState::Online; - return serve(socket, Duration::from_millis(poll_ms), bus).await; - } - } -} - async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> { tracing::info!(socket = %socket.display(), "hive-m1nd serve"); let mcp_config = turn::write_mcp_config(socket).await?; @@ -172,8 +149,9 @@ async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> { body: body.clone(), }); let prompt = format_wake_prompt(&label, &from, &body); - let outcome = drive_turn(&prompt, &mcp_config, &bus, mcp::Flavor::Manager).await; - emit_turn_end(&bus, &outcome); + let outcome = + turn::drive_turn(&prompt, &mcp_config, &bus, mcp::Flavor::Manager).await; + turn::emit_turn_end(&bus, &outcome); } Ok(ManagerResponse::Empty) => {} Ok(ManagerResponse::Ok | ManagerResponse::Status { .. }) => { @@ -190,46 +168,6 @@ async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> { } } -/// Drive one manager turn end-to-end with the same overflow-then-compact -/// retry as sub-agents. -async fn drive_turn( - prompt: &str, - mcp_config: &Path, - bus: &Bus, - flavor: mcp::Flavor, -) -> turn::TurnOutcome { - match turn::run_turn(prompt, mcp_config, bus, flavor).await { - turn::TurnOutcome::PromptTooLong => { - if let Err(e) = turn::compact_session(bus).await { - tracing::warn!(error = %format!("{e:#}"), "compact failed"); - return turn::TurnOutcome::Failed(e); - } - turn::run_turn(prompt, mcp_config, bus, flavor).await - } - other => other, - } -} - -fn emit_turn_end(bus: &Bus, outcome: &turn::TurnOutcome) { - match outcome { - turn::TurnOutcome::Ok | turn::TurnOutcome::PromptTooLong => { - bus.emit(LiveEvent::TurnEnd { - ok: true, - note: None, - }); - tracing::info!("manager turn finished"); - } - turn::TurnOutcome::Failed(e) => { - let note = format!("{e:#}"); - bus.emit(LiveEvent::TurnEnd { - ok: false, - note: Some(note.clone()), - }); - tracing::warn!(error = %note, "manager turn failed"); - } - } -} - /// Manager-flavored wake prompt. Mentions the privileged tools the sub-agent /// prompt doesn't have access to, and points the manager at its own /// editable config repo for self-modification. diff --git a/hive-ag3nt/src/turn.rs b/hive-ag3nt/src/turn.rs index 5af016b..387fe19 100644 --- a/hive-ag3nt/src/turn.rs +++ b/hive-ag3nt/src/turn.rs @@ -5,14 +5,16 @@ use std::path::{Path, PathBuf}; use std::process::Stdio; -use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; use anyhow::{Result, bail}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::Command; use crate::events::{Bus, LiveEvent}; +use crate::login::{self, LoginState}; use crate::mcp; /// Inline `--settings` JSON applied to every claude invocation. We turn off @@ -59,6 +61,67 @@ pub enum TurnOutcome { Failed(anyhow::Error), } +/// Drive one turn end-to-end, transparently compacting + retrying once on +/// `Prompt is too long`. Both the sub-agent and manager loops call this. +pub async fn drive_turn( + prompt: &str, + mcp_config: &Path, + bus: &Bus, + flavor: mcp::Flavor, +) -> TurnOutcome { + match run_turn(prompt, mcp_config, bus, flavor).await { + TurnOutcome::PromptTooLong => { + if let Err(e) = compact_session(bus).await { + tracing::warn!(error = %format!("{e:#}"), "compact failed"); + return TurnOutcome::Failed(e); + } + run_turn(prompt, mcp_config, bus, flavor).await + } + other => other, + } +} + +/// Emit the per-turn `TurnEnd` event + log line. Single owner so the +/// agent and manager loops agree on outcome semantics. +pub fn emit_turn_end(bus: &Bus, outcome: &TurnOutcome) { + match outcome { + TurnOutcome::Ok | TurnOutcome::PromptTooLong => { + bus.emit(LiveEvent::TurnEnd { + ok: true, + note: None, + }); + tracing::info!("turn finished"); + } + TurnOutcome::Failed(e) => { + let note = format!("{e:#}"); + bus.emit(LiveEvent::TurnEnd { + ok: false, + note: Some(note.clone()), + }); + tracing::warn!(error = %note, "turn failed"); + } + } +} + +/// Block until the bound `~/.claude/` dir contains a session, polling +/// `claude_dir` on a `poll_ms` interval (min 2s). Flips `state` to +/// `Online` when login lands; caller resumes its serve loop. +pub async fn wait_for_login(claude_dir: &Path, state: Arc>, poll_ms: u64) { + tracing::warn!( + claude_dir = %claude_dir.display(), + "no claude session — staying in partial-run mode (web UI only)" + ); + let probe = Duration::from_millis(poll_ms.max(2000)); + loop { + tokio::time::sleep(probe).await; + if login::has_session(claude_dir) { + tracing::info!("claude session detected — entering turn loop"); + *state.lock().unwrap() = LoginState::Online; + return; + } + } +} + /// Spawn `claude` for one turn and pump `stream-json` stdout into the /// live event bus. Prompt goes over stdin (variadic /// `--allowedTools`/`--tools` would otherwise eat a trailing positional