import { describe, expect, test } from 'bun:test'; import { createShade } from '@shade/sdk'; import { createPrekeyServer, MemoryPrekeyStore, PrekeyServerEvents, } from '@shade/server'; import { SubtleCryptoProvider } from '@shade/crypto-web'; import { Hono } from 'hono'; const crypto = new SubtleCryptoProvider(); /** * Stand up the full pull-mode rig: * - Prekey server (for X3DH) * - Bob: file handler + rpcRoute + transferQueueRoute, all on one server * - Alice: httpClient with outboundQueueUrl + transferBaseUrl wired * * Returns Alice's `FileClient`, which speaks browser-style: ONE base URL, * no inbound listener, streams supported via long-poll. */ async function setupPullRig(opts: { bobHandler: Parameters>['files']>['serve']>[0]; }) { const prekey = createPrekeyServer({ crypto, store: new MemoryPrekeyStore(), disableRateLimit: true, events: new PrekeyServerEvents(), }); const prekeyServer = Bun.serve({ port: 0, fetch: prekey.fetch }); const prekeyUrl = `http://localhost:${prekeyServer.port}`; const alice = await createShade({ prekeyServer: prekeyUrl, address: 'alice' }); const bob = await createShade({ prekeyServer: prekeyUrl, address: 'bob' }); // Bob: queue-route FIRST (configures bob's transports), then files.serve. const queueRoute = await bob.transferQueueRoute({ blockMs: 1_500 }); await bob.files.serve(opts.bobHandler); const rpcRoute = bob.files.rpcRoute({ acceptFirstMessage: true }); const app = new Hono(); app.route('/', queueRoute); app.route('/', rpcRoute); const bobServer = Bun.serve({ port: 0, fetch: app.fetch }); const baseUrl = `http://localhost:${bobServer.port}`; const fs = alice.files.httpClient('bob', { rpcUrl: `${baseUrl}/rpc`, outboundQueueUrl: `${baseUrl}/queue`, transferBaseUrl: baseUrl, defaultTimeoutMs: 10_000, queueBlockMs: 1_000, }); return { alice, bob, fs, baseUrl, teardown: async () => { fs.close(); await alice.shutdown(); await bob.shutdown(); bobServer.stop(); prekeyServer.stop(); }, }; } describe('@shade/files HTTP RPC — pull-mode streams', () => { test('streamed read (4 MiB) via long-poll queue', async () => { const payload = new Uint8Array(4 * 1024 * 1024); for (let i = 0; i < payload.length; i++) payload[i] = (i * 97) & 0xff; const rig = await setupPullRig({ bobHandler: { read: async () => { // Return the payload as a streamed read so the rpc-handler // promotes it via the streams-bridge into a transfer. const stream = new ReadableStream({ start(controller) { const CHUNK = 256 * 1024; for (let off = 0; off < payload.byteLength; off += CHUNK) { controller.enqueue(payload.slice(off, Math.min(off + CHUNK, payload.byteLength))); } controller.close(); }, }); // Need a precomputed sha256 for streamed reads. Use the // crypto provider's sha256 directly. const digest = new Uint8Array(await globalThis.crypto.subtle.digest('SHA-256', payload)); const sha256Hex = Array.from(digest, (b) => b.toString(16).padStart(2, '0')).join(''); return { kind: 'streams' as const, stream, size: payload.byteLength, sha256: sha256Hex, contentType: 'application/octet-stream', }; }, }, }); try { const result = await rig.fs.read('/big.bin'); expect(result.kind).toBe('streams'); if (result.kind !== 'streams') return; // Drain the stream and compare. const reader = result.stream.getReader(); const got = new Uint8Array(payload.byteLength); let offset = 0; while (true) { const { value, done } = await reader.read(); if (done) break; if (value !== undefined) { got.set(value, offset); offset += value.byteLength; } } reader.releaseLock(); await result.done(); expect(offset).toBe(payload.byteLength); // Compare in 64KiB strides for speed. let mismatch = -1; for (let i = 0; i < payload.byteLength; i++) { if (got[i] !== payload[i]) { mismatch = i; break; } } expect(mismatch).toBe(-1); } finally { await rig.teardown(); } }, 30_000); test('streamed read fails with clear error when outboundQueueUrl is omitted', async () => { const rig = await setupPullRig({ bobHandler: { read: async () => { const stream = new ReadableStream({ start(c) { c.enqueue(new Uint8Array(512 * 1024)); c.close(); }, }); const digest = new Uint8Array(await globalThis.crypto.subtle.digest('SHA-256', new Uint8Array(512 * 1024))); const sha256Hex = Array.from(digest, (b) => b.toString(16).padStart(2, '0')).join(''); return { kind: 'streams' as const, stream, size: 512 * 1024, sha256: sha256Hex, }; }, }, }); // Tear down the rig's drainer so we can construct an inline-only client rig.fs.close(); const inlineOnly = rig.alice.files.httpClient('bob', { rpcUrl: `${rig.baseUrl}/rpc`, defaultTimeoutMs: 10_000, }); try { await expect(inlineOnly.read('/big.bin')).rejects.toThrow(/streamed read/); } finally { inlineOnly.close(); await rig.teardown(); } }, 15_000); test('long-poll returns empty events on idle timeout', async () => { const rig = await setupPullRig({ bobHandler: { stat: async () => ({ name: '_', kind: 'dir' as const, size: 0, mtime: 0, metadata: {}, }), }, }); try { // Direct poll without any pending events — should return after blockMs. const start = Date.now(); const res = await fetch(`${rig.baseUrl}/queue`, { method: 'POST', headers: { 'Content-Type': 'application/json', 'X-Shade-Sender-Address': 'alice', }, body: JSON.stringify({ since: 0, blockMs: 500 }), }); expect(res.status).toBe(200); const body = (await res.json()) as { events: unknown[]; nextSince: number }; expect(body.events).toHaveLength(0); expect(Date.now() - start).toBeGreaterThanOrEqual(400); } finally { await rig.teardown(); } }, 10_000); });