import { describe, test, expect } from 'bun:test'; import { SubtleCryptoProvider } from '@shade/crypto-web'; import { sha256Once } from '@shade/streams'; import { TransferEngine, MemoryControlChannel, MemoryTransferTransport, } from '../src/index.js'; import type { IncomingTransfer, TransferResult } from '../src/index.js'; const crypto = new SubtleCryptoProvider(); function hex(bytes: Uint8Array): string { return Array.from(bytes, (b) => b.toString(16).padStart(2, '0')).join(''); } interface PairedEngines { sender: TransferEngine; receiver: TransferEngine; } function makePair(): PairedEngines { const { a: ctrlA, b: ctrlB } = MemoryControlChannel.linked('alice', 'bob'); const { a: txA, b: txB } = MemoryTransferTransport.linked('alice', 'bob'); const senderEngine = new TransferEngine({ crypto, controlChannel: ctrlA, transport: txA, myAddress: 'alice', }); const receiverEngine = new TransferEngine({ crypto, controlChannel: ctrlB, transport: txB, myAddress: 'bob', }); // Wire receiver-side transport to route chunks into receiver-engine. txB.setChunkHandler(async (from, streamId, laneId, seq, bytes) => receiverEngine.receiveChunk(from, streamId, laneId, seq, bytes), ); txB.setResumeProvider(async (from, streamId) => receiverEngine.getResumeState(from, streamId), ); return { sender: senderEngine, receiver: receiverEngine }; } async function uploadAndAwait( pair: PairedEngines, input: Uint8Array, opts?: { lanes?: number; chunkSize?: number; partition?: 'auto' | 'range' | 'round-robin' }, ): Promise<{ result: TransferResult; received: Uint8Array }> { // The handler accepts and PUBLISHES the receive-handle out-of-band so // it can return promptly (control channel awaits handler completion). let resolveReceiveHandle!: (h: import('../src/index.js').TransferHandle) => void; const receiveHandlePromise = new Promise( (r) => { resolveReceiveHandle = r; }, ); const unsubscribe = pair.receiver.onIncomingTransfer(async (incoming: IncomingTransfer) => { const handle = await incoming.accept({ output: { kind: 'buffer' } }); resolveReceiveHandle(handle); }); const handle = await pair.sender.upload({ to: 'bob', input, ...(opts?.lanes !== undefined ? { lanes: opts.lanes } : {}), ...(opts?.chunkSize !== undefined ? { chunkSize: opts.chunkSize } : {}), ...(opts?.partition !== undefined ? { partition: opts.partition } : {}), metadata: { name: 'test.bin', contentType: 'application/octet-stream' }, }); const recvHandle = await receiveHandlePromise; const [senderResult, receiverResult] = await Promise.all([handle.done(), recvHandle.done()]); unsubscribe(); const bytes = (receiverResult as TransferResult & { bytes?: Uint8Array }).bytes ?? new Uint8Array(0); return { result: senderResult, received: bytes }; } describe('TransferEngine (memory loopback)', () => { test('1 KiB upload — 1 lane (auto-degrade for small file)', async () => { const pair = makePair(); const input = crypto.randomBytes(1024); const { result, received } = await uploadAndAwait(pair, input, { lanes: 4, chunkSize: 256 }); expect(received).toEqual(input); expect(result.sha256).toBe(hex(sha256Once(input))); }); test('256 KiB upload — 4 lanes range partition', async () => { const pair = makePair(); const input = crypto.randomBytes(256 * 1024); const { result, received } = await uploadAndAwait(pair, input, { lanes: 4, chunkSize: 16 * 1024, partition: 'range', }); expect(received).toEqual(input); expect(result.sha256).toBe(hex(sha256Once(input))); }); test('1 MiB upload — 4 lanes round-robin partition', async () => { const pair = makePair(); const input = crypto.randomBytes(1024 * 1024); const { result, received } = await uploadAndAwait(pair, input, { lanes: 4, chunkSize: 64 * 1024, partition: 'round-robin', }); expect(received).toEqual(input); expect(result.sha256).toBe(hex(sha256Once(input))); }); test('integrity: same sha256 across lane counts', async () => { const input = crypto.randomBytes(512 * 1024); const expected = hex(sha256Once(input)); for (const lanes of [1, 2, 4, 8]) { const pair = makePair(); const { result, received } = await uploadAndAwait(pair, input, { lanes, chunkSize: 8 * 1024 }); expect(received).toEqual(input); expect(result.sha256).toBe(expected); } }); test('upload with ReadableStream input → round-robin partition', async () => { const pair = makePair(); const input = crypto.randomBytes(300 * 1024); const stream = new ReadableStream({ start(controller) { for (let off = 0; off < input.length; off += 64 * 1024) { controller.enqueue(input.subarray(off, Math.min(off + 64 * 1024, input.length))); } controller.close(); }, }); let resolveRecv!: (h: import('../src/index.js').TransferHandle) => void; const recvHandlePromise = new Promise((r) => { resolveRecv = r; }); const unsubscribe = pair.receiver.onIncomingTransfer(async (incoming) => { const h = await incoming.accept({ output: { kind: 'buffer' } }); resolveRecv(h); }); const handle = await pair.sender.upload({ to: 'bob', input: stream, lanes: 4, chunkSize: 32 * 1024, }); const recvHandle = await recvHandlePromise; const [, recvResult] = await Promise.all([handle.done(), recvHandle.done()]); unsubscribe(); const bytes = (recvResult as TransferResult & { bytes?: Uint8Array }).bytes ?? new Uint8Array(); expect(bytes).toEqual(input); }); test('progress events fire and end with complete', async () => { const pair = makePair(); const input = crypto.randomBytes(64 * 1024); const senderProgressSamples: number[] = []; const receiverEvents: string[] = []; let resolveRecv!: (h: import('../src/index.js').TransferHandle) => void; const recvHandlePromise = new Promise((r) => { resolveRecv = r; }); const unsub = pair.receiver.onIncomingTransfer(async (incoming) => { const h = await incoming.accept({ output: { kind: 'buffer' }, onEvent: (e) => { receiverEvents.push(e.type); }, }); resolveRecv(h); }); const handle = await pair.sender.upload({ to: 'bob', input, lanes: 2, chunkSize: 8 * 1024, onProgress: (p) => senderProgressSamples.push(p.bytesSent), }); const recvHandle = await recvHandlePromise; await Promise.all([handle.done(), recvHandle.done()]); unsub(); expect(senderProgressSamples.length).toBeGreaterThan(0); expect(senderProgressSamples[senderProgressSamples.length - 1]).toBe(64 * 1024); expect(receiverEvents).toContain('start'); expect(receiverEvents).toContain('complete'); }); });