/**
 * End-to-end test of `WebRtcTransferTransport` against the manager + memory
 * factory. Exercises the same `ITransferTransport` API the engine calls
 * (`probe`, `sendChunk`, `fetchResumeState`).
 */
import { afterEach, describe, expect, it } from 'bun:test';
import type { WebRtcReceiverHooks } from '../src/connection.js';
import { MemoryRtcFactory } from '../src/memory-rtc.js';
import { MemoryShadeBridge, WebRtcSignalingChannel } from '../src/signaling.js';
import { WebRtcConnectionManager } from '../src/manager.js';
import {
  DEFAULT_MAX_DATACHANNEL_MESSAGE,
  WebRtcTransferTransport,
} from '../src/transport.js';
import { streamIdBytesToString } from '../src/wire.js';

/** What the receiver's `onChunk` hook observed for a delivered chunk. */
type ReceivedChunk = { laneId: number; seq: bigint; bytes: number };

/**
 * Managers created by `paired()` during the current test. They are destroyed
 * in `afterEach` so that a failing assertion cannot skip the teardown.
 */
const liveManagers: WebRtcConnectionManager[] = [];

afterEach(() => {
  try {
    // `splice(0)` empties the registry while handing back what was in it, so
    // a manager is never destroyed twice across tests.
    for (const manager of liveManagers.splice(0)) {
      manager.destroy();
    }
  } finally {
    // Always reset the factory, even if a manager's teardown throws.
    MemoryRtcFactory.reset();
  }
});

/**
 * Builds an alice/bob pair of connection managers that share one
 * `MemoryRtcFactory` and talk over a linked in-memory signaling bridge.
 *
 * Only bob is given receiver hooks; alice is wrapped in a
 * `WebRtcTransferTransport`, which is the object under test. Both managers
 * are registered for destruction in `afterEach`.
 *
 * @param opts.bobReceiver Receiver hooks installed on bob's manager.
 * @returns Both managers plus alice's transport.
 */
function paired(opts: { bobReceiver: WebRtcReceiverHooks }): {
  alice: WebRtcConnectionManager;
  bob: WebRtcConnectionManager;
  aliceTransport: WebRtcTransferTransport;
} {
  const factory = new MemoryRtcFactory();
  const { a, b } = MemoryShadeBridge.linked('alice', 'bob');
  const aliceSig = new WebRtcSignalingChannel(a);
  const bobSig = new WebRtcSignalingChannel(b);

  const alice = new WebRtcConnectionManager({ factory, signaling: aliceSig });
  const bob = new WebRtcConnectionManager({
    factory,
    signaling: bobSig,
    receiver: opts.bobReceiver,
  });
  liveManagers.push(alice, bob);

  const aliceTransport = new WebRtcTransferTransport({ manager: alice });
  return { alice, bob, aliceTransport };
}

/** Returns a fresh stream id derived from 16 random bytes. */
function makeStreamId(): string {
  const b = new Uint8Array(16);
  globalThis.crypto.getRandomValues(b);
  return streamIdBytesToString(b);
}

/**
 * Receiver hooks for tests that do not care about the receiver's answers:
 * every chunk is acked with `lastSeq: 0` and every resume query reports
 * "not found" (`null`).
 */
function inertReceiver(): WebRtcReceiverHooks {
  return {
    async onChunk() {
      return { lastSeq: 0 };
    },
    async onResumeQuery() {
      return null;
    },
  };
}

describe('WebRtcTransferTransport', () => {
  it('probe opens the peer connection', async () => {
    const { alice, aliceTransport } = paired({ bobReceiver: inertReceiver() });

    await aliceTransport.probe('bob');

    expect(alice.isConnected('bob')).toBe(true);
  });

  it('sendChunk routes envelope to receiver and returns the ack', async () => {
    // A holder object rather than a reassigned `let`: the hook writes from
    // inside a closure, and TypeScript would otherwise keep the variable
    // narrowed to `null` at the assertions below.
    const captured: { chunk: ReceivedChunk | null } = { chunk: null };
    const { aliceTransport } = paired({
      bobReceiver: {
        async onChunk(_from, _streamId, laneId, seq, envelope) {
          captured.chunk = { laneId, seq, bytes: envelope.length };
          return { lastSeq: Number(seq), bytesReceived: envelope.length };
        },
        async onResumeQuery() {
          return null;
        },
      },
    });
    const streamId = makeStreamId();
    const envelope = new Uint8Array(2048);
    envelope.fill(0x42);

    const ack = await aliceTransport.sendChunk('bob', streamId, 1, 5n, envelope);

    expect(ack.lastSeq).toBe(5);
    expect(ack.bytesReceived).toBe(2048);
    expect(captured.chunk).not.toBeNull();
    expect(captured.chunk?.laneId).toBe(1);
    expect(captured.chunk?.seq).toBe(5n);
    expect(captured.chunk?.bytes).toBe(2048);
  });

  it('rejects oversized envelopes that would exceed the data channel cap', async () => {
    const { aliceTransport } = paired({ bobReceiver: inertReceiver() });
    const streamId = makeStreamId();
    // One byte over the default cap.
    const huge = new Uint8Array(DEFAULT_MAX_DATACHANNEL_MESSAGE + 1);

    await expect(aliceTransport.sendChunk('bob', streamId, 0, 0n, huge)).rejects.toThrow(
      /frame too large/,
    );
  });

  it('fetchResumeState returns parsed state when the receiver knows the stream', async () => {
    const { aliceTransport } = paired({
      bobReceiver: {
        async onChunk() {
          return { lastSeq: 0 };
        },
        async onResumeQuery(_from, streamId) {
          return {
            streamId,
            lanes: [
              { laneId: 0, lastSeqAcked: 11 },
              { laneId: 1, lastSeqAcked: 4 },
            ],
          };
        },
      },
    });
    const sid = makeStreamId();

    const state = await aliceTransport.fetchResumeState('bob', sid);

    expect(state).not.toBeNull();
    expect(state?.streamId).toBe(sid);
    expect(state?.lanes[0]?.lastSeqAcked).toBe(11);
    expect(state?.lanes[1]?.lastSeqAcked).toBe(4);
  });

  it('fetchResumeState returns null when the peer reports not found', async () => {
    const { aliceTransport } = paired({ bobReceiver: inertReceiver() });

    const state = await aliceTransport.fetchResumeState('bob', makeStreamId());

    expect(state).toBeNull();
  });

  it('multiple in-flight requests interleave correctly via requestId correlation', async () => {
    let inflight = 0;
    let maxInflight = 0;
    const { aliceTransport } = paired({
      bobReceiver: {
        async onChunk(_from, _streamId, _laneId, seq) {
          inflight++;
          if (inflight > maxInflight) maxInflight = inflight;
          // Stagger response so request ordering doesn't trivially match
          // response ordering.
          await new Promise((resolve) => setTimeout(resolve, Number(seq % 5n) * 5));
          inflight--;
          return { lastSeq: Number(seq), bytesReceived: 0 };
        },
        async onResumeQuery() {
          return null;
        },
      },
    });
    const streamId = makeStreamId();

    const acks = await Promise.all(
      Array.from({ length: 12 }, (_, i) =>
        aliceTransport.sendChunk('bob', streamId, i % 4, BigInt(i), new Uint8Array(8)),
      ),
    );

    // Each ack matches its request seq (round-trip via requestId).
    for (const [i, ack] of acks.entries()) {
      expect(ack.lastSeq).toBe(i);
    }
    // More than one handler ran at once, so the requests really overlapped.
    expect(maxInflight).toBeGreaterThan(1);
  });
});