190 lines
7.7 KiB
Rust
190 lines
7.7 KiB
Rust
//! Shared dispatch helpers for the `Ask` / `Answer` flow. Both the
|
|
//! agent socket and the manager socket call into here so the routing
|
|
//! semantics — recipient = operator vs. peer agent, answerer
|
|
//! authorisation, asker-notification — only live in one place.
|
|
//!
|
|
//! Routing rules at a glance:
|
|
//!
|
|
//! - `Ask { to: None | Some("operator") }` → stored with `target = NULL`;
|
|
//! the dashboard's `pending()` query surfaces it; operator answers
|
|
//! via the dashboard.
|
|
//! - `Ask { to: Some(<agent>) }` → stored with `target = <agent>`;
|
|
//! a `HelperEvent::QuestionAsked` is pushed into `<agent>`'s
|
|
//! inbox so they can `Answer { id, answer }` on their own socket.
|
|
//! - `Answer { id, answer }` → permission-checked in
|
|
//! `OperatorQuestions::answer` (only the target agent or the
|
|
//! operator can answer; both paths fire the same
|
|
//! `QuestionAnswered` event to the asker).
|
|
|
|
use std::sync::Arc;
|
|
|
|
use crate::coordinator::Coordinator;
|
|
use crate::limits;
|
|
use crate::manager_server::spawn_question_watchdog;
|
|
|
|
/// Cap on how long an asker can demand an answer before the watchdog
|
|
/// auto-resolves with `[expired]`. Six hours mirrors typical agent
|
|
/// session lifetimes — beyond that an unanswered question is
|
|
/// effectively a dead thread and should be re-asked, not blocked on.
|
|
const MAX_TTL_SECONDS: u64 = 6 * 60 * 60;
|
|
|
|
/// Handle either surface's `Ask` request. Returns the queued
|
|
/// question id on success or a caller-ready error string. Caller is
|
|
/// responsible for wrapping in the matching `*Response::Err` /
|
|
/// `QuestionQueued` variant.
|
|
pub fn handle_ask(
|
|
coord: &Arc<Coordinator>,
|
|
asker: &str,
|
|
question: &str,
|
|
options: &[String],
|
|
multi: bool,
|
|
ttl_seconds: Option<u64>,
|
|
to: Option<&str>,
|
|
) -> Result<i64, String> {
|
|
limits::check_size("question", question)?;
|
|
// Normalise `Some("operator")` → None so the storage layer
|
|
// only has to think about NULL vs. non-NULL targets, not
|
|
// "is this string the operator?".
|
|
let target = match to {
|
|
None => None,
|
|
Some(t) if t == hive_sh4re::OPERATOR_RECIPIENT => None,
|
|
Some("") => {
|
|
return Err("ask: `to` cannot be empty (omit it for the operator path)".to_owned());
|
|
}
|
|
Some(t) if t == asker => {
|
|
return Err("ask: cannot ask yourself a question (would loop forever)".to_owned());
|
|
}
|
|
Some(t) => Some(t),
|
|
};
|
|
let ttl = ttl_seconds.map(|s| s.min(MAX_TTL_SECONDS));
|
|
let deadline_at = ttl.and_then(|s| {
|
|
let now = std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.ok()
|
|
.and_then(|d| i64::try_from(d.as_secs()).ok())
|
|
.unwrap_or(0);
|
|
i64::try_from(s).ok().map(|s| now + s)
|
|
});
|
|
let id = coord
|
|
.questions
|
|
.submit(asker, question, options, multi, deadline_at, target)
|
|
.map_err(|e| format!("{e:#}"))?;
|
|
tracing::info!(%id, %asker, ?target, ?deadline_at, "question queued");
|
|
// Agent-targeted questions need to wake the recipient — drop a
|
|
// QuestionAsked event into their inbox so the answerer doesn't
|
|
// have to poll. Operator-targeted questions show up on the
|
|
// dashboard's pending pane via `pending()` instead, plus a
|
|
// `QuestionAdded` dashboard event so the browser updates live.
|
|
if let Some(target_agent) = target {
|
|
coord.notify_agent(
|
|
target_agent,
|
|
&hive_sh4re::HelperEvent::QuestionAsked {
|
|
id,
|
|
asker: asker.to_owned(),
|
|
question: question.to_owned(),
|
|
options: options.to_vec(),
|
|
multi,
|
|
},
|
|
);
|
|
}
|
|
// Always fire on the dashboard channel — both operator-targeted
|
|
// and peer threads now surface in the dashboard's questions pane.
|
|
coord.emit_question_added(id, asker, question, options, multi, deadline_at, target);
|
|
if let Some(t) = ttl {
|
|
spawn_question_watchdog(coord, id, t);
|
|
}
|
|
Ok(id)
|
|
}
|
|
|
|
/// Handle either surface's `Answer` request. Returns `Ok(())` on
|
|
/// success or a caller-ready error string. Authorisation lives in
|
|
/// `OperatorQuestions::answer` — we only have to wire the result
|
|
/// back to the asker as a `QuestionAnswered` event.
|
|
pub fn handle_answer(
|
|
coord: &Arc<Coordinator>,
|
|
answerer: &str,
|
|
id: i64,
|
|
answer: &str,
|
|
) -> Result<(), String> {
|
|
limits::check_size("answer", answer)?;
|
|
let (question, asker, target) = coord
|
|
.questions
|
|
.answer(id, answer, answerer)
|
|
.map_err(|e| format!("{e:#}"))?;
|
|
tracing::info!(%id, %answerer, %asker, "question answered");
|
|
// Use answerer as the broker `from` so the asker's terminal shows
|
|
// the real name (agent or "operator") instead of "system".
|
|
coord.notify_agent_from(
|
|
answerer,
|
|
&asker,
|
|
&hive_sh4re::HelperEvent::QuestionAnswered {
|
|
id,
|
|
question,
|
|
answer: answer.to_owned(),
|
|
answerer: answerer.to_owned(),
|
|
},
|
|
);
|
|
// Dashboard surfaces both operator-targeted and peer threads;
|
|
// emit unconditionally so the derived store moves the row.
|
|
// `cancelled = false` because this path is a real answer (the
|
|
// operator-cancel button goes through `post_cancel_question`).
|
|
coord.emit_question_resolved(id, answer, answerer, false, target.as_deref());
|
|
Ok(())
|
|
}
|
|
|
|
/// Handle `CancelLooseEnd` from either surface. Dispatches by kind to
|
|
/// either `OperatorQuestions::cancel` or `Broker::cancel_reminder_as`,
|
|
/// both of which do their own auth check (canceller == owner /
|
|
/// asker, or `operator`, or `manager`). On question cancel, fires
|
|
/// the `QuestionAnswered` event back to the asker so the harness
|
|
/// loop can react (mirrors the operator-cancel dashboard path).
|
|
pub fn handle_cancel_loose_end(
|
|
coord: &Arc<Coordinator>,
|
|
canceller: &str,
|
|
kind: hive_sh4re::CancelLooseEndKind,
|
|
id: i64,
|
|
) -> Result<(), String> {
|
|
match kind {
|
|
hive_sh4re::CancelLooseEndKind::Question => {
|
|
let (question, asker, target) = coord
|
|
.questions
|
|
.cancel(id, canceller)
|
|
.map_err(|e| format!("{e:#}"))?;
|
|
let sentinel = format!("[cancelled by {canceller}]");
|
|
tracing::info!(%id, %canceller, %asker, "question cancelled");
|
|
// Only notify the asker if they didn't cancel it themselves.
|
|
// Self-cancels are already known to the canceller — sending
|
|
// a QuestionAnswered back would cause the harness to process
|
|
// its own cancel as an incoming answer.
|
|
if asker != canceller {
|
|
coord.notify_agent_from(
|
|
canceller,
|
|
&asker,
|
|
&hive_sh4re::HelperEvent::QuestionAnswered {
|
|
id,
|
|
question,
|
|
answer: sentinel.clone(),
|
|
answerer: canceller.to_owned(),
|
|
},
|
|
);
|
|
}
|
|
coord.emit_question_resolved(id, &sentinel, canceller, true, target.as_deref());
|
|
Ok(())
|
|
}
|
|
hive_sh4re::CancelLooseEndKind::Reminder => {
|
|
let owner = coord
|
|
.broker
|
|
.cancel_reminder_as(id, canceller)
|
|
.map_err(|e| format!("{e:#}"))?;
|
|
tracing::info!(%id, %canceller, %owner, "reminder cancelled");
|
|
Ok(())
|
|
}
|
|
}
|
|
}
|
|
|
|
// Real coverage needs a `Coordinator` fixture (broker + sqlite +
|
|
// in-memory questions). Skipped for now — the normalisation branches
|
|
// in `handle_ask` are short enough to read line-by-line; once we add
|
|
// a coord test harness, drop integration tests here for: self-target
|
|
// rejection, operator-string passthrough, agent-to-agent QuestionAsked
|
|
// emission, and `Answer` authorisation.
|