ask_operator tool: non-blocking; operator answer arrives as helper event
new mcp tool on the manager surface that queues a question on the
dashboard and returns the question id immediately. operator submits an
answer via /answer-question/<id>; the dashboard fires
HelperEvent::OperatorAnswered { id, question, answer } into the manager
inbox so the next turn picks it up.
also: fix async-form button stuck on spinner after successful submit
(refreshState skipped re-rendering, so the button was never re-enabled).
This commit is contained in:
parent
abfd2cce4b
commit
2770630f33
17 changed files with 426 additions and 79 deletions
|
|
@ -101,8 +101,7 @@ impl Broker {
|
|||
/// inbox view on the dashboard. Caller decides what to show.
|
||||
pub fn recent_for(&self, recipient: &str, limit: u64) -> Result<Vec<InboxRow>> {
|
||||
let conn = self.conn.lock().unwrap();
|
||||
let limit_i =
|
||||
i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX);
|
||||
let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX);
|
||||
let mut stmt = conn.prepare(
|
||||
"SELECT id, sender, body, sent_at
|
||||
FROM messages
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ use anyhow::{Context, Result};
|
|||
use crate::agent_server::{self, AgentSocket};
|
||||
use crate::approvals::Approvals;
|
||||
use crate::broker::Broker;
|
||||
use crate::operator_questions::OperatorQuestions;
|
||||
|
||||
const AGENT_RUNTIME_ROOT: &str = "/run/hyperhive/agents";
|
||||
const MANAGER_RUNTIME_ROOT: &str = "/run/hyperhive/manager";
|
||||
|
|
@ -26,6 +27,7 @@ const APPLIED_STATE_ROOT: &str = "/var/lib/hyperhive/applied";
|
|||
pub struct Coordinator {
|
||||
pub broker: Arc<Broker>,
|
||||
pub approvals: Arc<Approvals>,
|
||||
pub questions: Arc<OperatorQuestions>,
|
||||
/// URL of the hyperhive flake (no fragment). Inlined into per-agent
|
||||
/// `flake.nix` files as `inputs.hyperhive.url`.
|
||||
pub hyperhive_flake: String,
|
||||
|
|
@ -58,9 +60,11 @@ impl Coordinator {
|
|||
pub fn open(db_path: &Path, hyperhive_flake: String, dashboard_port: u16) -> Result<Self> {
|
||||
let broker = Broker::open(db_path).context("open broker")?;
|
||||
let approvals = Approvals::open(db_path).context("open approvals")?;
|
||||
let questions = OperatorQuestions::open(db_path).context("open operator_questions")?;
|
||||
Ok(Self {
|
||||
broker: Arc::new(broker),
|
||||
approvals: Arc::new(approvals),
|
||||
questions: Arc::new(questions),
|
||||
hyperhive_flake,
|
||||
dashboard_port,
|
||||
agents: Mutex::new(HashMap::new()),
|
||||
|
|
|
|||
|
|
@ -50,6 +50,7 @@ pub async fn serve(port: u16, coord: Arc<Coordinator>) -> Result<()> {
|
|||
.route("/start/{name}", post(post_start))
|
||||
.route("/rebuild/{name}", post(post_rebuild))
|
||||
.route("/update-all", post(post_update_all))
|
||||
.route("/answer-question/{id}", post(post_answer_question))
|
||||
.route("/request-spawn", post(post_request_spawn))
|
||||
.route("/messages/stream", get(messages_stream))
|
||||
.with_state(AppState { coord });
|
||||
|
|
@ -75,7 +76,10 @@ async fn serve_index() -> impl IntoResponse {
|
|||
}
|
||||
|
||||
async fn serve_css() -> impl IntoResponse {
|
||||
([("content-type", "text/css")], include_str!("../assets/dashboard.css"))
|
||||
(
|
||||
[("content-type", "text/css")],
|
||||
include_str!("../assets/dashboard.css"),
|
||||
)
|
||||
}
|
||||
|
||||
async fn serve_app_js() -> impl IntoResponse {
|
||||
|
|
@ -97,6 +101,11 @@ struct StateSnapshot {
|
|||
/// asynchronously so the operator can see them without watching the
|
||||
/// live panel during a turn.
|
||||
operator_inbox: Vec<crate::broker::InboxRow>,
|
||||
/// Pending operator questions (currently only from the manager).
|
||||
/// `ask_operator` returns immediately with the id; on `/answer-question`
|
||||
/// we mark the row answered and fire `HelperEvent::OperatorAnswered`
|
||||
/// into the manager's inbox.
|
||||
questions: Vec<crate::operator_questions::OpQuestion>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
|
|
@ -131,10 +140,7 @@ struct ApprovalView {
|
|||
diff_html: Option<String>,
|
||||
}
|
||||
|
||||
async fn api_state(
|
||||
headers: HeaderMap,
|
||||
State(state): State<AppState>,
|
||||
) -> axum::Json<StateSnapshot> {
|
||||
async fn api_state(headers: HeaderMap, State(state): State<AppState>) -> axum::Json<StateSnapshot> {
|
||||
let host = headers
|
||||
.get("host")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
|
|
@ -227,6 +233,7 @@ async fn api_state(
|
|||
.broker
|
||||
.recent_for(hive_sh4re::OPERATOR_RECIPIENT, 50)
|
||||
.unwrap_or_default();
|
||||
let questions = state.coord.questions.pending().unwrap_or_default();
|
||||
|
||||
axum::Json(StateSnapshot {
|
||||
hostname,
|
||||
|
|
@ -236,6 +243,7 @@ async fn api_state(
|
|||
transients,
|
||||
approvals: approval_views,
|
||||
operator_inbox,
|
||||
questions,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -271,6 +279,36 @@ struct RequestSpawnForm {
|
|||
name: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct AnswerForm {
|
||||
answer: String,
|
||||
}
|
||||
|
||||
async fn post_answer_question(
|
||||
State(state): State<AppState>,
|
||||
AxumPath(id): AxumPath<i64>,
|
||||
Form(form): Form<AnswerForm>,
|
||||
) -> Response {
|
||||
let answer = form.answer.trim();
|
||||
if answer.is_empty() {
|
||||
return error_response("answer: required");
|
||||
}
|
||||
match state.coord.questions.answer(id, answer) {
|
||||
Ok(question) => {
|
||||
tracing::info!(%id, "operator answered question");
|
||||
state
|
||||
.coord
|
||||
.notify_manager(&hive_sh4re::HelperEvent::OperatorAnswered {
|
||||
id,
|
||||
question,
|
||||
answer: answer.to_owned(),
|
||||
});
|
||||
Redirect::to("/").into_response()
|
||||
}
|
||||
Err(e) => error_response(&format!("answer {id} failed: {e:#}")),
|
||||
}
|
||||
}
|
||||
|
||||
async fn post_request_spawn(
|
||||
State(state): State<AppState>,
|
||||
Form(form): Form<RequestSpawnForm>,
|
||||
|
|
@ -325,7 +363,10 @@ async fn post_kill(State(state): State<AppState>, AxumPath(name): AxumPath<Strin
|
|||
}
|
||||
}
|
||||
|
||||
async fn post_restart(State(_state): State<AppState>, AxumPath(name): AxumPath<String>) -> Response {
|
||||
async fn post_restart(
|
||||
State(_state): State<AppState>,
|
||||
AxumPath(name): AxumPath<String>,
|
||||
) -> Response {
|
||||
let logical = strip_container_prefix(&name);
|
||||
match lifecycle::restart(&logical).await {
|
||||
Ok(()) => Redirect::to("/").into_response(),
|
||||
|
|
@ -368,7 +409,10 @@ async fn post_update_all(State(state): State<AppState>) -> Response {
|
|||
if errors.is_empty() {
|
||||
Redirect::to("/").into_response()
|
||||
} else {
|
||||
error_response(&format!("update-all partial failure:\n{}", errors.join("\n")))
|
||||
error_response(&format!(
|
||||
"update-all partial failure:\n{}",
|
||||
errors.join("\n")
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -393,8 +437,6 @@ fn error_response(message: &str) -> Response {
|
|||
(StatusCode::INTERNAL_SERVER_ERROR, message.to_owned()).into_response()
|
||||
}
|
||||
|
||||
|
||||
|
||||
/// Filter out approvals whose agent state dir was wiped out from under us
|
||||
/// (e.g. by a test script's cleanup). Marks them failed so they fall out of
|
||||
/// `pending` on next render.
|
||||
|
|
@ -508,4 +550,3 @@ fn html_escape(s: &str) -> String {
|
|||
.replace('<', "<")
|
||||
.replace('>', ">")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ mod coordinator;
|
|||
mod dashboard;
|
||||
mod lifecycle;
|
||||
mod manager_server;
|
||||
mod operator_questions;
|
||||
mod server;
|
||||
|
||||
use coordinator::Coordinator;
|
||||
|
|
|
|||
|
|
@ -71,6 +71,7 @@ async fn serve(stream: UnixStream, coord: Arc<Coordinator>) -> Result<()> {
|
|||
|
||||
const MANAGER_RECV_LONG_POLL: std::time::Duration = std::time::Duration::from_secs(30);
|
||||
|
||||
#[allow(clippy::too_many_lines)]
|
||||
async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse {
|
||||
match req {
|
||||
ManagerRequest::Send { to, body } => match coord.broker.send(&Message {
|
||||
|
|
@ -143,6 +144,18 @@ async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse
|
|||
},
|
||||
}
|
||||
}
|
||||
ManagerRequest::AskOperator { question, options } => {
|
||||
tracing::info!(%question, ?options, "manager: ask_operator");
|
||||
match coord.questions.submit(MANAGER_AGENT, question, options) {
|
||||
Ok(id) => {
|
||||
tracing::info!(%id, "operator question queued");
|
||||
ManagerResponse::QuestionQueued { id }
|
||||
}
|
||||
Err(e) => ManagerResponse::Err {
|
||||
message: format!("{e:#}"),
|
||||
},
|
||||
}
|
||||
}
|
||||
ManagerRequest::RequestApplyCommit { agent, commit_ref } => {
|
||||
tracing::info!(%agent, %commit_ref, "manager: request_apply_commit");
|
||||
match coord.approvals.submit(agent, commit_ref) {
|
||||
|
|
|
|||
141
hive-c0re/src/operator_questions.rs
Normal file
141
hive-c0re/src/operator_questions.rs
Normal file
|
|
@ -0,0 +1,141 @@
|
|||
//! Operator question queue. Manager submits via `AskOperator`; the
|
||||
//! operator answers via the dashboard. The manager-socket handler long-polls
|
||||
//! the store until the answer lands, so claude's `ask_operator` tool call
|
||||
//! returns the answer directly as its result.
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::Mutex;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use anyhow::{Context, Result, bail};
|
||||
use rusqlite::{Connection, OptionalExtension, params};
|
||||
use serde::Serialize;
|
||||
|
||||
const SCHEMA: &str = r"
|
||||
CREATE TABLE IF NOT EXISTS operator_questions (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
asker TEXT NOT NULL,
|
||||
question TEXT NOT NULL,
|
||||
options_json TEXT NOT NULL,
|
||||
asked_at INTEGER NOT NULL,
|
||||
answered_at INTEGER,
|
||||
answer TEXT
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_operator_questions_pending
|
||||
ON operator_questions (id) WHERE answered_at IS NULL;
|
||||
";
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct OpQuestion {
|
||||
pub id: i64,
|
||||
pub asker: String,
|
||||
pub question: String,
|
||||
pub options: Vec<String>,
|
||||
pub asked_at: i64,
|
||||
pub answered_at: Option<i64>,
|
||||
pub answer: Option<String>,
|
||||
}
|
||||
|
||||
pub struct OperatorQuestions {
|
||||
conn: Mutex<Connection>,
|
||||
}
|
||||
|
||||
impl OperatorQuestions {
|
||||
pub fn open(path: &Path) -> Result<Self> {
|
||||
if let Some(parent) = path.parent() {
|
||||
std::fs::create_dir_all(parent).with_context(|| {
|
||||
format!("create operator_questions db parent {}", parent.display())
|
||||
})?;
|
||||
}
|
||||
let conn = Connection::open(path)
|
||||
.with_context(|| format!("open operator_questions db {}", path.display()))?;
|
||||
conn.execute_batch(SCHEMA)
|
||||
.context("apply operator_questions schema")?;
|
||||
Ok(Self {
|
||||
conn: Mutex::new(conn),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn submit(&self, asker: &str, question: &str, options: &[String]) -> Result<i64> {
|
||||
let conn = self.conn.lock().unwrap();
|
||||
let options_json = serde_json::to_string(options).unwrap_or_else(|_| "[]".into());
|
||||
conn.execute(
|
||||
"INSERT INTO operator_questions (asker, question, options_json, asked_at)
|
||||
VALUES (?1, ?2, ?3, ?4)",
|
||||
params![asker, question, options_json, now_unix()],
|
||||
)?;
|
||||
Ok(conn.last_insert_rowid())
|
||||
}
|
||||
|
||||
/// Mark the question answered. Returns the original question text so the
|
||||
/// caller can include it in any helper event it fires off.
|
||||
pub fn answer(&self, id: i64, answer: &str) -> Result<String> {
|
||||
let conn = self.conn.lock().unwrap();
|
||||
let question: Option<(String, Option<i64>)> = conn
|
||||
.query_row(
|
||||
"SELECT question, answered_at FROM operator_questions WHERE id = ?1",
|
||||
params![id],
|
||||
|row| Ok((row.get(0)?, row.get(1)?)),
|
||||
)
|
||||
.optional()?;
|
||||
let Some((question, answered_at)) = question else {
|
||||
bail!("question {id} not found");
|
||||
};
|
||||
if answered_at.is_some() {
|
||||
bail!("question {id} already answered");
|
||||
}
|
||||
conn.execute(
|
||||
"UPDATE operator_questions SET answer = ?1, answered_at = ?2 WHERE id = ?3",
|
||||
params![answer, now_unix(), id],
|
||||
)?;
|
||||
Ok(question)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn get(&self, id: i64) -> Result<Option<OpQuestion>> {
|
||||
let conn = self.conn.lock().unwrap();
|
||||
conn.query_row(
|
||||
"SELECT id, asker, question, options_json, asked_at, answered_at, answer
|
||||
FROM operator_questions WHERE id = ?1",
|
||||
params![id],
|
||||
row_to_question,
|
||||
)
|
||||
.optional()
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
pub fn pending(&self) -> Result<Vec<OpQuestion>> {
|
||||
let conn = self.conn.lock().unwrap();
|
||||
let mut stmt = conn.prepare(
|
||||
"SELECT id, asker, question, options_json, asked_at, answered_at, answer
|
||||
FROM operator_questions
|
||||
WHERE answered_at IS NULL
|
||||
ORDER BY id ASC",
|
||||
)?;
|
||||
let rows = stmt.query_map([], row_to_question)?;
|
||||
rows.collect::<rusqlite::Result<Vec<_>>>()
|
||||
.map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
fn row_to_question(row: &rusqlite::Row<'_>) -> rusqlite::Result<OpQuestion> {
|
||||
let options_json: String = row.get(3)?;
|
||||
let options: Vec<String> = serde_json::from_str(&options_json).unwrap_or_default();
|
||||
Ok(OpQuestion {
|
||||
id: row.get(0)?,
|
||||
asker: row.get(1)?,
|
||||
question: row.get(2)?,
|
||||
options,
|
||||
asked_at: row.get(4)?,
|
||||
answered_at: row.get(5)?,
|
||||
answer: row.get(6)?,
|
||||
})
|
||||
}
|
||||
|
||||
fn now_unix() -> i64 {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.ok()
|
||||
.and_then(|d| i64::try_from(d.as_secs()).ok())
|
||||
.unwrap_or(0)
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue