/** * Client-side bridge between `@shade/files` content RPC ops and the * `@shade/transfer` engine. * * Two responsibilities, mirror-image of `server/streams-bridge.ts`: * 1. **Outbound writes (client → server, > 256 KiB).** When `FileClient.write` * promotes to the streams path, this bridge calls `shade.upload(...)` * with `userMetadata.shadeFilesWriteId = ` so the server can * correlate the inbound transfer with the parallel `write` RPC. * * 2. **Inbound reads (server → client, > 256 KiB).** When `FileClient.read` * gets a `{ kind: 'streams', streamId, ... }` RPC response, it asks * this bridge for the matching incoming transfer's plaintext stream. * The bridge subscribes to `shade.onIncomingTransfer` once and, on * each transfer tagged with `userMetadata.shadeFilesReadStreamId`, * **immediately** calls `accept(...)` (the engine rejects chunks that * arrive before accept), pipes plaintext into a TransformStream, and * parks the readable side until the matching read RPC awaits it. */ import type { TransferHandle, TransferProgress } from '@shade/transfer'; import { generateRequestId } from '../protocol/correlate.js'; import { OperationTimeoutError } from '../schemas/errors.js'; import { META_KEY_READ_STREAM_ID, META_KEY_WRITE_ID, type StreamsBridgeShade, } from '../server/streams-bridge.js'; export interface AwaitReadOptions { /** Sender address — must match `incoming.from` for delivery. */ expectedFrom: string; signal: AbortSignal; /** Hard deadline (ms from now). Default 60_000. */ timeoutMs?: number; } export interface ParkedRead { from: string; /** Plaintext stream. The bridge already accepted the transfer. */ readable: ReadableStream; /** Resolves when the transfer fully completes (verified sha256 available). */ done: Promise<{ sha256: string; bytesSent: number }>; /** Underlying transfer handle — for abort propagation. */ handle: TransferHandle; arrivedAt: number; } export interface ClientStreamsBridge { /** * Generate a fresh writeId, kick `shade.upload(...)` to `peer` with that * id stamped in `userMetadata`, and return both the id (for the parallel * RPC envelope) and the transfer handle (for `done()`/`abort()`). */ initiateWrite(opts: { peer: string; stream: ReadableStream; size: number; contentType?: string; name?: string; signal?: AbortSignal; onProgress?: (p: TransferProgress) => void; }): Promise<{ writeId: string; handle: TransferHandle }>; /** * Wait for an inbound transfer carrying `userMetadata.shadeFilesReadStreamId * === streamId` from `expectedFrom`. Resolves with the parked entry whose * `readable` can be consumed by the caller. */ awaitRead(streamId: string, opts: AwaitReadOptions): Promise; destroy(): Promise; } interface PendingReadWaiter { resolve: (parked: ParkedRead) => void; reject: (err: unknown) => void; expectedFrom: string; timer: ReturnType | null; abortListener: (() => void) | null; signal: AbortSignal; } export interface CreateClientStreamsBridgeOptions { /** Default deadline for `awaitRead` if the caller doesn't supply one. */ defaultAwaitReadTimeoutMs?: number; /** How long to retain a parked transfer waiting for its RPC. Default 60_000. */ parkedReadTtlMs?: number; } export async function createClientStreamsBridge( shade: StreamsBridgeShade, options: CreateClientStreamsBridgeOptions = {}, ): Promise { const parkedReadTtlMs = options.parkedReadTtlMs ?? 60_000; const defaultAwaitTimeoutMs = options.defaultAwaitReadTimeoutMs ?? 60_000; const parked = new Map(); const waiters = new Map(); let destroyed = false; const unsubscribe = await shade.onIncomingTransfer(async (incoming) => { const readStreamId = incoming.metadata.userMetadata?.[META_KEY_READ_STREAM_ID]; if (readStreamId === undefined) return; // Generous HWM so the receiver-side write loop (drainer → // engine.receiveChunk → sink.write) doesn't stall on backpressure // before the consumer's reader is wired up. The reader still // applies its own backpressure once it's consuming, but we no // longer race fs.read's await on stream-init against the consumer // attaching its reader. const ts = new TransformStream(undefined, undefined, { highWaterMark: 64, size: (chunk?: Uint8Array) => (chunk === undefined ? 0 : 1), }); let handle: TransferHandle; try { handle = await incoming.accept({ output: { kind: 'pipe', pipeTo: ts.writable }, }); } catch (err) { console.error('[shade-files client streams-bridge] accept failed:', err); return; } const arrival: ParkedRead = { from: incoming.from, readable: ts.readable, done: handle.done().then((r) => ({ sha256: r.sha256, bytesSent: r.bytesSent })), handle, arrivedAt: Date.now(), }; arrival.done.catch(() => { /* swallow until consumer awaits */ }); const waiter = waiters.get(readStreamId); if (waiter !== undefined) { waiters.delete(readStreamId); cleanupWaiter(waiter); if (incoming.from !== waiter.expectedFrom) { void handle.abort('sender-mismatch').catch(() => undefined); waiter.reject( new Error( `streams-bridge: readStreamId=${readStreamId} delivered by ${incoming.from}, expected ${waiter.expectedFrom}`, ), ); return; } waiter.resolve(arrival); return; } parked.set(readStreamId, arrival); const t = setTimeout(() => { const stale = parked.get(readStreamId); if (stale === arrival) { parked.delete(readStreamId); void handle.abort('rpc-timeout').catch(() => undefined); } }, parkedReadTtlMs); (t as unknown as { unref?: () => void }).unref?.(); }); function cleanupWaiter(w: PendingReadWaiter): void { if (w.timer !== null) clearTimeout(w.timer); if (w.abortListener !== null) { w.signal.removeEventListener('abort', w.abortListener); } } return { async initiateWrite(opts) { if (destroyed) throw new Error('streams-bridge: destroyed'); const writeId = generateRequestId(); const transferOpts: import('@shade/transfer').TransferOptions = { to: opts.peer, input: opts.stream, metadata: { ...(opts.name !== undefined ? { name: opts.name } : {}), ...(opts.contentType !== undefined ? { contentType: opts.contentType } : {}), sizeBytes: opts.size, userMetadata: { [META_KEY_WRITE_ID]: writeId }, }, ...(opts.signal !== undefined ? { signal: opts.signal } : {}), ...(opts.onProgress !== undefined ? { onProgress: opts.onProgress } : {}), }; const handle = await shade.upload(transferOpts); return { writeId, handle }; }, async awaitRead(streamId, opts) { if (destroyed) throw new Error('streams-bridge: destroyed'); const ready = parked.get(streamId); if (ready !== undefined) { parked.delete(streamId); if (ready.from !== opts.expectedFrom) { void ready.handle.abort('sender-mismatch').catch(() => undefined); throw new Error( `streams-bridge: readStreamId=${streamId} delivered by ${ready.from}, expected ${opts.expectedFrom}`, ); } return ready; } if (waiters.has(streamId)) { throw new Error(`streams-bridge: readStreamId=${streamId} already awaited`); } const timeoutMs = opts.timeoutMs ?? defaultAwaitTimeoutMs; return await new Promise((resolve, reject) => { const w: PendingReadWaiter = { resolve, reject, expectedFrom: opts.expectedFrom, timer: null, abortListener: null, signal: opts.signal, }; w.timer = setTimeout(() => { if (waiters.get(streamId) === w) { waiters.delete(streamId); cleanupWaiter(w); reject(new OperationTimeoutError(`streams-bridge: readStreamId=${streamId} timed out after ${timeoutMs}ms`)); } }, timeoutMs); if (opts.signal.aborted) { cleanupWaiter(w); reject(opts.signal.reason ?? new Error('aborted before await')); return; } const onAbort = (): void => { if (waiters.get(streamId) === w) { waiters.delete(streamId); cleanupWaiter(w); reject(opts.signal.reason ?? new Error('aborted')); } }; w.abortListener = onAbort; opts.signal.addEventListener('abort', onAbort, { once: true }); waiters.set(streamId, w); }); }, async destroy() { if (destroyed) return; destroyed = true; unsubscribe(); for (const w of waiters.values()) { cleanupWaiter(w); w.reject(new Error('streams-bridge: destroyed')); } waiters.clear(); for (const p of parked.values()) { try { await p.handle.abort('bridge-destroyed'); } catch { /* swallow */ } } parked.clear(); }, }; }