diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ea2af2..2d0040a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,49 @@ All notable changes to Shade are documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [4.2.1] — 2026-05-04 — Concurrent-ratchet desync under pull-mode drainer + +A consumer running `shade.files.httpClient(server, { outboundQueueUrl, ... })` +alongside parallel RPC traffic against the same peer would, after ~10s of +load, see every subsequent message fail with +`DecryptionError: Failed to decrypt message — wrong key or tampered data`. +Two bugs combined to cause this; both are fixed in `4.2.1` with regression +coverage. + +### Fixed + +#### `@shade/transfer` — `OutboundQueue` waiter cursor +`enqueue` woke pending `drain` waiters with a `since=0` snapshot — the +full event log — instead of using the waiter's own `since`. A poll that +parked at the head and was woken by a fresh enqueue therefore replayed +every event the waiter had already processed. Downstream the queue +fed `Shade.acceptTransferEnvelope`, so the duplicate replayed an +envelope into `manager.decrypt` twice. The second decrypt consumed an +already-used skipped key and corrupted the Double Ratchet receive +chain. Each `PendingWaiter` now records its `since` cursor and is +delivered only events with `id > since`. + +#### `@shade/core` — `ratchetDecrypt` defense-in-depth +A same-DH message whose `counter` was already behind the chain — and +that did NOT match a cached skipped key — fell through to a path that +called `kdfChainKey` on the *current* (ahead) chain key and then set +`chain.counter = message.counter + 1`, permanently desyncing the +ratchet so every subsequent decrypt returned wrong-key. Such messages +are now rejected with `DecryptionError` without any state mutation, so +a downstream replay (transport bug, retry, intermitent network) cannot +poison the session. + +### Tests +- `packages/shade-files/tests/integration/concurrent-ratchet.test.ts` — + 100 parallel `httpClient` RPCs while the drainer runs, plus a mixed + workload of 50 RPCs + 50 raw `shade.send` deliveries with Bob + echoing replies through the queue. Both surface the bug pre-fix. +- `packages/shade-transfer/tests/outbound-queue.test.ts` — direct + regression on the waiter `since` cursor. +- `packages/shade-core/tests/ratchet.test.ts` — replay of an + already-decrypted message must throw cleanly without breaking + subsequent decrypts on the same chain. + ## [4.2.0] — 2026-05-03 — Pull-mode streams for browser @shade/files `4.1.0` shipped HTTP RPC for browser clients but capped them at inline diff --git a/packages/shade-cli/package.json b/packages/shade-cli/package.json index 0337f85..2e2aeeb 100644 --- a/packages/shade-cli/package.json +++ b/packages/shade-cli/package.json @@ -1,6 +1,6 @@ { "name": "@shade/cli", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/cli.ts", "bin": { diff --git a/packages/shade-core/package.json b/packages/shade-core/package.json index 8b1c6c4..9e3da1e 100644 --- a/packages/shade-core/package.json +++ b/packages/shade-core/package.json @@ -1,6 +1,6 @@ { "name": "@shade/core", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-core/src/ratchet.ts b/packages/shade-core/src/ratchet.ts index 3f790b2..5633e15 100644 --- a/packages/shade-core/src/ratchet.ts +++ b/packages/shade-core/src/ratchet.ts @@ -185,6 +185,22 @@ export async function ratchetDecrypt( if (!session.receiveChain) { throw new DecryptionError('No receiving chain available'); } + // Defense-in-depth: a same-DH message whose counter is already + // behind the chain — and that did NOT match a cached skipped key — + // is either a duplicate we already decrypted (skipped key was + // consumed) or one whose key was evicted under cache pressure. + // Falling through would call kdfChainKey on the *current* (ahead) + // chainKey and then rewind `chain.counter = message.counter + 1`, + // permanently desyncing the chain so every subsequent decrypt + // returns wrong-key. Reject without mutating state instead. + if ( + !isNewRatchet && + message.counter < session.receiveChain.counter + ) { + throw new DecryptionError( + 'Failed to decrypt message — wrong key or tampered data', + ); + } await skipMessageKeys(crypto, session, message.dhPublicKey, session.receiveChain, message.counter); // Advance the receiving chain one more step to get this message's key diff --git a/packages/shade-core/tests/ratchet.test.ts b/packages/shade-core/tests/ratchet.test.ts index acf2d4b..dc1b51d 100644 --- a/packages/shade-core/tests/ratchet.test.ts +++ b/packages/shade-core/tests/ratchet.test.ts @@ -281,6 +281,41 @@ describe('Double Ratchet', () => { expect(ratchetDecrypt(crypto, bob, msg)).rejects.toThrow(); }); + + /** + * Regression — the v4.2.0 OutboundQueue waiter-since bug delivered + * the same envelope twice to `manager.decrypt`. The first decrypt + * succeeded via a cached skipped key; the second one fell into the + * `message.counter < chain.counter` path with no skipped key + * available, advanced the chainKey ONCE and rewound `chain.counter` + * to `message.counter + 1`, leaving the ratchet permanently + * desynced. ratchetDecrypt now rejects without mutating state when + * a same-DH message is behind the chain and not in skippedKeys, so + * a downstream replay (transport bug, retry, etc.) cannot poison + * the session for everyone else. + */ + test('same-DH stale message after consumed skipped key fails without corrupting state', async () => { + const { alice, bob } = await setupPair(); + + // Alice sends 3 messages on the same DH chain. + const m0 = await ratchetEncrypt(crypto, alice, enc.encode('m0')); + const m1 = await ratchetEncrypt(crypto, alice, enc.encode('m1')); + const m2 = await ratchetEncrypt(crypto, alice, enc.encode('m2')); + + // Bob receives m1 first, caching m0's key. Then m0 (delivered + // via the cache). After this, m0's skipped key is consumed. + expect(dec.decode(await ratchetDecrypt(crypto, bob, m1))).toBe('m1'); + expect(dec.decode(await ratchetDecrypt(crypto, bob, m0))).toBe('m0'); + + // Replay of m0: skippedKey is gone, chain.counter is past m0. + // Pre-fix: this would corrupt Bob's chain state; post-fix it + // throws cleanly. + await expect(ratchetDecrypt(crypto, bob, m0)).rejects.toThrow(DecryptionError); + + // Bob can still decrypt the remaining valid message — chain + // state was NOT mutated by the rejected replay. + expect(dec.decode(await ratchetDecrypt(crypto, bob, m2))).toBe('m2'); + }); }); // ─── Long Conversation ──────────────────────────────────── diff --git a/packages/shade-crypto-web/package.json b/packages/shade-crypto-web/package.json index fd4027e..df58d1a 100644 --- a/packages/shade-crypto-web/package.json +++ b/packages/shade-crypto-web/package.json @@ -1,6 +1,6 @@ { "name": "@shade/crypto-web", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-dashboard/package.json b/packages/shade-dashboard/package.json index 2c3c0ee..bae816d 100644 --- a/packages/shade-dashboard/package.json +++ b/packages/shade-dashboard/package.json @@ -1,6 +1,6 @@ { "name": "@shade/dashboard", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "scripts": { "dev": "vite", diff --git a/packages/shade-files/package.json b/packages/shade-files/package.json index 44e1a53..8e27063 100644 --- a/packages/shade-files/package.json +++ b/packages/shade-files/package.json @@ -1,6 +1,6 @@ { "name": "@shade/files", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-files/tests/integration/concurrent-ratchet.test.ts b/packages/shade-files/tests/integration/concurrent-ratchet.test.ts new file mode 100644 index 0000000..3489ae3 --- /dev/null +++ b/packages/shade-files/tests/integration/concurrent-ratchet.test.ts @@ -0,0 +1,184 @@ +import { describe, expect, test } from 'bun:test'; +import { createShade } from '@shade/sdk'; +import { + createPrekeyServer, + MemoryPrekeyStore, + PrekeyServerEvents, +} from '@shade/server'; +import { SubtleCryptoProvider } from '@shade/crypto-web'; +import { Hono } from 'hono'; + +const crypto = new SubtleCryptoProvider(); + +/** + * Concurrent-ratchet hardening tests. + * + * Reproduces the scenario described in the v4.2.0 ratchet-desync bug + * report: with the queue-drainer running on Alice and many concurrent + * `shade.send`/RPC operations against the same peer, do + * encrypt/decrypt paths share the per-peer mutex on + * `ShadeSessionManager` so that no path observes a stale ratchet + * state? + * + * If the lock coverage regresses (a future change re-introduces a + * sidekanal bypass), one of these tests will fail with + * `DecryptionError: Failed to decrypt message — wrong key or + * tampered data`. + */ +async function setupPullRig() { + const prekey = createPrekeyServer({ + crypto, + store: new MemoryPrekeyStore(), + disableRateLimit: true, + events: new PrekeyServerEvents(), + }); + const prekeyServer = Bun.serve({ port: 0, fetch: prekey.fetch }); + const prekeyUrl = `http://localhost:${prekeyServer.port}`; + + const alice = await createShade({ prekeyServer: prekeyUrl, address: 'alice' }); + const bob = await createShade({ prekeyServer: prekeyUrl, address: 'bob' }); + + const queueRoute = await bob.transferQueueRoute({ blockMs: 500 }); + await bob.files.serve({ + stat: async () => ({ + name: '_', + kind: 'dir' as const, + size: 0, + mtime: 0, + metadata: {}, + }), + }); + const rpcRoute = bob.files.rpcRoute({ acceptFirstMessage: true }); + + const app = new Hono(); + app.route('/', queueRoute); + app.route('/', rpcRoute); + const bobServer = Bun.serve({ port: 0, fetch: app.fetch }); + const baseUrl = `http://localhost:${bobServer.port}`; + + const fs = alice.files.httpClient('bob', { + rpcUrl: `${baseUrl}/rpc`, + outboundQueueUrl: `${baseUrl}/queue`, + transferBaseUrl: baseUrl, + defaultTimeoutMs: 10_000, + queueBlockMs: 500, + }); + + return { + alice, + bob, + fs, + baseUrl, + teardown: async () => { + fs.close(); + await alice.shutdown(); + await bob.shutdown(); + bobServer.stop(); + prekeyServer.stop(); + }, + }; +} + +describe('@shade/files — concurrent ratchet under drainer', () => { + test('100 parallel httpClient RPCs while drainer runs — no DecryptionError', async () => { + const rig = await setupPullRig(); + try { + // Warm-up: establishes the X3DH session (Alice → Bob first message + // is a PreKeyMessage; subsequent messages are pure ratchet). + const first = await rig.fs.stat('/'); + expect(first.kind).toBe('dir'); + + // Fire 100 concurrent stat RPCs. Each one is a full ratchet + // round-trip: encrypt request, POST, decrypt response. They all + // contend for `manager.peerOpChains["bob"]` on Alice's side + // (encrypt + decrypt) and `manager.peerOpChains["alice"]` on + // Bob's side. Drainer is running in the background polling + // Bob's queue — its decrypt path also funnels through the same + // per-peer lock. + // 100 concurrent — minimal repro (after warm-up only). + const N = 100; + const results = await Promise.allSettled( + Array.from({ length: N }, () => rig.fs.stat('/')), + ); + const failures = results.filter((s) => s.status === 'rejected') as Array< + PromiseRejectedResult + >; + if (failures.length > 0) { + const sample = failures.slice(0, 1).map((f) => String(f.reason)); + throw new Error(`${failures.length}/${N} concurrent RPCs failed: ${sample[0]}`); + } + } finally { + await rig.teardown(); + } + }, 30_000); + + test('parallel shade.send + drainer + RPCs — ratchet stays in sync', async () => { + const rig = await setupPullRig(); + try { + // Establish session via one warm-up RPC. + await rig.fs.stat('/'); + + // Subscribe Bob to inbound plaintext from Alice — when Alice's + // raw `shade.send` plaintext arrives, Bob echoes a reply back + // through `shade.send` + `deliverControlEnvelope`, which the + // pull-mode envelope transport enqueues for Alice's drainer. + // This injects extra inbound traffic into Alice's drainer in + // parallel with her ongoing RPCs. + const echoes: string[] = []; + rig.bob.onMessage(async (from, plaintext) => { + if (from !== 'alice') return; + if (!plaintext.startsWith('ping:')) return; + echoes.push(plaintext); + const reply = `pong:${plaintext.slice('ping:'.length)}`; + const env = await rig.bob.send('alice', reply); + await rig.bob.deliverControlEnvelope('alice', env); + }); + + const inboundDrained: string[] = []; + rig.alice.onMessage((from, plaintext) => { + if (from !== 'bob') return; + if (plaintext.startsWith('pong:')) inboundDrained.push(plaintext); + }); + + // Mix three concurrent workloads against the same peer: + // - 50 inline file RPCs through httpClient (encrypt + decrypt) + // - 50 raw `shade.send` deliveries via control envelope + // - drainer pulling Bob's responses + echoes + const N = 50; + const rpcs = Array.from({ length: N }, () => rig.fs.stat('/')); + const sends = Array.from({ length: N }, async (_, i) => { + const env = await rig.alice.send('bob', `ping:${i}`); + await rig.alice.deliverControlEnvelope('bob', env); + }); + + const settled = await Promise.allSettled([...rpcs, ...sends]); + const failures = settled.filter((s) => s.status === 'rejected') as Array< + PromiseRejectedResult + >; + if (failures.length > 0) { + const sample = failures.slice(0, 3).map((f) => String(f.reason)); + throw new Error( + `${failures.length}/${settled.length} concurrent ops failed: ${sample.join(' | ')}`, + ); + } + + // Give Bob's queue + Alice's drainer a beat to drain pongs back. + // Echoes round-trip Alice → Bob (control envelope) → Bob's + // onMessage → Bob.send + deliver (queue) → Alice's drainer → + // Alice's onMessage. We just verify some make it back without + // any DecryptionError surfacing. + const deadline = Date.now() + 5_000; + while (inboundDrained.length < N && Date.now() < deadline) { + await new Promise((r) => setTimeout(r, 50)); + } + // Don't gate on every echo arriving — the long-poll cadence and + // bun's serve/abort timing can lag a few. We only care that the + // ratchet didn't desync; if it had, every subsequent op would + // throw DecryptionError above. + expect(echoes.length).toBe(N); + expect(inboundDrained.length).toBeGreaterThan(0); + } finally { + await rig.teardown(); + } + }, 30_000); +}); diff --git a/packages/shade-inbox-server/package.json b/packages/shade-inbox-server/package.json index 6fae038..0e40725 100644 --- a/packages/shade-inbox-server/package.json +++ b/packages/shade-inbox-server/package.json @@ -1,6 +1,6 @@ { "name": "@shade/inbox-server", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-inbox/package.json b/packages/shade-inbox/package.json index b3a80ac..1a11b2a 100644 --- a/packages/shade-inbox/package.json +++ b/packages/shade-inbox/package.json @@ -1,6 +1,6 @@ { "name": "@shade/inbox", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-key-transparency/package.json b/packages/shade-key-transparency/package.json index 26113d8..e3b7569 100644 --- a/packages/shade-key-transparency/package.json +++ b/packages/shade-key-transparency/package.json @@ -1,6 +1,6 @@ { "name": "@shade/key-transparency", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-keychain/package.json b/packages/shade-keychain/package.json index 17c4ebe..8efd31b 100644 --- a/packages/shade-keychain/package.json +++ b/packages/shade-keychain/package.json @@ -1,6 +1,6 @@ { "name": "@shade/keychain", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-observability/package.json b/packages/shade-observability/package.json index 011b2db..7afcc16 100644 --- a/packages/shade-observability/package.json +++ b/packages/shade-observability/package.json @@ -1,6 +1,6 @@ { "name": "@shade/observability", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-observer/package.json b/packages/shade-observer/package.json index 414f20d..604e50f 100644 --- a/packages/shade-observer/package.json +++ b/packages/shade-observer/package.json @@ -1,6 +1,6 @@ { "name": "@shade/observer", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-proto/package.json b/packages/shade-proto/package.json index 214e608..993e9c7 100644 --- a/packages/shade-proto/package.json +++ b/packages/shade-proto/package.json @@ -1,6 +1,6 @@ { "name": "@shade/proto", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-recovery/package.json b/packages/shade-recovery/package.json index 12b9feb..53feed6 100644 --- a/packages/shade-recovery/package.json +++ b/packages/shade-recovery/package.json @@ -1,6 +1,6 @@ { "name": "@shade/recovery", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-sdk/package.json b/packages/shade-sdk/package.json index beb9849..097916d 100644 --- a/packages/shade-sdk/package.json +++ b/packages/shade-sdk/package.json @@ -1,6 +1,6 @@ { "name": "@shade/sdk", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-server/package.json b/packages/shade-server/package.json index c5f7f51..cdf4b2c 100644 --- a/packages/shade-server/package.json +++ b/packages/shade-server/package.json @@ -1,6 +1,6 @@ { "name": "@shade/server", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-storage-encrypted/package.json b/packages/shade-storage-encrypted/package.json index ee15f43..d6eeee3 100644 --- a/packages/shade-storage-encrypted/package.json +++ b/packages/shade-storage-encrypted/package.json @@ -1,6 +1,6 @@ { "name": "@shade/storage-encrypted", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-storage-postgres/package.json b/packages/shade-storage-postgres/package.json index 2cbcbc6..caa0582 100644 --- a/packages/shade-storage-postgres/package.json +++ b/packages/shade-storage-postgres/package.json @@ -1,6 +1,6 @@ { "name": "@shade/storage-postgres", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-storage-sqlite/package.json b/packages/shade-storage-sqlite/package.json index 92dbfcd..15555bc 100644 --- a/packages/shade-storage-sqlite/package.json +++ b/packages/shade-storage-sqlite/package.json @@ -1,6 +1,6 @@ { "name": "@shade/storage-sqlite", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-streams/package.json b/packages/shade-streams/package.json index b46c39a..99d9a38 100644 --- a/packages/shade-streams/package.json +++ b/packages/shade-streams/package.json @@ -1,6 +1,6 @@ { "name": "@shade/streams", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-transfer/package.json b/packages/shade-transfer/package.json index c7e320a..38ee792 100644 --- a/packages/shade-transfer/package.json +++ b/packages/shade-transfer/package.json @@ -1,6 +1,6 @@ { "name": "@shade/transfer", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-transfer/src/transport/queue-transport.ts b/packages/shade-transfer/src/transport/queue-transport.ts index d6c30af..e2c3ac3 100644 --- a/packages/shade-transfer/src/transport/queue-transport.ts +++ b/packages/shade-transfer/src/transport/queue-transport.ts @@ -81,6 +81,14 @@ const DEFAULT_IDLE_EVICTION_MS = 10 * 60 * 1000; interface PendingWaiter { resolve(events: QueuedEvent[]): void; reject(err: Error): void; + /** + * The waiter's `since` cursor — only events with `id > since` should + * be delivered when this waiter is resolved. Without this, an + * enqueue that arrives while a poller is waiting would replay + * already-processed events, causing the receiver to double-decrypt + * (and corrupt ratchet state). + */ + since: number; timer: ReturnType; abortHandler?: () => void; signal?: AbortSignal; @@ -140,16 +148,21 @@ export class OutboundQueue { // last polled id; the @shade/transfer engine handles missing seqs // by re-sending on resume. while (state.events.length > this.maxEvents) state.events.shift(); - // Wake all waiters with whatever has accumulated. - const drained = this.collect(state, 0); - if (drained.length > 0) { + // Wake each waiter with events newer than ITS OWN `since`. Using a + // shared snapshot from `since=0` would replay events the waiter has + // already processed once a fresh enqueue arrived mid-poll, which on + // the receiver side double-dispatches an envelope into shade.receive + // → manager.decrypt and consumes the same skipped-key twice (the + // second dispatch corrupts the ratchet chain). + if (state.waiters.length > 0) { const waiters = state.waiters.splice(0); for (const w of waiters) { clearTimeout(w.timer); if (w.abortHandler !== undefined && w.signal !== undefined) { w.signal.removeEventListener('abort', w.abortHandler); } - w.resolve(drained); + const wDrained = this.collect(state, w.since); + w.resolve(wDrained); } } return event; @@ -181,7 +194,7 @@ export class OutboundQueue { // Empty drain on timeout — that's the "no new events" signal. resolve([]); }, blockMs); - const waiter: PendingWaiter = { resolve, reject, timer }; + const waiter: PendingWaiter = { resolve, reject, since, timer }; if (signal !== undefined) { const handler = () => { const idx = state.waiters.indexOf(waiter); diff --git a/packages/shade-transfer/tests/outbound-queue.test.ts b/packages/shade-transfer/tests/outbound-queue.test.ts new file mode 100644 index 0000000..3027e14 --- /dev/null +++ b/packages/shade-transfer/tests/outbound-queue.test.ts @@ -0,0 +1,60 @@ +import { describe, expect, test } from 'bun:test'; +import { OutboundQueue } from '../src/index.js'; + +/** + * Regression coverage for the long-poll waiter `since` cursor. + * + * The bug being guarded against: when `enqueue` woke a pending + * `drain` waiter, it used a `since=0` snapshot and replayed every + * event that had ever been queued — including the ones the waiter + * had already processed in a previous poll. Downstream the queue + * fed `Shade.acceptTransferEnvelope`, so the duplicate replay + * dispatched the same envelope into `manager.decrypt` twice. The + * second decrypt consumed an already-used skipped key, fell into + * the stale-counter branch of `ratchetDecrypt`, and corrupted the + * Double Ratchet receive chain — surfacing as + * `DecryptionError: wrong key or tampered data` on every + * subsequent message. + */ +describe('OutboundQueue — waiter since cursor', () => { + test('mid-poll enqueue must not replay events the waiter already saw', async () => { + const queue = new OutboundQueue({ idleEvictionMs: 0 }); + const peer = 'alice'; + const e1 = queue.enqueue(peer, { kind: 'envelope', bytes: new Uint8Array([1]) }); + const e2 = queue.enqueue(peer, { kind: 'envelope', bytes: new Uint8Array([2]) }); + + // First poll drains both events (no blocking — they're already there). + const first = await queue.drain(peer, 0, 0); + expect(first.map((e) => e.id)).toEqual([e1.id, e2.id]); + + // Now the waiter polls past the last seen id. It blocks because + // there are no events newer than `since`. Concurrently a fresh + // event gets enqueued — that's the path the bug fired on. + const blockMs = 5_000; + const polling = queue.drain(peer, e2.id, blockMs); + // Yield so `drain` actually parks on the waiter list before we + // race the enqueue against it. + await Promise.resolve(); + const e3 = queue.enqueue(peer, { kind: 'envelope', bytes: new Uint8Array([3]) }); + const woken = await polling; + + // Pre-fix: would resolve with [e1, e2, e3] (a `since=0` snapshot + // drained verbatim). Post-fix: only the events newer than the + // waiter's recorded `since` come through. + expect(woken.map((e) => e.id)).toEqual([e3.id]); + }); + + test('parked waiter at the head still gets the new event when others have polled past it', async () => { + const queue = new OutboundQueue({ idleEvictionMs: 0 }); + const peer = 'alice'; + const e1 = queue.enqueue(peer, { kind: 'envelope', bytes: new Uint8Array([1]) }); + + // A waiter that parks past the head — there are no events newer + // than e1.id, so it has to block. + const polling = queue.drain(peer, e1.id, 5_000); + await Promise.resolve(); + const e2 = queue.enqueue(peer, { kind: 'envelope', bytes: new Uint8Array([2]) }); + const woken = await polling; + expect(woken.map((e) => e.id)).toEqual([e2.id]); + }); +}); diff --git a/packages/shade-transport-bridge/package.json b/packages/shade-transport-bridge/package.json index f07f9e7..4217563 100644 --- a/packages/shade-transport-bridge/package.json +++ b/packages/shade-transport-bridge/package.json @@ -1,6 +1,6 @@ { "name": "@shade/transport-bridge", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-transport-webrtc/package.json b/packages/shade-transport-webrtc/package.json index 454798b..95c4853 100644 --- a/packages/shade-transport-webrtc/package.json +++ b/packages/shade-transport-webrtc/package.json @@ -1,6 +1,6 @@ { "name": "@shade/transport-webrtc", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-transport/package.json b/packages/shade-transport/package.json index 84f7bfd..93dcffd 100644 --- a/packages/shade-transport/package.json +++ b/packages/shade-transport/package.json @@ -1,6 +1,6 @@ { "name": "@shade/transport", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-widgets/package.json b/packages/shade-widgets/package.json index 5c33607..0430fc1 100644 --- a/packages/shade-widgets/package.json +++ b/packages/shade-widgets/package.json @@ -1,6 +1,6 @@ { "name": "@shade/widgets", - "version": "4.2.0", + "version": "4.2.1", "type": "module", "main": "src/index.ts", "types": "src/index.ts",