open_threads: new get_open_threads MCP tool on agent + manager surfaces

This commit is contained in:
damocles 2026-05-17 22:39:10 +02:00
parent 9ec0d60308
commit dc1ce1f236
11 changed files with 305 additions and 9 deletions

View file

@ -8,10 +8,8 @@
- **Broadcast messaging**: allow sending messages with recipient "*" to all agents; deliver with hint "this was a broadcast and may not need any action from you"
- **Multi-agent restart coordination**: when rebuilding all agents, manager should start first so it can coordinate post-restart confusion (notify agents, suppress unnecessary retries, etc)
- **Shared docs/skills repo (RO)**: a single repo on the hive forge that every agent has read-only access to — common references, prompts, runbooks, "skills" the operator wants every agent to inherit without baking into the system prompt or `/shared`. Implementation likely: seed an `org-shared/docs` repo on first hive-forge boot, grant every per-agent user a read membership in the org. Agents `git clone` it (or use the API) to read; only the manager + operator can push.
- **Loose-ends tracker + `get_open_threads` tool**: hive-c0re already knows about pending approvals + unanswered questions; soon will also know about open PRs on hive-forge. Aggregate these into a per-agent "open threads" view (e.g. `[{kind: "approval", id: 7, summary: "spawn alice"}, {kind: "question", id: 12, asker: "alice", summary: "deploy now?"}]`). New MCP tool `mcp__hyperhive__get_open_threads` returns the list so an agent can see what's still pending against it without rebuilding context from inbox history. Manager's version includes hive-wide threads. **Also surface this list on the per-agent web UI** so the operator can see at a glance what each agent has hanging open — same data source as the MCP tool, just rendered into the existing per-agent dashboard page (next to inbox view / model chip / etc).
- **Scope per agent X (confirmed with operator):** include BOTH (a) unanswered questions where `asker == X` (X is waiting on someone) AND (b) unanswered questions where `target == X` (X owes an answer). Distinguish via a `role: "asker" | "target"` field on the question variant so the agent can render "waiting on" vs "owe a reply" appropriately. Approvals: include rows where the submitter is X (waiting on the operator). Forge PRs (future): include open PRs where X is author OR reviewer.
- **Wire shape sketch:** new `AgentRequest::GetOpenThreads` / `ManagerRequest::GetOpenThreads` returning `Response::OpenThreads { threads: Vec<OpenThread> }` with `OpenThread` as a tagged enum (`{kind: "approval", id, summary, age_seconds}` / `{kind: "question", id, role, counterparty, summary, age_seconds}` / future `{kind: "pr", ...}`). Manager flavour returns hive-wide threads (no asker/target filter). MCP tool `get_open_threads` takes no args.
- **Aggregator location:** new helper on `Coordinator` (or a dedicated `open_threads.rs`) so both surfaces share the query logic; queries `approvals` + `operator_questions` tables with a single per-call sweep (no caching — call frequency is low).
- ~~**Loose-ends tracker + `get_open_threads` tool**~~ ✓ landed — new `mcp__hyperhive__get_open_threads` MCP tool on both agent + manager surfaces. Wire types in `hive-sh4re`: `AgentRequest::GetOpenThreads` / `ManagerRequest::GetOpenThreads``OpenThreads { threads: Vec<OpenThread> }`. `OpenThread` is a tagged enum with `Approval { id, agent, commit_ref, description, age_seconds }` and `Question { id, asker, target, question, age_seconds }`. Shared aggregator at `hive-c0re/src/open_threads.rs`: `for_agent(coord, name)` (sub-agent surface; filters questions by asker == self OR target == self, approvals only for manager) and `hive_wide(coord)` (manager surface; everything pending in the swarm). No caching — fresh sqlite sweep per call. **Per-agent web UI rendering** is a follow-up below.
- **Follow-up: surface open-threads on the per-agent web UI** so the operator can see at a glance what each agent has hanging open — same data source as the MCP tool, just rendered into the existing per-agent dashboard page (next to inbox view / model chip / etc).
## Reminder Tool

View file

@ -7,6 +7,7 @@ Tools (hyperhive surface):
- (some agents only) **extra MCP tools** surfaced as `mcp__<server>__<tool>` — these are agent-specific (matrix client, scraper, db connector, etc.) declared in your `agent.nix` under `hyperhive.extraMcpServers`. Treat them as first-class tools alongside the hyperhive surface; the operator already auto-approved them at deploy time.
- `mcp__hyperhive__ask(question, options?, multi?, ttl_seconds?, to?)` — surface a structured question to the human operator (default, or `to: "operator"`) OR a peer agent (`to: "<agent-name>"`). Returns immediately with a question id — do NOT wait inline. When the recipient answers, a system message with event `question_answered { id, question, answer, answerer }` lands in your inbox; handle it on a future turn. Use this for clarifications, permission for risky actions, choice between options, or peer Q&A without burning regular inbox slots. `options` is advisory: a short fixed-choice list when applicable, otherwise leave empty for free text. `multi: true` lets the answerer pick multiple (checkboxes), answer comes back comma-joined. `ttl_seconds` auto-cancels with answer `[expired]` (and `answerer: "ttl-watchdog"`) when the decision becomes moot.
- `mcp__hyperhive__answer(id, answer)` — answer a question that was routed to YOU. You'll see one in your inbox as a `question_asked { id, asker, question, options, multi }` system event when a peer or the manager calls `ask(to: "<your-name>", ...)`. The answer surfaces in the asker's inbox as a `question_answered` event. Strict authorisation: you can only answer questions where you are the declared target.
- `mcp__hyperhive__get_open_threads()` — list your loose ends: unanswered questions where you're asker (waiting on someone) or target (owing a reply). No args, cheap server-side sweep. Useful at turn start to remember what's outstanding without scanning inbox archaeology.
Need new packages, env vars, or other NixOS config for yourself? You can't edit your own config directly — message the manager (recipient `manager`) describing what you need + why. The manager evaluates the request (it doesn't rubber-stamp), edits `/agents/{label}/config/agent.nix` on your behalf, commits, and submits an approval that the operator can accept on the dashboard; on approve hive-c0re rebuilds your container with the new config.

View file

@ -12,6 +12,7 @@ Tools (hyperhive surface):
- `mcp__hyperhive__request_apply_commit(agent, commit_ref, description?)` — submit a config change for any agent (`hm1nd` for self) for operator approval. Pass an optional `description` and it appears on the dashboard approval card so the operator knows what changed without opening the diff. At submit time hive-c0re fetches your commit into the agent's applied repo and pins it as `proposal/<id>`; from that moment your proposed-side commit can be amended or force-pushed freely without changing what the operator will build.
- `mcp__hyperhive__ask(question, options?, multi?, ttl_seconds?, to?)` — surface a structured question to the operator (default, or `to: "operator"`) OR a sub-agent (`to: "<agent-name>"`). Returns immediately with a question id; the answer arrives later as a system `question_answered { id, question, answer, answerer }` event in your inbox. Options are advisory: the dashboard always lets the operator type a free-text answer in addition. Set `multi: true` to render options as checkboxes (operator can pick multiple); the answer comes back as `, `-separated. Set `ttl_seconds` to auto-cancel after a deadline (capped at 6h server-side) — on expiry the answer is `[expired]` and `answerer` is `"ttl-watchdog"`. Do not poll inside the same turn — finish the current work and react when the event lands.
- `mcp__hyperhive__answer(id, answer)` — answer a question that was routed to YOU (a sub-agent did `ask(to: "manager", ...)`). The triggering event in your inbox is `question_asked { id, asker, question, options, multi }`. The answer surfaces in the asker's inbox as a `question_answered` event.
- `mcp__hyperhive__get_open_threads()` — hive-wide loose ends: every pending approval + every unanswered question across the swarm. Cheap server-side sweep, no args. Use to find stalled threads (sub-agent A asked B something three days ago and B never answered) before they rot.
Approval boundary: lifecycle ops on *existing* sub-agents (`kill`, `start`, `restart`) are at your discretion — no operator approval. *Creating* a new agent (`request_spawn`) and *changing* any agent's config (`request_apply_commit`) still go through the approval queue. The operator only signs off on changes; you run the day-to-day.

View file

@ -208,7 +208,8 @@ async fn serve(
AgentResponse::Ok
| AgentResponse::Status { .. }
| AgentResponse::Recent { .. }
| AgentResponse::QuestionQueued { .. },
| AgentResponse::QuestionQueued { .. }
| AgentResponse::OpenThreads { .. },
) => {
tracing::warn!("recv produced unexpected response kind");
}

View file

@ -175,7 +175,8 @@ async fn serve(
| ManagerResponse::Status { .. }
| ManagerResponse::QuestionQueued { .. }
| ManagerResponse::Recent { .. }
| ManagerResponse::Logs { .. },
| ManagerResponse::Logs { .. }
| ManagerResponse::OpenThreads { .. },
) => {
tracing::warn!("recv produced unexpected response kind");
}

View file

@ -40,6 +40,7 @@ pub enum SocketReply {
QuestionQueued(i64),
Recent(Vec<hive_sh4re::InboxRow>),
Logs(String),
OpenThreads(Vec<hive_sh4re::OpenThread>),
}
impl From<hive_sh4re::AgentResponse> for SocketReply {
@ -52,6 +53,7 @@ impl From<hive_sh4re::AgentResponse> for SocketReply {
hive_sh4re::AgentResponse::Status { unread } => Self::Status(unread),
hive_sh4re::AgentResponse::Recent { rows } => Self::Recent(rows),
hive_sh4re::AgentResponse::QuestionQueued { id } => Self::QuestionQueued(id),
hive_sh4re::AgentResponse::OpenThreads { threads } => Self::OpenThreads(threads),
}
}
}
@ -67,6 +69,7 @@ impl From<hive_sh4re::ManagerResponse> for SocketReply {
hive_sh4re::ManagerResponse::QuestionQueued { id } => Self::QuestionQueued(id),
hive_sh4re::ManagerResponse::Recent { rows } => Self::Recent(rows),
hive_sh4re::ManagerResponse::Logs { content } => Self::Logs(content),
hive_sh4re::ManagerResponse::OpenThreads { threads } => Self::OpenThreads(threads),
}
}
}
@ -95,6 +98,57 @@ pub fn format_recv(resp: Result<SocketReply, anyhow::Error>) -> String {
}
}
/// Format helper for `get_open_threads`: renders a short bulleted list
/// of pending approvals + questions. Empty list collapses to a clear
/// marker so claude doesn't go hunting for a payload that isn't there.
pub fn format_open_threads(resp: Result<SocketReply, anyhow::Error>) -> String {
use std::fmt::Write as _;
let threads = match resp {
Ok(SocketReply::OpenThreads(t)) => t,
Ok(SocketReply::Err(m)) => return format!("get_open_threads failed: {m}"),
Ok(other) => return format!("get_open_threads unexpected response: {other:?}"),
Err(e) => return format!("get_open_threads transport error: {e:#}"),
};
if threads.is_empty() {
return "(no open threads)".to_owned();
}
let mut out = format!("{} open thread(s):\n", threads.len());
for t in &threads {
match t {
hive_sh4re::OpenThread::Approval {
id,
agent,
commit_ref,
description,
age_seconds,
} => {
let desc = description
.as_deref()
.map(|d| format!("{d}"))
.unwrap_or_default();
let _ = writeln!(
out,
"- approval #{id} ({agent} @ {commit_ref}, {age_seconds}s old){desc}"
);
}
hive_sh4re::OpenThread::Question {
id,
asker,
target,
question,
age_seconds,
} => {
let to = target.as_deref().unwrap_or("operator");
let _ = writeln!(
out,
"- question #{id} ({asker} → {to}, {age_seconds}s old): {question}"
);
}
}
}
out
}
/// Common envelope around every MCP tool handler: pre-log → run →
/// post-log. The inbox-status hint used to be appended to every tool
/// result; that lives in the wake prompt + UI header now, so tool
@ -317,6 +371,23 @@ impl AgentServer {
.await
}
#[tool(
description = "List loose ends pending against this agent: unanswered questions \
where you are the asker (waiting on someone) or the target (someone's waiting on \
you), plus for the manager only pending approvals you submitted that the \
operator hasn't acted on yet. Cheap server-side sweep, no args. Useful at turn \
start to remember what you owe / what's owed to you without scrolling inbox \
history. Output is a short bulleted list with ids, ages in seconds, and the \
relevant context. Empty result is reported clearly."
)]
async fn get_open_threads(&self) -> String {
run_tool_envelope("get_open_threads", String::new(), async move {
let (resp, retries) = self.dispatch(hive_sh4re::AgentRequest::GetOpenThreads).await;
annotate_retries(format_open_threads(resp), retries)
})
.await
}
#[tool(
description = "Schedule a reminder that lands in this agent's own inbox at a future \
time (sender will appear as `reminder`). Use for self-paced follow-ups: 'check task \
@ -784,6 +855,23 @@ impl ManagerServer {
.await
}
#[tool(
description = "Hive-wide loose ends: EVERY pending approval + EVERY unanswered \
question across the swarm. Use to scan for stalled coordination questions \
sub-agents asked each other that nobody's answering, approvals stuck waiting on \
the operator, etc. No args. The sub-agent flavour of this tool only returns the \
agent's own threads; the manager flavour is unfiltered."
)]
async fn get_open_threads(&self) -> String {
run_tool_envelope("get_open_threads", String::new(), async move {
let (resp, retries) = self
.dispatch(hive_sh4re::ManagerRequest::GetOpenThreads)
.await;
annotate_retries(format_open_threads(resp), retries)
})
.await
}
#[tool(
description = "Fetch recent journal log lines for a sub-agent container. Useful \
for diagnosing MCP server registration failures, startup crashes, plugin install \
@ -826,8 +914,10 @@ impl ManagerServer {
approval), `kill` (graceful stop), `request_apply_commit` (config change for \
any agent including yourself), `ask` (structured question to the operator or a \
sub-agent non-blocking, answer arrives later as a `question_answered` event), \
`answer` (respond to a `question_asked` event directed at you). The manager's own \
config lives at `/agents/hm1nd/config/agent.nix`."
`answer` (respond to a `question_asked` event directed at you), \
`get_open_threads` (hive-wide loose ends pending approvals + unanswered \
questions across the swarm). The manager's own config lives at \
`/agents/hm1nd/config/agent.nix`."
)]
impl ServerHandler for ManagerServer {}
@ -861,7 +951,7 @@ pub enum Flavor {
#[must_use]
pub fn allowed_mcp_tools(flavor: Flavor) -> Vec<String> {
let names: &[&str] = match flavor {
Flavor::Agent => &["send", "recv", "ask", "answer", "remind"],
Flavor::Agent => &["send", "recv", "ask", "answer", "remind", "get_open_threads"],
Flavor::Manager => &[
"send",
"recv",
@ -874,6 +964,7 @@ pub fn allowed_mcp_tools(flavor: Flavor) -> Vec<String> {
"ask",
"answer",
"get_logs",
"get_open_threads",
"remind",
],
};

View file

@ -174,6 +174,12 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc<Coordinator>) ->
timing,
file_path,
} => handle_remind(coord, agent, message, timing, file_path.as_deref()),
AgentRequest::GetOpenThreads => match crate::open_threads::for_agent(coord, agent) {
Ok(threads) => AgentResponse::OpenThreads { threads },
Err(e) => AgentResponse::Err {
message: format!("{e:#}"),
},
},
}
}

View file

@ -23,6 +23,7 @@ mod limits;
mod manager_server;
mod meta;
mod migrate;
mod open_threads;
mod operator_questions;
mod questions;
mod reminder_scheduler;

View file

@ -329,6 +329,12 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc<Coordinator>) -> ManagerResp
},
}
}
ManagerRequest::GetOpenThreads => match crate::open_threads::hive_wide(coord) {
Ok(threads) => ManagerResponse::OpenThreads { threads },
Err(e) => ManagerResponse::Err {
message: format!("{e:#}"),
},
},
}
}

View file

@ -0,0 +1,129 @@
//! Loose-ends aggregator. Walks the `approvals` + `operator_questions`
//! tables once per call and assembles a `Vec<OpenThread>` for either
//! a single agent (`for_agent`) or the whole hive (`hive_wide`). Both
//! `AgentRequest::GetOpenThreads` and `ManagerRequest::GetOpenThreads`
//! land here so the routing logic + age-seconds derivation stay in
//! one place.
//!
//! Call frequency is low (an agent doing self-introspection between
//! turns), so the sweep happens fresh every time — no caching, no
//! mutation events. If the sweep ever shows up in a profile, the
//! sqlite queries already filter on the same indexes
//! (`idx_approvals_pending` + `idx_operator_questions_pending`) that
//! the dashboard uses, so the bottleneck would be json
//! (de)serialisation, not the read.
use std::time::{SystemTime, UNIX_EPOCH};
use anyhow::Result;
use hive_sh4re::{MANAGER_AGENT, OpenThread};
use crate::coordinator::Coordinator;
/// Open threads pending against `agent`:
/// - pending approvals where this agent is the submitter (only ever
/// true for the manager — sub-agents don't submit approvals — but
/// we keep the rule per-agent so the manager's MCP surface gets
/// the same shape via a different code path);
/// - unanswered questions where `agent` is the asker (waiting on
/// someone) OR the target (owes a reply).
///
/// Newest-first within each kind, approvals before questions.
pub fn for_agent(coord: &Coordinator, agent: &str) -> Result<Vec<OpenThread>> {
let now = now_unix();
let mut out = Vec::new();
// Approvals are only submitted by the manager today. When that
// expands (e.g. sub-agents propose changes to their own configs),
// teach the approvals table to track the submitter and filter
// here on that column — for now MANAGER_AGENT == sole submitter.
if agent == MANAGER_AGENT {
for a in coord.approvals.pending()? {
out.push(OpenThread::Approval {
id: a.id,
agent: a.agent,
commit_ref: a.commit_ref,
description: a.description,
age_seconds: saturating_age(now, a.requested_at),
});
}
}
for q in coord.questions.pending_all()? {
let role_match = q.asker == agent || q.target.as_deref() == Some(agent);
if !role_match {
continue;
}
out.push(OpenThread::Question {
id: q.id,
asker: q.asker,
target: q.target,
question: q.question,
age_seconds: saturating_age(now, q.asked_at),
});
}
Ok(out)
}
/// Hive-wide loose-ends view: EVERY pending approval + EVERY
/// unanswered question. Manager surface only; sub-agents can't see
/// each other's threads via the agent surface (`for_agent` filters
/// by name).
pub fn hive_wide(coord: &Coordinator) -> Result<Vec<OpenThread>> {
let now = now_unix();
let mut out = Vec::new();
for a in coord.approvals.pending()? {
out.push(OpenThread::Approval {
id: a.id,
agent: a.agent,
commit_ref: a.commit_ref,
description: a.description,
age_seconds: saturating_age(now, a.requested_at),
});
}
for q in coord.questions.pending_all()? {
out.push(OpenThread::Question {
id: q.id,
asker: q.asker,
target: q.target,
question: q.question,
age_seconds: saturating_age(now, q.asked_at),
});
}
Ok(out)
}
fn saturating_age(now: i64, then: i64) -> u64 {
let delta = now.saturating_sub(then);
u64::try_from(delta).unwrap_or(0)
}
fn now_unix() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.ok()
.and_then(|d| i64::try_from(d.as_secs()).ok())
.unwrap_or(0)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn saturating_age_handles_clock_back_step() {
// `now` < `then`: caller's clock went backwards between rows.
// We saturate to 0 rather than returning a negative or
// wrapping around to ~u64::MAX (which would render as "27
// billion years ago" in the wake prompt).
assert_eq!(saturating_age(100, 200), 0);
}
#[test]
fn saturating_age_normal_case() {
assert_eq!(saturating_age(1_000_000, 999_990), 10);
}
#[test]
fn saturating_age_zero_when_equal() {
assert_eq!(saturating_age(42, 42), 0);
}
}

View file

@ -185,6 +185,46 @@ pub enum ReminderTiming {
At { unix_timestamp: i64 },
}
/// One row in the response to `GetOpenThreads`. Tagged enum so new
/// thread kinds (forge PRs, long-running approvals from a privileged
/// bot, etc) can land later without breaking existing handlers. The
/// caller (claude in the agent harness) is expected to render these
/// as a short bulleted list — the per-row fields are all the context
/// needed without a follow-up fetch.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum OpenThread {
/// A pending approval. For agent-flavour `GetOpenThreads` calls
/// this only surfaces when the agent itself is the manager
/// (sub-agents don't submit approvals). For manager-flavour calls
/// it lists every pending approval in the swarm. `agent` is the
/// affected agent (target of the spawn / config commit).
Approval {
id: i64,
agent: String,
commit_ref: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
description: Option<String>,
/// Wall-clock seconds since `requested_at`. Saturates at zero on
/// any clock anomaly (back-step etc).
age_seconds: u64,
},
/// An unanswered question. For agent-flavour calls: only threads
/// where the agent is `asker` OR `target`. For manager-flavour
/// calls: every unanswered question in the swarm. `target = None`
/// means the question is addressed to the operator (dashboard
/// path); `Some(agent)` is a peer-to-peer thread.
Question {
id: i64,
asker: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
target: Option<String>,
question: String,
/// Wall-clock seconds since `asked_at`. Saturates at zero.
age_seconds: u64,
},
}
/// Requests on a per-agent socket. The agent's identity is the socket
/// it came in on; `Send.from` is filled in by the server, not the client.
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -263,6 +303,12 @@ pub enum AgentRequest {
#[serde(default)]
file_path: Option<String>,
},
/// Loose-ends view: pending approvals + unanswered questions
/// pending against THIS agent. Approvals only surface if this
/// agent submitted them (which only ever happens for the
/// manager); questions surface where the agent is `asker` or
/// `target`. Cheap O(n) sweep server-side — no caching.
GetOpenThreads,
}
/// Responses on a per-agent socket.
@ -284,6 +330,9 @@ pub enum AgentResponse {
/// `Ask` result: the queued question id. The answer lands later
/// as `HelperEvent::QuestionAnswered` in this agent's inbox.
QuestionQueued { id: i64 },
/// `GetOpenThreads` result: list of loose ends pending against
/// this agent. Ordered newest-first within each kind.
OpenThreads { threads: Vec<OpenThread> },
}
// -----------------------------------------------------------------------------
@ -541,6 +590,12 @@ pub enum ManagerRequest {
#[serde(default)]
file_path: Option<String>,
},
/// Hive-wide loose-ends view: EVERY pending approval + EVERY
/// unanswered question in the swarm. Used by the manager to scan
/// for stalled coordination — the per-agent equivalent on the
/// sub-agent surface is `AgentRequest::GetOpenThreads` which
/// only returns rows where the agent itself is asker / target.
GetOpenThreads,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -573,4 +628,10 @@ pub enum ManagerResponse {
Logs {
content: String,
},
/// `GetOpenThreads` result: hive-wide loose ends (approvals +
/// unanswered questions). Same `OpenThread` variants as the
/// agent surface; the manager's view is unfiltered.
OpenThreads {
threads: Vec<OpenThread>,
},
}