From f12837fe32157909187942c5f75ceb2ebf30fb91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?m=C3=BCde?= Date: Thu, 14 May 2026 22:50:19 +0200 Subject: [PATCH] Phase 5a: approval queue (request_apply_commit, pending/approve/deny) --- hive-ag3nt/src/bin/hive-m1nd.rs | 9 ++ hive-c0re/src/approvals.rs | 167 ++++++++++++++++++++++++++++++++ hive-c0re/src/coordinator.rs | 4 + hive-c0re/src/main.rs | 10 ++ hive-c0re/src/manager_server.rs | 12 +++ hive-c0re/src/server.rs | 19 ++++ hive-sh4re/src/lib.rs | 49 ++++++++++ 7 files changed, 270 insertions(+) create mode 100644 hive-c0re/src/approvals.rs diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index b4f9a10..89fb821 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -37,6 +37,8 @@ enum Cmd { Spawn { name: String }, /// Kill a sub-agent. Kill { name: String }, + /// Submit a config commit on the agent's config repo for user approval. + RequestApplyCommit { agent: String, commit_ref: String }, } #[tokio::main] @@ -55,6 +57,13 @@ async fn main() -> Result<()> { Cmd::Recv => one_shot(&cli.socket, ManagerRequest::Recv).await, Cmd::Spawn { name } => one_shot(&cli.socket, ManagerRequest::Spawn { name }).await, Cmd::Kill { name } => one_shot(&cli.socket, ManagerRequest::Kill { name }).await, + Cmd::RequestApplyCommit { agent, commit_ref } => { + one_shot( + &cli.socket, + ManagerRequest::RequestApplyCommit { agent, commit_ref }, + ) + .await + } } } diff --git a/hive-c0re/src/approvals.rs b/hive-c0re/src/approvals.rs new file mode 100644 index 0000000..b482e16 --- /dev/null +++ b/hive-c0re/src/approvals.rs @@ -0,0 +1,167 @@ +//! Approval queue. Manager submits via `RequestApplyCommit`; the user +//! approves/denies via the host admin CLI; on approval the host runs the +//! corresponding action (Phase 5a: `lifecycle::rebuild(agent)`). + +use std::path::Path; +use std::sync::Mutex; +use std::time::{SystemTime, UNIX_EPOCH}; + +use anyhow::{Context, Result, bail}; +use hive_sh4re::{Approval, ApprovalStatus}; +use rusqlite::{Connection, OptionalExtension, params}; + +const SCHEMA: &str = r#" +CREATE TABLE IF NOT EXISTS approvals ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + agent TEXT NOT NULL, + commit_ref TEXT NOT NULL, + requested_at INTEGER NOT NULL, + status TEXT NOT NULL, + resolved_at INTEGER, + note TEXT +); +CREATE INDEX IF NOT EXISTS idx_approvals_pending + ON approvals (id) WHERE status = 'pending'; +"#; + +pub struct Approvals { + conn: Mutex, +} + +impl Approvals { + pub fn open(path: &Path) -> Result { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("create approvals db parent {}", parent.display()))?; + } + let conn = Connection::open(path) + .with_context(|| format!("open approvals db {}", path.display()))?; + conn.execute_batch(SCHEMA).context("apply approvals schema")?; + Ok(Self { + conn: Mutex::new(conn), + }) + } + + pub fn submit(&self, agent: &str, commit_ref: &str) -> Result { + let conn = self.conn.lock().unwrap(); + conn.execute( + "INSERT INTO approvals (agent, commit_ref, requested_at, status) + VALUES (?1, ?2, ?3, 'pending')", + params![agent, commit_ref, now_unix()], + )?; + Ok(conn.last_insert_rowid()) + } + + pub fn pending(&self) -> Result> { + let conn = self.conn.lock().unwrap(); + let mut stmt = conn.prepare( + "SELECT id, agent, commit_ref, requested_at, status, resolved_at, note + FROM approvals + WHERE status = 'pending' + ORDER BY id ASC", + )?; + let rows = stmt.query_map([], row_to_approval)?; + rows.collect::>>() + .map_err(Into::into) + } + + pub fn get(&self, id: i64) -> Result> { + let conn = self.conn.lock().unwrap(); + conn.query_row( + "SELECT id, agent, commit_ref, requested_at, status, resolved_at, note + FROM approvals WHERE id = ?1", + params![id], + row_to_approval, + ) + .optional() + .map_err(Into::into) + } + + /// Mark pending -> approved (or fail if not pending). Returns the (now-updated) + /// approval so the caller can run the action and pass the agent name. + pub fn mark_approved(&self, id: i64) -> Result { + let conn = self.conn.lock().unwrap(); + let current: Option<(String, String, i64, String)> = conn + .query_row( + "SELECT agent, commit_ref, requested_at, status FROM approvals WHERE id = ?1", + params![id], + |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)), + ) + .optional()?; + let Some((agent, commit_ref, requested_at, status)) = current else { + bail!("approval {id} not found"); + }; + if status != "pending" { + bail!("approval {id} is {status}, not pending"); + } + let resolved_at = now_unix(); + conn.execute( + "UPDATE approvals SET status = 'approved', resolved_at = ?1 WHERE id = ?2", + params![resolved_at, id], + )?; + Ok(Approval { + id, + agent, + commit_ref, + requested_at, + status: ApprovalStatus::Approved, + resolved_at: Some(resolved_at), + note: None, + }) + } + + pub fn mark_denied(&self, id: i64) -> Result<()> { + let conn = self.conn.lock().unwrap(); + let affected = conn.execute( + "UPDATE approvals SET status = 'denied', resolved_at = ?1 + WHERE id = ?2 AND status = 'pending'", + params![now_unix(), id], + )?; + if affected == 0 { + bail!("approval {id} not pending"); + } + Ok(()) + } + + pub fn mark_failed(&self, id: i64, note: &str) -> Result<()> { + let conn = self.conn.lock().unwrap(); + conn.execute( + "UPDATE approvals SET status = 'failed', resolved_at = ?1, note = ?2 WHERE id = ?3", + params![now_unix(), note, id], + )?; + Ok(()) + } +} + +fn row_to_approval(row: &rusqlite::Row<'_>) -> rusqlite::Result { + let status: String = row.get(4)?; + let status = match status.as_str() { + "pending" => ApprovalStatus::Pending, + "approved" => ApprovalStatus::Approved, + "denied" => ApprovalStatus::Denied, + "failed" => ApprovalStatus::Failed, + other => { + return Err(rusqlite::Error::FromSqlConversionFailure( + 4, + rusqlite::types::Type::Text, + format!("unknown approval status '{other}'").into(), + )); + } + }; + Ok(Approval { + id: row.get(0)?, + agent: row.get(1)?, + commit_ref: row.get(2)?, + requested_at: row.get(3)?, + status, + resolved_at: row.get(5)?, + note: row.get(6)?, + }) +} + +fn now_unix() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .unwrap_or(0) +} diff --git a/hive-c0re/src/coordinator.rs b/hive-c0re/src/coordinator.rs index 48f0bcf..e7c9896 100644 --- a/hive-c0re/src/coordinator.rs +++ b/hive-c0re/src/coordinator.rs @@ -9,6 +9,7 @@ use std::sync::{Arc, Mutex}; use anyhow::{Context, Result}; use crate::agent_server::{self, AgentSocket}; +use crate::approvals::Approvals; use crate::broker::Broker; const AGENT_RUNTIME_ROOT: &str = "/run/hyperhive/agents"; @@ -16,6 +17,7 @@ const MANAGER_RUNTIME_ROOT: &str = "/run/hyperhive/manager"; pub struct Coordinator { pub broker: Arc, + pub approvals: Arc, pub agent_flake: String, agents: Mutex>, } @@ -23,8 +25,10 @@ pub struct Coordinator { impl Coordinator { pub fn open(db_path: &Path, agent_flake: String) -> Result { let broker = Broker::open(db_path).context("open broker")?; + let approvals = Approvals::open(db_path).context("open approvals")?; Ok(Self { broker: Arc::new(broker), + approvals: Arc::new(approvals), agent_flake, agents: Mutex::new(HashMap::new()), }) diff --git a/hive-c0re/src/main.rs b/hive-c0re/src/main.rs index c3f57cc..823e465 100644 --- a/hive-c0re/src/main.rs +++ b/hive-c0re/src/main.rs @@ -6,6 +6,7 @@ use clap::{Parser, Subcommand}; use hive_sh4re::{HostRequest, HostResponse}; mod agent_server; +mod approvals; mod broker; mod client; mod coordinator; @@ -45,6 +46,12 @@ enum Cmd { Rebuild { name: String }, /// List managed containers. List, + /// List pending approval requests submitted by the manager. + Pending, + /// Approve a pending request by id; the action runs immediately. + Approve { id: i64 }, + /// Deny a pending request by id. + Deny { id: i64 }, } #[tokio::main] @@ -73,6 +80,9 @@ async fn main() -> Result<()> { render(client::request(&cli.socket, HostRequest::Rebuild { name }).await?) } Cmd::List => render(client::request(&cli.socket, HostRequest::List).await?), + Cmd::Pending => render(client::request(&cli.socket, HostRequest::Pending).await?), + Cmd::Approve { id } => render(client::request(&cli.socket, HostRequest::Approve { id }).await?), + Cmd::Deny { id } => render(client::request(&cli.socket, HostRequest::Deny { id }).await?), } } diff --git a/hive-c0re/src/manager_server.rs b/hive-c0re/src/manager_server.rs index f784c6d..38b54f6 100644 --- a/hive-c0re/src/manager_server.rs +++ b/hive-c0re/src/manager_server.rs @@ -124,5 +124,17 @@ async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse }, } } + ManagerRequest::RequestApplyCommit { agent, commit_ref } => { + tracing::info!(%agent, %commit_ref, "manager: request_apply_commit"); + match coord.approvals.submit(agent, commit_ref) { + Ok(id) => { + tracing::info!(%id, %agent, %commit_ref, "approval queued"); + ManagerResponse::Ok + } + Err(e) => ManagerResponse::Err { + message: format!("{e:#}"), + }, + } + } } } diff --git a/hive-c0re/src/server.rs b/hive-c0re/src/server.rs index 06a3635..3caff99 100644 --- a/hive-c0re/src/server.rs +++ b/hive-c0re/src/server.rs @@ -81,6 +81,25 @@ async fn dispatch(req: &HostRequest, coord: &Coordinator) -> HostResponse { HostResponse::success() } HostRequest::List => HostResponse::list(lifecycle::list().await?), + HostRequest::Pending => HostResponse::pending(coord.approvals.pending()?), + HostRequest::Approve { id } => { + let approval = coord.approvals.mark_approved(*id)?; + tracing::info!(%approval.id, %approval.agent, %approval.commit_ref, "approval applied: rebuilding agent"); + let agent_dir = coord.register_agent(&approval.agent).await?; + if let Err(e) = + lifecycle::rebuild(&approval.agent, &coord.agent_flake, &agent_dir).await + { + let note = format!("{e:#}"); + let _ = coord.approvals.mark_failed(approval.id, ¬e); + return Err(e); + } + HostResponse::success() + } + HostRequest::Deny { id } => { + coord.approvals.mark_denied(*id)?; + tracing::info!(%id, "approval denied"); + HostResponse::success() + } }) } .await; diff --git a/hive-sh4re/src/lib.rs b/hive-sh4re/src/lib.rs index 1014293..5f60963 100644 --- a/hive-sh4re/src/lib.rs +++ b/hive-sh4re/src/lib.rs @@ -20,6 +20,12 @@ pub enum HostRequest { Rebuild { name: String }, /// List managed containers. List, + /// List pending approval requests. + Pending, + /// Approve a pending request by id; the action runs immediately. + Approve { id: i64 }, + /// Deny a pending request by id. + Deny { id: i64 }, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -29,6 +35,30 @@ pub struct HostResponse { pub error: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub agents: Option>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub approvals: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Approval { + pub id: i64, + pub agent: String, + pub commit_ref: String, + pub requested_at: i64, + pub status: ApprovalStatus, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub resolved_at: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub note: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum ApprovalStatus { + Pending, + Approved, + Denied, + Failed, } impl HostResponse { @@ -37,6 +67,7 @@ impl HostResponse { ok: true, error: None, agents: None, + approvals: None, } } @@ -45,6 +76,7 @@ impl HostResponse { ok: false, error: Some(message.into()), agents: None, + approvals: None, } } @@ -53,6 +85,16 @@ impl HostResponse { ok: true, error: None, agents: Some(agents), + approvals: None, + } + } + + pub fn pending(approvals: Vec) -> Self { + Self { + ok: true, + error: None, + agents: None, + approvals: Some(approvals), } } } @@ -121,6 +163,13 @@ pub enum ManagerRequest { Kill { name: String, }, + /// Submit a config commit for the user to approve. `commit_ref` is opaque + /// to the host (typically a git sha pointing into the agent's config repo). + /// On approval the host applies the change via `nixos-container update`. + RequestApplyCommit { + agent: String, + commit_ref: String, + }, } #[derive(Debug, Clone, Serialize, Deserialize)]