Files

194 lines
6.0 KiB
TypeScript
Raw Permalink Normal View History

/**
* End-to-end test of `WebRtcTransferTransport` against the manager + memory
* factory. Exercises the same `ITransferTransport` API the engine calls
* (`probe`, `sendChunk`, `fetchResumeState`).
*/
import { afterEach, describe, expect, it } from 'bun:test';
import { MemoryRtcFactory } from '../src/memory-rtc.js';
import { MemoryShadeBridge, WebRtcSignalingChannel } from '../src/signaling.js';
import { WebRtcConnectionManager } from '../src/manager.js';
import {
DEFAULT_MAX_DATACHANNEL_MESSAGE,
WebRtcTransferTransport,
} from '../src/transport.js';
import { streamIdBytesToString } from '../src/wire.js';
// Runs after every test in this file: calls the static `reset()` on the
// in-memory RTC factory.
afterEach(function resetMemoryRtc(): void {
  MemoryRtcFactory.reset();
});
/**
 * Builds a two-peer fixture ("alice" and "bob") wired through a linked
 * `MemoryShadeBridge`, with both managers sharing one `MemoryRtcFactory`.
 *
 * Only bob's manager is given receiver hooks; alice's manager is wrapped in
 * the `WebRtcTransferTransport` that the tests drive.
 *
 * @param opts.bobReceiver Receiver hooks installed on bob's manager.
 * @returns Both managers (the caller is responsible for destroying them) and
 *          the transport bound to alice's manager.
 */
function paired(opts: {
  bobReceiver: import('../src/connection.js').WebRtcReceiverHooks;
}): {
  alice: WebRtcConnectionManager;
  bob: WebRtcConnectionManager;
  aliceTransport: WebRtcTransferTransport;
} {
  const rtcFactory = new MemoryRtcFactory();
  const bridge = MemoryShadeBridge.linked('alice', 'bob');

  // One signaling channel per bridge endpoint.
  const aliceSignaling = new WebRtcSignalingChannel(bridge.a);
  const bobSignaling = new WebRtcSignalingChannel(bridge.b);

  const alice = new WebRtcConnectionManager({
    factory: rtcFactory,
    signaling: aliceSignaling,
  });
  const bob = new WebRtcConnectionManager({
    factory: rtcFactory,
    signaling: bobSignaling,
    receiver: opts.bobReceiver,
  });

  return {
    alice,
    bob,
    aliceTransport: new WebRtcTransferTransport({ manager: alice }),
  };
}
/**
 * Produces a fresh stream id: 16 random bytes from `globalThis.crypto`,
 * rendered to a string by `streamIdBytesToString`.
 */
function makeStreamId(): string {
  // `getRandomValues` fills the array in place and returns that same array.
  return streamIdBytesToString(globalThis.crypto.getRandomValues(new Uint8Array(16)));
}
describe('WebRtcTransferTransport', () => {
// After `probe('bob')` resolves, alice's manager must report bob as connected.
// The receiver hooks are stubs; this test does not assert on them.
it('probe opens the peer connection', async () => {
  const { alice, bob, aliceTransport } = paired({
    bobReceiver: {
      async onChunk() {
        return { lastSeq: 0 };
      },
      async onResumeQuery() {
        return null;
      },
    },
  });
  try {
    await aliceTransport.probe('bob');
    expect(alice.isConnected('bob')).toBe(true);
  } finally {
    // Tear down even when the probe or the assertion throws, so a failing
    // test does not leave live managers behind.
    alice.destroy();
    bob.destroy();
  }
});
// Sends one 2048-byte envelope on lane 1 / seq 5 and checks both directions:
// bob's `onChunk` hook sees the lane, seq and envelope length, and the ack
// the hook returns is what `sendChunk` resolves with.
it('sendChunk routes envelope to receiver and returns the ack', async () => {
  type Delivery = { laneId: number; seq: bigint; bytes: number };
  // Recorded in an array rather than a `let … | null = null` binding: a
  // variable assigned only inside a closure stays narrowed to `null` at the
  // assertion site, which makes `received!.field` a type error.
  const deliveries: Delivery[] = [];
  const { alice, bob, aliceTransport } = paired({
    bobReceiver: {
      async onChunk(_from, _streamId, laneId, seq, envelope) {
        deliveries.push({ laneId, seq, bytes: envelope.length });
        return { lastSeq: Number(seq), bytesReceived: envelope.length };
      },
      async onResumeQuery() {
        return null;
      },
    },
  });
  try {
    const streamId = makeStreamId();
    const envelope = new Uint8Array(2048);
    envelope.fill(0x42);
    const ack = await aliceTransport.sendChunk('bob', streamId, 1, 5n, envelope);
    expect(ack.lastSeq).toBe(5);
    expect(ack.bytesReceived).toBe(2048);

    // Inspect the most recent delivery, matching the original
    // "last write wins" semantics.
    const delivered = deliveries[deliveries.length - 1];
    if (delivered === undefined) {
      throw new Error('onChunk was never invoked on the receiver');
    }
    expect(delivered.laneId).toBe(1);
    expect(delivered.seq).toBe(5n);
    expect(delivered.bytes).toBe(2048);
  } finally {
    // Tear down even when an assertion throws, so a failing test does not
    // leave live managers behind.
    alice.destroy();
    bob.destroy();
  }
});
// An envelope one byte longer than DEFAULT_MAX_DATACHANNEL_MESSAGE must make
// `sendChunk` reject with an error matching /frame too large/.
it('rejects oversized envelopes that would exceed the data channel cap', async () => {
  const { alice, bob, aliceTransport } = paired({
    bobReceiver: {
      async onChunk() {
        return { lastSeq: 0 };
      },
      async onResumeQuery() {
        return null;
      },
    },
  });
  try {
    const streamId = makeStreamId();
    const huge = new Uint8Array(DEFAULT_MAX_DATACHANNEL_MESSAGE + 1);
    await expect(aliceTransport.sendChunk('bob', streamId, 0, 0n, huge)).rejects.toThrow(
      /frame too large/,
    );
  } finally {
    // Tear down even when the expectation fails, so a failing test does not
    // leave live managers behind.
    alice.destroy();
    bob.destroy();
  }
});
// Bob's `onResumeQuery` hook echoes the queried stream id with two lanes;
// `fetchResumeState` must hand the same stream id and per-lane
// `lastSeqAcked` values back to the caller.
it('fetchResumeState returns parsed state when the receiver knows the stream', async () => {
  const { alice, bob, aliceTransport } = paired({
    bobReceiver: {
      async onChunk() {
        return { lastSeq: 0 };
      },
      async onResumeQuery(_from, streamId) {
        return {
          streamId,
          lanes: [
            { laneId: 0, lastSeqAcked: 11 },
            { laneId: 1, lastSeqAcked: 4 },
          ],
        };
      },
    },
  });
  try {
    const sid = makeStreamId();
    const state = await aliceTransport.fetchResumeState('bob', sid);
    expect(state).not.toBeNull();
    expect(state!.streamId).toBe(sid);
    expect(state!.lanes[0]!.lastSeqAcked).toBe(11);
    expect(state!.lanes[1]!.lastSeqAcked).toBe(4);
  } finally {
    // Tear down even when an assertion throws, so a failing test does not
    // leave live managers behind.
    alice.destroy();
    bob.destroy();
  }
});
// When bob's `onResumeQuery` hook returns null, `fetchResumeState` must
// resolve to null as well.
it('fetchResumeState returns null when the peer reports not found', async () => {
  const { alice, bob, aliceTransport } = paired({
    bobReceiver: {
      async onChunk() {
        return { lastSeq: 0 };
      },
      async onResumeQuery() {
        return null;
      },
    },
  });
  try {
    const state = await aliceTransport.fetchResumeState('bob', makeStreamId());
    expect(state).toBeNull();
  } finally {
    // Tear down even when the assertion throws, so a failing test does not
    // leave live managers behind.
    alice.destroy();
    bob.destroy();
  }
});
// Fires 12 `sendChunk` calls at once across 4 lanes. Bob's hook delays each
// ack by (seq % 5) * 5 ms, and every resolved ack must still carry the seq of
// the request that produced it.
it('multiple in-flight requests interleave correctly via requestId correlation', async () => {
  // Number of `onChunk` invocations currently awaiting their delay, and the
  // highest value that counter reached.
  let inflight = 0;
  let maxInflight = 0;
  const { alice, bob, aliceTransport } = paired({
    bobReceiver: {
      async onChunk(_from, _streamId, _laneId, seq) {
        inflight++;
        if (inflight > maxInflight) maxInflight = inflight;
        // Stagger response so request ordering doesn't trivially match
        // response ordering.
        await new Promise<void>((resolve) =>
          setTimeout(resolve, Number(seq % 5n) * 5),
        );
        inflight--;
        return { lastSeq: Number(seq), bytesReceived: 0 };
      },
      async onResumeQuery() {
        return null;
      },
    },
  });
  try {
    const streamId = makeStreamId();
    const acks = await Promise.all(
      Array.from({ length: 12 }, (_, i) =>
        aliceTransport.sendChunk('bob', streamId, i % 4, BigInt(i), new Uint8Array(8)),
      ),
    );
    // Each ack matches its request seq (round-trip via requestId).
    for (let i = 0; i < acks.length; i++) {
      expect(acks[i]!.lastSeq).toBe(i);
    }
    // More than one hook invocation was pending at the same time, i.e. the
    // requests really did overlap on the receiver.
    expect(maxInflight).toBeGreaterThan(1);
  } finally {
    // Tear down even when an assertion throws, so a failing test does not
    // leave live managers behind.
    alice.destroy();
    bob.destroy();
  }
});
});