hyperhive/hive-c0re/src/approvals.rs

310 lines
11 KiB
Rust

//! Approval queue. Manager submits via `RequestApplyCommit`; the user
//! approves/denies via the host admin CLI; on approval the host runs the
//! corresponding action (Phase 5a: `lifecycle::rebuild(agent)`).
use std::path::Path;
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
use anyhow::{Context, Result, bail};
use hive_sh4re::{Approval, ApprovalKind, ApprovalStatus};
use rusqlite::{Connection, OptionalExtension, params};
const SCHEMA: &str = r"
CREATE TABLE IF NOT EXISTS approvals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent TEXT NOT NULL,
commit_ref TEXT NOT NULL,
requested_at INTEGER NOT NULL,
status TEXT NOT NULL,
resolved_at INTEGER,
note TEXT
);
CREATE INDEX IF NOT EXISTS idx_approvals_pending
ON approvals (id) WHERE status = 'pending';
";
/// Add the `description` column to pre-description databases. Manager-supplied
/// note shown on the dashboard approval card at submission time (distinct from
/// `note` which is set on denial/failure).
fn ensure_description_column(conn: &Connection) -> Result<()> {
let has: bool = conn
.prepare("SELECT 1 FROM pragma_table_info('approvals') WHERE name = 'description'")?
.exists([])?;
if !has {
conn.execute_batch("ALTER TABLE approvals ADD COLUMN description TEXT;")
.context("add approvals.description column")?;
}
Ok(())
}
/// Add the `kind` column to pre-Phase-8 databases. ALTER TABLE ADD COLUMN is
/// idempotent here only via a column-existence check (sqlite doesn't support
/// IF NOT EXISTS on ADD COLUMN). Defaults legacy rows to `apply_commit`,
/// which matches their actual semantics.
fn ensure_kind_column(conn: &Connection) -> Result<()> {
let has_kind: bool = conn
.prepare("SELECT 1 FROM pragma_table_info('approvals') WHERE name = 'kind'")?
.exists([])?;
if !has_kind {
conn.execute_batch(
"ALTER TABLE approvals ADD COLUMN kind TEXT NOT NULL DEFAULT 'apply_commit';",
)
.context("add approvals.kind column")?;
}
Ok(())
}
/// Same shape as `ensure_kind_column` but for `fetched_sha` — the
/// canonical sha hive-c0re vouched for at `request_apply_commit` time.
/// Distinct from `commit_ref` (manager-supplied, may not even resolve
/// in proposed by the time we approve).
fn ensure_fetched_sha_column(conn: &Connection) -> Result<()> {
let has: bool = conn
.prepare("SELECT 1 FROM pragma_table_info('approvals') WHERE name = 'fetched_sha'")?
.exists([])?;
if !has {
conn.execute_batch("ALTER TABLE approvals ADD COLUMN fetched_sha TEXT;")
.context("add approvals.fetched_sha column")?;
}
Ok(())
}
pub struct Approvals {
conn: Mutex<Connection>,
}
impl Approvals {
pub fn open(path: &Path) -> Result<Self> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("create approvals db parent {}", parent.display()))?;
}
let conn = Connection::open(path)
.with_context(|| format!("open approvals db {}", path.display()))?;
conn.execute_batch(SCHEMA)
.context("apply approvals schema")?;
ensure_kind_column(&conn).context("migrate approvals.kind")?;
ensure_fetched_sha_column(&conn).context("migrate approvals.fetched_sha")?;
ensure_description_column(&conn).context("migrate approvals.description")?;
Ok(Self {
conn: Mutex::new(conn),
})
}
pub fn submit_kind(
&self,
agent: &str,
kind: ApprovalKind,
commit_ref: &str,
description: Option<&str>,
) -> Result<i64> {
let conn = self.conn.lock().unwrap();
conn.execute(
"INSERT INTO approvals (agent, kind, commit_ref, requested_at, status, description)
VALUES (?1, ?2, ?3, ?4, 'pending', ?5)",
params![agent, kind_to_str(kind), commit_ref, now_unix(), description],
)?;
Ok(conn.last_insert_rowid())
}
/// Record the canonical sha hive-c0re fetched from the proposed repo
/// into applied at submission time. Idempotent on identical values.
pub fn set_fetched_sha(&self, id: i64, sha: &str) -> Result<()> {
let conn = self.conn.lock().unwrap();
conn.execute(
"UPDATE approvals SET fetched_sha = ?1 WHERE id = ?2",
params![sha, id],
)?;
Ok(())
}
/// Last `limit` resolved approvals (approved / denied / failed),
/// newest-first. Drives the history tab on the dashboard.
pub fn recent_resolved(&self, limit: u64) -> Result<Vec<Approval>> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT id, agent, kind, commit_ref, requested_at, status, resolved_at, note, fetched_sha, description
FROM approvals
WHERE status IN ('approved', 'denied', 'failed')
ORDER BY resolved_at DESC, id DESC
LIMIT ?1",
)?;
let rows = stmt.query_map([limit], row_to_approval)?;
rows.collect::<rusqlite::Result<Vec<_>>>()
.map_err(Into::into)
}
pub fn pending(&self) -> Result<Vec<Approval>> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT id, agent, kind, commit_ref, requested_at, status, resolved_at, note, fetched_sha, description
FROM approvals
WHERE status = 'pending'
ORDER BY id ASC",
)?;
let rows = stmt.query_map([], row_to_approval)?;
rows.collect::<rusqlite::Result<Vec<_>>>()
.map_err(Into::into)
}
pub fn get(&self, id: i64) -> Result<Option<Approval>> {
let conn = self.conn.lock().unwrap();
conn.query_row(
"SELECT id, agent, kind, commit_ref, requested_at, status, resolved_at, note, fetched_sha, description
FROM approvals WHERE id = ?1",
params![id],
row_to_approval,
)
.optional()
.map_err(Into::into)
}
/// Mark pending -> approved (or fail if not pending). Returns the (now-updated)
/// approval so the caller can run the action and pass the agent name.
pub fn mark_approved(&self, id: i64) -> Result<Approval> {
let conn = self.conn.lock().unwrap();
let current: Option<(String, String, String, i64, String, Option<String>, Option<String>)> =
conn.query_row(
"SELECT agent, kind, commit_ref, requested_at, status, fetched_sha, description
FROM approvals WHERE id = ?1",
params![id],
|row| {
Ok((
row.get(0)?,
row.get(1)?,
row.get(2)?,
row.get(3)?,
row.get(4)?,
row.get(5)?,
row.get(6)?,
))
},
)
.optional()?;
let Some((agent, kind, commit_ref, requested_at, status, fetched_sha, description)) =
current
else {
bail!("approval {id} not found");
};
if status != "pending" {
bail!("approval {id} is {status}, not pending");
}
let resolved_at = now_unix();
conn.execute(
"UPDATE approvals SET status = 'approved', resolved_at = ?1 WHERE id = ?2",
params![resolved_at, id],
)?;
Ok(Approval {
id,
agent,
kind: kind_from_str(&kind)?,
commit_ref,
requested_at,
status: ApprovalStatus::Approved,
resolved_at: Some(resolved_at),
note: None,
fetched_sha,
description,
})
}
pub fn mark_denied(&self, id: i64, note: Option<&str>) -> Result<()> {
let conn = self.conn.lock().unwrap();
let affected = conn.execute(
"UPDATE approvals SET status = 'denied', resolved_at = ?1, note = ?2
WHERE id = ?3 AND status = 'pending'",
params![now_unix(), note, id],
)?;
if affected == 0 {
bail!("approval {id} not pending");
}
Ok(())
}
pub fn mark_failed(&self, id: i64, note: &str) -> Result<()> {
let conn = self.conn.lock().unwrap();
conn.execute(
"UPDATE approvals SET status = 'failed', resolved_at = ?1, note = ?2 WHERE id = ?3",
params![now_unix(), note, id],
)?;
Ok(())
}
/// Mark every pending approval for `agent` as failed (returns rows affected).
/// Used by `destroy` to clear the queue of an agent that no longer exists.
pub fn fail_pending_for_agent(&self, agent: &str, note: &str) -> Result<usize> {
let conn = self.conn.lock().unwrap();
let n = conn.execute(
"UPDATE approvals SET status = 'failed', resolved_at = ?1, note = ?2
WHERE agent = ?3 AND status = 'pending'",
params![now_unix(), note, agent],
)?;
Ok(n)
}
}
fn row_to_approval(row: &rusqlite::Row<'_>) -> rusqlite::Result<Approval> {
// Column order: id, agent, kind, commit_ref, requested_at, status, resolved_at, note, fetched_sha, description.
let kind: String = row.get(2)?;
let kind = match kind.as_str() {
"apply_commit" => ApprovalKind::ApplyCommit,
"spawn" => ApprovalKind::Spawn,
other => {
return Err(rusqlite::Error::FromSqlConversionFailure(
2,
rusqlite::types::Type::Text,
format!("unknown approval kind '{other}'").into(),
));
}
};
let status: String = row.get(5)?;
let status = match status.as_str() {
"pending" => ApprovalStatus::Pending,
"approved" => ApprovalStatus::Approved,
"denied" => ApprovalStatus::Denied,
"failed" => ApprovalStatus::Failed,
other => {
return Err(rusqlite::Error::FromSqlConversionFailure(
5,
rusqlite::types::Type::Text,
format!("unknown approval status '{other}'").into(),
));
}
};
Ok(Approval {
id: row.get(0)?,
agent: row.get(1)?,
kind,
commit_ref: row.get(3)?,
requested_at: row.get(4)?,
status,
resolved_at: row.get(6)?,
note: row.get(7)?,
fetched_sha: row.get(8)?,
description: row.get(9)?,
})
}
fn kind_to_str(kind: ApprovalKind) -> &'static str {
match kind {
ApprovalKind::ApplyCommit => "apply_commit",
ApprovalKind::Spawn => "spawn",
}
}
fn kind_from_str(s: &str) -> Result<ApprovalKind> {
Ok(match s {
"apply_commit" => ApprovalKind::ApplyCommit,
"spawn" => ApprovalKind::Spawn,
other => bail!("unknown approval kind '{other}'"),
})
}
fn now_unix() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.ok()
.and_then(|d| i64::try_from(d.as_secs()).ok())
.unwrap_or(0)
}