622 lines
24 KiB
Rust
622 lines
24 KiB
Rust
//! Background Forgejo notification poller.
//!
//! Reads `HIVE_FORGE_URL` + `{HYPERHIVE_STATE_DIR}/forge-token`, polls
//! `GET /notifications?all=false` every 30 seconds, and delivers each
//! unread notification as a broker `Wake { from: "forge" }` message so
//! claude's normal turn loop picks it up.
//!
//! Each notification is enriched with the subject body and/or latest
//! comment body so the agent sees actual content, not just a title.
//!
//! Graceful no-ops:
//! - `HIVE_FORGE_URL` not set → disabled (no forge configured)
//! - token file absent → disabled (agent has no forge account yet)
//! - HTTP errors → logged at debug, retry next tick
//!
//! After successfully delivering a notification it is marked read via
//! `PATCH /notifications/threads/{id}` so it does not re-fire. If delivery
//! fails the thread is left unread so it resurfaces next tick.
//!
//! Self-notification filtering (closes #230):
//! - New issues/PRs created by this agent (`reason == "author"` + `state == open`)
//!   are silently marked read — the agent already knows it opened them.
//! - Comment notifications where the comment author matches this agent's own
//!   forge login are silently marked read.
//!
//! Own login is fetched once at startup via `GET /user` and cached for the
//! lifetime of the polling loop.
//!
//! PR review formatting (closes #231):
//! - When `latest_comment_url` points to a review (the fetched JSON has a
//!   `state` field like `APPROVED` / `REQUEST_CHANGES` / `COMMENT`), the
//!   notification is formatted as `[PR approved #N repo]` instead of the
//!   generic `[comment on PR #N repo]` so agents can action it immediately.
|
|
|
|
use std::collections::HashSet;
|
|
use std::fmt::Write as _;
|
|
use std::path::{Path, PathBuf};
|
|
use std::time::Duration;
|
|
|
|
use tracing::{debug, info, warn};
|
|
|
|
/// Seconds between two consecutive notification polls.
const POLL_INTERVAL_SECS: u64 = 30;
/// Per-request timeout, in seconds, configured on the shared HTTP client.
const HTTP_TIMEOUT_SECS: u64 = 10;
/// Maximum size, in bytes (not characters), of a body/comment included in the
/// wake message. `truncate` cuts on a UTF-8 char boundary at or below this.
const BODY_TRUNCATE: usize = 500;
|
|
|
|
/// Spawn point: called once from `hive-ag3nt serve` (agent) or
|
|
/// `hive-m1nd serve` (manager). Returns immediately if the forge is not
|
|
/// configured. Otherwise loops forever, polling every
|
|
/// `POLL_INTERVAL_SECS` seconds. Errors are never fatal.
|
|
///
|
|
/// `is_manager`: when true, wakes the inbox via `ManagerRequest::Wake`
|
|
/// instead of `AgentRequest::Wake` (the manager socket rejects the agent
|
|
/// request type).
|
|
pub async fn run(socket: PathBuf, is_manager: bool) {
|
|
let forge_url = match std::env::var("HIVE_FORGE_URL") {
|
|
Ok(u) if !u.is_empty() => u,
|
|
_ => {
|
|
debug!("forge_notify: HIVE_FORGE_URL not set — disabled");
|
|
return;
|
|
}
|
|
};
|
|
|
|
let state_dir = std::env::var("HYPERHIVE_STATE_DIR").unwrap_or_default();
|
|
let token_path = format!("{state_dir}/forge-token");
|
|
let token = match tokio::fs::read_to_string(&token_path).await {
|
|
Ok(t) => {
|
|
let t = t.trim().to_owned();
|
|
if t.is_empty() {
|
|
debug!("forge_notify: empty forge token at {token_path} — disabled");
|
|
return;
|
|
}
|
|
t
|
|
}
|
|
Err(e) => {
|
|
debug!("forge_notify: no forge token at {token_path} ({e}) — disabled");
|
|
return;
|
|
}
|
|
};
|
|
|
|
let client = match reqwest::Client::builder()
|
|
.timeout(Duration::from_secs(HTTP_TIMEOUT_SECS))
|
|
.build()
|
|
{
|
|
Ok(c) => c,
|
|
Err(e) => {
|
|
warn!("forge_notify: failed to build HTTP client: {e}");
|
|
return;
|
|
}
|
|
};
|
|
|
|
// Fetch own login once for self-notification filtering (closes #230).
|
|
// Falls back to empty string on failure — no filtering (safe degradation).
|
|
let own_login = {
|
|
let url = format!("{forge_url}/api/v1/user");
|
|
fetch_json(&client, &url, &token)
|
|
.await
|
|
.and_then(|v| v["login"].as_str().map(std::borrow::ToOwned::to_owned))
|
|
.unwrap_or_default()
|
|
};
|
|
if own_login.is_empty() {
|
|
warn!("forge_notify: could not resolve own login — self-notification filtering disabled");
|
|
} else {
|
|
debug!(%own_login, "forge_notify: own login resolved");
|
|
}
|
|
|
|
let mut interval = tokio::time::interval(Duration::from_secs(POLL_INTERVAL_SECS));
|
|
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
|
|
// First tick fires immediately — skip it so we don't race the broker
|
|
// socket becoming available right at boot.
|
|
interval.tick().await;
|
|
|
|
// HIVE_FORGE_KEEP_SUBSCRIPTIONS=1 disables auto-unsubscribe for agents
|
|
// that intentionally consume the full repo notification firehose (e.g. triage).
|
|
let keep_subscriptions = std::env::var("HIVE_FORGE_KEEP_SUBSCRIPTIONS")
|
|
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
|
|
.unwrap_or(false);
|
|
|
|
// Optional reason drop-list. `HIVE_FORGE_NOTIFY_SKIP_REASONS` is a
|
|
// comma-separated list of Forgejo notification `reason` values to
|
|
// suppress (e.g. `subscribed,participating`). Notifications with
|
|
// those reasons are marked read and silently dropped; everything
|
|
// else -- including notifications with a null/unrecognised reason --
|
|
// is delivered. Drop-list is safer than an allow-list: it kills the
|
|
// firehose without risking silent misses of directed signals
|
|
// (review_requested, assigned) or future unknown reason strings.
|
|
// Configurable per-agent via `hyperhive.forge.skipNotifyReasons` in agent.nix.
|
|
let skip_reasons: Vec<String> = std::env::var("HIVE_FORGE_NOTIFY_SKIP_REASONS")
|
|
.unwrap_or_default()
|
|
.split(',')
|
|
.map(str::trim)
|
|
.filter(|s| !s.is_empty())
|
|
.map(str::to_owned)
|
|
.collect();
|
|
|
|
if skip_reasons.is_empty() {
|
|
info!(forge_url = %forge_url, "forge_notify: polling started (all reasons)");
|
|
} else {
|
|
info!(forge_url = %forge_url, skip = ?skip_reasons, "forge_notify: polling started");
|
|
}
|
|
|
|
// Repos we have already unsubscribed this process lifetime. Persists
|
|
// across polls so we don't hammer DELETE on every cycle.
|
|
let mut unsubbed_repos: HashSet<String> = HashSet::new();
|
|
|
|
loop {
|
|
interval.tick().await;
|
|
poll_once(
|
|
&client,
|
|
&forge_url,
|
|
&token,
|
|
&socket,
|
|
is_manager,
|
|
keep_subscriptions,
|
|
&mut unsubbed_repos,
|
|
&own_login,
|
|
&skip_reasons,
|
|
)
|
|
.await;
|
|
}
|
|
}
|
|
|
|
/// Fetch a JSON value from a URL using the agent's forge token. Returns
|
|
/// `None` on any HTTP or parse error (best-effort enrichment).
|
|
async fn fetch_json(
|
|
client: &reqwest::Client,
|
|
url: &str,
|
|
token: &str,
|
|
) -> Option<serde_json::Value> {
|
|
let resp = client
|
|
.get(url)
|
|
.header("Authorization", format!("token {token}"))
|
|
.send()
|
|
.await
|
|
.ok()?;
|
|
if !resp.status().is_success() {
|
|
return None;
|
|
}
|
|
resp.json().await.ok()
|
|
}
|
|
|
|
/// Translate a Forgejo notification `subject.type` into the short label used
/// in wake messages.
///
/// `"Pull"` becomes `"PR"` and `"Issue"` becomes `"issue"`. Every other value
/// (e.g. "Commit", "Repository", or a type introduced later) is returned
/// unchanged, so unknown types stay visible instead of collapsing into a
/// generic label.
fn notif_type_label(t: &str) -> &str {
    if t == "Pull" {
        "PR"
    } else if t == "Issue" {
        "issue"
    } else {
        t
    }
}
|
|
|
|
/// Truncate a string to at most `max` bytes at a char boundary, appending `…`
/// if anything was cut.
///
/// Strings that already fit (`s.len() <= max`) are returned unchanged.
/// Otherwise the cut point is the last char boundary that leaves room for the
/// three-byte ellipsis. When `max` is smaller than the ellipsis itself the
/// result is just `…` — the subtraction saturates instead of underflowing
/// (which would panic in debug builds and wrap in release builds).
fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_owned();
    }
    // Bytes available for original text once the ellipsis is accounted for.
    let budget = max.saturating_sub('…'.len_utf8());
    let end = s
        .char_indices()
        .map(|(i, _)| i)
        .take_while(|&i| i <= budget)
        .last()
        .unwrap_or(0);
    format!("{}…", &s[..end])
}
|
|
|
|
/// Turn a Forgejo review `state` into the action phrase used in wake messages.
///
/// Recognised states are `APPROVED`, `REQUEST_CHANGES` and `COMMENT`. Any
/// other value yields `None` — this includes `PENDING` (review saved but not
/// submitted yet) and whatever a regular, non-review comment may carry.
fn review_state_label(state: &str) -> Option<&str> {
    // (raw Forgejo state, human-readable label)
    const LABELS: [(&str, &str); 3] = [
        ("APPROVED", "approved"),
        ("REQUEST_CHANGES", "changes requested"),
        ("COMMENT", "review comment"),
    ];

    LABELS
        .iter()
        .find(|(raw, _)| *raw == state)
        .map(|&(_, label)| label)
}
|
|
|
|
/// Build a human-readable wake message for one Forgejo notification.
|
|
/// Returns `None` when the notification is a self-echo (actor is `own_login`)
|
|
/// and should be silently discarded (and marked read by the caller).
|
|
///
|
|
/// Formats:
|
|
/// - Comment: `[comment on PR #N repo] title\nurl: ...\n\nauthor: body\nassignee: user\nreason: mention`
|
|
/// - Review: `[PR approved #N repo] title\nurl: ...\n\nreviewer: body\nassignee: user\nreason: review_requested`
|
|
/// - New item: `[new issue #N repo] title\nurl: ...\nassignee: user\nreason: author`
|
|
/// - State: `[PR merged #N repo] title\nurl: ...\nassignee: user\nreason: subscribed`
|
|
///
|
|
/// Assignees (and, for PRs, `requested_reviewers`) are appended unconditionally
|
|
/// on all issue/PR notifications (closes #256).
|
|
///
|
|
/// The `reason` field from the Forgejo notification is always appended (closes #110).
|
|
/// Forgejo emits one notification entry per reason for the same event, so including
|
|
/// it makes otherwise-identical messages distinguishable (e.g. `mention` vs
|
|
/// `subscribed` both arriving for the same PR comment).
|
|
///
|
|
/// Number is extracted from `html_url` last path segment before any `#`.
|
|
/// Repo slug (`owner/name`) is always included — agents may watch multiple repos.
|
|
async fn format_notification(
|
|
client: &reqwest::Client,
|
|
token: &str,
|
|
notif: &serde_json::Value,
|
|
own_login: &str,
|
|
) -> Option<String> {
|
|
let title = notif["subject"]["title"].as_str().unwrap_or("?");
|
|
let notif_type = notif["subject"]["type"].as_str().unwrap_or("?");
|
|
let html_url = notif["subject"]["html_url"]
|
|
.as_str()
|
|
.unwrap_or_else(|| notif["subject"]["url"].as_str().unwrap_or(""));
|
|
|
|
// Extract issue/PR number from the html_url. URL ends with /issues/N or
|
|
// /pulls/N (possibly followed by #anchor for comments). Best-effort.
|
|
let num = html_url
|
|
.split('#')
|
|
.next()
|
|
.and_then(|u| u.rsplit('/').next())
|
|
.and_then(|s| s.parse::<u64>().ok())
|
|
.map(|n| format!(" #{n}"))
|
|
.unwrap_or_default();
|
|
|
|
// Repo slug for multi-repo disambiguation. Falls back gracefully when absent.
|
|
let repo = notif["repository"]["full_name"]
|
|
.as_str()
|
|
.map(|r| format!(" {r}"))
|
|
.unwrap_or_default();
|
|
|
|
// API URLs for fetching content
|
|
let subject_api_url = notif["subject"]["url"].as_str().unwrap_or("");
|
|
let comment_api_url = notif["subject"]["latest_comment_url"].as_str().unwrap_or("");
|
|
let comment_html_url = notif["subject"]["latest_comment_html_url"]
|
|
.as_str()
|
|
.unwrap_or("");
|
|
|
|
// Always fetch subject detail for assignee/reviewer metadata (#256).
|
|
// Keeps agents informed of current ownership without a follow-up fetch.
|
|
let subject = if subject_api_url.is_empty() {
|
|
None
|
|
} else {
|
|
fetch_json(client, subject_api_url, token).await
|
|
};
|
|
|
|
let is_pr = matches!(notif_type, "Pull Request" | "Pull");
|
|
let reason = notif["reason"].as_str().unwrap_or("");
|
|
let meta_suffix = build_meta_suffix(subject.as_ref(), is_pr, reason);
|
|
|
|
// Determine whether this notification was triggered by a comment/review or
|
|
// by creation/state-change of the subject itself.
|
|
let has_comment = !comment_api_url.is_empty() && comment_api_url != subject_api_url;
|
|
|
|
let meta = NotifMeta { title, notif_type, html_url, num, repo, meta_suffix, reason, subject, is_pr };
|
|
if has_comment {
|
|
format_comment_notification(client, token, &meta, comment_api_url, comment_html_url, own_login).await
|
|
} else {
|
|
format_state_change_notification(notif, &meta, own_login)
|
|
}
|
|
}
|
|
|
|
/// Shared notification metadata extracted from the raw Forgejo JSON.
|
|
struct NotifMeta<'a> {
|
|
title: &'a str,
|
|
notif_type: &'a str,
|
|
html_url: &'a str,
|
|
num: String,
|
|
repo: String,
|
|
meta_suffix: String,
|
|
/// Forgejo `reason` value (e.g. "mention", "assigned", "subscribed").
|
|
/// Appended to every formatted message so that multiple notifications for
|
|
/// the same event (each with a different reason) are distinguishable (closes #110).
|
|
reason: &'a str,
|
|
/// Fetched subject detail (issue/PR JSON); used for review-request detection.
|
|
subject: Option<serde_json::Value>,
|
|
is_pr: bool,
|
|
}
|
|
|
|
/// Build the `\nassignee: ...` (and optionally `\nreviewer: ...` and `\nreason: ...`) suffix
|
|
/// appended to all notification kinds.
|
|
fn build_meta_suffix(subject: Option<&serde_json::Value>, is_pr: bool, reason: &str) -> String {
|
|
let assignees: Vec<&str> = subject
|
|
.and_then(|s| s["assignees"].as_array())
|
|
.map(|arr| arr.iter().filter_map(|a| a["login"].as_str()).collect())
|
|
.unwrap_or_default();
|
|
let assignee_line = if assignees.is_empty() {
|
|
"assignee: unassigned".to_owned()
|
|
} else {
|
|
format!("assignee: {}", assignees.join(", "))
|
|
};
|
|
// For PRs, include requested_reviewers when present.
|
|
let reviewer_line = if is_pr {
|
|
let reviewers: Vec<&str> = subject
|
|
.and_then(|s| s["requested_reviewers"].as_array())
|
|
.map(|arr| arr.iter().filter_map(|r| r["login"].as_str()).collect())
|
|
.unwrap_or_default();
|
|
if reviewers.is_empty() { None } else { Some(format!("reviewer: {}", reviewers.join(", "))) }
|
|
} else {
|
|
None
|
|
};
|
|
// Always include reason so multiple notifications for the same event
|
|
// (each with a different Forgejo reason) are distinguishable (closes #110).
|
|
let reason_line = if reason.is_empty() { None } else { Some(format!("reason: {reason}")) };
|
|
let mut out = format!("\n{assignee_line}");
|
|
if let Some(r) = reviewer_line { write!(out, "\n{r}").ok(); }
|
|
if let Some(r) = reason_line { write!(out, "\n{r}").ok(); }
|
|
out
|
|
}
|
|
|
|
/// Format a notification triggered by a new comment or review submission.
|
|
async fn format_comment_notification(
|
|
client: &reqwest::Client,
|
|
token: &str,
|
|
meta: &NotifMeta<'_>,
|
|
comment_api_url: &str,
|
|
comment_html_url: &str,
|
|
own_login: &str,
|
|
) -> Option<String> {
|
|
let payload = fetch_json(client, comment_api_url, token).await;
|
|
|
|
let actor_login = payload
|
|
.as_ref()
|
|
.and_then(|c| c["user"]["login"].as_str())
|
|
.unwrap_or("");
|
|
|
|
// Self-notification filter (#230): skip if we authored the comment/review.
|
|
if !own_login.is_empty() && actor_login == own_login {
|
|
debug!(%own_login, "forge_notify: skipping self-authored comment/review");
|
|
return None;
|
|
}
|
|
|
|
let body_text = payload
|
|
.as_ref()
|
|
.and_then(|c| c["body"].as_str())
|
|
.unwrap_or("")
|
|
.trim();
|
|
|
|
// PR review detection (#231): Forgejo review objects carry a `state` field
|
|
// with values like "APPROVED" / "REQUEST_CHANGES" / "COMMENT". Regular
|
|
// issue/PR comments have no such field. Format reviews distinctly so the
|
|
// agent knows the review outcome immediately without reading the body.
|
|
let review_state = payload
|
|
.as_ref()
|
|
.and_then(|c| c["state"].as_str())
|
|
.and_then(review_state_label);
|
|
|
|
let url = if comment_html_url.is_empty() { meta.html_url } else { comment_html_url };
|
|
let author = if actor_login.is_empty() { "?" } else { actor_login };
|
|
let NotifMeta { title, notif_type, num, repo, meta_suffix, .. } = meta;
|
|
|
|
if let Some(review_label) = review_state {
|
|
// Review submission on a PR.
|
|
let kind = format!("PR {review_label}{num}{repo}");
|
|
let mut out = format!("[{kind}] {title}\nurl: {url}");
|
|
if body_text.is_empty() {
|
|
write!(out, "\n\nreviewer: {author}").ok();
|
|
} else {
|
|
write!(out, "\n\n{author}: {}", truncate(body_text, BODY_TRUNCATE)).ok();
|
|
}
|
|
out.push_str(meta_suffix);
|
|
Some(out)
|
|
} else {
|
|
// Regular comment.
|
|
let kind = format!("comment on {}{num}{repo}", notif_type_label(notif_type));
|
|
let mut out = format!(
|
|
"[{kind}] {title}\nurl: {url}\n\n{author}: {}",
|
|
truncate(body_text, BODY_TRUNCATE)
|
|
);
|
|
if out.ends_with('\n') {
|
|
out.pop();
|
|
}
|
|
out.push_str(meta_suffix);
|
|
Some(out)
|
|
}
|
|
}
|
|
|
|
/// Format a notification triggered by creation or state change of the subject.
|
|
fn format_state_change_notification(
|
|
notif: &serde_json::Value,
|
|
meta: &NotifMeta<'_>,
|
|
own_login: &str,
|
|
) -> Option<String> {
|
|
// Classification uses notif["subject"]["state"] directly — Forgejo
|
|
// returns "open" / "closed" / "merged" here. We do NOT rely on
|
|
// fetching the PR/issue detail for `merged`:
|
|
// - `subject.url` points to the *issues* endpoint, which returns
|
|
// `pull_request.merged`, not top-level `merged`.
|
|
// - Forgejo API type is "Pull" / "Issue", never "Pull Request".
|
|
let notif_state = notif["subject"]["state"].as_str().unwrap_or("");
|
|
|
|
// Self-notification filter (#230): skip new items we authored ourselves.
|
|
// `reason == "author"` combined with open state means we just opened the
|
|
// issue/PR. We do NOT filter merged/closed state changes — those are
|
|
// triggered by someone else and we want them.
|
|
let is_new = notif_state == "open" || notif_state.is_empty();
|
|
if is_new && meta.reason == "author" && !own_login.is_empty() {
|
|
debug!(%own_login, "forge_notify: skipping self-authored new item");
|
|
return None;
|
|
}
|
|
|
|
let NotifMeta { title, notif_type, html_url, num, repo, meta_suffix, reason: _, subject, is_pr } = meta;
|
|
let label = notif_type_label(notif_type);
|
|
let kind = match notif_state {
|
|
"merged" => format!("{label} merged{num}{repo}"),
|
|
"closed" => format!("{label} closed{num}{repo}"),
|
|
"open" | "" => format!("new {label}{num}{repo}"),
|
|
other => format!("{label}{num}{repo}: {other}"),
|
|
};
|
|
|
|
// Review-request detection (#253): Forgejo does not always set
|
|
// reason == "review_requested" (observed as null). Check
|
|
// requested_reviewers instead, which is reliable. If own_login is
|
|
// in the list, override the kind.
|
|
// subject and is_pr are already fetched unconditionally above (#256).
|
|
let is_review_request = is_new
|
|
&& *is_pr
|
|
&& !own_login.is_empty()
|
|
&& subject
|
|
.as_ref()
|
|
.and_then(|s| s["requested_reviewers"].as_array())
|
|
.is_some_and(|arr| arr.iter().any(|r| r["login"].as_str() == Some(own_login)));
|
|
let kind = if is_review_request {
|
|
format!("review requested{num}{repo}")
|
|
} else {
|
|
kind
|
|
};
|
|
|
|
let mut out = format!("[{kind}] {title}\nurl: {html_url}");
|
|
out.push_str(meta_suffix);
|
|
Some(out)
|
|
}
|
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
async fn poll_once(
|
|
client: &reqwest::Client,
|
|
forge_url: &str,
|
|
token: &str,
|
|
socket: &Path,
|
|
is_manager: bool,
|
|
keep_subscriptions: bool,
|
|
unsubbed_repos: &mut HashSet<String>,
|
|
own_login: &str,
|
|
skip_reasons: &[String],
|
|
) {
|
|
let url = format!("{forge_url}/api/v1/notifications?all=false&limit=50");
|
|
let resp = match client
|
|
.get(&url)
|
|
.header("Authorization", format!("token {token}"))
|
|
.send()
|
|
.await
|
|
{
|
|
Ok(r) => r,
|
|
Err(e) => {
|
|
debug!("forge_notify: poll request failed: {e}");
|
|
return;
|
|
}
|
|
};
|
|
|
|
if !resp.status().is_success() {
|
|
debug!("forge_notify: poll status {}", resp.status());
|
|
return;
|
|
}
|
|
|
|
let notifications: Vec<serde_json::Value> = match resp.json().await {
|
|
Ok(v) => v,
|
|
Err(e) => {
|
|
warn!("forge_notify: response parse error: {e}");
|
|
return;
|
|
}
|
|
};
|
|
|
|
if notifications.is_empty() {
|
|
return;
|
|
}
|
|
|
|
debug!(count = notifications.len(), "forge_notify: delivering notifications");
|
|
|
|
for notif in ¬ifications {
|
|
let Some(id) = notif["id"].as_u64() else { continue };
|
|
|
|
// Reason drop-list: suppress noisy reasons (subscribed/participating).
|
|
// null/unknown reasons pass through — directed signals are never
|
|
// silently dropped even if Forgejo returns an unexpected value.
|
|
if !skip_reasons.is_empty() {
|
|
let reason = notif["reason"].as_str().unwrap_or("");
|
|
if !reason.is_empty() && skip_reasons.iter().any(|r| r == reason) {
|
|
debug!(%id, %reason, "forge_notify: skipping (reason in drop-list)");
|
|
mark_read(client, forge_url, token, id).await;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
let body_opt = format_notification(client, token, notif, own_login).await;
|
|
|
|
// None means self-echo — mark read silently, no delivery.
|
|
let Some(body) = body_opt else {
|
|
mark_read(client, forge_url, token, id).await;
|
|
continue;
|
|
};
|
|
|
|
let delivered = if is_manager {
|
|
let req = hive_sh4re::ManagerRequest::Wake {
|
|
from: "forge".to_owned(),
|
|
body,
|
|
};
|
|
crate::client::request::<_, hive_sh4re::ManagerResponse>(socket, &req)
|
|
.await
|
|
.map(|_| ())
|
|
} else {
|
|
let req = hive_sh4re::AgentRequest::Wake {
|
|
from: "forge".to_owned(),
|
|
body,
|
|
};
|
|
crate::client::request::<_, hive_sh4re::AgentResponse>(socket, &req)
|
|
.await
|
|
.map(|_| ())
|
|
};
|
|
match delivered {
|
|
Ok(()) => {
|
|
debug!(%id, "forge_notify: delivered");
|
|
}
|
|
Err(e) => {
|
|
warn!(%id, error = ?e, "forge_notify: deliver failed — leaving unread");
|
|
continue;
|
|
}
|
|
}
|
|
|
|
// Mark as read only after successful delivery so a failed-delivery
|
|
// notification resurfaces on the next poll tick.
|
|
mark_read(client, forge_url, token, id).await;
|
|
|
|
// Auto-unsubscribe from broad repo watches when the notification
|
|
// reason is "subscribed" (agent watching the whole repo). Skipped
|
|
// when HIVE_FORGE_KEEP_SUBSCRIPTIONS=1 — triage and other firehose
|
|
// consumers set this to retain broad repo visibility.
|
|
let reason = notif["reason"].as_str().unwrap_or("");
|
|
if !keep_subscriptions && reason == "subscribed"
|
|
&& let Some(repo) = notif["repository"]["full_name"].as_str()
|
|
&& !unsubbed_repos.contains(repo) {
|
|
let unsub_url = format!("{forge_url}/api/v1/repos/{repo}/subscription");
|
|
match client
|
|
.delete(&unsub_url)
|
|
.header("Authorization", format!("token {token}"))
|
|
.send()
|
|
.await
|
|
{
|
|
Ok(r) if r.status().is_success() || r.status().as_u16() == 404 => {
|
|
debug!(%repo, "forge_notify: unsubscribed from repo watch");
|
|
unsubbed_repos.insert(repo.to_owned());
|
|
}
|
|
Ok(r) => {
|
|
debug!(%repo, status = %r.status(), "forge_notify: unsub non-2xx (ignored)");
|
|
}
|
|
Err(e) => {
|
|
debug!(%repo, error = ?e, "forge_notify: unsub request failed (ignored)");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Mark a notification thread as read. Best-effort — logs on failure but
|
|
/// does not abort the poll loop. A notification left unread will resurface
|
|
/// on the next poll tick (desirable for delivery failures; for self-echo
|
|
/// silencing we call this without prior delivery).
|
|
async fn mark_read(client: &reqwest::Client, forge_url: &str, token: &str, id: u64) {
|
|
let mark_url = format!("{forge_url}/api/v1/notifications/threads/{id}");
|
|
match client
|
|
.patch(&mark_url)
|
|
.header("Authorization", format!("token {token}"))
|
|
.send()
|
|
.await
|
|
{
|
|
Err(e) => {
|
|
warn!(%id, error = ?e, "forge_notify: mark-read request failed — notification will resurface");
|
|
}
|
|
Ok(r) if !r.status().is_success() => {
|
|
warn!(%id, status = %r.status(), "forge_notify: mark-read returned non-2xx — notification will resurface");
|
|
}
|
|
Ok(_) => {
|
|
debug!(%id, "forge_notify: marked read");
|
|
}
|
|
}
|
|
}
|