render message reply threads in dashboard and per-agent inbox

- MessageEvent and DashboardEvent Sent/Delivered now carry id and in_reply_to
- broker.send() includes last_insert_rowid in the emitted event
- recent_all() and recv_batch() include id and in_reply_to from the DB
- deliver_reminders_batch() tracks per-row rowids within the transaction
- dashboard message flow: reply rows are indented with a border-left and a
  clickable '↳ reply' tag that scroll-jumps + briefly highlights the parent
- per-agent inbox: reply messages get a '↳ reply ·' prefix and indent

Closes #26
This commit is contained in:
iris 2026-05-20 15:27:31 +02:00 committed by Mara
parent 804875d670
commit b1f10b1d1b
8 changed files with 132 additions and 22 deletions

View file

@ -100,16 +100,22 @@ pub const MAX_REMINDER_ATTEMPTS: u32 = 5;
#[serde(rename_all = "snake_case", tag = "kind")]
pub enum MessageEvent {
Sent {
/// Broker row id — used by the dashboard to track thread parents.
id: i64,
from: String,
to: String,
body: String,
at: i64,
in_reply_to: Option<i64>,
},
Delivered {
/// Broker row id — used by the dashboard to track thread parents.
id: i64,
from: String,
to: String,
body: String,
at: i64,
in_reply_to: Option<i64>,
},
}
@ -167,16 +173,20 @@ impl Broker {
pub fn send(&self, message: &Message) -> Result<()> {
let conn = self.conn.lock().unwrap();
let now = now_unix();
conn.execute(
"INSERT INTO messages (sender, recipient, body, sent_at, in_reply_to) VALUES (?1, ?2, ?3, ?4, ?5)",
params![message.from, message.to, message.body, now_unix(), message.in_reply_to],
params![message.from, message.to, message.body, now, message.in_reply_to],
)?;
let row_id = conn.last_insert_rowid();
drop(conn);
let _ = self.events.send(MessageEvent::Sent {
id: row_id,
from: message.from.clone(),
to: message.to.clone(),
body: message.body.clone(),
at: now_unix(),
at: now,
in_reply_to: message.in_reply_to,
});
Ok(())
}
@ -220,21 +230,19 @@ impl Broker {
let conn = self.conn.lock().unwrap();
let limit_i = i64::try_from(limit.min(i64::MAX as u64)).unwrap_or(i64::MAX);
let mut stmt = conn.prepare(
"SELECT sender, recipient, body, sent_at
"SELECT id, sender, recipient, body, sent_at, in_reply_to
FROM messages
ORDER BY id DESC
LIMIT ?1",
)?;
// `recent_all` powers dashboard backfill; in_reply_to is not
// carried through MessageEvent::Sent (no field for it) — that
// is fine for now; the dashboard thread-rendering will use
// /api/state InboxRow data which does carry the field.
let rows = stmt.query_map(params![limit_i], |row| {
Ok(MessageEvent::Sent {
from: row.get(0)?,
to: row.get(1)?,
body: row.get(2)?,
at: row.get(3)?,
id: row.get(0)?,
from: row.get(1)?,
to: row.get(2)?,
body: row.get(3)?,
at: row.get(4)?,
in_reply_to: row.get(5)?,
})
})?;
rows.collect::<rusqlite::Result<Vec<_>>>()
@ -408,10 +416,12 @@ impl Broker {
// which surface the harness used.
for d in &deliveries {
let _ = self.events.send(MessageEvent::Delivered {
id: d.id,
from: d.message.from.clone(),
to: d.message.to.clone(),
body: d.message.body.clone(),
at: now,
in_reply_to: d.message.in_reply_to,
});
}
Ok(deliveries)
@ -749,20 +759,33 @@ impl Broker {
}
};
let mut results: Vec<Result<()>> = Vec::with_capacity(items.len());
// Per-item broker row ids — collected inside the transaction so
// we can emit Sent events with the correct id after commit.
let mut msg_ids: Vec<i64> = Vec::with_capacity(items.len());
for (id, agent, body) in items {
let r = (|| -> Result<()> {
let r = (|| -> Result<i64> {
tx.execute(
"INSERT INTO messages (sender, recipient, body, sent_at) \
VALUES (?1, ?2, ?3, ?4)",
params!["reminder", agent, body, now],
)?;
let msg_id = tx.last_insert_rowid();
tx.execute(
"UPDATE reminders SET sent_at = ?1 WHERE id = ?2",
params![now, id],
)?;
Ok(())
Ok(msg_id)
})();
results.push(r);
match r {
Ok(msg_id) => {
msg_ids.push(msg_id);
results.push(Ok(()));
}
Err(e) => {
msg_ids.push(-1);
results.push(Err(e));
}
}
}
if let Err(e) = tx.commit() {
let err_str = format!("{e:#}");
@ -773,13 +796,15 @@ impl Broker {
}
drop(conn);
// Emit per-row Sent events (only for rows that succeeded).
for ((id, agent, body), result) in items.iter().zip(results.iter()) {
for (((id, agent, body), result), msg_id) in items.iter().zip(results.iter()).zip(msg_ids.iter()) {
if result.is_ok() {
let _ = self.events.send(MessageEvent::Sent {
id: *msg_id,
from: "reminder".to_owned(),
to: agent.clone(),
body: body.clone(),
at: now,
in_reply_to: None,
});
tracing::debug!(reminder_id = id, %agent, "reminder delivered");
}

View file

@ -696,25 +696,29 @@ async fn dashboard_history(State(state): State<AppState>) -> Response {
let events: Vec<crate::dashboard_events::DashboardEvent> = messages
.into_iter()
.map(|m| match m {
crate::broker::MessageEvent::Sent { from, to, body, at } => {
crate::broker::MessageEvent::Sent { id, from, to, body, at, in_reply_to } => {
let file_refs = scan_validated_paths(&body);
crate::dashboard_events::DashboardEvent::Sent {
seq: 0,
id,
from,
to,
body,
at,
in_reply_to,
file_refs,
}
}
crate::broker::MessageEvent::Delivered { from, to, body, at } => {
crate::broker::MessageEvent::Delivered { id, from, to, body, at, in_reply_to } => {
let file_refs = scan_validated_paths(&body);
crate::dashboard_events::DashboardEvent::Delivered {
seq: 0,
id,
from,
to,
body,
at,
in_reply_to,
file_refs,
}
}

View file

@ -40,10 +40,14 @@ pub enum DashboardEvent {
/// appear in this list, everything else stays plain text.
Sent {
seq: u64,
/// Broker row id. Allows the dashboard to track reply threads.
id: i64,
from: String,
to: String,
body: String,
at: i64,
#[serde(default, skip_serializing_if = "Option::is_none")]
in_reply_to: Option<i64>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
file_refs: Vec<String>,
},
@ -51,10 +55,14 @@ pub enum DashboardEvent {
/// `file_refs` is the same shape as `Sent`.
Delivered {
seq: u64,
/// Broker row id. Allows the dashboard to track reply threads.
id: i64,
from: String,
to: String,
body: String,
at: i64,
#[serde(default, skip_serializing_if = "Option::is_none")]
in_reply_to: Option<i64>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
file_refs: Vec<String>,
},

View file

@ -257,25 +257,29 @@ fn spawn_broker_to_dashboard_forwarder(coord: Arc<Coordinator>) {
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(MessageEvent::Sent { from, to, body, at }) => {
Ok(MessageEvent::Sent { id, from, to, body, at, in_reply_to }) => {
let file_refs = dashboard::scan_validated_paths(&body);
coord.emit_dashboard_event(DashboardEvent::Sent {
seq: coord.next_seq(),
id,
from,
to,
body,
at,
in_reply_to,
file_refs,
});
}
Ok(MessageEvent::Delivered { from, to, body, at }) => {
Ok(MessageEvent::Delivered { id, from, to, body, at, in_reply_to }) => {
let file_refs = dashboard::scan_validated_paths(&body);
coord.emit_dashboard_event(DashboardEvent::Delivered {
seq: coord.next_seq(),
id,
from,
to,
body,
at,
in_reply_to,
file_refs,
});
}