61 lines
2.8 KiB
TypeScript
61 lines
2.8 KiB
TypeScript
|
|
import { describe, expect, test } from 'bun:test';
|
||
|
|
import { OutboundQueue } from '../src/index.js';
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Regression coverage for the long-poll waiter `since` cursor.
|
||
|
|
*
|
||
|
|
* The bug being guarded against: when `enqueue` woke a pending
|
||
|
|
* `drain` waiter, it used a `since=0` snapshot and replayed every
|
||
|
|
* event that had ever been queued — including the ones the waiter
|
||
|
|
* had already processed in a previous poll. Downstream the queue
|
||
|
|
* fed `Shade.acceptTransferEnvelope`, so the duplicate replay
|
||
|
|
* dispatched the same envelope into `manager.decrypt` twice. The
|
||
|
|
* second decrypt consumed an already-used skipped key, fell into
|
||
|
|
* the stale-counter branch of `ratchetDecrypt`, and corrupted the
|
||
|
|
* Double Ratchet receive chain — surfacing as
|
||
|
|
* `DecryptionError: wrong key or tampered data` on every
|
||
|
|
* subsequent message.
|
||
|
|
*/
|
||
|
|
describe('OutboundQueue — waiter since cursor', () => {
|
||
|
|
test('mid-poll enqueue must not replay events the waiter already saw', async () => {
|
||
|
|
const queue = new OutboundQueue({ idleEvictionMs: 0 });
|
||
|
|
const peer = 'alice';
|
||
|
|
const e1 = queue.enqueue(peer, { kind: 'envelope', bytes: new Uint8Array([1]) });
|
||
|
|
const e2 = queue.enqueue(peer, { kind: 'envelope', bytes: new Uint8Array([2]) });
|
||
|
|
|
||
|
|
// First poll drains both events (no blocking — they're already there).
|
||
|
|
const first = await queue.drain(peer, 0, 0);
|
||
|
|
expect(first.map((e) => e.id)).toEqual([e1.id, e2.id]);
|
||
|
|
|
||
|
|
// Now the waiter polls past the last seen id. It blocks because
|
||
|
|
// there are no events newer than `since`. Concurrently a fresh
|
||
|
|
// event gets enqueued — that's the path the bug fired on.
|
||
|
|
const blockMs = 5_000;
|
||
|
|
const polling = queue.drain(peer, e2.id, blockMs);
|
||
|
|
// Yield so `drain` actually parks on the waiter list before we
|
||
|
|
// race the enqueue against it.
|
||
|
|
await Promise.resolve();
|
||
|
|
const e3 = queue.enqueue(peer, { kind: 'envelope', bytes: new Uint8Array([3]) });
|
||
|
|
const woken = await polling;
|
||
|
|
|
||
|
|
// Pre-fix: would resolve with [e1, e2, e3] (a `since=0` snapshot
|
||
|
|
// drained verbatim). Post-fix: only the events newer than the
|
||
|
|
// waiter's recorded `since` come through.
|
||
|
|
expect(woken.map((e) => e.id)).toEqual([e3.id]);
|
||
|
|
});
|
||
|
|
|
||
|
|
test('parked waiter at the head still gets the new event when others have polled past it', async () => {
|
||
|
|
const queue = new OutboundQueue({ idleEvictionMs: 0 });
|
||
|
|
const peer = 'alice';
|
||
|
|
const e1 = queue.enqueue(peer, { kind: 'envelope', bytes: new Uint8Array([1]) });
|
||
|
|
|
||
|
|
// A waiter that parks past the head — there are no events newer
|
||
|
|
// than e1.id, so it has to block.
|
||
|
|
const polling = queue.drain(peer, e1.id, 5_000);
|
||
|
|
await Promise.resolve();
|
||
|
|
const e2 = queue.enqueue(peer, { kind: 'envelope', bytes: new Uint8Array([2]) });
|
||
|
|
const woken = await polling;
|
||
|
|
expect(woken.map((e) => e.id)).toEqual([e2.id]);
|
||
|
|
});
|
||
|
|
});
|