diff --git a/hive-ag3nt/src/bin/hive-ag3nt.rs b/hive-ag3nt/src/bin/hive-ag3nt.rs index 364ae1f..9a393e2 100644 --- a/hive-ag3nt/src/bin/hive-ag3nt.rs +++ b/hive-ag3nt/src/bin/hive-ag3nt.rs @@ -8,8 +8,8 @@ use anyhow::Result; use clap::{Parser, Subcommand}; use hive_ag3nt::events::{Bus, LiveEvent, TurnState}; use hive_ag3nt::login::{self, LoginState}; -use hive_ag3nt::turn_stats::{TurnStatRow, TurnStats}; -use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, plugins, turn, web_ui}; +use hive_ag3nt::turn_stats::TurnStats; +use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, plugins, serve_common, turn, web_ui}; use hive_sh4re::{AgentRequest, AgentResponse}; #[derive(Parser)] @@ -83,7 +83,7 @@ async fn main() -> Result<()> { let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Agent).await?; let turn_lock: TurnLock = Arc::new(tokio::sync::Mutex::new(())); plugins::install_configured(&cli.socket, Some("manager")).await; - tokio::spawn(hive_ag3nt::forge_notify::run(cli.socket.clone())); + tokio::spawn(hive_ag3nt::forge_notify::run(cli.socket.clone(), false)); tokio::spawn(web_ui::serve( label.clone(), port, @@ -202,10 +202,10 @@ async fn serve( unread, }); bus.set_state(TurnState::Thinking); - let started_at = now_unix(); + let started_at = serve_common::now_unix(); let started_instant = std::time::Instant::now(); let model_at_start = bus.model(); - let prompt = format_wake_prompt(&from, &body, unread, redelivered); + let prompt = serve_common::format_wake_prompt(&from, &body, unread, redelivered); let outcome = { let _guard = turn_lock.lock().await; turn::drive_turn(&prompt, files, &bus).await @@ -247,11 +247,11 @@ async fn serve( notify_manager_of_failure(socket, label, e).await; } if let Some(s) = &stats { - let ended_at = now_unix(); + let ended_at = serve_common::now_unix(); let duration_ms = i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX); let (open_threads, open_reminders) = fetch_agent_post_turn_counts(socket).await; - let row = build_row( + let row = serve_common::build_row( started_at, ended_at, duration_ms, @@ -310,22 +310,6 @@ async fn serve( /// session, never acked, and resurfaced after a restart — a banner /// at the top of the wake prompt warns that any side-effects of /// previous handling may already have happened. -fn format_wake_prompt(from: &str, body: &str, unread: u64, redelivered: bool) -> String { - let banner = if redelivered { - hive_ag3nt::mcp::REDELIVERY_HINT - } else { - "" - }; - let pending = if unread == 0 { - String::new() - } else { - format!( - "\n\n({unread} more message(s) pending in your inbox — call `mcp__hyperhive__recv` \ - with `max: {unread}` to drain them all in one round-trip before acting.)" - ) - }; - format!("{banner}Incoming message from `{from}`:\n---\n{body}\n---{pending}") -} /// Best-effort: tell the broker every message we popped during the /// turn is now fully handled (turn-end-OK). Swallows transport @@ -397,13 +381,6 @@ async fn inbox_unread(socket: &Path) -> u64 { } } -fn now_unix() -> i64 { - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .ok() - .and_then(|d| i64::try_from(d.as_secs()).ok()) - .unwrap_or(0) -} /// Best-effort: ask hive-c0re for this agent's open thread count + pending /// reminder count, after the turn finishes. Either roundtrip can fail @@ -431,57 +408,3 @@ async fn fetch_agent_post_turn_counts(socket: &Path) -> (Option, Option, - open_reminders_count: Option, -) -> TurnStatRow { - let cost = bus.last_cost_usage().unwrap_or_default(); - let ctx = bus.last_ctx_usage().unwrap_or(cost); - let tool_calls = bus.take_tool_calls(); - let tool_call_count: u64 = tool_calls.values().copied().sum(); - let tool_call_breakdown_json = if tool_calls.is_empty() { - None - } else { - serde_json::to_string(&tool_calls).ok() - }; - let (result_kind, note) = match outcome { - turn::TurnOutcome::Ok => ("ok", None), - turn::TurnOutcome::Compacted => ("compacted", None), - turn::TurnOutcome::PromptTooLong => ("prompt_too_long", None), - turn::TurnOutcome::RateLimited => ("rate_limited", None), - turn::TurnOutcome::Failed(e) => ("failed", Some(format!("{e:#}"))), - }; - TurnStatRow { - started_at, - ended_at, - duration_ms, - model, - wake_from, - input_tokens: cost.input_tokens, - output_tokens: cost.output_tokens, - cache_read_input_tokens: cost.cache_read_input_tokens, - cache_creation_input_tokens: cost.cache_creation_input_tokens, - last_input_tokens: ctx.input_tokens, - last_output_tokens: ctx.output_tokens, - last_cache_read_input_tokens: ctx.cache_read_input_tokens, - last_cache_creation_input_tokens: ctx.cache_creation_input_tokens, - tool_call_count, - tool_call_breakdown_json, - open_threads_count, - open_reminders_count, - result_kind, - note, - } -} diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index 8346bed..6e085e5 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -12,8 +12,8 @@ use anyhow::Result; use clap::{Parser, Subcommand}; use hive_ag3nt::events::{Bus, LiveEvent, TurnState}; use hive_ag3nt::login::{self, LoginState}; -use hive_ag3nt::turn_stats::{TurnStatRow, TurnStats}; -use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, plugins, turn, web_ui}; +use hive_ag3nt::turn_stats::TurnStats; +use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, plugins, serve_common, turn, web_ui}; use hive_sh4re::{HelperEvent, ManagerRequest, ManagerResponse, SYSTEM_SENDER}; #[derive(Parser)] @@ -172,9 +172,9 @@ async fn serve( body: body.clone(), unread, }); - let prompt = format_wake_prompt(&from, &body, unread, redelivered); + let prompt = serve_common::format_wake_prompt(&from, &body, unread, redelivered); bus.set_state(TurnState::Thinking); - let started_at = now_unix(); + let started_at = serve_common::now_unix(); let started_instant = std::time::Instant::now(); let model_at_start = bus.model(); let outcome = { @@ -205,12 +205,12 @@ async fn serve( bus.emit_status("online"); } if let Some(s) = &stats { - let ended_at = now_unix(); + let ended_at = serve_common::now_unix(); let duration_ms = i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX); let (open_threads, open_reminders) = fetch_manager_post_turn_counts(socket).await; - let row = build_row( + let row = serve_common::build_row( started_at, ended_at, duration_ms, @@ -259,28 +259,6 @@ async fn serve( } } -/// Per-turn user prompt. The role/tools/etc. is in the system prompt -/// (`prompts/manager.md` → `claude --system-prompt-file`); this is just -/// the wake signal. `unread` is the inbox depth after this message was -/// popped. `redelivered` adds a "may already be handled" banner above -/// the wake body when the broker resurfaced this row (see hive-ag3nt's -/// `format_wake_prompt` for the full story). -fn format_wake_prompt(from: &str, body: &str, unread: u64, redelivered: bool) -> String { - let banner = if redelivered { - hive_ag3nt::mcp::REDELIVERY_HINT - } else { - "" - }; - let pending = if unread == 0 { - String::new() - } else { - format!( - "\n\n({unread} more message(s) pending in your inbox — call `mcp__hyperhive__recv` \ - with `max: {unread}` to drain them all in one round-trip before acting.)" - ) - }; - format!("{banner}Incoming message from `{from}`:\n---\n{body}\n---{pending}") -} /// Best-effort: tell the broker every message popped during the turn /// is now handled. Mirror of `hive-ag3nt::ack_turn` on the manager @@ -321,14 +299,6 @@ async fn inbox_unread(socket: &Path) -> u64 { } } -fn now_unix() -> i64 { - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .ok() - .and_then(|d| i64::try_from(d.as_secs()).ok()) - .unwrap_or(0) -} - /// Manager-flavour equivalent of the agent helper. Mirror shape, just /// uses ManagerRequest/ManagerResponse instead of the agent variants. async fn fetch_manager_post_turn_counts(socket: &Path) -> (Option, Option) { @@ -353,55 +323,3 @@ async fn fetch_manager_post_turn_counts(socket: &Path) -> (Option, Option, - open_reminders_count: Option, -) -> TurnStatRow { - let cost = bus.last_cost_usage().unwrap_or_default(); - let ctx = bus.last_ctx_usage().unwrap_or(cost); - let tool_calls = bus.take_tool_calls(); - let tool_call_count: u64 = tool_calls.values().copied().sum(); - let tool_call_breakdown_json = if tool_calls.is_empty() { - None - } else { - serde_json::to_string(&tool_calls).ok() - }; - let (result_kind, note) = match outcome { - turn::TurnOutcome::Ok => ("ok", None), - turn::TurnOutcome::Compacted => ("compacted", None), - turn::TurnOutcome::PromptTooLong => ("prompt_too_long", None), - turn::TurnOutcome::RateLimited => ("rate_limited", None), - turn::TurnOutcome::Failed(e) => ("failed", Some(format!("{e:#}"))), - }; - TurnStatRow { - started_at, - ended_at, - duration_ms, - model, - wake_from, - input_tokens: cost.input_tokens, - output_tokens: cost.output_tokens, - cache_read_input_tokens: cost.cache_read_input_tokens, - cache_creation_input_tokens: cost.cache_creation_input_tokens, - last_input_tokens: ctx.input_tokens, - last_output_tokens: ctx.output_tokens, - last_cache_read_input_tokens: ctx.cache_read_input_tokens, - last_cache_creation_input_tokens: ctx.cache_creation_input_tokens, - tool_call_count, - tool_call_breakdown_json, - open_threads_count, - open_reminders_count, - result_kind, - note, - } -} diff --git a/hive-ag3nt/src/lib.rs b/hive-ag3nt/src/lib.rs index c9d3683..3b2f717 100644 --- a/hive-ag3nt/src/lib.rs +++ b/hive-ag3nt/src/lib.rs @@ -9,6 +9,7 @@ pub mod login_session; pub mod mcp; pub mod paths; pub mod plugins; +pub mod serve_common; pub mod stats; pub mod turn; pub mod turn_stats; diff --git a/hive-ag3nt/src/serve_common.rs b/hive-ag3nt/src/serve_common.rs new file mode 100644 index 0000000..a1c9d48 --- /dev/null +++ b/hive-ag3nt/src/serve_common.rs @@ -0,0 +1,89 @@ +//! Helpers shared between `hive-ag3nt` (agent) and `hive-m1nd` (manager) +//! serve loops. Only pure functions with no wire-type dependency live here; +//! request/response-flavored helpers (`requeue_inflight`, `ack_turn`, etc.) +//! stay in each binary because they use different request enum variants. + +use crate::events::Bus; +use crate::mcp::REDELIVERY_HINT; +use crate::turn::TurnOutcome; +use crate::turn_stats::TurnStatRow; + +/// Assemble the per-turn wake prompt string. The role/tools/etc. live in the +/// system prompt; this is just the wake signal body. `unread` is the inbox +/// depth after this message was popped. `redelivered` prepends a "may already +/// be handled" banner. +pub fn format_wake_prompt(from: &str, body: &str, unread: u64, redelivered: bool) -> String { + let banner = if redelivered { REDELIVERY_HINT } else { "" }; + let pending = if unread == 0 { + String::new() + } else { + format!( + "\n\n({unread} more message(s) pending in your inbox — call `mcp__hyperhive__recv` \ + with `max: {unread}` to drain them all in one round-trip before acting.)" + ) + }; + format!("{banner}Incoming message from `{from}`:\n---\n{body}\n---{pending}") +} + +/// Current time as a Unix timestamp (seconds). Returns 0 on any error. +pub fn now_unix() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .ok() + .and_then(|d| i64::try_from(d.as_secs()).ok()) + .unwrap_or(0) +} + +/// Assemble a `TurnStatRow` from the harness's per-turn state. Used by both +/// the agent and manager serve loops — the shape is identical, only the +/// post-turn count fetch helpers differ (and those stay in each binary). +#[allow(clippy::too_many_arguments)] +pub fn build_row( + started_at: i64, + ended_at: i64, + duration_ms: i64, + model: String, + wake_from: String, + outcome: &TurnOutcome, + bus: &Bus, + open_threads_count: Option, + open_reminders_count: Option, +) -> TurnStatRow { + let cost = bus.last_cost_usage().unwrap_or_default(); + let ctx = bus.last_ctx_usage().unwrap_or(cost); + let tool_calls = bus.take_tool_calls(); + let tool_call_count: u64 = tool_calls.values().copied().sum(); + let tool_call_breakdown_json = if tool_calls.is_empty() { + None + } else { + serde_json::to_string(&tool_calls).ok() + }; + let (result_kind, note) = match outcome { + TurnOutcome::Ok => ("ok", None), + TurnOutcome::Compacted => ("compacted", None), + TurnOutcome::PromptTooLong => ("prompt_too_long", None), + TurnOutcome::RateLimited => ("rate_limited", None), + TurnOutcome::Failed(e) => ("failed", Some(format!("{e:#}"))), + }; + TurnStatRow { + started_at, + ended_at, + duration_ms, + model, + wake_from, + input_tokens: cost.input_tokens, + output_tokens: cost.output_tokens, + cache_read_input_tokens: cost.cache_read_input_tokens, + cache_creation_input_tokens: cost.cache_creation_input_tokens, + last_input_tokens: ctx.input_tokens, + last_output_tokens: ctx.output_tokens, + last_cache_read_input_tokens: ctx.cache_read_input_tokens, + last_cache_creation_input_tokens: ctx.cache_creation_input_tokens, + tool_call_count, + tool_call_breakdown_json, + open_threads_count, + open_reminders_count, + result_kind, + note, + } +}