import { describe, test, expect } from 'bun:test'; import { SubtleCryptoProvider } from '@shade/crypto-web'; import { sha256Once } from '@shade/streams'; import { TransferEngine, MemoryControlChannel, MemoryResumeStore, ShadeTransferHttpTransport, createTransferRoutes, type TransferHandle, type TransferResult, } from '../src/index.js'; const crypto = new SubtleCryptoProvider(); function hex(b: Uint8Array): string { return Array.from(b, (x) => x.toString(16).padStart(2, '0')).join(''); } describe('Resume protocol — kill-restart-verify', () => { test('sender crash mid-transfer → resumeUpload completes the same stream', async () => { const senderResumeStore = new MemoryResumeStore(); // Stable deviceKey shared across engine instances — simulates a stable // identity-derived key. In real use this is `deriveDeviceKey(identity)`. const deviceKey = crypto.randomBytes(32); // Receiver is a single, long-lived engine. Its in-memory IncomingState // already tracks accepted chunks; we don't need to persist receiver state // for the in-process scenario. const { a: ctrlA, b: ctrlB } = MemoryControlChannel.linked('alice', 'bob'); const receiverEngine = new TransferEngine({ crypto, controlChannel: ctrlB, transport: { probe: async () => undefined, sendChunk: async () => { throw new Error('receiver-side sendChunk should not be called'); }, fetchResumeState: async () => null, }, myAddress: 'bob', }); const receiverApp = await createTransferRoutes(receiverEngine); const port = 22000 + Math.floor(Math.random() * 500); const server = Bun.serve({ port, fetch: receiverApp.fetch }); const baseUrl = `http://localhost:${port}`; // Receiver accepts incoming. let resolveRecv!: (h: TransferHandle) => void; const recvHandlePromise = new Promise((r) => { resolveRecv = r; }); receiverEngine.onIncomingTransfer(async (incoming) => { const h = await incoming.accept({ output: { kind: 'buffer' } }); resolveRecv(h); }); // Sender #1 — this one will "crash" partway through. const senderEngine1 = new TransferEngine({ crypto, controlChannel: ctrlA, transport: new ShadeTransferHttpTransport({ resolveBaseUrl: async () => baseUrl, authenticator: { signChunk: async () => ({ 'X-Shade-Sender-Address': 'alice' }), signControl: async () => ({ 'X-Shade-Sender-Address': 'alice' }), }, }), myAddress: 'alice', resumeStore: senderResumeStore, deviceKey, }); const input = crypto.randomBytes(256 * 1024); // 256 KiB // Start an upload that will be intentionally interrupted. We pause the // sender by aborting the upload after ~25% bytes. The receiver still // holds its IncomingState — chunks already accepted stay tracked. const abort = new AbortController(); let bytesAtAbort = 0; const handle1 = await senderEngine1.upload({ to: 'bob', input, lanes: 4, chunkSize: 8 * 1024, partition: 'range', signal: abort.signal, onProgress: (p) => { if (p.bytesSent > input.length / 4 && bytesAtAbort === 0) { bytesAtAbort = p.bytesSent; abort.abort(); } }, }); const streamId = handle1.streamId; // Wait for the upload to fail (abort). let firstErrCaught = false; try { await handle1.done(); } catch { firstErrCaught = true; } expect(firstErrCaught).toBe(true); expect(bytesAtAbort).toBeGreaterThan(0); // Tear down the first sender engine — simulates a crashed/restarted client. senderEngine1.destroy(); // Sender #2 — fresh engine, same resume store. const senderEngine2 = new TransferEngine({ crypto, controlChannel: ctrlA, // re-use the link; in real life this would transport: new ShadeTransferHttpTransport({ resolveBaseUrl: async () => baseUrl, authenticator: { signChunk: async () => ({ 'X-Shade-Sender-Address': 'alice' }), signControl: async () => ({ 'X-Shade-Sender-Address': 'alice' }), }, }), myAddress: 'alice', resumeStore: senderResumeStore, deviceKey, }); // Verify state is still in the resume store. const persisted = await senderResumeStore.get(streamId); expect(persisted).not.toBeNull(); expect(persisted!.status).toBe('active'); // Resume the upload with the same input bytes. const handle2 = await senderEngine2.resumeUpload(streamId, input); const senderResult = await handle2.done(); expect(senderResult.streamId).toBe(streamId); // Receiver finishes too. const recvHandle = await recvHandlePromise; const recvResult = await recvHandle.done(); const received = (recvResult as TransferResult & { bytes?: Uint8Array }).bytes ?? new Uint8Array(); expect(received).toEqual(input); expect(senderResult.sha256).toBe(hex(sha256Once(input))); // Persisted state should now be finished. const after = await senderResumeStore.get(streamId); expect(after?.status).toBe('finished'); senderEngine2.destroy(); receiverEngine.destroy(); server.stop(); }, 30_000); });