From fca480b86ed0ef76a691839f08458c64e7cf41ca Mon Sep 17 00:00:00 2001 From: damocles Date: Sat, 16 May 2026 19:57:03 +0200 Subject: [PATCH] add turn lock to prevent /compact racing with in-flight turns --- hive-ag3nt/src/bin/hive-ag3nt.rs | 12 +++++++++++- hive-ag3nt/src/bin/hive-m1nd.rs | 14 +++++++++++--- hive-ag3nt/src/web_ui.rs | 22 ++++++++++++++++++++++ 3 files changed, 44 insertions(+), 4 deletions(-) diff --git a/hive-ag3nt/src/bin/hive-ag3nt.rs b/hive-ag3nt/src/bin/hive-ag3nt.rs index d486e80..ee6fd0c 100644 --- a/hive-ag3nt/src/bin/hive-ag3nt.rs +++ b/hive-ag3nt/src/bin/hive-ag3nt.rs @@ -2,6 +2,8 @@ use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use std::time::Duration; +use hive_ag3nt::web_ui::TurnLock; + use anyhow::Result; use clap::{Parser, Subcommand}; use hive_ag3nt::events::{Bus, LiveEvent, TurnState}; @@ -71,6 +73,7 @@ async fn main() -> Result<()> { let login_state = Arc::new(Mutex::new(initial)); let bus = Bus::new(); let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Agent).await?; + let turn_lock: TurnLock = Arc::new(tokio::sync::Mutex::new(())); plugins::install_configured(&cli.socket, Some("manager")).await; tokio::spawn(web_ui::serve( label, @@ -79,6 +82,7 @@ async fn main() -> Result<()> { bus.clone(), cli.socket.clone(), files.clone(), + turn_lock.clone(), )); match initial { LoginState::Online => { @@ -88,6 +92,7 @@ async fn main() -> Result<()> { login_state, bus, &files, + turn_lock, ) .await } @@ -102,6 +107,7 @@ async fn main() -> Result<()> { login_state, bus, &files, + turn_lock, ) .await } @@ -136,6 +142,7 @@ async fn serve( state: Arc>, bus: Bus, files: &turn::TurnFiles, + turn_lock: TurnLock, ) -> Result<()> { tracing::info!(socket = %socket.display(), "hive-ag3nt serve"); let _ = state; // reserved for future state transitions (turn-loop -> needs-login) @@ -163,7 +170,10 @@ async fn serve( }); bus.set_state(TurnState::Thinking); let prompt = format_wake_prompt(&from, &body, unread); - let outcome = turn::drive_turn(&prompt, files, &bus).await; + let outcome = { + let _guard = turn_lock.lock().await; + turn::drive_turn(&prompt, files, &bus).await + }; turn::emit_turn_end(&bus, &outcome); bus.set_state(TurnState::Idle); // Failures are unhandled by definition — PromptTooLong is diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index 85f7092..9d89038 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -6,6 +6,8 @@ use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use std::time::Duration; +use hive_ag3nt::web_ui::TurnLock; + use anyhow::Result; use clap::{Parser, Subcommand}; use hive_ag3nt::events::{Bus, LiveEvent, TurnState}; @@ -61,6 +63,7 @@ async fn main() -> Result<()> { let login_state = Arc::new(Mutex::new(initial)); let bus = Bus::new(); let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Manager).await?; + let turn_lock: TurnLock = Arc::new(tokio::sync::Mutex::new(())); plugins::install_configured(&cli.socket, None).await; tokio::spawn(web_ui::serve( label, @@ -69,14 +72,15 @@ async fn main() -> Result<()> { bus.clone(), cli.socket.clone(), files.clone(), + turn_lock.clone(), )); match initial { LoginState::Online => { - serve(&cli.socket, Duration::from_millis(poll_ms), bus, &files).await + serve(&cli.socket, Duration::from_millis(poll_ms), bus, &files, turn_lock).await } LoginState::NeedsLogin => { turn::wait_for_login(&claude_dir, login_state, poll_ms).await; - serve(&cli.socket, Duration::from_millis(poll_ms), bus, &files).await + serve(&cli.socket, Duration::from_millis(poll_ms), bus, &files, turn_lock).await } } } @@ -89,6 +93,7 @@ async fn serve( interval: Duration, bus: Bus, files: &turn::TurnFiles, + turn_lock: TurnLock, ) -> Result<()> { tracing::info!(socket = %socket.display(), "hive-m1nd serve"); loop { @@ -131,7 +136,10 @@ async fn serve( }); let prompt = format_wake_prompt(&from, &body, unread); bus.set_state(TurnState::Thinking); - let outcome = turn::drive_turn(&prompt, files, &bus).await; + let outcome = { + let _guard = turn_lock.lock().await; + turn::drive_turn(&prompt, files, &bus).await + }; turn::emit_turn_end(&bus, &outcome); bus.set_state(TurnState::Idle); // Check for messages that arrived during the turn and loop diff --git a/hive-ag3nt/src/web_ui.rs b/hive-ag3nt/src/web_ui.rs index 931ec2d..02a9df2 100644 --- a/hive-ag3nt/src/web_ui.rs +++ b/hive-ag3nt/src/web_ui.rs @@ -36,6 +36,12 @@ use crate::turn::TurnFiles; /// render. pub type LoginStateCell = Arc>; +/// Shared turn lock. The serve loop acquires this (as an async mutex) for the +/// duration of every `drive_turn` call. The `/api/compact` handler tries +/// `try_lock()` and rejects immediately if a turn is in flight, preventing +/// concurrent access to the claude session. +pub type TurnLock = Arc>; + #[derive(Clone)] struct AppState { label: String, @@ -48,6 +54,8 @@ struct AppState { /// settings claude saw on the last regular turn — keeps the /// session shape identical across compact + normal turns. files: TurnFiles, + /// Prevents `/api/compact` from racing with an in-flight normal turn. + turn_lock: TurnLock, } impl AppState { @@ -70,6 +78,7 @@ pub async fn serve( bus: Bus, socket: PathBuf, files: TurnFiles, + turn_lock: TurnLock, ) -> Result<()> { let state = AppState { label, @@ -78,6 +87,7 @@ pub async fn serve( bus, socket, files, + turn_lock, }; let app = Router::new() .route("/", get(serve_index)) @@ -406,9 +416,21 @@ async fn post_set_model(State(state): State, Form(form): Form) -> Response { + // Clone the Arc before locking so the guard's lifetime is tied to the + // clone (which we can move into the spawn) rather than to `state`. + let lock = state.turn_lock.clone(); + // Reject immediately if a normal turn is in flight — concurrent access + // to the claude session is unsafe and produces garbled output. + let guard = match lock.try_lock_owned() { + Ok(g) => g, + Err(_) => { + return error_response("turn in flight — wait for it to finish before compacting"); + } + }; let bus = state.bus.clone(); let files = state.files.clone(); tokio::spawn(async move { + let _guard = guard; // keep lock alive for the duration of compaction bus.emit(crate::events::LiveEvent::Note( "operator: /compact — running on persistent session".into(), ));