clippy pedantic clean + wired into flake checks

This commit is contained in:
müde 2026-05-14 22:57:47 +02:00
parent f12837fe32
commit fef2dee92a
12 changed files with 55 additions and 25 deletions

View file

@ -2,7 +2,7 @@
//! authenticates the caller: connecting to `<.../agents/foo/mcp.sock>` means
//! you are `foo`.
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{Context, Result};
@ -18,23 +18,24 @@ pub struct AgentSocket {
pub handle: JoinHandle<()>,
}
pub async fn start(
agent: String,
socket_path: PathBuf,
pub fn start(
agent: &str,
socket_path: &Path,
broker: Arc<Broker>,
) -> Result<AgentSocket> {
let agent = agent.to_owned();
if let Some(parent) = socket_path.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("create agent socket dir {}", parent.display()))?;
}
if socket_path.exists() {
std::fs::remove_file(&socket_path).context("remove stale agent socket")?;
std::fs::remove_file(socket_path).context("remove stale agent socket")?;
}
let listener = UnixListener::bind(&socket_path)
let listener = UnixListener::bind(socket_path)
.with_context(|| format!("bind agent socket {}", socket_path.display()))?;
tracing::info!(%agent, socket = %socket_path.display(), "agent socket listening");
let path = socket_path.clone();
let path = socket_path.to_path_buf();
let handle = tokio::spawn(async move {
loop {
match listener.accept().await {
@ -83,7 +84,7 @@ async fn serve(stream: UnixStream, agent: String, broker: Arc<Broker>) -> Result
fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResponse {
match req {
AgentRequest::Send { to, body } => {
match broker.send(Message {
match broker.send(&Message {
from: agent.to_owned(),
to: to.clone(),
body: body.clone(),

View file

@ -10,7 +10,7 @@ use anyhow::{Context, Result, bail};
use hive_sh4re::{Approval, ApprovalStatus};
use rusqlite::{Connection, OptionalExtension, params};
const SCHEMA: &str = r#"
const SCHEMA: &str = r"
CREATE TABLE IF NOT EXISTS approvals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent TEXT NOT NULL,
@ -22,7 +22,7 @@ CREATE TABLE IF NOT EXISTS approvals (
);
CREATE INDEX IF NOT EXISTS idx_approvals_pending
ON approvals (id) WHERE status = 'pending';
"#;
";
pub struct Approvals {
conn: Mutex<Connection>,
@ -65,6 +65,7 @@ impl Approvals {
.map_err(Into::into)
}
#[allow(dead_code)] // used by Phase 5b commit verification
pub fn get(&self, id: i64) -> Result<Option<Approval>> {
let conn = self.conn.lock().unwrap();
conn.query_row(
@ -162,6 +163,7 @@ fn row_to_approval(row: &rusqlite::Row<'_>) -> rusqlite::Result<Approval> {
fn now_unix() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.ok()
.and_then(|d| i64::try_from(d.as_secs()).ok())
.unwrap_or(0)
}

View file

@ -8,7 +8,7 @@ use anyhow::{Context, Result};
use hive_sh4re::Message;
use rusqlite::{Connection, OptionalExtension, params};
const SCHEMA: &str = r#"
const SCHEMA: &str = r"
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sender TEXT NOT NULL,
@ -19,7 +19,7 @@ CREATE TABLE IF NOT EXISTS messages (
);
CREATE INDEX IF NOT EXISTS idx_messages_undelivered
ON messages (recipient, id) WHERE delivered_at IS NULL;
"#;
";
pub struct Broker {
conn: Mutex<Connection>,
@ -39,7 +39,7 @@ impl Broker {
})
}
pub fn send(&self, message: Message) -> Result<()> {
pub fn send(&self, message: &Message) -> Result<()> {
let conn = self.conn.lock().unwrap();
conn.execute(
"INSERT INTO messages (sender, recipient, body, sent_at) VALUES (?1, ?2, ?3, ?4)",
@ -75,6 +75,7 @@ impl Broker {
fn now_unix() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.ok()
.and_then(|d| i64::try_from(d.as_secs()).ok())
.unwrap_or(0)
}

View file

@ -34,7 +34,7 @@ impl Coordinator {
})
}
pub async fn register_agent(&self, name: &str) -> Result<PathBuf> {
pub fn register_agent(&self, name: &str) -> Result<PathBuf> {
// Idempotent: drop any existing listener so re-registration (e.g. on rebuild,
// or after a hive-c0re restart cleared /run/hyperhive) gets a fresh socket.
self.unregister_agent(name);
@ -42,7 +42,7 @@ impl Coordinator {
std::fs::create_dir_all(&agent_dir)
.with_context(|| format!("create agent dir {}", agent_dir.display()))?;
let socket_path = Self::socket_path(name);
let socket = agent_server::start(name.to_owned(), socket_path, self.broker.clone()).await?;
let socket = agent_server::start(name, &socket_path, self.broker.clone())?;
self.agents.lock().unwrap().insert(name.to_owned(), socket);
Ok(agent_dir)
}

View file

@ -67,7 +67,7 @@ async fn main() -> Result<()> {
match cli.cmd {
Cmd::Serve { agent_flake, db } => {
let coord = Arc::new(Coordinator::open(&db, agent_flake)?);
manager_server::start(coord.clone()).await?;
manager_server::start(coord.clone())?;
server::serve(&cli.socket, coord).await
}
Cmd::Spawn { name } => {

View file

@ -13,7 +13,7 @@ use tokio::net::{UnixListener, UnixStream};
use crate::coordinator::Coordinator;
use crate::lifecycle;
pub async fn start(coord: Arc<Coordinator>) -> Result<()> {
pub fn start(coord: Arc<Coordinator>) -> Result<()> {
let dir = Coordinator::manager_dir();
std::fs::create_dir_all(&dir)
.with_context(|| format!("create manager dir {}", dir.display()))?;
@ -71,7 +71,7 @@ async fn serve(stream: UnixStream, coord: Arc<Coordinator>) -> Result<()> {
async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse {
match req {
ManagerRequest::Send { to, body } => match coord.broker.send(Message {
ManagerRequest::Send { to, body } => match coord.broker.send(&Message {
from: MANAGER_AGENT.to_owned(),
to: to.clone(),
body: body.clone(),
@ -94,7 +94,7 @@ async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse
ManagerRequest::Spawn { name } => {
tracing::info!(%name, "manager: spawn");
let result: Result<()> = async {
let agent_dir = coord.register_agent(name).await?;
let agent_dir = coord.register_agent(name)?;
if let Err(e) = lifecycle::spawn(name, &coord.agent_flake, &agent_dir).await {
coord.unregister_agent(name);
return Err(e);

View file

@ -60,7 +60,7 @@ async fn dispatch(req: &HostRequest, coord: &Coordinator) -> HostResponse {
Ok(match req {
HostRequest::Spawn { name } => {
tracing::info!(%name, "spawn");
let agent_dir = coord.register_agent(name).await?;
let agent_dir = coord.register_agent(name)?;
if let Err(e) = lifecycle::spawn(name, &coord.agent_flake, &agent_dir).await {
// Roll back socket registration if container creation failed.
coord.unregister_agent(name);
@ -76,7 +76,7 @@ async fn dispatch(req: &HostRequest, coord: &Coordinator) -> HostResponse {
}
HostRequest::Rebuild { name } => {
tracing::info!(%name, "rebuild");
let agent_dir = coord.register_agent(name).await?;
let agent_dir = coord.register_agent(name)?;
lifecycle::rebuild(name, &coord.agent_flake, &agent_dir).await?;
HostResponse::success()
}
@ -85,7 +85,7 @@ async fn dispatch(req: &HostRequest, coord: &Coordinator) -> HostResponse {
HostRequest::Approve { id } => {
let approval = coord.approvals.mark_approved(*id)?;
tracing::info!(%approval.id, %approval.agent, %approval.commit_ref, "approval applied: rebuilding agent");
let agent_dir = coord.register_agent(&approval.agent).await?;
let agent_dir = coord.register_agent(&approval.agent)?;
if let Err(e) =
lifecycle::rebuild(&approval.agent, &coord.agent_flake, &agent_dir).await
{