Phase 5a: approval queue (request_apply_commit, pending/approve/deny)
This commit is contained in:
parent
4a73340150
commit
f12837fe32
7 changed files with 270 additions and 0 deletions
|
|
@ -37,6 +37,8 @@ enum Cmd {
|
||||||
Spawn { name: String },
|
Spawn { name: String },
|
||||||
/// Kill a sub-agent.
|
/// Kill a sub-agent.
|
||||||
Kill { name: String },
|
Kill { name: String },
|
||||||
|
/// Submit a config commit on the agent's config repo for user approval.
|
||||||
|
RequestApplyCommit { agent: String, commit_ref: String },
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
|
|
@ -55,6 +57,13 @@ async fn main() -> Result<()> {
|
||||||
Cmd::Recv => one_shot(&cli.socket, ManagerRequest::Recv).await,
|
Cmd::Recv => one_shot(&cli.socket, ManagerRequest::Recv).await,
|
||||||
Cmd::Spawn { name } => one_shot(&cli.socket, ManagerRequest::Spawn { name }).await,
|
Cmd::Spawn { name } => one_shot(&cli.socket, ManagerRequest::Spawn { name }).await,
|
||||||
Cmd::Kill { name } => one_shot(&cli.socket, ManagerRequest::Kill { name }).await,
|
Cmd::Kill { name } => one_shot(&cli.socket, ManagerRequest::Kill { name }).await,
|
||||||
|
Cmd::RequestApplyCommit { agent, commit_ref } => {
|
||||||
|
one_shot(
|
||||||
|
&cli.socket,
|
||||||
|
ManagerRequest::RequestApplyCommit { agent, commit_ref },
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
167
hive-c0re/src/approvals.rs
Normal file
167
hive-c0re/src/approvals.rs
Normal file
|
|
@ -0,0 +1,167 @@
|
||||||
|
//! Approval queue. Manager submits via `RequestApplyCommit`; the user
|
||||||
|
//! approves/denies via the host admin CLI; on approval the host runs the
|
||||||
|
//! corresponding action (Phase 5a: `lifecycle::rebuild(agent)`).
|
||||||
|
|
||||||
|
use std::path::Path;
|
||||||
|
use std::sync::Mutex;
|
||||||
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
|
use anyhow::{Context, Result, bail};
|
||||||
|
use hive_sh4re::{Approval, ApprovalStatus};
|
||||||
|
use rusqlite::{Connection, OptionalExtension, params};
|
||||||
|
|
||||||
|
const SCHEMA: &str = r#"
|
||||||
|
CREATE TABLE IF NOT EXISTS approvals (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
agent TEXT NOT NULL,
|
||||||
|
commit_ref TEXT NOT NULL,
|
||||||
|
requested_at INTEGER NOT NULL,
|
||||||
|
status TEXT NOT NULL,
|
||||||
|
resolved_at INTEGER,
|
||||||
|
note TEXT
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_approvals_pending
|
||||||
|
ON approvals (id) WHERE status = 'pending';
|
||||||
|
"#;
|
||||||
|
|
||||||
|
pub struct Approvals {
|
||||||
|
conn: Mutex<Connection>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Approvals {
|
||||||
|
pub fn open(path: &Path) -> Result<Self> {
|
||||||
|
if let Some(parent) = path.parent() {
|
||||||
|
std::fs::create_dir_all(parent)
|
||||||
|
.with_context(|| format!("create approvals db parent {}", parent.display()))?;
|
||||||
|
}
|
||||||
|
let conn = Connection::open(path)
|
||||||
|
.with_context(|| format!("open approvals db {}", path.display()))?;
|
||||||
|
conn.execute_batch(SCHEMA).context("apply approvals schema")?;
|
||||||
|
Ok(Self {
|
||||||
|
conn: Mutex::new(conn),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn submit(&self, agent: &str, commit_ref: &str) -> Result<i64> {
|
||||||
|
let conn = self.conn.lock().unwrap();
|
||||||
|
conn.execute(
|
||||||
|
"INSERT INTO approvals (agent, commit_ref, requested_at, status)
|
||||||
|
VALUES (?1, ?2, ?3, 'pending')",
|
||||||
|
params![agent, commit_ref, now_unix()],
|
||||||
|
)?;
|
||||||
|
Ok(conn.last_insert_rowid())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn pending(&self) -> Result<Vec<Approval>> {
|
||||||
|
let conn = self.conn.lock().unwrap();
|
||||||
|
let mut stmt = conn.prepare(
|
||||||
|
"SELECT id, agent, commit_ref, requested_at, status, resolved_at, note
|
||||||
|
FROM approvals
|
||||||
|
WHERE status = 'pending'
|
||||||
|
ORDER BY id ASC",
|
||||||
|
)?;
|
||||||
|
let rows = stmt.query_map([], row_to_approval)?;
|
||||||
|
rows.collect::<rusqlite::Result<Vec<_>>>()
|
||||||
|
.map_err(Into::into)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get(&self, id: i64) -> Result<Option<Approval>> {
|
||||||
|
let conn = self.conn.lock().unwrap();
|
||||||
|
conn.query_row(
|
||||||
|
"SELECT id, agent, commit_ref, requested_at, status, resolved_at, note
|
||||||
|
FROM approvals WHERE id = ?1",
|
||||||
|
params![id],
|
||||||
|
row_to_approval,
|
||||||
|
)
|
||||||
|
.optional()
|
||||||
|
.map_err(Into::into)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark pending -> approved (or fail if not pending). Returns the (now-updated)
|
||||||
|
/// approval so the caller can run the action and pass the agent name.
|
||||||
|
pub fn mark_approved(&self, id: i64) -> Result<Approval> {
|
||||||
|
let conn = self.conn.lock().unwrap();
|
||||||
|
let current: Option<(String, String, i64, String)> = conn
|
||||||
|
.query_row(
|
||||||
|
"SELECT agent, commit_ref, requested_at, status FROM approvals WHERE id = ?1",
|
||||||
|
params![id],
|
||||||
|
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
|
||||||
|
)
|
||||||
|
.optional()?;
|
||||||
|
let Some((agent, commit_ref, requested_at, status)) = current else {
|
||||||
|
bail!("approval {id} not found");
|
||||||
|
};
|
||||||
|
if status != "pending" {
|
||||||
|
bail!("approval {id} is {status}, not pending");
|
||||||
|
}
|
||||||
|
let resolved_at = now_unix();
|
||||||
|
conn.execute(
|
||||||
|
"UPDATE approvals SET status = 'approved', resolved_at = ?1 WHERE id = ?2",
|
||||||
|
params![resolved_at, id],
|
||||||
|
)?;
|
||||||
|
Ok(Approval {
|
||||||
|
id,
|
||||||
|
agent,
|
||||||
|
commit_ref,
|
||||||
|
requested_at,
|
||||||
|
status: ApprovalStatus::Approved,
|
||||||
|
resolved_at: Some(resolved_at),
|
||||||
|
note: None,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn mark_denied(&self, id: i64) -> Result<()> {
|
||||||
|
let conn = self.conn.lock().unwrap();
|
||||||
|
let affected = conn.execute(
|
||||||
|
"UPDATE approvals SET status = 'denied', resolved_at = ?1
|
||||||
|
WHERE id = ?2 AND status = 'pending'",
|
||||||
|
params![now_unix(), id],
|
||||||
|
)?;
|
||||||
|
if affected == 0 {
|
||||||
|
bail!("approval {id} not pending");
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn mark_failed(&self, id: i64, note: &str) -> Result<()> {
|
||||||
|
let conn = self.conn.lock().unwrap();
|
||||||
|
conn.execute(
|
||||||
|
"UPDATE approvals SET status = 'failed', resolved_at = ?1, note = ?2 WHERE id = ?3",
|
||||||
|
params![now_unix(), note, id],
|
||||||
|
)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn row_to_approval(row: &rusqlite::Row<'_>) -> rusqlite::Result<Approval> {
|
||||||
|
let status: String = row.get(4)?;
|
||||||
|
let status = match status.as_str() {
|
||||||
|
"pending" => ApprovalStatus::Pending,
|
||||||
|
"approved" => ApprovalStatus::Approved,
|
||||||
|
"denied" => ApprovalStatus::Denied,
|
||||||
|
"failed" => ApprovalStatus::Failed,
|
||||||
|
other => {
|
||||||
|
return Err(rusqlite::Error::FromSqlConversionFailure(
|
||||||
|
4,
|
||||||
|
rusqlite::types::Type::Text,
|
||||||
|
format!("unknown approval status '{other}'").into(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Ok(Approval {
|
||||||
|
id: row.get(0)?,
|
||||||
|
agent: row.get(1)?,
|
||||||
|
commit_ref: row.get(2)?,
|
||||||
|
requested_at: row.get(3)?,
|
||||||
|
status,
|
||||||
|
resolved_at: row.get(5)?,
|
||||||
|
note: row.get(6)?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn now_unix() -> i64 {
|
||||||
|
SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.map(|d| d.as_secs() as i64)
|
||||||
|
.unwrap_or(0)
|
||||||
|
}
|
||||||
|
|
@ -9,6 +9,7 @@ use std::sync::{Arc, Mutex};
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
|
|
||||||
use crate::agent_server::{self, AgentSocket};
|
use crate::agent_server::{self, AgentSocket};
|
||||||
|
use crate::approvals::Approvals;
|
||||||
use crate::broker::Broker;
|
use crate::broker::Broker;
|
||||||
|
|
||||||
const AGENT_RUNTIME_ROOT: &str = "/run/hyperhive/agents";
|
const AGENT_RUNTIME_ROOT: &str = "/run/hyperhive/agents";
|
||||||
|
|
@ -16,6 +17,7 @@ const MANAGER_RUNTIME_ROOT: &str = "/run/hyperhive/manager";
|
||||||
|
|
||||||
pub struct Coordinator {
|
pub struct Coordinator {
|
||||||
pub broker: Arc<Broker>,
|
pub broker: Arc<Broker>,
|
||||||
|
pub approvals: Arc<Approvals>,
|
||||||
pub agent_flake: String,
|
pub agent_flake: String,
|
||||||
agents: Mutex<HashMap<String, AgentSocket>>,
|
agents: Mutex<HashMap<String, AgentSocket>>,
|
||||||
}
|
}
|
||||||
|
|
@ -23,8 +25,10 @@ pub struct Coordinator {
|
||||||
impl Coordinator {
|
impl Coordinator {
|
||||||
pub fn open(db_path: &Path, agent_flake: String) -> Result<Self> {
|
pub fn open(db_path: &Path, agent_flake: String) -> Result<Self> {
|
||||||
let broker = Broker::open(db_path).context("open broker")?;
|
let broker = Broker::open(db_path).context("open broker")?;
|
||||||
|
let approvals = Approvals::open(db_path).context("open approvals")?;
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
broker: Arc::new(broker),
|
broker: Arc::new(broker),
|
||||||
|
approvals: Arc::new(approvals),
|
||||||
agent_flake,
|
agent_flake,
|
||||||
agents: Mutex::new(HashMap::new()),
|
agents: Mutex::new(HashMap::new()),
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ use clap::{Parser, Subcommand};
|
||||||
use hive_sh4re::{HostRequest, HostResponse};
|
use hive_sh4re::{HostRequest, HostResponse};
|
||||||
|
|
||||||
mod agent_server;
|
mod agent_server;
|
||||||
|
mod approvals;
|
||||||
mod broker;
|
mod broker;
|
||||||
mod client;
|
mod client;
|
||||||
mod coordinator;
|
mod coordinator;
|
||||||
|
|
@ -45,6 +46,12 @@ enum Cmd {
|
||||||
Rebuild { name: String },
|
Rebuild { name: String },
|
||||||
/// List managed containers.
|
/// List managed containers.
|
||||||
List,
|
List,
|
||||||
|
/// List pending approval requests submitted by the manager.
|
||||||
|
Pending,
|
||||||
|
/// Approve a pending request by id; the action runs immediately.
|
||||||
|
Approve { id: i64 },
|
||||||
|
/// Deny a pending request by id.
|
||||||
|
Deny { id: i64 },
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
|
|
@ -73,6 +80,9 @@ async fn main() -> Result<()> {
|
||||||
render(client::request(&cli.socket, HostRequest::Rebuild { name }).await?)
|
render(client::request(&cli.socket, HostRequest::Rebuild { name }).await?)
|
||||||
}
|
}
|
||||||
Cmd::List => render(client::request(&cli.socket, HostRequest::List).await?),
|
Cmd::List => render(client::request(&cli.socket, HostRequest::List).await?),
|
||||||
|
Cmd::Pending => render(client::request(&cli.socket, HostRequest::Pending).await?),
|
||||||
|
Cmd::Approve { id } => render(client::request(&cli.socket, HostRequest::Approve { id }).await?),
|
||||||
|
Cmd::Deny { id } => render(client::request(&cli.socket, HostRequest::Deny { id }).await?),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -124,5 +124,17 @@ async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ManagerRequest::RequestApplyCommit { agent, commit_ref } => {
|
||||||
|
tracing::info!(%agent, %commit_ref, "manager: request_apply_commit");
|
||||||
|
match coord.approvals.submit(agent, commit_ref) {
|
||||||
|
Ok(id) => {
|
||||||
|
tracing::info!(%id, %agent, %commit_ref, "approval queued");
|
||||||
|
ManagerResponse::Ok
|
||||||
|
}
|
||||||
|
Err(e) => ManagerResponse::Err {
|
||||||
|
message: format!("{e:#}"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -81,6 +81,25 @@ async fn dispatch(req: &HostRequest, coord: &Coordinator) -> HostResponse {
|
||||||
HostResponse::success()
|
HostResponse::success()
|
||||||
}
|
}
|
||||||
HostRequest::List => HostResponse::list(lifecycle::list().await?),
|
HostRequest::List => HostResponse::list(lifecycle::list().await?),
|
||||||
|
HostRequest::Pending => HostResponse::pending(coord.approvals.pending()?),
|
||||||
|
HostRequest::Approve { id } => {
|
||||||
|
let approval = coord.approvals.mark_approved(*id)?;
|
||||||
|
tracing::info!(%approval.id, %approval.agent, %approval.commit_ref, "approval applied: rebuilding agent");
|
||||||
|
let agent_dir = coord.register_agent(&approval.agent).await?;
|
||||||
|
if let Err(e) =
|
||||||
|
lifecycle::rebuild(&approval.agent, &coord.agent_flake, &agent_dir).await
|
||||||
|
{
|
||||||
|
let note = format!("{e:#}");
|
||||||
|
let _ = coord.approvals.mark_failed(approval.id, ¬e);
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
HostResponse::success()
|
||||||
|
}
|
||||||
|
HostRequest::Deny { id } => {
|
||||||
|
coord.approvals.mark_denied(*id)?;
|
||||||
|
tracing::info!(%id, "approval denied");
|
||||||
|
HostResponse::success()
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
.await;
|
.await;
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,12 @@ pub enum HostRequest {
|
||||||
Rebuild { name: String },
|
Rebuild { name: String },
|
||||||
/// List managed containers.
|
/// List managed containers.
|
||||||
List,
|
List,
|
||||||
|
/// List pending approval requests.
|
||||||
|
Pending,
|
||||||
|
/// Approve a pending request by id; the action runs immediately.
|
||||||
|
Approve { id: i64 },
|
||||||
|
/// Deny a pending request by id.
|
||||||
|
Deny { id: i64 },
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
|
@ -29,6 +35,30 @@ pub struct HostResponse {
|
||||||
pub error: Option<String>,
|
pub error: Option<String>,
|
||||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
pub agents: Option<Vec<String>>,
|
pub agents: Option<Vec<String>>,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub approvals: Option<Vec<Approval>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct Approval {
|
||||||
|
pub id: i64,
|
||||||
|
pub agent: String,
|
||||||
|
pub commit_ref: String,
|
||||||
|
pub requested_at: i64,
|
||||||
|
pub status: ApprovalStatus,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub resolved_at: Option<i64>,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub note: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum ApprovalStatus {
|
||||||
|
Pending,
|
||||||
|
Approved,
|
||||||
|
Denied,
|
||||||
|
Failed,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HostResponse {
|
impl HostResponse {
|
||||||
|
|
@ -37,6 +67,7 @@ impl HostResponse {
|
||||||
ok: true,
|
ok: true,
|
||||||
error: None,
|
error: None,
|
||||||
agents: None,
|
agents: None,
|
||||||
|
approvals: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -45,6 +76,7 @@ impl HostResponse {
|
||||||
ok: false,
|
ok: false,
|
||||||
error: Some(message.into()),
|
error: Some(message.into()),
|
||||||
agents: None,
|
agents: None,
|
||||||
|
approvals: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -53,6 +85,16 @@ impl HostResponse {
|
||||||
ok: true,
|
ok: true,
|
||||||
error: None,
|
error: None,
|
||||||
agents: Some(agents),
|
agents: Some(agents),
|
||||||
|
approvals: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn pending(approvals: Vec<Approval>) -> Self {
|
||||||
|
Self {
|
||||||
|
ok: true,
|
||||||
|
error: None,
|
||||||
|
agents: None,
|
||||||
|
approvals: Some(approvals),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -121,6 +163,13 @@ pub enum ManagerRequest {
|
||||||
Kill {
|
Kill {
|
||||||
name: String,
|
name: String,
|
||||||
},
|
},
|
||||||
|
/// Submit a config commit for the user to approve. `commit_ref` is opaque
|
||||||
|
/// to the host (typically a git sha pointing into the agent's config repo).
|
||||||
|
/// On approval the host applies the change via `nixos-container update`.
|
||||||
|
RequestApplyCommit {
|
||||||
|
agent: String,
|
||||||
|
commit_ref: String,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue