add optional in_reply_to field on send for conversation threading

This commit is contained in:
damocles 2026-05-20 13:01:02 +02:00
parent 03db764101
commit 67b47872e0
9 changed files with 90 additions and 16 deletions

View file

@ -107,7 +107,9 @@ fn recv_timeout(wait_seconds: Option<u64>) -> std::time::Duration {
async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc<Coordinator>) -> AgentResponse {
let broker = &coord.broker;
match req {
AgentRequest::Send { to, body } => handle_send(coord, agent, to, body),
AgentRequest::Send { to, body, in_reply_to } => {
handle_send(coord, agent, to, body, *in_reply_to)
}
AgentRequest::Recv { wait_seconds, max } => {
let cap = max.unwrap_or(1).min(RECV_BATCH_MAX) as usize;
match broker
@ -122,6 +124,7 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc<Coordinator>) ->
body: d.message.body,
id: d.id,
redelivered: d.redelivered,
in_reply_to: d.message.in_reply_to,
})
.collect(),
},
@ -140,6 +143,7 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc<Coordinator>) ->
from: hive_sh4re::OPERATOR_RECIPIENT.to_owned(),
to: agent.to_owned(),
body: body.clone(),
in_reply_to: None,
}) {
Ok(()) => AgentResponse::Ok,
Err(e) => AgentResponse::Err {
@ -150,6 +154,7 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc<Coordinator>) ->
from: from.clone(),
to: agent.to_owned(),
body: body.clone(),
in_reply_to: None,
}) {
Ok(()) => AgentResponse::Ok,
Err(e) => AgentResponse::Err {
@ -252,7 +257,13 @@ async fn dispatch(req: &AgentRequest, agent: &str, coord: &Arc<Coordinator>) ->
/// through their respective broker calls. Pulled out of `dispatch`
/// to keep that function under the clippy too-many-lines limit; the
/// behaviour is identical to inlining.
fn handle_send(coord: &Arc<Coordinator>, agent: &str, to: &str, body: &str) -> AgentResponse {
fn handle_send(
coord: &Arc<Coordinator>,
agent: &str,
to: &str,
body: &str,
in_reply_to: Option<i64>,
) -> AgentResponse {
if let Err(message) = crate::limits::check_size("send", body) {
return AgentResponse::Err { message };
}
@ -270,6 +281,7 @@ fn handle_send(coord: &Arc<Coordinator>, agent: &str, to: &str, body: &str) -> A
from: agent.to_owned(),
to: to.to_owned(),
body: body.to_owned(),
in_reply_to,
}) {
Ok(()) => AgentResponse::Ok,
Err(e) => AgentResponse::Err {

View file

@ -19,7 +19,8 @@ CREATE TABLE IF NOT EXISTS messages (
recipient TEXT NOT NULL,
body TEXT NOT NULL,
sent_at INTEGER NOT NULL,
delivered_at INTEGER
delivered_at INTEGER,
in_reply_to INTEGER
);
CREATE INDEX IF NOT EXISTS idx_messages_undelivered
ON messages (recipient, id) WHERE delivered_at IS NULL;
@ -167,8 +168,8 @@ impl Broker {
pub fn send(&self, message: &Message) -> Result<()> {
let conn = self.conn.lock().unwrap();
conn.execute(
"INSERT INTO messages (sender, recipient, body, sent_at) VALUES (?1, ?2, ?3, ?4)",
params![message.from, message.to, message.body, now_unix()],
"INSERT INTO messages (sender, recipient, body, sent_at, in_reply_to) VALUES (?1, ?2, ?3, ?4, ?5)",
params![message.from, message.to, message.body, now_unix(), message.in_reply_to],
)?;
drop(conn);
let _ = self.events.send(MessageEvent::Sent {
@ -187,7 +188,7 @@ impl Broker {
let conn = self.conn.lock().unwrap();
let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX);
let mut stmt = conn.prepare(
"SELECT id, sender, body, sent_at
"SELECT id, sender, body, sent_at, in_reply_to
FROM messages
WHERE recipient = ?1
ORDER BY id DESC
@ -199,6 +200,7 @@ impl Broker {
from: row.get(1)?,
body: row.get(2)?,
at: row.get(3)?,
in_reply_to: row.get(4)?,
})
})?;
rows.collect::<rusqlite::Result<Vec<_>>>()
@ -223,6 +225,10 @@ impl Broker {
ORDER BY id DESC
LIMIT ?1",
)?;
// `recent_all` powers dashboard backfill; in_reply_to is not
// carried through MessageEvent::Sent (no field for it) — that
// is fine for now; the dashboard thread-rendering will use
// /api/state InboxRow data which does carry the field.
let rows = stmt.query_map(params![limit_i], |row| {
Ok(MessageEvent::Sent {
from: row.get(0)?,
@ -348,15 +354,15 @@ impl Broker {
let conn = self.conn.lock().unwrap();
let max_i = i64::try_from(max).unwrap_or(i64::MAX);
let mut stmt = conn.prepare(
"SELECT id, sender, recipient, body
"SELECT id, sender, recipient, body, in_reply_to
FROM messages
WHERE recipient = ?1 AND delivered_at IS NULL
ORDER BY id ASC
LIMIT ?2",
)?;
let rows: Vec<(i64, String, String, String)> = stmt
let rows: Vec<(i64, String, String, String, Option<i64>)> = stmt
.query_map(params![recipient, max_i], |row| {
Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?))
})?
.collect::<rusqlite::Result<_>>()?;
drop(stmt);
@ -366,7 +372,7 @@ impl Broker {
// Stamp all popped rows in a single UPDATE — under the broker
// mutex, well within sqlite's 999-param default.
let now = now_unix();
let ids: Vec<i64> = rows.iter().map(|(id, _, _, _)| *id).collect();
let ids: Vec<i64> = rows.iter().map(|(id, _, _, _, _)| *id).collect();
let placeholders = std::iter::repeat_n("?", ids.len())
.collect::<Vec<_>>()
.join(",");
@ -382,13 +388,18 @@ impl Broker {
// `requeued_ids` lookup runs once per pop, same as `recv`.
let slot = inflight.entry(recipient.to_owned()).or_default();
let mut deliveries = Vec::with_capacity(rows.len());
for (id, from, to, body) in rows {
for (id, from, to, body, in_reply_to) in rows {
slot.unacked_ids.push(id);
let redelivered = slot.requeued_ids.remove(&id);
deliveries.push(Delivery {
id,
redelivered,
message: Message { from, to, body },
message: Message {
from,
to,
body,
in_reply_to,
},
});
}
drop(inflight);
@ -735,10 +746,10 @@ impl Broker {
/// pre-migration sessions count as "fully handled" and won't be
/// resurfaced by the first `requeue_inflight` after upgrade.
fn ensure_message_columns(conn: &Connection) -> Result<()> {
let has: bool = conn
let has_acked: bool = conn
.prepare("SELECT 1 FROM pragma_table_info('messages') WHERE name = 'acked_at'")?
.exists([])?;
if !has {
if !has_acked {
conn.execute_batch("ALTER TABLE messages ADD COLUMN acked_at INTEGER;")
.context("add messages.acked_at column")?;
// Backfill: treat every existing delivered row as acked. The
@ -751,6 +762,15 @@ fn ensure_message_columns(conn: &Connection) -> Result<()> {
)
.context("backfill messages.acked_at from delivered_at")?;
}
let has_reply: bool = conn
.prepare("SELECT 1 FROM pragma_table_info('messages') WHERE name = 'in_reply_to'")?
.exists([])?;
if !has_reply {
conn.execute_batch("ALTER TABLE messages ADD COLUMN in_reply_to INTEGER;")
.context("add messages.in_reply_to column")?;
// No backfill needed — existing messages simply have NULL here,
// meaning "root of a new thread", which is correct.
}
Ok(())
}
@ -826,6 +846,7 @@ mod tests {
from: from.to_owned(),
to: to.to_owned(),
body: body.to_owned(),
in_reply_to: None,
}
}

View file

@ -480,6 +480,7 @@ impl Coordinator {
from: hive_sh4re::SYSTEM_SENDER.to_owned(),
to: name.to_owned(),
body,
in_reply_to: None,
}) {
tracing::warn!(error = ?e, %name, "kick_agent: broker.send failed");
}
@ -510,6 +511,7 @@ impl Coordinator {
from: hive_sh4re::SYSTEM_SENDER.to_owned(),
to: agent.to_owned(),
body,
in_reply_to: None,
}) {
tracing::warn!(error = ?e, target = %agent, "failed to push helper event");
}
@ -529,6 +531,7 @@ impl Coordinator {
from: from.to_owned(),
to: agent_name.clone(),
body: broadcast_body.clone(),
in_reply_to: None,
}) {
errors.push(format!("{agent_name}: {e}"));
}

View file

@ -1496,6 +1496,7 @@ async fn post_op_send(State(state): State<AppState>, Form(form): Form<OpSendForm
from: hive_sh4re::OPERATOR_RECIPIENT.to_owned(),
to: to.clone(),
body,
in_reply_to: None,
}) {
return error_response(&format!("op-send to {to} failed: {e:#}"));
}

View file

@ -90,7 +90,7 @@ fn manager_recv_timeout(wait_seconds: Option<u64>) -> std::time::Duration {
#[allow(clippy::too_many_lines)]
async fn dispatch(req: &ManagerRequest, coord: &Arc<Coordinator>) -> ManagerResponse {
match req {
ManagerRequest::Send { to, body } => {
ManagerRequest::Send { to, body, in_reply_to } => {
if let Err(message) = crate::limits::check_size("send", body) {
return ManagerResponse::Err { message };
}
@ -108,6 +108,7 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc<Coordinator>) -> ManagerResp
from: MANAGER_AGENT.to_owned(),
to: to.clone(),
body: body.clone(),
in_reply_to: *in_reply_to,
}) {
Ok(()) => ManagerResponse::Ok,
Err(e) => ManagerResponse::Err {
@ -120,6 +121,7 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc<Coordinator>) -> ManagerResp
from: hive_sh4re::OPERATOR_RECIPIENT.to_owned(),
to: MANAGER_AGENT.to_owned(),
body: body.clone(),
in_reply_to: None,
}) {
Ok(()) => ManagerResponse::Ok,
Err(e) => ManagerResponse::Err {
@ -153,6 +155,7 @@ async fn dispatch(req: &ManagerRequest, coord: &Arc<Coordinator>) -> ManagerResp
body: d.message.body,
id: d.id,
redelivered: d.redelivered,
in_reply_to: d.message.in_reply_to,
})
.collect(),
},