diff --git a/CHANGELOG.md b/CHANGELOG.md index 4046bbb..fa5f117 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,87 @@ All notable changes to Shade are documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [4.8.2] — 2026-05-08 — Per-`from` decrypt serialization + per-connection bridge dedup + +Two interlocking robustness fixes for the first-contact / duplicate-fan-out +class of failures Prism reported. Either fix on its own would help; together +they make the receiver path tolerant of any combination of relay duplicates +and concurrent dispatchers. + +**(1) `Shade.receive(from, env)` now serializes its ratchet/storage +step per `from`.** The send path has had a per-address `encryptChains` +mutex since V1 — receive did not. Concurrent decrypts for the same peer +raced the `SessionManager` ratchet (mutated in place) and the +`StorageProvider` (which is not required to be a concurrent-safe +writer — `bun:sqlite` throws `database is locked`, IndexedDB throws +transaction conflicts). Symptom in production: a single relay PUT that +fans out 8× over a WS bridge gets dispatched as 8 parallel +`shade.receive` calls; one wins the X3DH prekey race, the other 7 fail +with `database is locked` or `one-time prekey not found: `, and the +post-decrypt side effects (`markPeerVerified`, +`BroadcastChannel.addMember`, paired-reply `inbox.send`) get lost in +the rubble. The decrypt step is now chained off a per-`from` promise +queue. Crucially, the user-facing **message handlers run outside the +queue** — streams + file-RPC issue nested `shade.receive` calls for the +same peer from inside their handlers (e.g. `stream-end` arrives while a +write-RPC is still waiting on chunks), and holding the queue across the +handler would self-deadlock. Only the atomic ratchet+storage step is +protected. + +**(2) Bridge handlers (WS + SSE) now run a per-connection msgId +LRU dedup.** Cursor-based delivery already de-duplicates in the happy +path, but the gate is a defense-in-depth against any subtle re-entry of +`flushTo` (event-storm, future refactor, fallback-timer race). The chain +that drives flush is now also wrapped in `.catch(() => {})` so a +transient `ws.send` / SSE write rejection doesn't poison every future +push on the connection. + +Both reported by Prism (multi-device E2EE terminal). Wave-3 pair +handshake is unblocked even when the receiver runs multiple bridges or +the relay double-fires `inbox.blob_stored`. + +### Fixed + +#### `@shade/sdk` — `Shade.receive` per-`from` serialization +- `Shade` gains a private `decryptChains: Map>` + mirroring the existing `encryptChains` on the send path. +- `Shade.receive(from, env)` chains its `manager.decrypt(from, env)` + call off the prior decrypt promise for the same `from`. The + post-decrypt control-plaintext check and user `messageHandlers` run + *outside* the chain so nested `shade.receive` calls from inside a + handler don't self-deadlock (streams + file-RPC depend on this). +- The stored chain is `decryptPromise.catch(() => undefined)` so a + rejection in one decrypt doesn't sabotage the next; this caller + still sees its own rejection through the original promise. +- External signature unchanged. + +#### `@shade/inbox-server` — bridge per-connection msgId dedup +- New internal `DeliveredIdLru` (4096-entry bounded set, FIFO eviction) + per WS / SSE connection. `flushTo` skips emit when a row's `msgId` is + already in the LRU. Long-poll handlers don't need it (each request is + isolated). +- `pendingFlushPromise` chains in both WS and SSE handlers now + terminate in `.catch(() => {})` so a transient emit failure doesn't + silently kill the connection's flush loop. + +### Tests +- `packages/shade-transport-bridge/tests/bridge.test.ts` — new + "Bridge dedup" describe block: storms `inbox.blob_stored` 10× for one + PUT and asserts WS / SSE both deliver exactly one frame. +- `packages/shade-sdk/tests/sdk.test.ts` — new + "concurrent receive(from, env) for same `from` does not race the + ratchet" exercises 8 parallel `bob.receive('alice', env)` for the + same envelope and asserts: + 1. at least one fulfills with the right plaintext; + 2. no rejection mentions `database is locked`; + 3. the next legitimate message still decrypts (ratchet intact). + +### Migration + +None. Drop-in. Bridges and receivers behave identically on non- +duplicate paths; the new gates only kick in when a duplicate would +otherwise have been emitted / dispatched. + ## [4.8.1] — 2026-05-08 — `SHADE_DISABLE_RATE_LIMIT` env var for single-tenant deploys The standalone server's `routes.ts` and `inbox-server`'s diff --git a/packages/shade-cli/package.json b/packages/shade-cli/package.json index 0f4113a..2d3aa1e 100644 --- a/packages/shade-cli/package.json +++ b/packages/shade-cli/package.json @@ -1,6 +1,6 @@ { "name": "@shade/cli", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/cli.ts", "bin": { diff --git a/packages/shade-core/package.json b/packages/shade-core/package.json index ebae657..05e4276 100644 --- a/packages/shade-core/package.json +++ b/packages/shade-core/package.json @@ -1,6 +1,6 @@ { "name": "@shade/core", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-crypto-web/package.json b/packages/shade-crypto-web/package.json index 845ea27..f442a42 100644 --- a/packages/shade-crypto-web/package.json +++ b/packages/shade-crypto-web/package.json @@ -1,6 +1,6 @@ { "name": "@shade/crypto-web", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-dashboard/package.json b/packages/shade-dashboard/package.json index e69b8f0..17c7d94 100644 --- a/packages/shade-dashboard/package.json +++ b/packages/shade-dashboard/package.json @@ -1,6 +1,6 @@ { "name": "@shade/dashboard", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "scripts": { "dev": "vite", diff --git a/packages/shade-files/package.json b/packages/shade-files/package.json index 05524ee..dbf0796 100644 --- a/packages/shade-files/package.json +++ b/packages/shade-files/package.json @@ -1,6 +1,6 @@ { "name": "@shade/files", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-inbox-server/package.json b/packages/shade-inbox-server/package.json index 6c6ee65..180f43e 100644 --- a/packages/shade-inbox-server/package.json +++ b/packages/shade-inbox-server/package.json @@ -1,6 +1,6 @@ { "name": "@shade/inbox-server", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-inbox-server/src/bridge.ts b/packages/shade-inbox-server/src/bridge.ts index 906a486..52ea6bd 100644 --- a/packages/shade-inbox-server/src/bridge.ts +++ b/packages/shade-inbox-server/src/bridge.ts @@ -136,15 +136,22 @@ export function createBridgeRoutes(opts: BridgeRoutesOptions): { }; let cursor = verified.since; const writer = makeBlobWriter(opts.store, pageLimit); + const delivered = new DeliveredIdLru(); // Initial backlog drain. - const flushed = await flushTo(writer, address, cursor, async (blob) => { - await stream.writeSSE({ - id: String(blob.receivedAt), - event: 'envelope', - data: JSON.stringify(serializeBlob(blob)), - }); - }); + const flushed = await flushTo( + writer, + address, + cursor, + async (blob) => { + await stream.writeSSE({ + id: String(blob.receivedAt), + event: 'envelope', + data: JSON.stringify(serializeBlob(blob)), + }); + }, + delivered, + ); cursor = Math.max(cursor, flushed); // Hook up event-driven push if available, else fall back to a poll @@ -156,19 +163,31 @@ export function createBridgeRoutes(opts: BridgeRoutesOptions): { const triggerFlush = (): void => { signalled = true; // Serialize fan-in so concurrent triggers don't double-fetch. - pendingFlushPromise = pendingFlushPromise.then(async () => { - while (signalled) { - signalled = false; - const drained = await flushTo(writer, address, cursor, async (blob) => { - await stream.writeSSE({ - id: String(blob.receivedAt), - event: 'envelope', - data: JSON.stringify(serializeBlob(blob)), - }); - }); - if (drained > cursor) cursor = drained; - } - }); + // `.catch(() => {})` keeps the chain alive across transient + // emit failures (e.g. a closed SSE write throws) — without it + // one rejection silently kills every future flush on this + // connection. + pendingFlushPromise = pendingFlushPromise + .then(async () => { + while (signalled) { + signalled = false; + const drained = await flushTo( + writer, + address, + cursor, + async (blob) => { + await stream.writeSSE({ + id: String(blob.receivedAt), + event: 'envelope', + data: JSON.stringify(serializeBlob(blob)), + }); + }, + delivered, + ); + if (drained > cursor) cursor = drained; + } + }) + .catch(() => {}); }; if (opts.events) { @@ -327,6 +346,7 @@ export function createBridgeRoutes(opts: BridgeRoutesOptions): { const connId = presence.newConnectionId(); let cursor = verified.since; const writer = makeBlobWriter(opts.store, pageLimit); + const delivered = new DeliveredIdLru(); let unsubscribe: (() => void) | null = null; let fallbackTimer: ReturnType | null = null; let pendingFlushPromise: Promise = Promise.resolve(); @@ -347,15 +367,26 @@ export function createBridgeRoutes(opts: BridgeRoutesOptions): { presence.markConnected(address, 'ws', connId); const triggerFlush = (): void => { signalled = true; - pendingFlushPromise = pendingFlushPromise.then(async () => { - while (signalled && connected) { - signalled = false; - const drained = await flushTo(writer, address, cursor, async (blob) => { - ws.send(JSON.stringify(serializeBlob(blob))); - }); - if (drained > cursor) cursor = drained; - } - }); + // `.catch(() => {})` mirrors the SSE chain — keeps the + // pending-flush queue alive across transient ws.send errors + // (e.g. partial close, backpressure overflow). + pendingFlushPromise = pendingFlushPromise + .then(async () => { + while (signalled && connected) { + signalled = false; + const drained = await flushTo( + writer, + address, + cursor, + async (blob) => { + ws.send(JSON.stringify(serializeBlob(blob))); + }, + delivered, + ); + if (drained > cursor) cursor = drained; + } + }) + .catch(() => {}); }; if (opts.events) { unsubscribe = opts.events.on((e) => { @@ -518,11 +549,41 @@ function makeBlobWriter(store: InboxStore, pageLimit: number): BlobWriter { }; } +/** + * Per-connection bounded msgId tracker — defense in depth against duplicate + * delivery of the same blob to the same bridge socket. Cursor pagination + * already guarantees uniqueness in the happy path, but a dedup gate at the + * emit boundary catches any subtle bug (e.g. a flushTo race, a future + * refactor, an event-emit retry) without changing wire semantics. + * + * The cap is intentionally large enough to cover any realistic bridge + * pageLimit and small enough to bound memory under long-running streams. + */ +const DELIVERED_LRU_CAP = 4096; + +class DeliveredIdLru { + private readonly seen = new Set(); + private readonly order: string[] = []; + + /** Returns true if `msgId` has not been seen on this connection yet. */ + add(msgId: string): boolean { + if (this.seen.has(msgId)) return false; + this.seen.add(msgId); + this.order.push(msgId); + if (this.order.length > DELIVERED_LRU_CAP) { + const evicted = this.order.shift()!; + this.seen.delete(evicted); + } + return true; + } +} + async function flushTo( writer: BlobWriter, address: string, startCursor: number, emit: (blob: BlobRow) => Promise, + delivered?: DeliveredIdLru, ): Promise { let cursor = startCursor; // Drain page-by-page so a backlog larger than `pageLimit` still flushes. @@ -531,7 +592,12 @@ async function flushTo( const page = await writer.fetchPage(address, cursor); if (page.length === 0) break; for (const row of page) { - await emit(row); + // Per-connection dedup gate — prevents the same msgId from being + // emitted twice if flushTo is somehow re-entered before the cursor + // catches up. See comment on `DeliveredIdLru`. + if (!delivered || delivered.add(row.msgId)) { + await emit(row); + } if (row.receivedAt > cursor) cursor = row.receivedAt; } if (page.length === 0) break; diff --git a/packages/shade-inbox/package.json b/packages/shade-inbox/package.json index d21385a..83a6d42 100644 --- a/packages/shade-inbox/package.json +++ b/packages/shade-inbox/package.json @@ -1,6 +1,6 @@ { "name": "@shade/inbox", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-key-transparency/package.json b/packages/shade-key-transparency/package.json index 65edc23..623d0fc 100644 --- a/packages/shade-key-transparency/package.json +++ b/packages/shade-key-transparency/package.json @@ -1,6 +1,6 @@ { "name": "@shade/key-transparency", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-keychain/package.json b/packages/shade-keychain/package.json index 004269a..c106aca 100644 --- a/packages/shade-keychain/package.json +++ b/packages/shade-keychain/package.json @@ -1,6 +1,6 @@ { "name": "@shade/keychain", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-observability/package.json b/packages/shade-observability/package.json index bc7fffc..b455c41 100644 --- a/packages/shade-observability/package.json +++ b/packages/shade-observability/package.json @@ -1,6 +1,6 @@ { "name": "@shade/observability", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-observer/package.json b/packages/shade-observer/package.json index 24c4506..289c7c2 100644 --- a/packages/shade-observer/package.json +++ b/packages/shade-observer/package.json @@ -1,6 +1,6 @@ { "name": "@shade/observer", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-proto/package.json b/packages/shade-proto/package.json index 887452d..e4f741f 100644 --- a/packages/shade-proto/package.json +++ b/packages/shade-proto/package.json @@ -1,6 +1,6 @@ { "name": "@shade/proto", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-recovery/package.json b/packages/shade-recovery/package.json index a346f89..89dd8bf 100644 --- a/packages/shade-recovery/package.json +++ b/packages/shade-recovery/package.json @@ -1,6 +1,6 @@ { "name": "@shade/recovery", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-sdk/package.json b/packages/shade-sdk/package.json index 1d6783f..897af6d 100644 --- a/packages/shade-sdk/package.json +++ b/packages/shade-sdk/package.json @@ -1,6 +1,6 @@ { "name": "@shade/sdk", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-sdk/src/shade.ts b/packages/shade-sdk/src/shade.ts index 1e70311..c2abb28 100644 --- a/packages/shade-sdk/src/shade.ts +++ b/packages/shade-sdk/src/shade.ts @@ -153,6 +153,13 @@ export class Shade { private establishing = new Map>(); // Per-address encrypt queue to serialize ratchet mutations private encryptChains = new Map>(); + // Per-`from` decrypt queue: serializes incoming receives so two concurrent + // shade.receive(from, env) calls can't race the ratchet/storage. Without + // this, parallel deliveries (relay duplicate fan-out, fast pipelined + // sends) hit `database is locked` (sqlite) or transaction conflicts (IDB) + // because the underlying StorageProvider isn't required to be a + // concurrent-safe writer. See V4.8.2 changelog. + private decryptChains = new Map>(); // Message handlers — may be sync or async; receive() awaits each. The // optional third arg distinguishes direct vs broadcast plaintexts; @@ -436,7 +443,35 @@ export class Shade { */ async receive(from: string, envelope: ShadeEnvelope): Promise { if (!this.initialized) throw new Error('Not initialized'); - const plaintext = await this.manager.decrypt(from, envelope); + + // Serialize ONLY the ratchet/storage write portion of receive (the + // call into `manager.decrypt`). Concurrent decrypts race the + // SessionManager ratchet (mutated in place) and the StorageProvider + // (not required to be a concurrent-safe writer — `bun:sqlite` + // throws `database is locked`, IDB throws transaction conflicts). + // The Prism FR called this out: a relay-duplicated WS fan-out + // dispatched 8 parallel `shade.receive(from, env)` calls, one won + // the X3DH prekey race and the other 7 failed with + // `database is locked` / `one-time prekey not found`. The fix is + // to queue per-`from` decrypts so the ratchet step is sequential. + // + // Crucially the user-visible MESSAGE HANDLERS run *outside* the + // queue. Streams + file-RPC issue nested `shade.receive` calls for + // the same peer from inside their handlers (e.g. `stream-end` + // arrives while a write-RPC is still waiting on chunks); holding + // the queue across the handler would self-deadlock. The atomic + // unit we have to protect is just the ratchet+storage step, not + // the consumer's reaction to it. + const previous = this.decryptChains.get(from) ?? Promise.resolve(); + const decryptPromise = previous + .catch(() => undefined) // don't propagate upstream failures + .then(() => this.manager.decrypt(from, envelope)); + // Store a never-rejecting copy so the next chained receive doesn't + // see a rejection from this one (we still surface our own rejection + // to *this* caller via the original `decryptPromise`). + this.decryptChains.set(from, decryptPromise.catch(() => undefined)); + const plaintext = await decryptPromise; + const consumed = await maybeHandleControlPlaintext( this.broadcastHooks(), from, diff --git a/packages/shade-sdk/tests/sdk.test.ts b/packages/shade-sdk/tests/sdk.test.ts index 5af99e4..aa6e361 100644 --- a/packages/shade-sdk/tests/sdk.test.ts +++ b/packages/shade-sdk/tests/sdk.test.ts @@ -131,6 +131,53 @@ describe('createShade — happy path', () => { await expect(alice.send('nobody', 'ghost')).rejects.toThrow(); }); + test('concurrent receive(from, env) for same `from` does not race the ratchet (V4.8.2)', async () => { + // Reproduces the Prism FR scenario: a single PUT is fanned out + // multiple times by the relay (or any duplicating transport), the + // receiver dispatches several `shade.receive(from, env)` in + // parallel, and the underlying SessionManager + StorageProvider + // would race on the ratchet (and on storage writes — sqlite throws + // "database is locked", IDB throws transaction conflicts) without + // per-`from` serialization. We pre-establish a session, then fire + // the same envelope at `bob.receive` from many concurrent callers + // and verify all of them either decrypt to the same plaintext or + // surface a benign "already-consumed" error. Crucially: no + // unhandled storage races, no ratchet corruption, and the next + // legitimate message still decrypts. + alice = await createShade({ prekeyServer: server.url, address: 'alice' }); + bob = await createShade({ prekeyServer: server.url, address: 'bob' }); + + const env1 = await alice.send('bob', 'first'); + expect(await bob.receive('alice', env1)).toBe('first'); + + const env2 = await alice.send('bob', 'second'); + // Fan the same envelope out to 8 concurrent receives — exactly the + // shape of the relay duplicate fan-out described in the FR. + const dispatches = await Promise.allSettled( + Array.from({ length: 8 }, () => bob.receive('alice', env2)), + ); + // At least one must have succeeded with the right plaintext; the + // others may legitimately reject (replay protection / OTPK + // already-consumed) but MUST NOT corrupt the ratchet or throw + // "database is locked". + const fulfilled = dispatches.filter((d) => d.status === 'fulfilled') as Array< + PromiseFulfilledResult + >; + expect(fulfilled.length).toBeGreaterThan(0); + expect(fulfilled[0]!.value).toBe('second'); + + for (const d of dispatches) { + if (d.status === 'rejected') { + const msg = String((d.reason as Error)?.message ?? d.reason); + expect(msg).not.toMatch(/database is locked/i); + } + } + + // Ratchet must still advance — the next legitimate message decrypts. + const env3 = await alice.send('bob', 'third'); + expect(await bob.receive('alice', env3)).toBe('third'); + }); + test('verify fingerprint matches pinned identity', async () => { alice = await createShade({ prekeyServer: server.url, address: 'alice' }); bob = await createShade({ prekeyServer: server.url, address: 'bob' }); diff --git a/packages/shade-server/package.json b/packages/shade-server/package.json index a4aa700..aaa6eca 100644 --- a/packages/shade-server/package.json +++ b/packages/shade-server/package.json @@ -1,6 +1,6 @@ { "name": "@shade/server", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-storage-encrypted/package.json b/packages/shade-storage-encrypted/package.json index b44e42a..e5a3939 100644 --- a/packages/shade-storage-encrypted/package.json +++ b/packages/shade-storage-encrypted/package.json @@ -1,6 +1,6 @@ { "name": "@shade/storage-encrypted", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-storage-indexeddb/package.json b/packages/shade-storage-indexeddb/package.json index 5c755a8..7d2a37c 100644 --- a/packages/shade-storage-indexeddb/package.json +++ b/packages/shade-storage-indexeddb/package.json @@ -1,6 +1,6 @@ { "name": "@shade/storage-indexeddb", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-storage-postgres/package.json b/packages/shade-storage-postgres/package.json index c34cf7f..98fc8f9 100644 --- a/packages/shade-storage-postgres/package.json +++ b/packages/shade-storage-postgres/package.json @@ -1,6 +1,6 @@ { "name": "@shade/storage-postgres", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-storage-sqlite/package.json b/packages/shade-storage-sqlite/package.json index 7e395cb..155d96e 100644 --- a/packages/shade-storage-sqlite/package.json +++ b/packages/shade-storage-sqlite/package.json @@ -1,6 +1,6 @@ { "name": "@shade/storage-sqlite", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-streams/package.json b/packages/shade-streams/package.json index ca0936a..ac031d2 100644 --- a/packages/shade-streams/package.json +++ b/packages/shade-streams/package.json @@ -1,6 +1,6 @@ { "name": "@shade/streams", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-transfer/package.json b/packages/shade-transfer/package.json index dcecc30..22c8d63 100644 --- a/packages/shade-transfer/package.json +++ b/packages/shade-transfer/package.json @@ -1,6 +1,6 @@ { "name": "@shade/transfer", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-transport-bridge/package.json b/packages/shade-transport-bridge/package.json index f94d2ea..f99e52d 100644 --- a/packages/shade-transport-bridge/package.json +++ b/packages/shade-transport-bridge/package.json @@ -1,6 +1,6 @@ { "name": "@shade/transport-bridge", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-transport-bridge/tests/bridge.test.ts b/packages/shade-transport-bridge/tests/bridge.test.ts index 8744a55..0010745 100644 --- a/packages/shade-transport-bridge/tests/bridge.test.ts +++ b/packages/shade-transport-bridge/tests/bridge.test.ts @@ -951,3 +951,80 @@ describe('Sender attribution — bridge push surfaces IncomingMessage.from', () } }); }); + +// ─── V4.8.2 — per-connection msgId dedup (Prism FR: duplicate fan-out) ─ + +describe('Bridge dedup — single PUT yields exactly one push per connection', () => { + test('WS: storming inbox.blob_stored does not duplicate frames for one msgId', async () => { + const h = await bootstrap(); + try { + const received: IncomingMessage[] = []; + const bridge = new WsBridge({ + baseUrl: h.baseUrl, + auth: bobAuth(h), + connectTimeoutMs: 2_000, + disableAutoReconnect: true, + }); + await bridge.connect({ onMessage: (m) => received.push(m) }); + try { + // One real PUT + replay the inbox.blob_stored event ten times to + // simulate any future code path (or external bug) that double- + // fires the trigger. The cursor in flushTo would already cover + // the happy case, but the per-connection LRU is the explicit + // dedup gate that survives even if cursor logic regresses. + const msgId = await putBlob(h, rand(48)); + for (let i = 0; i < 10; i++) { + h.events.emit('inbox.blob_stored', { + address: 'bob', + msgId, + bytes: 48, + ttlSeconds: 60, + }); + } + await waitFor(() => received.length >= 1, 2_000); + // Give any stragglers a chance to arrive and inflate the count. + await new Promise((r) => setTimeout(r, 250)); + expect(received.length).toBe(1); + expect(received[0]!.msgId).toBe(msgId); + } finally { + await bridge.disconnect(); + } + } finally { + h.server.stop(true); + } + }); + + test('SSE: same dedup contract', async () => { + const h = await bootstrap(); + try { + const received: IncomingMessage[] = []; + const bridge = new SseBridge({ + baseUrl: h.baseUrl, + auth: bobAuth(h), + initialBackoffMs: 50, + maxBackoffMs: 200, + disableAutoReconnect: true, + }); + await bridge.connect({ onMessage: (m) => received.push(m) }); + try { + const msgId = await putBlob(h, rand(48)); + for (let i = 0; i < 10; i++) { + h.events.emit('inbox.blob_stored', { + address: 'bob', + msgId, + bytes: 48, + ttlSeconds: 60, + }); + } + await waitFor(() => received.length >= 1, 2_000); + await new Promise((r) => setTimeout(r, 250)); + expect(received.length).toBe(1); + expect(received[0]!.msgId).toBe(msgId); + } finally { + await bridge.disconnect(); + } + } finally { + h.server.stop(true); + } + }); +}); diff --git a/packages/shade-transport-webrtc/package.json b/packages/shade-transport-webrtc/package.json index fe1a9d1..d8919e8 100644 --- a/packages/shade-transport-webrtc/package.json +++ b/packages/shade-transport-webrtc/package.json @@ -1,6 +1,6 @@ { "name": "@shade/transport-webrtc", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-transport/package.json b/packages/shade-transport/package.json index d26a59c..21d62aa 100644 --- a/packages/shade-transport/package.json +++ b/packages/shade-transport/package.json @@ -1,6 +1,6 @@ { "name": "@shade/transport", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-widgets/package.json b/packages/shade-widgets/package.json index 25aa100..0d29e78 100644 --- a/packages/shade-widgets/package.json +++ b/packages/shade-widgets/package.json @@ -1,6 +1,6 @@ { "name": "@shade/widgets", - "version": "4.8.1", + "version": "4.8.2", "type": "module", "main": "src/index.ts", "types": "src/index.ts",