refactor: split long functions per review feedback; remove all #[allow] attributes
This commit is contained in:
parent
bbe2112dc9
commit
748536203b
5 changed files with 429 additions and 432 deletions
|
|
@ -149,11 +149,11 @@ async fn main() -> Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::too_many_arguments, clippy::similar_names, clippy::too_many_lines)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
async fn serve(
|
async fn serve(
|
||||||
socket: &Path,
|
socket: &Path,
|
||||||
interval: Duration,
|
interval: Duration,
|
||||||
state: Arc<Mutex<LoginState>>,
|
_login_state: Arc<Mutex<LoginState>>,
|
||||||
bus: Bus,
|
bus: Bus,
|
||||||
stats: Option<TurnStats>,
|
stats: Option<TurnStats>,
|
||||||
files: &turn::TurnFiles,
|
files: &turn::TurnFiles,
|
||||||
|
|
@ -161,25 +161,12 @@ async fn serve(
|
||||||
label: &str,
|
label: &str,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
tracing::info!(socket = %socket.display(), "hive-ag3nt serve");
|
tracing::info!(socket = %socket.display(), "hive-ag3nt serve");
|
||||||
let _ = state; // reserved for future state transitions (turn-loop -> needs-login)
|
|
||||||
// Boot-time recovery: ask the broker to resurface anything we
|
|
||||||
// popped in a previous harness session but never acked
|
|
||||||
// (crashed mid-turn / OOM / container restart). The broker
|
|
||||||
// resets `delivered_at = NULL` on those rows and remembers
|
|
||||||
// their ids so the next `Recv` tags them `redelivered: true`;
|
|
||||||
// we then prepend a "may already be handled" hint to the wake
|
|
||||||
// prompt. Single shot before entering the serve loop; idempotent
|
|
||||||
// when there's nothing inflight.
|
|
||||||
requeue_inflight(socket).await;
|
requeue_inflight(socket).await;
|
||||||
loop {
|
loop {
|
||||||
let recv: Result<AgentResponse> =
|
let recv: Result<AgentResponse> =
|
||||||
// Explicit long-poll: the new agent_server semantics treat
|
// Explicit long-poll: park until a message arrives (180s cap).
|
||||||
// `None` as "peek, don't wait", which would tight-loop on
|
// `max: None` (= 1) — one turn per wake; claude calls
|
||||||
// sleep(interval). The harness wants to park until a
|
// recv(max: N) in-turn to drain bursts.
|
||||||
// message arrives, so opt into the full 180s cap.
|
|
||||||
// `max: None` (= 1) — the serve loop drives one turn per
|
|
||||||
// wake; claude itself calls recv(max: N) in-turn to drain
|
|
||||||
// a burst when the wake prompt mentions pending.
|
|
||||||
client::request(
|
client::request(
|
||||||
socket,
|
socket,
|
||||||
&AgentRequest::Recv {
|
&AgentRequest::Recv {
|
||||||
|
|
@ -191,93 +178,7 @@ async fn serve(
|
||||||
match recv {
|
match recv {
|
||||||
Ok(AgentResponse::Messages { messages }) if !messages.is_empty() => {
|
Ok(AgentResponse::Messages { messages }) if !messages.is_empty() => {
|
||||||
let first = messages.into_iter().next().expect("checked non-empty");
|
let first = messages.into_iter().next().expect("checked non-empty");
|
||||||
let from = first.from;
|
handle_agent_turn(socket, &bus, stats.as_ref(), files, &turn_lock, label, first).await;
|
||||||
let body = first.body;
|
|
||||||
let redelivered = first.redelivered;
|
|
||||||
tracing::info!(%from, %body, %redelivered, "inbox");
|
|
||||||
let unread = inbox_unread(socket).await;
|
|
||||||
bus.emit(LiveEvent::TurnStart {
|
|
||||||
from: from.clone(),
|
|
||||||
body: body.clone(),
|
|
||||||
unread,
|
|
||||||
});
|
|
||||||
bus.set_state(TurnState::Thinking);
|
|
||||||
let started_at = serve_common::now_unix();
|
|
||||||
let started_instant = std::time::Instant::now();
|
|
||||||
let model_at_start = bus.model();
|
|
||||||
let prompt = serve_common::format_wake_prompt(&from, &body, unread, redelivered);
|
|
||||||
let outcome = {
|
|
||||||
let _guard = turn_lock.lock().await;
|
|
||||||
turn::drive_turn(&prompt, files, &bus).await
|
|
||||||
};
|
|
||||||
turn::emit_turn_end(&bus, &outcome);
|
|
||||||
bus.set_state(TurnState::Idle);
|
|
||||||
// Ack only on a clean turn-end. `Failed` leaves every
|
|
||||||
// message popped during the turn in the unacked list;
|
|
||||||
// next harness boot's `RequeueInflight` will reset
|
|
||||||
// `delivered_at = NULL` and tag them `redelivered`.
|
|
||||||
// `PromptTooLong` is absorbed inside `drive_turn` via
|
|
||||||
// compaction so it shouldn't reach here, but if it
|
|
||||||
// does we also skip the ack (safer to redeliver than
|
|
||||||
// to lose the message).
|
|
||||||
if matches!(outcome, turn::TurnOutcome::Ok | turn::TurnOutcome::Compacted) {
|
|
||||||
ack_turn(socket).await;
|
|
||||||
}
|
|
||||||
// Rate-limited: park until the quota resets, then requeue
|
|
||||||
// the unacked message so it resurfaces in the same session.
|
|
||||||
if matches!(outcome, turn::TurnOutcome::RateLimited) {
|
|
||||||
let secs = turn::rate_limit_sleep_secs();
|
|
||||||
bus.emit_status("rate_limited");
|
|
||||||
bus.emit(LiveEvent::Note {
|
|
||||||
text: format!(
|
|
||||||
"API rate-limited — sleeping {secs}s before retry"
|
|
||||||
),
|
|
||||||
});
|
|
||||||
tracing::warn!(sleep_secs = secs, "rate-limited; parking");
|
|
||||||
tokio::time::sleep(Duration::from_secs(secs)).await;
|
|
||||||
requeue_inflight(socket).await;
|
|
||||||
bus.emit_status("online");
|
|
||||||
}
|
|
||||||
// Failures are unhandled by definition — PromptTooLong is
|
|
||||||
// absorbed inside drive_turn via compaction, so anything
|
|
||||||
// that reaches Failed here is a real crash. Notify the
|
|
||||||
// manager so it can investigate / restart / page the
|
|
||||||
// operator; best-effort, swallow the send error.
|
|
||||||
if let turn::TurnOutcome::Failed(e) = &outcome {
|
|
||||||
notify_manager_of_failure(socket, label, e).await;
|
|
||||||
}
|
|
||||||
if let Some(s) = &stats {
|
|
||||||
let ended_at = serve_common::now_unix();
|
|
||||||
let duration_ms =
|
|
||||||
i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX);
|
|
||||||
let (open_threads, open_reminders) = fetch_agent_post_turn_counts(socket).await;
|
|
||||||
let row = serve_common::build_row(
|
|
||||||
started_at,
|
|
||||||
ended_at,
|
|
||||||
duration_ms,
|
|
||||||
model_at_start,
|
|
||||||
from.clone(),
|
|
||||||
&outcome,
|
|
||||||
&bus,
|
|
||||||
open_threads,
|
|
||||||
open_reminders,
|
|
||||||
);
|
|
||||||
s.record(&row);
|
|
||||||
}
|
|
||||||
|
|
||||||
// After turn completes, log whether messages arrived during
|
|
||||||
// the turn — the outer loop will iterate back to recv() on
|
|
||||||
// its own (the Empty-arm sleep only fires when recv
|
|
||||||
// actually returned Empty), so no explicit continue needed.
|
|
||||||
let pending = inbox_unread(socket).await;
|
|
||||||
if pending > 0 {
|
|
||||||
tracing::info!(%pending, "pending messages after turn; fetching next");
|
|
||||||
}
|
|
||||||
// `request_next_turn` MCP tool: agent wrote a sentinel
|
|
||||||
// requesting an immediate self-continuation turn. Clear
|
|
||||||
// the file and inject a synthetic wake so the outer loop
|
|
||||||
// fires a bare turn even if the inbox is empty.
|
|
||||||
check_and_inject_continue(socket, label).await;
|
|
||||||
}
|
}
|
||||||
Ok(AgentResponse::Messages { .. }) => {
|
Ok(AgentResponse::Messages { .. }) => {
|
||||||
// Idle: empty list = nothing pending. Brief sleep
|
// Idle: empty list = nothing pending. Brief sleep
|
||||||
|
|
@ -307,6 +208,80 @@ async fn serve(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Drive one turn for a received agent-inbox message.
|
||||||
|
async fn handle_agent_turn(
|
||||||
|
socket: &Path,
|
||||||
|
bus: &Bus,
|
||||||
|
stats: Option<&TurnStats>,
|
||||||
|
files: &turn::TurnFiles,
|
||||||
|
turn_lock: &TurnLock,
|
||||||
|
label: &str,
|
||||||
|
first: hive_sh4re::DeliveredMessage,
|
||||||
|
) {
|
||||||
|
let from = first.from;
|
||||||
|
let body = first.body;
|
||||||
|
let redelivered = first.redelivered;
|
||||||
|
tracing::info!(%from, %body, %redelivered, "inbox");
|
||||||
|
let unread = inbox_unread(socket).await;
|
||||||
|
bus.emit(LiveEvent::TurnStart { from: from.clone(), body: body.clone(), unread });
|
||||||
|
bus.set_state(TurnState::Thinking);
|
||||||
|
let started_at = serve_common::now_unix();
|
||||||
|
let started_instant = std::time::Instant::now();
|
||||||
|
let model_at_start = bus.model();
|
||||||
|
let prompt = serve_common::format_wake_prompt(&from, &body, unread, redelivered);
|
||||||
|
let outcome = {
|
||||||
|
let _guard = turn_lock.lock().await;
|
||||||
|
turn::drive_turn(&prompt, files, bus).await
|
||||||
|
};
|
||||||
|
turn::emit_turn_end(bus, &outcome);
|
||||||
|
bus.set_state(TurnState::Idle);
|
||||||
|
// Ack only on a clean turn-end. `Failed` leaves every message popped
|
||||||
|
// during the turn in the unacked list; next harness boot requeues them.
|
||||||
|
if matches!(outcome, turn::TurnOutcome::Ok | turn::TurnOutcome::Compacted) {
|
||||||
|
ack_turn(socket).await;
|
||||||
|
}
|
||||||
|
if matches!(outcome, turn::TurnOutcome::RateLimited) {
|
||||||
|
let secs = turn::rate_limit_sleep_secs();
|
||||||
|
bus.emit_status("rate_limited");
|
||||||
|
bus.emit(LiveEvent::Note {
|
||||||
|
text: format!("API rate-limited — sleeping {secs}s before retry"),
|
||||||
|
});
|
||||||
|
tracing::warn!(sleep_secs = secs, "rate-limited; parking");
|
||||||
|
tokio::time::sleep(Duration::from_secs(secs)).await;
|
||||||
|
requeue_inflight(socket).await;
|
||||||
|
bus.emit_status("online");
|
||||||
|
}
|
||||||
|
// Real crash: PromptTooLong is absorbed by compaction inside drive_turn.
|
||||||
|
if let turn::TurnOutcome::Failed(e) = &outcome {
|
||||||
|
notify_manager_of_failure(socket, label, e).await;
|
||||||
|
}
|
||||||
|
if let Some(stats) = stats {
|
||||||
|
let ended_at = serve_common::now_unix();
|
||||||
|
let duration_ms =
|
||||||
|
i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX);
|
||||||
|
let (open_threads, open_reminders) = fetch_agent_post_turn_counts(socket).await;
|
||||||
|
let row = serve_common::build_row(
|
||||||
|
started_at,
|
||||||
|
ended_at,
|
||||||
|
duration_ms,
|
||||||
|
model_at_start,
|
||||||
|
from.clone(),
|
||||||
|
&outcome,
|
||||||
|
bus,
|
||||||
|
open_threads,
|
||||||
|
open_reminders,
|
||||||
|
);
|
||||||
|
stats.record(&row);
|
||||||
|
}
|
||||||
|
let pending = inbox_unread(socket).await;
|
||||||
|
if pending > 0 {
|
||||||
|
tracing::info!(%pending, "pending messages after turn; fetching next");
|
||||||
|
}
|
||||||
|
// `request_next_turn` MCP tool: agent wrote a sentinel requesting
|
||||||
|
// an immediate self-continuation. Clear and inject synthetic wake.
|
||||||
|
check_and_inject_continue(socket, label).await;
|
||||||
|
}
|
||||||
|
|
||||||
// Per-turn user prompt: the role/tools/etc. is in the system prompt
|
// Per-turn user prompt: the role/tools/etc. is in the system prompt
|
||||||
// (`prompts/agent.md` → `claude --system-prompt-file`); this is just the
|
// (`prompts/agent.md` → `claude --system-prompt-file`); this is just the
|
||||||
// wake signal claude reacts to. `unread` is the count of *other*
|
// wake signal claude reacts to. `unread` is the count of *other*
|
||||||
|
|
|
||||||
|
|
@ -113,7 +113,6 @@ async fn main() -> Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::too_many_lines)] // linear startup sequence; splitting adds indirection without clarity
|
|
||||||
async fn serve(
|
async fn serve(
|
||||||
socket: &Path,
|
socket: &Path,
|
||||||
interval: Duration,
|
interval: Duration,
|
||||||
|
|
@ -145,93 +144,7 @@ async fn serve(
|
||||||
match recv {
|
match recv {
|
||||||
Ok(ManagerResponse::Messages { messages }) if !messages.is_empty() => {
|
Ok(ManagerResponse::Messages { messages }) if !messages.is_empty() => {
|
||||||
let first = messages.into_iter().next().expect("checked non-empty");
|
let first = messages.into_iter().next().expect("checked non-empty");
|
||||||
let from = first.from;
|
handle_manager_turn(socket, &bus, stats.as_ref(), files, &turn_lock, first).await;
|
||||||
let body = first.body;
|
|
||||||
let redelivered = first.redelivered;
|
|
||||||
if from == SYSTEM_SENDER {
|
|
||||||
// Helper events (ApprovalResolved / Spawned / Rebuilt /
|
|
||||||
// Killed / Destroyed) — these are FYI for the manager;
|
|
||||||
// we surface them in the live view and forward them as
|
|
||||||
// a normal claude turn so the manager can react (e.g.
|
|
||||||
// greet a newly-spawned agent, retry a failed rebuild).
|
|
||||||
let parsed = serde_json::from_str::<HelperEvent>(&body).ok();
|
|
||||||
if let Some(event) = parsed {
|
|
||||||
tracing::info!(?event, "helper event");
|
|
||||||
} else {
|
|
||||||
tracing::info!(%from, %body, "system message");
|
|
||||||
}
|
|
||||||
bus.emit(LiveEvent::Note {
|
|
||||||
text: format!("[system] {body}"),
|
|
||||||
});
|
|
||||||
// Fall through: drive a turn with the event in the wake
|
|
||||||
// prompt body so claude sees it. Sender stays "system"
|
|
||||||
// so the wake prompt can label it as such.
|
|
||||||
}
|
|
||||||
tracing::info!(%from, %body, %redelivered, "manager inbox");
|
|
||||||
let unread = inbox_unread(socket).await;
|
|
||||||
bus.emit(LiveEvent::TurnStart {
|
|
||||||
from: from.clone(),
|
|
||||||
body: body.clone(),
|
|
||||||
unread,
|
|
||||||
});
|
|
||||||
let prompt = serve_common::format_wake_prompt(&from, &body, unread, redelivered);
|
|
||||||
bus.set_state(TurnState::Thinking);
|
|
||||||
let started_at = serve_common::now_unix();
|
|
||||||
let started_instant = std::time::Instant::now();
|
|
||||||
let model_at_start = bus.model();
|
|
||||||
let outcome = {
|
|
||||||
let _guard = turn_lock.lock().await;
|
|
||||||
turn::drive_turn(&prompt, files, &bus).await
|
|
||||||
};
|
|
||||||
turn::emit_turn_end(&bus, &outcome);
|
|
||||||
bus.set_state(TurnState::Idle);
|
|
||||||
// Ack only on a clean turn-end; Failed / RateLimited leave
|
|
||||||
// the popped ids in-flight for the next boot's requeue.
|
|
||||||
// Mirrors hive-ag3nt; see that loop for full rationale.
|
|
||||||
if matches!(outcome, turn::TurnOutcome::Ok | turn::TurnOutcome::Compacted) {
|
|
||||||
ack_turn(socket).await;
|
|
||||||
}
|
|
||||||
// Rate-limited: park until the quota resets, then requeue
|
|
||||||
// the unacked message so it resurfaces in the same session.
|
|
||||||
if matches!(outcome, turn::TurnOutcome::RateLimited) {
|
|
||||||
let secs = turn::rate_limit_sleep_secs();
|
|
||||||
bus.emit_status("rate_limited");
|
|
||||||
bus.emit(LiveEvent::Note {
|
|
||||||
text: format!(
|
|
||||||
"API rate-limited — sleeping {secs}s before retry"
|
|
||||||
),
|
|
||||||
});
|
|
||||||
tracing::warn!(sleep_secs = secs, "rate-limited; parking");
|
|
||||||
tokio::time::sleep(Duration::from_secs(secs)).await;
|
|
||||||
requeue_inflight(socket).await;
|
|
||||||
bus.emit_status("online");
|
|
||||||
}
|
|
||||||
if let Some(s) = &stats {
|
|
||||||
let ended_at = serve_common::now_unix();
|
|
||||||
let duration_ms =
|
|
||||||
i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX);
|
|
||||||
let (open_threads, open_reminders) =
|
|
||||||
fetch_manager_post_turn_counts(socket).await;
|
|
||||||
let row = serve_common::build_row(
|
|
||||||
started_at,
|
|
||||||
ended_at,
|
|
||||||
duration_ms,
|
|
||||||
model_at_start,
|
|
||||||
from.clone(),
|
|
||||||
&outcome,
|
|
||||||
&bus,
|
|
||||||
open_threads,
|
|
||||||
open_reminders,
|
|
||||||
);
|
|
||||||
s.record(&row);
|
|
||||||
}
|
|
||||||
// Check for messages that arrived during the turn so we
|
|
||||||
// surface "draining" in the logs. The loop will already
|
|
||||||
// re-iterate from here — no explicit continue needed.
|
|
||||||
let pending = inbox_unread(socket).await;
|
|
||||||
if pending > 0 {
|
|
||||||
tracing::info!(%pending, "pending messages after turn; fetching next");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Ok(ManagerResponse::Messages { .. }) => {
|
Ok(ManagerResponse::Messages { .. }) => {
|
||||||
// Idle: empty list = nothing pending. Brief sleep
|
// Idle: empty list = nothing pending. Brief sleep
|
||||||
|
|
@ -261,6 +174,84 @@ async fn serve(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Drive one turn for a received manager-inbox message. Called from the
|
||||||
|
/// serve loop for the non-empty-messages arm to keep that loop readable.
|
||||||
|
async fn handle_manager_turn(
|
||||||
|
socket: &Path,
|
||||||
|
bus: &Bus,
|
||||||
|
stats: Option<&TurnStats>,
|
||||||
|
files: &turn::TurnFiles,
|
||||||
|
turn_lock: &TurnLock,
|
||||||
|
first: hive_sh4re::DeliveredMessage,
|
||||||
|
) {
|
||||||
|
let from = first.from;
|
||||||
|
let body = first.body;
|
||||||
|
let redelivered = first.redelivered;
|
||||||
|
if from == SYSTEM_SENDER {
|
||||||
|
// Helper events (ApprovalResolved / Spawned / Rebuilt /
|
||||||
|
// Killed / Destroyed) — surface in the live view and drive a
|
||||||
|
// normal turn so the manager can react.
|
||||||
|
let parsed = serde_json::from_str::<HelperEvent>(&body).ok();
|
||||||
|
if let Some(event) = parsed {
|
||||||
|
tracing::info!(?event, "helper event");
|
||||||
|
} else {
|
||||||
|
tracing::info!(%from, %body, "system message");
|
||||||
|
}
|
||||||
|
bus.emit(LiveEvent::Note { text: format!("[system] {body}") });
|
||||||
|
}
|
||||||
|
tracing::info!(%from, %body, %redelivered, "manager inbox");
|
||||||
|
let unread = inbox_unread(socket).await;
|
||||||
|
bus.emit(LiveEvent::TurnStart { from: from.clone(), body: body.clone(), unread });
|
||||||
|
let prompt = serve_common::format_wake_prompt(&from, &body, unread, redelivered);
|
||||||
|
bus.set_state(TurnState::Thinking);
|
||||||
|
let started_at = serve_common::now_unix();
|
||||||
|
let started_instant = std::time::Instant::now();
|
||||||
|
let model_at_start = bus.model();
|
||||||
|
let outcome = {
|
||||||
|
let _guard = turn_lock.lock().await;
|
||||||
|
turn::drive_turn(&prompt, files, bus).await
|
||||||
|
};
|
||||||
|
turn::emit_turn_end(bus, &outcome);
|
||||||
|
bus.set_state(TurnState::Idle);
|
||||||
|
// Ack only on a clean turn-end; Failed / RateLimited leave the
|
||||||
|
// popped ids in-flight for the next boot's requeue.
|
||||||
|
if matches!(outcome, turn::TurnOutcome::Ok | turn::TurnOutcome::Compacted) {
|
||||||
|
ack_turn(socket).await;
|
||||||
|
}
|
||||||
|
if matches!(outcome, turn::TurnOutcome::RateLimited) {
|
||||||
|
let secs = turn::rate_limit_sleep_secs();
|
||||||
|
bus.emit_status("rate_limited");
|
||||||
|
bus.emit(LiveEvent::Note {
|
||||||
|
text: format!("API rate-limited — sleeping {secs}s before retry"),
|
||||||
|
});
|
||||||
|
tracing::warn!(sleep_secs = secs, "rate-limited; parking");
|
||||||
|
tokio::time::sleep(Duration::from_secs(secs)).await;
|
||||||
|
requeue_inflight(socket).await;
|
||||||
|
bus.emit_status("online");
|
||||||
|
}
|
||||||
|
if let Some(stats) = stats {
|
||||||
|
let ended_at = serve_common::now_unix();
|
||||||
|
let duration_ms =
|
||||||
|
i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX);
|
||||||
|
let (open_threads, open_reminders) = fetch_manager_post_turn_counts(socket).await;
|
||||||
|
let row = serve_common::build_row(
|
||||||
|
started_at,
|
||||||
|
ended_at,
|
||||||
|
duration_ms,
|
||||||
|
model_at_start,
|
||||||
|
from.clone(),
|
||||||
|
&outcome,
|
||||||
|
bus,
|
||||||
|
open_threads,
|
||||||
|
open_reminders,
|
||||||
|
);
|
||||||
|
stats.record(&row);
|
||||||
|
}
|
||||||
|
let pending = inbox_unread(socket).await;
|
||||||
|
if pending > 0 {
|
||||||
|
tracing::info!(%pending, "pending messages after turn; fetching next");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Best-effort: tell the broker every message popped during the turn
|
/// Best-effort: tell the broker every message popped during the turn
|
||||||
/// is now handled. Mirror of `hive-ag3nt::ack_turn` on the manager
|
/// is now handled. Mirror of `hive-ag3nt::ack_turn` on the manager
|
||||||
|
|
|
||||||
|
|
@ -422,8 +422,7 @@ fn format_state_change_notification(
|
||||||
&& subject
|
&& subject
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.and_then(|s| s["requested_reviewers"].as_array())
|
.and_then(|s| s["requested_reviewers"].as_array())
|
||||||
.map(|arr| arr.iter().any(|r| r["login"].as_str() == Some(own_login)))
|
.is_some_and(|arr| arr.iter().any(|r| r["login"].as_str() == Some(own_login)));
|
||||||
.unwrap_or(false);
|
|
||||||
let kind = if is_review_request {
|
let kind = if is_review_request {
|
||||||
format!("review requested{num}{repo}")
|
format!("review requested{num}{repo}")
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,6 @@ use crate::lifecycle::{self, MANAGER_NAME};
|
||||||
///
|
///
|
||||||
/// In all cases an `ApprovalResolved` helper event lands in the manager's
|
/// In all cases an `ApprovalResolved` helper event lands in the manager's
|
||||||
/// inbox when the work resolves.
|
/// inbox when the work resolves.
|
||||||
#[allow(clippy::too_many_lines)] // approval dispatch covers several independent approval kinds
|
|
||||||
pub async fn approve(coord: Arc<Coordinator>, id: i64) -> Result<()> {
|
pub async fn approve(coord: Arc<Coordinator>, id: i64) -> Result<()> {
|
||||||
let approval = coord.approvals.mark_approved(id)?;
|
let approval = coord.approvals.mark_approved(id)?;
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
|
|
@ -33,142 +32,164 @@ pub async fn approve(coord: Arc<Coordinator>, id: i64) -> Result<()> {
|
||||||
%approval.commit_ref,
|
%approval.commit_ref,
|
||||||
"approval: running action",
|
"approval: running action",
|
||||||
);
|
);
|
||||||
|
|
||||||
let agent_dir = coord.ensure_runtime(&approval.agent)?;
|
let agent_dir = coord.ensure_runtime(&approval.agent)?;
|
||||||
let proposed_dir = Coordinator::agent_proposed_dir(&approval.agent);
|
let proposed_dir = Coordinator::agent_proposed_dir(&approval.agent);
|
||||||
let applied_dir = Coordinator::agent_applied_dir(&approval.agent);
|
let applied_dir = Coordinator::agent_applied_dir(&approval.agent);
|
||||||
let claude_dir = Coordinator::agent_claude_dir(&approval.agent);
|
let claude_dir = Coordinator::agent_claude_dir(&approval.agent);
|
||||||
let notes_dir = Coordinator::agent_notes_dir(&approval.agent);
|
let notes_dir = Coordinator::agent_notes_dir(&approval.agent);
|
||||||
|
|
||||||
match approval.kind {
|
match approval.kind {
|
||||||
ApprovalKind::ApplyCommit => {
|
ApprovalKind::ApplyCommit => {
|
||||||
let (result, terminal_tag, is_first_spawn) = run_apply_commit(
|
approve_apply_commit(coord, approval, agent_dir, applied_dir, claude_dir, notes_dir).await
|
||||||
&coord,
|
|
||||||
&approval,
|
|
||||||
&agent_dir,
|
|
||||||
&applied_dir,
|
|
||||||
&claude_dir,
|
|
||||||
&notes_dir,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
// Mirror the applied repo's new tag/branch state (approved/
|
|
||||||
// building/deployed-or-failed + main) to the forge.
|
|
||||||
if let Err(e) = crate::forge::push_config(&approval.agent).await {
|
|
||||||
tracing::warn!(agent = %approval.agent, error = ?e, "forge: push_config after apply failed");
|
|
||||||
}
|
|
||||||
if is_first_spawn && result.is_ok() {
|
|
||||||
// First-spawn bookkeeping: create the per-agent forge user,
|
|
||||||
// mirror the applied repo into agent-configs/<n>, and grant
|
|
||||||
// read access to core/meta.
|
|
||||||
if let Err(e) = crate::forge::ensure_user_for(&approval.agent).await {
|
|
||||||
tracing::warn!(agent = %approval.agent, error = ?e, "forge: ensure_user after first spawn failed");
|
|
||||||
}
|
|
||||||
if let Err(e) = crate::forge::ensure_config_repo(&approval.agent).await {
|
|
||||||
tracing::warn!(agent = %approval.agent, error = ?e, "forge: ensure_config_repo after first spawn failed");
|
|
||||||
}
|
|
||||||
if let Some(core_token) = crate::forge::core_token()
|
|
||||||
&& let Err(e) = crate::forge::meta_read_access(&approval.agent, &core_token).await {
|
|
||||||
tracing::warn!(agent = %approval.agent, error = ?e, "forge: meta_read_access after first spawn failed");
|
|
||||||
}
|
|
||||||
if let Err(e) = crate::forge::ensure_meta_remote(&approval.agent).await {
|
|
||||||
tracing::warn!(agent = %approval.agent, error = ?e, "forge: ensure_meta_remote after first spawn failed");
|
|
||||||
}
|
|
||||||
// New container row appeared — rescan so the dashboard
|
|
||||||
// reflects the post-spawn state without a manual refetch.
|
|
||||||
coord.rescan_containers_and_emit().await;
|
|
||||||
crate::dashboard::emit_tombstones_snapshot(&coord).await;
|
|
||||||
}
|
|
||||||
finish_approval(&coord, &approval, result, terminal_tag, is_first_spawn)
|
|
||||||
}
|
}
|
||||||
ApprovalKind::InitConfig => {
|
ApprovalKind::InitConfig => {
|
||||||
// Seed the proposed config repo. Runs synchronously — it's just
|
approve_init_config(coord, approval, proposed_dir, claude_dir, notes_dir).await
|
||||||
// a few git operations with no nixos-container involvement.
|
|
||||||
let result: Result<()> = async {
|
|
||||||
lifecycle::setup_proposed(&proposed_dir, &approval.agent).await?;
|
|
||||||
lifecycle::ensure_claude_dir(&claude_dir)?;
|
|
||||||
lifecycle::ensure_state_dir(&notes_dir)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
.await;
|
|
||||||
// Wire the meta remote now that the proposed repo exists.
|
|
||||||
if result.is_ok()
|
|
||||||
&& let Err(e) = crate::forge::ensure_meta_remote(&approval.agent).await {
|
|
||||||
tracing::warn!(agent = %approval.agent, error = ?e, "forge: ensure_meta_remote after init_config failed");
|
|
||||||
}
|
|
||||||
finish_approval(&coord, &approval, result, None, false)
|
|
||||||
}
|
|
||||||
ApprovalKind::UpdateMetaInputs => {
|
|
||||||
// Decode the inputs from the commit_ref field (stored as JSON
|
|
||||||
// by submit_apply_commit's counterpart in manager_server.rs).
|
|
||||||
let inputs: Vec<String> =
|
|
||||||
serde_json::from_str(&approval.commit_ref).unwrap_or_default();
|
|
||||||
let result = crate::meta::lock_update(&inputs).await;
|
|
||||||
finish_approval(&coord, &approval, result, None, false)
|
|
||||||
}
|
}
|
||||||
|
ApprovalKind::UpdateMetaInputs => approve_update_meta_inputs(coord, approval).await,
|
||||||
ApprovalKind::Spawn => {
|
ApprovalKind::Spawn => {
|
||||||
// Run the spawn in the background so the approve POST returns
|
approve_spawn(&coord, &approval, agent_dir, proposed_dir, applied_dir, claude_dir, notes_dir);
|
||||||
// immediately. The dashboard reads `transient` to render a spinner.
|
|
||||||
// Guard is created synchronously here (so the spinner appears
|
|
||||||
// the moment the operator clicks approve) and moved into the
|
|
||||||
// task; it auto-clears even if the runtime drops the task.
|
|
||||||
let coord_bg = coord.clone();
|
|
||||||
let approval_bg = approval.clone();
|
|
||||||
let guard = coord_bg.transient_guard(&approval_bg.agent, TransientKind::Spawning);
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let guard = guard;
|
|
||||||
let agent_bg = approval_bg.agent.clone();
|
|
||||||
let result = lifecycle::spawn(
|
|
||||||
&approval_bg.agent,
|
|
||||||
&coord_bg.hyperhive_flake,
|
|
||||||
&agent_dir,
|
|
||||||
&proposed_dir,
|
|
||||||
&applied_dir,
|
|
||||||
&claude_dir,
|
|
||||||
&notes_dir,
|
|
||||||
coord_bg.dashboard_port,
|
|
||||||
&coord_bg.operator_pronouns,
|
|
||||||
&coord_bg.context_window_tokens,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
drop(guard);
|
|
||||||
if result.is_ok() {
|
|
||||||
if let Err(e) = crate::forge::ensure_user_for(&agent_bg).await {
|
|
||||||
tracing::warn!(agent = %agent_bg, error = ?e, "forge: ensure_user after spawn failed");
|
|
||||||
}
|
|
||||||
// Create the agent-configs mirror repo and seed it
|
|
||||||
// with the freshly-initialised applied repo (main +
|
|
||||||
// deployed/0).
|
|
||||||
if let Err(e) = crate::forge::ensure_config_repo(&agent_bg).await {
|
|
||||||
tracing::warn!(agent = %agent_bg, error = ?e, "forge: ensure_config_repo after spawn failed");
|
|
||||||
}
|
|
||||||
if let Err(e) = crate::forge::push_config(&agent_bg).await {
|
|
||||||
tracing::warn!(agent = %agent_bg, error = ?e, "forge: push_config after spawn failed");
|
|
||||||
}
|
|
||||||
if let Some(core_token) = crate::forge::core_token()
|
|
||||||
&& let Err(e) = crate::forge::meta_read_access(&agent_bg, &core_token).await {
|
|
||||||
tracing::warn!(agent = %agent_bg, error = ?e, "forge: meta_read_access after spawn failed");
|
|
||||||
}
|
|
||||||
if let Err(e) = crate::forge::ensure_meta_remote(&agent_bg).await {
|
|
||||||
tracing::warn!(agent = %agent_bg, error = ?e, "forge: ensure_meta_remote after spawn failed");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if let Err(e) = finish_approval(&coord_bg, &approval_bg, result, None, false) {
|
|
||||||
tracing::warn!(agent = %agent_bg, error = ?e, "spawn approval failed");
|
|
||||||
}
|
|
||||||
// New container row appeared (or didn't, on failure
|
|
||||||
// before nixos-container create completed) — rescan so
|
|
||||||
// dashboards reflect the post-spawn state. Spawn can
|
|
||||||
// also consume a tombstone of the same name; emit the
|
|
||||||
// fresh list so the operator's dormant-state pane
|
|
||||||
// updates without a refetch.
|
|
||||||
coord_bg.rescan_containers_and_emit().await;
|
|
||||||
crate::dashboard::emit_tombstones_snapshot(&coord_bg).await;
|
|
||||||
});
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn approve_apply_commit(
|
||||||
|
coord: Arc<Coordinator>,
|
||||||
|
approval: hive_sh4re::Approval,
|
||||||
|
agent_dir: std::path::PathBuf,
|
||||||
|
applied_dir: std::path::PathBuf,
|
||||||
|
claude_dir: std::path::PathBuf,
|
||||||
|
notes_dir: std::path::PathBuf,
|
||||||
|
) -> Result<()> {
|
||||||
|
let (result, terminal_tag, is_first_spawn) = run_apply_commit(
|
||||||
|
&coord,
|
||||||
|
&approval,
|
||||||
|
&agent_dir,
|
||||||
|
&applied_dir,
|
||||||
|
&claude_dir,
|
||||||
|
¬es_dir,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
if let Err(e) = crate::forge::push_config(&approval.agent).await {
|
||||||
|
tracing::warn!(agent = %approval.agent, error = ?e, "forge: push_config after apply failed");
|
||||||
|
}
|
||||||
|
if is_first_spawn && result.is_ok() {
|
||||||
|
forge_after_first_spawn(&coord, &approval.agent).await;
|
||||||
|
}
|
||||||
|
finish_approval(&coord, &approval, result, terminal_tag, is_first_spawn)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Forge bookkeeping run once after the very first container spawn:
|
||||||
|
/// create the per-agent forge user, mirror the applied repo, and grant
|
||||||
|
/// read access to core/meta. Also rescans containers so the dashboard
|
||||||
|
/// reflects the post-spawn state.
|
||||||
|
async fn forge_after_first_spawn(coord: &Arc<Coordinator>, agent: &str) {
|
||||||
|
if let Err(e) = crate::forge::ensure_user_for(agent).await {
|
||||||
|
tracing::warn!(%agent, error = ?e, "forge: ensure_user after first spawn failed");
|
||||||
|
}
|
||||||
|
if let Err(e) = crate::forge::ensure_config_repo(agent).await {
|
||||||
|
tracing::warn!(%agent, error = ?e, "forge: ensure_config_repo after first spawn failed");
|
||||||
|
}
|
||||||
|
if let Some(core_token) = crate::forge::core_token()
|
||||||
|
&& let Err(e) = crate::forge::meta_read_access(agent, &core_token).await {
|
||||||
|
tracing::warn!(%agent, error = ?e, "forge: meta_read_access after first spawn failed");
|
||||||
|
}
|
||||||
|
if let Err(e) = crate::forge::ensure_meta_remote(agent).await {
|
||||||
|
tracing::warn!(%agent, error = ?e, "forge: ensure_meta_remote after first spawn failed");
|
||||||
|
}
|
||||||
|
coord.rescan_containers_and_emit().await;
|
||||||
|
crate::dashboard::emit_tombstones_snapshot(coord).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn approve_init_config(
|
||||||
|
coord: Arc<Coordinator>,
|
||||||
|
approval: hive_sh4re::Approval,
|
||||||
|
proposed_dir: std::path::PathBuf,
|
||||||
|
claude_dir: std::path::PathBuf,
|
||||||
|
notes_dir: std::path::PathBuf,
|
||||||
|
) -> Result<()> {
|
||||||
|
// Seed the proposed config repo — just git operations, no nixos-container.
|
||||||
|
let result: Result<()> = async {
|
||||||
|
lifecycle::setup_proposed(&proposed_dir, &approval.agent).await?;
|
||||||
|
lifecycle::ensure_claude_dir(&claude_dir)?;
|
||||||
|
lifecycle::ensure_state_dir(&notes_dir)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
.await;
|
||||||
|
if result.is_ok()
|
||||||
|
&& let Err(e) = crate::forge::ensure_meta_remote(&approval.agent).await {
|
||||||
|
tracing::warn!(agent = %approval.agent, error = ?e, "forge: ensure_meta_remote after init_config failed");
|
||||||
|
}
|
||||||
|
finish_approval(&coord, &approval, result, None, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn approve_update_meta_inputs(
|
||||||
|
coord: Arc<Coordinator>,
|
||||||
|
approval: hive_sh4re::Approval,
|
||||||
|
) -> Result<()> {
|
||||||
|
// Inputs stored as JSON in commit_ref by the manager's submit path.
|
||||||
|
let inputs: Vec<String> = serde_json::from_str(&approval.commit_ref).unwrap_or_default();
|
||||||
|
let result = crate::meta::lock_update(&inputs).await;
|
||||||
|
finish_approval(&coord, &approval, result, None, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn approve_spawn(
|
||||||
|
coord: &Arc<Coordinator>,
|
||||||
|
approval: &hive_sh4re::Approval,
|
||||||
|
agent_dir: std::path::PathBuf,
|
||||||
|
proposed_dir: std::path::PathBuf,
|
||||||
|
applied_dir: std::path::PathBuf,
|
||||||
|
claude_dir: std::path::PathBuf,
|
||||||
|
notes_dir: std::path::PathBuf,
|
||||||
|
) {
|
||||||
|
// Run spawn in the background so approve POST returns immediately.
|
||||||
|
// Guard created synchronously so the spinner appears the moment
|
||||||
|
// the operator clicks approve; auto-clears when the task drops it.
|
||||||
|
let coord_bg = Arc::clone(coord);
|
||||||
|
let approval_bg = approval.clone();
|
||||||
|
let guard = coord_bg.transient_guard(&approval_bg.agent, TransientKind::Spawning);
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let guard = guard;
|
||||||
|
let agent_bg = approval_bg.agent.clone();
|
||||||
|
let result = lifecycle::spawn(
|
||||||
|
&approval_bg.agent,
|
||||||
|
&coord_bg.hyperhive_flake,
|
||||||
|
&agent_dir,
|
||||||
|
&proposed_dir,
|
||||||
|
&applied_dir,
|
||||||
|
&claude_dir,
|
||||||
|
&notes_dir,
|
||||||
|
coord_bg.dashboard_port,
|
||||||
|
&coord_bg.operator_pronouns,
|
||||||
|
&coord_bg.context_window_tokens,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
drop(guard);
|
||||||
|
if result.is_ok() {
|
||||||
|
if let Err(e) = crate::forge::ensure_user_for(&agent_bg).await {
|
||||||
|
tracing::warn!(agent = %agent_bg, error = ?e, "forge: ensure_user after spawn failed");
|
||||||
|
}
|
||||||
|
if let Err(e) = crate::forge::ensure_config_repo(&agent_bg).await {
|
||||||
|
tracing::warn!(agent = %agent_bg, error = ?e, "forge: ensure_config_repo after spawn failed");
|
||||||
|
}
|
||||||
|
if let Err(e) = crate::forge::push_config(&agent_bg).await {
|
||||||
|
tracing::warn!(agent = %agent_bg, error = ?e, "forge: push_config after spawn failed");
|
||||||
|
}
|
||||||
|
if let Some(core_token) = crate::forge::core_token()
|
||||||
|
&& let Err(e) = crate::forge::meta_read_access(&agent_bg, &core_token).await {
|
||||||
|
tracing::warn!(agent = %agent_bg, error = ?e, "forge: meta_read_access after spawn failed");
|
||||||
|
}
|
||||||
|
if let Err(e) = crate::forge::ensure_meta_remote(&agent_bg).await {
|
||||||
|
tracing::warn!(agent = %agent_bg, error = ?e, "forge: ensure_meta_remote after spawn failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Err(e) = finish_approval(&coord_bg, &approval_bg, result, None, false) {
|
||||||
|
tracing::warn!(agent = %agent_bg, error = ?e, "spawn approval failed");
|
||||||
|
}
|
||||||
|
coord_bg.rescan_containers_and_emit().await;
|
||||||
|
crate::dashboard::emit_tombstones_snapshot(&coord_bg).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
fn finish_approval(
|
fn finish_approval(
|
||||||
coord: &Coordinator,
|
coord: &Coordinator,
|
||||||
approval: &hive_sh4re::Approval,
|
approval: &hive_sh4re::Approval,
|
||||||
|
|
|
||||||
|
|
@ -100,7 +100,6 @@ enum Cmd {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
#[allow(clippy::too_many_lines)] // top-level dispatch; hard to split without losing readability
|
|
||||||
async fn main() -> Result<()> {
|
async fn main() -> Result<()> {
|
||||||
tracing_subscriber::fmt()
|
tracing_subscriber::fmt()
|
||||||
.with_env_filter(
|
.with_env_filter(
|
||||||
|
|
@ -117,121 +116,7 @@ async fn main() -> Result<()> {
|
||||||
dashboard_port,
|
dashboard_port,
|
||||||
operator_pronouns,
|
operator_pronouns,
|
||||||
context_window_tokens,
|
context_window_tokens,
|
||||||
} => {
|
} => cmd_serve(hyperhive_flake, db, dashboard_port, operator_pronouns, context_window_tokens, &cli.socket).await,
|
||||||
let cwt: std::collections::HashMap<String, u64> =
|
|
||||||
serde_json::from_str(&context_window_tokens)
|
|
||||||
.context("--context-window-tokens: invalid JSON")?;
|
|
||||||
let coord = Arc::new(Coordinator::open(
|
|
||||||
&db,
|
|
||||||
hyperhive_flake,
|
|
||||||
dashboard_port,
|
|
||||||
operator_pronouns,
|
|
||||||
cwt,
|
|
||||||
)?);
|
|
||||||
manager_server::start(coord.clone())?;
|
|
||||||
// Idempotent pre-flight: rewrite pre-meta-layout applied
|
|
||||||
// repos, ensure proposed repos carry the `applied`
|
|
||||||
// remote, bootstrap the meta repo, repoint containers at
|
|
||||||
// `meta#<name>` (one-shot, guarded by a marker file).
|
|
||||||
// Runs before manager auto-spawn so the new manager is
|
|
||||||
// built against meta from the first attempt.
|
|
||||||
if let Err(e) = migrate::run(&coord).await {
|
|
||||||
tracing::warn!(error = ?e, "startup migration failed");
|
|
||||||
}
|
|
||||||
// Auto-create the manager container if it isn't there yet. Block
|
|
||||||
// on this — without hm1nd the system has no manager harness.
|
|
||||||
// Failures are logged but allowed: a broken auto-spawn shouldn't
|
|
||||||
// make the dashboard unreachable for debugging.
|
|
||||||
if let Err(e) = auto_update::ensure_manager(&coord).await {
|
|
||||||
tracing::warn!(error = ?e, "auto-spawn manager failed");
|
|
||||||
}
|
|
||||||
// Auto-update in the background — don't block service start.
|
|
||||||
// Sub-agent rebuilds can take tens of seconds; we want the admin
|
|
||||||
// socket up immediately.
|
|
||||||
let update_coord = coord.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(e) = auto_update::run(update_coord).await {
|
|
||||||
tracing::warn!(error = ?e, "auto-update task failed");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
// Forge user sweep: ensure every existing container has a
|
|
||||||
// forgejo user + access token. No-op when the hive-forge
|
|
||||||
// container isn't running. Backgrounded — touches the
|
|
||||||
// forge state dir via `nixos-container run` which is slow.
|
|
||||||
tokio::spawn(async move {
|
|
||||||
forge::ensure_all().await;
|
|
||||||
});
|
|
||||||
// Periodic broker vacuum: drop fully-acked messages older
|
|
||||||
// than 30 days. Delivered-but-unacked rows (recoverable via
|
|
||||||
// requeue_inflight) and undelivered rows are always kept.
|
|
||||||
// Runs hourly; first sweep happens immediately.
|
|
||||||
let vacuum_coord = coord.clone();
|
|
||||||
let mut vacuum_shutdown = coord.shutdown_rx();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let interval = std::time::Duration::from_secs(3600);
|
|
||||||
let keep_secs: i64 = 30 * 24 * 3600;
|
|
||||||
loop {
|
|
||||||
match vacuum_coord.broker.vacuum_delivered(keep_secs) {
|
|
||||||
Ok(0) => {}
|
|
||||||
Ok(n) => tracing::info!(removed = n, "broker vacuum"),
|
|
||||||
Err(e) => tracing::warn!(error = ?e, "broker vacuum failed"),
|
|
||||||
}
|
|
||||||
tokio::select! {
|
|
||||||
() = tokio::time::sleep(interval) => {}
|
|
||||||
_ = vacuum_shutdown.changed() => {
|
|
||||||
tracing::info!("broker vacuum: shutdown signal received");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
// Per-agent events.sqlite vacuum: host-side so the harness
|
|
||||||
// doesn't need any retention wiring of its own.
|
|
||||||
events_vacuum::spawn(&coord);
|
|
||||||
// Per-agent turn-stats.sqlite vacuum: same pattern, 90-day
|
|
||||||
// retention so trend analysis has enough history.
|
|
||||||
stats_vacuum::spawn(&coord);
|
|
||||||
// Container crash watcher: emits HelperEvent::ContainerCrash
|
|
||||||
// when a previously-running container goes away without an
|
|
||||||
// operator-initiated transient state.
|
|
||||||
crash_watch::spawn(coord.clone());
|
|
||||||
// Reminder scheduler: drains due reminders + handles
|
|
||||||
// file_path payload persistence. See reminder_scheduler.rs.
|
|
||||||
reminder_scheduler::spawn(coord.clone());
|
|
||||||
// Forward every broker event onto the unified dashboard
|
|
||||||
// channel with a freshly-stamped seq, so the dashboard SSE
|
|
||||||
// sees broker messages + future mutation events on one
|
|
||||||
// stream with one monotonic seq. The broker's intra-process
|
|
||||||
// channel (used by `recv_blocking_batch`) stays untouched.
|
|
||||||
spawn_broker_to_dashboard_forwarder(coord.clone());
|
|
||||||
let dash_coord = coord.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await {
|
|
||||||
tracing::error!(error = ?e, "dashboard failed");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
// Run the admin socket until a signal arrives; then signal
|
|
||||||
// all background tasks so they exit cleanly before the
|
|
||||||
// process terminates.
|
|
||||||
let coord_sig = coord.clone();
|
|
||||||
tokio::select! {
|
|
||||||
res = server::serve(&cli.socket, coord) => { res? }
|
|
||||||
_ = tokio::signal::ctrl_c() => {
|
|
||||||
tracing::info!("SIGINT received — requesting shutdown");
|
|
||||||
coord_sig.request_shutdown();
|
|
||||||
}
|
|
||||||
() = async {
|
|
||||||
let mut sig = tokio::signal::unix::signal(
|
|
||||||
tokio::signal::unix::SignalKind::terminate()
|
|
||||||
).expect("failed to install SIGTERM handler");
|
|
||||||
sig.recv().await;
|
|
||||||
} => {
|
|
||||||
tracing::info!("SIGTERM received — requesting shutdown");
|
|
||||||
coord_sig.request_shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Cmd::Spawn { name } => {
|
Cmd::Spawn { name } => {
|
||||||
render(client::request(&cli.socket, HostRequest::Spawn { name }).await?)
|
render(client::request(&cli.socket, HostRequest::Spawn { name }).await?)
|
||||||
}
|
}
|
||||||
|
|
@ -256,6 +141,132 @@ async fn main() -> Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Start the coordinator daemon: open the broker, run migrations, spawn
|
||||||
|
/// background tasks (auto-update, vacuums, crash-watcher, reminder-scheduler,
|
||||||
|
/// dashboard), then serve the admin socket until a signal arrives.
|
||||||
|
async fn cmd_serve(
|
||||||
|
hyperhive_flake: String,
|
||||||
|
db: std::path::PathBuf,
|
||||||
|
dashboard_port: u16,
|
||||||
|
operator_pronouns: String,
|
||||||
|
context_window_tokens: String,
|
||||||
|
socket: &std::path::Path,
|
||||||
|
) -> Result<()> {
|
||||||
|
let cwt: std::collections::HashMap<String, u64> =
|
||||||
|
serde_json::from_str(&context_window_tokens)
|
||||||
|
.context("--context-window-tokens: invalid JSON")?;
|
||||||
|
let coord = Arc::new(Coordinator::open(
|
||||||
|
&db,
|
||||||
|
hyperhive_flake,
|
||||||
|
dashboard_port,
|
||||||
|
operator_pronouns,
|
||||||
|
cwt,
|
||||||
|
)?);
|
||||||
|
manager_server::start(coord.clone())?;
|
||||||
|
// Idempotent pre-flight: rewrite pre-meta-layout applied
|
||||||
|
// repos, ensure proposed repos carry the `applied`
|
||||||
|
// remote, bootstrap the meta repo, repoint containers at
|
||||||
|
// `meta#<name>` (one-shot, guarded by a marker file).
|
||||||
|
// Runs before manager auto-spawn so the new manager is
|
||||||
|
// built against meta from the first attempt.
|
||||||
|
if let Err(e) = migrate::run(&coord).await {
|
||||||
|
tracing::warn!(error = ?e, "startup migration failed");
|
||||||
|
}
|
||||||
|
// Auto-create the manager container if it isn't there yet. Block
|
||||||
|
// on this — without hm1nd the system has no manager harness.
|
||||||
|
// Failures are logged but allowed: a broken auto-spawn shouldn't
|
||||||
|
// make the dashboard unreachable for debugging.
|
||||||
|
if let Err(e) = auto_update::ensure_manager(&coord).await {
|
||||||
|
tracing::warn!(error = ?e, "auto-spawn manager failed");
|
||||||
|
}
|
||||||
|
// Auto-update in the background — don't block service start.
|
||||||
|
// Sub-agent rebuilds can take tens of seconds; we want the admin
|
||||||
|
// socket up immediately.
|
||||||
|
let update_coord = coord.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = auto_update::run(update_coord).await {
|
||||||
|
tracing::warn!(error = ?e, "auto-update task failed");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// Forge user sweep: ensure every existing container has a
|
||||||
|
// forgejo user + access token. No-op when the hive-forge
|
||||||
|
// container isn't running. Backgrounded — touches the
|
||||||
|
// forge state dir via `nixos-container run` which is slow.
|
||||||
|
tokio::spawn(async move {
|
||||||
|
forge::ensure_all().await;
|
||||||
|
});
|
||||||
|
// Periodic broker vacuum: drop fully-acked messages older
|
||||||
|
// than 30 days. Delivered-but-unacked rows (recoverable via
|
||||||
|
// requeue_inflight) and undelivered rows are always kept.
|
||||||
|
// Runs hourly; first sweep happens immediately.
|
||||||
|
let vacuum_coord = coord.clone();
|
||||||
|
let mut vacuum_shutdown = coord.shutdown_rx();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let interval = std::time::Duration::from_secs(3600);
|
||||||
|
let keep_secs: i64 = 30 * 24 * 3600;
|
||||||
|
loop {
|
||||||
|
match vacuum_coord.broker.vacuum_delivered(keep_secs) {
|
||||||
|
Ok(0) => {}
|
||||||
|
Ok(n) => tracing::info!(removed = n, "broker vacuum"),
|
||||||
|
Err(e) => tracing::warn!(error = ?e, "broker vacuum failed"),
|
||||||
|
}
|
||||||
|
tokio::select! {
|
||||||
|
() = tokio::time::sleep(interval) => {}
|
||||||
|
_ = vacuum_shutdown.changed() => {
|
||||||
|
tracing::info!("broker vacuum: shutdown signal received");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// Per-agent events.sqlite vacuum: host-side so the harness
|
||||||
|
// doesn't need any retention wiring of its own.
|
||||||
|
events_vacuum::spawn(&coord);
|
||||||
|
// Per-agent turn-stats.sqlite vacuum: same pattern, 90-day
|
||||||
|
// retention so trend analysis has enough history.
|
||||||
|
stats_vacuum::spawn(&coord);
|
||||||
|
// Container crash watcher: emits HelperEvent::ContainerCrash
|
||||||
|
// when a previously-running container goes away without an
|
||||||
|
// operator-initiated transient state.
|
||||||
|
crash_watch::spawn(coord.clone());
|
||||||
|
// Reminder scheduler: drains due reminders + handles
|
||||||
|
// file_path payload persistence. See reminder_scheduler.rs.
|
||||||
|
reminder_scheduler::spawn(coord.clone());
|
||||||
|
// Forward every broker event onto the unified dashboard
|
||||||
|
// channel with a freshly-stamped seq, so the dashboard SSE
|
||||||
|
// sees broker messages + future mutation events on one
|
||||||
|
// stream with one monotonic seq. The broker's intra-process
|
||||||
|
// channel (used by `recv_blocking_batch`) stays untouched.
|
||||||
|
spawn_broker_to_dashboard_forwarder(coord.clone());
|
||||||
|
let dash_coord = coord.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await {
|
||||||
|
tracing::error!(error = ?e, "dashboard failed");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// Run the admin socket until a signal arrives; then signal
|
||||||
|
// all background tasks so they exit cleanly before the
|
||||||
|
// process terminates.
|
||||||
|
let coord_sig = coord.clone();
|
||||||
|
tokio::select! {
|
||||||
|
res = server::serve(socket, coord) => { res? }
|
||||||
|
_ = tokio::signal::ctrl_c() => {
|
||||||
|
tracing::info!("SIGINT received — requesting shutdown");
|
||||||
|
coord_sig.request_shutdown();
|
||||||
|
}
|
||||||
|
() = async {
|
||||||
|
let mut sig = tokio::signal::unix::signal(
|
||||||
|
tokio::signal::unix::SignalKind::terminate()
|
||||||
|
).expect("failed to install SIGTERM handler");
|
||||||
|
sig.recv().await;
|
||||||
|
} => {
|
||||||
|
tracing::info!("SIGTERM received — requesting shutdown");
|
||||||
|
coord_sig.request_shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Re-emit every broker `MessageEvent` onto the dashboard channel as
|
/// Re-emit every broker `MessageEvent` onto the dashboard channel as
|
||||||
/// a `DashboardEvent::Sent` / `Delivered` with a freshly-stamped seq.
|
/// a `DashboardEvent::Sent` / `Delivered` with a freshly-stamped seq.
|
||||||
/// Background task; runs for the life of the process. On a lagged
|
/// Background task; runs for the life of the process. On a lagged
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue