dedupe: lift drive_turn/emit_turn_end/wait_for_login into hive_ag3nt::turn
This commit is contained in:
parent
8fbee4fbf2
commit
e9b213690e
3 changed files with 81 additions and 140 deletions
|
|
@ -91,11 +91,15 @@ async fn main() -> Result<()> {
|
|||
LoginState::NeedsLogin => {
|
||||
// Partial-run mode: keep the harness alive (so the web UI
|
||||
// stays bound) but don't drive the turn loop. Poll the
|
||||
// claude dir periodically so a successful login (whether
|
||||
// from the dashboard PTY path in step 4 or via
|
||||
// `root-login` + `claude auth login` in the meantime)
|
||||
// transitions us into the turn loop without a restart.
|
||||
needs_login_loop(&cli.socket, &claude_dir, login_state, poll_ms, bus).await
|
||||
// claude dir; once a session lands we enter `serve`.
|
||||
turn::wait_for_login(&claude_dir, login_state.clone(), poll_ms).await;
|
||||
serve(
|
||||
&cli.socket,
|
||||
Duration::from_millis(poll_ms),
|
||||
login_state,
|
||||
bus,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -114,30 +118,6 @@ async fn main() -> Result<()> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Re-checks `claude_dir` every `poll_ms` ms. As soon as it contains a session
|
||||
/// (login completed), flips `state` to `Online` and enters the turn loop.
|
||||
async fn needs_login_loop(
|
||||
socket: &Path,
|
||||
claude_dir: &Path,
|
||||
state: Arc<Mutex<LoginState>>,
|
||||
poll_ms: u64,
|
||||
bus: Bus,
|
||||
) -> Result<()> {
|
||||
tracing::warn!(
|
||||
claude_dir = %claude_dir.display(),
|
||||
"no claude session — staying in partial-run mode (web UI only)"
|
||||
);
|
||||
let probe = Duration::from_millis(poll_ms.max(2000));
|
||||
loop {
|
||||
tokio::time::sleep(probe).await;
|
||||
if login::has_session(claude_dir) {
|
||||
tracing::info!("claude session detected — entering turn loop");
|
||||
*state.lock().unwrap() = LoginState::Online;
|
||||
return serve(socket, Duration::from_millis(poll_ms), state, bus).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn serve(
|
||||
socket: &Path,
|
||||
interval: Duration,
|
||||
|
|
@ -158,8 +138,9 @@ async fn serve(
|
|||
body: body.clone(),
|
||||
});
|
||||
let prompt = format_wake_prompt(&label, &from, &body);
|
||||
let outcome = drive_turn(&prompt, &mcp_config, &bus, mcp::Flavor::Agent).await;
|
||||
emit_turn_end(&bus, &outcome);
|
||||
let outcome =
|
||||
turn::drive_turn(&prompt, &mcp_config, &bus, mcp::Flavor::Agent).await;
|
||||
turn::emit_turn_end(&bus, &outcome);
|
||||
}
|
||||
Ok(AgentResponse::Empty) => {}
|
||||
Ok(AgentResponse::Ok | AgentResponse::Status { .. }) => {
|
||||
|
|
@ -176,47 +157,6 @@ async fn serve(
|
|||
}
|
||||
}
|
||||
|
||||
/// Drive one turn end-to-end. If claude hits `Prompt is too long`, run
|
||||
/// `/compact` against the persistent session and retry once. Returns the
|
||||
/// final `TurnOutcome` to drive the `TurnEnd` live event.
|
||||
async fn drive_turn(
|
||||
prompt: &str,
|
||||
mcp_config: &Path,
|
||||
bus: &Bus,
|
||||
flavor: mcp::Flavor,
|
||||
) -> turn::TurnOutcome {
|
||||
match turn::run_turn(prompt, mcp_config, bus, flavor).await {
|
||||
turn::TurnOutcome::PromptTooLong => {
|
||||
if let Err(e) = turn::compact_session(bus).await {
|
||||
tracing::warn!(error = %format!("{e:#}"), "compact failed");
|
||||
return turn::TurnOutcome::Failed(e);
|
||||
}
|
||||
turn::run_turn(prompt, mcp_config, bus, flavor).await
|
||||
}
|
||||
other => other,
|
||||
}
|
||||
}
|
||||
|
||||
fn emit_turn_end(bus: &Bus, outcome: &turn::TurnOutcome) {
|
||||
match outcome {
|
||||
turn::TurnOutcome::Ok | turn::TurnOutcome::PromptTooLong => {
|
||||
bus.emit(LiveEvent::TurnEnd {
|
||||
ok: true,
|
||||
note: None,
|
||||
});
|
||||
tracing::info!("claude turn finished");
|
||||
}
|
||||
turn::TurnOutcome::Failed(e) => {
|
||||
let note = format!("{e:#}");
|
||||
bus.emit(LiveEvent::TurnEnd {
|
||||
ok: false,
|
||||
note: Some(note.clone()),
|
||||
});
|
||||
tracing::warn!(error = %note, "claude turn failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// System prompt handed to claude on each turn. The harness has already
|
||||
/// popped one message off the inbox (the wake signal); claude is told
|
||||
/// about it and the MCP tools, and is expected to drive any further
|
||||
|
|
|
|||
|
|
@ -92,11 +92,8 @@ async fn main() -> Result<()> {
|
|||
match initial {
|
||||
LoginState::Online => serve(&cli.socket, Duration::from_millis(poll_ms), bus).await,
|
||||
LoginState::NeedsLogin => {
|
||||
tracing::warn!(
|
||||
claude_dir = %claude_dir.display(),
|
||||
"manager has no claude session — staying in partial-run mode"
|
||||
);
|
||||
needs_login_loop(&cli.socket, &claude_dir, login_state, poll_ms, bus).await
|
||||
turn::wait_for_login(&claude_dir, login_state, poll_ms).await;
|
||||
serve(&cli.socket, Duration::from_millis(poll_ms), bus).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -126,26 +123,6 @@ async fn one_shot(socket: &Path, req: ManagerRequest) -> Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Manager-side mirror of hive-ag3nt's needs-login loop: keep the web UI
|
||||
/// alive, poll the claude dir, enter `serve` once login lands.
|
||||
async fn needs_login_loop(
|
||||
socket: &Path,
|
||||
claude_dir: &Path,
|
||||
state: Arc<Mutex<LoginState>>,
|
||||
poll_ms: u64,
|
||||
bus: Bus,
|
||||
) -> Result<()> {
|
||||
let probe = Duration::from_millis(poll_ms.max(2000));
|
||||
loop {
|
||||
tokio::time::sleep(probe).await;
|
||||
if login::has_session(claude_dir) {
|
||||
tracing::info!("manager claude session detected — entering inbox loop");
|
||||
*state.lock().unwrap() = LoginState::Online;
|
||||
return serve(socket, Duration::from_millis(poll_ms), bus).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> {
|
||||
tracing::info!(socket = %socket.display(), "hive-m1nd serve");
|
||||
let mcp_config = turn::write_mcp_config(socket).await?;
|
||||
|
|
@ -172,8 +149,9 @@ async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> {
|
|||
body: body.clone(),
|
||||
});
|
||||
let prompt = format_wake_prompt(&label, &from, &body);
|
||||
let outcome = drive_turn(&prompt, &mcp_config, &bus, mcp::Flavor::Manager).await;
|
||||
emit_turn_end(&bus, &outcome);
|
||||
let outcome =
|
||||
turn::drive_turn(&prompt, &mcp_config, &bus, mcp::Flavor::Manager).await;
|
||||
turn::emit_turn_end(&bus, &outcome);
|
||||
}
|
||||
Ok(ManagerResponse::Empty) => {}
|
||||
Ok(ManagerResponse::Ok | ManagerResponse::Status { .. }) => {
|
||||
|
|
@ -190,46 +168,6 @@ async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Drive one manager turn end-to-end with the same overflow-then-compact
|
||||
/// retry as sub-agents.
|
||||
async fn drive_turn(
|
||||
prompt: &str,
|
||||
mcp_config: &Path,
|
||||
bus: &Bus,
|
||||
flavor: mcp::Flavor,
|
||||
) -> turn::TurnOutcome {
|
||||
match turn::run_turn(prompt, mcp_config, bus, flavor).await {
|
||||
turn::TurnOutcome::PromptTooLong => {
|
||||
if let Err(e) = turn::compact_session(bus).await {
|
||||
tracing::warn!(error = %format!("{e:#}"), "compact failed");
|
||||
return turn::TurnOutcome::Failed(e);
|
||||
}
|
||||
turn::run_turn(prompt, mcp_config, bus, flavor).await
|
||||
}
|
||||
other => other,
|
||||
}
|
||||
}
|
||||
|
||||
fn emit_turn_end(bus: &Bus, outcome: &turn::TurnOutcome) {
|
||||
match outcome {
|
||||
turn::TurnOutcome::Ok | turn::TurnOutcome::PromptTooLong => {
|
||||
bus.emit(LiveEvent::TurnEnd {
|
||||
ok: true,
|
||||
note: None,
|
||||
});
|
||||
tracing::info!("manager turn finished");
|
||||
}
|
||||
turn::TurnOutcome::Failed(e) => {
|
||||
let note = format!("{e:#}");
|
||||
bus.emit(LiveEvent::TurnEnd {
|
||||
ok: false,
|
||||
note: Some(note.clone()),
|
||||
});
|
||||
tracing::warn!(error = %note, "manager turn failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Manager-flavored wake prompt. Mentions the privileged tools the sub-agent
|
||||
/// prompt doesn't have access to, and points the manager at its own
|
||||
/// editable config repo for self-modification.
|
||||
|
|
|
|||
|
|
@ -5,14 +5,16 @@
|
|||
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::Stdio;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{Result, bail};
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::process::Command;
|
||||
|
||||
use crate::events::{Bus, LiveEvent};
|
||||
use crate::login::{self, LoginState};
|
||||
use crate::mcp;
|
||||
|
||||
/// Inline `--settings` JSON applied to every claude invocation. We turn off
|
||||
|
|
@ -59,6 +61,67 @@ pub enum TurnOutcome {
|
|||
Failed(anyhow::Error),
|
||||
}
|
||||
|
||||
/// Drive one turn end-to-end, transparently compacting + retrying once on
|
||||
/// `Prompt is too long`. Both the sub-agent and manager loops call this.
|
||||
pub async fn drive_turn(
|
||||
prompt: &str,
|
||||
mcp_config: &Path,
|
||||
bus: &Bus,
|
||||
flavor: mcp::Flavor,
|
||||
) -> TurnOutcome {
|
||||
match run_turn(prompt, mcp_config, bus, flavor).await {
|
||||
TurnOutcome::PromptTooLong => {
|
||||
if let Err(e) = compact_session(bus).await {
|
||||
tracing::warn!(error = %format!("{e:#}"), "compact failed");
|
||||
return TurnOutcome::Failed(e);
|
||||
}
|
||||
run_turn(prompt, mcp_config, bus, flavor).await
|
||||
}
|
||||
other => other,
|
||||
}
|
||||
}
|
||||
|
||||
/// Emit the per-turn `TurnEnd` event + log line. Single owner so the
|
||||
/// agent and manager loops agree on outcome semantics.
|
||||
pub fn emit_turn_end(bus: &Bus, outcome: &TurnOutcome) {
|
||||
match outcome {
|
||||
TurnOutcome::Ok | TurnOutcome::PromptTooLong => {
|
||||
bus.emit(LiveEvent::TurnEnd {
|
||||
ok: true,
|
||||
note: None,
|
||||
});
|
||||
tracing::info!("turn finished");
|
||||
}
|
||||
TurnOutcome::Failed(e) => {
|
||||
let note = format!("{e:#}");
|
||||
bus.emit(LiveEvent::TurnEnd {
|
||||
ok: false,
|
||||
note: Some(note.clone()),
|
||||
});
|
||||
tracing::warn!(error = %note, "turn failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Block until the bound `~/.claude/` dir contains a session, polling
|
||||
/// `claude_dir` on a `poll_ms` interval (min 2s). Flips `state` to
|
||||
/// `Online` when login lands; caller resumes its serve loop.
|
||||
pub async fn wait_for_login(claude_dir: &Path, state: Arc<Mutex<LoginState>>, poll_ms: u64) {
|
||||
tracing::warn!(
|
||||
claude_dir = %claude_dir.display(),
|
||||
"no claude session — staying in partial-run mode (web UI only)"
|
||||
);
|
||||
let probe = Duration::from_millis(poll_ms.max(2000));
|
||||
loop {
|
||||
tokio::time::sleep(probe).await;
|
||||
if login::has_session(claude_dir) {
|
||||
tracing::info!("claude session detected — entering turn loop");
|
||||
*state.lock().unwrap() = LoginState::Online;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn `claude` for one turn and pump `stream-json` stdout into the
|
||||
/// live event bus. Prompt goes over stdin (variadic
|
||||
/// `--allowedTools`/`--tools` would otherwise eat a trailing positional
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue