//! Background Forgejo notification poller. //! //! Reads `HIVE_FORGE_URL` + `{HYPERHIVE_STATE_DIR}/forge-token`, polls //! `GET /notifications?all=false` every 30 seconds, and delivers each //! unread notification as a broker `Wake { from: "forge" }` message so //! claude's normal turn loop picks it up. //! //! Each notification is enriched with the subject body and/or latest //! comment body so the agent sees actual content, not just a title. //! //! Graceful no-ops: //! - `HIVE_FORGE_URL` not set → disabled (no forge configured) //! - token file absent → disabled (agent has no forge account yet) //! - HTTP errors → logged at debug, retry next tick //! //! After successfully delivering a notification it is marked read via //! `PATCH /notifications/threads/{id}` so it does not re-fire. If delivery //! fails the thread is left unread so it resurfaces next tick. //! //! Self-notification filtering (closes #230): //! - New issues/PRs created by this agent (`reason == "author"` + `state == open`) //! are silently marked read — the agent already knows it opened them. //! - Comment notifications where the comment author matches this agent's own //! forge login are silently marked read. //! //! Own login is fetched once at startup via `GET /user` and cached for the //! lifetime of the polling loop. //! //! PR review formatting (closes #231): //! - When `latest_comment_url` points to a review (the fetched JSON has a //! `state` field like `APPROVED` / `REQUEST_CHANGES` / `COMMENT`), the //! notification is formatted as `[PR approved #N repo]` instead of the //! generic `[comment on PR #N repo]` so agents can action it immediately. use std::collections::HashSet; use std::fmt::Write as _; use std::path::{Path, PathBuf}; use std::time::Duration; use tracing::{debug, info, warn}; const POLL_INTERVAL_SECS: u64 = 30; const HTTP_TIMEOUT_SECS: u64 = 10; /// Maximum characters of a body/comment to include in the wake message. 
const BODY_TRUNCATE: usize = 500;

/// Spawn point: called once from `hive-ag3nt serve` (agent) or
/// `hive-m1nd serve` (manager). Returns immediately if the forge is not
/// configured. Otherwise loops forever, polling every
/// `POLL_INTERVAL_SECS` seconds. Errors are never fatal.
///
/// `socket`: path of the broker socket that wake requests are sent to.
///
/// `is_manager`: when true, wakes the inbox via `ManagerRequest::Wake`
/// instead of `AgentRequest::Wake` (the manager socket rejects the agent
/// request type).
pub async fn run(socket: PathBuf, is_manager: bool) {
    // Trailing slashes are stripped so the `{forge_url}/api/v1/...` URLs built
    // from this base never contain a doubled `//`. An unset, non-unicode or
    // effectively empty value disables the poller.
    let forge_url = std::env::var("HIVE_FORGE_URL")
        .map(|raw| raw.trim_end_matches('/').to_owned())
        .unwrap_or_default();
    if forge_url.is_empty() {
        debug!("forge_notify: HIVE_FORGE_URL not set — disabled");
        return;
    }

    let state_dir = std::env::var("HYPERHIVE_STATE_DIR").unwrap_or_default();
    let token_path = format!("{state_dir}/forge-token");
    let token = match tokio::fs::read_to_string(&token_path).await {
        Ok(t) => {
            let t = t.trim().to_owned();
            if t.is_empty() {
                debug!("forge_notify: empty forge token at {token_path} — disabled");
                return;
            }
            t
        }
        Err(e) => {
            debug!("forge_notify: no forge token at {token_path} ({e}) — disabled");
            return;
        }
    };

    let client = match reqwest::Client::builder()
        .timeout(Duration::from_secs(HTTP_TIMEOUT_SECS))
        .build()
    {
        Ok(c) => c,
        Err(e) => {
            warn!("forge_notify: failed to build HTTP client: {e}");
            return;
        }
    };

    // Fetch own login once for self-notification filtering (closes #230).
    // Falls back to empty string on failure — no filtering (safe degradation).
    let own_login = {
        let url = format!("{forge_url}/api/v1/user");
        fetch_json(&client, &url, &token)
            .await
            .and_then(|v| v["login"].as_str().map(std::borrow::ToOwned::to_owned))
            .unwrap_or_default()
    };
    if own_login.is_empty() {
        warn!("forge_notify: could not resolve own login — self-notification filtering disabled");
    } else {
        debug!(%own_login, "forge_notify: own login resolved");
    }

    info!(forge_url = %forge_url, "forge_notify: polling started");

    let mut interval = tokio::time::interval(Duration::from_secs(POLL_INTERVAL_SECS));
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    // First tick fires immediately — skip it so we don't race the broker
    // socket becoming available right at boot.
    interval.tick().await;

    // HIVE_FORGE_KEEP_SUBSCRIPTIONS=1 disables auto-unsubscribe for agents
    // that intentionally consume the full repo notification firehose (e.g. triage).
    let keep_subscriptions = std::env::var("HIVE_FORGE_KEEP_SUBSCRIPTIONS")
        .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
        .unwrap_or(false);

    // Repos we have already unsubscribed this process lifetime. Persists
    // across polls so we don't hammer DELETE on every cycle.
    let mut unsubbed_repos: HashSet<String> = HashSet::new();

    loop {
        interval.tick().await;
        poll_once(
            &client,
            &forge_url,
            &token,
            &socket,
            is_manager,
            keep_subscriptions,
            &mut unsubbed_repos,
            &own_login,
        )
        .await;
    }
}

/// Fetch a JSON value from a URL using the agent's forge token. Returns
/// `None` on any HTTP or parse error (best-effort enrichment).
///
/// A non-2xx status is treated the same as a transport failure: the body is
/// not parsed and `None` is returned.
async fn fetch_json(
    client: &reqwest::Client,
    url: &str,
    token: &str,
) -> Option<serde_json::Value> {
    let resp = client
        .get(url)
        .header("Authorization", format!("token {token}"))
        .send()
        .await
        .ok()?;
    if !resp.status().is_success() {
        return None;
    }
    resp.json().await.ok()
}

/// Map a Forgejo notification `subject.type` to a human-readable label.
/// Known values: "Pull", "Issue", "Commit", "Repository".
Any unknown /// type is passed through as-is so new Forgejo types degrade gracefully /// rather than silently collapsing into a generic label. fn notif_type_label(t: &str) -> &str { match t { "Pull" => "PR", "Issue" => "issue", other => other, } } /// Truncate a string to `max` bytes at a char boundary, appending `…` if cut. fn truncate(s: &str, max: usize) -> String { if s.len() <= max { return s.to_owned(); } let end = s .char_indices() .map(|(i, _)| i) .take_while(|&i| i <= max - 3) .last() .unwrap_or(0); format!("{}…", &s[..end]) } /// Map a Forgejo review state to a readable action label. /// Returns `None` for non-review states (regular comments have no `state` field; /// `PENDING` means the review was saved but not submitted yet). /// Forgejo review states: "APPROVED", "`REQUEST_CHANGES`", "COMMENT", "PENDING". fn review_state_label(state: &str) -> Option<&str> { match state { "APPROVED" => Some("approved"), "REQUEST_CHANGES" => Some("changes requested"), "COMMENT" => Some("review comment"), _ => None, } } /// Build a human-readable wake message for one Forgejo notification. /// Returns `None` when the notification is a self-echo (actor is `own_login`) /// and should be silently discarded (and marked read by the caller). /// /// Formats: /// - Comment: `[comment on PR #N repo] title\nurl: ...\n\nauthor: body\nassignee: user` /// - Review: `[PR approved #N repo] title\nurl: ...\n\nreviewer: body\nassignee: user` /// - New item: `[new issue #N repo] title\nurl: ...\nassignee: user` /// - State: `[PR merged #N repo] title\nurl: ...\nassignee: user` /// /// Assignees (and, for PRs, `requested_reviewers`) are appended unconditionally /// on all issue/PR notifications (closes #256). /// /// Number is extracted from `html_url` last path segment before any `#`. /// Repo slug (`owner/name`) is always included — agents may watch multiple repos. 
async fn format_notification(
    client: &reqwest::Client,
    token: &str,
    notif: &serde_json::Value,
    own_login: &str,
) -> Option<String> {
    let title = notif["subject"]["title"].as_str().unwrap_or("?");
    let notif_type = notif["subject"]["type"].as_str().unwrap_or("?");
    // Prefer the browser URL; fall back to the API URL, then to empty.
    let html_url = notif["subject"]["html_url"]
        .as_str()
        .or_else(|| notif["subject"]["url"].as_str())
        .unwrap_or("");

    // Extract issue/PR number from the html_url. URL ends with /issues/N or
    // /pulls/N (possibly followed by #anchor for comments). Best-effort.
    let num = html_url
        .split('#')
        .next()
        .and_then(|u| u.rsplit('/').next())
        .and_then(|s| s.parse::<u64>().ok())
        .map(|n| format!(" #{n}"))
        .unwrap_or_default();

    // Repo slug for multi-repo disambiguation. Falls back gracefully when absent.
    let repo = notif["repository"]["full_name"]
        .as_str()
        .map(|r| format!(" {r}"))
        .unwrap_or_default();

    // API URLs for fetching content
    let subject_api_url = notif["subject"]["url"].as_str().unwrap_or("");
    let comment_api_url = notif["subject"]["latest_comment_url"].as_str().unwrap_or("");
    let comment_html_url = notif["subject"]["latest_comment_html_url"]
        .as_str()
        .unwrap_or("");

    // Always fetch subject detail for assignee/reviewer metadata (#256).
    // Keeps agents informed of current ownership without a follow-up fetch.
    let subject = if subject_api_url.is_empty() {
        None
    } else {
        fetch_json(client, subject_api_url, token).await
    };
    let is_pr = matches!(notif_type, "Pull Request" | "Pull");
    let meta_suffix = build_meta_suffix(subject.as_ref(), is_pr);

    // Determine whether this notification was triggered by a comment/review or
    // by creation/state-change of the subject itself.
    let has_comment = !comment_api_url.is_empty() && comment_api_url != subject_api_url;

    let meta = NotifMeta {
        title,
        notif_type,
        html_url,
        num,
        repo,
        meta_suffix,
        subject,
        is_pr,
    };

    if has_comment {
        format_comment_notification(
            client,
            token,
            &meta,
            comment_api_url,
            comment_html_url,
            own_login,
        )
        .await
    } else {
        format_state_change_notification(notif, &meta, own_login)
    }
}

/// Shared notification metadata extracted from the raw Forgejo JSON.
struct NotifMeta<'a> {
    /// `subject.title`, or `"?"` when absent.
    title: &'a str,
    /// Raw `subject.type`, or `"?"` when absent.
    notif_type: &'a str,
    /// `subject.html_url`, falling back to `subject.url`, then to `""`.
    html_url: &'a str,
    /// Pre-formatted `" #N"` fragment, or empty when no number could be parsed.
    num: String,
    /// Pre-formatted `" owner/name"` fragment, or empty when absent.
    repo: String,
    /// Assignee/reviewer lines produced by `build_meta_suffix`.
    meta_suffix: String,
    /// Fetched subject detail (issue/PR JSON); used for review-request detection.
    subject: Option<serde_json::Value>,
    /// Whether `subject.type` names a pull request.
    is_pr: bool,
}

/// Build the `\nassignee: ...` (and optionally `\nreviewer: ...`) suffix
/// appended to all notification kinds.
///
/// The assignee line is always present (`assignee: unassigned` when the
/// subject is missing or has no assignees). The reviewer line is only added
/// for PRs that have at least one requested reviewer.
fn build_meta_suffix(subject: Option<&serde_json::Value>, is_pr: bool) -> String {
    /// Collect the `login` of every user object in the array stored under `key`.
    fn logins<'a>(subject: Option<&'a serde_json::Value>, key: &str) -> Vec<&'a str> {
        subject
            .and_then(|s| s[key].as_array())
            .map(|arr| arr.iter().filter_map(|u| u["login"].as_str()).collect())
            .unwrap_or_default()
    }

    let assignees = logins(subject, "assignees");
    let mut out = if assignees.is_empty() {
        "\nassignee: unassigned".to_owned()
    } else {
        format!("\nassignee: {}", assignees.join(", "))
    };

    // For PRs, include requested_reviewers when present.
    if is_pr {
        let reviewers = logins(subject, "requested_reviewers");
        if !reviewers.is_empty() {
            out.push_str("\nreviewer: ");
            out.push_str(&reviewers.join(", "));
        }
    }
    out
}

/// Format a notification triggered by a new comment or review submission.
async fn format_comment_notification( client: &reqwest::Client, token: &str, meta: &NotifMeta<'_>, comment_api_url: &str, comment_html_url: &str, own_login: &str, ) -> Option { let payload = fetch_json(client, comment_api_url, token).await; let actor_login = payload .as_ref() .and_then(|c| c["user"]["login"].as_str()) .unwrap_or(""); // Self-notification filter (#230): skip if we authored the comment/review. if !own_login.is_empty() && actor_login == own_login { debug!(%own_login, "forge_notify: skipping self-authored comment/review"); return None; } let body_text = payload .as_ref() .and_then(|c| c["body"].as_str()) .unwrap_or("") .trim(); // PR review detection (#231): Forgejo review objects carry a `state` field // with values like "APPROVED" / "REQUEST_CHANGES" / "COMMENT". Regular // issue/PR comments have no such field. Format reviews distinctly so the // agent knows the review outcome immediately without reading the body. let review_state = payload .as_ref() .and_then(|c| c["state"].as_str()) .and_then(review_state_label); let url = if comment_html_url.is_empty() { meta.html_url } else { comment_html_url }; let author = if actor_login.is_empty() { "?" } else { actor_login }; let NotifMeta { title, notif_type, num, repo, meta_suffix, .. } = meta; if let Some(review_label) = review_state { // Review submission on a PR. let kind = format!("PR {review_label}{num}{repo}"); let mut out = format!("[{kind}] {title}\nurl: {url}"); if body_text.is_empty() { write!(out, "\n\nreviewer: {author}").ok(); } else { write!(out, "\n\n{author}: {}", truncate(body_text, BODY_TRUNCATE)).ok(); } out.push_str(meta_suffix); Some(out) } else { // Regular comment. 
let kind = format!("comment on {}{num}{repo}", notif_type_label(notif_type)); let mut out = format!( "[{kind}] {title}\nurl: {url}\n\n{author}: {}", truncate(body_text, BODY_TRUNCATE) ); if out.ends_with('\n') { out.pop(); } out.push_str(meta_suffix); Some(out) } } /// Format a notification triggered by creation or state change of the subject. fn format_state_change_notification( notif: &serde_json::Value, meta: &NotifMeta<'_>, own_login: &str, ) -> Option { // Classification uses notif["subject"]["state"] directly — Forgejo // returns "open" / "closed" / "merged" here. We do NOT rely on // fetching the PR/issue detail for `merged`: // - `subject.url` points to the *issues* endpoint, which returns // `pull_request.merged`, not top-level `merged`. // - Forgejo API type is "Pull" / "Issue", never "Pull Request". let notif_state = notif["subject"]["state"].as_str().unwrap_or(""); let reason = notif["reason"].as_str().unwrap_or(""); // Self-notification filter (#230): skip new items we authored ourselves. // `reason == "author"` combined with open state means we just opened the // issue/PR. We do NOT filter merged/closed state changes — those are // triggered by someone else and we want them. let is_new = notif_state == "open" || notif_state.is_empty(); if is_new && reason == "author" && !own_login.is_empty() { debug!(%own_login, "forge_notify: skipping self-authored new item"); return None; } let NotifMeta { title, notif_type, html_url, num, repo, meta_suffix, subject, is_pr } = meta; let label = notif_type_label(notif_type); let kind = match notif_state { "merged" => format!("{label} merged{num}{repo}"), "closed" => format!("{label} closed{num}{repo}"), "open" | "" => format!("new {label}{num}{repo}"), other => format!("{label}{num}{repo}: {other}"), }; // Review-request detection (#253): Forgejo does not always set // reason == "review_requested" (observed as null). Check // requested_reviewers instead, which is reliable. 
If own_login is // in the list, override the kind. // subject and is_pr are already fetched unconditionally above (#256). let is_review_request = is_new && *is_pr && !own_login.is_empty() && subject .as_ref() .and_then(|s| s["requested_reviewers"].as_array()) .is_some_and(|arr| arr.iter().any(|r| r["login"].as_str() == Some(own_login))); let kind = if is_review_request { format!("review requested{num}{repo}") } else { kind }; let mut out = format!("[{kind}] {title}\nurl: {html_url}"); out.push_str(meta_suffix); Some(out) } #[allow(clippy::too_many_arguments)] async fn poll_once( client: &reqwest::Client, forge_url: &str, token: &str, socket: &Path, is_manager: bool, keep_subscriptions: bool, unsubbed_repos: &mut HashSet, own_login: &str, ) { let url = format!("{forge_url}/api/v1/notifications?all=false&limit=50"); let resp = match client .get(&url) .header("Authorization", format!("token {token}")) .send() .await { Ok(r) => r, Err(e) => { debug!("forge_notify: poll request failed: {e}"); return; } }; if !resp.status().is_success() { debug!("forge_notify: poll status {}", resp.status()); return; } let notifications: Vec = match resp.json().await { Ok(v) => v, Err(e) => { warn!("forge_notify: response parse error: {e}"); return; } }; if notifications.is_empty() { return; } debug!(count = notifications.len(), "forge_notify: delivering notifications"); for notif in ¬ifications { let Some(id) = notif["id"].as_u64() else { continue }; let body_opt = format_notification(client, token, notif, own_login).await; // None means self-echo — mark read silently, no delivery. 
let Some(body) = body_opt else { mark_read(client, forge_url, token, id).await; continue; }; let delivered = if is_manager { let req = hive_sh4re::ManagerRequest::Wake { from: "forge".to_owned(), body, }; crate::client::request::<_, hive_sh4re::ManagerResponse>(socket, &req) .await .map(|_| ()) } else { let req = hive_sh4re::AgentRequest::Wake { from: "forge".to_owned(), body, }; crate::client::request::<_, hive_sh4re::AgentResponse>(socket, &req) .await .map(|_| ()) }; match delivered { Ok(()) => { debug!(%id, "forge_notify: delivered"); } Err(e) => { warn!(%id, error = ?e, "forge_notify: deliver failed — leaving unread"); continue; } } // Mark as read only after successful delivery so a failed-delivery // notification resurfaces on the next poll tick. mark_read(client, forge_url, token, id).await; // Auto-unsubscribe from broad repo watches when the notification // reason is "subscribed" (agent watching the whole repo). Skipped // when HIVE_FORGE_KEEP_SUBSCRIPTIONS=1 — triage and other firehose // consumers set this to retain broad repo visibility. let reason = notif["reason"].as_str().unwrap_or(""); if !keep_subscriptions && reason == "subscribed" && let Some(repo) = notif["repository"]["full_name"].as_str() && !unsubbed_repos.contains(repo) { let unsub_url = format!("{forge_url}/api/v1/repos/{repo}/subscription"); match client .delete(&unsub_url) .header("Authorization", format!("token {token}")) .send() .await { Ok(r) if r.status().is_success() || r.status().as_u16() == 404 => { debug!(%repo, "forge_notify: unsubscribed from repo watch"); unsubbed_repos.insert(repo.to_owned()); } Ok(r) => { debug!(%repo, status = %r.status(), "forge_notify: unsub non-2xx (ignored)"); } Err(e) => { debug!(%repo, error = ?e, "forge_notify: unsub request failed (ignored)"); } } } } } /// Mark a notification thread as read. Best-effort — logs on failure but /// does not abort the poll loop. 
A notification left unread will resurface /// on the next poll tick (desirable for delivery failures; for self-echo /// silencing we call this without prior delivery). async fn mark_read(client: &reqwest::Client, forge_url: &str, token: &str, id: u64) { let mark_url = format!("{forge_url}/api/v1/notifications/threads/{id}"); match client .patch(&mark_url) .header("Authorization", format!("token {token}")) .send() .await { Err(e) => { warn!(%id, error = ?e, "forge_notify: mark-read request failed — notification will resurface"); } Ok(r) if !r.status().is_success() => { warn!(%id, status = %r.status(), "forge_notify: mark-read returned non-2xx — notification will resurface"); } Ok(_) => { debug!(%id, "forge_notify: marked read"); } } }