hyperhive/hive-ag3nt/src/forge_notify.rs

305 lines
10 KiB
Rust

//! Background Forgejo notification poller.
//!
//! Reads `HIVE_FORGE_URL` + `{HYPERHIVE_STATE_DIR}/forge-token`, polls
//! `GET /notifications?all=false` every 30 seconds, and delivers each
//! unread notification as a broker `Wake { from: "forge" }` message so
//! claude's normal turn loop picks it up.
//!
//! Each notification is enriched with the subject body and/or latest
//! comment body so the agent sees actual content, not just a title.
//!
//! Graceful no-ops:
//! - `HIVE_FORGE_URL` not set → disabled (no forge configured)
//! - token file absent → disabled (agent has no forge account yet)
//! - HTTP errors → logged at debug, retry next tick
//!
//! After successfully delivering a notification it is marked read via
//! `PATCH /notifications/threads/{id}` so it does not re-fire. If delivery
//! fails the thread is left unread so it resurfaces next tick.
use std::path::{Path, PathBuf};
use std::time::Duration;
use tracing::{debug, info, warn};
/// Seconds between two consecutive polls of the forge notification endpoint.
const POLL_INTERVAL_SECS: u64 = 30;
/// Per-request timeout, in seconds, configured on the HTTP client built in `run`.
const HTTP_TIMEOUT_SECS: u64 = 10;
/// Maximum size of a body/comment excerpt to include in the wake message.
/// The limit is measured in UTF-8 bytes (it is compared against `str::len`),
/// not in characters.
const BODY_TRUNCATE: usize = 500;
/// Entry point of the background Forgejo notification poller.
///
/// Called once from `hive-ag3nt serve` (agent) or `hive-m1nd serve`
/// (manager). The function returns immediately, after a debug/warn log
/// line, when polling cannot start:
/// - `HIVE_FORGE_URL` is unset or empty,
/// - `{HYPERHIVE_STATE_DIR}/forge-token` cannot be read or is blank,
/// - the HTTP client cannot be built.
///
/// Otherwise it never returns: it runs `poll_once` every
/// `POLL_INTERVAL_SECS` seconds. Errors inside a poll are never fatal.
///
/// `socket` is the broker socket that wake requests are sent to.
///
/// `is_manager`: when true, the inbox is woken via `ManagerRequest::Wake`
/// instead of `AgentRequest::Wake` (the manager socket rejects the agent
/// request type).
pub async fn run(socket: PathBuf, is_manager: bool) {
    // An unset and an empty variable both mean "no forge configured".
    let Some(forge_url) = std::env::var("HIVE_FORGE_URL")
        .ok()
        .filter(|url| !url.is_empty())
    else {
        debug!("forge_notify: HIVE_FORGE_URL not set — disabled");
        return;
    };

    let state_dir = std::env::var("HYPERHIVE_STATE_DIR").unwrap_or_default();
    let token_path = format!("{state_dir}/forge-token");
    let token = match tokio::fs::read_to_string(&token_path).await {
        Err(e) => {
            debug!("forge_notify: no forge token at {token_path} ({e}) — disabled");
            return;
        }
        // Surrounding whitespace (e.g. a trailing newline) is not part of the token.
        Ok(raw) => raw.trim().to_owned(),
    };
    if token.is_empty() {
        debug!("forge_notify: empty forge token at {token_path} — disabled");
        return;
    }

    let builder = reqwest::Client::builder().timeout(Duration::from_secs(HTTP_TIMEOUT_SECS));
    let client = match builder.build() {
        Err(e) => {
            warn!("forge_notify: failed to build HTTP client: {e}");
            return;
        }
        Ok(built) => built,
    };

    info!(forge_url = %forge_url, "forge_notify: polling started");

    let mut ticker = tokio::time::interval(Duration::from_secs(POLL_INTERVAL_SECS));
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    // The first tick completes immediately; consume it here so the first
    // real poll does not race the broker socket coming up at boot.
    ticker.tick().await;

    loop {
        ticker.tick().await;
        poll_once(&client, &forge_url, &token, &socket, is_manager).await;
    }
}
/// Best-effort authenticated GET that decodes the response body as JSON.
///
/// Sends `Authorization: token {token}` with the request. Yields `None`
/// when the request fails, the status is not 2xx, or the body is not valid
/// JSON — callers use the result only to enrich a message, so no error
/// detail is surfaced.
async fn fetch_json(
    client: &reqwest::Client,
    url: &str,
    token: &str,
) -> Option<serde_json::Value> {
    let auth = format!("token {token}");
    let response = client
        .get(url)
        .header("Authorization", auth)
        .send()
        .await
        .ok()?;

    if response.status().is_success() {
        response.json().await.ok()
    } else {
        None
    }
}
/// Truncate a string to `max` bytes at a char boundary, appending `…` if cut.
///
/// - If `s` already fits in `max` bytes it is returned unchanged.
/// - Otherwise the longest prefix that ends on a UTF-8 char boundary and
///   leaves room for the ellipsis is kept, and `…` (3 bytes) is appended, so
///   the result is at most `max` bytes long.
/// - When `max` is smaller than the ellipsis itself (`max < 3`) and `s` does
///   not fit, the result is the ellipsis alone; this is the only case where
///   the result exceeds `max`.
///
/// Never panics, whatever the value of `max`.
fn truncate(s: &str, max: usize) -> String {
    const ELLIPSIS: char = '…';

    if s.len() <= max {
        return s.to_owned();
    }

    // Reserve room for the ellipsis. Saturating keeps a tiny `max` from
    // underflowing (which would panic in debug builds).
    let budget = max.saturating_sub(ELLIPSIS.len_utf8());

    // Walk back from the budget to the nearest char boundary. `budget` is
    // strictly below `s.len()` here, and index 0 is always a boundary, so the
    // search inspects at most four positions and always succeeds.
    let end = (0..=budget)
        .rev()
        .find(|&i| s.is_char_boundary(i))
        .unwrap_or(0);

    let mut out = String::with_capacity(end + ELLIPSIS.len_utf8());
    out.push_str(&s[..end]);
    out.push(ELLIPSIS);
    out
}
/// Render one Forgejo notification as the text of a wake message.
///
/// The notification JSON supplies title, type, repository and URLs. One
/// extra API call fetches either the latest comment (when the notification
/// points at a comment distinct from the subject) or the subject itself, to
/// add the author and a truncated body. If that call fails the message
/// still renders, with `?` / empty placeholders.
async fn format_notification(
    client: &reqwest::Client,
    token: &str,
    notif: &serde_json::Value,
) -> String {
    let subject = &notif["subject"];
    let title = subject["title"].as_str().unwrap_or("?");
    let notif_type = subject["type"].as_str().unwrap_or("?");
    let repo = notif["repository"]["full_name"].as_str().unwrap_or("?");

    // API URLs used to fetch the actual content.
    let subject_api_url = subject["url"].as_str().unwrap_or("");
    let comment_api_url = subject["latest_comment_url"].as_str().unwrap_or("");

    // Human-facing links; the subject link falls back to the API URL.
    let html_url = subject["html_url"].as_str().unwrap_or(subject_api_url);
    let comment_html_url = subject["latest_comment_html_url"]
        .as_str()
        .unwrap_or("");

    // A comment URL that is present and differs from the subject URL means
    // the notification was caused by a comment rather than by creation or a
    // state change of the subject itself.
    let triggered_by_comment =
        !(comment_api_url.is_empty() || comment_api_url == subject_api_url);

    if !triggered_by_comment {
        // Creation or state change of the subject.
        let fetched = fetch_json(client, subject_api_url, token).await;
        let (author, body, state, merged) = match fetched.as_ref() {
            Some(s) => (
                s["user"]["login"].as_str().unwrap_or("?"),
                s["body"].as_str().unwrap_or("").trim(),
                s["state"].as_str().unwrap_or(""),
                s["merged"].as_bool().unwrap_or(false),
            ),
            None => ("?", "", "", false),
        };

        let kind = match notif_type {
            "Pull Request" => match (state, merged) {
                ("closed", true) => "PR merged".to_owned(),
                ("closed", false) => "PR closed".to_owned(),
                _ => "new PR".to_owned(),
            },
            "Issue" => {
                if state == "closed" {
                    "issue closed".to_owned()
                } else {
                    "new issue".to_owned()
                }
            }
            other => format!("new {other}"),
        };

        let mut message = format!("[{kind}] {title}\nrepo: {repo}\nurl: {html_url}");
        // The body is only quoted for subjects that are neither closed nor merged.
        let quote_body = !body.is_empty() && !state.contains("closed") && !merged;
        if quote_body {
            message += &format!(
                "\n\n{author}: {}",
                truncate(body, BODY_TRUNCATE)
            );
        }
        return message;
    }

    // New comment on the subject.
    let fetched = fetch_json(client, comment_api_url, token).await;
    let (author, body) = match fetched.as_ref() {
        Some(c) => (
            c["user"]["login"].as_str().unwrap_or("?"),
            c["body"].as_str().unwrap_or("").trim(),
        ),
        None => ("?", ""),
    };

    let kind = match notif_type {
        "Pull Request" => "comment on PR",
        "Issue" => "comment on issue",
        _ => "comment",
    };

    // Prefer the direct link to the comment when the notification has one.
    let link = if comment_html_url.is_empty() {
        html_url
    } else {
        comment_html_url
    };

    let mut message = format!(
        "[{kind}] {title}\nrepo: {repo}\nurl: {}\n\n{author}: {}\n",
        link,
        truncate(body, BODY_TRUNCATE),
    );
    // Drop the single newline the template ends with.
    if message.ends_with('\n') {
        message.pop();
    }
    message
}
/// Run a single poll cycle: list unread notifications, deliver each one to
/// the broker as a `Wake { from: "forge" }` request, and mark the delivered
/// ones as read.
///
/// - `forge_url` is the forge base URL; `/api/v1/...` paths are appended.
/// - `token` is sent as `Authorization: token {token}` on every request.
/// - `socket` is the broker socket; `is_manager` selects
///   `ManagerRequest::Wake` over `AgentRequest::Wake`.
///
/// Nothing is returned and nothing is fatal: a failed list request, a
/// non-2xx status or an unparsable response ends the cycle with a log line.
/// A notification whose delivery fails is left unread so that it shows up
/// again on a later poll.
async fn poll_once(
    client: &reqwest::Client,
    forge_url: &str,
    token: &str,
    socket: &Path,
    is_manager: bool,
) {
    let auth = format!("token {token}");
    let list_url = format!("{forge_url}/api/v1/notifications?all=false&limit=50");

    let response = match client
        .get(&list_url)
        .header("Authorization", auth.as_str())
        .send()
        .await
    {
        Err(e) => {
            debug!("forge_notify: poll request failed: {e}");
            return;
        }
        Ok(r) => r,
    };

    let status = response.status();
    if !status.is_success() {
        debug!("forge_notify: poll status {}", status);
        return;
    }

    let notifications: Vec<serde_json::Value> = match response.json().await {
        Err(e) => {
            warn!("forge_notify: response parse error: {e}");
            return;
        }
        Ok(list) => list,
    };
    if notifications.is_empty() {
        return;
    }
    debug!(count = notifications.len(), "forge_notify: delivering notifications");

    for notif in &notifications {
        // Entries without a numeric id cannot be marked read; skip them.
        let Some(id) = notif["id"].as_u64() else {
            continue;
        };

        let body = format_notification(client, token, notif).await;
        let from = "forge".to_owned();

        // The two sockets speak different request types; normalise both
        // outcomes to `Result<(), _>`.
        let delivered = if is_manager {
            let wake = hive_sh4re::ManagerRequest::Wake { from, body };
            crate::client::request::<_, hive_sh4re::ManagerResponse>(socket, &wake)
                .await
                .map(drop)
        } else {
            let wake = hive_sh4re::AgentRequest::Wake { from, body };
            crate::client::request::<_, hive_sh4re::AgentResponse>(socket, &wake)
                .await
                .map(drop)
        };

        if let Err(e) = delivered {
            warn!(%id, error = ?e, "forge_notify: deliver failed — leaving unread");
            continue;
        }
        debug!(%id, "forge_notify: delivered");

        // Only reached after a successful delivery, so an undelivered
        // notification stays unread and resurfaces on the next poll tick.
        let mark_url = format!("{forge_url}/api/v1/notifications/threads/{id}");
        let outcome = client
            .patch(&mark_url)
            .header("Authorization", auth.as_str())
            .send()
            .await;

        match outcome {
            Ok(r) if r.status().is_success() => {
                debug!(%id, "forge_notify: marked read");
            }
            Ok(r) => {
                warn!(%id, status = %r.status(), "forge_notify: mark-read returned non-2xx — notification will resurface");
            }
            Err(e) => {
                warn!(%id, error = ?e, "forge_notify: mark-read request failed — notification will resurface");
            }
        }
    }
}