diff --git a/hive-ag3nt/prompts/agent.md b/hive-ag3nt/prompts/agent.md index 724d673..66b0243 100644 --- a/hive-ag3nt/prompts/agent.md +++ b/hive-ag3nt/prompts/agent.md @@ -4,11 +4,10 @@ Tools (hyperhive surface): - `mcp__hyperhive__recv(wait_seconds?)` — drain one more message from your inbox (returns `(empty)` if nothing pending after the wait). Without `wait_seconds` it long-polls 30s. To **wait** for work when you have nothing else useful to do this turn, call with a long wait (e.g. `wait_seconds: 180`, the max) — you'll be woken instantly when a message arrives, otherwise return after the timeout. That is strictly better than calling `recv` repeatedly with short waits: lower latency on new work, fewer turns, no busy-loop. Never use a fixed `sleep` shell command for the same purpose. - `mcp__hyperhive__send(to, body)` — message a peer (by their name) or the operator (recipient `operator`, surfaces in the dashboard). +- `mcp__hyperhive__ask_operator(question, options?, multi?, ttl_seconds?)` — surface a question to the human operator on the dashboard. Returns immediately with a question id — do NOT wait inline. When the operator answers, a system message with event `operator_answered { id, question, answer }` lands in your inbox; handle it on a future turn. Use this for clarifications, permission for risky actions, or choice between options. `options` is advisory: a short fixed-choice list when applicable, otherwise leave empty for free text. `multi: true` lets the operator pick multiple (checkboxes), answer comes back comma-joined. `ttl_seconds` auto-cancels with answer `[expired]` when the decision becomes moot. Need new packages, env vars, or other NixOS config for yourself? You can't edit your own config directly — message the manager (recipient `manager`) describing what you need + why. The manager evaluates the request (it doesn't rubber-stamp), edits `/agents/{label}/config/agent.nix` on your behalf, commits, and submits an approval that the operator can accept on the dashboard; on approve hive-c0re rebuilds your container with the new config. -Need to ask the human operator a question (clarification, permission, choice)? You don't have direct operator access — ask the manager to surface the question on your behalf ("please ask the operator: …"). The manager has a channel for this. - Durable knowledge: write to `/state/notes.md` (free-form) or any other path under `/state/`. That directory is bind-mounted from the host and persists across container destroy/recreate — claude's `--continue` session only carries short-term context, but `/state/` is forever. Read it back at the start of relevant turns to remember things across resets. Keep messages short — a few sentences each. For anything big (file listings, long diffs, transcripts, analysis): write the payload to `/state/` and `send` a short pointer ("dropped the cluster audit in /state/cluster-audit-2026-05.md, headline: 3 nodes over 80% mem"). The manager + operator can read your `/state/` from the host as `/agents/{label}/state/`. Sub-agent peers can't read each other's `/state/` directly — go through the manager if a payload needs to reach another sub-agent. diff --git a/hive-ag3nt/src/bin/hive-ag3nt.rs b/hive-ag3nt/src/bin/hive-ag3nt.rs index ee77d29..3a1b201 100644 --- a/hive-ag3nt/src/bin/hive-ag3nt.rs +++ b/hive-ag3nt/src/bin/hive-ag3nt.rs @@ -125,7 +125,10 @@ async fn serve( bus.set_state(TurnState::Idle); } Ok(AgentResponse::Empty) => {} - Ok(AgentResponse::Ok | AgentResponse::Status { .. } | AgentResponse::Recent { .. }) => { + Ok(AgentResponse::Ok + | AgentResponse::Status { .. } + | AgentResponse::Recent { .. } + | AgentResponse::QuestionQueued { .. }) => { tracing::warn!("recv produced unexpected response kind"); } Ok(AgentResponse::Err { message }) => { diff --git a/hive-ag3nt/src/mcp.rs b/hive-ag3nt/src/mcp.rs index dcfe56e..ab0e749 100644 --- a/hive-ag3nt/src/mcp.rs +++ b/hive-ag3nt/src/mcp.rs @@ -50,6 +50,7 @@ impl From for SocketReply { hive_sh4re::AgentResponse::Empty => Self::Empty, hive_sh4re::AgentResponse::Status { unread } => Self::Status(unread), hive_sh4re::AgentResponse::Recent { rows } => Self::Recent(rows), + hive_sh4re::AgentResponse::QuestionQueued { id } => Self::QuestionQueued(id), } } } @@ -163,6 +164,45 @@ impl AgentServer { .await } + #[tool( + description = "Surface a question to the operator on the dashboard. Returns immediately \ + with a question id — do NOT wait inline. When the operator answers, a system message \ + with event `operator_answered { id, question, answer }` lands in your inbox; handle it \ + on a future turn. Use this when a decision needs human signal (ambiguous scope, \ + permission to do something risky, choosing between options). `options` is advisory: \ + pass a short fixed-choice list when applicable, otherwise leave empty for free text. \ + Set `multi: true` to let the operator pick multiple options (checkboxes); the answer \ + comes back as a comma-separated string. Set `ttl_seconds` to auto-cancel a \ + no-longer-relevant question — on expiry the answer is `[expired]` and the same \ + `operator_answered` event fires." + )] + async fn ask_operator(&self, Parameters(args): Parameters) -> String { + let log = format!("{args:?}"); + run_tool_envelope("ask_operator", log, async move { + let resp = client::request::<_, hive_sh4re::AgentResponse>( + &self.socket, + &hive_sh4re::AgentRequest::AskOperator { + question: args.question, + options: args.options, + multi: args.multi, + ttl_seconds: args.ttl_seconds, + }, + ) + .await + .map(SocketReply::from); + match resp { + Ok(SocketReply::QuestionQueued(id)) => format!( + "question queued (id={id}); operator's answer will arrive as a system \ + `operator_answered` event in your inbox" + ), + Ok(SocketReply::Err(m)) => format!("ask_operator failed: {m}"), + Ok(other) => format!("ask_operator unexpected response: {other:?}"), + Err(e) => format!("ask_operator transport error: {e:#}"), + } + }) + .await + } + #[tool( description = "Pop one message from this agent's inbox. Returns the sender and body, \ or an empty marker if nothing is waiting. Optional `wait_seconds` long-polls \ @@ -527,7 +567,7 @@ pub enum Flavor { #[must_use] pub fn allowed_mcp_tools(flavor: Flavor) -> Vec { let names: &[&str] = match flavor { - Flavor::Agent => &["send", "recv"], + Flavor::Agent => &["send", "recv", "ask_operator"], Flavor::Manager => &[ "send", "recv", diff --git a/hive-c0re/src/agent_server.rs b/hive-c0re/src/agent_server.rs index c47e5b4..ed5b7fb 100644 --- a/hive-c0re/src/agent_server.rs +++ b/hive-c0re/src/agent_server.rs @@ -11,14 +11,18 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{UnixListener, UnixStream}; use tokio::task::JoinHandle; -use crate::broker::Broker; +use crate::coordinator::Coordinator; pub struct AgentSocket { pub path: PathBuf, pub handle: JoinHandle<()>, } -pub fn start(agent: &str, socket_path: &Path, broker: Arc) -> Result { +pub fn start( + agent: &str, + socket_path: &Path, + coord: Arc, +) -> Result { let agent = agent.to_owned(); if let Some(parent) = socket_path.parent() { std::fs::create_dir_all(parent) @@ -37,9 +41,9 @@ pub fn start(agent: &str, socket_path: &Path, broker: Arc) -> Result { let agent = agent.clone(); - let broker = broker.clone(); + let coord = coord.clone(); tokio::spawn(async move { - if let Err(e) = serve(stream, agent, broker).await { + if let Err(e) = serve(stream, agent, coord).await { tracing::warn!(error = ?e, "agent connection failed"); } }); @@ -54,7 +58,7 @@ pub fn start(agent: &str, socket_path: &Path, broker: Arc) -> Result) -> Result<()> { +async fn serve(stream: UnixStream, agent: String, coord: Arc) -> Result<()> { let (read, mut write) = stream.into_split(); let mut reader = BufReader::new(read); let mut line = String::new(); @@ -65,7 +69,7 @@ async fn serve(stream: UnixStream, agent: String, broker: Arc) -> Result return Ok(()); } let resp = match serde_json::from_str::(line.trim()) { - Ok(req) => dispatch(&req, &agent, &broker).await, + Ok(req) => dispatch(&req, &agent, &coord).await, Err(e) => AgentResponse::Err { message: format!("parse error: {e}"), }, @@ -93,7 +97,8 @@ fn recv_timeout(wait_seconds: Option) -> std::time::Duration { } } -async fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResponse { +async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc) -> AgentResponse { + let broker = &coord.broker; match req { AgentRequest::Send { to, body } => { match broker.send(&Message { @@ -142,5 +147,35 @@ async fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResp message: format!("{e:#}"), }, }, + AgentRequest::AskOperator { + question, + options, + multi, + ttl_seconds, + } => { + let deadline_at = ttl_seconds.and_then(|s| { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .ok() + .and_then(|d| i64::try_from(d.as_secs()).ok()) + .unwrap_or(0); + i64::try_from(s).ok().map(|s| now + s) + }); + match coord + .questions + .submit(agent, question, options, *multi, deadline_at) + { + Ok(id) => { + tracing::info!(%id, %agent, ?deadline_at, "agent question queued"); + if let Some(ttl) = *ttl_seconds { + crate::manager_server::spawn_question_watchdog(coord, id, ttl); + } + AgentResponse::QuestionQueued { id } + } + Err(e) => AgentResponse::Err { + message: format!("{e:#}"), + }, + } + } } } diff --git a/hive-c0re/src/coordinator.rs b/hive-c0re/src/coordinator.rs index 93ef7f6..96b2d75 100644 --- a/hive-c0re/src/coordinator.rs +++ b/hive-c0re/src/coordinator.rs @@ -82,7 +82,7 @@ impl Coordinator { }) } - pub fn register_agent(&self, name: &str) -> Result { + pub fn register_agent(self: &Arc, name: &str) -> Result { // Idempotent: drop any existing listener so re-registration (e.g. on rebuild, // or after a hive-c0re restart cleared /run/hyperhive) gets a fresh socket. self.unregister_agent(name); @@ -90,7 +90,10 @@ impl Coordinator { std::fs::create_dir_all(&agent_dir) .with_context(|| format!("create agent dir {}", agent_dir.display()))?; let socket_path = Self::socket_path(name); - let socket = agent_server::start(name, &socket_path, self.broker.clone())?; + // Hand the full Coordinator to the per-agent socket — it + // needs broker + operator_questions to handle the agent-side + // `ask_operator` tool, not just the broker. + let socket = agent_server::start(name, &socket_path, self.clone())?; self.agents.lock().unwrap().insert(name.to_owned(), socket); Ok(agent_dir) } @@ -148,6 +151,15 @@ impl Coordinator { /// recognises the sender and parses the body. Best-effort: a serde or /// broker error is logged but does not propagate. pub fn notify_manager(&self, event: &hive_sh4re::HelperEvent) { + self.notify_agent(hive_sh4re::MANAGER_AGENT, event); + } + + /// Push a `HelperEvent` into an arbitrary agent's inbox. Encoded + /// the same way as `notify_manager` (sender = `SYSTEM_SENDER`, + /// body = JSON-encoded event). Used to route `OperatorAnswered` + /// events back to the agent that called `ask_operator`, not just + /// the manager. + pub fn notify_agent(&self, agent: &str, event: &hive_sh4re::HelperEvent) { let body = match serde_json::to_string(event) { Ok(s) => s, Err(e) => { @@ -157,10 +169,10 @@ impl Coordinator { }; if let Err(e) = self.broker.send(&hive_sh4re::Message { from: hive_sh4re::SYSTEM_SENDER.to_owned(), - to: hive_sh4re::MANAGER_AGENT.to_owned(), + to: agent.to_owned(), body, }) { - tracing::warn!(error = ?e, "failed to push helper event to manager"); + tracing::warn!(error = ?e, target = %agent, "failed to push helper event"); } } @@ -185,7 +197,7 @@ impl Coordinator { /// the dir. For sub-agents this is `register_agent` (creates a fresh /// listener bound to `socket_path(name)`). Source directory of the /// `/run/hive/mcp.sock` bind that ends up in `set_nspawn_flags`. - pub fn ensure_runtime(&self, name: &str) -> Result { + pub fn ensure_runtime(self: &Arc, name: &str) -> Result { if name == crate::lifecycle::MANAGER_NAME { let dir = Self::manager_dir(); std::fs::create_dir_all(&dir) diff --git a/hive-c0re/src/dashboard.rs b/hive-c0re/src/dashboard.rs index 210b976..0027465 100644 --- a/hive-c0re/src/dashboard.rs +++ b/hive-c0re/src/dashboard.rs @@ -528,15 +528,16 @@ async fn post_answer_question( return error_response("answer: required"); } match state.coord.questions.answer(id, answer) { - Ok(question) => { - tracing::info!(%id, "operator answered question"); - state - .coord - .notify_manager(&hive_sh4re::HelperEvent::OperatorAnswered { + Ok((question, asker)) => { + tracing::info!(%id, %asker, "operator answered question"); + state.coord.notify_agent( + &asker, + &hive_sh4re::HelperEvent::OperatorAnswered { id, question, answer: answer.to_owned(), - }); + }, + ); Redirect::to("/").into_response() } Err(e) => error_response(&format!("answer {id} failed: {e:#}")), @@ -555,15 +556,16 @@ async fn post_cancel_question( ) -> Response { const SENTINEL: &str = "[cancelled]"; match state.coord.questions.answer(id, SENTINEL) { - Ok(question) => { - tracing::info!(%id, "operator cancelled question"); - state - .coord - .notify_manager(&hive_sh4re::HelperEvent::OperatorAnswered { + Ok((question, asker)) => { + tracing::info!(%id, %asker, "operator cancelled question"); + state.coord.notify_agent( + &asker, + &hive_sh4re::HelperEvent::OperatorAnswered { id, question, answer: SENTINEL.to_owned(), - }); + }, + ); Redirect::to("/").into_response() } Err(e) => error_response(&format!("cancel-question {id} failed: {e:#}")), diff --git a/hive-c0re/src/manager_server.rs b/hive-c0re/src/manager_server.rs index 2a47d34..6913d09 100644 --- a/hive-c0re/src/manager_server.rs +++ b/hive-c0re/src/manager_server.rs @@ -388,20 +388,23 @@ async fn proposal_modifies( /// helper event so the manager sees a terminal state. const TTL_SENTINEL: &str = "[expired]"; -fn spawn_question_watchdog(coord: &Arc, id: i64, ttl_secs: u64) { +pub fn spawn_question_watchdog(coord: &Arc, id: i64, ttl_secs: u64) { let coord = coord.clone(); tokio::spawn(async move { tokio::time::sleep(std::time::Duration::from_secs(ttl_secs)).await; // `answer` returns Err if already resolved — that's the // normal path when the operator responded before the ttl // fired, so no-op silently. - if let Ok(question) = coord.questions.answer(id, TTL_SENTINEL) { - tracing::info!(%id, "operator question expired (ttl)"); - coord.notify_manager(&hive_sh4re::HelperEvent::OperatorAnswered { - id, - question, - answer: TTL_SENTINEL.to_owned(), - }); + if let Ok((question, asker)) = coord.questions.answer(id, TTL_SENTINEL) { + tracing::info!(%id, %asker, "operator question expired (ttl)"); + coord.notify_agent( + &asker, + &hive_sh4re::HelperEvent::OperatorAnswered { + id, + question, + answer: TTL_SENTINEL.to_owned(), + }, + ); } }); } diff --git a/hive-c0re/src/operator_questions.rs b/hive-c0re/src/operator_questions.rs index 4ca652e..c04c3bd 100644 --- a/hive-c0re/src/operator_questions.rs +++ b/hive-c0re/src/operator_questions.rs @@ -117,17 +117,20 @@ impl OperatorQuestions { } /// Mark the question answered. Returns the original question text so the - /// caller can include it in any helper event it fires off. - pub fn answer(&self, id: i64, answer: &str) -> Result { + /// Mark a pending question answered. Returns `(question, asker)` + /// so the caller can both echo the question back in a helper + /// event AND route that event to whichever agent originally + /// asked it. + pub fn answer(&self, id: i64, answer: &str) -> Result<(String, String)> { let conn = self.conn.lock().unwrap(); - let question: Option<(String, Option)> = conn + let row: Option<(String, String, Option)> = conn .query_row( - "SELECT question, answered_at FROM operator_questions WHERE id = ?1", + "SELECT question, asker, answered_at FROM operator_questions WHERE id = ?1", params![id], - |row| Ok((row.get(0)?, row.get(1)?)), + |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)), ) .optional()?; - let Some((question, answered_at)) = question else { + let Some((question, asker, answered_at)) = row else { bail!("question {id} not found"); }; if answered_at.is_some() { @@ -137,7 +140,7 @@ impl OperatorQuestions { "UPDATE operator_questions SET answer = ?1, answered_at = ?2 WHERE id = ?3", params![answer, now_unix(), id], )?; - Ok(question) + Ok((question, asker)) } #[allow(dead_code)] diff --git a/hive-sh4re/src/lib.rs b/hive-sh4re/src/lib.rs index 5d4a4fb..298d5ad 100644 --- a/hive-sh4re/src/lib.rs +++ b/hive-sh4re/src/lib.rs @@ -195,6 +195,19 @@ pub enum AgentRequest { /// Non-mutating — pulls from the broker without delivering. The /// per-agent web UI uses this to render its own inbox section. Recent { limit: u64 }, + /// Surface a question to the operator on the dashboard. Same + /// shape as `ManagerRequest::AskOperator` — any agent can ask; + /// the answer routes back to the asker's inbox as a + /// `HelperEvent::OperatorAnswered`. + AskOperator { + question: String, + #[serde(default)] + options: Vec, + #[serde(default)] + multi: bool, + #[serde(default)] + ttl_seconds: Option, + }, } /// Responses on a per-agent socket. @@ -213,6 +226,9 @@ pub enum AgentResponse { Status { unread: u64 }, /// `Recent` result: newest-first inbox rows. Recent { rows: Vec }, + /// `AskOperator` result: the queued question id. The answer lands + /// later as `HelperEvent::OperatorAnswered` in this agent's inbox. + QuestionQueued { id: i64 }, } // -----------------------------------------------------------------------------