324 lines
12 KiB
Rust
324 lines
12 KiB
Rust
//! Manager harness. Talks to the manager socket (bind-mounted from the host
|
|
//! at `/run/hive/mcp.sock` inside the `hm1nd` container). Two surfaces:
|
|
//! `serve` (long-lived turn loop) and `mcp` (stdio MCP server claude spawns).
|
|
|
|
use std::path::{Path, PathBuf};
|
|
use std::sync::{Arc, Mutex};
|
|
use std::time::Duration;
|
|
|
|
use hive_ag3nt::web_ui::TurnLock;
|
|
|
|
use anyhow::Result;
|
|
use clap::{Parser, Subcommand};
|
|
use hive_ag3nt::events::{Bus, LiveEvent, TurnState};
|
|
use hive_ag3nt::login::{self, LoginState};
|
|
use hive_ag3nt::turn_stats::{TurnStatRow, TurnStats};
|
|
use hive_ag3nt::{DEFAULT_SOCKET, DEFAULT_WEB_PORT, client, mcp, plugins, turn, web_ui};
|
|
use hive_sh4re::{HelperEvent, ManagerRequest, ManagerResponse, SYSTEM_SENDER};
|
|
|
|
#[derive(Parser)]
|
|
#[command(name = "hive-m1nd", about = "hyperhive manager harness")]
|
|
struct Cli {
|
|
/// Path to the manager MCP socket (bind-mounted from the host).
|
|
#[arg(long, global = true, default_value = DEFAULT_SOCKET)]
|
|
socket: PathBuf,
|
|
|
|
#[command(subcommand)]
|
|
cmd: Cmd,
|
|
}
|
|
|
|
#[derive(Subcommand)]
|
|
enum Cmd {
|
|
/// Long-lived loop polling the manager inbox.
|
|
Serve {
|
|
#[arg(long, default_value_t = 1000)]
|
|
poll_ms: u64,
|
|
},
|
|
/// Run the manager MCP server on stdio. Spawned by claude via
|
|
/// `--mcp-config`; same shape as `hive-ag3nt mcp` but with the
|
|
/// manager tool surface (`request_spawn`, `kill`, `start`, `restart`,
|
|
/// `request_apply_commit`, `ask`, `answer`, `remind`).
|
|
Mcp,
|
|
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> Result<()> {
|
|
tracing_subscriber::fmt()
|
|
.with_env_filter(
|
|
tracing_subscriber::EnvFilter::try_from_default_env()
|
|
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
|
|
)
|
|
.init();
|
|
|
|
let cli = Cli::parse();
|
|
match cli.cmd {
|
|
Cmd::Serve { poll_ms } => {
|
|
let port = std::env::var("HIVE_PORT")
|
|
.ok()
|
|
.and_then(|s| s.parse::<u16>().ok())
|
|
.unwrap_or(DEFAULT_WEB_PORT);
|
|
let label = std::env::var("HIVE_LABEL").unwrap_or_else(|_| "hm1nd".into());
|
|
let claude_dir = login::default_dir();
|
|
let initial = LoginState::from_dir(&claude_dir);
|
|
tracing::info!(state = ?initial, claude_dir = %claude_dir.display(), "hm1nd boot");
|
|
let login_state = Arc::new(Mutex::new(initial));
|
|
let bus = Bus::new();
|
|
let stats = TurnStats::open_default();
|
|
if let Some(s) = &stats
|
|
&& let Some(u) = s.last_usage()
|
|
{
|
|
bus.seed_usage(u);
|
|
}
|
|
let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Manager).await?;
|
|
let turn_lock: TurnLock = Arc::new(tokio::sync::Mutex::new(()));
|
|
plugins::install_configured(&cli.socket, None).await;
|
|
tokio::spawn(web_ui::serve(
|
|
label,
|
|
port,
|
|
login_state.clone(),
|
|
bus.clone(),
|
|
cli.socket.clone(),
|
|
files.clone(),
|
|
turn_lock.clone(),
|
|
));
|
|
match initial {
|
|
LoginState::Online => {
|
|
serve(
|
|
&cli.socket,
|
|
Duration::from_millis(poll_ms),
|
|
bus,
|
|
stats,
|
|
&files,
|
|
turn_lock,
|
|
)
|
|
.await
|
|
}
|
|
LoginState::NeedsLogin => {
|
|
turn::wait_for_login(&claude_dir, login_state, &bus, poll_ms).await;
|
|
serve(
|
|
&cli.socket,
|
|
Duration::from_millis(poll_ms),
|
|
bus,
|
|
stats,
|
|
&files,
|
|
turn_lock,
|
|
)
|
|
.await
|
|
}
|
|
}
|
|
}
|
|
Cmd::Mcp => mcp::serve_manager_stdio(cli.socket).await,
|
|
}
|
|
}
|
|
|
|
async fn serve(
|
|
socket: &Path,
|
|
interval: Duration,
|
|
bus: Bus,
|
|
stats: Option<TurnStats>,
|
|
files: &turn::TurnFiles,
|
|
turn_lock: TurnLock,
|
|
) -> Result<()> {
|
|
tracing::info!(socket = %socket.display(), "hive-m1nd serve");
|
|
loop {
|
|
let recv: Result<ManagerResponse> =
|
|
// Explicit long-poll: see hive-ag3nt's serve loop for the
|
|
// rationale — recv now defaults to peek when wait_seconds
|
|
// is None.
|
|
client::request(
|
|
socket,
|
|
&ManagerRequest::Recv {
|
|
wait_seconds: Some(180),
|
|
},
|
|
)
|
|
.await;
|
|
match recv {
|
|
Ok(ManagerResponse::Message { from, body }) => {
|
|
if from == SYSTEM_SENDER {
|
|
// Helper events (ApprovalResolved / Spawned / Rebuilt /
|
|
// Killed / Destroyed) — these are FYI for the manager;
|
|
// we surface them in the live view and forward them as
|
|
// a normal claude turn so the manager can react (e.g.
|
|
// greet a newly-spawned agent, retry a failed rebuild).
|
|
let parsed = serde_json::from_str::<HelperEvent>(&body).ok();
|
|
if let Some(event) = parsed {
|
|
tracing::info!(?event, "helper event");
|
|
} else {
|
|
tracing::info!(%from, %body, "system message");
|
|
}
|
|
bus.emit(LiveEvent::Note {
|
|
text: format!("[system] {body}"),
|
|
});
|
|
// Fall through: drive a turn with the event in the wake
|
|
// prompt body so claude sees it. Sender stays "system"
|
|
// so the wake prompt can label it as such.
|
|
}
|
|
tracing::info!(%from, %body, "manager inbox");
|
|
let unread = inbox_unread(socket).await;
|
|
bus.emit(LiveEvent::TurnStart {
|
|
from: from.clone(),
|
|
body: body.clone(),
|
|
unread,
|
|
});
|
|
let prompt = format_wake_prompt(&from, &body, unread);
|
|
bus.set_state(TurnState::Thinking);
|
|
let started_at = now_unix();
|
|
let started_instant = std::time::Instant::now();
|
|
let model_at_start = bus.model();
|
|
let outcome = {
|
|
let _guard = turn_lock.lock().await;
|
|
turn::drive_turn(&prompt, files, &bus).await
|
|
};
|
|
turn::emit_turn_end(&bus, &outcome);
|
|
bus.set_state(TurnState::Idle);
|
|
if let Some(s) = &stats {
|
|
let ended_at = now_unix();
|
|
let duration_ms =
|
|
i64::try_from(started_instant.elapsed().as_millis()).unwrap_or(i64::MAX);
|
|
let (open_threads, open_reminders) =
|
|
fetch_manager_post_turn_counts(socket).await;
|
|
let row = build_row(
|
|
started_at,
|
|
ended_at,
|
|
duration_ms,
|
|
model_at_start,
|
|
from.clone(),
|
|
&outcome,
|
|
&bus,
|
|
open_threads,
|
|
open_reminders,
|
|
);
|
|
s.record(&row);
|
|
}
|
|
// Check for messages that arrived during the turn and loop
|
|
// immediately if any are waiting — mirrors hive-ag3nt behaviour.
|
|
let pending = inbox_unread(socket).await;
|
|
if pending > 0 {
|
|
tracing::info!(%pending, "pending messages after turn; fetching next");
|
|
continue;
|
|
}
|
|
}
|
|
Ok(ManagerResponse::Empty) => {
|
|
// Idle: sleep briefly before next long-poll attempt.
|
|
tokio::time::sleep(interval).await;
|
|
}
|
|
Ok(
|
|
ManagerResponse::Ok
|
|
| ManagerResponse::Status { .. }
|
|
| ManagerResponse::QuestionQueued { .. }
|
|
| ManagerResponse::Recent { .. }
|
|
| ManagerResponse::Logs { .. }
|
|
| ManagerResponse::LooseEnds { .. }
|
|
| ManagerResponse::PendingRemindersCount { .. }
|
|
| ManagerResponse::Whoami { .. },
|
|
) => {
|
|
tracing::warn!("recv produced unexpected response kind");
|
|
}
|
|
Ok(ManagerResponse::Err { message }) => {
|
|
tracing::warn!(%message, "recv error");
|
|
}
|
|
Err(e) => {
|
|
tracing::warn!(error = ?e, "recv failed; retrying");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Per-turn user prompt. The role/tools/etc. is in the system prompt
|
|
/// (`prompts/manager.md` → `claude --system-prompt-file`); this is just
|
|
/// the wake signal. `unread` is the inbox depth after this message was
|
|
/// popped.
|
|
fn format_wake_prompt(from: &str, body: &str, unread: u64) -> String {
|
|
let pending = if unread == 0 {
|
|
String::new()
|
|
} else {
|
|
format!(
|
|
"\n\n({unread} more message(s) pending in your inbox — drain via `mcp__hyperhive__recv` if relevant.)"
|
|
)
|
|
};
|
|
format!("Incoming message from `{from}`:\n---\n{body}\n---{pending}")
|
|
}
|
|
|
|
async fn inbox_unread(socket: &Path) -> u64 {
|
|
match client::request::<_, ManagerResponse>(socket, &ManagerRequest::Status).await {
|
|
Ok(ManagerResponse::Status { unread }) => unread,
|
|
_ => 0,
|
|
}
|
|
}
|
|
|
|
fn now_unix() -> i64 {
|
|
std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.ok()
|
|
.and_then(|d| i64::try_from(d.as_secs()).ok())
|
|
.unwrap_or(0)
|
|
}
|
|
|
|
/// Manager-flavour equivalent of the agent helper. Mirror shape, just
|
|
/// uses ManagerRequest/ManagerResponse instead of the agent variants.
|
|
async fn fetch_manager_post_turn_counts(socket: &Path) -> (Option<u64>, Option<u64>) {
|
|
let threads = match client::request::<_, ManagerResponse>(
|
|
socket,
|
|
&ManagerRequest::GetLooseEnds,
|
|
)
|
|
.await
|
|
{
|
|
Ok(ManagerResponse::LooseEnds { loose_ends }) => u64::try_from(loose_ends.len()).ok(),
|
|
_ => None,
|
|
};
|
|
let reminders = match client::request::<_, ManagerResponse>(
|
|
socket,
|
|
&ManagerRequest::CountPendingReminders,
|
|
)
|
|
.await
|
|
{
|
|
Ok(ManagerResponse::PendingRemindersCount { count }) => Some(count),
|
|
_ => None,
|
|
};
|
|
(threads, reminders)
|
|
}
|
|
|
|
/// Manager flavour of the agent's build_row helper. Duplicated rather
|
|
/// than shared to keep each bin self-contained at this size.
|
|
fn build_row(
|
|
started_at: i64,
|
|
ended_at: i64,
|
|
duration_ms: i64,
|
|
model: String,
|
|
wake_from: String,
|
|
outcome: &turn::TurnOutcome,
|
|
bus: &Bus,
|
|
open_threads_count: Option<u64>,
|
|
open_reminders_count: Option<u64>,
|
|
) -> TurnStatRow {
|
|
let usage = bus.last_usage().unwrap_or_default();
|
|
let tool_calls = bus.take_tool_calls();
|
|
let tool_call_count: u64 = tool_calls.values().copied().sum();
|
|
let tool_call_breakdown_json = if tool_calls.is_empty() {
|
|
None
|
|
} else {
|
|
serde_json::to_string(&tool_calls).ok()
|
|
};
|
|
let (result_kind, note) = match outcome {
|
|
turn::TurnOutcome::Ok => ("ok", None),
|
|
turn::TurnOutcome::PromptTooLong => ("prompt_too_long", None),
|
|
turn::TurnOutcome::Failed(e) => ("failed", Some(format!("{e:#}"))),
|
|
};
|
|
TurnStatRow {
|
|
started_at,
|
|
ended_at,
|
|
duration_ms,
|
|
model,
|
|
wake_from,
|
|
input_tokens: usage.input_tokens,
|
|
output_tokens: usage.output_tokens,
|
|
cache_read_input_tokens: usage.cache_read_input_tokens,
|
|
cache_creation_input_tokens: usage.cache_creation_input_tokens,
|
|
tool_call_count,
|
|
tool_call_breakdown_json,
|
|
open_threads_count,
|
|
open_reminders_count,
|
|
result_kind,
|
|
note,
|
|
}
|
|
}
|