hyperhive/hive-c0re/src/server.rs
müde d06b598c56 kick_agent on every rebuild + apply path
agents weren't being woken with the 'you were rebuilt — check
/state/ for notes, --continue intact' system message after
several recent rebuild surfaces:

- auto_update::rebuild_agent — used by the dashboard rebuild
  button, admin-CLI rebuild via lifecycle_action, the startup
  rev-scan, AND the new meta-input update batch loop. kick
  moves *into* rebuild_agent's success arm so all four
  paths benefit. (the dashboard's lifecycle_action extra
  closure was already firing kick — now it's a no-op for the
  rebuild path since rebuild_agent does it.)
- actions::run_apply_commit — apply-commit approve flow built
  + tagged deployed/<id> but never kicked. add kick on
  success with the more specific 'config update applied' hint.
- server.rs::HostRequest::Rebuild — the admin-CLI direct path
  calls lifecycle::rebuild bypassing rebuild_agent. add kick
  on success.

dashboard's restart / start lifecycle_action extras still
kick via their own closures since they don't route through
rebuild_agent. stop / kill / destroy intentionally don't
kick — there's nothing to wake.
2026-05-16 04:20:01 +02:00

192 lines
7.6 KiB
Rust

use std::path::Path;
use std::sync::Arc;
use anyhow::{Context, Result};
use hive_sh4re::{HostRequest, HostResponse};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{UnixListener, UnixStream};
use crate::actions;
use crate::coordinator::Coordinator;
use crate::lifecycle;
pub async fn serve(socket: &Path, coord: Arc<Coordinator>) -> Result<()> {
if let Some(parent) = socket.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("create socket parent {}", parent.display()))?;
}
if socket.exists() {
std::fs::remove_file(socket).context("remove stale socket")?;
}
let listener = UnixListener::bind(socket)
.with_context(|| format!("bind admin socket {}", socket.display()))?;
tracing::info!(socket = %socket.display(), hyperhive_flake = %coord.hyperhive_flake, "hive-c0re admin listening");
loop {
let (stream, _) = listener.accept().await.context("accept connection")?;
let coord = coord.clone();
tokio::spawn(async move {
if let Err(e) = handle(stream, coord).await {
tracing::warn!(error = ?e, "connection failed");
}
});
}
}
/// Serves a single admin connection speaking newline-delimited JSON.
///
/// Each line read from the peer is trimmed and parsed as a `HostRequest`,
/// then passed to `dispatch`; the resulting `HostResponse` is written back
/// as one JSON line and flushed. A line that does not parse is answered with
/// a `parse error: …` response and the connection stays open. Returns
/// `Ok(())` once the peer closes its end of the stream.
///
/// # Errors
///
/// Returns an error if reading a line, serializing a response, or writing
/// to the stream fails.
async fn handle(stream: UnixStream, coord: Arc<Coordinator>) -> Result<()> {
    let (rx, mut tx) = stream.into_split();
    let mut incoming = BufReader::new(rx);
    // A single buffer is reused for every request line.
    let mut buf = String::new();
    loop {
        buf.clear();
        // Zero bytes read means the peer hung up.
        if incoming.read_line(&mut buf).await? == 0 {
            break;
        }
        let reply = match serde_json::from_str::<HostRequest>(buf.trim()) {
            Err(e) => HostResponse::error(format!("parse error: {e}")),
            Ok(req) => dispatch(&req, Arc::clone(&coord)).await,
        };
        // One response per line: JSON body followed by a newline terminator.
        let mut frame = serde_json::to_vec(&reply)?;
        frame.push(b'\n');
        tx.write_all(&frame).await?;
        tx.flush().await?;
    }
    Ok(())
}
/// Executes one admin request against the coordinator and builds its reply.
///
/// Each `HostRequest` variant is handled by its own match arm. Any error
/// raised inside an arm is turned into `HostResponse::error` carrying the
/// alternate-formatted error (`{e:#}`), so the caller always receives a
/// `HostResponse`.
// The body is one flat `match` over the request variants, which is what
// trips the line-count lint.
#[allow(clippy::too_many_lines)]
async fn dispatch(req: &HostRequest, coord: Arc<Coordinator>) -> HostResponse {
    let outcome: anyhow::Result<HostResponse> = async {
        let response = match req {
            HostRequest::Spawn { name } => {
                tracing::info!(%name, "spawn");
                let runtime_dir = coord.ensure_runtime(name)?;
                let proposed = Coordinator::agent_proposed_dir(name);
                let applied = Coordinator::agent_applied_dir(name);
                let claude = Coordinator::agent_claude_dir(name);
                let notes = Coordinator::agent_notes_dir(name);
                let spawned = lifecycle::spawn(
                    name,
                    &coord.hyperhive_flake,
                    &runtime_dir,
                    &proposed,
                    &applied,
                    &claude,
                    &notes,
                    coord.dashboard_port,
                    &coord.operator_pronouns,
                )
                .await;
                if let Err(e) = spawned {
                    // Container creation failed: undo the registration, tell
                    // the manager why, then surface the error to the caller.
                    coord.unregister_agent(name);
                    coord.notify_manager(&hive_sh4re::HelperEvent::Spawned {
                        agent: name.clone(),
                        ok: false,
                        note: Some(format!("{e:#}")),
                        sha: None,
                    });
                    return Err(e);
                }
                coord.notify_manager(&hive_sh4re::HelperEvent::Spawned {
                    agent: name.clone(),
                    ok: true,
                    note: None,
                    sha: None,
                });
                HostResponse::success()
            }
            HostRequest::RequestSpawn { name } => {
                tracing::info!(%name, "request_spawn");
                // Queue an approval rather than spawning directly.
                let id = coord
                    .approvals
                    .submit_kind(name, hive_sh4re::ApprovalKind::Spawn, "")?;
                tracing::info!(%id, %name, "spawn approval queued");
                HostResponse::success()
            }
            HostRequest::Kill { name } => {
                tracing::info!(%name, "kill");
                lifecycle::kill(name).await?;
                // Only reached when the kill succeeded.
                coord.unregister_agent(name);
                coord.notify_manager(&hive_sh4re::HelperEvent::Killed {
                    agent: name.clone(),
                });
                HostResponse::success()
            }
            HostRequest::Destroy { name, purge } => {
                actions::destroy(&coord, name, *purge).await?;
                HostResponse::success()
            }
            HostRequest::Rebuild { name } => {
                tracing::info!(%name, "rebuild");
                let runtime_dir = coord.ensure_runtime(name)?;
                let applied = Coordinator::agent_applied_dir(name);
                let claude = Coordinator::agent_claude_dir(name);
                let notes = Coordinator::agent_notes_dir(name);
                let rebuilt = lifecycle::rebuild(
                    name,
                    &coord.hyperhive_flake,
                    &runtime_dir,
                    &applied,
                    &claude,
                    &notes,
                    coord.dashboard_port,
                    &coord.operator_pronouns,
                )
                .await;
                // The manager is told about every rebuild attempt made via
                // the admin socket, success or failure; on failure the note
                // carries the formatted error.
                let failure = rebuilt.as_ref().err().map(|e| format!("{e:#}"));
                coord.notify_manager(&hive_sh4re::HelperEvent::Rebuilt {
                    agent: name.clone(),
                    ok: failure.is_none(),
                    note: failure,
                    sha: None,
                    tag: None,
                });
                // A failed rebuild stops here; only a successful one goes on
                // to kick the agent with the "container rebuilt" hint.
                rebuilt?;
                coord.kick_agent(name, "container rebuilt");
                HostResponse::success()
            }
            HostRequest::List => {
                let agents = lifecycle::list().await?;
                HostResponse::list(agents)
            }
            HostRequest::Pending => {
                let queued = coord.approvals.pending()?;
                HostResponse::pending(queued)
            }
            HostRequest::Approve { id } => {
                actions::approve(Arc::clone(&coord), *id).await?;
                HostResponse::success()
            }
            HostRequest::Deny { id } => {
                actions::deny(&coord, *id, None).await?;
                HostResponse::success()
            }
        };
        Ok(response)
    }
    .await;
    // Flatten any failure into an error response with the full error chain.
    outcome.unwrap_or_else(|e| HostResponse::error(format!("{e:#}")))
}