From 748536203b9696f97b822e2ca17f67e0c70ce254 Mon Sep 17 00:00:00 2001 From: damocles Date: Fri, 22 May 2026 19:24:44 +0200 Subject: [PATCH] refactor: split long functions per review feedback; remove all #[allow] attributes --- hive-ag3nt/src/bin/hive-ag3nt.rs | 185 ++++++++++------------ hive-ag3nt/src/bin/hive-m1nd.rs | 167 ++++++++++---------- hive-ag3nt/src/forge_notify.rs | 3 +- hive-c0re/src/actions.rs | 263 +++++++++++++++++-------------- hive-c0re/src/main.rs | 243 ++++++++++++++-------------- 5 files changed, 429 insertions(+), 432 deletions(-) diff --git a/hive-ag3nt/src/bin/hive-ag3nt.rs b/hive-ag3nt/src/bin/hive-ag3nt.rs index 4510da1..44c0ae6 100644 --- a/hive-ag3nt/src/bin/hive-ag3nt.rs +++ b/hive-ag3nt/src/bin/hive-ag3nt.rs @@ -149,11 +149,11 @@ async fn main() -> Result<()> { } } -#[allow(clippy::too_many_arguments, clippy::similar_names, clippy::too_many_lines)] +#[allow(clippy::too_many_arguments)] async fn serve( socket: &Path, interval: Duration, - state: Arc>, + _login_state: Arc>, bus: Bus, stats: Option, files: &turn::TurnFiles, @@ -161,25 +161,12 @@ async fn serve( label: &str, ) -> Result<()> { tracing::info!(socket = %socket.display(), "hive-ag3nt serve"); - let _ = state; // reserved for future state transitions (turn-loop -> needs-login) - // Boot-time recovery: ask the broker to resurface anything we - // popped in a previous harness session but never acked - // (crashed mid-turn / OOM / container restart). The broker - // resets `delivered_at = NULL` on those rows and remembers - // their ids so the next `Recv` tags them `redelivered: true`; - // we then prepend a "may already be handled" hint to the wake - // prompt. Single shot before entering the serve loop; idempotent - // when there's nothing inflight. requeue_inflight(socket).await; loop { let recv: Result = - // Explicit long-poll: the new agent_server semantics treat - // `None` as "peek, don't wait", which would tight-loop on - // sleep(interval). The harness wants to park until a - // message arrives, so opt into the full 180s cap. - // `max: None` (= 1) — the serve loop drives one turn per - // wake; claude itself calls recv(max: N) in-turn to drain - // a burst when the wake prompt mentions pending. + // Explicit long-poll: park until a message arrives (180s cap). + // `max: None` (= 1) — one turn per wake; claude calls + // recv(max: N) in-turn to drain bursts. client::request( socket, &AgentRequest::Recv { @@ -191,93 +178,7 @@ async fn serve( match recv { Ok(AgentResponse::Messages { messages }) if !messages.is_empty() => { let first = messages.into_iter().next().expect("checked non-empty"); - let from = first.from; - let body = first.body; - let redelivered = first.redelivered; - tracing::info!(%from, %body, %redelivered, "inbox"); - let unread = inbox_unread(socket).await; - bus.emit(LiveEvent::TurnStart { - from: from.clone(), - body: body.clone(), - unread, - }); - bus.set_state(TurnState::Thinking); - let started_at = serve_common::now_unix(); - let started_instant = std::time::Instant::now(); - let model_at_start = bus.model(); - let prompt = serve_common::format_wake_prompt(&from, &body, unread, redelivered); - let outcome = { - let _guard = turn_lock.lock().await; - turn::drive_turn(&prompt, files, &bus).await - }; - turn::emit_turn_end(&bus, &outcome); - bus.set_state(TurnState::Idle); - // Ack only on a clean turn-end. `Failed` leaves every - // message popped during the turn in the unacked list; - // next harness boot's `RequeueInflight` will reset - // `delivered_at = NULL` and tag them `redelivered`. - // `PromptTooLong` is absorbed inside `drive_turn` via - // compaction so it shouldn't reach here, but if it - // does we also skip the ack (safer to redeliver than - // to lose the message). - if matches!(outcome, turn::TurnOutcome::Ok | turn::TurnOutcome::Compacted) { - ack_turn(socket).await; - } - // Rate-limited: park until the quota resets, then requeue - // the unacked message so it resurfaces in the same session. - if matches!(outcome, turn::TurnOutcome::RateLimited) { - let secs = turn::rate_limit_sleep_secs(); - bus.emit_status("rate_limited"); - bus.emit(LiveEvent::Note { - text: format!( - "API rate-limited — sleeping {secs}s before retry" - ), - }); - tracing::warn!(sleep_secs = secs, "rate-limited; parking"); - tokio::time::sleep(Duration::from_secs(secs)).await; - requeue_inflight(socket).await; - bus.emit_status("online"); - } - // Failures are unhandled by definition — PromptTooLong is - // absorbed inside drive_turn via compaction, so anything - // that reaches Failed here is a real crash. Notify the - // manager so it can investigate / restart / page the - // operator; best-effort, swallow the send error. - if let turn::TurnOutcome::Failed(e) = &outcome { - notify_manager_of_failure(socket, label, e).await; - } - if let Some(s) = &stats { - let ended_at = serve_common::now_unix(); - let duration_ms = - i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX); - let (open_threads, open_reminders) = fetch_agent_post_turn_counts(socket).await; - let row = serve_common::build_row( - started_at, - ended_at, - duration_ms, - model_at_start, - from.clone(), - &outcome, - &bus, - open_threads, - open_reminders, - ); - s.record(&row); - } - - // After turn completes, log whether messages arrived during - // the turn — the outer loop will iterate back to recv() on - // its own (the Empty-arm sleep only fires when recv - // actually returned Empty), so no explicit continue needed. - let pending = inbox_unread(socket).await; - if pending > 0 { - tracing::info!(%pending, "pending messages after turn; fetching next"); - } - // `request_next_turn` MCP tool: agent wrote a sentinel - // requesting an immediate self-continuation turn. Clear - // the file and inject a synthetic wake so the outer loop - // fires a bare turn even if the inbox is empty. - check_and_inject_continue(socket, label).await; + handle_agent_turn(socket, &bus, stats.as_ref(), files, &turn_lock, label, first).await; } Ok(AgentResponse::Messages { .. }) => { // Idle: empty list = nothing pending. Brief sleep @@ -307,6 +208,80 @@ async fn serve( } } +/// Drive one turn for a received agent-inbox message. +async fn handle_agent_turn( + socket: &Path, + bus: &Bus, + stats: Option<&TurnStats>, + files: &turn::TurnFiles, + turn_lock: &TurnLock, + label: &str, + first: hive_sh4re::DeliveredMessage, +) { + let from = first.from; + let body = first.body; + let redelivered = first.redelivered; + tracing::info!(%from, %body, %redelivered, "inbox"); + let unread = inbox_unread(socket).await; + bus.emit(LiveEvent::TurnStart { from: from.clone(), body: body.clone(), unread }); + bus.set_state(TurnState::Thinking); + let started_at = serve_common::now_unix(); + let started_instant = std::time::Instant::now(); + let model_at_start = bus.model(); + let prompt = serve_common::format_wake_prompt(&from, &body, unread, redelivered); + let outcome = { + let _guard = turn_lock.lock().await; + turn::drive_turn(&prompt, files, bus).await + }; + turn::emit_turn_end(bus, &outcome); + bus.set_state(TurnState::Idle); + // Ack only on a clean turn-end. `Failed` leaves every message popped + // during the turn in the unacked list; next harness boot requeues them. + if matches!(outcome, turn::TurnOutcome::Ok | turn::TurnOutcome::Compacted) { + ack_turn(socket).await; + } + if matches!(outcome, turn::TurnOutcome::RateLimited) { + let secs = turn::rate_limit_sleep_secs(); + bus.emit_status("rate_limited"); + bus.emit(LiveEvent::Note { + text: format!("API rate-limited — sleeping {secs}s before retry"), + }); + tracing::warn!(sleep_secs = secs, "rate-limited; parking"); + tokio::time::sleep(Duration::from_secs(secs)).await; + requeue_inflight(socket).await; + bus.emit_status("online"); + } + // Real crash: PromptTooLong is absorbed by compaction inside drive_turn. + if let turn::TurnOutcome::Failed(e) = &outcome { + notify_manager_of_failure(socket, label, e).await; + } + if let Some(stats) = stats { + let ended_at = serve_common::now_unix(); + let duration_ms = + i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX); + let (open_threads, open_reminders) = fetch_agent_post_turn_counts(socket).await; + let row = serve_common::build_row( + started_at, + ended_at, + duration_ms, + model_at_start, + from.clone(), + &outcome, + bus, + open_threads, + open_reminders, + ); + stats.record(&row); + } + let pending = inbox_unread(socket).await; + if pending > 0 { + tracing::info!(%pending, "pending messages after turn; fetching next"); + } + // `request_next_turn` MCP tool: agent wrote a sentinel requesting + // an immediate self-continuation. Clear and inject synthetic wake. + check_and_inject_continue(socket, label).await; +} + // Per-turn user prompt: the role/tools/etc. is in the system prompt // (`prompts/agent.md` → `claude --system-prompt-file`); this is just the // wake signal claude reacts to. `unread` is the count of *other* diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index 00054f7..21e5f3f 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -113,7 +113,6 @@ async fn main() -> Result<()> { } } -#[allow(clippy::too_many_lines)] // linear startup sequence; splitting adds indirection without clarity async fn serve( socket: &Path, interval: Duration, @@ -145,93 +144,7 @@ async fn serve( match recv { Ok(ManagerResponse::Messages { messages }) if !messages.is_empty() => { let first = messages.into_iter().next().expect("checked non-empty"); - let from = first.from; - let body = first.body; - let redelivered = first.redelivered; - if from == SYSTEM_SENDER { - // Helper events (ApprovalResolved / Spawned / Rebuilt / - // Killed / Destroyed) — these are FYI for the manager; - // we surface them in the live view and forward them as - // a normal claude turn so the manager can react (e.g. - // greet a newly-spawned agent, retry a failed rebuild). - let parsed = serde_json::from_str::(&body).ok(); - if let Some(event) = parsed { - tracing::info!(?event, "helper event"); - } else { - tracing::info!(%from, %body, "system message"); - } - bus.emit(LiveEvent::Note { - text: format!("[system] {body}"), - }); - // Fall through: drive a turn with the event in the wake - // prompt body so claude sees it. Sender stays "system" - // so the wake prompt can label it as such. - } - tracing::info!(%from, %body, %redelivered, "manager inbox"); - let unread = inbox_unread(socket).await; - bus.emit(LiveEvent::TurnStart { - from: from.clone(), - body: body.clone(), - unread, - }); - let prompt = serve_common::format_wake_prompt(&from, &body, unread, redelivered); - bus.set_state(TurnState::Thinking); - let started_at = serve_common::now_unix(); - let started_instant = std::time::Instant::now(); - let model_at_start = bus.model(); - let outcome = { - let _guard = turn_lock.lock().await; - turn::drive_turn(&prompt, files, &bus).await - }; - turn::emit_turn_end(&bus, &outcome); - bus.set_state(TurnState::Idle); - // Ack only on a clean turn-end; Failed / RateLimited leave - // the popped ids in-flight for the next boot's requeue. - // Mirrors hive-ag3nt; see that loop for full rationale. - if matches!(outcome, turn::TurnOutcome::Ok | turn::TurnOutcome::Compacted) { - ack_turn(socket).await; - } - // Rate-limited: park until the quota resets, then requeue - // the unacked message so it resurfaces in the same session. - if matches!(outcome, turn::TurnOutcome::RateLimited) { - let secs = turn::rate_limit_sleep_secs(); - bus.emit_status("rate_limited"); - bus.emit(LiveEvent::Note { - text: format!( - "API rate-limited — sleeping {secs}s before retry" - ), - }); - tracing::warn!(sleep_secs = secs, "rate-limited; parking"); - tokio::time::sleep(Duration::from_secs(secs)).await; - requeue_inflight(socket).await; - bus.emit_status("online"); - } - if let Some(s) = &stats { - let ended_at = serve_common::now_unix(); - let duration_ms = - i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX); - let (open_threads, open_reminders) = - fetch_manager_post_turn_counts(socket).await; - let row = serve_common::build_row( - started_at, - ended_at, - duration_ms, - model_at_start, - from.clone(), - &outcome, - &bus, - open_threads, - open_reminders, - ); - s.record(&row); - } - // Check for messages that arrived during the turn so we - // surface "draining" in the logs. The loop will already - // re-iterate from here — no explicit continue needed. - let pending = inbox_unread(socket).await; - if pending > 0 { - tracing::info!(%pending, "pending messages after turn; fetching next"); - } + handle_manager_turn(socket, &bus, stats.as_ref(), files, &turn_lock, first).await; } Ok(ManagerResponse::Messages { .. }) => { // Idle: empty list = nothing pending. Brief sleep @@ -261,6 +174,84 @@ async fn serve( } } +/// Drive one turn for a received manager-inbox message. Called from the +/// serve loop for the non-empty-messages arm to keep that loop readable. +async fn handle_manager_turn( + socket: &Path, + bus: &Bus, + stats: Option<&TurnStats>, + files: &turn::TurnFiles, + turn_lock: &TurnLock, + first: hive_sh4re::DeliveredMessage, +) { + let from = first.from; + let body = first.body; + let redelivered = first.redelivered; + if from == SYSTEM_SENDER { + // Helper events (ApprovalResolved / Spawned / Rebuilt / + // Killed / Destroyed) — surface in the live view and drive a + // normal turn so the manager can react. + let parsed = serde_json::from_str::(&body).ok(); + if let Some(event) = parsed { + tracing::info!(?event, "helper event"); + } else { + tracing::info!(%from, %body, "system message"); + } + bus.emit(LiveEvent::Note { text: format!("[system] {body}") }); + } + tracing::info!(%from, %body, %redelivered, "manager inbox"); + let unread = inbox_unread(socket).await; + bus.emit(LiveEvent::TurnStart { from: from.clone(), body: body.clone(), unread }); + let prompt = serve_common::format_wake_prompt(&from, &body, unread, redelivered); + bus.set_state(TurnState::Thinking); + let started_at = serve_common::now_unix(); + let started_instant = std::time::Instant::now(); + let model_at_start = bus.model(); + let outcome = { + let _guard = turn_lock.lock().await; + turn::drive_turn(&prompt, files, bus).await + }; + turn::emit_turn_end(bus, &outcome); + bus.set_state(TurnState::Idle); + // Ack only on a clean turn-end; Failed / RateLimited leave the + // popped ids in-flight for the next boot's requeue. + if matches!(outcome, turn::TurnOutcome::Ok | turn::TurnOutcome::Compacted) { + ack_turn(socket).await; + } + if matches!(outcome, turn::TurnOutcome::RateLimited) { + let secs = turn::rate_limit_sleep_secs(); + bus.emit_status("rate_limited"); + bus.emit(LiveEvent::Note { + text: format!("API rate-limited — sleeping {secs}s before retry"), + }); + tracing::warn!(sleep_secs = secs, "rate-limited; parking"); + tokio::time::sleep(Duration::from_secs(secs)).await; + requeue_inflight(socket).await; + bus.emit_status("online"); + } + if let Some(stats) = stats { + let ended_at = serve_common::now_unix(); + let duration_ms = + i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX); + let (open_threads, open_reminders) = fetch_manager_post_turn_counts(socket).await; + let row = serve_common::build_row( + started_at, + ended_at, + duration_ms, + model_at_start, + from.clone(), + &outcome, + bus, + open_threads, + open_reminders, + ); + stats.record(&row); + } + let pending = inbox_unread(socket).await; + if pending > 0 { + tracing::info!(%pending, "pending messages after turn; fetching next"); + } +} /// Best-effort: tell the broker every message popped during the turn /// is now handled. Mirror of `hive-ag3nt::ack_turn` on the manager diff --git a/hive-ag3nt/src/forge_notify.rs b/hive-ag3nt/src/forge_notify.rs index a1f9e7a..21977f7 100644 --- a/hive-ag3nt/src/forge_notify.rs +++ b/hive-ag3nt/src/forge_notify.rs @@ -422,8 +422,7 @@ fn format_state_change_notification( && subject .as_ref() .and_then(|s| s["requested_reviewers"].as_array()) - .map(|arr| arr.iter().any(|r| r["login"].as_str() == Some(own_login))) - .unwrap_or(false); + .is_some_and(|arr| arr.iter().any(|r| r["login"].as_str() == Some(own_login))); let kind = if is_review_request { format!("review requested{num}{repo}") } else { diff --git a/hive-c0re/src/actions.rs b/hive-c0re/src/actions.rs index 02d12dd..36fa764 100644 --- a/hive-c0re/src/actions.rs +++ b/hive-c0re/src/actions.rs @@ -23,7 +23,6 @@ use crate::lifecycle::{self, MANAGER_NAME}; /// /// In all cases an `ApprovalResolved` helper event lands in the manager's /// inbox when the work resolves. -#[allow(clippy::too_many_lines)] // approval dispatch covers several independent approval kinds pub async fn approve(coord: Arc, id: i64) -> Result<()> { let approval = coord.approvals.mark_approved(id)?; tracing::info!( @@ -33,142 +32,164 @@ pub async fn approve(coord: Arc, id: i64) -> Result<()> { %approval.commit_ref, "approval: running action", ); - let agent_dir = coord.ensure_runtime(&approval.agent)?; let proposed_dir = Coordinator::agent_proposed_dir(&approval.agent); let applied_dir = Coordinator::agent_applied_dir(&approval.agent); let claude_dir = Coordinator::agent_claude_dir(&approval.agent); let notes_dir = Coordinator::agent_notes_dir(&approval.agent); - match approval.kind { ApprovalKind::ApplyCommit => { - let (result, terminal_tag, is_first_spawn) = run_apply_commit( - &coord, - &approval, - &agent_dir, - &applied_dir, - &claude_dir, - ¬es_dir, - ) - .await; - // Mirror the applied repo's new tag/branch state (approved/ - // building/deployed-or-failed + main) to the forge. - if let Err(e) = crate::forge::push_config(&approval.agent).await { - tracing::warn!(agent = %approval.agent, error = ?e, "forge: push_config after apply failed"); - } - if is_first_spawn && result.is_ok() { - // First-spawn bookkeeping: create the per-agent forge user, - // mirror the applied repo into agent-configs/, and grant - // read access to core/meta. - if let Err(e) = crate::forge::ensure_user_for(&approval.agent).await { - tracing::warn!(agent = %approval.agent, error = ?e, "forge: ensure_user after first spawn failed"); - } - if let Err(e) = crate::forge::ensure_config_repo(&approval.agent).await { - tracing::warn!(agent = %approval.agent, error = ?e, "forge: ensure_config_repo after first spawn failed"); - } - if let Some(core_token) = crate::forge::core_token() - && let Err(e) = crate::forge::meta_read_access(&approval.agent, &core_token).await { - tracing::warn!(agent = %approval.agent, error = ?e, "forge: meta_read_access after first spawn failed"); - } - if let Err(e) = crate::forge::ensure_meta_remote(&approval.agent).await { - tracing::warn!(agent = %approval.agent, error = ?e, "forge: ensure_meta_remote after first spawn failed"); - } - // New container row appeared — rescan so the dashboard - // reflects the post-spawn state without a manual refetch. - coord.rescan_containers_and_emit().await; - crate::dashboard::emit_tombstones_snapshot(&coord).await; - } - finish_approval(&coord, &approval, result, terminal_tag, is_first_spawn) + approve_apply_commit(coord, approval, agent_dir, applied_dir, claude_dir, notes_dir).await } ApprovalKind::InitConfig => { - // Seed the proposed config repo. Runs synchronously — it's just - // a few git operations with no nixos-container involvement. - let result: Result<()> = async { - lifecycle::setup_proposed(&proposed_dir, &approval.agent).await?; - lifecycle::ensure_claude_dir(&claude_dir)?; - lifecycle::ensure_state_dir(¬es_dir)?; - Ok(()) - } - .await; - // Wire the meta remote now that the proposed repo exists. - if result.is_ok() - && let Err(e) = crate::forge::ensure_meta_remote(&approval.agent).await { - tracing::warn!(agent = %approval.agent, error = ?e, "forge: ensure_meta_remote after init_config failed"); - } - finish_approval(&coord, &approval, result, None, false) - } - ApprovalKind::UpdateMetaInputs => { - // Decode the inputs from the commit_ref field (stored as JSON - // by submit_apply_commit's counterpart in manager_server.rs). - let inputs: Vec = - serde_json::from_str(&approval.commit_ref).unwrap_or_default(); - let result = crate::meta::lock_update(&inputs).await; - finish_approval(&coord, &approval, result, None, false) + approve_init_config(coord, approval, proposed_dir, claude_dir, notes_dir).await } + ApprovalKind::UpdateMetaInputs => approve_update_meta_inputs(coord, approval).await, ApprovalKind::Spawn => { - // Run the spawn in the background so the approve POST returns - // immediately. The dashboard reads `transient` to render a spinner. - // Guard is created synchronously here (so the spinner appears - // the moment the operator clicks approve) and moved into the - // task; it auto-clears even if the runtime drops the task. - let coord_bg = coord.clone(); - let approval_bg = approval.clone(); - let guard = coord_bg.transient_guard(&approval_bg.agent, TransientKind::Spawning); - tokio::spawn(async move { - let guard = guard; - let agent_bg = approval_bg.agent.clone(); - let result = lifecycle::spawn( - &approval_bg.agent, - &coord_bg.hyperhive_flake, - &agent_dir, - &proposed_dir, - &applied_dir, - &claude_dir, - ¬es_dir, - coord_bg.dashboard_port, - &coord_bg.operator_pronouns, - &coord_bg.context_window_tokens, - ) - .await; - drop(guard); - if result.is_ok() { - if let Err(e) = crate::forge::ensure_user_for(&agent_bg).await { - tracing::warn!(agent = %agent_bg, error = ?e, "forge: ensure_user after spawn failed"); - } - // Create the agent-configs mirror repo and seed it - // with the freshly-initialised applied repo (main + - // deployed/0). - if let Err(e) = crate::forge::ensure_config_repo(&agent_bg).await { - tracing::warn!(agent = %agent_bg, error = ?e, "forge: ensure_config_repo after spawn failed"); - } - if let Err(e) = crate::forge::push_config(&agent_bg).await { - tracing::warn!(agent = %agent_bg, error = ?e, "forge: push_config after spawn failed"); - } - if let Some(core_token) = crate::forge::core_token() - && let Err(e) = crate::forge::meta_read_access(&agent_bg, &core_token).await { - tracing::warn!(agent = %agent_bg, error = ?e, "forge: meta_read_access after spawn failed"); - } - if let Err(e) = crate::forge::ensure_meta_remote(&agent_bg).await { - tracing::warn!(agent = %agent_bg, error = ?e, "forge: ensure_meta_remote after spawn failed"); - } - } - if let Err(e) = finish_approval(&coord_bg, &approval_bg, result, None, false) { - tracing::warn!(agent = %agent_bg, error = ?e, "spawn approval failed"); - } - // New container row appeared (or didn't, on failure - // before nixos-container create completed) — rescan so - // dashboards reflect the post-spawn state. Spawn can - // also consume a tombstone of the same name; emit the - // fresh list so the operator's dormant-state pane - // updates without a refetch. - coord_bg.rescan_containers_and_emit().await; - crate::dashboard::emit_tombstones_snapshot(&coord_bg).await; - }); + approve_spawn(&coord, &approval, agent_dir, proposed_dir, applied_dir, claude_dir, notes_dir); Ok(()) } } } +async fn approve_apply_commit( + coord: Arc, + approval: hive_sh4re::Approval, + agent_dir: std::path::PathBuf, + applied_dir: std::path::PathBuf, + claude_dir: std::path::PathBuf, + notes_dir: std::path::PathBuf, +) -> Result<()> { + let (result, terminal_tag, is_first_spawn) = run_apply_commit( + &coord, + &approval, + &agent_dir, + &applied_dir, + &claude_dir, + ¬es_dir, + ) + .await; + if let Err(e) = crate::forge::push_config(&approval.agent).await { + tracing::warn!(agent = %approval.agent, error = ?e, "forge: push_config after apply failed"); + } + if is_first_spawn && result.is_ok() { + forge_after_first_spawn(&coord, &approval.agent).await; + } + finish_approval(&coord, &approval, result, terminal_tag, is_first_spawn) +} + +/// Forge bookkeeping run once after the very first container spawn: +/// create the per-agent forge user, mirror the applied repo, and grant +/// read access to core/meta. Also rescans containers so the dashboard +/// reflects the post-spawn state. +async fn forge_after_first_spawn(coord: &Arc, agent: &str) { + if let Err(e) = crate::forge::ensure_user_for(agent).await { + tracing::warn!(%agent, error = ?e, "forge: ensure_user after first spawn failed"); + } + if let Err(e) = crate::forge::ensure_config_repo(agent).await { + tracing::warn!(%agent, error = ?e, "forge: ensure_config_repo after first spawn failed"); + } + if let Some(core_token) = crate::forge::core_token() + && let Err(e) = crate::forge::meta_read_access(agent, &core_token).await { + tracing::warn!(%agent, error = ?e, "forge: meta_read_access after first spawn failed"); + } + if let Err(e) = crate::forge::ensure_meta_remote(agent).await { + tracing::warn!(%agent, error = ?e, "forge: ensure_meta_remote after first spawn failed"); + } + coord.rescan_containers_and_emit().await; + crate::dashboard::emit_tombstones_snapshot(coord).await; +} + +async fn approve_init_config( + coord: Arc, + approval: hive_sh4re::Approval, + proposed_dir: std::path::PathBuf, + claude_dir: std::path::PathBuf, + notes_dir: std::path::PathBuf, +) -> Result<()> { + // Seed the proposed config repo — just git operations, no nixos-container. + let result: Result<()> = async { + lifecycle::setup_proposed(&proposed_dir, &approval.agent).await?; + lifecycle::ensure_claude_dir(&claude_dir)?; + lifecycle::ensure_state_dir(¬es_dir)?; + Ok(()) + } + .await; + if result.is_ok() + && let Err(e) = crate::forge::ensure_meta_remote(&approval.agent).await { + tracing::warn!(agent = %approval.agent, error = ?e, "forge: ensure_meta_remote after init_config failed"); + } + finish_approval(&coord, &approval, result, None, false) +} + +async fn approve_update_meta_inputs( + coord: Arc, + approval: hive_sh4re::Approval, +) -> Result<()> { + // Inputs stored as JSON in commit_ref by the manager's submit path. + let inputs: Vec = serde_json::from_str(&approval.commit_ref).unwrap_or_default(); + let result = crate::meta::lock_update(&inputs).await; + finish_approval(&coord, &approval, result, None, false) +} + +fn approve_spawn( + coord: &Arc, + approval: &hive_sh4re::Approval, + agent_dir: std::path::PathBuf, + proposed_dir: std::path::PathBuf, + applied_dir: std::path::PathBuf, + claude_dir: std::path::PathBuf, + notes_dir: std::path::PathBuf, +) { + // Run spawn in the background so approve POST returns immediately. + // Guard created synchronously so the spinner appears the moment + // the operator clicks approve; auto-clears when the task drops it. + let coord_bg = Arc::clone(coord); + let approval_bg = approval.clone(); + let guard = coord_bg.transient_guard(&approval_bg.agent, TransientKind::Spawning); + tokio::spawn(async move { + let guard = guard; + let agent_bg = approval_bg.agent.clone(); + let result = lifecycle::spawn( + &approval_bg.agent, + &coord_bg.hyperhive_flake, + &agent_dir, + &proposed_dir, + &applied_dir, + &claude_dir, + ¬es_dir, + coord_bg.dashboard_port, + &coord_bg.operator_pronouns, + &coord_bg.context_window_tokens, + ) + .await; + drop(guard); + if result.is_ok() { + if let Err(e) = crate::forge::ensure_user_for(&agent_bg).await { + tracing::warn!(agent = %agent_bg, error = ?e, "forge: ensure_user after spawn failed"); + } + if let Err(e) = crate::forge::ensure_config_repo(&agent_bg).await { + tracing::warn!(agent = %agent_bg, error = ?e, "forge: ensure_config_repo after spawn failed"); + } + if let Err(e) = crate::forge::push_config(&agent_bg).await { + tracing::warn!(agent = %agent_bg, error = ?e, "forge: push_config after spawn failed"); + } + if let Some(core_token) = crate::forge::core_token() + && let Err(e) = crate::forge::meta_read_access(&agent_bg, &core_token).await { + tracing::warn!(agent = %agent_bg, error = ?e, "forge: meta_read_access after spawn failed"); + } + if let Err(e) = crate::forge::ensure_meta_remote(&agent_bg).await { + tracing::warn!(agent = %agent_bg, error = ?e, "forge: ensure_meta_remote after spawn failed"); + } + } + if let Err(e) = finish_approval(&coord_bg, &approval_bg, result, None, false) { + tracing::warn!(agent = %agent_bg, error = ?e, "spawn approval failed"); + } + coord_bg.rescan_containers_and_emit().await; + crate::dashboard::emit_tombstones_snapshot(&coord_bg).await; + }); +} + fn finish_approval( coord: &Coordinator, approval: &hive_sh4re::Approval, diff --git a/hive-c0re/src/main.rs b/hive-c0re/src/main.rs index 74a8e18..07e6d17 100644 --- a/hive-c0re/src/main.rs +++ b/hive-c0re/src/main.rs @@ -100,7 +100,6 @@ enum Cmd { } #[tokio::main] -#[allow(clippy::too_many_lines)] // top-level dispatch; hard to split without losing readability async fn main() -> Result<()> { tracing_subscriber::fmt() .with_env_filter( @@ -117,121 +116,7 @@ async fn main() -> Result<()> { dashboard_port, operator_pronouns, context_window_tokens, - } => { - let cwt: std::collections::HashMap = - serde_json::from_str(&context_window_tokens) - .context("--context-window-tokens: invalid JSON")?; - let coord = Arc::new(Coordinator::open( - &db, - hyperhive_flake, - dashboard_port, - operator_pronouns, - cwt, - )?); - manager_server::start(coord.clone())?; - // Idempotent pre-flight: rewrite pre-meta-layout applied - // repos, ensure proposed repos carry the `applied` - // remote, bootstrap the meta repo, repoint containers at - // `meta#` (one-shot, guarded by a marker file). - // Runs before manager auto-spawn so the new manager is - // built against meta from the first attempt. - if let Err(e) = migrate::run(&coord).await { - tracing::warn!(error = ?e, "startup migration failed"); - } - // Auto-create the manager container if it isn't there yet. Block - // on this — without hm1nd the system has no manager harness. - // Failures are logged but allowed: a broken auto-spawn shouldn't - // make the dashboard unreachable for debugging. - if let Err(e) = auto_update::ensure_manager(&coord).await { - tracing::warn!(error = ?e, "auto-spawn manager failed"); - } - // Auto-update in the background — don't block service start. - // Sub-agent rebuilds can take tens of seconds; we want the admin - // socket up immediately. - let update_coord = coord.clone(); - tokio::spawn(async move { - if let Err(e) = auto_update::run(update_coord).await { - tracing::warn!(error = ?e, "auto-update task failed"); - } - }); - // Forge user sweep: ensure every existing container has a - // forgejo user + access token. No-op when the hive-forge - // container isn't running. Backgrounded — touches the - // forge state dir via `nixos-container run` which is slow. - tokio::spawn(async move { - forge::ensure_all().await; - }); - // Periodic broker vacuum: drop fully-acked messages older - // than 30 days. Delivered-but-unacked rows (recoverable via - // requeue_inflight) and undelivered rows are always kept. - // Runs hourly; first sweep happens immediately. - let vacuum_coord = coord.clone(); - let mut vacuum_shutdown = coord.shutdown_rx(); - tokio::spawn(async move { - let interval = std::time::Duration::from_secs(3600); - let keep_secs: i64 = 30 * 24 * 3600; - loop { - match vacuum_coord.broker.vacuum_delivered(keep_secs) { - Ok(0) => {} - Ok(n) => tracing::info!(removed = n, "broker vacuum"), - Err(e) => tracing::warn!(error = ?e, "broker vacuum failed"), - } - tokio::select! { - () = tokio::time::sleep(interval) => {} - _ = vacuum_shutdown.changed() => { - tracing::info!("broker vacuum: shutdown signal received"); - break; - } - } - } - }); - // Per-agent events.sqlite vacuum: host-side so the harness - // doesn't need any retention wiring of its own. - events_vacuum::spawn(&coord); - // Per-agent turn-stats.sqlite vacuum: same pattern, 90-day - // retention so trend analysis has enough history. - stats_vacuum::spawn(&coord); - // Container crash watcher: emits HelperEvent::ContainerCrash - // when a previously-running container goes away without an - // operator-initiated transient state. - crash_watch::spawn(coord.clone()); - // Reminder scheduler: drains due reminders + handles - // file_path payload persistence. See reminder_scheduler.rs. - reminder_scheduler::spawn(coord.clone()); - // Forward every broker event onto the unified dashboard - // channel with a freshly-stamped seq, so the dashboard SSE - // sees broker messages + future mutation events on one - // stream with one monotonic seq. The broker's intra-process - // channel (used by `recv_blocking_batch`) stays untouched. - spawn_broker_to_dashboard_forwarder(coord.clone()); - let dash_coord = coord.clone(); - tokio::spawn(async move { - if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await { - tracing::error!(error = ?e, "dashboard failed"); - } - }); - // Run the admin socket until a signal arrives; then signal - // all background tasks so they exit cleanly before the - // process terminates. - let coord_sig = coord.clone(); - tokio::select! { - res = server::serve(&cli.socket, coord) => { res? } - _ = tokio::signal::ctrl_c() => { - tracing::info!("SIGINT received — requesting shutdown"); - coord_sig.request_shutdown(); - } - () = async { - let mut sig = tokio::signal::unix::signal( - tokio::signal::unix::SignalKind::terminate() - ).expect("failed to install SIGTERM handler"); - sig.recv().await; - } => { - tracing::info!("SIGTERM received — requesting shutdown"); - coord_sig.request_shutdown(); - } - } - Ok(()) - } + } => cmd_serve(hyperhive_flake, db, dashboard_port, operator_pronouns, context_window_tokens, &cli.socket).await, Cmd::Spawn { name } => { render(client::request(&cli.socket, HostRequest::Spawn { name }).await?) } @@ -256,6 +141,132 @@ async fn main() -> Result<()> { } } +/// Start the coordinator daemon: open the broker, run migrations, spawn +/// background tasks (auto-update, vacuums, crash-watcher, reminder-scheduler, +/// dashboard), then serve the admin socket until a signal arrives. +async fn cmd_serve( + hyperhive_flake: String, + db: std::path::PathBuf, + dashboard_port: u16, + operator_pronouns: String, + context_window_tokens: String, + socket: &std::path::Path, +) -> Result<()> { + let cwt: std::collections::HashMap = + serde_json::from_str(&context_window_tokens) + .context("--context-window-tokens: invalid JSON")?; + let coord = Arc::new(Coordinator::open( + &db, + hyperhive_flake, + dashboard_port, + operator_pronouns, + cwt, + )?); + manager_server::start(coord.clone())?; + // Idempotent pre-flight: rewrite pre-meta-layout applied + // repos, ensure proposed repos carry the `applied` + // remote, bootstrap the meta repo, repoint containers at + // `meta#` (one-shot, guarded by a marker file). + // Runs before manager auto-spawn so the new manager is + // built against meta from the first attempt. + if let Err(e) = migrate::run(&coord).await { + tracing::warn!(error = ?e, "startup migration failed"); + } + // Auto-create the manager container if it isn't there yet. Block + // on this — without hm1nd the system has no manager harness. + // Failures are logged but allowed: a broken auto-spawn shouldn't + // make the dashboard unreachable for debugging. + if let Err(e) = auto_update::ensure_manager(&coord).await { + tracing::warn!(error = ?e, "auto-spawn manager failed"); + } + // Auto-update in the background — don't block service start. + // Sub-agent rebuilds can take tens of seconds; we want the admin + // socket up immediately. + let update_coord = coord.clone(); + tokio::spawn(async move { + if let Err(e) = auto_update::run(update_coord).await { + tracing::warn!(error = ?e, "auto-update task failed"); + } + }); + // Forge user sweep: ensure every existing container has a + // forgejo user + access token. No-op when the hive-forge + // container isn't running. Backgrounded — touches the + // forge state dir via `nixos-container run` which is slow. + tokio::spawn(async move { + forge::ensure_all().await; + }); + // Periodic broker vacuum: drop fully-acked messages older + // than 30 days. Delivered-but-unacked rows (recoverable via + // requeue_inflight) and undelivered rows are always kept. + // Runs hourly; first sweep happens immediately. + let vacuum_coord = coord.clone(); + let mut vacuum_shutdown = coord.shutdown_rx(); + tokio::spawn(async move { + let interval = std::time::Duration::from_secs(3600); + let keep_secs: i64 = 30 * 24 * 3600; + loop { + match vacuum_coord.broker.vacuum_delivered(keep_secs) { + Ok(0) => {} + Ok(n) => tracing::info!(removed = n, "broker vacuum"), + Err(e) => tracing::warn!(error = ?e, "broker vacuum failed"), + } + tokio::select! { + () = tokio::time::sleep(interval) => {} + _ = vacuum_shutdown.changed() => { + tracing::info!("broker vacuum: shutdown signal received"); + break; + } + } + } + }); + // Per-agent events.sqlite vacuum: host-side so the harness + // doesn't need any retention wiring of its own. + events_vacuum::spawn(&coord); + // Per-agent turn-stats.sqlite vacuum: same pattern, 90-day + // retention so trend analysis has enough history. + stats_vacuum::spawn(&coord); + // Container crash watcher: emits HelperEvent::ContainerCrash + // when a previously-running container goes away without an + // operator-initiated transient state. + crash_watch::spawn(coord.clone()); + // Reminder scheduler: drains due reminders + handles + // file_path payload persistence. See reminder_scheduler.rs. + reminder_scheduler::spawn(coord.clone()); + // Forward every broker event onto the unified dashboard + // channel with a freshly-stamped seq, so the dashboard SSE + // sees broker messages + future mutation events on one + // stream with one monotonic seq. The broker's intra-process + // channel (used by `recv_blocking_batch`) stays untouched. + spawn_broker_to_dashboard_forwarder(coord.clone()); + let dash_coord = coord.clone(); + tokio::spawn(async move { + if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await { + tracing::error!(error = ?e, "dashboard failed"); + } + }); + // Run the admin socket until a signal arrives; then signal + // all background tasks so they exit cleanly before the + // process terminates. + let coord_sig = coord.clone(); + tokio::select! { + res = server::serve(socket, coord) => { res? } + _ = tokio::signal::ctrl_c() => { + tracing::info!("SIGINT received — requesting shutdown"); + coord_sig.request_shutdown(); + } + () = async { + let mut sig = tokio::signal::unix::signal( + tokio::signal::unix::SignalKind::terminate() + ).expect("failed to install SIGTERM handler"); + sig.recv().await; + } => { + tracing::info!("SIGTERM received — requesting shutdown"); + coord_sig.request_shutdown(); + } + } + Ok(()) +} + /// Re-emit every broker `MessageEvent` onto the dashboard channel as /// a `DashboardEvent::Sent` / `Delivered` with a freshly-stamped seq. /// Background task; runs for the life of the process. On a lagged