hyperhive/hive-c0re/src/main.rs

297 lines
12 KiB
Rust

use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{Result, bail};
use clap::{Parser, Subcommand};
use hive_sh4re::{HostRequest, HostResponse};
mod actions;
mod agent_server;
mod approvals;
mod auto_update;
mod broker;
mod client;
mod container_view;
mod coordinator;
mod crash_watch;
mod dashboard;
mod dashboard_events;
mod events_vacuum;
mod stats_vacuum;
mod forge;
mod lifecycle;
mod limits;
mod loose_ends;
mod manager_server;
mod meta;
mod migrate;
mod operator_questions;
mod questions;
mod reminder_scheduler;
mod server;
use coordinator::Coordinator;
// Top-level command line of the `hive-c0re` binary, parsed via clap's derive
// API (`Cli::parse()` in `main`).
//
// NOTE: the `///` comments on the fields below are picked up by clap as the
// `--help` text for each argument, so rewording them changes user-visible
// output. Maintainer-only notes therefore use plain `//` comments.
#[derive(Parser)]
#[command(name = "hive-c0re", about = "hyperhive coordinator daemon and CLI")]
struct Cli {
/// Path to the host admin socket.
// `global = true` makes the flag accepted after the subcommand as well as
// before it. `main` uses this path both to serve on (`serve`) and to connect
// to (every other subcommand).
#[arg(long, global = true, default_value = "/run/hyperhive/host.sock")]
socket: PathBuf,
// The action to perform; see `Cmd`.
#[command(subcommand)]
cmd: Cmd,
}
// Subcommands of the `hive-c0re` binary.
//
// `Serve` runs the coordinator daemon in this process. Every other variant is
// a thin client command: `main` converts it to the `HostRequest` variant of
// the same name, sends it over the admin socket and prints the response.
//
// NOTE: the `///` comments below double as clap `--help` text for each
// subcommand/flag, so rewording them changes user-visible output.
#[derive(Subcommand)]
enum Cmd {
/// Run the coordinator daemon.
Serve {
/// URL of the hyperhive flake. Inlined into each per-agent
/// `flake.nix` as the `hyperhive` input.
#[arg(long, default_value = "/etc/hyperhive")]
hyperhive_flake: String,
/// Path to the sqlite message store.
#[arg(long, default_value = "/var/lib/hyperhive/broker.sqlite")]
db: PathBuf,
/// Dashboard HTTP port.
#[arg(long, default_value_t = 7000)]
dashboard_port: u16,
/// Operator pronouns (free text). Threaded into each
/// container's harness via `HIVE_OPERATOR_PRONOUNS` so the
/// system prompt can mention them. Default: `she/her`.
#[arg(long, default_value = "she/her")]
operator_pronouns: String,
},
/// Spawn a new agent container directly (`hive-agent-<name>`). Bypasses
/// the approval queue — use only as an operator on the host. For
/// approval-gated spawns, use `request-spawn` instead.
Spawn { name: String },
/// Queue a spawn request as an approval. The container is created on
/// `approve <id>` (CLI) or the dashboard's APPR0VE button.
RequestSpawn { name: String },
/// Stop a managed container (graceful).
Kill { name: String },
/// Tear down a sub-agent container. Container is removed; persistent
/// state (config repos + Claude credentials) is kept by default. Pass
/// `--purge` to also wipe the agent's state dirs (config + creds +
/// notes). No undo.
Destroy {
name: String,
// Plain boolean switch: absent means `false`, `--purge` means `true`.
#[arg(long)]
purge: bool,
},
/// Apply pending config to a managed container.
Rebuild { name: String },
/// List managed containers.
List,
/// List pending approval requests submitted by the manager.
Pending,
/// Approve a pending request by id; the action runs immediately.
Approve { id: i64 },
/// Deny a pending request by id.
Deny { id: i64 },
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
let cli = Cli::parse();
match cli.cmd {
Cmd::Serve {
hyperhive_flake,
db,
dashboard_port,
operator_pronouns,
} => {
let coord = Arc::new(Coordinator::open(
&db,
hyperhive_flake,
dashboard_port,
operator_pronouns,
)?);
manager_server::start(coord.clone())?;
// Idempotent pre-flight: rewrite pre-meta-layout applied
// repos, ensure proposed repos carry the `applied`
// remote, bootstrap the meta repo, repoint containers at
// `meta#<name>` (one-shot, guarded by a marker file).
// Runs before manager auto-spawn so the new manager is
// built against meta from the first attempt.
if let Err(e) = migrate::run(&coord).await {
tracing::warn!(error = ?e, "startup migration failed");
}
// Auto-create the manager container if it isn't there yet. Block
// on this — without hm1nd the system has no manager harness.
// Failures are logged but allowed: a broken auto-spawn shouldn't
// make the dashboard unreachable for debugging.
if let Err(e) = auto_update::ensure_manager(&coord).await {
tracing::warn!(error = ?e, "auto-spawn manager failed");
}
// Auto-update in the background — don't block service start.
// Sub-agent rebuilds can take tens of seconds; we want the admin
// socket up immediately.
let update_coord = coord.clone();
tokio::spawn(async move {
if let Err(e) = auto_update::run(update_coord).await {
tracing::warn!(error = ?e, "auto-update task failed");
}
});
// Forge user sweep: ensure every existing container has a
// forgejo user + access token. No-op when the hive-forge
// container isn't running. Backgrounded — touches the
// forge state dir via `nixos-container run` which is slow.
tokio::spawn(async move {
forge::ensure_all().await;
});
// Periodic broker vacuum: drop fully-acked messages older
// than 30 days. Delivered-but-unacked rows (recoverable via
// requeue_inflight) and undelivered rows are always kept.
// Runs hourly; first sweep happens immediately.
let vacuum_coord = coord.clone();
let mut vacuum_shutdown = coord.shutdown_rx();
tokio::spawn(async move {
let interval = std::time::Duration::from_secs(3600);
let keep_secs: i64 = 30 * 24 * 3600;
loop {
match vacuum_coord.broker.vacuum_delivered(keep_secs) {
Ok(0) => {}
Ok(n) => tracing::info!(removed = n, "broker vacuum"),
Err(e) => tracing::warn!(error = ?e, "broker vacuum failed"),
}
tokio::select! {
_ = tokio::time::sleep(interval) => {}
_ = vacuum_shutdown.changed() => {
tracing::info!("broker vacuum: shutdown signal received");
break;
}
}
}
});
// Per-agent events.sqlite vacuum: host-side so the harness
// doesn't need any retention wiring of its own.
events_vacuum::spawn(coord.clone());
// Per-agent turn-stats.sqlite vacuum: same pattern, 90-day
// retention so trend analysis has enough history.
stats_vacuum::spawn(coord.clone());
// Container crash watcher: emits HelperEvent::ContainerCrash
// when a previously-running container goes away without an
// operator-initiated transient state.
crash_watch::spawn(coord.clone());
// Reminder scheduler: drains due reminders + handles
// file_path payload persistence. See reminder_scheduler.rs.
reminder_scheduler::spawn(coord.clone());
// Forward every broker event onto the unified dashboard
// channel with a freshly-stamped seq, so the dashboard SSE
// sees broker messages + future mutation events on one
// stream with one monotonic seq. The broker's intra-process
// channel (used by `recv_blocking_batch`) stays untouched.
spawn_broker_to_dashboard_forwarder(coord.clone());
let dash_coord = coord.clone();
tokio::spawn(async move {
if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await {
tracing::error!(error = ?e, "dashboard failed");
}
});
// Run the admin socket until a signal arrives; then signal
// all background tasks so they exit cleanly before the
// process terminates.
let coord_sig = coord.clone();
tokio::select! {
res = server::serve(&cli.socket, coord) => { res? }
_ = tokio::signal::ctrl_c() => {
tracing::info!("SIGINT received — requesting shutdown");
coord_sig.request_shutdown();
}
_ = async {
let mut sig = tokio::signal::unix::signal(
tokio::signal::unix::SignalKind::terminate()
).expect("failed to install SIGTERM handler");
sig.recv().await;
} => {
tracing::info!("SIGTERM received — requesting shutdown");
coord_sig.request_shutdown();
}
}
Ok(())
}
Cmd::Spawn { name } => {
render(client::request(&cli.socket, HostRequest::Spawn { name }).await?)
}
Cmd::RequestSpawn { name } => {
render(client::request(&cli.socket, HostRequest::RequestSpawn { name }).await?)
}
Cmd::Kill { name } => {
render(client::request(&cli.socket, HostRequest::Kill { name }).await?)
}
Cmd::Destroy { name, purge } => {
render(client::request(&cli.socket, HostRequest::Destroy { name, purge }).await?)
}
Cmd::Rebuild { name } => {
render(client::request(&cli.socket, HostRequest::Rebuild { name }).await?)
}
Cmd::List => render(client::request(&cli.socket, HostRequest::List).await?),
Cmd::Pending => render(client::request(&cli.socket, HostRequest::Pending).await?),
Cmd::Approve { id } => {
render(client::request(&cli.socket, HostRequest::Approve { id }).await?)
}
Cmd::Deny { id } => render(client::request(&cli.socket, HostRequest::Deny { id }).await?),
}
}
/// Re-emit every broker `MessageEvent` onto the dashboard channel as
/// a `DashboardEvent::Sent` / `Delivered` with a freshly-stamped seq.
/// Background task; runs for the life of the process. On a lagged
/// broker subscription we just keep going — the dashboard channel is
/// best-effort presentation plumbing, the broker keeps its own sqlite
/// log for replay.
fn spawn_broker_to_dashboard_forwarder(coord: Arc<Coordinator>) {
use broker::MessageEvent;
use dashboard_events::DashboardEvent;
let mut rx = coord.broker.subscribe();
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(MessageEvent::Sent { from, to, body, at }) => {
let file_refs = dashboard::scan_validated_paths(&body);
coord.emit_dashboard_event(DashboardEvent::Sent {
seq: coord.next_seq(),
from,
to,
body,
at,
file_refs,
});
}
Ok(MessageEvent::Delivered { from, to, body, at }) => {
let file_refs = dashboard::scan_validated_paths(&body);
coord.emit_dashboard_event(DashboardEvent::Delivered {
seq: coord.next_seq(),
from,
to,
body,
at,
file_refs,
});
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(skipped = n, "broker-to-dashboard forwarder lagged");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
}
/// Prints a daemon response to stdout as pretty-printed JSON and turns a
/// failed response into an error.
///
/// # Errors
///
/// Fails if the response cannot be serialised, or — after printing it —
/// if `resp.ok` is false; the error message is `resp.error` when present
/// and `"request failed"` otherwise.
fn render(resp: HostResponse) -> Result<()> {
    let pretty = serde_json::to_string_pretty(&resp)?;
    println!("{}", pretty);

    if resp.ok {
        return Ok(());
    }

    let reason = resp.error.unwrap_or_else(|| "request failed".to_owned());
    bail!(reason)
}