From 754db7830e099956f819422231a29d013291f2c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?m=C3=BCde?= Date: Fri, 15 May 2026 20:38:02 +0200 Subject: [PATCH] ask_operator: ttl_seconds auto-cancel + remaining-time chip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit manager can pass ttl_seconds to ask_operator. on submit, host stores deadline_at = now + ttl in operator_questions (new column, migrated via existing pragma_table_info pattern), spawns a tokio task that sleeps until the deadline then resolves the question with answer '[expired]' and fires the same OperatorAnswered helper event. already-resolved races no-op silently. dashboard renders a '⏳ MM:SS' chip on the question row when deadline_at is set. format collapses seconds → s, < 1h → m s, ≥ 1h → h m. heartbeat refresh (5s) keeps the chip current; the operator sees it tick down. manager prompt + mcp tool description updated. journald viewer per container queued in todo (separate task). --- TODO.md | 18 +++++---- hive-ag3nt/prompts/manager.md | 2 +- hive-ag3nt/src/mcp.rs | 11 +++++- hive-c0re/assets/app.js | 23 ++++++++---- hive-c0re/assets/dashboard.css | 6 +++ hive-c0re/src/manager_server.rs | 45 ++++++++++++++++++++-- hive-c0re/src/operator_questions.rs | 58 +++++++++++++++++++++-------- hive-sh4re/src/lib.rs | 6 +++ 8 files changed, 133 insertions(+), 36 deletions(-) diff --git a/TODO.md b/TODO.md index 1896fed..a84af5f 100644 --- a/TODO.md +++ b/TODO.md @@ -68,13 +68,6 @@ Pick anything from here when relevant. Cross-cutting design notes live in ## Manager → operator question channel -- **TTL on `ask_operator`.** Manual cancel via dashboard already - ships (✗ CANC3L button resolves the question with answer - `[cancelled]` and fires `OperatorAnswered` so the manager sees a - terminal state). Still missing: per-question `ttl_seconds` that - auto-cancels after a deadline. 
Spawn a tokio task per submitted - question that calls the same cancel path after the ttl expires - (cheap; rare). Surface remaining time on the dashboard. ## Spawn flow @@ -114,6 +107,17 @@ Pick anything from here when relevant. Cross-cutting design notes live in ## Lifecycle / reliability +- **journald viewer per container in the dashboard.** Surface the + equivalent of `journalctl -M h-coder -b` in the dashboard so the + operator can see container logs without ssh-ing in. Optional + filter by hive-specific systemd unit (`hive-ag3nt.service`, + `hive-m1nd.service`). Implementation: backend shells out to + `journalctl -M <container> -b --output=short-iso --no-pager` + (optionally `-u <unit>`), streams or paginates the result over a + new dashboard endpoint. Could be a `<details>
` per container row + or a dedicated page. Honest journalctl, not the in-container + events stream — those are different surfaces (events = claude turn + loop; journalctl = systemd-wide logs incl. boot, network, etc.). - **Container crash events.** Watch `container@*.service` via D-Bus, push `HelperEvent::ContainerCrash` to the manager's inbox so the manager can react (restart, escalate, etc.). diff --git a/hive-ag3nt/prompts/manager.md b/hive-ag3nt/prompts/manager.md index 15b82f3..ca96cae 100644 --- a/hive-ag3nt/prompts/manager.md +++ b/hive-ag3nt/prompts/manager.md @@ -9,7 +9,7 @@ Tools (hyperhive surface): - `mcp__hyperhive__start(name)` — start a stopped sub-agent. No approval required. - `mcp__hyperhive__restart(name)` — stop + start a sub-agent. No approval required. - `mcp__hyperhive__request_apply_commit(agent, commit_ref)` — submit a config change for any agent (`hm1nd` for self) for operator approval. -- `mcp__hyperhive__ask_operator(question, options?, multi?)` — surface a question on the dashboard. Returns immediately with a question id; the operator's answer arrives later as a system `operator_answered` event in your inbox. Options are advisory: the dashboard always lets the operator type a free-text answer in addition. Set `multi: true` to render options as checkboxes (operator can pick multiple); the answer comes back as `, `-separated. Do not poll inside the same turn — finish the current work and react when the event lands. +- `mcp__hyperhive__ask_operator(question, options?, multi?, ttl_seconds?)` — surface a question on the dashboard. Returns immediately with a question id; the operator's answer arrives later as a system `operator_answered` event in your inbox. Options are advisory: the dashboard always lets the operator type a free-text answer in addition. Set `multi: true` to render options as checkboxes (operator can pick multiple); the answer comes back as `, `-separated. 
Set `ttl_seconds` to auto-cancel after a deadline — useful when the decision becomes moot if the operator hasn't responded in time; on expiry the answer is `[expired]`. Do not poll inside the same turn — finish the current work and react when the event lands. Approval boundary: lifecycle ops on *existing* sub-agents (`kill`, `start`, `restart`) are at your discretion — no operator approval. *Creating* a new agent (`request_spawn`) and *changing* any agent's config (`request_apply_commit`) still go through the approval queue. The operator only signs off on changes; you run the day-to-day. diff --git a/hive-ag3nt/src/mcp.rs b/hive-ag3nt/src/mcp.rs index cf1d76b..d47a021 100644 --- a/hive-ag3nt/src/mcp.rs +++ b/hive-ag3nt/src/mcp.rs @@ -240,6 +240,12 @@ pub struct AskOperatorArgs { /// selections joined by ", ". Ignored when `options` is empty. #[serde(default)] pub multi: bool, + /// Optional auto-cancel after `ttl_seconds`. On expiry the question + /// resolves with answer `[expired]` and the manager receives the + /// usual `operator_answered` system event. `None` (default) = + /// wait indefinitely. + #[serde(default)] + pub ttl_seconds: Option, } #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] @@ -380,7 +386,9 @@ impl ManagerServer { request, policy call, scope clarification). `options` is advisory: pass a short \ fixed-choice list when applicable, otherwise leave empty for free text. Set \ `multi: true` to let the operator pick multiple options (checkboxes); the answer \ - comes back as a comma-separated string." + comes back as a comma-separated string. Set `ttl_seconds` to auto-cancel a \ + no-longer-relevant question instead of blocking forever — on expiry the answer \ + is `[expired]` and the same `operator_answered` event fires." 
)] async fn ask_operator(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); @@ -390,6 +398,7 @@ impl ManagerServer { question: args.question, options: args.options, multi: args.multi, + ttl_seconds: args.ttl_seconds, }) .await; match resp { diff --git a/hive-c0re/assets/app.js b/hive-c0re/assets/app.js index feb1926..61ee03e 100644 --- a/hive-c0re/assets/app.js +++ b/hive-c0re/assets/app.js @@ -237,14 +237,23 @@ const ul = el('ul', { class: 'questions' }); for (const q of s.questions) { const li = el('li', { class: 'question' }); - li.append( - el('div', { class: 'q-head' }, - el('span', { class: 'msg-ts' }, fmt(q.asked_at)), ' ', - el('span', { class: 'msg-from' }, q.asker), ' ', - el('span', { class: 'msg-sep' }, 'asks:'), - ), - el('div', { class: 'q-body' }, q.question), + const head = el('div', { class: 'q-head' }, + el('span', { class: 'msg-ts' }, fmt(q.asked_at)), ' ', + el('span', { class: 'msg-from' }, q.asker), ' ', + el('span', { class: 'msg-sep' }, 'asks:'), ); + if (q.deadline_at) { + const remaining = q.deadline_at - Math.floor(Date.now() / 1000); + let txt; + if (remaining <= 0) txt = 'expiring…'; + else if (remaining < 60) txt = '⏳ ' + remaining + 's'; + else if (remaining < 3600) txt = '⏳ ' + Math.floor(remaining / 60) + 'm ' + + (remaining % 60) + 's'; + else txt = '⏳ ' + Math.floor(remaining / 3600) + 'h ' + + Math.floor((remaining % 3600) / 60) + 'm'; + head.append(' ', el('span', { class: 'q-ttl' }, txt)); + } + li.append(head, el('div', { class: 'q-body' }, q.question)); const f = el('form', { method: 'POST', action: '/answer-question/' + q.id, class: 'qform', 'data-async': '', diff --git a/hive-c0re/assets/dashboard.css b/hive-c0re/assets/dashboard.css index 5fc4c4e..b48cac3 100644 --- a/hive-c0re/assets/dashboard.css +++ b/hive-c0re/assets/dashboard.css @@ -296,6 +296,12 @@ summary:hover { color: var(--purple); } } .questions li.question:last-child { border-bottom: 0; } .questions .q-head { font-size: 0.9em; } 
+.questions .q-ttl { + color: var(--amber); + margin-left: 0.4em; + font-size: 0.95em; + letter-spacing: 0.05em; +} .questions .q-body { color: var(--fg); margin: 0.3em 0; diff --git a/hive-c0re/src/manager_server.rs b/hive-c0re/src/manager_server.rs index efefd64..d946b27 100644 --- a/hive-c0re/src/manager_server.rs +++ b/hive-c0re/src/manager_server.rs @@ -72,7 +72,7 @@ async fn serve(stream: UnixStream, coord: Arc) -> Result<()> { const MANAGER_RECV_LONG_POLL: std::time::Duration = std::time::Duration::from_secs(30); #[allow(clippy::too_many_lines)] -async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse { +async fn dispatch(req: &ManagerRequest, coord: &Arc) -> ManagerResponse { match req { ManagerRequest::Send { to, body } => match coord.broker.send(&Message { from: MANAGER_AGENT.to_owned(), @@ -198,14 +198,26 @@ async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse question, options, multi, + ttl_seconds, } => { - tracing::info!(%question, ?options, multi, "manager: ask_operator"); + tracing::info!(%question, ?options, multi, ?ttl_seconds, "manager: ask_operator"); + let deadline_at = ttl_seconds.and_then(|s| { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .ok() + .and_then(|d| i64::try_from(d.as_secs()).ok()) + .unwrap_or(0); + i64::try_from(s).ok().map(|s| now + s) + }); match coord .questions - .submit(MANAGER_AGENT, question, options, *multi) + .submit(MANAGER_AGENT, question, options, *multi, deadline_at) { Ok(id) => { - tracing::info!(%id, "operator question queued"); + tracing::info!(%id, ?deadline_at, "operator question queued"); + if let Some(ttl) = *ttl_seconds { + spawn_question_watchdog(coord, id, ttl); + } ManagerResponse::QuestionQueued { id } } Err(e) => ManagerResponse::Err { @@ -227,3 +239,28 @@ async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse } } } + +/// On `AskOperator { ttl_seconds: Some(n) }`, sleep n seconds and then 
+/// try to resolve the question with `[expired]`. If the operator (or +/// any other path) already answered it, `answer()` returns Err and +/// we no-op silently. Otherwise fire the usual `OperatorAnswered` +/// helper event so the manager sees a terminal state. +const TTL_SENTINEL: &str = "[expired]"; + +fn spawn_question_watchdog(coord: &Arc, id: i64, ttl_secs: u64) { + let coord = coord.clone(); + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_secs(ttl_secs)).await; + // `answer` returns Err if already resolved — that's the + // normal path when the operator responded before the ttl + // fired, so no-op silently. + if let Ok(question) = coord.questions.answer(id, TTL_SENTINEL) { + tracing::info!(%id, "operator question expired (ttl)"); + coord.notify_manager(&hive_sh4re::HelperEvent::OperatorAnswered { + id, + question, + answer: TTL_SENTINEL.to_owned(), + }); + } + }); +} diff --git a/hive-c0re/src/operator_questions.rs b/hive-c0re/src/operator_questions.rs index 7ece4fc..4ca652e 100644 --- a/hive-c0re/src/operator_questions.rs +++ b/hive-c0re/src/operator_questions.rs @@ -25,17 +25,29 @@ CREATE INDEX IF NOT EXISTS idx_operator_questions_pending ON operator_questions (id) WHERE answered_at IS NULL; "; -/// Add the `multi` column to pre-existing databases. `ALTER TABLE ADD COLUMN` -/// has no `IF NOT EXISTS` form in sqlite, so we check `pragma_table_info` first. -fn ensure_multi_column(conn: &Connection) -> Result<()> { - let has: bool = conn - .prepare("SELECT 1 FROM pragma_table_info('operator_questions') WHERE name = 'multi'")? - .exists([])?; - if !has { - conn.execute_batch( +/// Add late-added columns to pre-existing databases. `ALTER TABLE +/// ADD COLUMN` has no `IF NOT EXISTS` form in sqlite, so we check +/// `pragma_table_info` first per column. 
+fn ensure_columns(conn: &Connection) -> Result<()> { + for (name, sql) in [ + ( + "multi", "ALTER TABLE operator_questions ADD COLUMN multi INTEGER NOT NULL DEFAULT 0;", - ) - .context("add operator_questions.multi column")?; + ), + ( + "deadline_at", + "ALTER TABLE operator_questions ADD COLUMN deadline_at INTEGER;", + ), + ] { + let has: bool = conn + .prepare(&format!( + "SELECT 1 FROM pragma_table_info('operator_questions') WHERE name = '{name}'" + ))? + .exists([])?; + if !has { + conn.execute_batch(sql) + .with_context(|| format!("add operator_questions.{name} column"))?; + } } Ok(()) } @@ -49,6 +61,10 @@ pub struct OpQuestion { pub options: Vec, pub multi: bool, pub asked_at: i64, + /// Absolute unix-seconds deadline after which a watchdog auto- + /// resolves the question with answer `[expired]`. `None` = no + /// expiry. Surfaced on the dashboard as a remaining-time chip. + pub deadline_at: Option, pub answered_at: Option, pub answer: Option, } @@ -68,7 +84,7 @@ impl OperatorQuestions { .with_context(|| format!("open operator_questions db {}", path.display()))?; conn.execute_batch(SCHEMA) .context("apply operator_questions schema")?; - ensure_multi_column(&conn).context("migrate operator_questions.multi")?; + ensure_columns(&conn).context("migrate operator_questions columns")?; Ok(Self { conn: Mutex::new(conn), }) @@ -80,13 +96,22 @@ impl OperatorQuestions { question: &str, options: &[String], multi: bool, + deadline_at: Option, ) -> Result { let conn = self.conn.lock().unwrap(); let options_json = serde_json::to_string(options).unwrap_or_else(|_| "[]".into()); conn.execute( - "INSERT INTO operator_questions (asker, question, options_json, multi, asked_at) - VALUES (?1, ?2, ?3, ?4, ?5)", - params![asker, question, options_json, i64::from(multi), now_unix()], + "INSERT INTO operator_questions + (asker, question, options_json, multi, deadline_at, asked_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + params![ + asker, + question, + options_json, + 
i64::from(multi), + deadline_at, + now_unix(), + ], )?; Ok(conn.last_insert_rowid()) } @@ -119,7 +144,7 @@ impl OperatorQuestions { pub fn get(&self, id: i64) -> Result> { let conn = self.conn.lock().unwrap(); conn.query_row( - "SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer + "SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer, deadline_at FROM operator_questions WHERE id = ?1", params![id], row_to_question, @@ -131,7 +156,7 @@ impl OperatorQuestions { pub fn pending(&self) -> Result> { let conn = self.conn.lock().unwrap(); let mut stmt = conn.prepare( - "SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer + "SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer, deadline_at FROM operator_questions WHERE answered_at IS NULL ORDER BY id ASC", @@ -155,6 +180,7 @@ fn row_to_question(row: &rusqlite::Row<'_>) -> rusqlite::Result { asked_at: row.get(5)?, answered_at: row.get(6)?, answer: row.get(7)?, + deadline_at: row.get(8)?, }) } diff --git a/hive-sh4re/src/lib.rs b/hive-sh4re/src/lib.rs index 56eead0..52f8000 100644 --- a/hive-sh4re/src/lib.rs +++ b/hive-sh4re/src/lib.rs @@ -323,12 +323,18 @@ pub enum ManagerRequest { /// - `multi=true` lets the operator pick multiple options (rendered /// as checkboxes). The answer is returned as a single string with /// selections joined by ", ". + /// - `ttl_seconds`: optional auto-cancel after that many seconds. On + /// expiry the question is resolved with answer `[expired]` and the + /// manager gets the usual `OperatorAnswered` event. None = wait + /// forever for an operator answer (or manual cancel). AskOperator { question: String, #[serde(default)] options: Vec, #[serde(default)] multi: bool, + #[serde(default)] + ttl_seconds: Option, }, }