add turn lock to prevent /compact racing with in-flight turns

This commit is contained in:
damocles 2026-05-16 19:57:03 +02:00
parent 25508d7399
commit fca480b86e
3 changed files with 44 additions and 4 deletions

View file

@ -2,6 +2,8 @@ use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use hive_ag3nt::web_ui::TurnLock;
use anyhow::Result; use anyhow::Result;
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use hive_ag3nt::events::{Bus, LiveEvent, TurnState}; use hive_ag3nt::events::{Bus, LiveEvent, TurnState};
@ -71,6 +73,7 @@ async fn main() -> Result<()> {
let login_state = Arc::new(Mutex::new(initial)); let login_state = Arc::new(Mutex::new(initial));
let bus = Bus::new(); let bus = Bus::new();
let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Agent).await?; let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Agent).await?;
let turn_lock: TurnLock = Arc::new(tokio::sync::Mutex::new(()));
plugins::install_configured(&cli.socket, Some("manager")).await; plugins::install_configured(&cli.socket, Some("manager")).await;
tokio::spawn(web_ui::serve( tokio::spawn(web_ui::serve(
label, label,
@ -79,6 +82,7 @@ async fn main() -> Result<()> {
bus.clone(), bus.clone(),
cli.socket.clone(), cli.socket.clone(),
files.clone(), files.clone(),
turn_lock.clone(),
)); ));
match initial { match initial {
LoginState::Online => { LoginState::Online => {
@ -88,6 +92,7 @@ async fn main() -> Result<()> {
login_state, login_state,
bus, bus,
&files, &files,
turn_lock,
) )
.await .await
} }
@ -102,6 +107,7 @@ async fn main() -> Result<()> {
login_state, login_state,
bus, bus,
&files, &files,
turn_lock,
) )
.await .await
} }
@ -136,6 +142,7 @@ async fn serve(
state: Arc<Mutex<LoginState>>, state: Arc<Mutex<LoginState>>,
bus: Bus, bus: Bus,
files: &turn::TurnFiles, files: &turn::TurnFiles,
turn_lock: TurnLock,
) -> Result<()> { ) -> Result<()> {
tracing::info!(socket = %socket.display(), "hive-ag3nt serve"); tracing::info!(socket = %socket.display(), "hive-ag3nt serve");
let _ = state; // reserved for future state transitions (turn-loop -> needs-login) let _ = state; // reserved for future state transitions (turn-loop -> needs-login)
@ -163,7 +170,10 @@ async fn serve(
}); });
bus.set_state(TurnState::Thinking); bus.set_state(TurnState::Thinking);
let prompt = format_wake_prompt(&from, &body, unread); let prompt = format_wake_prompt(&from, &body, unread);
let outcome = turn::drive_turn(&prompt, files, &bus).await; let outcome = {
let _guard = turn_lock.lock().await;
turn::drive_turn(&prompt, files, &bus).await
};
turn::emit_turn_end(&bus, &outcome); turn::emit_turn_end(&bus, &outcome);
bus.set_state(TurnState::Idle); bus.set_state(TurnState::Idle);
// Failures are unhandled by definition — PromptTooLong is // Failures are unhandled by definition — PromptTooLong is

View file

@ -6,6 +6,8 @@ use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use hive_ag3nt::web_ui::TurnLock;
use anyhow::Result; use anyhow::Result;
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use hive_ag3nt::events::{Bus, LiveEvent, TurnState}; use hive_ag3nt::events::{Bus, LiveEvent, TurnState};
@ -61,6 +63,7 @@ async fn main() -> Result<()> {
let login_state = Arc::new(Mutex::new(initial)); let login_state = Arc::new(Mutex::new(initial));
let bus = Bus::new(); let bus = Bus::new();
let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Manager).await?; let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Manager).await?;
let turn_lock: TurnLock = Arc::new(tokio::sync::Mutex::new(()));
plugins::install_configured(&cli.socket, None).await; plugins::install_configured(&cli.socket, None).await;
tokio::spawn(web_ui::serve( tokio::spawn(web_ui::serve(
label, label,
@ -69,14 +72,15 @@ async fn main() -> Result<()> {
bus.clone(), bus.clone(),
cli.socket.clone(), cli.socket.clone(),
files.clone(), files.clone(),
turn_lock.clone(),
)); ));
match initial { match initial {
LoginState::Online => { LoginState::Online => {
serve(&cli.socket, Duration::from_millis(poll_ms), bus, &files).await serve(&cli.socket, Duration::from_millis(poll_ms), bus, &files, turn_lock).await
} }
LoginState::NeedsLogin => { LoginState::NeedsLogin => {
turn::wait_for_login(&claude_dir, login_state, poll_ms).await; turn::wait_for_login(&claude_dir, login_state, poll_ms).await;
serve(&cli.socket, Duration::from_millis(poll_ms), bus, &files).await serve(&cli.socket, Duration::from_millis(poll_ms), bus, &files, turn_lock).await
} }
} }
} }
@ -89,6 +93,7 @@ async fn serve(
interval: Duration, interval: Duration,
bus: Bus, bus: Bus,
files: &turn::TurnFiles, files: &turn::TurnFiles,
turn_lock: TurnLock,
) -> Result<()> { ) -> Result<()> {
tracing::info!(socket = %socket.display(), "hive-m1nd serve"); tracing::info!(socket = %socket.display(), "hive-m1nd serve");
loop { loop {
@ -131,7 +136,10 @@ async fn serve(
}); });
let prompt = format_wake_prompt(&from, &body, unread); let prompt = format_wake_prompt(&from, &body, unread);
bus.set_state(TurnState::Thinking); bus.set_state(TurnState::Thinking);
let outcome = turn::drive_turn(&prompt, files, &bus).await; let outcome = {
let _guard = turn_lock.lock().await;
turn::drive_turn(&prompt, files, &bus).await
};
turn::emit_turn_end(&bus, &outcome); turn::emit_turn_end(&bus, &outcome);
bus.set_state(TurnState::Idle); bus.set_state(TurnState::Idle);
// Check for messages that arrived during the turn and loop // Check for messages that arrived during the turn and loop

View file

@ -36,6 +36,12 @@ use crate::turn::TurnFiles;
/// render. /// render.
pub type LoginStateCell = Arc<Mutex<LoginState>>; pub type LoginStateCell = Arc<Mutex<LoginState>>;
/// Shared turn lock. The serve loop acquires this (as an async mutex) for the
/// duration of every `drive_turn` call. The `/api/compact` handler tries
/// `try_lock()` and rejects immediately if a turn is in flight, preventing
/// concurrent access to the claude session.
pub type TurnLock = Arc<tokio::sync::Mutex<()>>;
#[derive(Clone)] #[derive(Clone)]
struct AppState { struct AppState {
label: String, label: String,
@ -48,6 +54,8 @@ struct AppState {
/// settings claude saw on the last regular turn — keeps the /// settings claude saw on the last regular turn — keeps the
/// session shape identical across compact + normal turns. /// session shape identical across compact + normal turns.
files: TurnFiles, files: TurnFiles,
/// Prevents `/api/compact` from racing with an in-flight normal turn.
turn_lock: TurnLock,
} }
impl AppState { impl AppState {
@ -70,6 +78,7 @@ pub async fn serve(
bus: Bus, bus: Bus,
socket: PathBuf, socket: PathBuf,
files: TurnFiles, files: TurnFiles,
turn_lock: TurnLock,
) -> Result<()> { ) -> Result<()> {
let state = AppState { let state = AppState {
label, label,
@ -78,6 +87,7 @@ pub async fn serve(
bus, bus,
socket, socket,
files, files,
turn_lock,
}; };
let app = Router::new() let app = Router::new()
.route("/", get(serve_index)) .route("/", get(serve_index))
@ -406,9 +416,21 @@ async fn post_set_model(State(state): State<AppState>, Form(form): Form<ModelFor
} }
async fn post_compact(State(state): State<AppState>) -> Response { async fn post_compact(State(state): State<AppState>) -> Response {
// Clone the Arc before locking so the guard's lifetime is tied to the
// clone (which we can move into the spawn) rather than to `state`.
let lock = state.turn_lock.clone();
// Reject immediately if a normal turn is in flight — concurrent access
// to the claude session is unsafe and produces garbled output.
let guard = match lock.try_lock_owned() {
Ok(g) => g,
Err(_) => {
return error_response("turn in flight — wait for it to finish before compacting");
}
};
let bus = state.bus.clone(); let bus = state.bus.clone();
let files = state.files.clone(); let files = state.files.clone();
tokio::spawn(async move { tokio::spawn(async move {
let _guard = guard; // keep lock alive for the duration of compaction
bus.emit(crate::events::LiveEvent::Note( bus.emit(crate::events::LiveEvent::Note(
"operator: /compact — running on persistent session".into(), "operator: /compact — running on persistent session".into(),
)); ));