318 lines
12 KiB
Rust
318 lines
12 KiB
Rust
//! Question queue. Agents submit via `Ask`; the answer comes from
|
|
//! either the operator (via the dashboard, for `target IS NULL`) or
|
|
//! a peer agent (via `Answer`, for agent-to-agent questions).
|
|
//!
|
|
//! Despite the file name (kept for git history sanity), this table
|
|
//! now stores *all* asynchronous questions in the hive — both the
|
|
//! operator-targeted ones and the peer-to-peer ones. `target IS
|
|
//! NULL` is the operator path (back-compat with rows written before
|
|
//! the column existed); `target = '<agent-name>'` is the
|
|
//! agent-to-agent path.
|
|
|
|
use std::path::Path;
|
|
use std::sync::Mutex;
|
|
use std::time::{SystemTime, UNIX_EPOCH};
|
|
|
|
use anyhow::{Context, Result, bail};
|
|
use rusqlite::{Connection, OptionalExtension, params};
|
|
use serde::Serialize;
|
|
|
|
const SCHEMA: &str = r"
|
|
CREATE TABLE IF NOT EXISTS operator_questions (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
asker TEXT NOT NULL,
|
|
question TEXT NOT NULL,
|
|
options_json TEXT NOT NULL,
|
|
asked_at INTEGER NOT NULL,
|
|
answered_at INTEGER,
|
|
answer TEXT
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_operator_questions_pending
|
|
ON operator_questions (id) WHERE answered_at IS NULL;
|
|
";
|
|
|
|
/// Add late-added columns to pre-existing databases. `ALTER TABLE
|
|
/// ADD COLUMN` has no `IF NOT EXISTS` form in sqlite, so we check
|
|
/// `pragma_table_info` first per column.
|
|
fn ensure_columns(conn: &Connection) -> Result<()> {
|
|
for (name, sql) in [
|
|
(
|
|
"multi",
|
|
"ALTER TABLE operator_questions ADD COLUMN multi INTEGER NOT NULL DEFAULT 0;",
|
|
),
|
|
(
|
|
"deadline_at",
|
|
"ALTER TABLE operator_questions ADD COLUMN deadline_at INTEGER;",
|
|
),
|
|
// `target` = recipient of the question. NULL = operator
|
|
// (back-compat default for rows written before agent-to-agent
|
|
// questions existed); a non-null agent name = peer-to-peer
|
|
// question. Dashboard's `pending()` filters on `target IS NULL`
|
|
// so peer questions never leak into the operator's queue.
|
|
(
|
|
"target",
|
|
"ALTER TABLE operator_questions ADD COLUMN target TEXT;",
|
|
),
|
|
] {
|
|
let has: bool = conn
|
|
.prepare(&format!(
|
|
"SELECT 1 FROM pragma_table_info('operator_questions') WHERE name = '{name}'"
|
|
))?
|
|
.exists([])?;
|
|
if !has {
|
|
conn.execute_batch(sql)
|
|
.with_context(|| format!("add operator_questions.{name} column"))?;
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize)]
|
|
#[allow(clippy::doc_markdown)]
|
|
pub struct OpQuestion {
|
|
pub id: i64,
|
|
pub asker: String,
|
|
pub question: String,
|
|
pub options: Vec<String>,
|
|
pub multi: bool,
|
|
pub asked_at: i64,
|
|
/// Absolute unix-seconds deadline after which a watchdog auto-
|
|
/// resolves the question with answer `[expired]`. `None` = no
|
|
/// expiry. Surfaced on the dashboard as a remaining-time chip.
|
|
pub deadline_at: Option<i64>,
|
|
pub answered_at: Option<i64>,
|
|
pub answer: Option<String>,
|
|
/// Recipient of the question. `None` = the operator (dashboard
|
|
/// path); `Some(<agent>)` = a peer agent asked via
|
|
/// `Ask { to: Some(<agent>), ... }`. Agent-to-agent questions
|
|
/// never appear in `pending()` so the operator's queue stays clean.
|
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
|
pub target: Option<String>,
|
|
}
|
|
|
|
pub struct OperatorQuestions {
|
|
conn: Mutex<Connection>,
|
|
}
|
|
|
|
impl OperatorQuestions {
|
|
pub fn open(path: &Path) -> Result<Self> {
|
|
if let Some(parent) = path.parent() {
|
|
std::fs::create_dir_all(parent).with_context(|| {
|
|
format!("create operator_questions db parent {}", parent.display())
|
|
})?;
|
|
}
|
|
let conn = Connection::open(path)
|
|
.with_context(|| format!("open operator_questions db {}", path.display()))?;
|
|
conn.execute_batch(SCHEMA)
|
|
.context("apply operator_questions schema")?;
|
|
ensure_columns(&conn).context("migrate operator_questions columns")?;
|
|
Ok(Self {
|
|
conn: Mutex::new(conn),
|
|
})
|
|
}
|
|
|
|
pub fn submit(
|
|
&self,
|
|
asker: &str,
|
|
question: &str,
|
|
options: &[String],
|
|
multi: bool,
|
|
deadline_at: Option<i64>,
|
|
target: Option<&str>,
|
|
) -> Result<i64> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let options_json = serde_json::to_string(options).unwrap_or_else(|_| "[]".into());
|
|
conn.execute(
|
|
"INSERT INTO operator_questions
|
|
(asker, question, options_json, multi, deadline_at, target, asked_at)
|
|
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
|
|
params![
|
|
asker,
|
|
question,
|
|
options_json,
|
|
i64::from(multi),
|
|
deadline_at,
|
|
target,
|
|
now_unix(),
|
|
],
|
|
)?;
|
|
Ok(conn.last_insert_rowid())
|
|
}
|
|
|
|
/// Mark a pending question answered. `answerer` is who's actually
|
|
/// answering: `"operator"` for the dashboard path, or an agent's
|
|
/// own name when responding via `Answer`. Authorisation:
|
|
///
|
|
/// - Operator-targeted questions (`target IS NULL`) can only be
|
|
/// answered by `"operator"`. (Agents must not be able to spoof
|
|
/// answers to operator questions — the dashboard is the
|
|
/// privileged path.)
|
|
/// - Agent-targeted questions can only be answered by the
|
|
/// declared target agent, OR by `"operator"` (operator override
|
|
/// for stuck threads — useful when an agent is offline/down
|
|
/// and someone has to close the loop).
|
|
///
|
|
/// Returns `(question, asker, target)` so the caller can fire the
|
|
/// `QuestionAnswered` event with the right answerer label and route
|
|
/// it back to the original asker.
|
|
pub fn answer(
|
|
&self,
|
|
id: i64,
|
|
answer: &str,
|
|
answerer: &str,
|
|
) -> Result<(String, String, Option<String>)> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let row: Option<(String, String, Option<String>, Option<i64>)> = conn
|
|
.query_row(
|
|
"SELECT question, asker, target, answered_at FROM operator_questions WHERE id = ?1",
|
|
params![id],
|
|
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
|
|
)
|
|
.optional()?;
|
|
let Some((question, asker, target, answered_at)) = row else {
|
|
bail!("question {id} not found");
|
|
};
|
|
if answered_at.is_some() {
|
|
bail!("question {id} already answered");
|
|
}
|
|
// Authorisation check: must match the target, or be the operator
|
|
// (operator-targeted questions are operator-only; the operator
|
|
// can additionally override agent-to-agent questions to close
|
|
// stuck threads).
|
|
let authorised = match target.as_deref() {
|
|
None => answerer == hive_sh4re::OPERATOR_RECIPIENT,
|
|
Some(t) => answerer == t || answerer == hive_sh4re::OPERATOR_RECIPIENT,
|
|
};
|
|
if !authorised {
|
|
bail!(
|
|
"question {id} not addressed to '{answerer}' (target = {:?})",
|
|
target.as_deref().unwrap_or(hive_sh4re::OPERATOR_RECIPIENT)
|
|
);
|
|
}
|
|
conn.execute(
|
|
"UPDATE operator_questions SET answer = ?1, answered_at = ?2 WHERE id = ?3",
|
|
params![answer, now_unix(), id],
|
|
)?;
|
|
Ok((question, asker, target))
|
|
}
|
|
|
|
/// Cancel a pending question on behalf of `canceller`. Returns
|
|
/// `(question, asker, target)` so the caller can fire the usual
|
|
/// `QuestionAnswered` event to the asker with a `[cancelled by
|
|
/// <canceller>]` sentinel.
|
|
///
|
|
/// Auth: the canceller must be one of:
|
|
/// - the original asker (an agent withdrawing their own ask),
|
|
/// - the operator (already covered by the existing `answer` path
|
|
/// but allowed here too for symmetry / dashboard cancel),
|
|
/// - the manager (privileged hive-wide cleanup).
|
|
///
|
|
/// Not the target — that's covered by `answer` (responding with
|
|
/// an actual reply, sentinel or otherwise).
|
|
pub fn cancel(
|
|
&self,
|
|
id: i64,
|
|
canceller: &str,
|
|
) -> Result<(String, String, Option<String>)> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let row: Option<(String, String, Option<String>, Option<i64>)> = conn
|
|
.query_row(
|
|
"SELECT question, asker, target, answered_at FROM operator_questions WHERE id = ?1",
|
|
params![id],
|
|
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
|
|
)
|
|
.optional()?;
|
|
let Some((question, asker, target, answered_at)) = row else {
|
|
bail!("question {id} not found");
|
|
};
|
|
if answered_at.is_some() {
|
|
bail!("question {id} already answered/cancelled");
|
|
}
|
|
let authorised = canceller == asker
|
|
|| canceller == hive_sh4re::OPERATOR_RECIPIENT
|
|
|| canceller == hive_sh4re::MANAGER_AGENT;
|
|
if !authorised {
|
|
bail!(
|
|
"question {id}: '{canceller}' not allowed to cancel (asker = '{asker}')"
|
|
);
|
|
}
|
|
let sentinel = format!("[cancelled by {canceller}]");
|
|
conn.execute(
|
|
"UPDATE operator_questions SET answer = ?1, answered_at = ?2 WHERE id = ?3",
|
|
params![sentinel, now_unix(), id],
|
|
)?;
|
|
Ok((question, asker, target))
|
|
}
|
|
|
|
#[allow(dead_code)]
|
|
pub fn get(&self, id: i64) -> Result<Option<OpQuestion>> {
|
|
let conn = self.conn.lock().unwrap();
|
|
conn.query_row(
|
|
"SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer, deadline_at, target
|
|
FROM operator_questions WHERE id = ?1",
|
|
params![id],
|
|
row_to_question,
|
|
)
|
|
.optional()
|
|
.map_err(Into::into)
|
|
}
|
|
|
|
/// Every pending question, operator-targeted or peer-to-peer.
|
|
/// Drives the dashboard's questions pane now that peer threads
|
|
/// are surfaced for visibility + operator override-answer.
|
|
pub fn pending_all(&self) -> Result<Vec<OpQuestion>> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let mut stmt = conn.prepare(
|
|
"SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer, deadline_at, target
|
|
FROM operator_questions
|
|
WHERE answered_at IS NULL
|
|
ORDER BY id ASC",
|
|
)?;
|
|
let rows = stmt.query_map([], row_to_question)?;
|
|
rows.collect::<rusqlite::Result<Vec<_>>>()
|
|
.map_err(Into::into)
|
|
}
|
|
|
|
/// Last `limit` answered questions across both target kinds,
|
|
/// newest-first. Companion to `pending_all`.
|
|
pub fn recent_answered_all(&self, limit: u64) -> Result<Vec<OpQuestion>> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let mut stmt = conn.prepare(
|
|
"SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer, deadline_at, target
|
|
FROM operator_questions
|
|
WHERE answered_at IS NOT NULL
|
|
ORDER BY answered_at DESC
|
|
LIMIT ?1",
|
|
)?;
|
|
let rows = stmt.query_map(params![limit as i64], row_to_question)?;
|
|
rows.collect::<rusqlite::Result<Vec<_>>>()
|
|
.map_err(Into::into)
|
|
}
|
|
|
|
}
|
|
|
|
fn row_to_question(row: &rusqlite::Row<'_>) -> rusqlite::Result<OpQuestion> {
|
|
let options_json: String = row.get(3)?;
|
|
let options: Vec<String> = serde_json::from_str(&options_json).unwrap_or_default();
|
|
let multi: i64 = row.get(4)?;
|
|
Ok(OpQuestion {
|
|
id: row.get(0)?,
|
|
asker: row.get(1)?,
|
|
question: row.get(2)?,
|
|
options,
|
|
multi: multi != 0,
|
|
asked_at: row.get(5)?,
|
|
answered_at: row.get(6)?,
|
|
answer: row.get(7)?,
|
|
deadline_at: row.get(8)?,
|
|
target: row.get(9)?,
|
|
})
|
|
}
|
|
|
|
fn now_unix() -> i64 {
|
|
SystemTime::now()
|
|
.duration_since(UNIX_EPOCH)
|
|
.ok()
|
|
.and_then(|d| i64::try_from(d.as_secs()).ok())
|
|
.unwrap_or(0)
|
|
}
|