feat: poll forge notifications in agent harness

Closes #27
This commit is contained in:
damocles 2026-05-20 17:54:28 +02:00 committed by Mara
parent a4706d793e
commit cddaacd12f
8 changed files with 913 additions and 9 deletions

View file

@ -83,6 +83,7 @@ async fn main() -> Result<()> {
let files = turn::TurnFiles::prepare(&cli.socket, &label, mcp::Flavor::Agent).await?;
let turn_lock: TurnLock = Arc::new(tokio::sync::Mutex::new(()));
plugins::install_configured(&cli.socket, Some("manager")).await;
tokio::spawn(hive_ag3nt::forge_notify::run(cli.socket.clone()));
tokio::spawn(web_ui::serve(
label.clone(),
port,

View file

@ -0,0 +1,156 @@
//! Background Forgejo notification poller.
//!
//! Reads `HIVE_FORGE_URL` + `{HYPERHIVE_STATE_DIR}/forge-token`, polls
//! `GET /notifications?all=false` every 30 seconds, and delivers each
//! unread notification as a broker `Wake { from: "forge" }` message so
//! claude's normal turn loop picks it up.
//!
//! Graceful no-ops:
//! - `HIVE_FORGE_URL` not set → disabled (no forge configured)
//! - token file absent → disabled (agent has no forge account yet)
//! - HTTP errors → logged at debug, retry next tick
//!
//! After successfully delivering a notification it is marked read via
//! `PATCH /notifications/threads/{id}` so it does not re-fire. If delivery
//! fails the thread is left unread so it resurfaces next tick.
use std::path::{Path, PathBuf};
use std::time::Duration;
use tracing::{debug, info, warn};
const POLL_INTERVAL_SECS: u64 = 30;
const HTTP_TIMEOUT_SECS: u64 = 10;
/// Spawn point: called once from `hive-ag3nt serve`. Returns immediately if
/// the forge is not configured for this agent. Otherwise loops forever,
/// polling every `POLL_INTERVAL_SECS` seconds. Errors are never fatal.
pub async fn run(socket: PathBuf) {
let forge_url = match std::env::var("HIVE_FORGE_URL") {
Ok(u) if !u.is_empty() => u,
_ => {
debug!("forge_notify: HIVE_FORGE_URL not set — disabled");
return;
}
};
let state_dir = std::env::var("HYPERHIVE_STATE_DIR").unwrap_or_default();
let token_path = format!("{state_dir}/forge-token");
let token = match tokio::fs::read_to_string(&token_path).await {
Ok(t) => {
let t = t.trim().to_owned();
if t.is_empty() {
debug!("forge_notify: empty forge token at {token_path} — disabled");
return;
}
t
}
Err(e) => {
debug!("forge_notify: no forge token at {token_path} ({e}) — disabled");
return;
}
};
let client = match reqwest::Client::builder()
.timeout(Duration::from_secs(HTTP_TIMEOUT_SECS))
.build()
{
Ok(c) => c,
Err(e) => {
warn!("forge_notify: failed to build HTTP client: {e}");
return;
}
};
info!(forge_url = %forge_url, "forge_notify: polling started");
let mut interval = tokio::time::interval(Duration::from_secs(POLL_INTERVAL_SECS));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
// First tick fires immediately — skip it so we don't race the broker
// socket becoming available right at boot.
interval.tick().await;
loop {
interval.tick().await;
poll_once(&client, &forge_url, &token, &socket).await;
}
}
async fn poll_once(client: &reqwest::Client, forge_url: &str, token: &str, socket: &Path) {
let url = format!("{forge_url}/api/v1/notifications?all=false&limit=50");
let resp = match client
.get(&url)
.header("Authorization", format!("token {token}"))
.send()
.await
{
Ok(r) => r,
Err(e) => {
debug!("forge_notify: poll request failed: {e}");
return;
}
};
if !resp.status().is_success() {
debug!("forge_notify: poll status {}", resp.status());
return;
}
let notifications: Vec<serde_json::Value> = match resp.json().await {
Ok(v) => v,
Err(e) => {
warn!("forge_notify: response parse error: {e}");
return;
}
};
if notifications.is_empty() {
return;
}
debug!(count = notifications.len(), "forge_notify: delivering notifications");
for notif in &notifications {
let id = match notif["id"].as_u64() {
Some(n) => n,
None => continue,
};
let title = notif["subject"]["title"].as_str().unwrap_or("?");
let notif_type = notif["subject"]["type"].as_str().unwrap_or("?");
let html_url = notif["subject"]["html_url"]
.as_str()
.or_else(|| notif["subject"]["url"].as_str())
.unwrap_or("");
let repo = notif["repository"]["full_name"].as_str().unwrap_or("?");
let body = format!(
"forge notification: [{notif_type}] {title}\nrepo: {repo}\nurl: {html_url}"
);
let req = hive_sh4re::AgentRequest::Wake {
from: "forge".to_owned(),
body,
};
match crate::client::request::<_, hive_sh4re::AgentResponse>(socket, &req).await {
Ok(_) => {
debug!(%id, "forge_notify: delivered");
}
Err(e) => {
warn!(%id, error = ?e, "forge_notify: deliver failed — leaving unread");
continue;
}
}
// Mark as read only after successful delivery so a failed-delivery
// notification resurfaces on the next poll tick.
let mark_url = format!("{forge_url}/api/v1/notifications/threads/{id}");
if let Err(e) = client
.patch(&mark_url)
.header("Authorization", format!("token {token}"))
.send()
.await
{
debug!(%id, error = ?e, "forge_notify: mark-read failed (non-fatal)");
}
}
}

View file

@ -3,6 +3,7 @@
pub mod client;
pub mod events;
pub mod forge_notify;
pub mod login;
pub mod login_session;
pub mod mcp;