import { describe, test, expect, afterAll } from 'bun:test'; import { SubtleCryptoProvider } from '@shade/crypto-web'; import { sha256Once } from '@shade/streams'; import { TransferEngine, MemoryControlChannel, ShadeTransferHttpTransport, createTransferRoutes, type TransferHandle, type TransferResult, } from '../src/index.js'; const crypto = new SubtleCryptoProvider(); function hex(bytes: Uint8Array): string { return Array.from(bytes, (b) => b.toString(16).padStart(2, '0')).join(''); } interface IntegrationServers { baseUrl: string; senderEngine: TransferEngine; receiverEngine: TransferEngine; serverHandle: { stop: () => void }; } const cleanups: Array<() => void> = []; afterAll(() => { for (const c of cleanups) c(); }); async function setup(): Promise { const { a: ctrlA, b: ctrlB } = MemoryControlChannel.linked('alice', 'bob'); const receiverEngine = new TransferEngine({ crypto, controlChannel: ctrlB, transport: { // Receiver-side transport is unused for outgoing operations; we only // route incoming chunks via the Hono server. probe: async () => undefined, sendChunk: async () => { throw new Error('receiver-side sendChunk should not be called'); }, fetchResumeState: async () => null, }, myAddress: 'bob', }); // Spin up a Hono+Bun server that routes chunks into receiverEngine. const app = await createTransferRoutes(receiverEngine); // Bun's `Bun.serve` accepts a Hono app's `fetch` handler. const bunGlobal = (globalThis as unknown as { Bun?: { serve: (opts: unknown) => { url: URL; stop: () => void } } }).Bun; if (bunGlobal === undefined) throw new Error('Bun runtime required for this test'); const server = bunGlobal.serve({ port: 0, fetch: (req: Request) => app.fetch(req), }); const baseUrl = server.url.toString().replace(/\/$/, ''); const senderEngine = new TransferEngine({ crypto, controlChannel: ctrlA, transport: new ShadeTransferHttpTransport({ resolveBaseUrl: async () => baseUrl, authenticator: { signChunk: async () => ({ 'X-Shade-Sender-Address': 'alice' }), signControl: async () => ({ 'X-Shade-Sender-Address': 'alice' }), }, }), myAddress: 'alice', }); cleanups.push(() => { server.stop(); senderEngine.destroy(); receiverEngine.destroy(); }); return { baseUrl, senderEngine, receiverEngine, serverHandle: server }; } async function uploadRoundtrip( servers: IntegrationServers, input: Uint8Array, opts?: { lanes?: number; chunkSize?: number; partition?: 'auto' | 'range' | 'round-robin' }, ): Promise<{ senderResult: TransferResult; received: Uint8Array }> { let resolveRecv!: (h: TransferHandle) => void; const recvHandlePromise = new Promise((r) => { resolveRecv = r; }); const unsubscribe = servers.receiverEngine.onIncomingTransfer(async (incoming) => { const h = await incoming.accept({ output: { kind: 'buffer' } }); resolveRecv(h); }); const handle = await servers.senderEngine.upload({ to: 'bob', input, ...(opts?.lanes !== undefined ? { lanes: opts.lanes } : {}), ...(opts?.chunkSize !== undefined ? { chunkSize: opts.chunkSize } : {}), ...(opts?.partition !== undefined ? { partition: opts.partition } : {}), metadata: { name: 'http-test.bin' }, }); const recvHandle = await recvHandlePromise; const [senderResult, recvResult] = await Promise.all([handle.done(), recvHandle.done()]); unsubscribe(); const received = (recvResult as TransferResult & { bytes?: Uint8Array }).bytes ?? new Uint8Array(); return { senderResult, received }; } describe('HTTP transport — Bun.serve loopback', () => { test('100 KiB / 1 lane', async () => { const servers = await setup(); const input = crypto.randomBytes(100 * 1024); const { senderResult, received } = await uploadRoundtrip(servers, input, { lanes: 1, chunkSize: 32 * 1024, }); expect(received).toEqual(input); expect(senderResult.sha256).toBe(hex(sha256Once(input))); expect(senderResult.bytesSent).toBe(input.length); }); test('1 MiB / 4 lanes range', async () => { const servers = await setup(); const input = crypto.randomBytes(1024 * 1024); const { senderResult, received } = await uploadRoundtrip(servers, input, { lanes: 4, chunkSize: 64 * 1024, partition: 'range', }); expect(received).toEqual(input); expect(senderResult.sha256).toBe(hex(sha256Once(input))); }); test('1 MiB / 4 lanes round-robin', async () => { const servers = await setup(); const input = crypto.randomBytes(1024 * 1024); const { senderResult, received } = await uploadRoundtrip(servers, input, { lanes: 4, chunkSize: 64 * 1024, partition: 'round-robin', }); expect(received).toEqual(input); expect(senderResult.sha256).toBe(hex(sha256Once(input))); }); test( '8 MiB / 4 lanes range (ship-gate proxy for 100 MB)', async () => { const servers = await setup(); const input = crypto.randomBytes(8 * 1024 * 1024); const { senderResult, received } = await uploadRoundtrip(servers, input, { lanes: 4, chunkSize: 256 * 1024, partition: 'range', }); expect(received).toEqual(input); expect(senderResult.sha256).toBe(hex(sha256Once(input))); }, 30_000, ); test('upload with ReadableStream input — round-robin auto-pick', async () => { const servers = await setup(); const input = crypto.randomBytes(512 * 1024); const stream = new ReadableStream({ start(controller) { for (let off = 0; off < input.length; off += 32 * 1024) { controller.enqueue(input.subarray(off, Math.min(off + 32 * 1024, input.length))); } controller.close(); }, }); const { senderResult, received } = await uploadRoundtrip(servers, stream as unknown as Uint8Array, { lanes: 4, chunkSize: 32 * 1024, }); expect(received).toEqual(input); expect(senderResult.sha256).toBe(hex(sha256Once(input))); }); test('peer offline → TransferOfflineError', async () => { const { a: ctrlA } = MemoryControlChannel.linked('alice', 'bob'); const sender = new TransferEngine({ crypto, controlChannel: ctrlA, transport: new ShadeTransferHttpTransport({ resolveBaseUrl: async () => 'http://127.0.0.1:1', // intentionally unreachable }), myAddress: 'alice', }); cleanups.push(() => sender.destroy()); await expect( sender.upload({ to: 'bob', input: crypto.randomBytes(64), }), ).rejects.toThrow(/offline|fetch failed|connection|ECONN|connect/i); }); });