diff --git a/CLAUDE.md b/CLAUDE.md index b28be55..bc82dcb 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -169,6 +169,10 @@ Manager additionally: - `mcp__hyperhive__kill(name)` — graceful stop. - `mcp__hyperhive__request_apply_commit(agent, commit_ref)` — submit a config change for any agent (including `hm1nd` for self-mods). +- `mcp__hyperhive__ask_operator(question, options?)` — non-blocking; + queues a question on the dashboard, returns the question id. Operator's + answer arrives later as a `HelperEvent::OperatorAnswered` in the + manager inbox. The shared per-turn plumbing lives in `hive_ag3nt::turn::{write_mcp_config, write_settings, write_system_prompt, run_turn, drive_turn, emit_turn_end, @@ -274,9 +278,13 @@ manager itself now. rubber-stamp sub-agent config requests. It verifies (role match, package legitimacy, cheaper alternative, blast radius) before committing + calling `request_apply_commit`. For ambiguous cases or anything that -needs human signal, the manager forwards the question to the operator -via `send(to: "operator", ...)` — a dedicated `mcp__hyperhive__ask_operator` -tool with proper pause/resume semantics is in [TODO.md](TODO.md). +needs human signal, the manager calls `ask_operator(question, options?)` +which queues the question on the dashboard and returns the id +immediately; the operator's answer arrives later as +`HelperEvent::OperatorAnswered` in the manager inbox. Store at +`hive-c0re::operator_questions` (sqlite); answer flow: +`POST /answer-question/{id}` → `OperatorQuestions::answer` → +`notify_manager(OperatorAnswered { ... })`. ## Helper events to the manager @@ -296,6 +304,9 @@ turn so the manager can react. Variants (ApplyCommit). - `Killed { agent }` — admin `HostRequest::Kill` + dashboard `/kill`. - `Destroyed { agent }` — `actions::destroy`. +- `OperatorAnswered { id, question, answer }` — `dashboard::post_answer_question` + fires this after the operator submits the answer form for a question + the manager queued via `ask_operator`. To add a new event: new `HelperEvent` variant + call sites + update `prompts/manager.md` so the manager knows the new shape. diff --git a/TODO.md b/TODO.md index d9d5647..6f5c5e6 100644 --- a/TODO.md +++ b/TODO.md @@ -33,43 +33,12 @@ Pick anything from here when relevant. Cross-cutting design notes live in ## Manager → operator question channel -- **`mcp__hyperhive__ask_operator(question, options?)` tool** on the manager - MCP surface. The manager turn pauses; the question gets surfaced as a - prominent prompt on the dashboard (its own section, or interleaved with - the operator inbox); the operator's typed answer comes back as the tool - result. Modelled after Claude Code's `AskUserQuestion` tool. - - Design open questions: - - - **Storage.** New sqlite table `operator_questions(id, asker, question, - options_json, asked_at, answered_at, answer)` — or piggyback on the - existing message broker with a new envelope kind. Probably a new - table because the lifecycle (pending → answered) is different from - fire-and-forget messages. - - - **Waiting semantics.** The MCP tool call needs to block until - answered. Two options: - 1. Long-poll from inside the tool handler (broker-style — broadcast - on insert, await via `tokio::sync::broadcast`). Simple but the - claude turn stays alive for the whole wait, eating context-window - budget. - 2. Tool returns a `question_id` immediately; manager re-enters its - inbox loop and a `HelperEvent::OperatorAnswered { id, answer }` - wakes it. Cheaper context-wise but two-step. - - - **Dashboard UX.** New "◆ M1ND H4S QU3STI0NS ◆" section at the top - when any question is pending. Inline `
` with a textarea (or - select if `options` were provided), POST `/api/answer-question`. - State refresh + the live SSE stream notify the manager harness. - - - **Sub-agent path.** Sub-agents don't get the tool — they message the - manager and the manager decides whether to relay the question to the - operator. The manager's system prompt already covers this. - - - **Timeout / cancel.** Questions that sit pending too long: do they - expire? Manager probably wants to know if the operator hasn't - answered after some interval so it can fall back. Maybe a per- - question `ttl_seconds`. +- **TTL / cancel on `ask_operator`.** Questions today block forever; the + manager turn stays alive until the operator answers. Add a per-question + `ttl_seconds` (or a dashboard "cancel" button that resolves the question + with a sentinel answer) so a long-idle question can time out and let the + manager fall back. Wire the timeout into `OperatorQuestions::wait_answered` + and surface remaining-time on the dashboard. ## Loop substance diff --git a/hive-ag3nt/assets/app.js b/hive-ag3nt/assets/app.js index cc31b13..3ae251c 100644 --- a/hive-ag3nt/assets/app.js +++ b/hive-ag3nt/assets/app.js @@ -48,6 +48,10 @@ } // Clear text inputs the operator typed into (the form value was sent). f.querySelectorAll('input[type="text"], input:not([type])').forEach((i) => { i.value = ''; }); + // Re-enable the button — refreshState() often skips re-rendering the + // form (status unchanged), so without this the spinner sticks and + // the operator can't submit again. + if (btn) { btn.disabled = false; btn.innerHTML = original; } refreshState(); } catch (err) { alert('action failed: ' + err); diff --git a/hive-ag3nt/prompts/manager.md b/hive-ag3nt/prompts/manager.md index 76e54d1..b75c8b3 100644 --- a/hive-ag3nt/prompts/manager.md +++ b/hive-ag3nt/prompts/manager.md @@ -7,6 +7,7 @@ Tools (hyperhive surface): - `mcp__hyperhive__request_spawn(name)` — queue a brand-new sub-agent for operator approval (≤9 char name). - `mcp__hyperhive__kill(name)` — graceful stop on a sub-agent. - `mcp__hyperhive__request_apply_commit(agent, commit_ref)` — submit a config change for any agent (`hm1nd` for self) for operator approval. +- `mcp__hyperhive__ask_operator(question, options?)` — surface a question on the dashboard. Returns immediately with a question id; the operator's answer arrives later as a system `operator_answered` event in your inbox. Do not poll inside the same turn — finish the current work and react when the event lands. Your own editable config lives at `/agents/hm1nd/config/agent.nix`; every sub-agent's lives at `/agents//config/agent.nix`. Use file/git tools to edit + commit, then `request_apply_commit`. @@ -19,9 +20,9 @@ Sub-agents are NOT trusted by default. When one asks for a config change (new pa You're the policy gate between sub-agents and the operator's approval queue — the operator clicks ◆ APPR0VE on your commits, so don't submit changes you wouldn't defend. -You can surface questions to the operator. (NOT YET IMPLEMENTED: a dedicated `mcp__hyperhive__ask_operator` tool will land soon — it pauses the turn, drops a prompt on the dashboard, and resumes with the answer.) For now, send to `operator` with a clear question and wait for the next turn to see their reply; the cadence is slower but the shape is the same. +Two ways to talk to the operator: `send(to: "operator", ...)` for fire-and-forget status / pointers (surfaces in the operator inbox), or `ask_operator(question, options?)` when you need a decision. `ask_operator` is non-blocking — it queues the question and returns an id immediately; the answer arrives on a future turn as an `operator_answered` system event. Prefer `ask_operator` over an open-ended `send` for anything you actually need to wait on. -Messages from sender `system` are hyperhive helper events (JSON body, `event` field discriminates): `approval_resolved`, `spawned`, `rebuilt`, `killed`, `destroyed`. Use these to react to lifecycle changes — e.g. greet a freshly-spawned agent, retry a failed rebuild, or note the change to the operator. +Messages from sender `system` are hyperhive helper events (JSON body, `event` field discriminates): `approval_resolved`, `spawned`, `rebuilt`, `killed`, `destroyed`, `operator_answered`. Use these to react to lifecycle changes — e.g. greet a freshly-spawned agent, retry a failed rebuild, or pick up the operator's answer to a question you previously asked. Durable knowledge: diff --git a/hive-ag3nt/src/bin/hive-ag3nt.rs b/hive-ag3nt/src/bin/hive-ag3nt.rs index ecb9bcc..16b521f 100644 --- a/hive-ag3nt/src/bin/hive-ag3nt.rs +++ b/hive-ag3nt/src/bin/hive-ag3nt.rs @@ -176,7 +176,9 @@ fn format_wake_prompt(from: &str, body: &str, unread: u64) -> String { let pending = if unread == 0 { String::new() } else { - format!("\n\n({unread} more message(s) pending in your inbox — drain via `mcp__hyperhive__recv` if relevant.)") + format!( + "\n\n({unread} more message(s) pending in your inbox — drain via `mcp__hyperhive__recv` if relevant.)" + ) }; format!("Incoming message from `{from}`:\n---\n{body}\n---{pending}") } diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index 295dd1f..524a3ba 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -170,7 +170,11 @@ async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> { turn::emit_turn_end(&bus, &outcome); } Ok(ManagerResponse::Empty) => {} - Ok(ManagerResponse::Ok | ManagerResponse::Status { .. }) => { + Ok( + ManagerResponse::Ok + | ManagerResponse::Status { .. } + | ManagerResponse::QuestionQueued { .. }, + ) => { tracing::warn!("recv produced unexpected response kind"); } Ok(ManagerResponse::Err { message }) => { @@ -184,7 +188,6 @@ async fn serve(socket: &Path, interval: Duration, bus: Bus) -> Result<()> { } } - /// Per-turn user prompt. The role/tools/etc. is in the system prompt /// (`prompts/manager.md` → `claude --system-prompt-file`); this is just /// the wake signal. `unread` is the inbox depth after this message was @@ -193,7 +196,9 @@ fn format_wake_prompt(from: &str, body: &str, unread: u64) -> String { let pending = if unread == 0 { String::new() } else { - format!("\n\n({unread} more message(s) pending in your inbox — drain via `mcp__hyperhive__recv` if relevant.)") + format!( + "\n\n({unread} more message(s) pending in your inbox — drain via `mcp__hyperhive__recv` if relevant.)" + ) }; format!("Incoming message from `{from}`:\n---\n{body}\n---{pending}") } diff --git a/hive-ag3nt/src/mcp.rs b/hive-ag3nt/src/mcp.rs index c2c7485..eec9bfa 100644 --- a/hive-ag3nt/src/mcp.rs +++ b/hive-ag3nt/src/mcp.rs @@ -37,6 +37,7 @@ pub enum SocketReply { Message { from: String, body: String }, Empty, Status(u64), + QuestionQueued(i64), } impl From for SocketReply { @@ -59,6 +60,7 @@ impl From for SocketReply { hive_sh4re::ManagerResponse::Message { from, body } => Self::Message { from, body }, hive_sh4re::ManagerResponse::Empty => Self::Empty, hive_sh4re::ManagerResponse::Status { unread } => Self::Status(unread), + hive_sh4re::ManagerResponse::QuestionQueued { id } => Self::QuestionQueued(id), } } } @@ -209,6 +211,16 @@ pub struct KillArgs { pub name: String, } +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct AskOperatorArgs { + /// The question to surface on the dashboard. + pub question: String, + /// Optional fixed-choice answers. If empty, the dashboard renders a + /// free-text input. Otherwise renders a select list of these options. + #[serde(default)] + pub options: Vec, +} + #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] pub struct RequestApplyCommitArgs { /// Agent whose config repo the commit lives in (use `"hm1nd"` for the @@ -310,6 +322,36 @@ impl ManagerServer { .await } + #[tool( + description = "Surface a question to the operator on the dashboard. Returns immediately \ + with a question id — do NOT wait inline. When the operator answers, a system message \ + with event `operator_answered { id, question, answer }` lands in your inbox; handle it \ + on a future turn. Use this when a decision needs human signal (ambiguous sub-agent \ + request, policy call, scope clarification). `options` is advisory: pass a short \ + fixed-choice list when applicable, otherwise leave empty for free text." + )] + async fn ask_operator(&self, Parameters(args): Parameters) -> String { + let log = format!("{args:?}"); + run_tool_envelope("ask_operator", log, async move { + let resp = self + .dispatch(hive_sh4re::ManagerRequest::AskOperator { + question: args.question, + options: args.options, + }) + .await; + match resp { + Ok(SocketReply::QuestionQueued(id)) => format!( + "question queued (id={id}); operator's answer will arrive as a system \ + `operator_answered` event in your inbox" + ), + Ok(SocketReply::Err(m)) => format!("ask_operator failed: {m}"), + Ok(other) => format!("ask_operator unexpected response: {other:?}"), + Err(e) => format!("ask_operator transport error: {e:#}"), + } + }) + .await + } + #[tool( description = "Submit a config change for operator approval. Pass the agent name \ (e.g. `alice` or `hm1nd` for the manager's own config) and a commit sha in that \ @@ -322,23 +364,19 @@ impl ManagerServer { let log = format!("{args:?}"); let agent = args.agent.clone(); let commit_ref = args.commit_ref.clone(); - run_tool_envelope( - "request_apply_commit", - log, - async move { - let resp = self - .dispatch(hive_sh4re::ManagerRequest::RequestApplyCommit { - agent: args.agent, - commit_ref: args.commit_ref, - }) - .await; - format_ack( - resp, - "request_apply_commit", - format!("apply approval queued for {agent} @ {commit_ref}"), - ) - }, - ) + run_tool_envelope("request_apply_commit", log, async move { + let resp = self + .dispatch(hive_sh4re::ManagerRequest::RequestApplyCommit { + agent: args.agent, + commit_ref: args.commit_ref, + }) + .await; + format_ack( + resp, + "request_apply_commit", + format!("apply approval queued for {agent} @ {commit_ref}"), + ) + }) .await } } @@ -348,7 +386,8 @@ impl ManagerServer { relay between them and the operator. Use `send` to talk to agents/operator, `recv` \ to drain your inbox. Privileged: `request_spawn` (new agent, gated on operator \ approval), `kill` (graceful stop), `request_apply_commit` (config change for \ - any agent including yourself). The manager's own config lives at \ + any agent including yourself), `ask_operator` (block on a human answer via the \ + dashboard). The manager's own config lives at \ `/agents/hm1nd/config/agent.nix`." )] impl ServerHandler for ManagerServer {} @@ -388,6 +427,7 @@ pub fn allowed_mcp_tools(flavor: Flavor) -> Vec { "request_spawn", "kill", "request_apply_commit", + "ask_operator", ], }; names diff --git a/hive-c0re/assets/app.js b/hive-c0re/assets/app.js index 655e473..a8d59f0 100644 --- a/hive-c0re/assets/app.js +++ b/hive-c0re/assets/app.js @@ -58,6 +58,12 @@ if (btn) { btn.disabled = false; btn.innerHTML = original; } return; } + // Re-enable the button — refreshState() rebuilds most lists but + // skips forms that didn't change (e.g. the spawn form), so without + // this the spinner sticks and the button can't be clicked again. + if (btn) { btn.disabled = false; btn.innerHTML = original; } + // Clear text inputs whose value was just submitted. + f.querySelectorAll('input[type="text"], input:not([type]), textarea').forEach((i) => { i.value = ''; }); refreshState(); } catch (err) { alert('action failed: ' + err); @@ -170,6 +176,49 @@ root.append(ul); } + function renderQuestions(s) { + const root = $('questions-section'); + root.innerHTML = ''; + if (!s.questions || !s.questions.length) { + root.append(el('p', { class: 'empty' }, '▓ no pending questions ▓')); + return; + } + const fmt = (n) => new Date(n * 1000).toISOString().replace('T', ' ').slice(0, 19); + const ul = el('ul', { class: 'questions' }); + for (const q of s.questions) { + const li = el('li', { class: 'question' }); + li.append( + el('div', { class: 'q-head' }, + el('span', { class: 'msg-ts' }, fmt(q.asked_at)), ' ', + el('span', { class: 'msg-from' }, q.asker), ' ', + el('span', { class: 'msg-sep' }, 'asks:'), + ), + el('div', { class: 'q-body' }, q.question), + ); + const f = el('form', { + method: 'POST', action: '/answer-question/' + q.id, + class: 'qform', 'data-async': '', + }); + let input; + if (q.options && q.options.length) { + input = el('select', { name: 'answer', required: '' }); + input.append(el('option', { value: '', disabled: '', selected: '' }, 'choose…')); + for (const opt of q.options) { + input.append(el('option', { value: opt }, opt)); + } + } else { + input = el('input', { + name: 'answer', type: 'text', required: '', + placeholder: 'your answer', autocomplete: 'off', + }); + } + f.append(input, el('button', { type: 'submit', class: 'btn btn-approve' }, '▸ ANSW3R')); + li.append(f); + ul.append(li); + } + root.append(ul); + } + function renderInbox(s) { const root = $('inbox-section'); root.innerHTML = ''; @@ -250,6 +299,7 @@ if (!resp.ok) throw new Error('http ' + resp.status); const s = await resp.json(); renderContainers(s); + renderQuestions(s); renderInbox(s); renderApprovals(s); // Auto-refresh while a spawn is in flight; otherwise back off. diff --git a/hive-c0re/assets/dashboard.css b/hive-c0re/assets/dashboard.css index 8dee7fa..4d8e4b5 100644 --- a/hive-c0re/assets/dashboard.css +++ b/hive-c0re/assets/dashboard.css @@ -193,6 +193,36 @@ summary:hover { color: var(--purple); } .diff .diff-hunk { color: var(--cyan); } .diff .diff-file { color: var(--purple); font-weight: bold; } .diff .diff-ctx { color: var(--fg); } +.questions { + background: var(--bg-elev); + border: 1px solid var(--amber); + box-shadow: 0 0 12px -4px var(--amber); + padding: 0.6em 0.9em; +} +.questions li.question { + padding: 0.4em 0; + border-bottom: 1px solid var(--border); +} +.questions li.question:last-child { border-bottom: 0; } +.questions .q-head { font-size: 0.9em; } +.questions .q-body { + color: var(--fg); + margin: 0.3em 0; + white-space: pre-wrap; + word-break: break-word; +} +.qform { display: flex; gap: 0.6em; align-items: stretch; margin-top: 0.3em; } +.qform input, .qform select { + font-family: inherit; + font-size: 1em; + background: var(--bg); + color: var(--fg); + border: 1px solid var(--border); + padding: 0.4em 0.6em; + flex: 1; +} +.qform input::placeholder { color: var(--muted); } +.qform input:focus, .qform select:focus { outline: 1px solid var(--amber); } .inbox { background: var(--bg-elev); border: 1px solid var(--border); diff --git a/hive-c0re/assets/index.html b/hive-c0re/assets/index.html index 2565ec0..838591a 100644 --- a/hive-c0re/assets/index.html +++ b/hive-c0re/assets/index.html @@ -16,6 +16,12 @@

loading…

+

◆ M1ND H4S QU3STI0NS ◆

+
══════════════════════════════════════════════════════════════
+
+

loading…

+
+

◆ 0PER4T0R 1NB0X ◆

══════════════════════════════════════════════════════════════
diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs index 3f4022a..232b15c 100644 --- a/hive-c0re/src/broker.rs +++ b/hive-c0re/src/broker.rs @@ -101,8 +101,7 @@ impl Broker { /// inbox view on the dashboard. Caller decides what to show. pub fn recent_for(&self, recipient: &str, limit: u64) -> Result> { let conn = self.conn.lock().unwrap(); - let limit_i = - i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX); + let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX); let mut stmt = conn.prepare( "SELECT id, sender, body, sent_at FROM messages diff --git a/hive-c0re/src/coordinator.rs b/hive-c0re/src/coordinator.rs index fa19f58..389397d 100644 --- a/hive-c0re/src/coordinator.rs +++ b/hive-c0re/src/coordinator.rs @@ -11,6 +11,7 @@ use anyhow::{Context, Result}; use crate::agent_server::{self, AgentSocket}; use crate::approvals::Approvals; use crate::broker::Broker; +use crate::operator_questions::OperatorQuestions; const AGENT_RUNTIME_ROOT: &str = "/run/hyperhive/agents"; const MANAGER_RUNTIME_ROOT: &str = "/run/hyperhive/manager"; @@ -26,6 +27,7 @@ const APPLIED_STATE_ROOT: &str = "/var/lib/hyperhive/applied"; pub struct Coordinator { pub broker: Arc, pub approvals: Arc, + pub questions: Arc, /// URL of the hyperhive flake (no fragment). Inlined into per-agent /// `flake.nix` files as `inputs.hyperhive.url`. pub hyperhive_flake: String, @@ -58,9 +60,11 @@ impl Coordinator { pub fn open(db_path: &Path, hyperhive_flake: String, dashboard_port: u16) -> Result { let broker = Broker::open(db_path).context("open broker")?; let approvals = Approvals::open(db_path).context("open approvals")?; + let questions = OperatorQuestions::open(db_path).context("open operator_questions")?; Ok(Self { broker: Arc::new(broker), approvals: Arc::new(approvals), + questions: Arc::new(questions), hyperhive_flake, dashboard_port, agents: Mutex::new(HashMap::new()), diff --git a/hive-c0re/src/dashboard.rs b/hive-c0re/src/dashboard.rs index 2b41ad5..bcfed09 100644 --- a/hive-c0re/src/dashboard.rs +++ b/hive-c0re/src/dashboard.rs @@ -50,6 +50,7 @@ pub async fn serve(port: u16, coord: Arc) -> Result<()> { .route("/start/{name}", post(post_start)) .route("/rebuild/{name}", post(post_rebuild)) .route("/update-all", post(post_update_all)) + .route("/answer-question/{id}", post(post_answer_question)) .route("/request-spawn", post(post_request_spawn)) .route("/messages/stream", get(messages_stream)) .with_state(AppState { coord }); @@ -75,7 +76,10 @@ async fn serve_index() -> impl IntoResponse { } async fn serve_css() -> impl IntoResponse { - ([("content-type", "text/css")], include_str!("../assets/dashboard.css")) + ( + [("content-type", "text/css")], + include_str!("../assets/dashboard.css"), + ) } async fn serve_app_js() -> impl IntoResponse { @@ -97,6 +101,11 @@ struct StateSnapshot { /// asynchronously so the operator can see them without watching the /// live panel during a turn. operator_inbox: Vec, + /// Pending operator questions (currently only from the manager). + /// `ask_operator` returns immediately with the id; on `/answer-question` + /// we mark the row answered and fire `HelperEvent::OperatorAnswered` + /// into the manager's inbox. + questions: Vec, } #[derive(Serialize)] @@ -131,10 +140,7 @@ struct ApprovalView { diff_html: Option, } -async fn api_state( - headers: HeaderMap, - State(state): State, -) -> axum::Json { +async fn api_state(headers: HeaderMap, State(state): State) -> axum::Json { let host = headers .get("host") .and_then(|h| h.to_str().ok()) @@ -227,6 +233,7 @@ async fn api_state( .broker .recent_for(hive_sh4re::OPERATOR_RECIPIENT, 50) .unwrap_or_default(); + let questions = state.coord.questions.pending().unwrap_or_default(); axum::Json(StateSnapshot { hostname, @@ -236,6 +243,7 @@ async fn api_state( transients, approvals: approval_views, operator_inbox, + questions, }) } @@ -271,6 +279,36 @@ struct RequestSpawnForm { name: String, } +#[derive(Deserialize)] +struct AnswerForm { + answer: String, +} + +async fn post_answer_question( + State(state): State, + AxumPath(id): AxumPath, + Form(form): Form, +) -> Response { + let answer = form.answer.trim(); + if answer.is_empty() { + return error_response("answer: required"); + } + match state.coord.questions.answer(id, answer) { + Ok(question) => { + tracing::info!(%id, "operator answered question"); + state + .coord + .notify_manager(&hive_sh4re::HelperEvent::OperatorAnswered { + id, + question, + answer: answer.to_owned(), + }); + Redirect::to("/").into_response() + } + Err(e) => error_response(&format!("answer {id} failed: {e:#}")), + } +} + async fn post_request_spawn( State(state): State, Form(form): Form, @@ -325,7 +363,10 @@ async fn post_kill(State(state): State, AxumPath(name): AxumPath, AxumPath(name): AxumPath) -> Response { +async fn post_restart( + State(_state): State, + AxumPath(name): AxumPath, +) -> Response { let logical = strip_container_prefix(&name); match lifecycle::restart(&logical).await { Ok(()) => Redirect::to("/").into_response(), @@ -368,7 +409,10 @@ async fn post_update_all(State(state): State) -> Response { if errors.is_empty() { Redirect::to("/").into_response() } else { - error_response(&format!("update-all partial failure:\n{}", errors.join("\n"))) + error_response(&format!( + "update-all partial failure:\n{}", + errors.join("\n") + )) } } @@ -393,8 +437,6 @@ fn error_response(message: &str) -> Response { (StatusCode::INTERNAL_SERVER_ERROR, message.to_owned()).into_response() } - - /// Filter out approvals whose agent state dir was wiped out from under us /// (e.g. by a test script's cleanup). Marks them failed so they fall out of /// `pending` on next render. @@ -508,4 +550,3 @@ fn html_escape(s: &str) -> String { .replace('<', "<") .replace('>', ">") } - diff --git a/hive-c0re/src/main.rs b/hive-c0re/src/main.rs index 9de9bee..c78735b 100644 --- a/hive-c0re/src/main.rs +++ b/hive-c0re/src/main.rs @@ -15,6 +15,7 @@ mod coordinator; mod dashboard; mod lifecycle; mod manager_server; +mod operator_questions; mod server; use coordinator::Coordinator; diff --git a/hive-c0re/src/manager_server.rs b/hive-c0re/src/manager_server.rs index 53826d6..84e8168 100644 --- a/hive-c0re/src/manager_server.rs +++ b/hive-c0re/src/manager_server.rs @@ -71,6 +71,7 @@ async fn serve(stream: UnixStream, coord: Arc) -> Result<()> { const MANAGER_RECV_LONG_POLL: std::time::Duration = std::time::Duration::from_secs(30); +#[allow(clippy::too_many_lines)] async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse { match req { ManagerRequest::Send { to, body } => match coord.broker.send(&Message { @@ -143,6 +144,18 @@ async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse }, } } + ManagerRequest::AskOperator { question, options } => { + tracing::info!(%question, ?options, "manager: ask_operator"); + match coord.questions.submit(MANAGER_AGENT, question, options) { + Ok(id) => { + tracing::info!(%id, "operator question queued"); + ManagerResponse::QuestionQueued { id } + } + Err(e) => ManagerResponse::Err { + message: format!("{e:#}"), + }, + } + } ManagerRequest::RequestApplyCommit { agent, commit_ref } => { tracing::info!(%agent, %commit_ref, "manager: request_apply_commit"); match coord.approvals.submit(agent, commit_ref) { diff --git a/hive-c0re/src/operator_questions.rs b/hive-c0re/src/operator_questions.rs new file mode 100644 index 0000000..aa985f4 --- /dev/null +++ b/hive-c0re/src/operator_questions.rs @@ -0,0 +1,141 @@ +//! Operator question queue. Manager submits via `AskOperator`; the +//! operator answers via the dashboard. The manager-socket handler long-polls +//! the store until the answer lands, so claude's `ask_operator` tool call +//! returns the answer directly as its result. + +use std::path::Path; +use std::sync::Mutex; +use std::time::{SystemTime, UNIX_EPOCH}; + +use anyhow::{Context, Result, bail}; +use rusqlite::{Connection, OptionalExtension, params}; +use serde::Serialize; + +const SCHEMA: &str = r" +CREATE TABLE IF NOT EXISTS operator_questions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + asker TEXT NOT NULL, + question TEXT NOT NULL, + options_json TEXT NOT NULL, + asked_at INTEGER NOT NULL, + answered_at INTEGER, + answer TEXT +); +CREATE INDEX IF NOT EXISTS idx_operator_questions_pending + ON operator_questions (id) WHERE answered_at IS NULL; +"; + +#[derive(Debug, Clone, Serialize)] +pub struct OpQuestion { + pub id: i64, + pub asker: String, + pub question: String, + pub options: Vec, + pub asked_at: i64, + pub answered_at: Option, + pub answer: Option, +} + +pub struct OperatorQuestions { + conn: Mutex, +} + +impl OperatorQuestions { + pub fn open(path: &Path) -> Result { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent).with_context(|| { + format!("create operator_questions db parent {}", parent.display()) + })?; + } + let conn = Connection::open(path) + .with_context(|| format!("open operator_questions db {}", path.display()))?; + conn.execute_batch(SCHEMA) + .context("apply operator_questions schema")?; + Ok(Self { + conn: Mutex::new(conn), + }) + } + + pub fn submit(&self, asker: &str, question: &str, options: &[String]) -> Result { + let conn = self.conn.lock().unwrap(); + let options_json = serde_json::to_string(options).unwrap_or_else(|_| "[]".into()); + conn.execute( + "INSERT INTO operator_questions (asker, question, options_json, asked_at) + VALUES (?1, ?2, ?3, ?4)", + params![asker, question, options_json, now_unix()], + )?; + Ok(conn.last_insert_rowid()) + } + + /// Mark the question answered. Returns the original question text so the + /// caller can include it in any helper event it fires off. + pub fn answer(&self, id: i64, answer: &str) -> Result { + let conn = self.conn.lock().unwrap(); + let question: Option<(String, Option)> = conn + .query_row( + "SELECT question, answered_at FROM operator_questions WHERE id = ?1", + params![id], + |row| Ok((row.get(0)?, row.get(1)?)), + ) + .optional()?; + let Some((question, answered_at)) = question else { + bail!("question {id} not found"); + }; + if answered_at.is_some() { + bail!("question {id} already answered"); + } + conn.execute( + "UPDATE operator_questions SET answer = ?1, answered_at = ?2 WHERE id = ?3", + params![answer, now_unix(), id], + )?; + Ok(question) + } + + #[allow(dead_code)] + pub fn get(&self, id: i64) -> Result> { + let conn = self.conn.lock().unwrap(); + conn.query_row( + "SELECT id, asker, question, options_json, asked_at, answered_at, answer + FROM operator_questions WHERE id = ?1", + params![id], + row_to_question, + ) + .optional() + .map_err(Into::into) + } + + pub fn pending(&self) -> Result> { + let conn = self.conn.lock().unwrap(); + let mut stmt = conn.prepare( + "SELECT id, asker, question, options_json, asked_at, answered_at, answer + FROM operator_questions + WHERE answered_at IS NULL + ORDER BY id ASC", + )?; + let rows = stmt.query_map([], row_to_question)?; + rows.collect::>>() + .map_err(Into::into) + } +} + +fn row_to_question(row: &rusqlite::Row<'_>) -> rusqlite::Result { + let options_json: String = row.get(3)?; + let options: Vec = serde_json::from_str(&options_json).unwrap_or_default(); + Ok(OpQuestion { + id: row.get(0)?, + asker: row.get(1)?, + question: row.get(2)?, + options, + asked_at: row.get(4)?, + answered_at: row.get(5)?, + answer: row.get(6)?, + }) +} + +fn now_unix() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .and_then(|d| i64::try_from(d.as_secs()).ok()) + .unwrap_or(0) +} diff --git a/hive-sh4re/src/lib.rs b/hive-sh4re/src/lib.rs index c7e88f5..aa5e749 100644 --- a/hive-sh4re/src/lib.rs +++ b/hive-sh4re/src/lib.rs @@ -228,6 +228,15 @@ pub enum HelperEvent { /// A sub-agent's container was torn down (container removed; state /// dirs preserved per `destroy` semantics). Destroyed { agent: String }, + /// The operator answered a question that was queued via + /// `AskOperator`. `id` matches the `QuestionQueued.id` returned to the + /// asker; `question` echoes the original prompt so the manager can + /// stitch the answer back to context across compactions. + OperatorAnswered { + id: i64, + question: String, + answer: String, + }, } /// Requests on the manager socket. Manager has the agent surface (send/recv) @@ -265,14 +274,35 @@ pub enum ManagerRequest { agent: String, commit_ref: String, }, + /// Ask the operator a question. The host-side handler blocks until the + /// operator answers via the dashboard; the answer is then returned as the + /// response. `options` is advisory: an empty list means free-text. + AskOperator { + question: String, + #[serde(default)] + options: Vec, + }, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "kind", rename_all = "snake_case")] pub enum ManagerResponse { Ok, - Err { message: String }, - Message { from: String, body: String }, + Err { + message: String, + }, + Message { + from: String, + body: String, + }, Empty, - Status { unread: u64 }, + Status { + unread: u64, + }, + /// Result of `AskOperator`: the queued question id. The actual answer + /// arrives later as a `HelperEvent::OperatorAnswered` in the manager + /// inbox, so this returns immediately rather than blocking the turn. + QuestionQueued { + id: i64, + }, }