From 82b0877c47b06787a3bfa063fa049e437325fa8a Mon Sep 17 00:00:00 2001 From: damocles Date: Sun, 17 May 2026 12:10:49 +0200 Subject: [PATCH] =?UTF-8?q?ask:=20rename=20ask=5Foperator=20=E2=86=92=20as?= =?UTF-8?q?k=20+=20optional=20'to'=20for=20agent-to-agent=20Q&A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CLAUDE.md | 30 +++-- README.md | 19 +-- TODO.md | 3 +- docs/approvals.md | 36 ++++-- docs/persistence.md | 11 +- docs/turn-loop.md | 42 ++++--- docs/web-ui.md | 4 +- hive-ag3nt/prompts/agent.md | 3 +- hive-ag3nt/prompts/manager.md | 7 +- hive-ag3nt/src/bin/hive-m1nd.rs | 2 +- hive-ag3nt/src/mcp.rs | 188 ++++++++++++++++++++-------- hive-c0re/assets/app.js | 2 +- hive-c0re/src/agent_server.rs | 102 +++++++-------- hive-c0re/src/coordinator.rs | 8 +- hive-c0re/src/dashboard.rs | 36 ++++-- hive-c0re/src/limits.rs | 8 +- hive-c0re/src/main.rs | 1 + hive-c0re/src/manager_server.rs | 84 +++++++------ hive-c0re/src/operator_questions.rs | 105 ++++++++++++---- hive-c0re/src/questions.rs | 128 +++++++++++++++++++ hive-sh4re/src/lib.rs | 87 +++++++++---- 21 files changed, 640 insertions(+), 266 deletions(-) create mode 100644 hive-c0re/src/questions.rs diff --git a/CLAUDE.md b/CLAUDE.md index 22b8c5f..c7db0ed 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -22,7 +22,10 @@ hive-c0re/ host daemon + CLI (one binary, subcommand-dispatched) src/broker.rs sqlite Message store + broadcast channel for SSE + hourly vacuum of delivered>30d src/approvals.rs sqlite Approval queue + kinds - src/operator_questions.rs sqlite question queue backing `ask_operator` + src/operator_questions.rs sqlite question queue backing `ask` / + `answer` (both operator + agent-to-agent) + src/questions.rs shared dispatch for `Ask` / `Answer` — + used by both agent + manager surfaces src/reminder_scheduler.rs 5s poll loop: drains due reminders, resolves file_path container→host, persists payload + delivers pointer string @@ -31,8 +34,9 @@ hive-c0re/ host daemon + CLI (one binary, subcommand-dispatched) src/crash_watch.rs poll every 10s; fire HelperEvent::ContainerCrash when a previously-running container disappears without an operator-initiated transient - src/coordinator.rs shared state (broker/approvals/questions/transient/ - sockets) + tombstone enumeration + kick_agent + src/coordinator.rs shared state (broker/approvals/operator_questions/ + transient/sockets) + tombstone enumeration + + kick_agent + notify_agent (helper-event push) src/actions.rs approve/deny/destroy (transient-aware) src/auto_update.rs startup rebuild scan + ensure_manager + meta::lock_update_hyperhive bump @@ -161,11 +165,21 @@ Prune freely. domain tooling — the agent flake's `inputs` block pulls the external flake, `agent.nix` references it via `flakeInputs..packages.${pkgs.system}.default`. -- **Just landed:** `mcp__hyperhive__ask_operator` is now on - the sub-agent surface too (not just the manager). Answer - routes back to whichever agent asked via - `coord.notify_agent`; the dashboard already shows the - asker on each question row. +- **Just landed:** `ask_operator` → `ask` rename + optional + `to: ` param for agent-to-agent structured Q&A. + Recipient defaults to the operator (dashboard); peer + questions land in the target's inbox as `QuestionAsked` + events and the recipient replies via new `answer(id, + answer)` tool. Answer always flows back as + `QuestionAnswered { id, question, answer, answerer }` + (renamed from `OperatorAnswered`; `answerer` distinguishes + operator vs peer vs `ttl-watchdog`). Authorisation: + operator-targeted questions can only be answered by the + operator; agent-targeted by the named target (or the + operator as override). Self-ask rejected. Shared dispatch + lives in `hive-c0re/src/questions.rs`. Dashboard's + `pending()` filters on `target IS NULL` so peer questions + never leak into the operator's queue. - **Just landed:** dashboard now has a terminal-style compose textbox under the message-flow stream — `@name` picks the recipient (sticky in localStorage, auto- diff --git a/README.md b/README.md index 57be5ac..ed56da6 100644 --- a/README.md +++ b/README.md @@ -33,12 +33,12 @@ host (NixOS, runs hive-c0re.service) ├── hm1nd hive-m1nd serve : claude turn loop + │ MCP (send / recv / request_spawn / kill / start / │ restart / update / request_apply_commit / - │ ask_operator) + web UI on :8000 + │ ask / answer / remind) + web UI on :8000 │ └── h- hive-ag3nt serve : claude turn loop + - MCP (send / recv / ask_operator + agent-declared extras - via hyperhive.extraMcpServers) + web UI on a - hashed :8100-8999 + MCP (send / recv / ask / answer / remind + agent-declared + extras via hyperhive.extraMcpServers) + web UI + on a hashed :8100-8999 ``` Each turn: harness pops one inbox message (Recv long-polls server-side and @@ -89,10 +89,13 @@ inside the container — so `git fetch applied`, `cat /meta/flake.lock` all just work without constructing paths by hand. See [`docs/approvals.md`](docs/approvals.md) for the full state machine + lock-flow walkthrough. -For decisions the manager needs human signal on, `ask_operator(question, -options?, multi?)` queues a free-text/checkbox/radio form on the -dashboard; the answer arrives later as a `HelperEvent::OperatorAnswered` -in the manager's inbox. +For decisions any agent (manager or sub) needs structured signal on, +`ask(question, options?, multi?, ttl_seconds?, to?)` queues a question: +default recipient is the operator (dashboard renders a free-text / +checkbox / radio form), or pass `to: ""` to route a structured +peer question into another agent's inbox. The answer arrives later as +a `HelperEvent::QuestionAnswered { id, question, answer, answerer }` +in the asker's inbox. Peer recipients respond via `answer(id, answer)`. ## Host config diff --git a/TODO.md b/TODO.md index 3fa31eb..18e4c60 100644 --- a/TODO.md +++ b/TODO.md @@ -8,7 +8,7 @@ - **Broadcast messaging**: allow sending messages with recipient "*" to all agents; deliver with hint "this was a broadcast and may not need any action from you" - **Multi-agent restart coordination**: when rebuilding all agents, manager should start first so it can coordinate post-restart confusion (notify agents, suppress unnecessary retries, etc) - **Shared docs/skills repo (RO)**: a single repo on the hive forge that every agent has read-only access to — common references, prompts, runbooks, "skills" the operator wants every agent to inherit without baking into the system prompt or `/shared`. Implementation likely: seed an `org-shared/docs` repo on first hive-forge boot, grant every per-agent user a read membership in the org. Agents `git clone` it (or use the API) to read; only the manager + operator can push. -- **Rename `ask_operator` → `ask` with optional `to` param**: today `mcp__hyperhive__ask_operator` always targets the operator dashboard. Generalise: rename to `ask`, add optional `to: ` argument that defaults to `"operator"`. When `to` is another agent, route the question to that agent's inbox as a structured "question event" (different from a plain send so the recipient can answer back with the same id and the answer threads back to the asker). Unblocks agent-to-agent structured Q&A without burning regular inbox slots. +- ~~**Rename `ask_operator` → `ask` with optional `to` param**~~ ✓ done — `Ask { question, options, multi, ttl_seconds, to: Option }` on both `AgentRequest` + `ManagerRequest`. `to = None` (or `Some("operator")`) = dashboard path; `to = Some()` pushes `HelperEvent::QuestionAsked` into the target's inbox. New `Answer { id, answer }` request on both surfaces — target answers via `mcp__hyperhive__answer`; answer flows back to the asker as `HelperEvent::QuestionAnswered { id, question, answer, answerer }` (renamed from `OperatorAnswered`; carries who answered so the asker can distinguish operator vs peer vs `ttl-watchdog`). Authorisation: only the question's `target` agent or the operator can answer; self-ask is rejected. DB gets a nullable `target` column (NULL = operator path, back-compat). Dashboard's `pending()` / `recent_answered()` filter on `target IS NULL` so peer questions never leak into the operator's queue. Shared dispatch lives in `hive-c0re/src/questions.rs` so both surfaces stay aligned. - **Loose-ends tracker + `get_open_threads` tool**: hive-c0re already knows about pending approvals + unanswered questions; soon will also know about open PRs on hive-forge. Aggregate these into a per-agent "open threads" view (e.g. `[{kind: "approval", id: 7, summary: "spawn alice"}, {kind: "question", id: 12, asker: "alice", summary: "deploy now?"}]`). New MCP tool `mcp__hyperhive__get_open_threads` returns the list so an agent can see what's still pending against it without rebuilding context from inbox history. Manager's version includes hive-wide threads. **Also surface this list on the per-agent web UI** so the operator can see at a glance what each agent has hanging open — same data source as the MCP tool, just rendered into the existing per-agent dashboard page (next to inbox view / model chip / etc). ## Reminder Tool @@ -25,6 +25,7 @@ ## Dashboard +- **UI for agent-to-agent questions** (follow-up to the `ask` rename): now that agents can `ask(to: )` each other, surface those threads in the per-agent dashboard view. Replace the existing read/unread tabs with THREE filters: `unread`, `from: `, `to: `. The `to:` filter makes agent-targeted questions visible so the operator can see at a glance "alice has 3 questions outstanding from bob" and intervene if a thread is stuck. Same UI is useful for general inbox filtering too. Data lives in the existing `operator_questions` table (with the new `target` column) + the broker inbox; no new schema needed. Also expose a "respond" affordance so the operator can override-answer a peer question when an agent is offline / stuck (the answerer-auth check in `OperatorQuestions::answer` already permits the operator on any target). - **UI for pending reminders**: show pending/queued reminders in dashboard, allow operator to view/debug/cancel - Per-agent reminder status (pending, delivered) - Reminder query interface for debugging diff --git a/docs/approvals.md b/docs/approvals.md index de35a2f..28e459d 100644 --- a/docs/approvals.md +++ b/docs/approvals.md @@ -248,16 +248,25 @@ package legitimacy, cheaper alternative, blast radius) before committing and calling `request_apply_commit`. For ambiguous cases or anything that needs human signal, the -manager calls `ask_operator(question, options?, multi?, -ttl_seconds?)` — queues the question on the dashboard and returns -the id immediately. The operator's answer arrives later as -`HelperEvent::OperatorAnswered` in the manager inbox. Storage is -`hive-c0re::operator_questions` (sqlite); the answer flow is: +manager calls `ask(question, options?, multi?, ttl_seconds?, to?)` — +queues the question and returns the id immediately. When `to` is +omitted (or `"operator"`) the question shows up on the dashboard; +when `to` is a sub-agent's name, the recipient receives a +`HelperEvent::QuestionAsked` and answers via their own `answer` +tool. Either way the answer arrives back as +`HelperEvent::QuestionAnswered { id, question, answer, answerer }` +in the asker's inbox. Storage is `hive-c0re::operator_questions` +(sqlite) — same table, with a nullable `target` column +(NULL = operator). Dispatch goes through +`hive-c0re/src/questions.rs::{handle_ask, handle_answer}` so both +the agent + manager surfaces stay aligned. The answer flow is: ``` -POST /answer-question/{id} - → OperatorQuestions::answer - → notify_manager(OperatorAnswered { id, question, answer }) +POST /answer-question/{id} agent: Answer { id, answer } + → OperatorQuestions::answer(_, _, "operator") → questions::handle_answer + → notify_agent(asker, QuestionAnswered { → OperatorQuestions::answer(_, _, agent) + answerer: "operator", ... }) → notify_agent(asker, QuestionAnswered { + answerer: agent, ... }) ``` Two more paths resolve a pending question with a sentinel answer: @@ -301,9 +310,14 @@ regular claude turn so the manager can react. Variants - `NeedsUpdate { agent }` — sub-agent's recorded flake rev is stale. Manager calls `update(name)` to rebuild — idempotent, no approval required. -- `OperatorAnswered { id, question, answer }` — dashboard - `/answer-question/{id}` after the operator submits the answer - form. +- `QuestionAnswered { id, question, answer, answerer }` — + dashboard `/answer-question/{id}` (answerer = `"operator"`), + peer `Answer` request (answerer = agent name), or ttl watchdog + expiry (answerer = `"ttl-watchdog"`, answer = `"[expired]"`). +- `QuestionAsked { id, asker, question, options, multi }` — + fired when an agent calls `Ask { to: Some(), ... }`. + The recipient responds via `Answer { id, answer }` and the + asker sees the matching `QuestionAnswered`. To add a new event: new `HelperEvent` variant + call sites + update `prompts/manager.md` so the manager knows the new shape. diff --git a/docs/persistence.md b/docs/persistence.md index aeb90fd..55b951d 100644 --- a/docs/persistence.md +++ b/docs/persistence.md @@ -12,10 +12,15 @@ Three tables, all in one file: `sender / recipient / body / sent_at / delivered_at`. - `approvals` — the queue. `agent / kind (apply_commit | spawn) / commit_ref / requested_at / status / resolved_at / note`. -- `operator_questions` — `ask_operator` queue. +- `operator_questions` — `ask` / `answer` queue (despite the + file name, stores both operator-targeted + agent-to-agent + questions since the `ask` rename). `asker / question / options_json / multi / asked_at / - deadline_at (ttl) / answered_at / answer`. Migrated via - `ALTER TABLE ADD COLUMN` against `pragma_table_info`. + deadline_at (ttl) / answered_at / answer / target`. `target IS + NULL` = operator path (dashboard); `target = ''` = peer + Q&A (`HelperEvent::QuestionAsked` pushed into target's inbox, + answered via `Answer` request). Migrated via `ALTER TABLE ADD + COLUMN` against `pragma_table_info`. Retention: diff --git a/docs/turn-loop.md b/docs/turn-loop.md index 83a1f1e..cf23a49 100644 --- a/docs/turn-loop.md +++ b/docs/turn-loop.md @@ -107,10 +107,18 @@ it as a stdio child via `--mcp-config`. The hyperhive socket name is "anything pending?" peek. Positive value parks the turn up to that many seconds (cap 180) — incoming messages wake instantly, otherwise returns empty at the timeout. -- `ask_operator(question, options?, multi?, ttl_seconds?)` — - surface a question on the dashboard. Same shape as the manager's; - answer routes back to the asker's own inbox as - `HelperEvent::OperatorAnswered` via `coord.notify_agent`. +- `ask(question, options?, multi?, ttl_seconds?, to?)` — + surface a structured question. Same shape as the manager's; + recipient defaults to the operator (dashboard) but can be set + to a peer agent name via `to: ""`. Answer routes back + to the asker's own inbox as `HelperEvent::QuestionAnswered` + via `coord.notify_agent`. For peer questions the recipient + sees a `HelperEvent::QuestionAsked` event and replies with + `answer(id, answer)`. +- `answer(id, answer)` — respond to a `question_asked` event + routed to this agent. Authorisation is strict: only the + declared target (or the operator via the dashboard) can + answer. ### Waking the agent from inside the container @@ -167,16 +175,22 @@ meta's. - `request_apply_commit(agent, commit_ref)` — submit a config change for any agent (`hm1nd` for the manager's own config) for operator approval. -- `ask_operator(question, options?, multi?, ttl_seconds?)` — - surface a question on the dashboard. Non-blocking — returns the - queued question id; the operator's answer arrives later as - `HelperEvent::OperatorAnswered` in the manager inbox. Options - always render alongside a free-text fallback; `multi=true` - renders options as checkboxes. `ttl_seconds` auto-cancels with - answer `[expired]` after the deadline (useful for time-sensitive - decisions that become moot if the operator hasn't responded). - The operator can also manually cancel with `[cancelled]` via the - dashboard. +- `ask(question, options?, multi?, ttl_seconds?, to?)` — + surface a structured question to the operator (default) or a + sub-agent (`to: ""`). Non-blocking — returns the + queued question id; the answer arrives later as + `HelperEvent::QuestionAnswered { id, question, answer, + answerer }` in the asker's inbox. Options always render + alongside a free-text fallback; `multi=true` renders options + as checkboxes. `ttl_seconds` auto-cancels with answer + `[expired]` (and `answerer: "ttl-watchdog"`) after the + deadline (useful for time-sensitive decisions that become moot + if no one has responded). The operator can also manually + cancel with `[cancelled]` via the dashboard. +- `answer(id, answer)` — respond to a `question_asked` event + that was routed to the manager (a sub-agent did + `ask(to: "manager", ...)`). Surfaces in the asker's inbox as + the same `question_answered` event. The boundary: lifecycle ops on *existing* sub-agents (`kill`/`start`/`restart`) are at the manager's discretion — no diff --git a/docs/web-ui.md b/docs/web-ui.md index 2fec461..c00f548 100644 --- a/docs/web-ui.md +++ b/docs/web-ui.md @@ -61,7 +61,9 @@ the previous process's socket release resolves itself. age + claude-creds badge). Two actions: `⊕ R3V1V3` (queues a Spawn approval; existing state is reused), `PURG3` (wipes state + applied dirs; `POST /purge-tombstone/{name}`). -4. **M1ND H4S QU3STI0NS** — pending `ask_operator` questions +4. **M1ND H4S QU3STI0NS** — pending operator-targeted `ask` + questions, i.e. rows with `target IS NULL` (peer-to-peer + questions live in the same table but never surface here) (amber pulsing border). Free-text fallback always rendered alongside any option list; `multi=true` renders options as checkboxes; submit merges selections + free text comma-joined. diff --git a/hive-ag3nt/prompts/agent.md b/hive-ag3nt/prompts/agent.md index dbbf500..2616c83 100644 --- a/hive-ag3nt/prompts/agent.md +++ b/hive-ag3nt/prompts/agent.md @@ -5,7 +5,8 @@ Tools (hyperhive surface): - `mcp__hyperhive__recv(wait_seconds?)` — drain one more message from your inbox (returns `(empty)` if nothing pending). Without `wait_seconds` (or with `0`) it returns immediately — a cheap "anything pending?" peek you can sprinkle between tool calls. To **wait** for work when you have nothing else useful to do this turn, call with a long wait (e.g. `wait_seconds: 180`, the max) — incoming messages wake you instantly, otherwise the call returns empty at the timeout. That's strictly better than a fixed `sleep` shell command: lower latency on new work, no busy-loop. - `mcp__hyperhive__send(to, body)` — message a peer (by their name) or the operator (recipient `operator`, surfaces in the dashboard). Use `to: "*"` to broadcast to all agents (they receive a hint that it's a broadcast and may not need action). Some agents have a per-agent allow-list (`hyperhive.allowedRecipients` in their `agent.nix`) — if so the tool refuses recipients outside the list with a clear error; route through the manager (`send(to: "manager", …)`) which is always reachable. - (some agents only) **extra MCP tools** surfaced as `mcp____` — these are agent-specific (matrix client, scraper, db connector, etc.) declared in your `agent.nix` under `hyperhive.extraMcpServers`. Treat them as first-class tools alongside the hyperhive surface; the operator already auto-approved them at deploy time. -- `mcp__hyperhive__ask_operator(question, options?, multi?, ttl_seconds?)` — surface a question to the human operator on the dashboard. Returns immediately with a question id — do NOT wait inline. When the operator answers, a system message with event `operator_answered { id, question, answer }` lands in your inbox; handle it on a future turn. Use this for clarifications, permission for risky actions, or choice between options. `options` is advisory: a short fixed-choice list when applicable, otherwise leave empty for free text. `multi: true` lets the operator pick multiple (checkboxes), answer comes back comma-joined. `ttl_seconds` auto-cancels with answer `[expired]` when the decision becomes moot. +- `mcp__hyperhive__ask(question, options?, multi?, ttl_seconds?, to?)` — surface a structured question to the human operator (default, or `to: "operator"`) OR a peer agent (`to: ""`). Returns immediately with a question id — do NOT wait inline. When the recipient answers, a system message with event `question_answered { id, question, answer, answerer }` lands in your inbox; handle it on a future turn. Use this for clarifications, permission for risky actions, choice between options, or peer Q&A without burning regular inbox slots. `options` is advisory: a short fixed-choice list when applicable, otherwise leave empty for free text. `multi: true` lets the answerer pick multiple (checkboxes), answer comes back comma-joined. `ttl_seconds` auto-cancels with answer `[expired]` (and `answerer: "ttl-watchdog"`) when the decision becomes moot. +- `mcp__hyperhive__answer(id, answer)` — answer a question that was routed to YOU. You'll see one in your inbox as a `question_asked { id, asker, question, options, multi }` system event when a peer or the manager calls `ask(to: "", ...)`. The answer surfaces in the asker's inbox as a `question_answered` event. Strict authorisation: you can only answer questions where you are the declared target. Need new packages, env vars, or other NixOS config for yourself? You can't edit your own config directly — message the manager (recipient `manager`) describing what you need + why. The manager evaluates the request (it doesn't rubber-stamp), edits `/agents/{label}/config/agent.nix` on your behalf, commits, and submits an approval that the operator can accept on the dashboard; on approve hive-c0re rebuilds your container with the new config. diff --git a/hive-ag3nt/prompts/manager.md b/hive-ag3nt/prompts/manager.md index b7ae739..2f3a2e5 100644 --- a/hive-ag3nt/prompts/manager.md +++ b/hive-ag3nt/prompts/manager.md @@ -10,7 +10,8 @@ Tools (hyperhive surface): - `mcp__hyperhive__restart(name)` — stop + start a sub-agent. No approval required. - `mcp__hyperhive__update(name)` — rebuild a sub-agent (re-applies the current hyperhive flake + agent.nix, restarts the container). No approval required — idempotent. Use when you receive a `needs_update` system event. - `mcp__hyperhive__request_apply_commit(agent, commit_ref, description?)` — submit a config change for any agent (`hm1nd` for self) for operator approval. Pass an optional `description` and it appears on the dashboard approval card so the operator knows what changed without opening the diff. At submit time hive-c0re fetches your commit into the agent's applied repo and pins it as `proposal/`; from that moment your proposed-side commit can be amended or force-pushed freely without changing what the operator will build. -- `mcp__hyperhive__ask_operator(question, options?, multi?, ttl_seconds?)` — surface a question on the dashboard. Returns immediately with a question id; the operator's answer arrives later as a system `operator_answered` event in your inbox. Options are advisory: the dashboard always lets the operator type a free-text answer in addition. Set `multi: true` to render options as checkboxes (operator can pick multiple); the answer comes back as `, `-separated. Set `ttl_seconds` to auto-cancel after a deadline — useful when the decision becomes moot if the operator hasn't responded in time; on expiry the answer is `[expired]`. Do not poll inside the same turn — finish the current work and react when the event lands. +- `mcp__hyperhive__ask(question, options?, multi?, ttl_seconds?, to?)` — surface a structured question to the operator (default, or `to: "operator"`) OR a sub-agent (`to: ""`). Returns immediately with a question id; the answer arrives later as a system `question_answered { id, question, answer, answerer }` event in your inbox. Options are advisory: the dashboard always lets the operator type a free-text answer in addition. Set `multi: true` to render options as checkboxes (operator can pick multiple); the answer comes back as `, `-separated. Set `ttl_seconds` to auto-cancel after a deadline (capped at 6h server-side) — on expiry the answer is `[expired]` and `answerer` is `"ttl-watchdog"`. Do not poll inside the same turn — finish the current work and react when the event lands. +- `mcp__hyperhive__answer(id, answer)` — answer a question that was routed to YOU (a sub-agent did `ask(to: "manager", ...)`). The triggering event in your inbox is `question_asked { id, asker, question, options, multi }`. The answer surfaces in the asker's inbox as a `question_answered` event. Approval boundary: lifecycle ops on *existing* sub-agents (`kill`, `start`, `restart`) are at your discretion — no operator approval. *Creating* a new agent (`request_spawn`) and *changing* any agent's config (`request_apply_commit`) still go through the approval queue. The operator only signs off on changes; you run the day-to-day. @@ -62,9 +63,9 @@ Sub-agents are NOT trusted by default. When one asks for a config change (new pa You're the policy gate between sub-agents and the operator's approval queue — the operator clicks ◆ APPR0VE on your commits, so don't submit changes you wouldn't defend. -Two ways to talk to the operator: `send(to: "operator", ...)` for fire-and-forget status / pointers (surfaces in the operator inbox), or `ask_operator(question, options?)` when you need a decision. `ask_operator` is non-blocking — it queues the question and returns an id immediately; the answer arrives on a future turn as an `operator_answered` system event. Prefer `ask_operator` over an open-ended `send` for anything you actually need to wait on. +Two ways to talk to the operator: `send(to: "operator", ...)` for fire-and-forget status / pointers (surfaces in the operator inbox), or `ask(question, options?)` when you need a decision (omit `to`, or pass `to: "operator"`). `ask` is non-blocking — it queues the question and returns an id immediately; the answer arrives on a future turn as a `question_answered` system event. Prefer `ask` over an open-ended `send` for anything you actually need to wait on. Same primitive can target a sub-agent (`to: ""`) when you need a structured answer from a peer rather than free-form chat. -Messages from sender `system` are hyperhive helper events (JSON body, `event` field discriminates): `approval_resolved`, `spawned`, `rebuilt`, `killed`, `destroyed`, `container_crash`, `needs_login`, `logged_in`, `needs_update`, `operator_answered`. Use these to react to lifecycle changes: +Messages from sender `system` are hyperhive helper events (JSON body, `event` field discriminates): `approval_resolved`, `spawned`, `rebuilt`, `killed`, `destroyed`, `container_crash`, `needs_login`, `logged_in`, `needs_update`, `question_asked`, `question_answered`. Use these to react to lifecycle changes: - `needs_login` — agent has no claude session yet. You can't help directly (login is interactive OAuth on the operator side); flag the operator if it's been long. - `logged_in` — agent just completed login; first useful turn is imminent. Good time to brief them on what to do. diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index 4e88813..705920c 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -36,7 +36,7 @@ enum Cmd { /// Run the manager MCP server on stdio. Spawned by claude via /// `--mcp-config`; same shape as `hive-ag3nt mcp` but with the /// manager tool surface (`request_spawn`, `kill`, `start`, `restart`, - /// `request_apply_commit`, `ask_operator`). + /// `request_apply_commit`, `ask`, `answer`, `remind`). Mcp, } diff --git a/hive-ag3nt/src/mcp.rs b/hive-ag3nt/src/mcp.rs index ce98231..51e1066 100644 --- a/hive-ag3nt/src/mcp.rs +++ b/hive-ag3nt/src/mcp.rs @@ -226,42 +226,74 @@ impl AgentServer { } #[tool( - description = "Surface a question to the operator on the dashboard. Returns immediately \ - with a question id — do NOT wait inline. When the operator answers, a system message \ - with event `operator_answered { id, question, answer }` lands in your inbox; handle it \ - on a future turn. Use this when a decision needs human signal (ambiguous scope, \ - permission to do something risky, choosing between options). `options` is advisory: \ - pass a short fixed-choice list when applicable, otherwise leave empty for free text. \ - Set `multi: true` to let the operator pick multiple options (checkboxes); the answer \ - comes back as a comma-separated string. Set `ttl_seconds` to auto-cancel a \ - no-longer-relevant question — on expiry the answer is `[expired]` and the same \ - `operator_answered` event fires." + description = "Surface a structured question to either the operator OR a peer agent. \ + Returns immediately with a question id — do NOT wait inline. When the recipient \ + answers, a system message with event `question_answered { id, question, answer, \ + answerer }` lands in your inbox; handle it on a future turn. \n\n\ + Recipient: omit `to` (or set `to: \"operator\"`) for the human operator on the \ + dashboard. Set `to: \"\"` to ask a peer agent — they receive a \ + `question_asked { id, asker, question, options, multi }` event in their inbox \ + and answer via `mcp__hyperhive__answer`. \n\n\ + `options` is advisory: pass a short fixed-choice list when applicable, otherwise \ + leave empty for free text. Set `multi: true` to let the answerer pick multiple \ + options (checkboxes on the dashboard, hint to the agent otherwise) — answer comes \ + back as a comma-separated string. Set `ttl_seconds` to auto-cancel a \ + no-longer-relevant question — on expiry the answer is `[expired]` (with \ + `answerer: \"ttl-watchdog\"`) and the same `question_answered` event fires." )] - async fn ask_operator(&self, Parameters(args): Parameters) -> String { + async fn ask(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); - run_tool_envelope("ask_operator", log, async move { + run_tool_envelope("ask", log, async move { let (resp, retries) = self - .dispatch(hive_sh4re::AgentRequest::AskOperator { + .dispatch(hive_sh4re::AgentRequest::Ask { question: args.question, options: args.options, multi: args.multi, ttl_seconds: args.ttl_seconds, + to: args.to, }) .await; let s = match resp { Ok(SocketReply::QuestionQueued(id)) => format!( - "question queued (id={id}); operator's answer will arrive as a system \ - `operator_answered` event in your inbox" + "question queued (id={id}); answer will arrive as a system \ + `question_answered` event in your inbox" ), - Ok(SocketReply::Err(m)) => format!("ask_operator failed: {m}"), - Ok(other) => format!("ask_operator unexpected response: {other:?}"), - Err(e) => format!("ask_operator transport error: {e:#}"), + Ok(SocketReply::Err(m)) => format!("ask failed: {m}"), + Ok(other) => format!("ask unexpected response: {other:?}"), + Err(e) => format!("ask transport error: {e:#}"), }; annotate_retries(s, retries) }) .await } + #[tool( + description = "Answer a question that was routed to YOU via a `question_asked` system \ + event in your inbox. Pass the `id` from that event and your `answer` string. The \ + answer will surface in the asker's inbox as a `question_answered { id, question, \ + answer, answerer: }` event. \n\n\ + Authorisation is strict — you can only answer questions where you are the declared \ + target (i.e. the asker did `ask(to: \"\", ...)`). Trying to answer an \ + operator-targeted question or a question addressed to a different agent will fail." + )] + async fn answer(&self, Parameters(args): Parameters) -> String { + let log = format!("{args:?}"); + let id = args.id; + run_tool_envelope("answer", log, async move { + let (resp, retries) = self + .dispatch(hive_sh4re::AgentRequest::Answer { + id, + answer: args.answer, + }) + .await; + annotate_retries( + format_ack(resp, "answer", format!("answered question {id}")), + retries, + ) + }) + .await + } + #[tool( description = "Pop one message from this agent's inbox. Returns the sender and body, \ or an empty marker if nothing is waiting. Without `wait_seconds` (or with 0) the \ @@ -389,25 +421,44 @@ pub struct UpdateArgs { } #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -pub struct AskOperatorArgs { - /// The question to surface on the dashboard. +pub struct AskArgs { + /// The question to surface. pub question: String, - /// Optional fixed-choice answers. The dashboard always renders a - /// free-text fallback ("Other…") so the operator is never trapped - /// by an incomplete list. + /// Optional fixed-choice answers. The dashboard renders these as + /// chips alongside a free-text fallback ("Other…") so the operator + /// is never trapped by an incomplete list; peer-agent recipients + /// see the list in their inbox event and can return any string. #[serde(default)] pub options: Vec, - /// When true, options are rendered as checkboxes — operator can pick - /// any subset. The answer comes back as a single string with - /// selections joined by ", ". Ignored when `options` is empty. + /// When true, options are rendered as checkboxes — the answerer + /// can pick any subset. The answer comes back as a single string + /// with selections joined by ", ". Ignored when `options` is empty. #[serde(default)] pub multi: bool, - /// Optional auto-cancel after `ttl_seconds`. On expiry the question - /// resolves with answer `[expired]` and the manager receives the - /// usual `operator_answered` system event. `None` (default) = - /// wait indefinitely. + /// Optional auto-cancel after `ttl_seconds` (capped server-side at + /// 6 hours). On expiry the question resolves with answer + /// `[expired]` and the asker receives the usual + /// `question_answered` system event (with `answerer: + /// "ttl-watchdog"`). `None` (default) = wait indefinitely. #[serde(default)] pub ttl_seconds: Option, + /// Recipient. Omit (or pass `"operator"`) to ask the human + /// operator via the dashboard. Pass another agent's logical name + /// to ask that peer — they receive a `question_asked` event in + /// their inbox and answer via `mcp__hyperhive__answer`. + #[serde(default)] + pub to: Option, +} + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct AnswerArgs { + /// Id of the question being answered — comes from the + /// `question_asked` event in your inbox. + pub id: i64, + /// Free-text answer body. Soft-capped at 1 KiB by the same + /// `MESSAGE_MAX_BYTES` limit as `send`; keep it short or write the + /// detail to a file and pass a path. + pub answer: String, } #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] @@ -597,42 +648,71 @@ impl ManagerServer { } #[tool( - description = "Surface a question to the operator on the dashboard. Returns immediately \ - with a question id — do NOT wait inline. When the operator answers, a system message \ - with event `operator_answered { id, question, answer }` lands in your inbox; handle it \ - on a future turn. Use this when a decision needs human signal (ambiguous sub-agent \ - request, policy call, scope clarification). `options` is advisory: pass a short \ - fixed-choice list when applicable, otherwise leave empty for free text. Set \ - `multi: true` to let the operator pick multiple options (checkboxes); the answer \ - comes back as a comma-separated string. Set `ttl_seconds` to auto-cancel a \ - no-longer-relevant question instead of blocking forever — on expiry the answer \ - is `[expired]` and the same `operator_answered` event fires." + description = "Surface a structured question to either the operator OR a sub-agent. \ + Returns immediately with a question id — do NOT wait inline. When the recipient \ + answers, a system message with event `question_answered { id, question, answer, \ + answerer }` lands in your inbox; handle it on a future turn. \n\n\ + Recipient: omit `to` (or set `to: \"operator\"`) for the human operator on the \ + dashboard. Set `to: \"\"` to ask a sub-agent — they receive a \ + `question_asked` event in their inbox and answer via their `mcp__hyperhive__answer` \ + tool. Useful for delegating decisions / clarifications without losing the \ + question id correlation. \n\n\ + `options` is advisory: pass a short fixed-choice list when applicable, otherwise \ + leave empty for free text. Set `multi: true` to render checkboxes; the answer \ + comes back as a comma-separated string. Set `ttl_seconds` to auto-cancel — on \ + expiry the answer is `[expired]` (with `answerer: \"ttl-watchdog\"`) and the same \ + `question_answered` event fires." )] - async fn ask_operator(&self, Parameters(args): Parameters) -> String { + async fn ask(&self, Parameters(args): Parameters) -> String { let log = format!("{args:?}"); - run_tool_envelope("ask_operator", log, async move { + run_tool_envelope("ask", log, async move { let (resp, retries) = self - .dispatch(hive_sh4re::ManagerRequest::AskOperator { + .dispatch(hive_sh4re::ManagerRequest::Ask { question: args.question, options: args.options, multi: args.multi, ttl_seconds: args.ttl_seconds, + to: args.to, }) .await; let s = match resp { Ok(SocketReply::QuestionQueued(id)) => format!( - "question queued (id={id}); operator's answer will arrive as a system \ - `operator_answered` event in your inbox" + "question queued (id={id}); answer will arrive as a system \ + `question_answered` event in your inbox" ), - Ok(SocketReply::Err(m)) => format!("ask_operator failed: {m}"), - Ok(other) => format!("ask_operator unexpected response: {other:?}"), - Err(e) => format!("ask_operator transport error: {e:#}"), + Ok(SocketReply::Err(m)) => format!("ask failed: {m}"), + Ok(other) => format!("ask unexpected response: {other:?}"), + Err(e) => format!("ask transport error: {e:#}"), }; annotate_retries(s, retries) }) .await } + #[tool( + description = "Answer a question that was routed to the manager via a `question_asked` \ + system event in the manager's inbox (i.e. a sub-agent did `ask(to: \"manager\", \ + ...)`). Pass the `id` from the event and your `answer`. The answer surfaces in the \ + asker's inbox as a `question_answered` event." + )] + async fn answer(&self, Parameters(args): Parameters) -> String { + let log = format!("{args:?}"); + let id = args.id; + run_tool_envelope("answer", log, async move { + let (resp, retries) = self + .dispatch(hive_sh4re::ManagerRequest::Answer { + id, + answer: args.answer, + }) + .await; + annotate_retries( + format_ack(resp, "answer", format!("answered question {id}")), + retries, + ) + }) + .await + } + #[tool( description = "Submit a config change for operator approval. Pass the agent name \ (e.g. `alice` or `hm1nd` for the manager's own config) and a commit sha in that \ @@ -744,9 +824,10 @@ impl ManagerServer { relay between them and the operator. Use `send` to talk to agents/operator, `recv` \ to drain your inbox. Privileged: `request_spawn` (new agent, gated on operator \ approval), `kill` (graceful stop), `request_apply_commit` (config change for \ - any agent including yourself), `ask_operator` (block on a human answer via the \ - dashboard). The manager's own config lives at \ - `/agents/hm1nd/config/agent.nix`." + any agent including yourself), `ask` (structured question to the operator or a \ + sub-agent — non-blocking, answer arrives later as a `question_answered` event), \ + `answer` (respond to a `question_asked` event directed at you). The manager's own \ + config lives at `/agents/hm1nd/config/agent.nix`." )] impl ServerHandler for ManagerServer {} @@ -780,7 +861,7 @@ pub enum Flavor { #[must_use] pub fn allowed_mcp_tools(flavor: Flavor) -> Vec { let names: &[&str] = match flavor { - Flavor::Agent => &["send", "recv", "ask_operator", "remind"], + Flavor::Agent => &["send", "recv", "ask", "answer", "remind"], Flavor::Manager => &[ "send", "recv", @@ -790,7 +871,8 @@ pub fn allowed_mcp_tools(flavor: Flavor) -> Vec { "restart", "update", "request_apply_commit", - "ask_operator", + "ask", + "answer", "get_logs", "remind", ], diff --git a/hive-c0re/assets/app.js b/hive-c0re/assets/app.js index 06f5e10..736bdde 100644 --- a/hive-c0re/assets/app.js +++ b/hive-c0re/assets/app.js @@ -37,7 +37,7 @@ // ─── browser notifications ────────────────────────────────────────────── // Fires OS notifications on three operator-bound signals: // - new approval landed in the queue - // - new operator question queued (ask_operator) + // - new operator question queued (ask, target IS NULL) // - broker message sent `to: "operator"` // permission grant is per-browser; a localStorage "muted" toggle lets // the operator silence without revoking. Secure-context only (HTTPS / diff --git a/hive-c0re/src/agent_server.rs b/hive-c0re/src/agent_server.rs index 39c3ed8..e9dfffd 100644 --- a/hive-c0re/src/agent_server.rs +++ b/hive-c0re/src/agent_server.rs @@ -97,34 +97,7 @@ fn recv_timeout(wait_seconds: Option) -> std::time::Duration { async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc) -> AgentResponse { let broker = &coord.broker; match req { - AgentRequest::Send { to, body } => { - if let Err(message) = crate::limits::check_size("send", body) { - return AgentResponse::Err { message }; - } - // Handle broadcast sends (recipient = "*") - if to == "*" { - let errors = coord.broadcast_send(agent, body); - if errors.is_empty() { - AgentResponse::Ok - } else { - AgentResponse::Err { - message: format!("broadcast failed for agents: {}", errors.join(", ")), - } - } - } else { - // Normal unicast send - match broker.send(&Message { - from: agent.to_owned(), - to: to.clone(), - body: body.clone(), - }) { - Ok(()) => AgentResponse::Ok, - Err(e) => AgentResponse::Err { - message: format!("{e:#}"), - }, - } - } - } + AgentRequest::Send { to, body } => handle_send(coord, agent, to, body), AgentRequest::Recv { wait_seconds } => match broker .recv_blocking(agent, recv_timeout(*wait_seconds)) .await @@ -170,12 +143,32 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc) -> message: format!("{e:#}"), }, }, - AgentRequest::AskOperator { + AgentRequest::Ask { question, options, multi, ttl_seconds, - } => handle_ask_operator(coord, agent, question, options, *multi, *ttl_seconds), + to, + } => crate::questions::handle_ask( + coord, + agent, + question, + options, + *multi, + *ttl_seconds, + to.as_deref(), + ) + .map_or_else( + |message| AgentResponse::Err { message }, + |id| AgentResponse::QuestionQueued { id }, + ), + AgentRequest::Answer { id, answer } => crate::questions::handle_answer( + coord, agent, *id, answer, + ) + .map_or_else( + |message| AgentResponse::Err { message }, + |()| AgentResponse::Ok, + ), AgentRequest::Remind { message, timing, @@ -184,36 +177,31 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc) -> } } -fn handle_ask_operator( - coord: &Arc, - agent: &str, - question: &str, - options: &[String], - multi: bool, - ttl_seconds: Option, -) -> AgentResponse { - if let Err(message) = crate::limits::check_size("question", question) { +/// Common Send handler shared between dispatch arms. Applies the +/// 1 KiB body cap, then routes broadcast (`to == "*"`) vs unicast +/// through their respective broker calls. Pulled out of `dispatch` +/// to keep that function under the clippy too-many-lines limit; the +/// behaviour is identical to inlining. +fn handle_send(coord: &Arc, agent: &str, to: &str, body: &str) -> AgentResponse { + if let Err(message) = crate::limits::check_size("send", body) { return AgentResponse::Err { message }; } - let deadline_at = ttl_seconds.and_then(|s| { - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .ok() - .and_then(|d| i64::try_from(d.as_secs()).ok()) - .unwrap_or(0); - i64::try_from(s).ok().map(|s| now + s) - }); - match coord - .questions - .submit(agent, question, options, multi, deadline_at) - { - Ok(id) => { - tracing::info!(%id, %agent, ?deadline_at, "agent question queued"); - if let Some(ttl) = ttl_seconds { - crate::manager_server::spawn_question_watchdog(coord, id, ttl); + if to == "*" { + let errors = coord.broadcast_send(agent, body); + return if errors.is_empty() { + AgentResponse::Ok + } else { + AgentResponse::Err { + message: format!("broadcast failed for agents: {}", errors.join(", ")), } - AgentResponse::QuestionQueued { id } - } + }; + } + match coord.broker.send(&Message { + from: agent.to_owned(), + to: to.to_owned(), + body: body.to_owned(), + }) { + Ok(()) => AgentResponse::Ok, Err(e) => AgentResponse::Err { message: format!("{e:#}"), }, diff --git a/hive-c0re/src/coordinator.rs b/hive-c0re/src/coordinator.rs index 4df990e..0718924 100644 --- a/hive-c0re/src/coordinator.rs +++ b/hive-c0re/src/coordinator.rs @@ -171,7 +171,7 @@ impl Coordinator { let socket_path = Self::socket_path(name); // Hand the full Coordinator to the per-agent socket — it // needs broker + operator_questions to handle the agent-side - // `ask_operator` tool, not just the broker. + // `ask` / `answer` tools, not just the broker. let socket = agent_server::start(name, &socket_path, self.clone())?; self.agents.lock().unwrap().insert(name.to_owned(), socket); Ok(agent_dir) @@ -264,9 +264,9 @@ impl Coordinator { /// Push a `HelperEvent` into an arbitrary agent's inbox. Encoded /// the same way as `notify_manager` (sender = `SYSTEM_SENDER`, - /// body = JSON-encoded event). Used to route `OperatorAnswered` - /// events back to the agent that called `ask_operator`, not just - /// the manager. + /// body = JSON-encoded event). Used to route `QuestionAnswered` + /// events back to the agent that called `ask`, `QuestionAsked` + /// events to the target of a peer question, etc. pub fn notify_agent(&self, agent: &str, event: &hive_sh4re::HelperEvent) { let body = match serde_json::to_string(event) { Ok(s) => s, diff --git a/hive-c0re/src/dashboard.rs b/hive-c0re/src/dashboard.rs index 4e78acd..868d1d3 100644 --- a/hive-c0re/src/dashboard.rs +++ b/hive-c0re/src/dashboard.rs @@ -160,10 +160,12 @@ struct StateSnapshot { /// Last 30 resolved approvals (approved / denied / failed), newest- /// first. Drives the "history" tab on the approvals section. approval_history: Vec, - /// Pending operator questions (currently only from the manager). - /// `ask_operator` returns immediately with the id; on `/answer-question` - /// we mark the row answered and fire `HelperEvent::OperatorAnswered` - /// into the manager's inbox. + /// Pending operator-targeted questions (`target IS NULL`). Any + /// agent can `ask` the operator and `ask` returns immediately with + /// the id; on `/answer-question` we mark the row answered and + /// fire `HelperEvent::QuestionAnswered` back into the asker's + /// inbox. Peer-to-peer questions live in the same table but never + /// surface here (see `OperatorQuestions::pending`). questions: Vec, /// Last 20 answered questions, newest-first. question_history: Vec, @@ -827,15 +829,20 @@ async fn post_answer_question( if answer.is_empty() { return error_response("answer: required"); } - match state.coord.questions.answer(id, answer) { - Ok((question, asker)) => { + match state + .coord + .questions + .answer(id, answer, hive_sh4re::OPERATOR_RECIPIENT) + { + Ok((question, asker, _target)) => { tracing::info!(%id, %asker, "operator answered question"); state.coord.notify_agent( &asker, - &hive_sh4re::HelperEvent::OperatorAnswered { + &hive_sh4re::HelperEvent::QuestionAnswered { id, question, answer: answer.to_owned(), + answerer: hive_sh4re::OPERATOR_RECIPIENT.to_owned(), }, ); Redirect::to("/").into_response() @@ -845,8 +852,8 @@ async fn post_answer_question( } /// Resolve a pending operator question with a sentinel answer when -/// the operator decides not to / can't answer. The manager harness -/// receives an `OperatorAnswered` event with `answer = "[cancelled]"` +/// the operator decides not to / can't answer. The asker harness +/// receives a `QuestionAnswered` event with `answer = "[cancelled]"` /// so it can fall back on whatever default it had. Same code path as /// a real answer — just lets the operator close the loop instead of /// letting the question dangle forever. @@ -855,15 +862,20 @@ async fn post_cancel_question( AxumPath(id): AxumPath, ) -> Response { const SENTINEL: &str = "[cancelled]"; - match state.coord.questions.answer(id, SENTINEL) { - Ok((question, asker)) => { + match state + .coord + .questions + .answer(id, SENTINEL, hive_sh4re::OPERATOR_RECIPIENT) + { + Ok((question, asker, _target)) => { tracing::info!(%id, %asker, "operator cancelled question"); state.coord.notify_agent( &asker, - &hive_sh4re::HelperEvent::OperatorAnswered { + &hive_sh4re::HelperEvent::QuestionAnswered { id, question, answer: SENTINEL.to_owned(), + answerer: hive_sh4re::OPERATOR_RECIPIENT.to_owned(), }, ); Redirect::to("/").into_response() diff --git a/hive-c0re/src/limits.rs b/hive-c0re/src/limits.rs index eaef1e9..43d921c 100644 --- a/hive-c0re/src/limits.rs +++ b/hive-c0re/src/limits.rs @@ -10,10 +10,10 @@ //! about it — oversized reminder bodies get persisted to disk //! transparently and the inbox sees a pointer. -/// Per-message body cap. Applies to `send`, `ask_operator` question -/// text, and the stored inline form of a reminder. 1 KiB is small -/// enough that 100 unread messages don't dominate a wake prompt, -/// large enough for routine cross-agent chatter. +/// Per-message body cap. Applies to `send`, `ask` question text, +/// `answer` body, and the stored inline form of a reminder. 1 KiB +/// is small enough that 100 unread messages don't dominate a wake +/// prompt, large enough for routine cross-agent chatter. pub const MESSAGE_MAX_BYTES: usize = 1024; /// Validate that `body` fits under [`MESSAGE_MAX_BYTES`]. Returns a diff --git a/hive-c0re/src/main.rs b/hive-c0re/src/main.rs index 97c5adf..6c10a6e 100644 --- a/hive-c0re/src/main.rs +++ b/hive-c0re/src/main.rs @@ -23,6 +23,7 @@ mod manager_server; mod meta; mod migrate; mod operator_questions; +mod questions; mod reminder_scheduler; mod server; diff --git a/hive-c0re/src/manager_server.rs b/hive-c0re/src/manager_server.rs index e7b9307..2959c64 100644 --- a/hive-c0re/src/manager_server.rs +++ b/hive-c0re/src/manager_server.rs @@ -244,39 +244,30 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc) -> ManagerResp }, } } - ManagerRequest::AskOperator { + ManagerRequest::Ask { question, options, multi, ttl_seconds, - } => { - if let Err(message) = crate::limits::check_size("question", question) { - return ManagerResponse::Err { message }; - } - tracing::info!(%question, ?options, multi, ?ttl_seconds, "manager: ask_operator"); - let deadline_at = ttl_seconds.and_then(|s| { - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .ok() - .and_then(|d| i64::try_from(d.as_secs()).ok()) - .unwrap_or(0); - i64::try_from(s).ok().map(|s| now + s) - }); - match coord - .questions - .submit(MANAGER_AGENT, question, options, *multi, deadline_at) - { - Ok(id) => { - tracing::info!(%id, ?deadline_at, "operator question queued"); - if let Some(ttl) = *ttl_seconds { - spawn_question_watchdog(coord, id, ttl); - } - ManagerResponse::QuestionQueued { id } - } - Err(e) => ManagerResponse::Err { - message: format!("{e:#}"), - }, - } + to, + } => crate::questions::handle_ask( + coord, + MANAGER_AGENT, + question, + options, + *multi, + *ttl_seconds, + to.as_deref(), + ) + .map_or_else( + |message| ManagerResponse::Err { message }, + |id| ManagerResponse::QuestionQueued { id }, + ), + ManagerRequest::Answer { id, answer } => { + crate::questions::handle_answer(coord, MANAGER_AGENT, *id, answer).map_or_else( + |message| ManagerResponse::Err { message }, + |()| ManagerResponse::Ok, + ) } ManagerRequest::GetLogs { agent, lines } => { let n = lines.unwrap_or(50); @@ -402,28 +393,41 @@ async fn submit_apply_commit( Ok((id, sha)) } -/// On `AskOperator { ttl_seconds: Some(n) }`, sleep n seconds and then -/// try to resolve the question with `[expired]`. If the operator (or -/// any other path) already answered it, `answer()` returns Err and -/// we no-op silently. Otherwise fire the usual `OperatorAnswered` -/// helper event so the manager sees a terminal state. +/// On `Ask { ttl_seconds: Some(n) }`, sleep n seconds and then try to +/// resolve the question with `[expired]`. If the operator (or any +/// other path) already answered it, `answer()` returns Err and we +/// no-op silently. Otherwise fire a `QuestionAnswered` helper event +/// with `answerer = "ttl-watchdog"` so the asker can distinguish a +/// real answer from a deadline trip without parsing the answer text. const TTL_SENTINEL: &str = "[expired]"; +/// Synthetic `answerer` label used when the ttl watchdog resolves a +/// question instead of a real human / agent. Lives in a distinct +/// namespace from agent names + the operator so the asker can pattern +/// match `event.answerer == "ttl-watchdog"`. +const TTL_ANSWERER: &str = "ttl-watchdog"; pub fn spawn_question_watchdog(coord: &Arc, id: i64, ttl_secs: u64) { let coord = coord.clone(); tokio::spawn(async move { tokio::time::sleep(std::time::Duration::from_secs(ttl_secs)).await; - // `answer` returns Err if already resolved — that's the - // normal path when the operator responded before the ttl - // fired, so no-op silently. - if let Ok((question, asker)) = coord.questions.answer(id, TTL_SENTINEL) { - tracing::info!(%id, %asker, "operator question expired (ttl)"); + // Watchdog has its own answerer label so the authorisation + // check in `answer()` permits it for any target. We bypass + // the public `answer()` path by calling it with the operator + // identity, since the operator is always permitted; the + // event we fire carries the real watchdog label for observers. + if let Ok((question, asker, _target)) = + coord + .questions + .answer(id, TTL_SENTINEL, hive_sh4re::OPERATOR_RECIPIENT) + { + tracing::info!(%id, %asker, "question expired (ttl)"); coord.notify_agent( &asker, - &hive_sh4re::HelperEvent::OperatorAnswered { + &hive_sh4re::HelperEvent::QuestionAnswered { id, question, answer: TTL_SENTINEL.to_owned(), + answerer: TTL_ANSWERER.to_owned(), }, ); } diff --git a/hive-c0re/src/operator_questions.rs b/hive-c0re/src/operator_questions.rs index 3416213..6c24dc3 100644 --- a/hive-c0re/src/operator_questions.rs +++ b/hive-c0re/src/operator_questions.rs @@ -1,7 +1,13 @@ -//! Operator question queue. Manager submits via `AskOperator`; the -//! operator answers via the dashboard. The manager-socket handler long-polls -//! the store until the answer lands, so claude's `ask_operator` tool call -//! returns the answer directly as its result. +//! Question queue. Agents submit via `Ask`; the answer comes from +//! either the operator (via the dashboard, for `target IS NULL`) or +//! a peer agent (via `Answer`, for agent-to-agent questions). +//! +//! Despite the file name (kept for git history sanity), this table +//! now stores *all* asynchronous questions in the hive — both the +//! operator-targeted ones and the peer-to-peer ones. `target IS +//! NULL` is the operator path (back-compat with rows written before +//! the column existed); `target = ''` is the +//! agent-to-agent path. use std::path::Path; use std::sync::Mutex; @@ -38,6 +44,15 @@ fn ensure_columns(conn: &Connection) -> Result<()> { "deadline_at", "ALTER TABLE operator_questions ADD COLUMN deadline_at INTEGER;", ), + // `target` = recipient of the question. NULL = operator + // (back-compat default for rows written before agent-to-agent + // questions existed); a non-null agent name = peer-to-peer + // question. Dashboard's `pending()` filters on `target IS NULL` + // so peer questions never leak into the operator's queue. + ( + "target", + "ALTER TABLE operator_questions ADD COLUMN target TEXT;", + ), ] { let has: bool = conn .prepare(&format!( @@ -67,6 +82,12 @@ pub struct OpQuestion { pub deadline_at: Option, pub answered_at: Option, pub answer: Option, + /// Recipient of the question. `None` = the operator (dashboard + /// path); `Some()` = a peer agent asked via + /// `Ask { to: Some(), ... }`. Agent-to-agent questions + /// never appear in `pending()` so the operator's queue stays clean. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub target: Option, } pub struct OperatorQuestions { @@ -97,57 +118,89 @@ impl OperatorQuestions { options: &[String], multi: bool, deadline_at: Option, + target: Option<&str>, ) -> Result { let conn = self.conn.lock().unwrap(); let options_json = serde_json::to_string(options).unwrap_or_else(|_| "[]".into()); conn.execute( "INSERT INTO operator_questions - (asker, question, options_json, multi, deadline_at, asked_at) - VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + (asker, question, options_json, multi, deadline_at, target, asked_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", params![ asker, question, options_json, i64::from(multi), deadline_at, + target, now_unix(), ], )?; Ok(conn.last_insert_rowid()) } - /// Mark the question answered. Returns the original question text so the - /// Mark a pending question answered. Returns `(question, asker)` - /// so the caller can both echo the question back in a helper - /// event AND route that event to whichever agent originally - /// asked it. - pub fn answer(&self, id: i64, answer: &str) -> Result<(String, String)> { + /// Mark a pending question answered. `answerer` is who's actually + /// answering: `"operator"` for the dashboard path, or an agent's + /// own name when responding via `Answer`. Authorisation: + /// + /// - Operator-targeted questions (`target IS NULL`) can only be + /// answered by `"operator"`. (Agents must not be able to spoof + /// answers to operator questions — the dashboard is the + /// privileged path.) + /// - Agent-targeted questions can only be answered by the + /// declared target agent, OR by `"operator"` (operator override + /// for stuck threads — useful when an agent is offline/down + /// and someone has to close the loop). + /// + /// Returns `(question, asker, target)` so the caller can fire the + /// `QuestionAnswered` event with the right answerer label and route + /// it back to the original asker. + pub fn answer( + &self, + id: i64, + answer: &str, + answerer: &str, + ) -> Result<(String, String, Option)> { let conn = self.conn.lock().unwrap(); - let row: Option<(String, String, Option)> = conn + let row: Option<(String, String, Option, Option)> = conn .query_row( - "SELECT question, asker, answered_at FROM operator_questions WHERE id = ?1", + "SELECT question, asker, target, answered_at FROM operator_questions WHERE id = ?1", params![id], - |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)), + |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)), ) .optional()?; - let Some((question, asker, answered_at)) = row else { + let Some((question, asker, target, answered_at)) = row else { bail!("question {id} not found"); }; if answered_at.is_some() { bail!("question {id} already answered"); } + // Authorisation check: must match the target, or be the operator + // (operator-targeted questions are operator-only; the operator + // can additionally override agent-to-agent questions to close + // stuck threads). + let authorised = match target.as_deref() { + None => answerer == hive_sh4re::OPERATOR_RECIPIENT, + Some(t) => answerer == t || answerer == hive_sh4re::OPERATOR_RECIPIENT, + }; + if !authorised { + bail!( + "question {id} not addressed to '{answerer}' (target = {:?})", + target.as_deref().unwrap_or(hive_sh4re::OPERATOR_RECIPIENT) + ); + } conn.execute( "UPDATE operator_questions SET answer = ?1, answered_at = ?2 WHERE id = ?3", params![answer, now_unix(), id], )?; - Ok((question, asker)) + Ok((question, asker, target)) } #[allow(dead_code)] pub fn get(&self, id: i64) -> Result> { let conn = self.conn.lock().unwrap(); conn.query_row( - "SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer, deadline_at + "SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer, deadline_at, target FROM operator_questions WHERE id = ?1", params![id], row_to_question, @@ -156,12 +209,15 @@ impl OperatorQuestions { .map_err(Into::into) } + /// Pending operator-targeted questions only (`target IS NULL`). + /// Drives the dashboard's pending-question pane — agent-to-agent + /// questions never appear here so the operator's queue stays clean. pub fn pending(&self) -> Result> { let conn = self.conn.lock().unwrap(); let mut stmt = conn.prepare( - "SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer, deadline_at + "SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer, deadline_at, target FROM operator_questions - WHERE answered_at IS NULL + WHERE answered_at IS NULL AND target IS NULL ORDER BY id ASC", )?; let rows = stmt.query_map([], row_to_question)?; @@ -169,13 +225,15 @@ impl OperatorQuestions { .map_err(Into::into) } - /// Last `limit` answered questions, newest-first. + /// Last `limit` answered operator-targeted questions, newest-first. + /// Same `target IS NULL` filter as `pending()` so the dashboard's + /// history view only shows operator-relevant rows. pub fn recent_answered(&self, limit: u64) -> Result> { let conn = self.conn.lock().unwrap(); let mut stmt = conn.prepare( - "SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer, deadline_at + "SELECT id, asker, question, options_json, multi, asked_at, answered_at, answer, deadline_at, target FROM operator_questions - WHERE answered_at IS NOT NULL + WHERE answered_at IS NOT NULL AND target IS NULL ORDER BY answered_at DESC LIMIT ?1", )?; @@ -199,6 +257,7 @@ fn row_to_question(row: &rusqlite::Row<'_>) -> rusqlite::Result { answered_at: row.get(6)?, answer: row.get(7)?, deadline_at: row.get(8)?, + target: row.get(9)?, }) } diff --git a/hive-c0re/src/questions.rs b/hive-c0re/src/questions.rs new file mode 100644 index 0000000..d522eff --- /dev/null +++ b/hive-c0re/src/questions.rs @@ -0,0 +1,128 @@ +//! Shared dispatch helpers for the `Ask` / `Answer` flow. Both the +//! agent socket and the manager socket call into here so the routing +//! semantics — recipient = operator vs. peer agent, answerer +//! authorisation, asker-notification — only live in one place. +//! +//! Routing rules at a glance: +//! +//! - `Ask { to: None | Some("operator") }` → stored with `target = NULL`; +//! the dashboard's `pending()` query surfaces it; operator answers +//! via the dashboard. +//! - `Ask { to: Some() }` → stored with `target = `; +//! a `HelperEvent::QuestionAsked` is pushed into ``'s +//! inbox so they can `Answer { id, answer }` on their own socket. +//! - `Answer { id, answer }` → permission-checked in +//! `OperatorQuestions::answer` (only the target agent or the +//! operator can answer; both paths fire the same +//! `QuestionAnswered` event to the asker). + +use std::sync::Arc; + +use crate::coordinator::Coordinator; +use crate::limits; +use crate::manager_server::spawn_question_watchdog; + +/// Cap on how long an asker can demand an answer before the watchdog +/// auto-resolves with `[expired]`. Six hours mirrors typical agent +/// session lifetimes — beyond that an unanswered question is +/// effectively a dead thread and should be re-asked, not blocked on. +const MAX_TTL_SECONDS: u64 = 6 * 60 * 60; + +/// Handle either surface's `Ask` request. Returns the queued +/// question id on success or a caller-ready error string. Caller is +/// responsible for wrapping in the matching `*Response::Err` / +/// `QuestionQueued` variant. +pub fn handle_ask( + coord: &Arc, + asker: &str, + question: &str, + options: &[String], + multi: bool, + ttl_seconds: Option, + to: Option<&str>, +) -> Result { + limits::check_size("question", question)?; + // Normalise `Some("operator")` → None so the storage layer + // only has to think about NULL vs. non-NULL targets, not + // "is this string the operator?". + let target = match to { + None => None, + Some(t) if t == hive_sh4re::OPERATOR_RECIPIENT => None, + Some("") => { + return Err("ask: `to` cannot be empty (omit it for the operator path)".to_owned()); + } + Some(t) if t == asker => { + return Err("ask: cannot ask yourself a question (would loop forever)".to_owned()); + } + Some(t) => Some(t), + }; + let ttl = ttl_seconds.map(|s| s.min(MAX_TTL_SECONDS)); + let deadline_at = ttl.and_then(|s| { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .ok() + .and_then(|d| i64::try_from(d.as_secs()).ok()) + .unwrap_or(0); + i64::try_from(s).ok().map(|s| now + s) + }); + let id = coord + .questions + .submit(asker, question, options, multi, deadline_at, target) + .map_err(|e| format!("{e:#}"))?; + tracing::info!(%id, %asker, ?target, ?deadline_at, "question queued"); + // Agent-targeted questions need to wake the recipient — drop a + // QuestionAsked event into their inbox so the answerer doesn't + // have to poll. Operator-targeted questions show up on the + // dashboard's pending pane via `pending()` instead. + if let Some(target_agent) = target { + coord.notify_agent( + target_agent, + &hive_sh4re::HelperEvent::QuestionAsked { + id, + asker: asker.to_owned(), + question: question.to_owned(), + options: options.to_vec(), + multi, + }, + ); + } + if let Some(t) = ttl { + spawn_question_watchdog(coord, id, t); + } + Ok(id) +} + +/// Handle either surface's `Answer` request. Returns `Ok(())` on +/// success or a caller-ready error string. Authorisation lives in +/// `OperatorQuestions::answer` — we only have to wire the result +/// back to the asker as a `QuestionAnswered` event. +pub fn handle_answer( + coord: &Arc, + answerer: &str, + id: i64, + answer: &str, +) -> Result<(), String> { + limits::check_size("answer", answer)?; + let (question, asker, _target) = coord + .questions + .answer(id, answer, answerer) + .map_err(|e| format!("{e:#}"))?; + tracing::info!(%id, %answerer, %asker, "question answered"); + coord.notify_agent( + &asker, + &hive_sh4re::HelperEvent::QuestionAnswered { + id, + question, + answer: answer.to_owned(), + answerer: answerer.to_owned(), + }, + ); + Ok(()) +} + +// Real coverage needs a `Coordinator` fixture (broker + sqlite + +// in-memory questions). Skipped for now — the normalisation branches +// in `handle_ask` are short enough to read line-by-line; once we add +// a coord test harness, drop integration tests here for: self-target +// rejection, operator-string passthrough, agent-to-agent QuestionAsked +// emission, and `Answer` authorisation. diff --git a/hive-sh4re/src/lib.rs b/hive-sh4re/src/lib.rs index 6de7228..f76d73a 100644 --- a/hive-sh4re/src/lib.rs +++ b/hive-sh4re/src/lib.rs @@ -221,11 +221,17 @@ pub enum AgentRequest { /// Non-mutating — pulls from the broker without delivering. The /// per-agent web UI uses this to render its own inbox section. Recent { limit: u64 }, - /// Surface a question to the operator on the dashboard. Same - /// shape as `ManagerRequest::AskOperator` — any agent can ask; - /// the answer routes back to the asker's inbox as a - /// `HelperEvent::OperatorAnswered`. - AskOperator { + /// Surface a question to either the operator or another agent. + /// `to = None` (or `Some("operator")`) routes the question to the + /// dashboard's operator-question queue (legacy `AskOperator` + /// behaviour). `to = Some()` routes it to that agent's + /// inbox as a `HelperEvent::QuestionAsked` so the recipient can + /// answer back via `AgentRequest::Answer` (or + /// `ManagerRequest::Answer`); the answer threads back to the asker + /// as a `HelperEvent::QuestionAnswered` event. Either way the + /// response shape is `QuestionQueued { id }` — the asker uses the + /// id to correlate the asynchronous answer event. + Ask { question: String, #[serde(default)] options: Vec, @@ -233,7 +239,18 @@ pub enum AgentRequest { multi: bool, #[serde(default)] ttl_seconds: Option, + /// Recipient of the question. `None` or `Some("operator")` = + /// the human operator (dashboard); `Some()` = a + /// peer agent (their inbox). + #[serde(default)] + to: Option, }, + /// Answer a question previously routed to this agent via + /// `HelperEvent::QuestionAsked`. The caller is implicitly the + /// answerer; only the question's `target` agent (or the operator, + /// via the dashboard) is authorised. Wires through to + /// `HelperEvent::QuestionAnswered` in the asker's inbox. + Answer { id: i64, answer: String }, /// Schedule a reminder message to be delivered to this agent at a /// future time. The reminder lands in the agent's inbox as an auto-sent /// message from `"reminder"`. Use for agent follow-ups (e.g. check task @@ -264,8 +281,8 @@ pub enum AgentResponse { Status { unread: u64 }, /// `Recent` result: newest-first inbox rows. Recent { rows: Vec }, - /// `AskOperator` result: the queued question id. The answer lands - /// later as `HelperEvent::OperatorAnswered` in this agent's inbox. + /// `Ask` result: the queued question id. The answer lands later + /// as `HelperEvent::QuestionAnswered` in this agent's inbox. QuestionQueued { id: i64 }, } @@ -375,14 +392,32 @@ pub enum HelperEvent { #[serde(default, skip_serializing_if = "Option::is_none")] note: Option, }, - /// The operator answered a question that was queued via - /// `AskOperator`. `id` matches the `QuestionQueued.id` returned to the - /// asker; `question` echoes the original prompt so the manager can - /// stitch the answer back to context across compactions. - OperatorAnswered { + /// A question queued via `Ask` was answered (by the operator via + /// the dashboard, or by another agent via `Answer`). `id` matches + /// the `QuestionQueued.id` returned to the asker; `question` + /// echoes the original prompt so the asker can stitch the answer + /// back to context across compactions; `answerer` is who answered + /// (`"operator"` or a peer agent name). + QuestionAnswered { id: i64, question: String, answer: String, + answerer: String, + }, + /// A peer (or the manager) asked this agent a question via + /// `Ask { to: Some(), ... }`. The recipient should + /// answer via `Answer { id, answer }` on their socket; the answer + /// will route back to the asker as a `QuestionAnswered` event. + /// `options` + `multi` mirror the original `Ask` args so the + /// answerer knows what shape of reply is expected. + QuestionAsked { + id: i64, + asker: String, + question: String, + #[serde(default)] + options: Vec, + #[serde(default)] + multi: bool, }, } @@ -452,9 +487,10 @@ pub enum ManagerRequest { #[serde(default, skip_serializing_if = "Option::is_none")] description: Option, }, - /// Ask the operator a question. Returns immediately with the queued - /// question id; the operator's answer arrives later as a - /// `HelperEvent::OperatorAnswered` in the manager inbox. + /// Surface a question to either the operator or another agent. + /// Mirrors `AgentRequest::Ask` exactly — see that doc for the + /// routing semantics (operator = dashboard queue; agent = the + /// peer's inbox via `HelperEvent::QuestionAsked`). /// /// - `options` is advisory: empty = free-text only; non-empty = the /// dashboard renders the choices alongside a free-text fallback @@ -464,9 +500,11 @@ pub enum ManagerRequest { /// selections joined by ", ". /// - `ttl_seconds`: optional auto-cancel after that many seconds. On /// expiry the question is resolved with answer `[expired]` and the - /// manager gets the usual `OperatorAnswered` event. None = wait - /// forever for an operator answer (or manual cancel). - AskOperator { + /// asker gets the usual `QuestionAnswered` event. None = wait + /// forever for an answer (or manual cancel). + /// - `to`: recipient (None / `Some("operator")` = operator; + /// `Some()` = peer agent). + Ask { question: String, #[serde(default)] options: Vec, @@ -474,7 +512,13 @@ pub enum ManagerRequest { multi: bool, #[serde(default)] ttl_seconds: Option, + #[serde(default)] + to: Option, }, + /// Answer a question previously routed to the manager via + /// `HelperEvent::QuestionAsked` (i.e. an agent asked the manager + /// for input). Mirror of `AgentRequest::Answer`. + Answer { id: i64, answer: String }, /// Fetch recent journal lines for a sub-agent container. hive-c0re /// runs `journalctl -M -n --no-pager` and returns /// the output as a string. Useful for diagnosing MCP registration @@ -514,9 +558,10 @@ pub enum ManagerResponse { Status { unread: u64, }, - /// Result of `AskOperator`: the queued question id. The actual answer - /// arrives later as a `HelperEvent::OperatorAnswered` in the manager - /// inbox, so this returns immediately rather than blocking the turn. + /// Result of `Ask`: the queued question id. The actual answer + /// arrives later as a `HelperEvent::QuestionAnswered` in the + /// asker's inbox, so this returns immediately rather than blocking + /// the turn. QuestionQueued { id: i64, },