hyperhive/hive-c0re/src/operator_questions.rs
müde 2a6d084718 ask_operator: any agent can call it, answer routes by asker
new AgentRequest::AskOperator + AgentResponse::QuestionQueued on
the per-agent socket — same shape as the manager flavor, agent
gets the same wire surface (still uses the same operator_questions
table). agent_server::dispatch wires AskOperator through coord
.questions.submit(agent, ...) so the row's asker is the sub-agent
name; the ttl watchdog already in manager_server gets shared and
spawn_question_watchdog goes pub.

answer routing: operator_questions::answer now returns (question,
asker). post_answer_question + post_cancel_question + the watchdog
fire OperatorAnswered through new coord.notify_agent(asker, event)
instead of always notify_manager — the event lands in whichever
agent originally asked. notify_manager is now a thin wrapper.

agent socket plumbing: agent_server::start takes Arc<Coordinator>
instead of Arc<Broker> so dispatch has access to questions +
notify path; coordinator::{register_agent,ensure_runtime} take
self: &Arc<Self>. mcp::AgentServer grows the ask_operator tool;
allowed_mcp_tools(Agent) adds it; prompts/agent.md replaces the
'message the manager to ask the operator' guidance with the
direct tool description.
2026-05-16 01:48:10 +02:00

196 lines
6.5 KiB
Rust

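//! Operator question queue. Any agent (the manager or a sub-agent) submits
//! a question via `AskOperator`; the operator answers via the dashboard.
//! Each row records its asker, and `answer` hands it back, so the caller can
//! route the answer to whichever agent originally asked.
//! NOTE(review): this comment previously said the manager-socket handler
//! long-polls the store until the answer lands, so claude's `ask_operator`
//! tool call returns the answer directly as its result — confirm whether
//! that still holds.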
use std::path::Path;
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
use anyhow::{Context, Result, bail};
use rusqlite::{Connection, OptionalExtension, params};
use serde::Serialize;
/// Base DDL for the question store, applied by `OperatorQuestions::open`.
///
/// Both statements use `IF NOT EXISTS`, so re-applying them to an existing
/// database is a no-op. The index is partial: it only covers rows whose
/// `answered_at` is still NULL, i.e. pending questions. The `multi` and
/// `deadline_at` columns are not declared here; `ensure_columns` adds them.
const SCHEMA: &str = r"
CREATE TABLE IF NOT EXISTS operator_questions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
asker TEXT NOT NULL,
question TEXT NOT NULL,
options_json TEXT NOT NULL,
asked_at INTEGER NOT NULL,
answered_at INTEGER,
answer TEXT
);
CREATE INDEX IF NOT EXISTS idx_operator_questions_pending
ON operator_questions (id) WHERE answered_at IS NULL;
";
/// Add late-added columns to pre-existing databases. `ALTER TABLE
/// ADD COLUMN` has no `IF NOT EXISTS` form in sqlite, so we check
/// `pragma_table_info` first per column.
fn ensure_columns(conn: &Connection) -> Result<()> {
for (name, sql) in [
(
"multi",
"ALTER TABLE operator_questions ADD COLUMN multi INTEGER NOT NULL DEFAULT 0;",
),
(
"deadline_at",
"ALTER TABLE operator_questions ADD COLUMN deadline_at INTEGER;",
),
] {
let has: bool = conn
.prepare(&format!(
"SELECT 1 FROM pragma_table_info('operator_questions') WHERE name = '{name}'"
))?
.exists([])?;
if !has {
conn.execute_batch(sql)
.with_context(|| format!("add operator_questions.{name} column"))?;
}
}
Ok(())
}
/// One row of the `operator_questions` table, decoded into Rust types.
/// Derives `Serialize` so it can be handed to a serializer as-is.
#[derive(Debug, Clone, Serialize)]
#[allow(clippy::doc_markdown)]
pub struct OpQuestion {
/// Row id (`INTEGER PRIMARY KEY AUTOINCREMENT` in the schema).
pub id: i64,
/// The `asker` value passed to `submit`; `answer` returns it so the reply
/// can be routed back to whoever asked.
pub asker: String,
/// The question text as submitted.
pub question: String,
/// Options decoded from the `options_json` column. Empty when that JSON
/// fails to parse.
pub options: Vec<String>,
/// Stored as an integer column; true when the stored value is non-zero.
/// Presumably means the operator may pick several options — TODO confirm.
pub multi: bool,
/// Unix seconds at which the row was inserted.
pub asked_at: i64,
/// Absolute unix-seconds deadline after which a watchdog auto-
/// resolves the question with answer `[expired]`. `None` = no
/// expiry. Surfaced on the dashboard as a remaining-time chip.
pub deadline_at: Option<i64>,
/// Unix seconds at which the answer was recorded; `None` while pending.
pub answered_at: Option<i64>,
/// The recorded answer text; `None` while pending.
pub answer: Option<String>,
}
/// Handle to the operator-question store.
///
/// Wraps a single `rusqlite` connection in a `Mutex`; every method takes
/// `&self` and locks the connection for the duration of its queries.
pub struct OperatorQuestions {
conn: Mutex<Connection>,
}
impl OperatorQuestions {
pub fn open(path: &Path) -> Result<Self> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).with_context(|| {
format!("create operator_questions db parent {}", parent.display())
})?;
}
let conn = Connection::open(path)
.with_context(|| format!("open operator_questions db {}", path.display()))?;
conn.execute_batch(SCHEMA)
.context("apply operator_questions schema")?;
ensure_columns(&conn).context("migrate operator_questions columns")?;
Ok(Self {
conn: Mutex::new(conn),
})
}
pub fn submit(
&self,
asker: &str,
question: &str,
options: &[String],
multi: bool,
deadline_at: Option<i64>,
) -> Result<i64> {
let conn = self.conn.lock().unwrap();
let options_json = serde_json::to_string(options).unwrap_or_else(|_| "[]".into());
conn.execute(
"INSERT INTO operator_questions
(asker, question, options_json, multi, deadline_at, asked_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
params![
asker,
question,
options_json,
i64::from(multi),
deadline_at,
now_unix(),
],
)?;
Ok(conn.last_insert_rowid())
}
/// Mark the question answered. Returns the original question text so the
/// Mark a pending question answered. Returns `(question, asker)`
/// so the caller can both echo the question back in a helper
/// event AND route that event to whichever agent originally
/// asked it.
pub fn answer(&self, id: i64, answer: &str) -> Result<(String, String)> {
let conn = self.conn.lock().unwrap();
let row: Option<(String, String, Option<i64>)> = conn
.query_row(
"SELECT question, asker, answered_at FROM operator_questions WHERE id = ?1",
params![id],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
)
.optional()?;
let Some((question, asker, answered_at)) = row else {
bail!("question {id} not found");
};
if answered_at.is_some() {
bail!("question {id} already answered");
}
conn.execute(
"UPDATE operator_questions SET answer = ?1, answered_at = ?2 WHERE id = ?3",
params![answer, now_unix(), id],
)?;
Ok((question, asker))
}
#[allow(dead_code)]
pub fn get(&self, id: i64) -> Result<Option<OpQuestion>> {
let conn = self.conn.lock().unwrap();
conn.query_row(
"SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer, deadline_at
FROM operator_questions WHERE id = ?1",
params![id],
row_to_question,
)
.optional()
.map_err(Into::into)
}
pub fn pending(&self) -> Result<Vec<OpQuestion>> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer, deadline_at
FROM operator_questions
WHERE answered_at IS NULL
ORDER BY id ASC",
)?;
let rows = stmt.query_map([], row_to_question)?;
rows.collect::<rusqlite::Result<Vec<_>>>()
.map_err(Into::into)
}
}
/// Decode one result row into an [`OpQuestion`].
///
/// Expects the column order used by the `SELECT` statements in this file:
/// `id, asker, question, options_json, multi, asked_at, answered_at,
/// answer, deadline_at`. Unparseable `options_json` decodes to an empty
/// list rather than an error; `multi` is true for any non-zero integer.
fn row_to_question(row: &rusqlite::Row<'_>) -> rusqlite::Result<OpQuestion> {
    // Read the two columns that need post-processing first.
    let raw_options = row.get::<_, String>(3)?;
    let multi_flag = row.get::<_, i64>(4)?;

    let decoded = OpQuestion {
        id: row.get(0)?,
        asker: row.get(1)?,
        question: row.get(2)?,
        options: serde_json::from_str(&raw_options).unwrap_or_default(),
        multi: multi_flag != 0,
        asked_at: row.get(5)?,
        answered_at: row.get(6)?,
        answer: row.get(7)?,
        deadline_at: row.get(8)?,
    };
    Ok(decoded)
}
/// Current wall-clock time as whole seconds since the unix epoch.
///
/// Falls back to `0` when the system clock reads earlier than the epoch or
/// the second count does not fit in an `i64`.
fn now_unix() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_secs()).unwrap_or(0),
        Err(_) => 0,
    }
}