hyperhive/hive-c0re/src/open_threads.rs

129 lines
4.4 KiB
Rust

//! Loose-ends aggregator. Walks the `approvals` + `operator_questions`
//! tables once per call and assembles a `Vec<OpenThread>` for either
//! a single agent (`for_agent`) or the whole hive (`hive_wide`). Both
//! `AgentRequest::GetOpenThreads` and `ManagerRequest::GetOpenThreads`
//! land here so the routing logic + age-seconds derivation stay in
//! one place.
//!
//! Call frequency is low (an agent doing self-introspection between
//! turns), so the sweep happens fresh every time — no caching, no
//! mutation events. If the sweep ever shows up in a profile, the
//! sqlite queries already filter on the same indexes
//! (`idx_approvals_pending` + `idx_operator_questions_pending`) that
//! the dashboard uses, so the bottleneck would be json
//! (de)serialisation, not the read.
use std::time::{SystemTime, UNIX_EPOCH};
use anyhow::Result;
use hive_sh4re::{MANAGER_AGENT, OpenThread};
use crate::coordinator::Coordinator;
/// Open threads pending against `agent`:
/// - pending approvals where this agent is the submitter (only ever
/// true for the manager — sub-agents don't submit approvals — but
/// we keep the rule per-agent so the manager's MCP surface gets
/// the same shape via a different code path);
/// - unanswered questions where `agent` is the asker (waiting on
/// someone) OR the target (owes a reply).
///
/// Newest-first within each kind, approvals before questions.
pub fn for_agent(coord: &Coordinator, agent: &str) -> Result<Vec<OpenThread>> {
let now = now_unix();
let mut out = Vec::new();
// Approvals are only submitted by the manager today. When that
// expands (e.g. sub-agents propose changes to their own configs),
// teach the approvals table to track the submitter and filter
// here on that column — for now MANAGER_AGENT == sole submitter.
if agent == MANAGER_AGENT {
for a in coord.approvals.pending()? {
out.push(OpenThread::Approval {
id: a.id,
agent: a.agent,
commit_ref: a.commit_ref,
description: a.description,
age_seconds: saturating_age(now, a.requested_at),
});
}
}
for q in coord.questions.pending_all()? {
let role_match = q.asker == agent || q.target.as_deref() == Some(agent);
if !role_match {
continue;
}
out.push(OpenThread::Question {
id: q.id,
asker: q.asker,
target: q.target,
question: q.question,
age_seconds: saturating_age(now, q.asked_at),
});
}
Ok(out)
}
/// Hive-wide loose-ends view: EVERY pending approval + EVERY
/// unanswered question. Manager surface only; sub-agents can't see
/// each other's threads via the agent surface (`for_agent` filters
/// by name).
pub fn hive_wide(coord: &Coordinator) -> Result<Vec<OpenThread>> {
let now = now_unix();
let mut out = Vec::new();
for a in coord.approvals.pending()? {
out.push(OpenThread::Approval {
id: a.id,
agent: a.agent,
commit_ref: a.commit_ref,
description: a.description,
age_seconds: saturating_age(now, a.requested_at),
});
}
for q in coord.questions.pending_all()? {
out.push(OpenThread::Question {
id: q.id,
asker: q.asker,
target: q.target,
question: q.question,
age_seconds: saturating_age(now, q.asked_at),
});
}
Ok(out)
}
fn saturating_age(now: i64, then: i64) -> u64 {
let delta = now.saturating_sub(then);
u64::try_from(delta).unwrap_or(0)
}
fn now_unix() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.ok()
.and_then(|d| i64::try_from(d.as_secs()).ok())
.unwrap_or(0)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn saturating_age_handles_clock_back_step() {
// `now` < `then`: caller's clock went backwards between rows.
// We saturate to 0 rather than returning a negative or
// wrapping around to ~u64::MAX (which would render as "27
// billion years ago" in the wake prompt).
assert_eq!(saturating_age(100, 200), 0);
}
#[test]
fn saturating_age_normal_case() {
assert_eq!(saturating_age(1_000_000, 999_990), 10);
}
#[test]
fn saturating_age_zero_when_equal() {
assert_eq!(saturating_age(42, 42), 0);
}
}