//! Operator question queue. Manager submits via `AskOperator`; the //! operator answers via the dashboard. The manager-socket handler long-polls //! the store until the answer lands, so claude's `ask_operator` tool call //! returns the answer directly as its result. use std::path::Path; use std::sync::Mutex; use std::time::{SystemTime, UNIX_EPOCH}; use anyhow::{Context, Result, bail}; use rusqlite::{Connection, OptionalExtension, params}; use serde::Serialize; const SCHEMA: &str = r" CREATE TABLE IF NOT EXISTS operator_questions ( id INTEGER PRIMARY KEY AUTOINCREMENT, asker TEXT NOT NULL, question TEXT NOT NULL, options_json TEXT NOT NULL, asked_at INTEGER NOT NULL, answered_at INTEGER, answer TEXT ); CREATE INDEX IF NOT EXISTS idx_operator_questions_pending ON operator_questions (id) WHERE answered_at IS NULL; "; /// Add late-added columns to pre-existing databases. `ALTER TABLE /// ADD COLUMN` has no `IF NOT EXISTS` form in sqlite, so we check /// `pragma_table_info` first per column. fn ensure_columns(conn: &Connection) -> Result<()> { for (name, sql) in [ ( "multi", "ALTER TABLE operator_questions ADD COLUMN multi INTEGER NOT NULL DEFAULT 0;", ), ( "deadline_at", "ALTER TABLE operator_questions ADD COLUMN deadline_at INTEGER;", ), ] { let has: bool = conn .prepare(&format!( "SELECT 1 FROM pragma_table_info('operator_questions') WHERE name = '{name}'" ))? .exists([])?; if !has { conn.execute_batch(sql) .with_context(|| format!("add operator_questions.{name} column"))?; } } Ok(()) } #[derive(Debug, Clone, Serialize)] #[allow(clippy::doc_markdown)] pub struct OpQuestion { pub id: i64, pub asker: String, pub question: String, pub options: Vec, pub multi: bool, pub asked_at: i64, /// Absolute unix-seconds deadline after which a watchdog auto- /// resolves the question with answer `[expired]`. `None` = no /// expiry. Surfaced on the dashboard as a remaining-time chip. pub deadline_at: Option, pub answered_at: Option, pub answer: Option, } pub struct OperatorQuestions { conn: Mutex, } impl OperatorQuestions { pub fn open(path: &Path) -> Result { if let Some(parent) = path.parent() { std::fs::create_dir_all(parent).with_context(|| { format!("create operator_questions db parent {}", parent.display()) })?; } let conn = Connection::open(path) .with_context(|| format!("open operator_questions db {}", path.display()))?; conn.execute_batch(SCHEMA) .context("apply operator_questions schema")?; ensure_columns(&conn).context("migrate operator_questions columns")?; Ok(Self { conn: Mutex::new(conn), }) } pub fn submit( &self, asker: &str, question: &str, options: &[String], multi: bool, deadline_at: Option, ) -> Result { let conn = self.conn.lock().unwrap(); let options_json = serde_json::to_string(options).unwrap_or_else(|_| "[]".into()); conn.execute( "INSERT INTO operator_questions (asker, question, options_json, multi, deadline_at, asked_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", params![ asker, question, options_json, i64::from(multi), deadline_at, now_unix(), ], )?; Ok(conn.last_insert_rowid()) } /// Mark the question answered. Returns the original question text so the /// Mark a pending question answered. Returns `(question, asker)` /// so the caller can both echo the question back in a helper /// event AND route that event to whichever agent originally /// asked it. pub fn answer(&self, id: i64, answer: &str) -> Result<(String, String)> { let conn = self.conn.lock().unwrap(); let row: Option<(String, String, Option)> = conn .query_row( "SELECT question, asker, answered_at FROM operator_questions WHERE id = ?1", params![id], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)), ) .optional()?; let Some((question, asker, answered_at)) = row else { bail!("question {id} not found"); }; if answered_at.is_some() { bail!("question {id} already answered"); } conn.execute( "UPDATE operator_questions SET answer = ?1, answered_at = ?2 WHERE id = ?3", params![answer, now_unix(), id], )?; Ok((question, asker)) } #[allow(dead_code)] pub fn get(&self, id: i64) -> Result> { let conn = self.conn.lock().unwrap(); conn.query_row( "SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer, deadline_at FROM operator_questions WHERE id = ?1", params![id], row_to_question, ) .optional() .map_err(Into::into) } pub fn pending(&self) -> Result> { let conn = self.conn.lock().unwrap(); let mut stmt = conn.prepare( "SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer, deadline_at FROM operator_questions WHERE answered_at IS NULL ORDER BY id ASC", )?; let rows = stmt.query_map([], row_to_question)?; rows.collect::>>() .map_err(Into::into) } } fn row_to_question(row: &rusqlite::Row<'_>) -> rusqlite::Result { let options_json: String = row.get(3)?; let options: Vec = serde_json::from_str(&options_json).unwrap_or_default(); let multi: i64 = row.get(4)?; Ok(OpQuestion { id: row.get(0)?, asker: row.get(1)?, question: row.get(2)?, options, multi: multi != 0, asked_at: row.get(5)?, answered_at: row.get(6)?, answer: row.get(7)?, deadline_at: row.get(8)?, }) } fn now_unix() -> i64 { SystemTime::now() .duration_since(UNIX_EPOCH) .ok() .and_then(|d| i64::try_from(d.as_secs()).ok()) .unwrap_or(0) }