From 0ec54ecf898f20c4ea773e94aed07059cfb03d09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?m=C3=BCde?= Date: Thu, 14 May 2026 20:49:11 +0200 Subject: [PATCH] hive-c0re: admin socket server + client (stub dispatch) --- hive-c0re/src/client.rs | 27 ++++++++++++++++ hive-c0re/src/main.rs | 50 ++++++++++++++--------------- hive-c0re/src/server.rs | 70 +++++++++++++++++++++++++++++++++++++++++ hive-sh4re/Cargo.toml | 3 ++ hive-sh4re/src/lib.rs | 53 +++++++++++++++++++++++++++++++ 5 files changed, 177 insertions(+), 26 deletions(-) create mode 100644 hive-c0re/src/client.rs create mode 100644 hive-c0re/src/server.rs diff --git a/hive-c0re/src/client.rs b/hive-c0re/src/client.rs new file mode 100644 index 0000000..1486636 --- /dev/null +++ b/hive-c0re/src/client.rs @@ -0,0 +1,27 @@ +use std::path::Path; + +use anyhow::{Context, Result, bail}; +use hive_sh4re::{HostRequest, HostResponse}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::UnixStream; + +pub async fn request(socket: &Path, req: HostRequest) -> Result { + let stream = UnixStream::connect(socket) + .await + .with_context(|| format!("connect to {}", socket.display()))?; + let (read, mut write) = stream.into_split(); + + let mut payload = serde_json::to_string(&req)?; + payload.push('\n'); + write.write_all(payload.as_bytes()).await?; + write.flush().await?; + + let mut reader = BufReader::new(read); + let mut line = String::new(); + reader.read_line(&mut line).await?; + if line.is_empty() { + bail!("server closed connection without responding"); + } + let resp: HostResponse = serde_json::from_str(line.trim())?; + Ok(resp) +} diff --git a/hive-c0re/src/main.rs b/hive-c0re/src/main.rs index 24f4ac8..0f13e81 100644 --- a/hive-c0re/src/main.rs +++ b/hive-c0re/src/main.rs @@ -2,10 +2,18 @@ use std::path::PathBuf; use anyhow::{Result, bail}; use clap::{Parser, Subcommand}; +use hive_sh4re::{HostRequest, HostResponse}; + +mod client; +mod server; #[derive(Parser)] #[command(name = "hive-c0re", about = "hyperhive coordinator daemon and CLI")] struct Cli { + /// Path to the host admin socket. + #[arg(long, global = true, default_value = "/run/hyperhive/host.sock")] + socket: PathBuf, + #[command(subcommand)] cmd: Cmd, } @@ -17,11 +25,8 @@ enum Cmd { /// Flake reference for the agent base template. #[arg(long, default_value = "/etc/hyperhive#agent-base")] agent_flake: String, - /// Path to the host admin socket. - #[arg(long, default_value = "/run/hyperhive/host.sock")] - socket: PathBuf, }, - /// Spawn a new agent container (creates `hive-agent-`). + /// Spawn a new agent container (`hive-agent-`). Spawn { name: String }, /// Stop a managed container (graceful). Kill { name: String }, @@ -31,7 +36,8 @@ enum Cmd { List, } -fn main() -> Result<()> { +#[tokio::main] +async fn main() -> Result<()> { tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::try_from_default_env() @@ -41,28 +47,20 @@ fn main() -> Result<()> { let cli = Cli::parse(); match cli.cmd { - Cmd::Serve { - agent_flake, - socket, - } => { - tracing::info!(?socket, %agent_flake, "serve: not yet implemented"); - bail!("serve not yet implemented"); - } - Cmd::Spawn { name } => { - tracing::info!(%name, "spawn: not yet implemented"); - bail!("spawn not yet implemented"); - } - Cmd::Kill { name } => { - tracing::info!(%name, "kill: not yet implemented"); - bail!("kill not yet implemented"); - } + Cmd::Serve { agent_flake } => server::serve(&cli.socket, &agent_flake).await, + Cmd::Spawn { name } => render(client::request(&cli.socket, HostRequest::Spawn { name }).await?), + Cmd::Kill { name } => render(client::request(&cli.socket, HostRequest::Kill { name }).await?), Cmd::Rebuild { name } => { - tracing::info!(%name, "rebuild: not yet implemented"); - bail!("rebuild not yet implemented"); - } - Cmd::List => { - tracing::info!("list: not yet implemented"); - bail!("list not yet implemented"); + render(client::request(&cli.socket, HostRequest::Rebuild { name }).await?) } + Cmd::List => render(client::request(&cli.socket, HostRequest::List).await?), } } + +fn render(resp: HostResponse) -> Result<()> { + println!("{}", serde_json::to_string_pretty(&resp)?); + if !resp.ok { + bail!(resp.error.unwrap_or_else(|| "request failed".to_owned())); + } + Ok(()) +} diff --git a/hive-c0re/src/server.rs b/hive-c0re/src/server.rs new file mode 100644 index 0000000..d7b9fd3 --- /dev/null +++ b/hive-c0re/src/server.rs @@ -0,0 +1,70 @@ +use std::path::Path; + +use anyhow::{Context, Result}; +use hive_sh4re::{HostRequest, HostResponse}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::{UnixListener, UnixStream}; + +pub async fn serve(socket: &Path, agent_flake: &str) -> Result<()> { + if let Some(parent) = socket.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("create socket parent {}", parent.display()))?; + } + if socket.exists() { + std::fs::remove_file(socket).context("remove stale socket")?; + } + + let listener = UnixListener::bind(socket) + .with_context(|| format!("bind admin socket {}", socket.display()))?; + tracing::info!(socket = %socket.display(), %agent_flake, "hive-c0re listening"); + + loop { + let (stream, _) = listener.accept().await.context("accept connection")?; + let agent_flake = agent_flake.to_owned(); + tokio::spawn(async move { + if let Err(e) = handle(stream, &agent_flake).await { + tracing::warn!(error = ?e, "connection failed"); + } + }); + } +} + +async fn handle(stream: UnixStream, agent_flake: &str) -> Result<()> { + let (read, mut write) = stream.into_split(); + let mut reader = BufReader::new(read); + let mut line = String::new(); + + loop { + line.clear(); + let n = reader.read_line(&mut line).await?; + if n == 0 { + return Ok(()); + } + let resp = match serde_json::from_str::(line.trim()) { + Ok(req) => dispatch(&req, agent_flake).await, + Err(e) => HostResponse::error(format!("parse error: {e}")), + }; + let mut payload = serde_json::to_string(&resp)?; + payload.push('\n'); + write.write_all(payload.as_bytes()).await?; + write.flush().await?; + } +} + +async fn dispatch(req: &HostRequest, _agent_flake: &str) -> HostResponse { + match req { + HostRequest::Spawn { name } => { + tracing::info!(%name, "spawn (stub)"); + HostResponse::error("spawn: nixos-container integration pending") + } + HostRequest::Kill { name } => { + tracing::info!(%name, "kill (stub)"); + HostResponse::error("kill: nixos-container integration pending") + } + HostRequest::Rebuild { name } => { + tracing::info!(%name, "rebuild (stub)"); + HostResponse::error("rebuild: nixos-container integration pending") + } + HostRequest::List => HostResponse::list(Vec::new()), + } +} diff --git a/hive-sh4re/Cargo.toml b/hive-sh4re/Cargo.toml index 004c5f0..45f43cf 100644 --- a/hive-sh4re/Cargo.toml +++ b/hive-sh4re/Cargo.toml @@ -2,3 +2,6 @@ name = "hive-sh4re" edition.workspace = true version.workspace = true + +[dependencies] +serde.workspace = true diff --git a/hive-sh4re/src/lib.rs b/hive-sh4re/src/lib.rs index 8b13789..9322282 100644 --- a/hive-sh4re/src/lib.rs +++ b/hive-sh4re/src/lib.rs @@ -1 +1,54 @@ +//! Wire types shared between `hive-c0re` and the in-container harness. +use serde::{Deserialize, Serialize}; + +/// Requests on the host admin socket (`/run/hyperhive/host.sock`). +/// +/// Wire format: one JSON object per line. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "cmd", rename_all = "snake_case")] +pub enum HostRequest { + /// Create and start a sub-agent container `hive-agent-`. + Spawn { name: String }, + /// Stop a managed container (graceful). + Kill { name: String }, + /// Apply pending config to a managed container. + Rebuild { name: String }, + /// List managed containers. + List, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HostResponse { + pub ok: bool, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub error: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub agents: Option>, +} + +impl HostResponse { + pub fn success() -> Self { + Self { + ok: true, + error: None, + agents: None, + } + } + + pub fn error(message: impl Into) -> Self { + Self { + ok: false, + error: Some(message.into()), + agents: None, + } + } + + pub fn list(agents: Vec) -> Self { + Self { + ok: true, + error: None, + agents: Some(agents), + } + } +}