agent: forward unhandled turn failures to manager
run_claude now keeps a 20-line stderr ring buffer and bails with it inline (was just 'exit <status>'). agent serve loop, on Failed (not PromptTooLong — that's already absorbed by drive_turn's compaction retry), sends the error body to manager via the normal hyperhive send. swallows transport errors — failure is already in journald and the events sqlite. manager-only harness (hive-m1nd) is unchanged so it doesn't try to notify itself.
This commit is contained in:
parent
7ec658851a
commit
3e040d5b16
2 changed files with 50 additions and 1 deletions
|
|
@ -166,6 +166,14 @@ async fn serve(
|
|||
let outcome = turn::drive_turn(&prompt, files, &bus).await;
|
||||
turn::emit_turn_end(&bus, &outcome);
|
||||
bus.set_state(TurnState::Idle);
|
||||
// Failures are unhandled by definition — PromptTooLong is
|
||||
// absorbed inside drive_turn via compaction, so anything
|
||||
// that reaches Failed here is a real crash. Notify the
|
||||
// manager so it can investigate / restart / page the
|
||||
// operator; best-effort, swallow the send error.
|
||||
if let turn::TurnOutcome::Failed(e) = &outcome {
|
||||
notify_manager_of_failure(socket, e).await;
|
||||
}
|
||||
|
||||
// After turn completes, check if there are pending messages waiting.
|
||||
// If so, immediately process them instead of blocking on recv().
|
||||
|
|
@ -214,6 +222,29 @@ fn format_wake_prompt(from: &str, body: &str, unread: u64) -> String {
|
|||
format!("Incoming message from `{from}`:\n---\n{body}\n---{pending}")
|
||||
}
|
||||
|
||||
/// Best-effort: tell the manager that this agent's last turn crashed
|
||||
/// (claude exited non-zero, compaction didn't help, etc.). Routed
|
||||
/// through the normal send path so the manager's inbox surfaces it
|
||||
/// like any other message; the agent's label is what the broker
|
||||
/// stamps as `from`, so the message body doesn't need to repeat it.
|
||||
/// Swallows transport errors — we just logged the failure, the worst
|
||||
/// case is the manager learns about the crash from the dashboard
|
||||
/// instead of inbox.
|
||||
async fn notify_manager_of_failure(socket: &Path, err: &anyhow::Error) {
|
||||
let body = format!("claude turn failed:\n{err:#}");
|
||||
let res = client::request::<_, AgentResponse>(
|
||||
socket,
|
||||
&AgentRequest::Send {
|
||||
to: "manager".into(),
|
||||
body,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
if let Err(e) = res {
|
||||
tracing::warn!(error = ?e, "failed to notify manager of turn failure");
|
||||
}
|
||||
}
|
||||
|
||||
/// Best-effort: ask our own per-agent socket how many messages are still
|
||||
/// pending after the wake-up Recv. Returns 0 if anything goes wrong.
|
||||
async fn inbox_unread(socket: &Path) -> u64 {
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
//! manager surface) and their wake-prompt wording; the spawn shape,
|
||||
//! arg-vector, stdin plumbing, and stream-json pumping are identical.
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::Stdio;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
|
@ -281,6 +282,13 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<bool>
|
|||
}
|
||||
}
|
||||
});
|
||||
// Keep the last STDERR_TAIL_LINES of stderr so a non-zero exit can
|
||||
// include real context in the bail message (and downstream in the
|
||||
// failure notification to the manager) instead of just "exit 1".
|
||||
const STDERR_TAIL_LINES: usize = 20;
|
||||
let stderr_tail: Arc<Mutex<VecDeque<String>>> =
|
||||
Arc::new(Mutex::new(VecDeque::with_capacity(STDERR_TAIL_LINES)));
|
||||
let tail_clone = stderr_tail.clone();
|
||||
let pump_stderr = tokio::spawn(async move {
|
||||
let mut reader = BufReader::new(stderr).lines();
|
||||
while let Ok(Some(line)) = reader.next_line().await {
|
||||
|
|
@ -293,6 +301,11 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<bool>
|
|||
// surfaces when claude exits non-zero.
|
||||
tracing::warn!(line = %line, "claude stderr");
|
||||
bus_err.emit(LiveEvent::Note(format!("stderr: {line}")));
|
||||
let mut t = tail_clone.lock().unwrap();
|
||||
if t.len() >= STDERR_TAIL_LINES {
|
||||
t.pop_front();
|
||||
}
|
||||
t.push_back(line);
|
||||
}
|
||||
});
|
||||
|
||||
|
|
@ -301,7 +314,12 @@ async fn run_claude(prompt: &str, files: &TurnFiles, bus: &Bus) -> Result<bool>
|
|||
let _ = pump_stderr.await;
|
||||
let too_long = prompt_too_long.load(Ordering::Relaxed);
|
||||
if !status.success() && !too_long {
|
||||
bail!("claude exited {status}");
|
||||
let tail = stderr_tail.lock().unwrap();
|
||||
if tail.is_empty() {
|
||||
bail!("claude exited {status} (no stderr)");
|
||||
}
|
||||
let tail_str = tail.iter().cloned().collect::<Vec<_>>().join("\n");
|
||||
bail!("claude exited {status}\nstderr tail:\n{tail_str}");
|
||||
}
|
||||
Ok(too_long)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue