release(v4.8.2): per-from receive serialization + per-connection bridge dedup
Some checks failed
Test / test (push) Has been cancelled

Two interlocking robustness fixes for the duplicate-fan-out / first-contact
class of failures Prism reported.

1. `Shade.receive(from, env)` now queues its `manager.decrypt` step
   per `from` so concurrent dispatches can't race the SessionManager
   ratchet or the StorageProvider (sqlite "database is locked", IDB
   transaction conflicts). User message handlers run *outside* the
   queue so streams + file-RPC's nested `shade.receive` calls don't
   self-deadlock.

2. Bridge WS + SSE handlers now run a per-connection bounded msgId
   LRU as defense-in-depth against any flushTo re-entry (event-storm,
   future refactor). Pending-flush chains are wrapped in `.catch(() =>
   {})` so a transient `ws.send` rejection no longer poisons the
   connection's flush loop.

Tests: storming `inbox.blob_stored` 10× per PUT yields exactly one WS/
SSE frame; 8 concurrent `bob.receive('alice', envelope)` calls keep
the ratchet intact and never surface "database is locked".

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-08 12:13:46 +02:00
parent 680d6386f3
commit 8c606ad498
30 changed files with 362 additions and 56 deletions

View File

@@ -136,15 +136,22 @@ export function createBridgeRoutes(opts: BridgeRoutesOptions): {
};
let cursor = verified.since;
const writer = makeBlobWriter(opts.store, pageLimit);
const delivered = new DeliveredIdLru();
// Initial backlog drain.
const flushed = await flushTo(writer, address, cursor, async (blob) => {
await stream.writeSSE({
id: String(blob.receivedAt),
event: 'envelope',
data: JSON.stringify(serializeBlob(blob)),
});
});
const flushed = await flushTo(
writer,
address,
cursor,
async (blob) => {
await stream.writeSSE({
id: String(blob.receivedAt),
event: 'envelope',
data: JSON.stringify(serializeBlob(blob)),
});
},
delivered,
);
cursor = Math.max(cursor, flushed);
// Hook up event-driven push if available, else fall back to a poll
@@ -156,19 +163,31 @@ export function createBridgeRoutes(opts: BridgeRoutesOptions): {
const triggerFlush = (): void => {
signalled = true;
// Serialize fan-in so concurrent triggers don't double-fetch.
pendingFlushPromise = pendingFlushPromise.then(async () => {
while (signalled) {
signalled = false;
const drained = await flushTo(writer, address, cursor, async (blob) => {
await stream.writeSSE({
id: String(blob.receivedAt),
event: 'envelope',
data: JSON.stringify(serializeBlob(blob)),
});
});
if (drained > cursor) cursor = drained;
}
});
// `.catch(() => {})` keeps the chain alive across transient
// emit failures (e.g. a closed SSE write throws) — without it
// one rejection silently kills every future flush on this
// connection.
pendingFlushPromise = pendingFlushPromise
.then(async () => {
while (signalled) {
signalled = false;
const drained = await flushTo(
writer,
address,
cursor,
async (blob) => {
await stream.writeSSE({
id: String(blob.receivedAt),
event: 'envelope',
data: JSON.stringify(serializeBlob(blob)),
});
},
delivered,
);
if (drained > cursor) cursor = drained;
}
})
.catch(() => {});
};
if (opts.events) {
@@ -327,6 +346,7 @@ export function createBridgeRoutes(opts: BridgeRoutesOptions): {
const connId = presence.newConnectionId();
let cursor = verified.since;
const writer = makeBlobWriter(opts.store, pageLimit);
const delivered = new DeliveredIdLru();
let unsubscribe: (() => void) | null = null;
let fallbackTimer: ReturnType<typeof setInterval> | null = null;
let pendingFlushPromise: Promise<void> = Promise.resolve();
@@ -347,15 +367,26 @@ export function createBridgeRoutes(opts: BridgeRoutesOptions): {
presence.markConnected(address, 'ws', connId);
const triggerFlush = (): void => {
signalled = true;
pendingFlushPromise = pendingFlushPromise.then(async () => {
while (signalled && connected) {
signalled = false;
const drained = await flushTo(writer, address, cursor, async (blob) => {
ws.send(JSON.stringify(serializeBlob(blob)));
});
if (drained > cursor) cursor = drained;
}
});
// `.catch(() => {})` mirrors the SSE chain — keeps the
// pending-flush queue alive across transient ws.send errors
// (e.g. partial close, backpressure overflow).
pendingFlushPromise = pendingFlushPromise
.then(async () => {
while (signalled && connected) {
signalled = false;
const drained = await flushTo(
writer,
address,
cursor,
async (blob) => {
ws.send(JSON.stringify(serializeBlob(blob)));
},
delivered,
);
if (drained > cursor) cursor = drained;
}
})
.catch(() => {});
};
if (opts.events) {
unsubscribe = opts.events.on((e) => {
@@ -518,11 +549,41 @@ function makeBlobWriter(store: InboxStore, pageLimit: number): BlobWriter {
};
}
/**
* Per-connection bounded msgId tracker — defense in depth against duplicate
* delivery of the same blob to the same bridge socket. Cursor pagination
* already guarantees uniqueness in the happy path, but a dedup gate at the
* emit boundary catches any subtle bug (e.g. a flushTo race, a future
* refactor, an event-emit retry) without changing wire semantics.
*
* The cap is intentionally large enough to cover any realistic bridge
* pageLimit and small enough to bound memory under long-running streams.
*/
const DELIVERED_LRU_CAP = 4096;
class DeliveredIdLru {
private readonly seen = new Set<string>();
private readonly order: string[] = [];
/** Returns true if `msgId` has not been seen on this connection yet. */
add(msgId: string): boolean {
if (this.seen.has(msgId)) return false;
this.seen.add(msgId);
this.order.push(msgId);
if (this.order.length > DELIVERED_LRU_CAP) {
const evicted = this.order.shift()!;
this.seen.delete(evicted);
}
return true;
}
}
async function flushTo(
writer: BlobWriter,
address: string,
startCursor: number,
emit: (blob: BlobRow) => Promise<void>,
delivered?: DeliveredIdLru,
): Promise<number> {
let cursor = startCursor;
// Drain page-by-page so a backlog larger than `pageLimit` still flushes.
@@ -531,7 +592,12 @@ async function flushTo(
const page = await writer.fetchPage(address, cursor);
if (page.length === 0) break;
for (const row of page) {
await emit(row);
// Per-connection dedup gate — prevents the same msgId from being
// emitted twice if flushTo is somehow re-entered before the cursor
// catches up. See comment on `DeliveredIdLru`.
if (!delivered || delivered.add(row.msgId)) {
await emit(row);
}
if (row.receivedAt > cursor) cursor = row.receivedAt;
}
if (page.length === 0) break;