operator input: per-agent /send form (dashboard T4LK removed)

This commit is contained in:
müde 2026-05-15 15:28:17 +02:00
parent 3c493934da
commit 409263f1c9
8 changed files with 142 additions and 52 deletions

View file

@ -182,6 +182,13 @@ and the MCP tools. Claude drives any further `recv`/`send` itself —
harness no longer relays claude's stdout as a reply. Stdout is logged for harness no longer relays claude's stdout as a reply. Stdout is logged for
debugging; the side effects (sends via MCP) are what matter. debugging; the side effects (sends via MCP) are what matter.
**Operator input** moved from the hive-c0re dashboard's T4LK form to each
per-agent page. The per-agent `/send` POST hits the new
`AgentRequest::OperatorMsg` / `ManagerRequest::OperatorMsg` wire verb,
which enqueues `Message { from: "operator", to: <self>, body }` directly
into the broker. No more global recipient dropdown — one input per agent
page, scoped to that agent.
**Live view.** Each agent runs a `hive_ag3nt::events::Bus` (a **Live view.** Each agent runs a `hive_ag3nt::events::Bus` (a
`tokio::sync::broadcast<LiveEvent>` wrapper). The harness emits: `tokio::sync::broadcast<LiveEvent>` wrapper). The harness emits:
- `TurnStart { from, body }` when a wake-up message is popped. - `TurnStart { from, body }` when a wake-up message is popped.

View file

@ -63,8 +63,18 @@ async fn main() -> Result<()> {
let ui_state = login_state.clone(); let ui_state = login_state.clone();
let bus = Bus::new(); let bus = Bus::new();
let ui_bus = bus.clone(); let ui_bus = bus.clone();
let ui_socket = cli.socket.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = web_ui::serve(label, port, ui_state, ui_bus).await { if let Err(e) = web_ui::serve(
label,
port,
ui_state,
ui_bus,
ui_socket,
web_ui::Flavor::Agent,
)
.await
{
tracing::error!(error = ?e, "web ui failed"); tracing::error!(error = ?e, "web ui failed");
} }
}); });

View file

@ -74,8 +74,18 @@ async fn main() -> Result<()> {
let ui_state = login_state.clone(); let ui_state = login_state.clone();
let bus = Bus::new(); let bus = Bus::new();
let ui_bus = bus.clone(); let ui_bus = bus.clone();
let ui_socket = cli.socket.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = web_ui::serve(label, port, ui_state, ui_bus).await { if let Err(e) = web_ui::serve(
label,
port,
ui_state,
ui_bus,
ui_socket,
web_ui::Flavor::Manager,
)
.await
{
tracing::error!(error = ?e, "web ui failed"); tracing::error!(error = ?e, "web ui failed");
} }
}); });

View file

@ -5,6 +5,7 @@
use std::convert::Infallible; use std::convert::Infallible;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
@ -20,6 +21,7 @@ use axum::{
use serde::Deserialize; use serde::Deserialize;
use tokio_stream::{Stream, StreamExt, wrappers::BroadcastStream}; use tokio_stream::{Stream, StreamExt, wrappers::BroadcastStream};
use crate::client;
use crate::events::Bus; use crate::events::Bus;
use crate::login::LoginState; use crate::login::LoginState;
use crate::login_session::{LoginSession, drop_if_finished}; use crate::login_session::{LoginSession, drop_if_finished};
@ -35,18 +37,38 @@ struct AppState {
login: LoginStateCell, login: LoginStateCell,
session: Arc<Mutex<Option<Arc<LoginSession>>>>, session: Arc<Mutex<Option<Arc<LoginSession>>>>,
bus: Bus, bus: Bus,
socket: PathBuf,
flavor: Flavor,
} }
pub async fn serve(label: String, port: u16, login: LoginStateCell, bus: Bus) -> Result<()> { /// Which wire protocol the per-agent UI's `/send` handler should speak.
/// Sub-agent → `AgentRequest::OperatorMsg`; manager → `ManagerRequest::OperatorMsg`.
#[derive(Debug, Clone, Copy)]
pub enum Flavor {
Agent,
Manager,
}
pub async fn serve(
label: String,
port: u16,
login: LoginStateCell,
bus: Bus,
socket: PathBuf,
flavor: Flavor,
) -> Result<()> {
let state = AppState { let state = AppState {
label, label,
login, login,
session: Arc::new(Mutex::new(None)), session: Arc::new(Mutex::new(None)),
bus, bus,
socket,
flavor,
}; };
let app = Router::new() let app = Router::new()
.route("/", get(index)) .route("/", get(index))
.route("/events/stream", get(events_stream)) .route("/events/stream", get(events_stream))
.route("/send", post(post_send))
.route("/login/start", post(post_login_start)) .route("/login/start", post(post_login_start))
.route("/login/code", post(post_login_code)) .route("/login/code", post(post_login_code))
.route("/login/cancel", post(post_login_cancel)) .route("/login/cancel", post(post_login_cancel))
@ -65,7 +87,7 @@ async fn index(State(state): State<AppState>) -> Html<String> {
let login = *state.login.lock().unwrap(); let login = *state.login.lock().unwrap();
let session_snapshot = state.session.lock().unwrap().clone(); let session_snapshot = state.session.lock().unwrap().clone();
let body = match (login, session_snapshot) { let body = match (login, session_snapshot) {
(LoginState::Online, _) => render_online(), (LoginState::Online, _) => render_online(&state.label),
(LoginState::NeedsLogin, None) => render_needs_login_idle(), (LoginState::NeedsLogin, None) => render_needs_login_idle(),
(LoginState::NeedsLogin, Some(session)) => render_login_in_progress(&session), (LoginState::NeedsLogin, Some(session)) => render_login_in_progress(&session),
}; };
@ -75,9 +97,15 @@ async fn index(State(state): State<AppState>) -> Html<String> {
)) ))
} }
fn render_online() -> String { fn render_online(label: &str) -> String {
format!( format!(
"<p class=\"status-online\">▓█▓▒░ harness alive — turn loop running ▓█▓▒░</p>\n{LIVE_PANEL}", "<p class=\"status-online\">▓█▓▒░ harness alive — turn loop running ▓█▓▒░</p>\n\
<form method=\"POST\" action=\"/send\" class=\"sendform\">\n \
<input name=\"body\" placeholder=\"message {label} as operator…\" required autocomplete=\"off\">\n \
<button type=\"submit\" class=\"btn btn-send\">◆ S3ND</button>\n\
</form>\n\
<p class=\"meta\">enqueued with <code>from: operator</code> on this agent's inbox; the next turn picks it up.</p>\n\
{LIVE_PANEL}",
) )
} }
@ -200,6 +228,46 @@ fn render_login_in_progress(session: &Arc<LoginSession>) -> String {
) )
} }
#[derive(Deserialize)]
struct SendForm {
body: String,
}
async fn post_send(State(state): State<AppState>, Form(form): Form<SendForm>) -> Response {
let body = form.body.trim().to_owned();
if body.is_empty() {
return error_response("send: `body` required");
}
let result = match state.flavor {
Flavor::Agent => match client::request::<_, hive_sh4re::AgentResponse>(
&state.socket,
&hive_sh4re::AgentRequest::OperatorMsg { body },
)
.await
{
Ok(hive_sh4re::AgentResponse::Ok) => Ok(()),
Ok(hive_sh4re::AgentResponse::Err { message }) => Err(message),
Ok(other) => Err(format!("unexpected response: {other:?}")),
Err(e) => Err(format!("transport: {e:#}")),
},
Flavor::Manager => match client::request::<_, hive_sh4re::ManagerResponse>(
&state.socket,
&hive_sh4re::ManagerRequest::OperatorMsg { body },
)
.await
{
Ok(hive_sh4re::ManagerResponse::Ok) => Ok(()),
Ok(hive_sh4re::ManagerResponse::Err { message }) => Err(message),
Ok(other) => Err(format!("unexpected response: {other:?}")),
Err(e) => Err(format!("transport: {e:#}")),
},
};
match result {
Ok(()) => Redirect::to("/").into_response(),
Err(e) => error_response(&format!("send failed: {e}")),
}
}
async fn events_stream( async fn events_stream(
State(state): State<AppState>, State(state): State<AppState>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> { ) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
@ -339,6 +407,17 @@ const STYLE: &str = r#"
.btn:hover { background: rgba(204, 102, 255, 0.1); } .btn:hover { background: rgba(204, 102, 255, 0.1); }
.btn-login { color: var(--amber); border-color: var(--amber); } .btn-login { color: var(--amber); border-color: var(--amber); }
.btn-cancel { color: #ff6b6b; border-color: #ff6b6b; font-size: 0.85em; padding: 0.15em 0.6em; } .btn-cancel { color: #ff6b6b; border-color: #ff6b6b; font-size: 0.85em; padding: 0.15em 0.6em; }
.btn-send { color: var(--green); border-color: var(--green); }
.sendform { display: flex; gap: 0.6em; margin-top: 0.5em; }
.sendform input {
font-family: inherit; font-size: 1em;
background: rgba(255, 255, 255, 0.04);
color: var(--fg);
border: 1px solid var(--purple-dim);
padding: 0.4em 0.6em;
flex: 1;
}
.sendform input:focus { outline: 1px solid var(--purple); }
.loginform { display: flex; gap: 0.6em; margin-top: 0.5em; } .loginform { display: flex; gap: 0.6em; margin-top: 0.5em; }
.loginform input { .loginform input {
font-family: inherit; font-size: 1em; font-family: inherit; font-size: 1em;

View file

@ -107,5 +107,15 @@ fn dispatch(req: &AgentRequest, agent: &str, broker: &Broker) -> AgentResponse {
message: format!("{e:#}"), message: format!("{e:#}"),
}, },
}, },
AgentRequest::OperatorMsg { body } => match broker.send(&Message {
from: "operator".to_owned(),
to: agent.to_owned(),
body: body.clone(),
}) {
Ok(()) => AgentResponse::Ok,
Err(e) => AgentResponse::Err {
message: format!("{e:#}"),
},
},
} }
} }

View file

@ -20,7 +20,7 @@ use axum::{
}, },
routing::{get, post}, routing::{get, post},
}; };
use hive_sh4re::{Approval, MANAGER_AGENT, Message}; use hive_sh4re::Approval;
use serde::Deserialize; use serde::Deserialize;
use tokio_stream::wrappers::BroadcastStream; use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::{Stream, StreamExt}; use tokio_stream::{Stream, StreamExt};
@ -44,7 +44,6 @@ pub async fn serve(port: u16, coord: Arc<Coordinator>) -> Result<()> {
.route("/destroy/{name}", post(post_destroy)) .route("/destroy/{name}", post(post_destroy))
.route("/rebuild/{name}", post(post_rebuild)) .route("/rebuild/{name}", post(post_rebuild))
.route("/request-spawn", post(post_request_spawn)) .route("/request-spawn", post(post_request_spawn))
.route("/send", post(post_send))
.route("/messages/stream", get(messages_stream)) .route("/messages/stream", get(messages_stream))
.with_state(AppState { coord }); .with_state(AppState { coord });
let addr = SocketAddr::from(([0, 0, 0, 0], port)); let addr = SocketAddr::from(([0, 0, 0, 0], port));
@ -83,35 +82,11 @@ async fn index(headers: HeaderMap, State(state): State<AppState>) -> Html<String
}; };
Html(format!( Html(format!(
"<!doctype html>\n<html lang=\"en\">\n<head>\n<meta charset=\"utf-8\">\n<title>hyperhive // h1ve-c0re</title>\n{refresh}\n{STYLE}\n</head>\n<body>\n{BANNER}\n{containers}\n{talk}\n{approvals_html}\n{MSG_FLOW}\n{FOOTER}\n{MSG_FLOW_JS}\n</body>\n</html>\n", "<!doctype html>\n<html lang=\"en\">\n<head>\n<meta charset=\"utf-8\">\n<title>hyperhive // h1ve-c0re</title>\n{refresh}\n{STYLE}\n</head>\n<body>\n{BANNER}\n{containers}\n{approvals_html}\n{MSG_FLOW}\n{FOOTER}\n{MSG_FLOW_JS}\n</body>\n</html>\n",
containers = render_containers(&containers, &transient, current_rev.as_deref(), &hostname), containers = render_containers(&containers, &transient, current_rev.as_deref(), &hostname),
talk = render_talk(&containers),
)) ))
} }
#[derive(Deserialize)]
struct SendForm {
to: String,
body: String,
}
async fn post_send(State(state): State<AppState>, Form(form): Form<SendForm>) -> Response {
let to = form.to.trim().to_owned();
let body = form.body.trim().to_owned();
if to.is_empty() || body.is_empty() {
return error_response("send: `to` and `body` required");
}
let msg = Message {
from: "operator".into(),
to,
body,
};
match state.coord.broker.send(&msg) {
Ok(()) => Redirect::to("/").into_response(),
Err(e) => error_response(&format!("send failed: {e:#}")),
}
}
async fn messages_stream( async fn messages_stream(
State(state): State<AppState>, State(state): State<AppState>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> { ) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
@ -296,25 +271,6 @@ async fn render_approvals(approvals: &[Approval]) -> String {
out out
} }
fn render_talk(containers: &[String]) -> String {
let mut options = String::new();
let _ = writeln!(
options,
"<option value=\"{MANAGER_AGENT}\">manager (hm1nd)</option>",
);
for container in containers {
if container == MANAGER_NAME {
continue;
}
if let Some(name) = container.strip_prefix(AGENT_PREFIX) {
let _ = writeln!(options, "<option value=\"{name}\">{name}</option>");
}
}
format!(
"<h2>◆ T4LK ◆</h2>\n<div class=\"divider\">══════════════════════════════════════════════════════════════</div>\n<form method=\"POST\" action=\"/send\" class=\"talkform\">\n <select name=\"to\" required>{options}</select>\n <input name=\"body\" placeholder=\"message body...\" required autocomplete=\"off\">\n <button type=\"submit\" class=\"btn btn-talk\">◆ S3ND</button>\n</form>\n<p class=\"meta\">sends as <code>from: operator</code>. Replies stream into the message panel below.</p>\n"
)
}
/// Filter out approvals whose agent state dir was wiped out from under us /// Filter out approvals whose agent state dir was wiped out from under us
/// (e.g. by a test script's cleanup). Marks them failed so they fall out of /// (e.g. by a test script's cleanup). Marks them failed so they fall out of
/// `pending` on next render. /// `pending` on next render.

View file

@ -81,6 +81,16 @@ async fn dispatch(req: &ManagerRequest, coord: &Coordinator) -> ManagerResponse
message: format!("{e:#}"), message: format!("{e:#}"),
}, },
}, },
ManagerRequest::OperatorMsg { body } => match coord.broker.send(&Message {
from: "operator".to_owned(),
to: MANAGER_AGENT.to_owned(),
body: body.clone(),
}) {
Ok(()) => ManagerResponse::Ok,
Err(e) => ManagerResponse::Err {
message: format!("{e:#}"),
},
},
ManagerRequest::Status => match coord.broker.count_pending(MANAGER_AGENT) { ManagerRequest::Status => match coord.broker.count_pending(MANAGER_AGENT) {
Ok(unread) => ManagerResponse::Status { unread }, Ok(unread) => ManagerResponse::Status { unread },
Err(e) => ManagerResponse::Err { Err(e) => ManagerResponse::Err {

View file

@ -151,6 +151,11 @@ pub enum AgentRequest {
/// Non-mutating: how many pending messages are addressed to me? /// Non-mutating: how many pending messages are addressed to me?
/// Used by the harness to render a status line after each tool call. /// Used by the harness to render a status line after each tool call.
Status, Status,
/// Operator-injected message TO this agent (from this agent's own web
/// UI). Recipient is implicit — `from` is `"operator"`. Effectively the
/// per-agent equivalent of the old dashboard T4LK form, but scoped to
/// the agent whose page the operator is on.
OperatorMsg { body: String },
} }
/// Responses on a per-agent socket. /// Responses on a per-agent socket.
@ -211,6 +216,9 @@ pub enum ManagerRequest {
/// Non-mutating: pending message count, used to render a status line /// Non-mutating: pending message count, used to render a status line
/// after each MCP tool call (mirrors `AgentRequest::Status`). /// after each MCP tool call (mirrors `AgentRequest::Status`).
Status, Status,
/// Operator-injected message TO the manager (from the manager's own web
/// UI). Same shape as `AgentRequest::OperatorMsg`.
OperatorMsg { body: String },
/// Submit a spawn request for the user to approve. On approval the host /// Submit a spawn request for the user to approve. On approval the host
/// creates and starts the container. Brand-new agent names only — if an /// creates and starts the container. Brand-new agent names only — if an
/// agent of the same name already exists, the approval will fail. /// agent of the same name already exists, the approval will fail.