dashboard: re-sync /api/state on SSE (re)connect
The dashboard cold-loaded its derived stores (approvals, questions, containers, …) from /api/state once, then relied solely on live SSE events. Events that fired during a disconnect window (reconnect, hive-c0re restart) are never replayed, so the dashboard drifted stale until a manual reload. - terminal.js: add onStreamOpen, fired on every EventSource open (initial + reconnect); the dashboard wires it to refreshState() so every connection epoch re-syncs the authoritative snapshot. - terminal.js: seq-dedupe only event kinds that actually appeared in the history replay. Mutation events are never in /dashboard/history, so deduping them against the broker-history seq wrongly dropped ones that fired between the /api/state snapshot and the history fetch. - app.js: make applyApprovalResolved / applyQuestionResolved idempotent (guard the history unshift by id) so a re-sync overlapping a live event can't double a history row. closes #163
This commit is contained in:
parent
fefa91a39e
commit
32f4796a7f
2 changed files with 75 additions and 34 deletions
|
|
@ -789,6 +789,10 @@
|
||||||
const idx = questionsState.pending.findIndex((q) => q.id === ev.id);
|
const idx = questionsState.pending.findIndex((q) => q.id === ev.id);
|
||||||
const existing = idx >= 0 ? questionsState.pending[idx] : null;
|
const existing = idx >= 0 ? questionsState.pending[idx] : null;
|
||||||
if (idx >= 0) questionsState.pending.splice(idx, 1);
|
if (idx >= 0) questionsState.pending.splice(idx, 1);
|
||||||
|
// Idempotent: a snapshot re-sync (issue #163) can carry this same
|
||||||
|
// answered row in `question_history` while a live event also
|
||||||
|
// delivers it — guard the unshift so history can't double a row.
|
||||||
|
if (!questionsState.history.some((h) => h.id === ev.id)) {
|
||||||
questionsState.history.unshift({
|
questionsState.history.unshift({
|
||||||
id: ev.id,
|
id: ev.id,
|
||||||
asker: existing?.asker || '?',
|
asker: existing?.asker || '?',
|
||||||
|
|
@ -806,6 +810,7 @@
|
||||||
if (questionsState.history.length > QUESTION_HISTORY_LIMIT) {
|
if (questionsState.history.length > QUESTION_HISTORY_LIMIT) {
|
||||||
questionsState.history.length = QUESTION_HISTORY_LIMIT;
|
questionsState.history.length = QUESTION_HISTORY_LIMIT;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
renderQuestions();
|
renderQuestions();
|
||||||
}
|
}
|
||||||
// Filter selection for the questions section. Persisted so the
|
// Filter selection for the questions section. Persisted so the
|
||||||
|
|
@ -1083,6 +1088,10 @@
|
||||||
function applyApprovalResolved(ev) {
|
function applyApprovalResolved(ev) {
|
||||||
// Drop from pending; prepend to history (newest-first), cap at 30.
|
// Drop from pending; prepend to history (newest-first), cap at 30.
|
||||||
approvalsState.pending = approvalsState.pending.filter((a) => a.id !== ev.id);
|
approvalsState.pending = approvalsState.pending.filter((a) => a.id !== ev.id);
|
||||||
|
// Idempotent: a snapshot re-sync (issue #163) can carry this same
|
||||||
|
// resolved row in `approval_history` while a live event also
|
||||||
|
// delivers it — guard the unshift so history can't double a row.
|
||||||
|
if (!approvalsState.history.some((h) => h.id === ev.id)) {
|
||||||
approvalsState.history.unshift({
|
approvalsState.history.unshift({
|
||||||
id: ev.id,
|
id: ev.id,
|
||||||
agent: ev.agent,
|
agent: ev.agent,
|
||||||
|
|
@ -1096,6 +1105,7 @@
|
||||||
if (approvalsState.history.length > APPROVAL_HISTORY_LIMIT) {
|
if (approvalsState.history.length > APPROVAL_HISTORY_LIMIT) {
|
||||||
approvalsState.history.length = APPROVAL_HISTORY_LIMIT;
|
approvalsState.history.length = APPROVAL_HISTORY_LIMIT;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
renderApprovals();
|
renderApprovals();
|
||||||
}
|
}
|
||||||
// Classify each unified-diff line by its leading char so
|
// Classify each unified-diff line by its leading char so
|
||||||
|
|
@ -1727,6 +1737,13 @@
|
||||||
onAnyEvent: (ev /* , { fromHistory } */) => {
|
onAnyEvent: (ev /* , { fromHistory } */) => {
|
||||||
if (inboxAppendFromEvent(ev)) renderInbox();
|
if (inboxAppendFromEvent(ev)) renderInbox();
|
||||||
},
|
},
|
||||||
|
// Re-sync the full /api/state snapshot on every SSE (re)connect.
|
||||||
|
// Live mutation events that fired during a disconnect window are
|
||||||
|
// never replayed, so without this the derived stores (approvals,
|
||||||
|
// questions, containers, …) would drift stale until a manual
|
||||||
|
// reload (issue #163). refreshState() replaces every store from
|
||||||
|
// the snapshot, so a missed event self-heals on reconnect.
|
||||||
|
onStreamOpen: () => { refreshState(); },
|
||||||
onLiveEvent: (ev) => {
|
onLiveEvent: (ev) => {
|
||||||
pulseBanner();
|
pulseBanner();
|
||||||
if (ev.kind === 'sent' && ev.to === 'operator') {
|
if (ev.kind === 'sent' && ev.to === 'operator') {
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,8 @@
|
||||||
// the full picture (e.g. a per-recipient inbox built from broker
|
// the full picture (e.g. a per-recipient inbox built from broker
|
||||||
// events) */ },
|
// events) */ },
|
||||||
// onBackfillDone: (count) => { /* one-shot after history replay */ },
|
// onBackfillDone: (count) => { /* one-shot after history replay */ },
|
||||||
|
// onStreamOpen: () => { /* fires on every EventSource (re)connect —
|
||||||
|
// use to re-sync snapshot-derived state after a reconnect gap */ },
|
||||||
// pillAnchor: document.getElementById('msgflow').parentElement,
|
// pillAnchor: document.getElementById('msgflow').parentElement,
|
||||||
// });
|
// });
|
||||||
//
|
//
|
||||||
|
|
@ -213,16 +215,35 @@
|
||||||
if (es.readyState === EventSource.CONNECTING) row('note', '[reconnecting…]');
|
if (es.readyState === EventSource.CONNECTING) row('note', '[reconnecting…]');
|
||||||
else row('note', '[disconnected]');
|
else row('note', '[disconnected]');
|
||||||
};
|
};
|
||||||
|
es.onopen = () => {
|
||||||
|
// Fires on the initial connect and on every automatic
|
||||||
|
// reconnect. EventSource never replays events that fired
|
||||||
|
// during a disconnect window, so a consumer with
|
||||||
|
// snapshot-derived state (the dashboard's /api/state stores)
|
||||||
|
// must re-sync here or it shows stale state until a manual
|
||||||
|
// reload (issue #163).
|
||||||
|
if (opts.onStreamOpen) {
|
||||||
|
try { opts.onStreamOpen(); }
|
||||||
|
catch (err) { console.error('onStreamOpen threw', err); }
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
function flushBuffered(boundarySeq) {
|
function flushBuffered(boundarySeq, historyKinds) {
|
||||||
const drained = buffered;
|
const drained = buffered;
|
||||||
buffered = [];
|
buffered = [];
|
||||||
live = true;
|
live = true;
|
||||||
for (const ev of drained) {
|
for (const ev of drained) {
|
||||||
// ev.seq is set by the server on live frames; absent/0 means
|
// Seq-dedupe only events of a kind that actually appeared in
|
||||||
// "no dedupe possible, apply." Historical replays via the
|
// the history replay — those are the only ones that could
|
||||||
// history endpoint carry no seq either way.
|
// double (once via history, once via the live buffer).
|
||||||
if (boundarySeq != null && typeof ev.seq === 'number' && ev.seq <= boundarySeq) {
|
// Mutation events (approval/question/container/…) are never
|
||||||
|
// carried by the history endpoint; deduping them against the
|
||||||
|
// broker-history seq would wrongly drop ones that fired
|
||||||
|
// between a consumer's own snapshot read and this history
|
||||||
|
// fetch (issue #163). ev.seq absent/0 → no dedupe possible.
|
||||||
|
if (boundarySeq != null
|
||||||
|
&& typeof ev.seq === 'number' && ev.seq <= boundarySeq
|
||||||
|
&& historyKinds && historyKinds.has(ev.kind)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
dispatch(ev, false);
|
dispatch(ev, false);
|
||||||
|
|
@ -252,12 +273,15 @@
|
||||||
// it as "no dedupe possible."
|
// it as "no dedupe possible."
|
||||||
const events = Array.isArray(body) ? body : (body.events || []);
|
const events = Array.isArray(body) ? body : (body.events || []);
|
||||||
const boundarySeq = Array.isArray(body) ? null : (body.seq ?? null);
|
const boundarySeq = Array.isArray(body) ? null : (body.seq ?? null);
|
||||||
|
// Kinds present in the history replay — the only kinds that
|
||||||
|
// can double and therefore the only ones to seq-dedupe.
|
||||||
|
const historyKinds = new Set(events.map((ev) => ev.kind));
|
||||||
currentNoAnim = true;
|
currentNoAnim = true;
|
||||||
for (const ev of events) dispatch(ev, true);
|
for (const ev of events) dispatch(ev, true);
|
||||||
currentNoAnim = false;
|
currentNoAnim = false;
|
||||||
if (events.length) row('note', '─── live (older above) ───');
|
if (events.length) row('note', '─── live (older above) ───');
|
||||||
else placeholder('(connected — waiting for events)');
|
else placeholder('(connected — waiting for events)');
|
||||||
flushBuffered(boundarySeq);
|
flushBuffered(boundarySeq, historyKinds);
|
||||||
if (opts.onBackfillDone) opts.onBackfillDone(events.length);
|
if (opts.onBackfillDone) opts.onBackfillDone(events.length);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.warn('history backfill failed', err);
|
console.warn('history backfill failed', err);
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue