Phase 4: hive-m1nd harness + manager nixos template; devshell sqlite

This commit is contained in:
müde 2026-05-14 22:36:34 +02:00
parent aa67e5a481
commit 17092961a2
5 changed files with 159 additions and 28 deletions

View file

@ -79,12 +79,17 @@
nixosModules = { nixosModules = {
agent-base = ./nix/templates/agent-base.nix; agent-base = ./nix/templates/agent-base.nix;
hive-c0re = ./nix/modules/hive-c0re.nix; hive-c0re = ./nix/modules/hive-c0re.nix;
manager = ./nix/templates/manager.nix;
}; };
nixosConfigurations.agent-base = nixpkgs.lib.nixosSystem { nixosConfigurations =
let
mkContainer =
module:
nixpkgs.lib.nixosSystem {
system = "x86_64-linux"; system = "x86_64-linux";
modules = [ modules = [
self.nixosModules.agent-base module
{ {
nixpkgs.overlays = [ nixpkgs.overlays = [
self.overlays.default self.overlays.default
@ -93,6 +98,11 @@
} }
]; ];
}; };
in
{
agent-base = mkContainer self.nixosModules.agent-base;
manager = mkContainer self.nixosModules.manager;
};
devShells = forAllSystems ( devShells = forAllSystems (
{ pkgs, ... }: { pkgs, ... }:

View file

@ -46,12 +46,14 @@ async fn main() -> Result<()> {
match cli.cmd { match cli.cmd {
Cmd::Serve { poll_ms } => serve(&cli.socket, Duration::from_millis(poll_ms)).await, Cmd::Serve { poll_ms } => serve(&cli.socket, Duration::from_millis(poll_ms)).await,
Cmd::Send { to, body } => { Cmd::Send { to, body } => {
let resp = client::request(&cli.socket, AgentRequest::Send { to, body }).await?; let resp: AgentResponse =
client::request(&cli.socket, &AgentRequest::Send { to, body }).await?;
render(&resp)?; render(&resp)?;
check(&resp) check(&resp)
} }
Cmd::Recv => { Cmd::Recv => {
let resp = client::request(&cli.socket, AgentRequest::Recv).await?; let resp: AgentResponse =
client::request(&cli.socket, &AgentRequest::Recv).await?;
render(&resp)?; render(&resp)?;
check(&resp) check(&resp)
} }
@ -61,7 +63,8 @@ async fn main() -> Result<()> {
async fn serve(socket: &Path, interval: Duration) -> Result<()> { async fn serve(socket: &Path, interval: Duration) -> Result<()> {
tracing::info!(socket = %socket.display(), "hive-ag3nt serve"); tracing::info!(socket = %socket.display(), "hive-ag3nt serve");
loop { loop {
match client::request(socket, AgentRequest::Recv).await { let recv: Result<AgentResponse> = client::request(socket, &AgentRequest::Recv).await;
match recv {
Ok(AgentResponse::Message { from, body }) => { Ok(AgentResponse::Message { from, body }) => {
tracing::info!(%from, %body, "inbox"); tracing::info!(%from, %body, "inbox");
// Don't auto-reply to echoes — prevents infinite ping-pong when // Don't auto-reply to echoes — prevents infinite ping-pong when
@ -69,15 +72,15 @@ async fn serve(socket: &Path, interval: Duration) -> Result<()> {
// manager's job (Phase 4+). // manager's job (Phase 4+).
if !body.starts_with("echo: ") { if !body.starts_with("echo: ") {
let reply = compute_reply(&body).await; let reply = compute_reply(&body).await;
if let Err(e) = client::request( let send: Result<AgentResponse> = client::request(
socket, socket,
AgentRequest::Send { &AgentRequest::Send {
to: from, to: from,
body: reply, body: reply,
}, },
) )
.await .await;
{ if let Err(e) = send {
tracing::warn!(error = ?e, "send reply failed"); tracing::warn!(error = ?e, "send reply failed");
} }
} }

View file

@ -1,5 +1,91 @@
fn main() { //! Manager harness. Talks to the manager socket (bind-mounted from the host
// Phase 4 — manager tool surface. For now, a placeholder so the binary //! at `/run/hive/mcp.sock` inside the `hm1nd` container) using the privileged
// exists and can be referenced from the manager nixos-container template. //! tool surface. Phase 4 minimum: a CLI to exercise the verbs from a shell,
println!("hive-m1nd placeholder"); //! plus a `serve` loop that logs the manager's inbox.
use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::{Result, bail};
use clap::{Parser, Subcommand};
use hive_ag3nt::{DEFAULT_SOCKET, client};
use hive_sh4re::{ManagerRequest, ManagerResponse};
#[derive(Parser)]
#[command(name = "hive-m1nd", about = "hyperhive manager harness")]
struct Cli {
/// Path to the manager MCP socket (bind-mounted from the host).
#[arg(long, global = true, default_value = DEFAULT_SOCKET)]
socket: PathBuf,
#[command(subcommand)]
cmd: Cmd,
}
#[derive(Subcommand)]
enum Cmd {
/// Long-lived loop polling the manager inbox.
Serve {
#[arg(long, default_value_t = 1000)]
poll_ms: u64,
},
/// Send a message to a sub-agent (or anywhere — the broker doesn't validate).
Send { to: String, body: String },
/// Pop one message from the manager's inbox.
Recv,
/// Spawn a sub-agent.
Spawn { name: String },
/// Kill a sub-agent.
Kill { name: String },
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
let cli = Cli::parse();
match cli.cmd {
Cmd::Serve { poll_ms } => serve(&cli.socket, Duration::from_millis(poll_ms)).await,
Cmd::Send { to, body } => one_shot(&cli.socket, ManagerRequest::Send { to, body }).await,
Cmd::Recv => one_shot(&cli.socket, ManagerRequest::Recv).await,
Cmd::Spawn { name } => one_shot(&cli.socket, ManagerRequest::Spawn { name }).await,
Cmd::Kill { name } => one_shot(&cli.socket, ManagerRequest::Kill { name }).await,
}
}
async fn one_shot(socket: &Path, req: ManagerRequest) -> Result<()> {
let resp: ManagerResponse = client::request(socket, &req).await?;
println!("{}", serde_json::to_string_pretty(&resp)?);
if let ManagerResponse::Err { message } = resp {
bail!("{message}");
}
Ok(())
}
async fn serve(socket: &Path, interval: Duration) -> Result<()> {
tracing::info!(socket = %socket.display(), "hive-m1nd serve");
loop {
let recv: Result<ManagerResponse> = client::request(socket, &ManagerRequest::Recv).await;
match recv {
Ok(ManagerResponse::Message { from, body }) => {
tracing::info!(%from, %body, "manager inbox");
}
Ok(ManagerResponse::Empty) => {}
Ok(ManagerResponse::Ok) => {
tracing::warn!("recv produced Ok (unexpected)");
}
Ok(ManagerResponse::Err { message }) => {
tracing::warn!(%message, "recv error");
}
Err(e) => {
tracing::warn!(error = ?e, "recv failed; retrying");
}
}
tokio::time::sleep(interval).await;
}
} }

View file

@ -1,17 +1,24 @@
use std::path::Path; use std::path::Path;
use anyhow::{Context, Result, bail}; use anyhow::{Context, Result, bail};
use hive_sh4re::{AgentRequest, AgentResponse}; use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream; use tokio::net::UnixStream;
pub async fn request(socket: &Path, req: AgentRequest) -> Result<AgentResponse> { /// Generic JSON-line request/response over a unix socket. One request, one
/// response, then drop. Used by both the agent and manager harnesses.
pub async fn request<Req, Resp>(socket: &Path, req: &Req) -> Result<Resp>
where
Req: Serialize + ?Sized,
Resp: DeserializeOwned,
{
let stream = UnixStream::connect(socket) let stream = UnixStream::connect(socket)
.await .await
.with_context(|| format!("connect to {}", socket.display()))?; .with_context(|| format!("connect to {}", socket.display()))?;
let (read, mut write) = stream.into_split(); let (read, mut write) = stream.into_split();
let mut payload = serde_json::to_string(&req)?; let mut payload = serde_json::to_string(req)?;
payload.push('\n'); payload.push('\n');
write.write_all(payload.as_bytes()).await?; write.write_all(payload.as_bytes()).await?;
write.flush().await?; write.flush().await?;
@ -22,6 +29,5 @@ pub async fn request(socket: &Path, req: AgentRequest) -> Result<AgentResponse>
if line.is_empty() { if line.is_empty() {
bail!("server closed connection without responding"); bail!("server closed connection without responding");
} }
let resp: AgentResponse = serde_json::from_str(line.trim())?; Ok(serde_json::from_str(line.trim())?)
Ok(resp)
} }

26
nix/templates/manager.nix Normal file
View file

@ -0,0 +1,26 @@
{ pkgs, ... }:
{
boot.isNspawnContainer = true;
nixpkgs.config.allowUnfreePredicate = pkg: builtins.elem (pkgs.lib.getName pkg) [ "claude-code" ];
environment.systemPackages = with pkgs; [
hyperhive
claude-code
git
coreutils-full
];
systemd.services.hive-m1nd = {
description = "hive-m1nd manager harness";
wantedBy = [ "multi-user.target" ];
after = [ "network.target" ];
serviceConfig = {
ExecStart = "${pkgs.hyperhive}/bin/hive-m1nd serve";
Restart = "on-failure";
RestartSec = 2;
};
};
system.stateVersion = "25.11";
}