Phase 4: manager socket + manager_server with privileged tool surface
This commit is contained in:
parent
4f191b2e43
commit
aa67e5a481
6 changed files with 188 additions and 15 deletions
128
hive-c0re/src/manager_server.rs
Normal file
128
hive-c0re/src/manager_server.rs
Normal file
|
|
@ -0,0 +1,128 @@
|
|||
//! Manager socket listener. Privileged tool surface: agent-style send/recv
|
||||
//! plus lifecycle verbs (Phase 4). Phase 5 will gate Spawn/Kill behind the
|
||||
//! commit-approval flow; for now they hit the same code path the host admin
|
||||
//! socket uses.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use hive_sh4re::{MANAGER_AGENT, ManagerRequest, ManagerResponse, Message};
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::net::{UnixListener, UnixStream};
|
||||
|
||||
use crate::coordinator::Coordinator;
|
||||
use crate::lifecycle;
|
||||
|
||||
pub async fn start(coord: Arc<Coordinator>) -> Result<()> {
|
||||
let dir = Coordinator::manager_dir();
|
||||
std::fs::create_dir_all(&dir)
|
||||
.with_context(|| format!("create manager dir {}", dir.display()))?;
|
||||
let socket = Coordinator::manager_socket_path();
|
||||
if socket.exists() {
|
||||
std::fs::remove_file(&socket).context("remove stale manager socket")?;
|
||||
}
|
||||
let listener = UnixListener::bind(&socket)
|
||||
.with_context(|| format!("bind manager socket {}", socket.display()))?;
|
||||
tracing::info!(socket = %socket.display(), "manager socket listening");
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
Ok((stream, _)) => {
|
||||
let coord = coord.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = serve(stream, coord).await {
|
||||
tracing::warn!(error = ?e, "manager connection failed");
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(error = ?e, "manager listener accept failed");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn serve(stream: UnixStream, coord: Arc<Coordinator>) -> Result<()> {
|
||||
let (read, mut write) = stream.into_split();
|
||||
let mut reader = BufReader::new(read);
|
||||
let mut line = String::new();
|
||||
loop {
|
||||
line.clear();
|
||||
let n = reader.read_line(&mut line).await?;
|
||||
if n == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
let resp = match serde_json::from_str::<ManagerRequest>(line.trim()) {
|
||||
Ok(req) => dispatch(&req, &coord).await,
|
||||
Err(e) => ManagerResponse::Err {
|
||||
message: format!("parse error: {e}"),
|
||||
},
|
||||
};
|
||||
let mut payload = serde_json::to_string(&resp)?;
|
||||
payload.push('\n');
|
||||
write.write_all(payload.as_bytes()).await?;
|
||||
write.flush().await?;
|
||||
}
|
||||
}
|
||||
|
||||
async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse {
|
||||
match req {
|
||||
ManagerRequest::Send { to, body } => match coord.broker.send(Message {
|
||||
from: MANAGER_AGENT.to_owned(),
|
||||
to: to.clone(),
|
||||
body: body.clone(),
|
||||
}) {
|
||||
Ok(()) => ManagerResponse::Ok,
|
||||
Err(e) => ManagerResponse::Err {
|
||||
message: format!("{e:#}"),
|
||||
},
|
||||
},
|
||||
ManagerRequest::Recv => match coord.broker.recv(MANAGER_AGENT) {
|
||||
Ok(Some(msg)) => ManagerResponse::Message {
|
||||
from: msg.from,
|
||||
body: msg.body,
|
||||
},
|
||||
Ok(None) => ManagerResponse::Empty,
|
||||
Err(e) => ManagerResponse::Err {
|
||||
message: format!("{e:#}"),
|
||||
},
|
||||
},
|
||||
ManagerRequest::Spawn { name } => {
|
||||
tracing::info!(%name, "manager: spawn");
|
||||
let result: Result<()> = async {
|
||||
let agent_dir = coord.register_agent(name).await?;
|
||||
if let Err(e) = lifecycle::spawn(name, &coord.agent_flake, &agent_dir).await {
|
||||
coord.unregister_agent(name);
|
||||
return Err(e);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
.await;
|
||||
match result {
|
||||
Ok(()) => ManagerResponse::Ok,
|
||||
Err(e) => ManagerResponse::Err {
|
||||
message: format!("{e:#}"),
|
||||
},
|
||||
}
|
||||
}
|
||||
ManagerRequest::Kill { name } => {
|
||||
tracing::info!(%name, "manager: kill");
|
||||
let result: Result<()> = async {
|
||||
lifecycle::kill(name).await?;
|
||||
coord.unregister_agent(name);
|
||||
Ok(())
|
||||
}
|
||||
.await;
|
||||
match result {
|
||||
Ok(()) => ManagerResponse::Ok,
|
||||
Err(e) => ManagerResponse::Err {
|
||||
message: format!("{e:#}"),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue