ask_operator: ttl_seconds auto-cancel + remaining-time chip

manager can pass ttl_seconds to ask_operator. on submit, host
stores deadline_at = now + ttl in operator_questions (new column,
migrated via existing pragma_table_info pattern), spawns a tokio
task that sleeps until the deadline then resolves the question with
answer '[expired]' and fires the same OperatorAnswered helper event.
already-resolved races no-op silently.

dashboard renders a ' MM:SS' chip on the question row when
deadline_at is set. format collapses seconds → s, < 1h → m s, ≥ 1h
→ h m. heartbeat refresh (5s) keeps the chip current; the operator
sees it tick down.

manager prompt + mcp tool description updated. journald viewer per
container queued in todo (separate task).
This commit is contained in:
müde 2026-05-15 20:38:02 +02:00
parent 2146e47770
commit 754db7830e
8 changed files with 133 additions and 36 deletions

View file

@ -72,7 +72,7 @@ async fn serve(stream: UnixStream, coord: Arc<Coordinator>) -> Result<()> {
const MANAGER_RECV_LONG_POLL: std::time::Duration = std::time::Duration::from_secs(30);
#[allow(clippy::too_many_lines)]
async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse {
async fn dispatch(req: &ManagerRequest, coord: &Arc<Coordinator>) -> ManagerResponse {
match req {
ManagerRequest::Send { to, body } => match coord.broker.send(&Message {
from: MANAGER_AGENT.to_owned(),
@ -198,14 +198,26 @@ async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse
question,
options,
multi,
ttl_seconds,
} => {
tracing::info!(%question, ?options, multi, "manager: ask_operator");
tracing::info!(%question, ?options, multi, ?ttl_seconds, "manager: ask_operator");
let deadline_at = ttl_seconds.and_then(|s| {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.ok()
.and_then(|d| i64::try_from(d.as_secs()).ok())
.unwrap_or(0);
i64::try_from(s).ok().map(|s| now + s)
});
match coord
.questions
.submit(MANAGER_AGENT, question, options, *multi)
.submit(MANAGER_AGENT, question, options, *multi, deadline_at)
{
Ok(id) => {
tracing::info!(%id, "operator question queued");
tracing::info!(%id, ?deadline_at, "operator question queued");
if let Some(ttl) = *ttl_seconds {
spawn_question_watchdog(coord, id, ttl);
}
ManagerResponse::QuestionQueued { id }
}
Err(e) => ManagerResponse::Err {
@ -227,3 +239,28 @@ async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse
}
}
}
/// On `AskOperator { ttl_seconds: Some(n) }`, sleep n seconds and then
/// try to resolve the question with `[expired]`. If the operator (or
/// any other path) already answered it, `answer()` returns Err and
/// we no-op silently. Otherwise fire the usual `OperatorAnswered`
/// helper event so the manager sees a terminal state.
const TTL_SENTINEL: &str = "[expired]";
fn spawn_question_watchdog(coord: &Arc<Coordinator>, id: i64, ttl_secs: u64) {
let coord = coord.clone();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(ttl_secs)).await;
// `answer` returns Err if already resolved — that's the
// normal path when the operator responded before the ttl
// fired, so no-op silently.
if let Ok(question) = coord.questions.answer(id, TTL_SENTINEL) {
tracing::info!(%id, "operator question expired (ttl)");
coord.notify_manager(&hive_sh4re::HelperEvent::OperatorAnswered {
id,
question,
answer: TTL_SENTINEL.to_owned(),
});
}
});
}

View file

@ -25,17 +25,29 @@ CREATE INDEX IF NOT EXISTS idx_operator_questions_pending
ON operator_questions (id) WHERE answered_at IS NULL;
";
/// Add the `multi` column to pre-existing databases. `ALTER TABLE ADD COLUMN`
/// has no `IF NOT EXISTS` form in sqlite, so we check `pragma_table_info` first.
fn ensure_multi_column(conn: &Connection) -> Result<()> {
let has: bool = conn
.prepare("SELECT 1 FROM pragma_table_info('operator_questions') WHERE name = 'multi'")?
.exists([])?;
if !has {
conn.execute_batch(
/// Add late-added columns to pre-existing databases. `ALTER TABLE
/// ADD COLUMN` has no `IF NOT EXISTS` form in sqlite, so we check
/// `pragma_table_info` first per column.
fn ensure_columns(conn: &Connection) -> Result<()> {
for (name, sql) in [
(
"multi",
"ALTER TABLE operator_questions ADD COLUMN multi INTEGER NOT NULL DEFAULT 0;",
)
.context("add operator_questions.multi column")?;
),
(
"deadline_at",
"ALTER TABLE operator_questions ADD COLUMN deadline_at INTEGER;",
),
] {
let has: bool = conn
.prepare(&format!(
"SELECT 1 FROM pragma_table_info('operator_questions') WHERE name = '{name}'"
))?
.exists([])?;
if !has {
conn.execute_batch(sql)
.with_context(|| format!("add operator_questions.{name} column"))?;
}
}
Ok(())
}
@ -49,6 +61,10 @@ pub struct OpQuestion {
pub options: Vec<String>,
pub multi: bool,
pub asked_at: i64,
/// Absolute unix-seconds deadline after which a watchdog auto-
/// resolves the question with answer `[expired]`. `None` = no
/// expiry. Surfaced on the dashboard as a remaining-time chip.
pub deadline_at: Option<i64>,
pub answered_at: Option<i64>,
pub answer: Option<String>,
}
@ -68,7 +84,7 @@ impl OperatorQuestions {
.with_context(|| format!("open operator_questions db {}", path.display()))?;
conn.execute_batch(SCHEMA)
.context("apply operator_questions schema")?;
ensure_multi_column(&conn).context("migrate operator_questions.multi")?;
ensure_columns(&conn).context("migrate operator_questions columns")?;
Ok(Self {
conn: Mutex::new(conn),
})
@ -80,13 +96,22 @@ impl OperatorQuestions {
question: &str,
options: &[String],
multi: bool,
deadline_at: Option<i64>,
) -> Result<i64> {
let conn = self.conn.lock().unwrap();
let options_json = serde_json::to_string(options).unwrap_or_else(|_| "[]".into());
conn.execute(
"INSERT INTO operator_questions (asker, question, options_json, multi, asked_at)
VALUES (?1, ?2, ?3, ?4, ?5)",
params![asker, question, options_json, i64::from(multi), now_unix()],
"INSERT INTO operator_questions
(asker, question, options_json, multi, deadline_at, asked_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
params![
asker,
question,
options_json,
i64::from(multi),
deadline_at,
now_unix(),
],
)?;
Ok(conn.last_insert_rowid())
}
@ -119,7 +144,7 @@ impl OperatorQuestions {
pub fn get(&self, id: i64) -> Result<Option<OpQuestion>> {
let conn = self.conn.lock().unwrap();
conn.query_row(
"SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer
"SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer, deadline_at
FROM operator_questions WHERE id = ?1",
params![id],
row_to_question,
@ -131,7 +156,7 @@ impl OperatorQuestions {
pub fn pending(&self) -> Result<Vec<OpQuestion>> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer
"SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer, deadline_at
FROM operator_questions
WHERE answered_at IS NULL
ORDER BY id ASC",
@ -155,6 +180,7 @@ fn row_to_question(row: &rusqlite::Row<'_>) -> rusqlite::Result<OpQuestion> {
asked_at: row.get(5)?,
answered_at: row.get(6)?,
answer: row.get(7)?,
deadline_at: row.get(8)?,
})
}