diff --git a/hive-ag3nt/src/bin/hive-ag3nt.rs b/hive-ag3nt/src/bin/hive-ag3nt.rs index 5073497..1254242 100644 --- a/hive-ag3nt/src/bin/hive-ag3nt.rs +++ b/hive-ag3nt/src/bin/hive-ag3nt.rs @@ -378,6 +378,7 @@ async fn notify_manager_of_failure(socket: &Path, label: &str, err: &anyhow::Err &AgentRequest::Send { to: "manager".into(), body, + in_reply_to: None, }, ) .await; diff --git a/hive-ag3nt/src/mcp.rs b/hive-ag3nt/src/mcp.rs index bced8cc..e06fa84 100644 --- a/hive-ag3nt/src/mcp.rs +++ b/hive-ag3nt/src/mcp.rs @@ -314,6 +314,12 @@ pub struct SendArgs { pub to: String, /// Message body. Plain text; the broker doesn't parse it. pub body: String, + /// Optional broker row-id of the message this is a reply to. Lets + /// the dashboard render conversation threads. Pass the `id` from the + /// `DeliveredMessage` you're responding to; omit for new threads. + /// Silently ignored if the id is unknown or out of retention. + #[serde(default)] + pub in_reply_to: Option, } #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] @@ -410,6 +416,7 @@ impl AgentServer { .dispatch(hive_sh4re::AgentRequest::Send { to: args.to, body: args.body, + in_reply_to: args.in_reply_to, }) .await; annotate_retries(format_ack(resp, "send", format!("sent to {to}")), retries) @@ -802,6 +809,7 @@ impl ManagerServer { .dispatch(hive_sh4re::ManagerRequest::Send { to: args.to, body: args.body, + in_reply_to: args.in_reply_to, }) .await; annotate_retries(format_ack(resp, "send", format!("sent to {to}")), retries) diff --git a/hive-ag3nt/src/plugins.rs b/hive-ag3nt/src/plugins.rs index 0fb0b98..a75246f 100644 --- a/hive-ag3nt/src/plugins.rs +++ b/hive-ag3nt/src/plugins.rs @@ -176,6 +176,7 @@ async fn notify(socket: &Path, to: &str, body: String) { let req = hive_sh4re::AgentRequest::Send { to: to.to_owned(), body, + in_reply_to: None, }; if let Err(e) = client::request::<_, hive_sh4re::AgentResponse>(socket, &req).await { tracing::warn!(error = ?e, "failed to notify {to} of plugin install failure"); diff --git a/hive-c0re/src/agent_server.rs b/hive-c0re/src/agent_server.rs index d7e94eb..87daea2 100644 --- a/hive-c0re/src/agent_server.rs +++ b/hive-c0re/src/agent_server.rs @@ -107,7 +107,9 @@ fn recv_timeout(wait_seconds: Option) -> std::time::Duration { async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc) -> AgentResponse { let broker = &coord.broker; match req { - AgentRequest::Send { to, body } => handle_send(coord, agent, to, body), + AgentRequest::Send { to, body, in_reply_to } => { + handle_send(coord, agent, to, body, *in_reply_to) + } AgentRequest::Recv { wait_seconds, max } => { let cap = max.unwrap_or(1).min(RECV_BATCH_MAX) as usize; match broker @@ -122,6 +124,7 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc) -> body: d.message.body, id: d.id, redelivered: d.redelivered, + in_reply_to: d.message.in_reply_to, }) .collect(), }, @@ -140,6 +143,7 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc) -> from: hive_sh4re::OPERATOR_RECIPIENT.to_owned(), to: agent.to_owned(), body: body.clone(), + in_reply_to: None, }) { Ok(()) => AgentResponse::Ok, Err(e) => AgentResponse::Err { @@ -150,6 +154,7 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc) -> from: from.clone(), to: agent.to_owned(), body: body.clone(), + in_reply_to: None, }) { Ok(()) => AgentResponse::Ok, Err(e) => AgentResponse::Err { @@ -252,7 +257,13 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc) -> /// through their respective broker calls. Pulled out of `dispatch` /// to keep that function under the clippy too-many-lines limit; the /// behaviour is identical to inlining. -fn handle_send(coord: &Arc, agent: &str, to: &str, body: &str) -> AgentResponse { +fn handle_send( + coord: &Arc, + agent: &str, + to: &str, + body: &str, + in_reply_to: Option, +) -> AgentResponse { if let Err(message) = crate::limits::check_size("send", body) { return AgentResponse::Err { message }; } @@ -270,6 +281,7 @@ fn handle_send(coord: &Arc, agent: &str, to: &str, body: &str) -> A from: agent.to_owned(), to: to.to_owned(), body: body.to_owned(), + in_reply_to, }) { Ok(()) => AgentResponse::Ok, Err(e) => AgentResponse::Err { diff --git a/hive-c0re/src/broker.rs b/hive-c0re/src/broker.rs index 185a3c2..54c4002 100644 --- a/hive-c0re/src/broker.rs +++ b/hive-c0re/src/broker.rs @@ -19,7 +19,8 @@ CREATE TABLE IF NOT EXISTS messages ( recipient TEXT NOT NULL, body TEXT NOT NULL, sent_at INTEGER NOT NULL, - delivered_at INTEGER + delivered_at INTEGER, + in_reply_to INTEGER ); CREATE INDEX IF NOT EXISTS idx_messages_undelivered ON messages (recipient, id) WHERE delivered_at IS NULL; @@ -167,8 +168,8 @@ impl Broker { pub fn send(&self, message: &Message) -> Result<()> { let conn = self.conn.lock().unwrap(); conn.execute( - "INSERT INTO messages (sender, recipient, body, sent_at) VALUES (?1, ?2, ?3, ?4)", - params![message.from, message.to, message.body, now_unix()], + "INSERT INTO messages (sender, recipient, body, sent_at, in_reply_to) VALUES (?1, ?2, ?3, ?4, ?5)", + params![message.from, message.to, message.body, now_unix(), message.in_reply_to], )?; drop(conn); let _ = self.events.send(MessageEvent::Sent { @@ -187,7 +188,7 @@ impl Broker { let conn = self.conn.lock().unwrap(); let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX); let mut stmt = conn.prepare( - "SELECT id, sender, body, sent_at + "SELECT id, sender, body, sent_at, in_reply_to FROM messages WHERE recipient = ?1 ORDER BY id DESC @@ -199,6 +200,7 @@ impl Broker { from: row.get(1)?, body: row.get(2)?, at: row.get(3)?, + in_reply_to: row.get(4)?, }) })?; rows.collect::>>() @@ -223,6 +225,10 @@ impl Broker { ORDER BY id DESC LIMIT ?1", )?; + // `recent_all` powers dashboard backfill; in_reply_to is not + // carried through MessageEvent::Sent (no field for it) — that + // is fine for now; the dashboard thread-rendering will use + // /api/state InboxRow data which does carry the field. let rows = stmt.query_map(params![limit_i], |row| { Ok(MessageEvent::Sent { from: row.get(0)?, @@ -348,15 +354,15 @@ impl Broker { let conn = self.conn.lock().unwrap(); let max_i = i64::try_from(max).unwrap_or(i64::MAX); let mut stmt = conn.prepare( - "SELECT id, sender, recipient, body + "SELECT id, sender, recipient, body, in_reply_to FROM messages WHERE recipient = ?1 AND delivered_at IS NULL ORDER BY id ASC LIMIT ?2", )?; - let rows: Vec<(i64, String, String, String)> = stmt + let rows: Vec<(i64, String, String, String, Option)> = stmt .query_map(params![recipient, max_i], |row| { - Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)) + Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)) })? .collect::>()?; drop(stmt); @@ -366,7 +372,7 @@ impl Broker { // Stamp all popped rows in a single UPDATE — under the broker // mutex, well within sqlite's 999-param default. let now = now_unix(); - let ids: Vec = rows.iter().map(|(id, _, _, _)| *id).collect(); + let ids: Vec = rows.iter().map(|(id, _, _, _, _)| *id).collect(); let placeholders = std::iter::repeat_n("?", ids.len()) .collect::>() .join(","); @@ -382,13 +388,18 @@ impl Broker { // `requeued_ids` lookup runs once per pop, same as `recv`. let slot = inflight.entry(recipient.to_owned()).or_default(); let mut deliveries = Vec::with_capacity(rows.len()); - for (id, from, to, body) in rows { + for (id, from, to, body, in_reply_to) in rows { slot.unacked_ids.push(id); let redelivered = slot.requeued_ids.remove(&id); deliveries.push(Delivery { id, redelivered, - message: Message { from, to, body }, + message: Message { + from, + to, + body, + in_reply_to, + }, }); } drop(inflight); @@ -735,10 +746,10 @@ impl Broker { /// pre-migration sessions count as "fully handled" and won't be /// resurfaced by the first `requeue_inflight` after upgrade. fn ensure_message_columns(conn: &Connection) -> Result<()> { - let has: bool = conn + let has_acked: bool = conn .prepare("SELECT 1 FROM pragma_table_info('messages') WHERE name = 'acked_at'")? .exists([])?; - if !has { + if !has_acked { conn.execute_batch("ALTER TABLE messages ADD COLUMN acked_at INTEGER;") .context("add messages.acked_at column")?; // Backfill: treat every existing delivered row as acked. The @@ -751,6 +762,15 @@ fn ensure_message_columns(conn: &Connection) -> Result<()> { ) .context("backfill messages.acked_at from delivered_at")?; } + let has_reply: bool = conn + .prepare("SELECT 1 FROM pragma_table_info('messages') WHERE name = 'in_reply_to'")? + .exists([])?; + if !has_reply { + conn.execute_batch("ALTER TABLE messages ADD COLUMN in_reply_to INTEGER;") + .context("add messages.in_reply_to column")?; + // No backfill needed — existing messages simply have NULL here, + // meaning "root of a new thread", which is correct. + } Ok(()) } @@ -826,6 +846,7 @@ mod tests { from: from.to_owned(), to: to.to_owned(), body: body.to_owned(), + in_reply_to: None, } } diff --git a/hive-c0re/src/coordinator.rs b/hive-c0re/src/coordinator.rs index 11adf9d..881ce04 100644 --- a/hive-c0re/src/coordinator.rs +++ b/hive-c0re/src/coordinator.rs @@ -480,6 +480,7 @@ impl Coordinator { from: hive_sh4re::SYSTEM_SENDER.to_owned(), to: name.to_owned(), body, + in_reply_to: None, }) { tracing::warn!(error = ?e, %name, "kick_agent: broker.send failed"); } @@ -510,6 +511,7 @@ impl Coordinator { from: hive_sh4re::SYSTEM_SENDER.to_owned(), to: agent.to_owned(), body, + in_reply_to: None, }) { tracing::warn!(error = ?e, target = %agent, "failed to push helper event"); } @@ -529,6 +531,7 @@ impl Coordinator { from: from.to_owned(), to: agent_name.clone(), body: broadcast_body.clone(), + in_reply_to: None, }) { errors.push(format!("{agent_name}: {e}")); } diff --git a/hive-c0re/src/dashboard.rs b/hive-c0re/src/dashboard.rs index 425e0e5..f3ad75a 100644 --- a/hive-c0re/src/dashboard.rs +++ b/hive-c0re/src/dashboard.rs @@ -1496,6 +1496,7 @@ async fn post_op_send(State(state): State, Form(form): Form) -> std::time::Duration { #[allow(clippy::too_many_lines)] async fn dispatch(req: &ManagerRequest, coord: &Arc) -> ManagerResponse { match req { - ManagerRequest::Send { to, body } => { + ManagerRequest::Send { to, body, in_reply_to } => { if let Err(message) = crate::limits::check_size("send", body) { return ManagerResponse::Err { message }; } @@ -108,6 +108,7 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc) -> ManagerResp from: MANAGER_AGENT.to_owned(), to: to.clone(), body: body.clone(), + in_reply_to: *in_reply_to, }) { Ok(()) => ManagerResponse::Ok, Err(e) => ManagerResponse::Err { @@ -120,6 +121,7 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc) -> ManagerResp from: hive_sh4re::OPERATOR_RECIPIENT.to_owned(), to: MANAGER_AGENT.to_owned(), body: body.clone(), + in_reply_to: None, }) { Ok(()) => ManagerResponse::Ok, Err(e) => ManagerResponse::Err { @@ -153,6 +155,7 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc) -> ManagerResp body: d.message.body, id: d.id, redelivered: d.redelivered, + in_reply_to: d.message.in_reply_to, }) .collect(), }, diff --git a/hive-sh4re/src/lib.rs b/hive-sh4re/src/lib.rs index a323636..8992f74 100644 --- a/hive-sh4re/src/lib.rs +++ b/hive-sh4re/src/lib.rs @@ -170,6 +170,12 @@ pub struct Message { pub from: String, pub to: String, pub body: String, + /// Optional broker row-id of the message this is a reply to. + /// Stored in the DB and echoed back on `Recv` so the dashboard can + /// render conversation threads. `None` for messages that start a + /// new thread. Ignored if the referenced id is unknown or out of + /// retention — purely advisory. + pub in_reply_to: Option, } /// One row of a broker inbox query — what the dashboard renders in @@ -183,6 +189,9 @@ pub struct InboxRow { pub from: String, pub body: String, pub at: i64, + /// Row-id of the message this is a reply to, if any. + #[serde(skip_serializing_if = "Option::is_none")] + pub in_reply_to: Option, } /// One delivered message in a `Recv` response. The unified @@ -207,6 +216,9 @@ pub struct DeliveredMessage { /// sees the warning per-message in the batch. #[serde(default)] pub redelivered: bool, + /// Row-id of the message this is a reply to, if any. + #[serde(skip_serializing_if = "Option::is_none")] + pub in_reply_to: Option, } /// Reminder timing: either relative (wait N seconds) or absolute (at unix @@ -298,7 +310,15 @@ pub enum CancelLooseEndKind { #[serde(tag = "cmd", rename_all = "snake_case")] pub enum AgentRequest { /// Send a message to another agent. - Send { to: String, body: String }, + Send { + to: String, + body: String, + /// Optional id of the message being replied to. Stored in the + /// broker DB and returned on `Recv` so the dashboard can render + /// threads. Ignored if the id is unknown or out of retention. + #[serde(default, skip_serializing_if = "Option::is_none")] + in_reply_to: Option, + }, /// Pop pending messages from this agent's inbox. Always returns /// a list (`Messages { messages }`) — empty when nothing's /// pending. `max` caps the batch size (default 1 = single-message @@ -624,6 +644,10 @@ pub enum ManagerRequest { Send { to: String, body: String, + /// Optional id of the message being replied to. Mirror of + /// `AgentRequest::Send.in_reply_to`; see that doc. + #[serde(default, skip_serializing_if = "Option::is_none")] + in_reply_to: Option, }, /// Same shape as `AgentRequest::Recv` — caller-tunable /// `wait_seconds` (capped at 60s server-side, default 30s when