cancel_thread: new mcp tool — unify reminder + question cancel on both surfaces
This commit is contained in:
parent
fcd407da11
commit
b1d0a62cb9
11 changed files with 331 additions and 25 deletions
|
|
@ -193,6 +193,13 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc<Coordinator>) ->
|
|||
role: "agent".to_owned(),
|
||||
hyperhive_rev: crate::auto_update::current_flake_rev(&coord.hyperhive_flake),
|
||||
},
|
||||
AgentRequest::CancelThread { kind, id } => crate::questions::handle_cancel_thread(
|
||||
coord, agent, *kind, *id,
|
||||
)
|
||||
.map_or_else(
|
||||
|message| AgentResponse::Err { message },
|
||||
|()| AgentResponse::Ok,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -405,6 +405,40 @@ impl Broker {
|
|||
Ok(n)
|
||||
}
|
||||
|
||||
/// Cancel a pending reminder on behalf of `canceller`. Returns
|
||||
/// the owner agent name on success (handy for logging). Auth
|
||||
/// rules mirror `OperatorQuestions::cancel`: owner, operator, or
|
||||
/// manager.
|
||||
pub fn cancel_reminder_as(&self, id: i64, canceller: &str) -> Result<String> {
|
||||
let conn = self.conn.lock().unwrap();
|
||||
let owner: Option<String> = conn
|
||||
.query_row(
|
||||
"SELECT agent FROM reminders WHERE id = ?1 AND sent_at IS NULL",
|
||||
params![id],
|
||||
|row| row.get(0),
|
||||
)
|
||||
.optional()?;
|
||||
let Some(owner) = owner else {
|
||||
anyhow::bail!("reminder {id} not pending (already delivered or unknown)");
|
||||
};
|
||||
let authorised = canceller == owner
|
||||
|| canceller == hive_sh4re::OPERATOR_RECIPIENT
|
||||
|| canceller == hive_sh4re::MANAGER_AGENT;
|
||||
if !authorised {
|
||||
anyhow::bail!(
|
||||
"reminder {id}: '{canceller}' not allowed to cancel (owner = '{owner}')"
|
||||
);
|
||||
}
|
||||
let n = conn.execute(
|
||||
"DELETE FROM reminders WHERE id = ?1 AND sent_at IS NULL",
|
||||
params![id],
|
||||
)?;
|
||||
if n == 0 {
|
||||
anyhow::bail!("reminder {id} vanished between auth check and delete");
|
||||
}
|
||||
Ok(owner)
|
||||
}
|
||||
|
||||
/// Get up to `limit` due reminders across all agents in a single query.
|
||||
/// Returns `(agent, id, message, file_path)` tuples. Pass a small limit
|
||||
/// (e.g. 100) so a burst of overdue reminders doesn't flood the broker
|
||||
|
|
|
|||
|
|
@ -348,6 +348,16 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc<Coordinator>) -> ManagerResp
|
|||
role: "manager".to_owned(),
|
||||
hyperhive_rev: crate::auto_update::current_flake_rev(&coord.hyperhive_flake),
|
||||
},
|
||||
ManagerRequest::CancelThread { kind, id } => crate::questions::handle_cancel_thread(
|
||||
coord,
|
||||
MANAGER_AGENT,
|
||||
*kind,
|
||||
*id,
|
||||
)
|
||||
.map_or_else(
|
||||
|message| ManagerResponse::Err { message },
|
||||
|()| ManagerResponse::Ok,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -26,9 +26,12 @@ use crate::coordinator::Coordinator;
|
|||
/// we keep the rule per-agent so the manager's MCP surface gets
|
||||
/// the same shape via a different code path);
|
||||
/// - unanswered questions where `agent` is the asker (waiting on
|
||||
/// someone) OR the target (owes a reply).
|
||||
/// someone) OR the target (owes a reply);
|
||||
/// - pending reminders this agent scheduled (`owner == self`).
|
||||
///
|
||||
/// Newest-first within each kind, approvals before questions.
|
||||
/// Ordered approvals → questions → reminders within the returned
|
||||
/// vector. Within each kind, source-of-truth ordering (sqlite's
|
||||
/// `pending()` queries return newest-first within their indexes).
|
||||
pub fn for_agent(coord: &Coordinator, agent: &str) -> Result<Vec<OpenThread>> {
|
||||
let now = now_unix();
|
||||
let mut out = Vec::new();
|
||||
|
|
@ -60,13 +63,25 @@ pub fn for_agent(coord: &Coordinator, agent: &str) -> Result<Vec<OpenThread>> {
|
|||
age_seconds: saturating_age(now, q.asked_at),
|
||||
});
|
||||
}
|
||||
for r in coord.broker.list_pending_reminders()? {
|
||||
if r.agent != agent {
|
||||
continue;
|
||||
}
|
||||
out.push(OpenThread::Reminder {
|
||||
id: r.id,
|
||||
owner: r.agent,
|
||||
message: r.message,
|
||||
due_at: r.due_at,
|
||||
age_seconds: saturating_age(now, r.created_at),
|
||||
});
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Hive-wide loose-ends view: EVERY pending approval + EVERY
|
||||
/// unanswered question. Manager surface only; sub-agents can't see
|
||||
/// each other's threads via the agent surface (`for_agent` filters
|
||||
/// by name).
|
||||
/// unanswered question + EVERY pending reminder. Manager surface
|
||||
/// only; sub-agents can't see each other's threads via the agent
|
||||
/// surface (`for_agent` filters by name).
|
||||
pub fn hive_wide(coord: &Coordinator) -> Result<Vec<OpenThread>> {
|
||||
let now = now_unix();
|
||||
let mut out = Vec::new();
|
||||
|
|
@ -88,6 +103,15 @@ pub fn hive_wide(coord: &Coordinator) -> Result<Vec<OpenThread>> {
|
|||
age_seconds: saturating_age(now, q.asked_at),
|
||||
});
|
||||
}
|
||||
for r in coord.broker.list_pending_reminders()? {
|
||||
out.push(OpenThread::Reminder {
|
||||
id: r.id,
|
||||
owner: r.agent,
|
||||
message: r.message,
|
||||
due_at: r.due_at,
|
||||
age_seconds: saturating_age(now, r.created_at),
|
||||
});
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -196,6 +196,54 @@ impl OperatorQuestions {
|
|||
Ok((question, asker, target))
|
||||
}
|
||||
|
||||
/// Cancel a pending question on behalf of `canceller`. Returns
|
||||
/// `(question, asker, target)` so the caller can fire the usual
|
||||
/// `QuestionAnswered` event to the asker with a `[cancelled by
|
||||
/// <canceller>]` sentinel.
|
||||
///
|
||||
/// Auth: the canceller must be one of:
|
||||
/// - the original asker (an agent withdrawing their own ask),
|
||||
/// - the operator (already covered by the existing `answer` path
|
||||
/// but allowed here too for symmetry / dashboard cancel),
|
||||
/// - the manager (privileged hive-wide cleanup).
|
||||
///
|
||||
/// Not the target — that's covered by `answer` (responding with
|
||||
/// an actual reply, sentinel or otherwise).
|
||||
pub fn cancel(
|
||||
&self,
|
||||
id: i64,
|
||||
canceller: &str,
|
||||
) -> Result<(String, String, Option<String>)> {
|
||||
let conn = self.conn.lock().unwrap();
|
||||
let row: Option<(String, String, Option<String>, Option<i64>)> = conn
|
||||
.query_row(
|
||||
"SELECT question, asker, target, answered_at FROM operator_questions WHERE id = ?1",
|
||||
params![id],
|
||||
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
|
||||
)
|
||||
.optional()?;
|
||||
let Some((question, asker, target, answered_at)) = row else {
|
||||
bail!("question {id} not found");
|
||||
};
|
||||
if answered_at.is_some() {
|
||||
bail!("question {id} already answered/cancelled");
|
||||
}
|
||||
let authorised = canceller == asker
|
||||
|| canceller == hive_sh4re::OPERATOR_RECIPIENT
|
||||
|| canceller == hive_sh4re::MANAGER_AGENT;
|
||||
if !authorised {
|
||||
bail!(
|
||||
"question {id}: '{canceller}' not allowed to cancel (asker = '{asker}')"
|
||||
);
|
||||
}
|
||||
let sentinel = format!("[cancelled by {canceller}]");
|
||||
conn.execute(
|
||||
"UPDATE operator_questions SET answer = ?1, answered_at = ?2 WHERE id = ?3",
|
||||
params![sentinel, now_unix(), id],
|
||||
)?;
|
||||
Ok((question, asker, target))
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn get(&self, id: i64) -> Result<Option<OpQuestion>> {
|
||||
let conn = self.conn.lock().unwrap();
|
||||
|
|
|
|||
|
|
@ -129,6 +129,49 @@ pub fn handle_answer(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle `CancelThread` from either surface. Dispatches by kind to
|
||||
/// either `OperatorQuestions::cancel` or `Broker::cancel_reminder_as`,
|
||||
/// both of which do their own auth check (canceller == owner /
|
||||
/// asker, or `operator`, or `manager`). On question cancel, fires
|
||||
/// the `QuestionAnswered` event back to the asker so the harness
|
||||
/// loop can react (mirrors the operator-cancel dashboard path).
|
||||
pub fn handle_cancel_thread(
|
||||
coord: &Arc<Coordinator>,
|
||||
canceller: &str,
|
||||
kind: hive_sh4re::CancelThreadKind,
|
||||
id: i64,
|
||||
) -> Result<(), String> {
|
||||
match kind {
|
||||
hive_sh4re::CancelThreadKind::Question => {
|
||||
let (question, asker, target) = coord
|
||||
.questions
|
||||
.cancel(id, canceller)
|
||||
.map_err(|e| format!("{e:#}"))?;
|
||||
let sentinel = format!("[cancelled by {canceller}]");
|
||||
tracing::info!(%id, %canceller, %asker, "question cancelled");
|
||||
coord.notify_agent(
|
||||
&asker,
|
||||
&hive_sh4re::HelperEvent::QuestionAnswered {
|
||||
id,
|
||||
question,
|
||||
answer: sentinel.clone(),
|
||||
answerer: canceller.to_owned(),
|
||||
},
|
||||
);
|
||||
coord.emit_question_resolved(id, &sentinel, canceller, true, target.as_deref());
|
||||
Ok(())
|
||||
}
|
||||
hive_sh4re::CancelThreadKind::Reminder => {
|
||||
let owner = coord
|
||||
.broker
|
||||
.cancel_reminder_as(id, canceller)
|
||||
.map_err(|e| format!("{e:#}"))?;
|
||||
tracing::info!(%id, %canceller, %owner, "reminder cancelled");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Real coverage needs a `Coordinator` fixture (broker + sqlite +
|
||||
// in-memory questions). Skipped for now — the normalisation branches
|
||||
// in `handle_ask` are short enough to read line-by-line; once we add
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue