refactor: extract format_wake_prompt/now_unix/build_row into serve_common (closes #169)

This commit is contained in:
damocles 2026-05-21 18:44:53 +02:00 committed by Mara
parent dfee0574a5
commit e28b0a1dab
4 changed files with 103 additions and 172 deletions

View file

@ -8,8 +8,8 @@ use anyhow::Result;
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use hive_ag3nt::events::{Bus, LiveEvent, TurnState}; use hive_ag3nt::events::{Bus, LiveEvent, TurnState};
use hive_ag3nt::login::{self, LoginState}; use hive_ag3nt::login::{self, LoginState};
use hive_ag3nt::turn_stats::{TurnStatRow, TurnStats}; use hive_ag3nt::turn_stats::TurnStats;
use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, plugins, turn, web_ui}; use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, plugins, serve_common, turn, web_ui};
use hive_sh4re::{AgentRequest, AgentResponse}; use hive_sh4re::{AgentRequest, AgentResponse};
#[derive(Parser)] #[derive(Parser)]
@ -83,7 +83,7 @@ async fn main() -> Result<()> {
let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Agent).await?; let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Agent).await?;
let turn_lock: TurnLock = Arc::new(tokio::sync::Mutex::new(())); let turn_lock: TurnLock = Arc::new(tokio::sync::Mutex::new(()));
plugins::install_configured(&cli.socket, Some("manager")).await; plugins::install_configured(&cli.socket, Some("manager")).await;
tokio::spawn(hive_ag3nt::forge_notify::run(cli.socket.clone())); tokio::spawn(hive_ag3nt::forge_notify::run(cli.socket.clone(), false));
tokio::spawn(web_ui::serve( tokio::spawn(web_ui::serve(
label.clone(), label.clone(),
port, port,
@ -202,10 +202,10 @@ async fn serve(
unread, unread,
}); });
bus.set_state(TurnState::Thinking); bus.set_state(TurnState::Thinking);
let started_at = now_unix(); let started_at = serve_common::now_unix();
let started_instant = std::time::Instant::now(); let started_instant = std::time::Instant::now();
let model_at_start = bus.model(); let model_at_start = bus.model();
let prompt = format_wake_prompt(&from, &body, unread, redelivered); let prompt = serve_common::format_wake_prompt(&from, &body, unread, redelivered);
let outcome = { let outcome = {
let _guard = turn_lock.lock().await; let _guard = turn_lock.lock().await;
turn::drive_turn(&prompt, files, &bus).await turn::drive_turn(&prompt, files, &bus).await
@ -247,11 +247,11 @@ async fn serve(
notify_manager_of_failure(socket, label, e).await; notify_manager_of_failure(socket, label, e).await;
} }
if let Some(s) = &stats { if let Some(s) = &stats {
let ended_at = now_unix(); let ended_at = serve_common::now_unix();
let duration_ms = let duration_ms =
i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX); i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX);
let (open_threads, open_reminders) = fetch_agent_post_turn_counts(socket).await; let (open_threads, open_reminders) = fetch_agent_post_turn_counts(socket).await;
let row = build_row( let row = serve_common::build_row(
started_at, started_at,
ended_at, ended_at,
duration_ms, duration_ms,
@ -310,22 +310,6 @@ async fn serve(
/// session, never acked, and resurfaced after a restart — a banner /// session, never acked, and resurfaced after a restart — a banner
/// at the top of the wake prompt warns that any side-effects of /// at the top of the wake prompt warns that any side-effects of
/// previous handling may already have happened. /// previous handling may already have happened.
fn format_wake_prompt(from: &str, body: &str, unread: u64, redelivered: bool) -> String {
let banner = if redelivered {
hive_ag3nt::mcp::REDELIVERY_HINT
} else {
""
};
let pending = if unread == 0 {
String::new()
} else {
format!(
"\n\n({unread} more message(s) pending in your inbox — call `mcp__hyperhive__recv` \
with `max: {unread}` to drain them all in one round-trip before acting.)"
)
};
format!("{banner}Incoming message from `{from}`:\n---\n{body}\n---{pending}")
}
/// Best-effort: tell the broker every message we popped during the /// Best-effort: tell the broker every message we popped during the
/// turn is now fully handled (turn-end-OK). Swallows transport /// turn is now fully handled (turn-end-OK). Swallows transport
@ -397,13 +381,6 @@ async fn inbox_unread(socket: &Path) -> u64 {
} }
} }
fn now_unix() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.ok()
.and_then(|d| i64::try_from(d.as_secs()).ok())
.unwrap_or(0)
}
/// Best-effort: ask hive-c0re for this agent's open thread count + pending /// Best-effort: ask hive-c0re for this agent's open thread count + pending
/// reminder count, after the turn finishes. Either roundtrip can fail /// reminder count, after the turn finishes. Either roundtrip can fail
@ -431,57 +408,3 @@ async fn fetch_agent_post_turn_counts(socket: &Path) -> (Option<u64>, Option<u64
(threads, reminders) (threads, reminders)
} }
/// Assemble a `TurnStatRow` from the harness's per-turn state. Shared
/// shape between the agent + manager bin loops (each lives in its own
/// crate root so this helper is duplicated; the savings of a shared
/// module aren't worth the cross-crate ceremony at this size).
#[allow(clippy::too_many_arguments)]
fn build_row(
started_at: i64,
ended_at: i64,
duration_ms: i64,
model: String,
wake_from: String,
outcome: &turn::TurnOutcome,
bus: &Bus,
open_threads_count: Option<u64>,
open_reminders_count: Option<u64>,
) -> TurnStatRow {
let cost = bus.last_cost_usage().unwrap_or_default();
let ctx = bus.last_ctx_usage().unwrap_or(cost);
let tool_calls = bus.take_tool_calls();
let tool_call_count: u64 = tool_calls.values().copied().sum();
let tool_call_breakdown_json = if tool_calls.is_empty() {
None
} else {
serde_json::to_string(&tool_calls).ok()
};
let (result_kind, note) = match outcome {
turn::TurnOutcome::Ok => ("ok", None),
turn::TurnOutcome::Compacted => ("compacted", None),
turn::TurnOutcome::PromptTooLong => ("prompt_too_long", None),
turn::TurnOutcome::RateLimited => ("rate_limited", None),
turn::TurnOutcome::Failed(e) => ("failed", Some(format!("{e:#}"))),
};
TurnStatRow {
started_at,
ended_at,
duration_ms,
model,
wake_from,
input_tokens: cost.input_tokens,
output_tokens: cost.output_tokens,
cache_read_input_tokens: cost.cache_read_input_tokens,
cache_creation_input_tokens: cost.cache_creation_input_tokens,
last_input_tokens: ctx.input_tokens,
last_output_tokens: ctx.output_tokens,
last_cache_read_input_tokens: ctx.cache_read_input_tokens,
last_cache_creation_input_tokens: ctx.cache_creation_input_tokens,
tool_call_count,
tool_call_breakdown_json,
open_threads_count,
open_reminders_count,
result_kind,
note,
}
}

View file

@ -12,8 +12,8 @@ use anyhow::Result;
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use hive_ag3nt::events::{Bus, LiveEvent, TurnState}; use hive_ag3nt::events::{Bus, LiveEvent, TurnState};
use hive_ag3nt::login::{self, LoginState}; use hive_ag3nt::login::{self, LoginState};
use hive_ag3nt::turn_stats::{TurnStatRow, TurnStats}; use hive_ag3nt::turn_stats::TurnStats;
use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, plugins, turn, web_ui}; use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, plugins, serve_common, turn, web_ui};
use hive_sh4re::{HelperEvent, ManagerRequest, ManagerResponse, SYSTEM_SENDER}; use hive_sh4re::{HelperEvent, ManagerRequest, ManagerResponse, SYSTEM_SENDER};
#[derive(Parser)] #[derive(Parser)]
@ -172,9 +172,9 @@ async fn serve(
body: body.clone(), body: body.clone(),
unread, unread,
}); });
let prompt = format_wake_prompt(&from, &body, unread, redelivered); let prompt = serve_common::format_wake_prompt(&from, &body, unread, redelivered);
bus.set_state(TurnState::Thinking); bus.set_state(TurnState::Thinking);
let started_at = now_unix(); let started_at = serve_common::now_unix();
let started_instant = std::time::Instant::now(); let started_instant = std::time::Instant::now();
let model_at_start = bus.model(); let model_at_start = bus.model();
let outcome = { let outcome = {
@ -205,12 +205,12 @@ async fn serve(
bus.emit_status("online"); bus.emit_status("online");
} }
if let Some(s) = &stats { if let Some(s) = &stats {
let ended_at = now_unix(); let ended_at = serve_common::now_unix();
let duration_ms = let duration_ms =
i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX); i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX);
let (open_threads, open_reminders) = let (open_threads, open_reminders) =
fetch_manager_post_turn_counts(socket).await; fetch_manager_post_turn_counts(socket).await;
let row = build_row( let row = serve_common::build_row(
started_at, started_at,
ended_at, ended_at,
duration_ms, duration_ms,
@ -259,28 +259,6 @@ async fn serve(
} }
} }
/// Per-turn user prompt. The role/tools/etc. is in the system prompt
/// (`prompts/manager.md` → `claude --system-prompt-file`); this is just
/// the wake signal. `unread` is the inbox depth after this message was
/// popped. `redelivered` adds a "may already be handled" banner above
/// the wake body when the broker resurfaced this row (see hive-ag3nt's
/// `format_wake_prompt` for the full story).
fn format_wake_prompt(from: &str, body: &str, unread: u64, redelivered: bool) -> String {
let banner = if redelivered {
hive_ag3nt::mcp::REDELIVERY_HINT
} else {
""
};
let pending = if unread == 0 {
String::new()
} else {
format!(
"\n\n({unread} more message(s) pending in your inbox — call `mcp__hyperhive__recv` \
with `max: {unread}` to drain them all in one round-trip before acting.)"
)
};
format!("{banner}Incoming message from `{from}`:\n---\n{body}\n---{pending}")
}
/// Best-effort: tell the broker every message popped during the turn /// Best-effort: tell the broker every message popped during the turn
/// is now handled. Mirror of `hive-ag3nt::ack_turn` on the manager /// is now handled. Mirror of `hive-ag3nt::ack_turn` on the manager
@ -321,14 +299,6 @@ async fn inbox_unread(socket: &Path) -> u64 {
} }
} }
fn now_unix() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.ok()
.and_then(|d| i64::try_from(d.as_secs()).ok())
.unwrap_or(0)
}
/// Manager-flavour equivalent of the agent helper. Mirror shape, just /// Manager-flavour equivalent of the agent helper. Mirror shape, just
/// uses ManagerRequest/ManagerResponse instead of the agent variants. /// uses ManagerRequest/ManagerResponse instead of the agent variants.
async fn fetch_manager_post_turn_counts(socket: &Path) -> (Option<u64>, Option<u64>) { async fn fetch_manager_post_turn_counts(socket: &Path) -> (Option<u64>, Option<u64>) {
@ -353,55 +323,3 @@ async fn fetch_manager_post_turn_counts(socket: &Path) -> (Option<u64>, Option<u
(threads, reminders) (threads, reminders)
} }
/// Manager flavour of the agent's `build_row` helper. Duplicated rather
/// than shared to keep each bin self-contained at this size.
#[allow(clippy::too_many_arguments)]
fn build_row(
started_at: i64,
ended_at: i64,
duration_ms: i64,
model: String,
wake_from: String,
outcome: &turn::TurnOutcome,
bus: &Bus,
open_threads_count: Option<u64>,
open_reminders_count: Option<u64>,
) -> TurnStatRow {
let cost = bus.last_cost_usage().unwrap_or_default();
let ctx = bus.last_ctx_usage().unwrap_or(cost);
let tool_calls = bus.take_tool_calls();
let tool_call_count: u64 = tool_calls.values().copied().sum();
let tool_call_breakdown_json = if tool_calls.is_empty() {
None
} else {
serde_json::to_string(&tool_calls).ok()
};
let (result_kind, note) = match outcome {
turn::TurnOutcome::Ok => ("ok", None),
turn::TurnOutcome::Compacted => ("compacted", None),
turn::TurnOutcome::PromptTooLong => ("prompt_too_long", None),
turn::TurnOutcome::RateLimited => ("rate_limited", None),
turn::TurnOutcome::Failed(e) => ("failed", Some(format!("{e:#}"))),
};
TurnStatRow {
started_at,
ended_at,
duration_ms,
model,
wake_from,
input_tokens: cost.input_tokens,
output_tokens: cost.output_tokens,
cache_read_input_tokens: cost.cache_read_input_tokens,
cache_creation_input_tokens: cost.cache_creation_input_tokens,
last_input_tokens: ctx.input_tokens,
last_output_tokens: ctx.output_tokens,
last_cache_read_input_tokens: ctx.cache_read_input_tokens,
last_cache_creation_input_tokens: ctx.cache_creation_input_tokens,
tool_call_count,
tool_call_breakdown_json,
open_threads_count,
open_reminders_count,
result_kind,
note,
}
}

View file

@ -9,6 +9,7 @@ pub mod login_session;
pub mod mcp; pub mod mcp;
pub mod paths; pub mod paths;
pub mod plugins; pub mod plugins;
pub mod serve_common;
pub mod stats; pub mod stats;
pub mod turn; pub mod turn;
pub mod turn_stats; pub mod turn_stats;

View file

@ -0,0 +1,89 @@
//! Helpers shared between `hive-ag3nt` (agent) and `hive-m1nd` (manager)
//! serve loops. Only pure functions with no wire-type dependency live here;
//! request/response-flavored helpers (`requeue_inflight`, `ack_turn`, etc.)
//! stay in each binary because they use different request enum variants.
use crate::events::Bus;
use crate::mcp::REDELIVERY_HINT;
use crate::turn::TurnOutcome;
use crate::turn_stats::TurnStatRow;
/// Assemble the per-turn wake prompt string. The role/tools/etc. live in the
/// system prompt; this is just the wake signal body. `unread` is the inbox
/// depth after this message was popped. `redelivered` prepends a "may already
/// be handled" banner.
pub fn format_wake_prompt(from: &str, body: &str, unread: u64, redelivered: bool) -> String {
let banner = if redelivered { REDELIVERY_HINT } else { "" };
let pending = if unread == 0 {
String::new()
} else {
format!(
"\n\n({unread} more message(s) pending in your inbox — call `mcp__hyperhive__recv` \
with `max: {unread}` to drain them all in one round-trip before acting.)"
)
};
format!("{banner}Incoming message from `{from}`:\n---\n{body}\n---{pending}")
}
/// Current time as a Unix timestamp (seconds). Returns 0 on any error.
pub fn now_unix() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.ok()
.and_then(|d| i64::try_from(d.as_secs()).ok())
.unwrap_or(0)
}
/// Assemble a `TurnStatRow` from the harness's per-turn state. Used by both
/// the agent and manager serve loops — the shape is identical, only the
/// post-turn count fetch helpers differ (and those stay in each binary).
#[allow(clippy::too_many_arguments)]
pub fn build_row(
started_at: i64,
ended_at: i64,
duration_ms: i64,
model: String,
wake_from: String,
outcome: &TurnOutcome,
bus: &Bus,
open_threads_count: Option<u64>,
open_reminders_count: Option<u64>,
) -> TurnStatRow {
let cost = bus.last_cost_usage().unwrap_or_default();
let ctx = bus.last_ctx_usage().unwrap_or(cost);
let tool_calls = bus.take_tool_calls();
let tool_call_count: u64 = tool_calls.values().copied().sum();
let tool_call_breakdown_json = if tool_calls.is_empty() {
None
} else {
serde_json::to_string(&tool_calls).ok()
};
let (result_kind, note) = match outcome {
TurnOutcome::Ok => ("ok", None),
TurnOutcome::Compacted => ("compacted", None),
TurnOutcome::PromptTooLong => ("prompt_too_long", None),
TurnOutcome::RateLimited => ("rate_limited", None),
TurnOutcome::Failed(e) => ("failed", Some(format!("{e:#}"))),
};
TurnStatRow {
started_at,
ended_at,
duration_ms,
model,
wake_from,
input_tokens: cost.input_tokens,
output_tokens: cost.output_tokens,
cache_read_input_tokens: cost.cache_read_input_tokens,
cache_creation_input_tokens: cost.cache_creation_input_tokens,
last_input_tokens: ctx.input_tokens,
last_output_tokens: ctx.output_tokens,
last_cache_read_input_tokens: ctx.cache_read_input_tokens,
last_cache_creation_input_tokens: ctx.cache_creation_input_tokens,
tool_call_count,
tool_call_breakdown_json,
open_threads_count,
open_reminders_count,
result_kind,
note,
}
}