Pull-mode httpClient + drainer + parallel RPCs against the same peer deteriorated after ~10s with `DecryptionError`. Two bugs combined: - `OutboundQueue.enqueue` woke `drain` waiters with a `since=0` snapshot, replaying already-processed events into `Shade.acceptTransferEnvelope` → `manager.decrypt` twice. The duplicate consumed an already-used skipped key and corrupted the Double Ratchet receive chain. - `ratchetDecrypt` then propagated the corruption: a same-DH message behind the chain with no cached skipped key fell through to `kdfChainKey` on the ahead state and rewound `chain.counter`, permanently desyncing the chain. Fix `OutboundQueue` to honor each waiter's `since`, and harden `ratchetDecrypt` so any future duplicate fails cleanly without mutating state. Adds regression coverage at all three layers. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
185 lines
6.7 KiB
TypeScript
185 lines
6.7 KiB
TypeScript
import { describe, expect, test } from 'bun:test';
|
|
import { createShade } from '@shade/sdk';
|
|
import {
|
|
createPrekeyServer,
|
|
MemoryPrekeyStore,
|
|
PrekeyServerEvents,
|
|
} from '@shade/server';
|
|
import { SubtleCryptoProvider } from '@shade/crypto-web';
|
|
import { Hono } from 'hono';
|
|
|
|
// Crypto provider instance (from '@shade/crypto-web') shared by this file; it is
// passed to `createPrekeyServer` inside `setupPullRig`. Note that this
// module-level binding shadows the global `crypto` within this file.
const crypto = new SubtleCryptoProvider();
|
|
|
|
/**
|
|
* Concurrent-ratchet hardening tests.
|
|
*
|
|
* Reproduces the scenario described in the v4.2.0 ratchet-desync bug
|
|
* report: with the queue-drainer running on Alice and many concurrent
|
|
* `shade.send`/RPC operations against the same peer, do
|
|
* encrypt/decrypt paths share the per-peer mutex on
|
|
* `ShadeSessionManager` so that no path observes a stale ratchet
|
|
* state?
|
|
*
|
|
* If the lock coverage regresses (a future change re-introduces a
|
|
* side-channel bypass), one of these tests will fail with
|
|
* `DecryptionError: Failed to decrypt message — wrong key or
|
|
* tampered data`.
|
|
*/
|
|
/**
 * Builds a self-contained pull-mode test rig.
 *
 * The rig consists of:
 *  - an in-process prekey server (in-memory store, rate limiting disabled),
 *  - two Shade instances registered against it as `alice` and `bob`,
 *  - an HTTP server for Bob that mounts his transfer-queue route and his
 *    file-RPC route on a single Hono app,
 *  - a file `httpClient` on Alice's side pointed at Bob's HTTP server.
 *
 * @returns Both Shade instances, Alice's file client (`fs`), Bob's base URL,
 *   and a `teardown` callback that releases everything the rig created.
 *   `teardown` always stops both HTTP servers, even when closing the client
 *   or shutting down a Shade instance throws (the error is still rethrown).
 */
async function setupPullRig() {
  const prekey = createPrekeyServer({
    crypto,
    store: new MemoryPrekeyStore(),
    disableRateLimit: true,
    events: new PrekeyServerEvents(),
  });
  // Port 0 requests an ephemeral port; the real one is read back from `.port`.
  const prekeyServer = Bun.serve({ port: 0, fetch: prekey.fetch });
  const prekeyUrl = `http://localhost:${prekeyServer.port}`;

  const alice = await createShade({ prekeyServer: prekeyUrl, address: 'alice' });
  const bob = await createShade({ prekeyServer: prekeyUrl, address: 'bob' });

  const queueRoute = await bob.transferQueueRoute({ blockMs: 500 });
  // Bob serves a stub file tree: every `stat` call answers with the same
  // empty directory entry, which is all the RPC round-trips below need.
  await bob.files.serve({
    stat: async () => ({
      name: '_',
      kind: 'dir' as const,
      size: 0,
      mtime: 0,
      metadata: {},
    }),
  });
  const rpcRoute = bob.files.rpcRoute({ acceptFirstMessage: true });

  // Both routes are mounted at the root of one app so a single base URL
  // serves the queue endpoint and the RPC endpoint.
  const app = new Hono();
  app.route('/', queueRoute);
  app.route('/', rpcRoute);
  const bobServer = Bun.serve({ port: 0, fetch: app.fetch });
  const baseUrl = `http://localhost:${bobServer.port}`;

  const fs = alice.files.httpClient('bob', {
    rpcUrl: `${baseUrl}/rpc`,
    outboundQueueUrl: `${baseUrl}/queue`,
    transferBaseUrl: baseUrl,
    defaultTimeoutMs: 10_000,
    queueBlockMs: 500,
  });

  return {
    alice,
    bob,
    fs,
    baseUrl,
    teardown: async () => {
      try {
        fs.close();
        await alice.shutdown();
        await bob.shutdown();
      } finally {
        // Stop the listeners unconditionally: if a shutdown step above
        // throws, the servers must not be left bound for the rest of the
        // test run.
        bobServer.stop();
        prekeyServer.stop();
      }
    },
  };
}
|
|
|
|
describe('@shade/files — concurrent ratchet under drainer', () => {
|
|
test('100 parallel httpClient RPCs while drainer runs — no DecryptionError', async () => {
  const rig = await setupPullRig();
  try {
    // Warm-up call. The first Alice → Bob message is a PreKeyMessage that
    // establishes the X3DH session; everything after it is pure ratchet
    // traffic.
    const warmUp = await rig.fs.stat('/');
    expect(warmUp.kind).toBe('dir');

    // Launch N stat RPCs at once. Each is a complete ratchet round-trip
    // (encrypt request, POST, decrypt response), so they all contend for
    // `manager.peerOpChains["bob"]` on Alice's side and
    // `manager.peerOpChains["alice"]` on Bob's side. The drainer keeps
    // polling Bob's queue in the background, and its decrypt path goes
    // through the same per-peer lock.
    // 100 concurrent calls after the warm-up is the minimal repro.
    const N = 100;
    const inFlight = [...Array(N).keys()].map(() => rig.fs.stat('/'));
    const outcomes = await Promise.allSettled(inFlight);

    const rejected = outcomes.filter(
      (outcome): outcome is PromiseRejectedResult => outcome.status === 'rejected',
    );
    // Report the failure count plus the first rejection reason as a sample.
    const [firstRejected] = rejected;
    if (firstRejected !== undefined) {
      const sampleReason = String(firstRejected.reason);
      throw new Error(`${rejected.length}/${N} concurrent RPCs failed: ${sampleReason}`);
    }
  } finally {
    await rig.teardown();
  }
}, 30_000);
|
|
|
|
test('parallel shade.send + drainer + RPCs — ratchet stays in sync', async () => {
  const rig = await setupPullRig();
  try {
    // One warm-up RPC establishes the session before the concurrent phase.
    await rig.fs.stat('/');

    // Bob answers every `ping:<i>` plaintext from Alice with `pong:<i>`,
    // sent back through `shade.send` + `deliverControlEnvelope`. In pull
    // mode that reply is enqueued for Alice's drainer, so this adds extra
    // inbound traffic on Alice's side while her RPCs are still in flight.
    const pingsSeenByBob: string[] = [];
    rig.bob.onMessage(async (sender, body) => {
      const isPingFromAlice = sender === 'alice' && body.startsWith('ping:');
      if (!isPingFromAlice) return;
      pingsSeenByBob.push(body);
      const seq = body.slice('ping:'.length);
      const replyEnvelope = await rig.bob.send('alice', `pong:${seq}`);
      await rig.bob.deliverControlEnvelope('alice', replyEnvelope);
    });

    // Alice records every `pong:` plaintext that reaches her from Bob.
    const pongsSeenByAlice: string[] = [];
    rig.alice.onMessage((sender, body) => {
      if (sender === 'bob' && body.startsWith('pong:')) {
        pongsSeenByAlice.push(body);
      }
    });

    // Three workloads hit the same peer concurrently:
    //   - 50 inline file RPCs through httpClient (encrypt + decrypt)
    //   - 50 raw `shade.send` deliveries via control envelope
    //   - the drainer pulling Bob's responses + echoes
    const N = 50;
    const sendPing = async (seq: number) => {
      const envelope = await rig.alice.send('bob', `ping:${seq}`);
      await rig.alice.deliverControlEnvelope('bob', envelope);
    };
    const indices = [...Array(N).keys()];
    // RPCs are started first, then the sends — same start order as before.
    const rpcCalls = indices.map(() => rig.fs.stat('/'));
    const pingCalls = indices.map((seq) => sendPing(seq));

    const outcomes = await Promise.allSettled([...rpcCalls, ...pingCalls]);
    const rejected = outcomes.filter(
      (outcome): outcome is PromiseRejectedResult => outcome.status === 'rejected',
    );
    if (rejected.length > 0) {
      // Include up to three rejection reasons in the error for diagnosis.
      const sampleReasons = rejected
        .slice(0, 3)
        .map(({ reason }) => String(reason))
        .join(' | ');
      throw new Error(
        `${rejected.length}/${outcomes.length} concurrent ops failed: ${sampleReasons}`,
      );
    }

    // Allow up to five seconds for the pongs to come back. The full path is
    // Alice → Bob (control envelope) → Bob's onMessage → Bob.send + deliver
    // (queue) → Alice's drainer → Alice's onMessage.
    const giveUpAt = Date.now() + 5_000;
    const pause = () => new Promise((resolve) => setTimeout(resolve, 50));
    while (!(pongsSeenByAlice.length >= N || Date.now() >= giveUpAt)) {
      await pause();
    }

    // Every ping must have reached Bob. The pongs are deliberately not
    // gated on a full count: long-poll cadence and bun's serve/abort timing
    // can delay a few. A ratchet desync would already have surfaced as a
    // DecryptionError in the settled results above, so it is enough that at
    // least one pong made it back.
    expect(pingsSeenByBob.length).toBe(N);
    expect(pongsSeenByAlice.length).toBeGreaterThan(0);
  } finally {
    await rig.teardown();
  }
}, 30_000);
|
|
});
|