From 3b444104278729e4839053d431415b5d8f177246 Mon Sep 17 00:00:00 2001 From: damocles Date: Thu, 21 May 2026 18:33:39 +0200 Subject: [PATCH] manager: receive forge notifications (ManagerRequest::Wake) --- hive-ag3nt/src/bin/hive-m1nd.rs | 1 + hive-ag3nt/src/forge_notify.rs | 40 ++++++++++++++++++++++++--------- hive-c0re/src/manager_server.rs | 11 +++++++++ hive-sh4re/src/lib.rs | 5 +++++ 4 files changed, 46 insertions(+), 11 deletions(-) diff --git a/hive-ag3nt/src/bin/hive-m1nd.rs b/hive-ag3nt/src/bin/hive-m1nd.rs index 6e085e5..816ce64 100644 --- a/hive-ag3nt/src/bin/hive-m1nd.rs +++ b/hive-ag3nt/src/bin/hive-m1nd.rs @@ -82,6 +82,7 @@ async fn main() -> Result<()> { files.clone(), turn_lock.clone(), )); + tokio::spawn(hive_ag3nt::forge_notify::run(cli.socket.clone(), true)); match initial { LoginState::Online => { serve( diff --git a/hive-ag3nt/src/forge_notify.rs b/hive-ag3nt/src/forge_notify.rs index bf94345..5f76608 100644 --- a/hive-ag3nt/src/forge_notify.rs +++ b/hive-ag3nt/src/forge_notify.rs @@ -27,10 +27,15 @@ const HTTP_TIMEOUT_SECS: u64 = 10; /// Maximum characters of a body/comment to include in the wake message. const BODY_TRUNCATE: usize = 500; -/// Spawn point: called once from `hive-ag3nt serve`. Returns immediately if -/// the forge is not configured for this agent. Otherwise loops forever, -/// polling every `POLL_INTERVAL_SECS` seconds. Errors are never fatal. -pub async fn run(socket: PathBuf) { +/// Spawn point: called once from `hive-ag3nt serve` (agent) or +/// `hive-m1nd serve` (manager). Returns immediately if the forge is not +/// configured. Otherwise loops forever, polling every +/// `POLL_INTERVAL_SECS` seconds. Errors are never fatal. +/// +/// `is_manager`: when true, wakes the inbox via `ManagerRequest::Wake` +/// instead of `AgentRequest::Wake` (the manager socket rejects the agent +/// request type). +pub async fn run(socket: PathBuf, is_manager: bool) { let forge_url = match std::env::var("HIVE_FORGE_URL") { Ok(u) if !u.is_empty() => u, _ => { @@ -77,7 +82,7 @@ pub async fn run(socket: PathBuf) { loop { interval.tick().await; - poll_once(&client, &forge_url, &token, &socket).await; + poll_once(&client, &forge_url, &token, &socket, is_manager).await; } } @@ -208,7 +213,7 @@ async fn format_notification( } } -async fn poll_once(client: &reqwest::Client, forge_url: &str, token: &str, socket: &Path) { +async fn poll_once(client: &reqwest::Client, forge_url: &str, token: &str, socket: &Path, is_manager: bool) { let url = format!("{forge_url}/api/v1/notifications?all=false&limit=50"); let resp = match client .get(&url) @@ -250,12 +255,25 @@ async fn poll_once(client: &reqwest::Client, forge_url: &str, token: &str, socke let body = format_notification(client, token, notif).await; - let req = hive_sh4re::AgentRequest::Wake { - from: "forge".to_owned(), - body, + let delivered = if is_manager { + let req = hive_sh4re::ManagerRequest::Wake { + from: "forge".to_owned(), + body, + }; + crate::client::request::<_, hive_sh4re::ManagerResponse>(socket, &req) + .await + .map(|_| ()) + } else { + let req = hive_sh4re::AgentRequest::Wake { + from: "forge".to_owned(), + body, + }; + crate::client::request::<_, hive_sh4re::AgentResponse>(socket, &req) + .await + .map(|_| ()) }; - match crate::client::request::<_, hive_sh4re::AgentResponse>(socket, &req).await { - Ok(_) => { + match delivered { + Ok(()) => { debug!(%id, "forge_notify: delivered"); } Err(e) => { diff --git a/hive-c0re/src/manager_server.rs b/hive-c0re/src/manager_server.rs index 3717f01..75b6075 100644 --- a/hive-c0re/src/manager_server.rs +++ b/hive-c0re/src/manager_server.rs @@ -117,6 +117,17 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc) -> ManagerResp } } } + ManagerRequest::Wake { from, body } => match coord.broker.send(&Message { + from: from.clone(), + to: MANAGER_AGENT.to_owned(), + body: body.clone(), + in_reply_to: None, + }) { + Ok(()) => ManagerResponse::Ok, + Err(e) => ManagerResponse::Err { + message: format!("{e:#}"), + }, + }, ManagerRequest::OperatorMsg { body } => match coord.broker.send(&Message { from: hive_sh4re::OPERATOR_RECIPIENT.to_owned(), to: MANAGER_AGENT.to_owned(), diff --git a/hive-sh4re/src/lib.rs b/hive-sh4re/src/lib.rs index 57a81ee..6b59aa3 100644 --- a/hive-sh4re/src/lib.rs +++ b/hive-sh4re/src/lib.rs @@ -837,6 +837,11 @@ pub enum ManagerRequest { /// Mirror of `AgentRequest::RequeueInflight` on the manager /// surface — fired exactly once on manager harness boot. RequeueInflight, + /// Mirror of `AgentRequest::Wake` on the manager surface. Used by + /// in-container background tasks (e.g. `forge_notify`) to push a + /// message into the manager's own broker inbox. `from` is caller- + /// chosen; `body` becomes the wake prompt body. + Wake { from: String, body: String }, } #[derive(Debug, Clone, Serialize, Deserialize)]