ask_operator: any agent can call it, answer routes by asker
new AgentRequest::AskOperator + AgentResponse::QuestionQueued on
the per-agent socket — same shape as the manager flavor, agent
gets the same wire surface (still uses the same operator_questions
table). agent_server::dispatch wires AskOperator through
coord.questions.submit(agent, ...) so the row's asker is the
sub-agent name; the ttl watchdog already in manager_server gets
shared and spawn_question_watchdog goes pub.
answer routing: operator_questions::answer now returns (question,
asker). post_answer_question + post_cancel_question + the watchdog
fire OperatorAnswered through new coord.notify_agent(asker, event)
instead of always notify_manager — the event lands in whichever
agent originally asked. notify_manager is now a thin wrapper
around notify_agent.
agent socket plumbing: agent_server::start takes Arc<Coordinator>
instead of Arc<Broker> so dispatch has access to questions +
notify path; coordinator::{register_agent,ensure_runtime} take
self: &Arc<Self>. mcp::AgentServer grows the ask_operator tool;
allowed_mcp_tools(Agent) adds it; prompts/agent.md replaces the
'message the manager to ask the operator' guidance with the
direct tool description.
This commit is contained in:
parent
6b3ef4549c
commit
2a6d084718
9 changed files with 156 additions and 43 deletions
|
|
@ -11,14 +11,18 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
|||
use tokio::net::{UnixListener, UnixStream};
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::broker::Broker;
|
||||
use crate::coordinator::Coordinator;
|
||||
|
||||
pub struct AgentSocket {
|
||||
pub path: PathBuf,
|
||||
pub handle: JoinHandle<()>,
|
||||
}
|
||||
|
||||
pub fn start(agent: &str, socket_path: &Path, broker: Arc<Broker>) -> Result<AgentSocket> {
|
||||
pub fn start(
|
||||
agent: &str,
|
||||
socket_path: &Path,
|
||||
coord: Arc<Coordinator>,
|
||||
) -> Result<AgentSocket> {
|
||||
let agent = agent.to_owned();
|
||||
if let Some(parent) = socket_path.parent() {
|
||||
std::fs::create_dir_all(parent)
|
||||
|
|
@ -37,9 +41,9 @@ pub fn start(agent: &str, socket_path: &Path, broker: Arc<Broker>) -> Result<Age
|
|||
match listener.accept().await {
|
||||
Ok((stream, _)) => {
|
||||
let agent = agent.clone();
|
||||
let broker = broker.clone();
|
||||
let coord = coord.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = serve(stream, agent, broker).await {
|
||||
if let Err(e) = serve(stream, agent, coord).await {
|
||||
tracing::warn!(error = ?e, "agent connection failed");
|
||||
}
|
||||
});
|
||||
|
|
@ -54,7 +58,7 @@ pub fn start(agent: &str, socket_path: &Path, broker: Arc<Broker>) -> Result<Age
|
|||
Ok(AgentSocket { path, handle })
|
||||
}
|
||||
|
||||
async fn serve(stream: UnixStream, agent: String, broker: Arc<Broker>) -> Result<()> {
|
||||
async fn serve(stream: UnixStream, agent: String, coord: Arc<Coordinator>) -> Result<()> {
|
||||
let (read, mut write) = stream.into_split();
|
||||
let mut reader = BufReader::new(read);
|
||||
let mut line = String::new();
|
||||
|
|
@ -65,7 +69,7 @@ async fn serve(stream: UnixStream, agent: String, broker: Arc<Broker>) -> Result
|
|||
return Ok(());
|
||||
}
|
||||
let resp = match serde_json::from_str::<AgentRequest>(line.trim()) {
|
||||
Ok(req) => dispatch(&req, &agent, &broker).await,
|
||||
Ok(req) => dispatch(&req, &agent, &coord).await,
|
||||
Err(e) => AgentResponse::Err {
|
||||
message: format!("parse error: {e}"),
|
||||
},
|
||||
|
|
@ -93,7 +97,8 @@ fn recv_timeout(wait_seconds: Option<u64>) -> std::time::Duration {
|
|||
}
|
||||
}
|
||||
|
||||
async fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResponse {
|
||||
async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc<Coordinator>) -> AgentResponse {
|
||||
let broker = &coord.broker;
|
||||
match req {
|
||||
AgentRequest::Send { to, body } => {
|
||||
match broker.send(&Message {
|
||||
|
|
@ -142,5 +147,35 @@ async fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResp
|
|||
message: format!("{e:#}"),
|
||||
},
|
||||
},
|
||||
AgentRequest::AskOperator {
|
||||
question,
|
||||
options,
|
||||
multi,
|
||||
ttl_seconds,
|
||||
} => {
|
||||
let deadline_at = ttl_seconds.and_then(|s| {
|
||||
let now = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.ok()
|
||||
.and_then(|d| i64::try_from(d.as_secs()).ok())
|
||||
.unwrap_or(0);
|
||||
i64::try_from(s).ok().map(|s| now + s)
|
||||
});
|
||||
match coord
|
||||
.questions
|
||||
.submit(agent, question, options, *multi, deadline_at)
|
||||
{
|
||||
Ok(id) => {
|
||||
tracing::info!(%id, %agent, ?deadline_at, "agent question queued");
|
||||
if let Some(ttl) = *ttl_seconds {
|
||||
crate::manager_server::spawn_question_watchdog(coord, id, ttl);
|
||||
}
|
||||
AgentResponse::QuestionQueued { id }
|
||||
}
|
||||
Err(e) => AgentResponse::Err {
|
||||
message: format!("{e:#}"),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -82,7 +82,7 @@ impl Coordinator {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn register_agent(&self, name: &str) -> Result<PathBuf> {
|
||||
pub fn register_agent(self: &Arc<Self>, name: &str) -> Result<PathBuf> {
|
||||
// Idempotent: drop any existing listener so re-registration (e.g. on rebuild,
|
||||
// or after a hive-c0re restart cleared /run/hyperhive) gets a fresh socket.
|
||||
self.unregister_agent(name);
|
||||
|
|
@ -90,7 +90,10 @@ impl Coordinator {
|
|||
std::fs::create_dir_all(&agent_dir)
|
||||
.with_context(|| format!("create agent dir {}", agent_dir.display()))?;
|
||||
let socket_path = Self::socket_path(name);
|
||||
let socket = agent_server::start(name, &socket_path, self.broker.clone())?;
|
||||
// Hand the full Coordinator to the per-agent socket — it
|
||||
// needs broker + operator_questions to handle the agent-side
|
||||
// `ask_operator` tool, not just the broker.
|
||||
let socket = agent_server::start(name, &socket_path, self.clone())?;
|
||||
self.agents.lock().unwrap().insert(name.to_owned(), socket);
|
||||
Ok(agent_dir)
|
||||
}
|
||||
|
|
@ -148,6 +151,15 @@ impl Coordinator {
|
|||
/// recognises the sender and parses the body. Best-effort: a serde or
|
||||
/// broker error is logged but does not propagate.
|
||||
pub fn notify_manager(&self, event: &hive_sh4re::HelperEvent) {
|
||||
self.notify_agent(hive_sh4re::MANAGER_AGENT, event);
|
||||
}
|
||||
|
||||
/// Push a `HelperEvent` into an arbitrary agent's inbox. Encoded
|
||||
/// the same way as `notify_manager` (sender = `SYSTEM_SENDER`,
|
||||
/// body = JSON-encoded event). Used to route `OperatorAnswered`
|
||||
/// events back to the agent that called `ask_operator`, not just
|
||||
/// the manager.
|
||||
pub fn notify_agent(&self, agent: &str, event: &hive_sh4re::HelperEvent) {
|
||||
let body = match serde_json::to_string(event) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
|
|
@ -157,10 +169,10 @@ impl Coordinator {
|
|||
};
|
||||
if let Err(e) = self.broker.send(&hive_sh4re::Message {
|
||||
from: hive_sh4re::SYSTEM_SENDER.to_owned(),
|
||||
to: hive_sh4re::MANAGER_AGENT.to_owned(),
|
||||
to: agent.to_owned(),
|
||||
body,
|
||||
}) {
|
||||
tracing::warn!(error = ?e, "failed to push helper event to manager");
|
||||
tracing::warn!(error = ?e, target = %agent, "failed to push helper event");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -185,7 +197,7 @@ impl Coordinator {
|
|||
/// the dir. For sub-agents this is `register_agent` (creates a fresh
|
||||
/// listener bound to `socket_path(name)`). Source directory of the
|
||||
/// `/run/hive/mcp.sock` bind that ends up in `set_nspawn_flags`.
|
||||
pub fn ensure_runtime(&self, name: &str) -> Result<PathBuf> {
|
||||
pub fn ensure_runtime(self: &Arc<Self>, name: &str) -> Result<PathBuf> {
|
||||
if name == crate::lifecycle::MANAGER_NAME {
|
||||
let dir = Self::manager_dir();
|
||||
std::fs::create_dir_all(&dir)
|
||||
|
|
|
|||
|
|
@ -528,15 +528,16 @@ async fn post_answer_question(
|
|||
return error_response("answer: required");
|
||||
}
|
||||
match state.coord.questions.answer(id, answer) {
|
||||
Ok(question) => {
|
||||
tracing::info!(%id, "operator answered question");
|
||||
state
|
||||
.coord
|
||||
.notify_manager(&hive_sh4re::HelperEvent::OperatorAnswered {
|
||||
Ok((question, asker)) => {
|
||||
tracing::info!(%id, %asker, "operator answered question");
|
||||
state.coord.notify_agent(
|
||||
&asker,
|
||||
&hive_sh4re::HelperEvent::OperatorAnswered {
|
||||
id,
|
||||
question,
|
||||
answer: answer.to_owned(),
|
||||
});
|
||||
},
|
||||
);
|
||||
Redirect::to("/").into_response()
|
||||
}
|
||||
Err(e) => error_response(&format!("answer {id} failed: {e:#}")),
|
||||
|
|
@ -555,15 +556,16 @@ async fn post_cancel_question(
|
|||
) -> Response {
|
||||
const SENTINEL: &str = "[cancelled]";
|
||||
match state.coord.questions.answer(id, SENTINEL) {
|
||||
Ok(question) => {
|
||||
tracing::info!(%id, "operator cancelled question");
|
||||
state
|
||||
.coord
|
||||
.notify_manager(&hive_sh4re::HelperEvent::OperatorAnswered {
|
||||
Ok((question, asker)) => {
|
||||
tracing::info!(%id, %asker, "operator cancelled question");
|
||||
state.coord.notify_agent(
|
||||
&asker,
|
||||
&hive_sh4re::HelperEvent::OperatorAnswered {
|
||||
id,
|
||||
question,
|
||||
answer: SENTINEL.to_owned(),
|
||||
});
|
||||
},
|
||||
);
|
||||
Redirect::to("/").into_response()
|
||||
}
|
||||
Err(e) => error_response(&format!("cancel-question {id} failed: {e:#}")),
|
||||
|
|
|
|||
|
|
@ -388,20 +388,23 @@ async fn proposal_modifies(
|
|||
/// helper event so the asking agent sees a terminal state.
|
||||
const TTL_SENTINEL: &str = "[expired]";
|
||||
|
||||
fn spawn_question_watchdog(coord: &Arc<Coordinator>, id: i64, ttl_secs: u64) {
|
||||
pub fn spawn_question_watchdog(coord: &Arc<Coordinator>, id: i64, ttl_secs: u64) {
|
||||
let coord = coord.clone();
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(std::time::Duration::from_secs(ttl_secs)).await;
|
||||
// `answer` returns Err if already resolved — that's the
|
||||
// normal path when the operator responded before the ttl
|
||||
// fired, so no-op silently.
|
||||
if let Ok(question) = coord.questions.answer(id, TTL_SENTINEL) {
|
||||
tracing::info!(%id, "operator question expired (ttl)");
|
||||
coord.notify_manager(&hive_sh4re::HelperEvent::OperatorAnswered {
|
||||
id,
|
||||
question,
|
||||
answer: TTL_SENTINEL.to_owned(),
|
||||
});
|
||||
if let Ok((question, asker)) = coord.questions.answer(id, TTL_SENTINEL) {
|
||||
tracing::info!(%id, %asker, "operator question expired (ttl)");
|
||||
coord.notify_agent(
|
||||
&asker,
|
||||
&hive_sh4re::HelperEvent::OperatorAnswered {
|
||||
id,
|
||||
question,
|
||||
answer: TTL_SENTINEL.to_owned(),
|
||||
},
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -117,17 +117,20 @@ impl OperatorQuestions {
|
|||
}
|
||||
|
||||
/// Mark the question answered. Returns the original question text so the
|
||||
/// caller can include it in any helper event it fires off.
|
||||
pub fn answer(&self, id: i64, answer: &str) -> Result<String> {
|
||||
/// Mark a pending question answered. Returns `(question, asker)`
|
||||
/// so the caller can both echo the question back in a helper
|
||||
/// event AND route that event to whichever agent originally
|
||||
/// asked it.
|
||||
pub fn answer(&self, id: i64, answer: &str) -> Result<(String, String)> {
|
||||
let conn = self.conn.lock().unwrap();
|
||||
let question: Option<(String, Option<i64>)> = conn
|
||||
let row: Option<(String, String, Option<i64>)> = conn
|
||||
.query_row(
|
||||
"SELECT question, answered_at FROM operator_questions WHERE id = ?1",
|
||||
"SELECT question, asker, answered_at FROM operator_questions WHERE id = ?1",
|
||||
params![id],
|
||||
|row| Ok((row.get(0)?, row.get(1)?)),
|
||||
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
|
||||
)
|
||||
.optional()?;
|
||||
let Some((question, answered_at)) = question else {
|
||||
let Some((question, asker, answered_at)) = row else {
|
||||
bail!("question {id} not found");
|
||||
};
|
||||
if answered_at.is_some() {
|
||||
|
|
@ -137,7 +140,7 @@ impl OperatorQuestions {
|
|||
"UPDATE operator_questions SET answer = ?1, answered_at = ?2 WHERE id = ?3",
|
||||
params![answer, now_unix(), id],
|
||||
)?;
|
||||
Ok(question)
|
||||
Ok((question, asker))
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue