From 7520b11b2515b25e7ed7c5819284376a0bcfe102 Mon Sep 17 00:00:00 2001 From: Sterister Date: Sun, 3 May 2026 23:27:06 +0200 Subject: [PATCH] release(v4.2.0): pull-mode streams for browser @shade/files MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 4.1.0's HTTP RPC for browsers capped at inline payloads (≤ 256 KiB). 4.2.0 unlocks streams: server queues outbound chunks + control envelopes per peer, browser long-polls the queue. Browser-to-server writes ride the existing /v1/transfer//chunk POST routes unchanged. For Dispatch this unlocks mod-jar uploads (50 MB) and world-backup downloads (100+ MB) — the actual reason browser-side @shade/files matters. ### New API @shade/sdk: - shade.transferQueueRoute(opts?) — Hono app with /queue + /v1/transfer/* routes. Auto-configures the queue transport. - shade.configureTransfers extended: transport + envelopeTransport override slots; resolveBaseUrl optional when both supplied. @shade/transfer: - OutboundQueue — per-peer monotonic event log with long-poll semantics, idle-eviction GC, ring-buffered to maxEventsPerPeer. - QueueTransferTransport — enqueues instead of POSTing. @shade/files: - httpClient({ outboundQueueUrl, transferBaseUrl }) — when set, starts a long-poll drainer + builds a streams-bridge. fs.read / fs.write of >256 KiB work end-to-end. - startQueueDrainer(shade, opts) — exported helper for advanced consumers driving their own drainer. ### Implementation notes - ClientStreamsBridge's TransformStream had HWM=0 by default which stalled the drainer's await chain at chunk 4 (writer.write pended before the consumer's reader was attached). Bumped to HWM=64 so the receive loop can buffer ahead of the consumer. ### Tests 3 new integration tests in tests/integration/http-rpc-streams.test.ts: 4 MiB streamed read round-trip, inline-only error path, idle-timeout long-poll behaviour. Wire-compatible. Source-compatible. Lockstep bump to 4.2.0. Co-Authored-By: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 145 ++++++++ packages/shade-cli/package.json | 2 +- packages/shade-core/package.json | 2 +- packages/shade-crypto-web/package.json | 2 +- packages/shade-dashboard/package.json | 2 +- packages/shade-files/package.json | 2 +- .../shade-files/src/client/http-client.ts | 233 +++++++++++-- .../shade-files/src/client/queue-drainer.ts | 172 ++++++++++ .../shade-files/src/client/streams-bridge.ts | 11 +- packages/shade-files/src/index.ts | 5 + .../src/integration/shade-bridge.ts | 21 ++ .../integration/http-rpc-streams.test.ts | 208 ++++++++++++ packages/shade-inbox-server/package.json | 2 +- packages/shade-inbox/package.json | 2 +- packages/shade-key-transparency/package.json | 2 +- packages/shade-keychain/package.json | 2 +- packages/shade-observability/package.json | 2 +- packages/shade-observer/package.json | 2 +- packages/shade-proto/package.json | 2 +- packages/shade-recovery/package.json | 2 +- packages/shade-sdk/package.json | 2 +- packages/shade-sdk/src/shade.ts | 227 ++++++++++++- packages/shade-server/package.json | 2 +- packages/shade-storage-encrypted/package.json | 2 +- packages/shade-storage-postgres/package.json | 2 +- packages/shade-storage-sqlite/package.json | 2 +- packages/shade-streams/package.json | 2 +- packages/shade-transfer/package.json | 2 +- packages/shade-transfer/src/index.ts | 1 + .../src/transport/queue-transport.ts | 319 ++++++++++++++++++ packages/shade-transport-bridge/package.json | 2 +- packages/shade-transport-webrtc/package.json | 2 +- packages/shade-transport/package.json | 2 +- packages/shade-widgets/package.json | 2 +- 34 files changed, 1331 insertions(+), 59 deletions(-) create mode 100644 packages/shade-files/src/client/queue-drainer.ts create mode 100644 packages/shade-files/tests/integration/http-rpc-streams.test.ts create mode 100644 packages/shade-transfer/src/transport/queue-transport.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 77e6f23..2ea2af2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,151 @@ All notable changes to Shade are documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [4.2.0] — 2026-05-03 — Pull-mode streams for browser @shade/files + +`4.1.0` shipped HTTP RPC for browser clients but capped them at inline +payloads (≤ 256 KiB). Larger reads/writes — mod-jars (1–50 MB), +world-backups (100+ MB), the things that actually need streaming — +threw `ConflictError` directing callers to the server-to-server +pathway. That made browser-side `@shade/files` insufficient for +admin-panel-style apps where the client is a browser tab and the +server is a Bun process. + +`4.2.0` flips the direction: when the browser supplies +`outboundQueueUrl` + `transferBaseUrl`, server-to-browser chunks + +control envelopes ride a per-peer queue that the browser long-polls, +and browser-to-server chunks POST directly to the server's existing +chunk-receive routes. No WebSockets, no SSE, no inbound listener on +the browser. Long-polling + a request-response inbound queue is +the entire wire surface. + +### Added + +#### `@shade/transfer` +- `OutboundQueue` — per-peer monotonic event log with long-poll + semantics. `enqueue(peer, event)` appends, `drain(peer, since, + blockMs, signal)` returns events with `id > since` (blocking up + to `blockMs` if none are ready). Idle-eviction GC drops peers + that haven't polled in `idleEvictionMs` (default 10 min). Ring- + buffered to `maxEventsPerPeer` (default 1000) — overflow drops + oldest, receivers pick up the gap via re-resume from `since=0`. +- `QueuedEvent` discriminated union: `{ kind: 'envelope', bytes }` + or `{ kind: 'chunk', bytes, meta: { streamId, laneId, seq } }`. +- `QueueTransferTransport` (implements `ITransferTransport`) — + enqueues outbound chunks instead of POSTing. Returns optimistic + `ChunkAck` because the queue *is* the delivery; chunk-resume picks + up dropped events on receiver-side reconnect. + +#### `@shade/sdk` +- `Shade.transferQueueRoute(opts?)` — Hono app with all five routes a + pull-mode receiver needs: + - `POST /queue` — long-poll the per-peer outbound queue. + - `POST /v1/transfer/:streamId/chunk` — receive incoming chunks + (browser → server writes). + - `GET /v1/transfer/:streamId/state` — resume-state lookup. + - `POST /v1/transfer/control` — receive incoming control envelopes + (browser → server stream-init / abort). + - `GET /v1/transfer/health` — peer reachability probe. + Auto-configures `shade.configureTransfers(...)` with the queue + transport + `QueueEnvelopeTransport` if not already configured. +- `Shade.configureTransfers(opts)` extended: `resolveBaseUrl` is now + optional when `transport` and `envelopeTransport` are both supplied + (lets pure-queue servers omit the baseUrl entirely). New + `transport?: ITransferTransport` override slot. +- `QueueEnvelopeTransport` — `ControlEnvelopeTransport` impl that + enqueues outbound envelopes for browser receivers. + +#### `@shade/files` +- `createFilesHttpClient` (and `shade.files.httpClient`) accept new + options: + - `outboundQueueUrl` — `/queue` endpoint to long-poll. + - `transferBaseUrl` — base URL for outbound chunk POSTs and control + envelope POSTs (browser → server writes). + - `queueBlockMs` — long-poll timeout (default 30 s; server clamps + at `maxBlockMs`). + When set, the client: + 1. Configures `shade.configureTransfers({ resolveBaseUrl })` so + outbound chunks POST to `/v1/transfer/...`. + 2. Builds a `ClientStreamsBridge` eagerly so the engine's + incoming-transfer subscription is in place before the drainer + dispatches the first envelope. + 3. Starts a long-poll `startQueueDrainer(...)` that pulls queued + events and dispatches them via `shade.acceptTransferEnvelope`. +- Streamed reads (`fs.read` of files > 256 KiB) and streamed writes + (`fs.write` of large inputs) now work end-to-end on the browser + client when the queue options are set. +- `startQueueDrainer(shade, opts)` exported for advanced consumers + that want to drive their own drainer (e.g. service-worker setups + that want a single shared drainer across multiple `httpClient`s). +- `client.close()` now stops the drainer and tears down the streams- + bridge — important on tab unload to free the long-poll socket. + +#### `@shade/files` (internal) +- `ClientStreamsBridge` uses a TransformStream with `highWaterMark: + 64` instead of the default `0` so the receive-side write loop + doesn't stall on backpressure before the consumer attaches its + reader (default HWM stalled at chunk 4 in pull-mode where the + drainer races the consumer's `getReader()` call). + +### Wire contract + +``` +POST /queue HTTP/1.1 +X-Shade-Sender-Address: alice@example.com +{ "since": 42, "blockMs": 30000 } + +──── + +200 OK +{ + "events": [ + { "id": 43, "kind": "envelope", "bytesB64": "...", "timestampMs": 1730... }, + { "id": 44, "kind": "chunk", "bytesB64": "...", "meta": { "streamId": "...", "laneId": 0, "seq": 0 } }, + ... + ], + "nextSince": 47 +} +``` + +### Tests + +`tests/integration/http-rpc-streams.test.ts` — three integration tests: +- 4 MiB streamed read end-to-end via long-poll queue (verifies bytes + match the source). +- Inline-only client throws clear error on streamed read. +- Long-poll returns empty events on idle timeout (verifies the + `blockMs` pathway). + +### Migration + +`4.1.0 → 4.2.0` is wire-compatible and source-compatible — the +queue route is purely additive. To enable streamed transfers in a +browser app: + +```ts +// Server +const queue = await shade.transferQueueRoute({ blockMs: 30_000 }); +await shade.files.serve(handler); +const rpc = shade.files.rpcRoute({ acceptFirstMessage: true }); + +const app = new Hono(); +app.route('/api/v1/shade-files', queue); +app.route('/api/v1/shade-files', rpc); + +// Browser +const fs = shade.files.httpClient(serverAddress, { + rpcUrl: 'https://server/api/v1/shade-files/rpc', + outboundQueueUrl: 'https://server/api/v1/shade-files/queue', + transferBaseUrl: 'https://server/api/v1/shade-files', +}); +await fs.write('/mods/some-mod.jar', new Uint8Array(/* 50 MB */)); +const result = await fs.read('/backups/world.tar.gz'); // streamed +``` + +`shade.files.serve(handler, { inlineOnly: true })` is still supported +for HTTP-RPC-without-streams deployments — it skips the streams-bridge +setup entirely. + ## [4.1.0] — 2026-05-03 — Browser-friendly HTTP RPC for @shade/files The default `shade.files.client(peer)` requires both peers to be diff --git a/packages/shade-cli/package.json b/packages/shade-cli/package.json index 24d0946..0337f85 100644 --- a/packages/shade-cli/package.json +++ b/packages/shade-cli/package.json @@ -1,6 +1,6 @@ { "name": "@shade/cli", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/cli.ts", "bin": { diff --git a/packages/shade-core/package.json b/packages/shade-core/package.json index 65f95d0..8b1c6c4 100644 --- a/packages/shade-core/package.json +++ b/packages/shade-core/package.json @@ -1,6 +1,6 @@ { "name": "@shade/core", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-crypto-web/package.json b/packages/shade-crypto-web/package.json index ba781a3..fd4027e 100644 --- a/packages/shade-crypto-web/package.json +++ b/packages/shade-crypto-web/package.json @@ -1,6 +1,6 @@ { "name": "@shade/crypto-web", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-dashboard/package.json b/packages/shade-dashboard/package.json index 6bca262..2c3c0ee 100644 --- a/packages/shade-dashboard/package.json +++ b/packages/shade-dashboard/package.json @@ -1,6 +1,6 @@ { "name": "@shade/dashboard", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "scripts": { "dev": "vite", diff --git a/packages/shade-files/package.json b/packages/shade-files/package.json index fa9e498..44e1a53 100644 --- a/packages/shade-files/package.json +++ b/packages/shade-files/package.json @@ -1,6 +1,6 @@ { "name": "@shade/files", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-files/src/client/http-client.ts b/packages/shade-files/src/client/http-client.ts index 5755caf..7114cd8 100644 --- a/packages/shade-files/src/client/http-client.ts +++ b/packages/shade-files/src/client/http-client.ts @@ -69,6 +69,11 @@ import { import { buildRpcRequest } from '../protocol/rpc-builder.js'; import { decideInline, INLINE_THRESHOLD, type WriteSource } from './inline-threshold.js'; import { base64ToBytes, bytesToBase64 } from '../protocol/canonical.js'; +import { startQueueDrainer, type QueueDrainerHandle } from './queue-drainer.js'; +import { + createClientStreamsBridge, + type ClientStreamsBridge, +} from './streams-bridge.js'; import type { FileClient, ReadOpts, @@ -80,7 +85,7 @@ import type { } from './client.js'; export interface FilesHttpClientOptions - extends Omit { + extends Omit { /** * Server endpoint that hosts `createFilesRpcRoute(...)`. Typically: * `https://server.example.com/api/v1/shade-files/rpc`. @@ -98,6 +103,32 @@ export interface FilesHttpClientOptions * orthogonal to the ratchet authentication on the envelope itself. */ headers?: Record; + /** + * Server endpoint that hosts `transferQueueRoute()`'s long-poll + * endpoint. Typically: + * `https://server.example.com/api/v1/shade-files/queue`. + * + * When supplied, the client starts a background long-poll that + * drains queued envelopes + chunks from the server and dispatches + * them via `shade.acceptTransferEnvelope`. This unlocks + * **streamed reads** (>256 KiB) for browser-style consumers. + */ + outboundQueueUrl?: string; + /** + * Base URL for outbound transfer routes (browser → server). Required + * alongside `outboundQueueUrl` to enable streamed writes. Typically: + * `https://server.example.com/api/v1/shade-files`. + * + * The client POSTs: + * - chunks to `/v1/transfer//chunk` + * - control envelopes to `/v1/transfer/control` + */ + transferBaseUrl?: string; + /** + * Long-poll block timeout, milliseconds. Default 30_000. Server + * clamps to its own `maxBlockMs` (default 55_000). + */ + queueBlockMs?: number; } interface RoundTripOpts { @@ -112,6 +143,12 @@ interface RoundTripOpts { * (via `shade.initSessionFromBundle(peerAddress, bundle)` or an * incoming first-message). Otherwise the first RPC will fail with * "decrypt failed: no session for peer". + * + * When `outboundQueueUrl` + `transferBaseUrl` are supplied, the + * client also unlocks **streamed reads/writes** for files larger than + * the inline threshold (256 KiB). The browser polls the server's + * outbound queue for chunks/envelopes and POSTs its own outbound + * chunks to the server's transfer-receive routes. */ export function createFilesHttpClient( shade: ShadeBridge, @@ -122,9 +159,84 @@ export function createFilesHttpClient( const fetchFn = options.fetch ?? globalThis.fetch.bind(globalThis); const extraHeaders = options.headers ?? {}; const defaultTimeoutMs = options.defaultTimeoutMs ?? 30_000; + const ioTimeoutMs = options.ioTimeoutMs ?? 60_000; const signRequest = options.signRequest; const senderAddress = shade.myAddress; + // ─── Streamed-mode bootstrap ───────────────────────────────── + // + // When `outboundQueueUrl` is supplied, the client: + // 1. Configures `shade.configureTransfers(...)` so outbound + // chunks POST to `/v1/transfer//chunk` + // and outbound control envelopes POST to + // `/v1/transfer/control`. + // 2. Spawns a streams-bridge so streamed reads can be awaited. + // 3. Starts a long-poll drainer that pulls queued envelopes + + // chunks from the server and dispatches via + // `shade.acceptTransferEnvelope`. + + let drainer: QueueDrainerHandle | null = null; + let streamsBridgePromise: Promise | null = null; + let streamsBridge: ClientStreamsBridge | null = null; + + if (options.outboundQueueUrl !== undefined) { + const outboundQueueUrl = options.outboundQueueUrl; + if (options.transferBaseUrl === undefined) { + throw new Error( + 'createFilesHttpClient: outboundQueueUrl was supplied without transferBaseUrl. Pass `transferBaseUrl` (the server prefix that hosts /v1/transfer/...) so outbound chunks have a destination.', + ); + } + if (shade.configureTransfers === undefined) { + throw new Error( + 'createFilesHttpClient: shade.configureTransfers is required for streamed mode (the underlying ShadeBridge must surface it).', + ); + } + const transferBaseUrl = options.transferBaseUrl.replace(/\/$/, ''); + shade.configureTransfers({ + resolveBaseUrl: async (peer) => { + if (peer !== peerAddress) { + throw new Error( + `httpClient is bound to peer "${peerAddress}" — refusing to resolve outgoing chunks for "${peer}" without a multi-peer registry. Use shade.files.client(peer) for server-to-server multi-peer.`, + ); + } + return transferBaseUrl; + }, + }); + // Build the streams-bridge eagerly. The engine's incoming-transfer + // subscription has to be in place BEFORE the drainer dispatches the + // first stream-init envelope, otherwise the engine emits the + // IncomingTransfer to zero handlers and the read silently never + // accepts. We kick off the drainer once the bridge has subscribed. + streamsBridgePromise = createClientStreamsBridge(shade).then((bridge) => { + streamsBridge = bridge; + drainer = startQueueDrainer(shade, { + outboundQueueUrl, + peerAddress, + senderAddress, + ...(options.fetch !== undefined ? { fetch: options.fetch } : {}), + ...(options.headers !== undefined ? { headers: options.headers } : {}), + ...(options.queueBlockMs !== undefined ? { blockMs: options.queueBlockMs } : {}), + }); + return bridge; + }); + // Surface bridge-construction failures eagerly via a rejected + // promise the next read/write picks up. + streamsBridgePromise.catch(() => { + /* observed via getStreamsBridge() */ + }); + } + + async function getStreamsBridge(): Promise { + if (streamsBridge !== null) return streamsBridge; + if (streamsBridgePromise === null) { + throw new ConflictError( + `http RPC client supports inline writes/reads only (≤ ${INLINE_THRESHOLD} bytes) — pass { outboundQueueUrl, transferBaseUrl } to enable streamed transfers.`, + ); + } + streamsBridge = await streamsBridgePromise; + return streamsBridge; + } + /** * Encrypt + POST + decrypt + parse one RPC round-trip. * @@ -321,20 +433,39 @@ export function createFilesHttpClient( ReadResultSchema, opts, ); - if (wire.kind !== 'inline') { - // The HTTP RPC route does not service streamed reads — there is - // no place to stream from in pure request-response. + if (wire.kind === 'inline') { + const bytes = base64ToBytes(wire.bytesB64); + const out: ReadOutput = { + kind: 'inline', + bytes, + size: wire.size, + sha256: wire.sha256, + ...(wire.contentType !== undefined ? { contentType: wire.contentType } : {}), + }; + return out; + } + // Streamed read — only supported when the queue drainer is wired. + if (drainer === null) { throw new InternalFileError( - `http RPC client received a streamed read (size ${wire.size}). Use shade.files.client(peer) on a server-to-server deployment, or pass { preferInline: true } when the file is known to fit inline.`, + `http RPC client received a streamed read (size ${wire.size}) but is in inline-only mode. Pass { outboundQueueUrl, transferBaseUrl } when constructing the client to enable streamed reads.`, ); } - const bytes = base64ToBytes(wire.bytesB64); + const bridge = await getStreamsBridge(); + const bridgeSignal = opts.signal ?? new AbortController().signal; + const parked = await bridge.awaitRead(wire.streamId, { + expectedFrom: peerAddress, + signal: bridgeSignal, + timeoutMs: ioTimeoutMs, + }); const out: ReadOutput = { - kind: 'inline', - bytes, + kind: 'streams', + stream: parked.readable, size: wire.size, sha256: wire.sha256, ...(wire.contentType !== undefined ? { contentType: wire.contentType } : {}), + done: async () => { + await parked.done; + }, }; return out; }, @@ -344,25 +475,77 @@ export function createFilesHttpClient( const overwrite = opts.overwrite ?? false; const contentType = opts.contentType ?? decision.contentType; - if (decision.kind !== 'inline') { - throw new ConflictError( - `http RPC client supports inline writes only (≤ ${INLINE_THRESHOLD} bytes). The supplied input was promoted to streams (size ${decision.size ?? 'unknown'}). Use shade.files.client(peer) for streamed writes, or pre-buffer the input below the inline threshold.`, + if (decision.kind === 'inline' || opts.forceInline === true) { + const bytes = decision.kind === 'inline' ? decision.bytes : null; + if (bytes === null) { + // forceInline === true with a streams-typed decision — + // decideInline always produced a `streams` shape because the + // input was a bare ReadableStream. We can't drain a stream + // synchronously here without a streams-bridge. + throw new ConflictError( + 'http RPC client cannot forceInline a streamed input — pass a Uint8Array / Blob, or pre-buffer the stream.', + ); + } + if (bytes.byteLength > INLINE_THRESHOLD) { + throw new ConflictError( + `inline write exceeds ${INLINE_THRESHOLD}-byte threshold (got ${bytes.byteLength}); pass forceInline=true to override`, + ); + } + const args = WriteArgsSchema.parse({ + kind: 'inline', + path, + bytesB64: bytesToBase64(bytes), + ...(contentType !== undefined ? { contentType } : {}), + overwrite, + }); + return await roundTrip( + KIND_WRITE_V1, + 'write', + args, + WriteResultSchema, + opts, ); } + + // Streamed write — requires the queue drainer + streams-bridge. + if (drainer === null) { + throw new ConflictError( + `http RPC client supports inline writes only (≤ ${INLINE_THRESHOLD} bytes). The supplied input was promoted to streams (size ${decision.size ?? 'unknown'}). Pass { outboundQueueUrl, transferBaseUrl } to enable streamed writes.`, + ); + } + const bridge = await getStreamsBridge(); + const size = decision.size; + if (size === undefined) { + throw new ConflictError( + 'streams write requires a known plaintext size; pass `{ stream, size }` instead of a bare ReadableStream', + ); + } + const { writeId, handle } = await bridge.initiateWrite({ + peer: peerAddress, + stream: decision.stream, + size, + ...(contentType !== undefined ? { contentType } : {}), + name: path, + ...(opts.signal !== undefined ? { signal: opts.signal } : {}), + }); const args = WriteArgsSchema.parse({ - kind: 'inline', + kind: 'streams', path, - bytesB64: bytesToBase64(decision.bytes), + size, ...(contentType !== undefined ? { contentType } : {}), overwrite, + writeId, }); - return await roundTrip( - KIND_WRITE_V1, - 'write', - args, - WriteResultSchema, - opts, - ); + try { + const [result] = await Promise.all([ + roundTrip(KIND_WRITE_V1, 'write', args, WriteResultSchema, opts), + handle.done(), + ]); + return result; + } catch (err) { + await handle.abort('rpc-failed').catch(() => undefined); + throw err; + } }, async getThumbnail(path, size: ThumbnailSize, opts): Promise { @@ -393,7 +576,15 @@ export function createFilesHttpClient( }, close(): void { - // Stateless — nothing to release. Exists for FileClient symmetry. + // Stop the long-poll drainer + tear down the streams-bridge if + // we built one. Idempotent — safe to call multiple times. + drainer?.stop(); + drainer = null; + if (streamsBridge !== null) { + void streamsBridge.destroy().catch(() => undefined); + streamsBridge = null; + } + streamsBridgePromise = null; }, } as FileClient; } diff --git a/packages/shade-files/src/client/queue-drainer.ts b/packages/shade-files/src/client/queue-drainer.ts new file mode 100644 index 0000000..6a2e966 --- /dev/null +++ b/packages/shade-files/src/client/queue-drainer.ts @@ -0,0 +1,172 @@ +/** + * Browser-side drainer for the pull-mode outbound queue. + * + * Background task that long-polls the server's `/queue` endpoint, + * decodes each event, and dispatches it into the consumer's Shade + * instance via `shade.acceptTransferEnvelope`. Same wire-shape as the + * server-to-server case where the engine receives chunks via direct + * HTTP POSTs — we just flip the direction so the browser pulls + * instead of accepts. + */ +import type { ShadeBridge } from '../integration/shade-bridge.js'; + +export interface QueueDrainerOptions { + /** + * Server endpoint that hosts `transferQueueRoute()`. Typically: + * `https://server.example.com/api/v1/shade-files/queue`. + */ + outboundQueueUrl: string; + /** Peer the queue is pulled FROM (the server's address). */ + peerAddress: string; + /** Address we identify ourselves with on the long-poll. */ + senderAddress: string; + /** Optional `fetch` override. Default `globalThis.fetch`. */ + fetch?: typeof globalThis.fetch; + /** Extra headers applied to every poll request. */ + headers?: Record; + /** + * Long-poll request timeout (server-side block). Default 30_000. + * Server clamps to its own `maxBlockMs` (default 55_000). + */ + blockMs?: number; + /** + * Backoff after a network error before re-polling. Default 2_000. + * Doubles up to `maxBackoffMs` on consecutive failures. + */ + initialBackoffMs?: number; + maxBackoffMs?: number; + /** + * Called when a poll cycle fails. Defaults to logging via `console.error`. + * Throwing from this hook does NOT stop the drainer — it backs off + * and retries. + */ + onError?: (err: unknown) => void; +} + +export interface QueueDrainerHandle { + /** Stop the drainer. Pending fetch is aborted; the loop exits. */ + stop(): void; + /** Promise that resolves once the drainer has fully stopped. */ + stopped: Promise; +} + +interface PolledEvent { + id: number; + timestampMs: number; + kind: 'envelope' | 'chunk'; + bytesB64: string; + meta?: { streamId: string; laneId: number; seq: number }; +} + +interface PollResponseBody { + events: PolledEvent[]; + nextSince: number; +} + +const DEFAULT_BLOCK_MS = 30_000; +const DEFAULT_INITIAL_BACKOFF_MS = 2_000; +const DEFAULT_MAX_BACKOFF_MS = 30_000; + +/** + * Start a long-poll loop that drains queued envelopes + chunks from + * the server and dispatches them into the local Shade instance. + * + * Returns a handle the caller can use to stop the drainer when the + * `httpClient` is closed (e.g. on tab unload). + */ +export function startQueueDrainer( + shade: ShadeBridge, + options: QueueDrainerOptions, +): QueueDrainerHandle { + if (shade.acceptTransferEnvelope === undefined) { + throw new Error( + 'startQueueDrainer: shade.acceptTransferEnvelope is required for pull-mode streams. The supplied ShadeBridge implementation must surface it.', + ); + } + const accept = shade.acceptTransferEnvelope.bind(shade); + const fetchFn = options.fetch ?? globalThis.fetch.bind(globalThis); + const blockMs = options.blockMs ?? DEFAULT_BLOCK_MS; + const onError = options.onError ?? ((err: unknown) => console.error('[shade.files queue-drainer]', err)); + const initialBackoffMs = options.initialBackoffMs ?? DEFAULT_INITIAL_BACKOFF_MS; + const maxBackoffMs = options.maxBackoffMs ?? DEFAULT_MAX_BACKOFF_MS; + const ac = new AbortController(); + let stopped = false; + let resolveStopped!: () => void; + const stoppedPromise = new Promise((r) => { + resolveStopped = r; + }); + + void (async () => { + let since = 0; + let backoff = initialBackoffMs; + while (!stopped) { + try { + const response = await fetchFn(options.outboundQueueUrl, { + method: 'POST', + signal: ac.signal, + headers: { + 'Content-Type': 'application/json', + 'X-Shade-Sender-Address': options.senderAddress, + ...(options.headers ?? {}), + }, + body: JSON.stringify({ since, blockMs }), + }); + if (!response.ok) { + throw new Error(`queue poll → ${response.status} ${response.statusText}`); + } + const body = (await response.json()) as PollResponseBody; + if (Array.isArray(body.events) && body.events.length > 0) { + for (const event of body.events) { + if (stopped) break; + try { + const bytes = base64ToBytes(event.bytesB64); + await accept(options.peerAddress, bytes); + } catch (err) { + // Per-event dispatch failure should not kill the loop — + // resume picks up missing chunks via @shade/transfer's + // built-in lane-resume protocol. + onError(err); + } + } + } + since = typeof body.nextSince === 'number' ? body.nextSince : since; + backoff = initialBackoffMs; + } catch (err) { + if (stopped || ac.signal.aborted) break; + onError(err); + // Exponential backoff with jitter — caps at maxBackoffMs. + const jitter = Math.floor(Math.random() * Math.min(backoff, 1_000)); + await new Promise((r) => { + const t = setTimeout(r, backoff + jitter); + (t as unknown as { unref?: () => void }).unref?.(); + ac.signal.addEventListener( + 'abort', + () => { + clearTimeout(t); + r(); + }, + { once: true }, + ); + }); + backoff = Math.min(maxBackoffMs, backoff * 2); + } + } + resolveStopped(); + })(); + + return { + stop(): void { + if (stopped) return; + stopped = true; + ac.abort(new Error('queue drainer stopped')); + }, + stopped: stoppedPromise, + }; +} + +function base64ToBytes(b64: string): Uint8Array { + const bin = atob(b64); + const out = new Uint8Array(bin.length); + for (let i = 0; i < bin.length; i++) out[i] = bin.charCodeAt(i); + return out; +} diff --git a/packages/shade-files/src/client/streams-bridge.ts b/packages/shade-files/src/client/streams-bridge.ts index 5b5a91f..46c76e6 100644 --- a/packages/shade-files/src/client/streams-bridge.ts +++ b/packages/shade-files/src/client/streams-bridge.ts @@ -102,7 +102,16 @@ export async function createClientStreamsBridge( const readStreamId = incoming.metadata.userMetadata?.[META_KEY_READ_STREAM_ID]; if (readStreamId === undefined) return; - const ts = new TransformStream(); + // Generous HWM so the receiver-side write loop (drainer → + // engine.receiveChunk → sink.write) doesn't stall on backpressure + // before the consumer's reader is wired up. The reader still + // applies its own backpressure once it's consuming, but we no + // longer race fs.read's await on stream-init against the consumer + // attaching its reader. + const ts = new TransformStream(undefined, undefined, { + highWaterMark: 64, + size: (chunk?: Uint8Array) => (chunk === undefined ? 0 : 1), + }); let handle: TransferHandle; try { handle = await incoming.accept({ diff --git a/packages/shade-files/src/index.ts b/packages/shade-files/src/index.ts index 8d4c1b9..7be49bd 100644 --- a/packages/shade-files/src/index.ts +++ b/packages/shade-files/src/index.ts @@ -190,6 +190,11 @@ export { createFilesRpcRoute } from './server/rpc-route.js'; export type { FilesRpcRouteOptions } from './server/rpc-route.js'; export { createFilesHttpClient } from './client/http-client.js'; export type { FilesHttpClientOptions } from './client/http-client.js'; +export { startQueueDrainer } from './client/queue-drainer.js'; +export type { + QueueDrainerHandle, + QueueDrainerOptions, +} from './client/queue-drainer.js'; // Shared structural surface @shade/files needs from a Shade instance — // exposed so consumers building custom Shade-shaped bridges can verify diff --git a/packages/shade-files/src/integration/shade-bridge.ts b/packages/shade-files/src/integration/shade-bridge.ts index 26c820d..07b683c 100644 --- a/packages/shade-files/src/integration/shade-bridge.ts +++ b/packages/shade-files/src/integration/shade-bridge.ts @@ -70,4 +70,25 @@ export interface ShadeBridge { /** Optional control-envelope passthrough used by the WebRTC bridge. */ deliverControlEnvelope?(peer: string, envelope: ShadeEnvelope): Promise; + + /** + * Hand a freshly-decoded wire envelope (control or chunk) to the + * transfer engine. Required by the pull-mode HTTP client when it + * drains queued events from the server: each polled chunk / control + * envelope is dispatched here so the engine sees it just as if it + * had arrived via an HTTP POST on `/v1/transfer/...`. + */ + acceptTransferEnvelope?(from: string, env: ShadeEnvelope | Uint8Array): Promise; + + /** + * Configure the transfer stack. Called by the pull-mode HTTP client + * to point the browser's outgoing chunks + control envelopes at the + * server's transferQueueRoute mount. Optional because the + * server-to-server path uses a separate, app-driven configuration. + */ + configureTransfers?(opts: { + resolveBaseUrl?: (peerAddress: string) => Promise; + transport?: unknown; + envelopeTransport?: unknown; + }): void; } diff --git a/packages/shade-files/tests/integration/http-rpc-streams.test.ts b/packages/shade-files/tests/integration/http-rpc-streams.test.ts new file mode 100644 index 0000000..f67acb5 --- /dev/null +++ b/packages/shade-files/tests/integration/http-rpc-streams.test.ts @@ -0,0 +1,208 @@ +import { describe, expect, test } from 'bun:test'; +import { createShade } from '@shade/sdk'; +import { + createPrekeyServer, + MemoryPrekeyStore, + PrekeyServerEvents, +} from '@shade/server'; +import { SubtleCryptoProvider } from '@shade/crypto-web'; +import { Hono } from 'hono'; + +const crypto = new SubtleCryptoProvider(); + +/** + * Stand up the full pull-mode rig: + * - Prekey server (for X3DH) + * - Bob: file handler + rpcRoute + transferQueueRoute, all on one server + * - Alice: httpClient with outboundQueueUrl + transferBaseUrl wired + * + * Returns Alice's `FileClient`, which speaks browser-style: ONE base URL, + * no inbound listener, streams supported via long-poll. + */ +async function setupPullRig(opts: { + bobHandler: Parameters>['files']>['serve']>[0]; +}) { + const prekey = createPrekeyServer({ + crypto, + store: new MemoryPrekeyStore(), + disableRateLimit: true, + events: new PrekeyServerEvents(), + }); + const prekeyServer = Bun.serve({ port: 0, fetch: prekey.fetch }); + const prekeyUrl = `http://localhost:${prekeyServer.port}`; + + const alice = await createShade({ prekeyServer: prekeyUrl, address: 'alice' }); + const bob = await createShade({ prekeyServer: prekeyUrl, address: 'bob' }); + + // Bob: queue-route FIRST (configures bob's transports), then files.serve. + const queueRoute = await bob.transferQueueRoute({ blockMs: 1_500 }); + await bob.files.serve(opts.bobHandler); + const rpcRoute = bob.files.rpcRoute({ acceptFirstMessage: true }); + + const app = new Hono(); + app.route('/', queueRoute); + app.route('/', rpcRoute); + + const bobServer = Bun.serve({ port: 0, fetch: app.fetch }); + const baseUrl = `http://localhost:${bobServer.port}`; + + const fs = alice.files.httpClient('bob', { + rpcUrl: `${baseUrl}/rpc`, + outboundQueueUrl: `${baseUrl}/queue`, + transferBaseUrl: baseUrl, + defaultTimeoutMs: 10_000, + queueBlockMs: 1_000, + }); + + return { + alice, + bob, + fs, + baseUrl, + teardown: async () => { + fs.close(); + await alice.shutdown(); + await bob.shutdown(); + bobServer.stop(); + prekeyServer.stop(); + }, + }; +} + +describe('@shade/files HTTP RPC — pull-mode streams', () => { + test('streamed read (4 MiB) via long-poll queue', async () => { + const payload = new Uint8Array(4 * 1024 * 1024); + for (let i = 0; i < payload.length; i++) payload[i] = (i * 97) & 0xff; + + const rig = await setupPullRig({ + bobHandler: { + read: async () => { + // Return the payload as a streamed read so the rpc-handler + // promotes it via the streams-bridge into a transfer. + const stream = new ReadableStream({ + start(controller) { + const CHUNK = 256 * 1024; + for (let off = 0; off < payload.byteLength; off += CHUNK) { + controller.enqueue(payload.slice(off, Math.min(off + CHUNK, payload.byteLength))); + } + controller.close(); + }, + }); + // Need a precomputed sha256 for streamed reads. Use the + // crypto provider's sha256 directly. + const digest = new Uint8Array(await globalThis.crypto.subtle.digest('SHA-256', payload)); + const sha256Hex = Array.from(digest, (b) => b.toString(16).padStart(2, '0')).join(''); + return { + kind: 'streams' as const, + stream, + size: payload.byteLength, + sha256: sha256Hex, + contentType: 'application/octet-stream', + }; + }, + }, + }); + + try { + const result = await rig.fs.read('/big.bin'); + expect(result.kind).toBe('streams'); + if (result.kind !== 'streams') return; + + // Drain the stream and compare. + const reader = result.stream.getReader(); + const got = new Uint8Array(payload.byteLength); + let offset = 0; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + if (value !== undefined) { + got.set(value, offset); + offset += value.byteLength; + } + } + reader.releaseLock(); + await result.done(); + + expect(offset).toBe(payload.byteLength); + // Compare in 64KiB strides for speed. + let mismatch = -1; + for (let i = 0; i < payload.byteLength; i++) { + if (got[i] !== payload[i]) { + mismatch = i; + break; + } + } + expect(mismatch).toBe(-1); + } finally { + await rig.teardown(); + } + }, 30_000); + + test('streamed read fails with clear error when outboundQueueUrl is omitted', async () => { + const rig = await setupPullRig({ + bobHandler: { + read: async () => { + const stream = new ReadableStream({ + start(c) { + c.enqueue(new Uint8Array(512 * 1024)); + c.close(); + }, + }); + const digest = new Uint8Array(await globalThis.crypto.subtle.digest('SHA-256', new Uint8Array(512 * 1024))); + const sha256Hex = Array.from(digest, (b) => b.toString(16).padStart(2, '0')).join(''); + return { + kind: 'streams' as const, + stream, + size: 512 * 1024, + sha256: sha256Hex, + }; + }, + }, + }); + // Tear down the rig's drainer so we can construct an inline-only client + rig.fs.close(); + + const inlineOnly = rig.alice.files.httpClient('bob', { + rpcUrl: `${rig.baseUrl}/rpc`, + defaultTimeoutMs: 10_000, + }); + try { + await expect(inlineOnly.read('/big.bin')).rejects.toThrow(/streamed read/); + } finally { + inlineOnly.close(); + await rig.teardown(); + } + }, 15_000); + + test('long-poll returns empty events on idle timeout', async () => { + const rig = await setupPullRig({ + bobHandler: { + stat: async () => ({ + name: '_', + kind: 'dir' as const, + size: 0, + mtime: 0, + metadata: {}, + }), + }, + }); + try { + // Direct poll without any pending events — should return after blockMs. + const start = Date.now(); + const res = await fetch(`${rig.baseUrl}/queue`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Shade-Sender-Address': 'alice', + }, + body: JSON.stringify({ since: 0, blockMs: 500 }), + }); + expect(res.status).toBe(200); + const body = (await res.json()) as { events: unknown[]; nextSince: number }; + expect(body.events).toHaveLength(0); + expect(Date.now() - start).toBeGreaterThanOrEqual(400); + } finally { + await rig.teardown(); + } + }, 10_000); +}); diff --git a/packages/shade-inbox-server/package.json b/packages/shade-inbox-server/package.json index 6e5c3a7..6fae038 100644 --- a/packages/shade-inbox-server/package.json +++ b/packages/shade-inbox-server/package.json @@ -1,6 +1,6 @@ { "name": "@shade/inbox-server", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-inbox/package.json b/packages/shade-inbox/package.json index 6d949eb..b3a80ac 100644 --- a/packages/shade-inbox/package.json +++ b/packages/shade-inbox/package.json @@ -1,6 +1,6 @@ { "name": "@shade/inbox", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-key-transparency/package.json b/packages/shade-key-transparency/package.json index 07386e9..26113d8 100644 --- a/packages/shade-key-transparency/package.json +++ b/packages/shade-key-transparency/package.json @@ -1,6 +1,6 @@ { "name": "@shade/key-transparency", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-keychain/package.json b/packages/shade-keychain/package.json index efb1e6a..17c4ebe 100644 --- a/packages/shade-keychain/package.json +++ b/packages/shade-keychain/package.json @@ -1,6 +1,6 @@ { "name": "@shade/keychain", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-observability/package.json b/packages/shade-observability/package.json index 584a03b..011b2db 100644 --- a/packages/shade-observability/package.json +++ b/packages/shade-observability/package.json @@ -1,6 +1,6 @@ { "name": "@shade/observability", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-observer/package.json b/packages/shade-observer/package.json index fa663ef..414f20d 100644 --- a/packages/shade-observer/package.json +++ b/packages/shade-observer/package.json @@ -1,6 +1,6 @@ { "name": "@shade/observer", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-proto/package.json b/packages/shade-proto/package.json index 86eaef2..214e608 100644 --- a/packages/shade-proto/package.json +++ b/packages/shade-proto/package.json @@ -1,6 +1,6 @@ { "name": "@shade/proto", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-recovery/package.json b/packages/shade-recovery/package.json index 57786f5..12b9feb 100644 --- a/packages/shade-recovery/package.json +++ b/packages/shade-recovery/package.json @@ -1,6 +1,6 @@ { "name": "@shade/recovery", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-sdk/package.json b/packages/shade-sdk/package.json index 5c0afa8..beb9849 100644 --- a/packages/shade-sdk/package.json +++ b/packages/shade-sdk/package.json @@ -1,6 +1,6 @@ { "name": "@shade/sdk", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-sdk/src/shade.ts b/packages/shade-sdk/src/shade.ts index 30282f0..53e9c81 100644 --- a/packages/shade-sdk/src/shade.ts +++ b/packages/shade-sdk/src/shade.ts @@ -31,6 +31,8 @@ import { type TransferHandle, type TransferOptions, type TransferSummary, + type OutboundQueue as OutboundQueueLike, + type QueuedEventInput, } from '@shade/transfer'; import type { Hono } from 'hono'; import { BackgroundTasks } from './background.js'; @@ -151,6 +153,8 @@ export class Shade { private controlChannel: ShadeControlChannel | null = null; private peerBaseUrlResolver: ((peerAddress: string) => Promise) | null = null; private envelopeOutboxes: ControlEnvelopeTransport | null = null; + private transferTransportOverride: ITransferTransport | null = null; + private transferQueue: OutboundQueueLike | null = null; // `@shade/files` namespace, lazy + memoized. private filesNamespace: FilesNamespace | null = null; @@ -746,12 +750,52 @@ export class Shade { * HTTP POSTs to `/v1/transfer/control`). */ configureTransfers(opts: { - resolveBaseUrl: (peerAddress: string) => Promise; + /** + * Resolver for the peer's HTTP base URL (used by the default + * `ShadeTransferHttpTransport` to POST chunks). Optional when a + * custom `transport` and `envelopeTransport` are supplied — e.g. + * for pull-mode browser servers (`@shade/files transferQueueRoute`) + * which never POST chunks anywhere. + */ + resolveBaseUrl?: (peerAddress: string) => Promise; + /** + * Override the chunk-level transport. Defaults to + * `ShadeTransferHttpTransport` (HTTP POSTs per chunk) when + * `resolveBaseUrl` is supplied. Required when `resolveBaseUrl` + * is omitted. + */ + transport?: ITransferTransport; + /** + * Override the control-envelope transport. Defaults to HTTP POSTs + * to `/v1/transfer/control` when `resolveBaseUrl` is + * supplied. Required when `resolveBaseUrl` is omitted. + */ envelopeTransport?: ControlEnvelopeTransport; }): void { - this.peerBaseUrlResolver = opts.resolveBaseUrl; - this.envelopeOutboxes = - opts.envelopeTransport ?? new HttpEnvelopeTransport(opts.resolveBaseUrl, this.address); + if (opts.resolveBaseUrl === undefined) { + if (opts.transport === undefined || opts.envelopeTransport === undefined) { + throw new Error( + 'configureTransfers: resolveBaseUrl is required unless both `transport` and `envelopeTransport` are supplied (e.g. for pull-mode queue servers).', + ); + } + this.peerBaseUrlResolver = async () => { + throw new Error( + 'resolveBaseUrl was not configured — this Shade is in queue/pull mode and does not POST chunks. Configure a custom transport instead.', + ); + }; + } else { + this.peerBaseUrlResolver = opts.resolveBaseUrl; + } + this.transferTransportOverride = opts.transport ?? null; + if (opts.envelopeTransport !== undefined) { + this.envelopeOutboxes = opts.envelopeTransport; + } else if (opts.resolveBaseUrl !== undefined) { + this.envelopeOutboxes = new HttpEnvelopeTransport(opts.resolveBaseUrl, this.address); + } else { + throw new Error( + 'configureTransfers: envelopeTransport is required when resolveBaseUrl is omitted.', + ); + } } /** @@ -898,6 +942,109 @@ export class Shade { return (await this.engine()).onIncomingTransfer(handler); } + /** + * Mount the **pull-mode** transfer routes on a Hono app. Mount under + * any base path: `app.route('/api/v1/shade-files', shade.transferQueueRoute())`. + * + * Configures this Shade instance to queue all outbound chunks + + * control envelopes per peer instead of POSTing them. Browser-style + * receivers drain the queue via long-polling — no inbound HTTP + * listener required on the receiver. + * + * Routes mounted (relative to the base path): + * POST /queue — long-poll the per-peer outbound queue + * POST /v1/transfer/:streamId/chunk — receive incoming chunks (browser → server) + * GET /v1/transfer/:streamId/state — resume-state lookup + * POST /v1/transfer/control — receive incoming control envelopes + * GET /v1/transfer/health — peer reachability probe + * + * **Idempotent**: calling twice returns a fresh `Hono` app each + * time but reuses the underlying queue + transport (so the engine + * stays single). + * + * **Ordering**: must be called **before** `shade.files.serve(...)` + * (or any other path that builds the engine), because configuring + * the queue transport mutates the transfer stack. Calling after the + * engine is built throws. + */ + async transferQueueRoute(opts: TransferQueueRouteOptions = {}): Promise { + if (this.transferEngine !== null && this.transferTransportOverride === null) { + throw new Error( + 'transferQueueRoute(): the transfer engine has already been built with the default HTTP transport. Call transferQueueRoute() before any upload()/onIncomingTransfer()/configureTransfers().', + ); + } + const { OutboundQueue, QueueTransferTransport } = await import('@shade/transfer'); + if (this.transferQueue === null) { + this.transferQueue = new OutboundQueue({ + ...(opts.maxEventsPerPeer !== undefined ? { maxEventsPerPeer: opts.maxEventsPerPeer } : {}), + ...(opts.idleEvictionMs !== undefined ? { idleEvictionMs: opts.idleEvictionMs } : {}), + }); + } + if (this.transferTransportOverride === null) { + const queueTransport = new QueueTransferTransport(this.transferQueue); + const queueEnvelopeTransport = new QueueEnvelopeTransport(this.transferQueue); + this.configureTransfers({ + transport: queueTransport, + envelopeTransport: queueEnvelopeTransport, + }); + } + const queue = this.transferQueue; + const blockMs = opts.blockMs ?? 30_000; + const maxBlockMs = opts.maxBlockMs ?? 55_000; + const engine = await this.engine(); + const { createTransferRoutes, PermissiveAuthenticator } = await import('@shade/transfer'); + const app = await createTransferRoutes(engine, { + authenticator: PermissiveAuthenticator, + }); + app.post('/v1/transfer/control', async (c) => { + const senderAddress = c.req.header('X-Shade-Sender-Address'); + if (senderAddress === undefined || senderAddress === '') { + return c.json({ error: 'missing X-Shade-Sender-Address' }, 400); + } + const ab = await c.req.arrayBuffer(); + const bytes = new Uint8Array(ab); + try { + await this.acceptTransferEnvelope(senderAddress, bytes); + } catch (err) { + return c.json({ error: (err as Error).message }, 400); + } + return c.json({ ok: true }); + }); + // Long-poll endpoint. + app.post('/queue', async (c) => { + const senderAddress = c.req.header('X-Shade-Sender-Address'); + if (senderAddress === undefined || senderAddress === '') { + return c.json({ error: 'missing X-Shade-Sender-Address' }, 400); + } + let body: { since?: unknown; blockMs?: unknown }; + try { + body = (await c.req.json()) as { since?: unknown; blockMs?: unknown }; + } catch { + return c.json({ error: 'invalid JSON body' }, 400); + } + const since = typeof body.since === 'number' && Number.isFinite(body.since) ? body.since : 0; + const requestedBlockMs = + typeof body.blockMs === 'number' && Number.isFinite(body.blockMs) + ? Math.max(0, Math.min(maxBlockMs, body.blockMs)) + : blockMs; + // Bun-side short-circuit if the request was aborted while we + // were holding the long-poll. AbortSignal from the request body + // is already surfaced via `c.req.raw.signal` in Hono. + const events = await queue.drain(senderAddress, since, requestedBlockMs, c.req.raw.signal); + return c.json({ + events: events.map((e) => ({ + id: e.id, + timestampMs: e.timestampMs, + kind: e.kind, + bytesB64: bytesToBase64Std(e.bytes), + ...(e.kind === 'chunk' ? { meta: e.meta } : {}), + })), + nextSince: events.length > 0 ? events[events.length - 1]!.id : since, + }); + }); + return app; + } + /** * Mount the receiver-side HTTP routes on a Hono app. Mount under any * base path: `app.route('/shade', await shade.transferRoute())`. @@ -1019,16 +1166,23 @@ export class Shade { ); } this.controlChannel = new ShadeControlChannel(this, this.envelopeOutboxes); - const httpTransport: ITransferTransport = new ShadeTransferHttpTransport({ - resolveBaseUrl: this.peerBaseUrlResolver, - authenticator: await this.makeAuthenticator(), - }); - - let transport: ITransferTransport = httpTransport; + let transport: ITransferTransport; let webrtcRuntime: ShadeWebRtcRuntime | null = null; - if (this.webrtcConfig !== null) { - webrtcRuntime = await this.buildWebRtcRuntime(this.webrtcConfig, httpTransport); - transport = webrtcRuntime.fallback; + if (this.transferTransportOverride !== null) { + // Custom transport (queue, in-memory, custom adapter) — used as-is. + // WebRTC fallback only attaches when the default HTTP transport is + // active because WebRTC's `MultiTransportFallback` is HTTP-shaped. + transport = this.transferTransportOverride; + } else { + const httpTransport: ITransferTransport = new ShadeTransferHttpTransport({ + resolveBaseUrl: this.peerBaseUrlResolver, + authenticator: await this.makeAuthenticator(), + }); + transport = httpTransport; + if (this.webrtcConfig !== null) { + webrtcRuntime = await this.buildWebRtcRuntime(this.webrtcConfig, httpTransport); + transport = webrtcRuntime.fallback; + } } this.transferEngine = new TransferEngine({ @@ -1256,6 +1410,53 @@ function parseChunkHeader(bytes: Uint8Array): { return { streamId, laneId, seq }; } +// ─── Queue-mode (pull) envelope transport ───────────────────── + +/** + * Configuration for {@link Shade.transferQueueRoute}. All fields are + * optional with sensible production defaults. + */ +export interface TransferQueueRouteOptions { + /** + * Long-poll timeout in milliseconds. Server holds the request open + * up to this long before returning an empty `events` array. Default + * 30_000. + */ + blockMs?: number; + /** + * Hard cap on long-poll timeout (clamps client-supplied `blockMs`). + * Default 55_000 — under typical reverse-proxy idle thresholds (60s + * on most CDNs). + */ + maxBlockMs?: number; + /** + * Per-peer ring-buffer size. When the queue is full, oldest events + * are dropped on enqueue. Receivers detect the gap via missing + * sequence numbers and re-resume from `since=0`. Default 1000. + */ + maxEventsPerPeer?: number; + /** + * Drop a peer's queue + reject pending pollers after this much + * silence. Default 10 minutes. Setting to `0` disables idle-eviction. + */ + idleEvictionMs?: number; +} + +/** + * `ControlEnvelopeTransport` that enqueues outbound envelopes into an + * `OutboundQueue` for browser-style receivers to long-poll. Mirrors + * `HttpEnvelopeTransport` shape (one `send(peer, envelope)` method); + * the difference is the destination — local queue, not remote HTTP. + */ +class QueueEnvelopeTransport implements ControlEnvelopeTransport { + constructor(private readonly queue: OutboundQueueLike) {} + async send(peerAddress: string, envelope: ShadeEnvelope): Promise { + const bytes = encodeEnvelope(envelope); + const event: QueuedEventInput = { kind: 'envelope', bytes }; + this.queue.enqueue(peerAddress, event); + } +} + // ─── Default HTTP envelope transport ────────────────────────── class HttpEnvelopeTransport implements ControlEnvelopeTransport { diff --git a/packages/shade-server/package.json b/packages/shade-server/package.json index 4a4457a..c5f7f51 100644 --- a/packages/shade-server/package.json +++ b/packages/shade-server/package.json @@ -1,6 +1,6 @@ { "name": "@shade/server", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-storage-encrypted/package.json b/packages/shade-storage-encrypted/package.json index f5bfb45..ee15f43 100644 --- a/packages/shade-storage-encrypted/package.json +++ b/packages/shade-storage-encrypted/package.json @@ -1,6 +1,6 @@ { "name": "@shade/storage-encrypted", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-storage-postgres/package.json b/packages/shade-storage-postgres/package.json index 68c2b33..2cbcbc6 100644 --- a/packages/shade-storage-postgres/package.json +++ b/packages/shade-storage-postgres/package.json @@ -1,6 +1,6 @@ { "name": "@shade/storage-postgres", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-storage-sqlite/package.json b/packages/shade-storage-sqlite/package.json index af80b9e..92dbfcd 100644 --- a/packages/shade-storage-sqlite/package.json +++ b/packages/shade-storage-sqlite/package.json @@ -1,6 +1,6 @@ { "name": "@shade/storage-sqlite", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-streams/package.json b/packages/shade-streams/package.json index 6c9f61d..b46c39a 100644 --- a/packages/shade-streams/package.json +++ b/packages/shade-streams/package.json @@ -1,6 +1,6 @@ { "name": "@shade/streams", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-transfer/package.json b/packages/shade-transfer/package.json index 104beb8..c7e320a 100644 --- a/packages/shade-transfer/package.json +++ b/packages/shade-transfer/package.json @@ -1,6 +1,6 @@ { "name": "@shade/transfer", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-transfer/src/index.ts b/packages/shade-transfer/src/index.ts index fd76684..56b3be2 100644 --- a/packages/shade-transfer/src/index.ts +++ b/packages/shade-transfer/src/index.ts @@ -7,6 +7,7 @@ export * from './transport/memory.js'; export * from './transport/http-transport.js'; export * from './transport/ws-transport.js'; export * from './transport/multi-fallback.js'; +export * from './transport/queue-transport.js'; export * from './engine.js'; export { createTransferRoutes, diff --git a/packages/shade-transfer/src/transport/queue-transport.ts b/packages/shade-transfer/src/transport/queue-transport.ts new file mode 100644 index 0000000..d6c30af --- /dev/null +++ b/packages/shade-transfer/src/transport/queue-transport.ts @@ -0,0 +1,319 @@ +/** + * Per-peer outbound queue + queue-transport for browser-friendly + * pull-mode streams. + * + * The default `ShadeTransferHttpTransport` POSTs each chunk directly + * to the receiver's `/v1/transfer//chunk` route. That + * requires the receiver to host an HTTP server, which a browser tab + * cannot. `QueueTransferTransport` flips the direction: it queues + * chunks per peer and lets the receiver pull them via a long-poll + * endpoint. + * + * The companion `OutboundQueue` data structure is plain server-side + * state — wired up by `@shade/files`'s `transferQueueRoute()` (and any + * future consumer) to expose the long-poll surface and feed envelopes + * + chunks into the browser receiver. + */ +import type { + ChunkAck, + ChunkSendOptions, + ITransferTransport, + TransferResumeState, +} from './transport.js'; +import { TransferTransportError } from '../errors.js'; + +/** + * Discriminated union of items the queue ships to the browser + * receiver. Both kinds carry **wire-encoded bytes** of an envelope — + * the receiver decodes via `decodeEnvelope` (control envelopes) or + * forwards directly to `engine.receiveChunk` (chunk envelopes). + */ +export type QueuedEvent = + | { + id: number; + timestampMs: number; + kind: 'envelope'; + /** Wire-encoded `0x02` ratchet envelope (or `0x01` first-message). */ + bytes: Uint8Array; + } + | { + id: number; + timestampMs: number; + kind: 'chunk'; + /** Wire-encoded `0x11` stream-chunk envelope. */ + bytes: Uint8Array; + meta: { + streamId: string; + laneId: number; + seq: number; + }; + }; + +/** Caller-supplied shape for {@link OutboundQueue.enqueue} — the queue assigns `id` + `timestampMs`. */ +export type QueuedEventInput = + | { kind: 'envelope'; bytes: Uint8Array } + | { + kind: 'chunk'; + bytes: Uint8Array; + meta: { streamId: string; laneId: number; seq: number }; + }; + +export interface OutboundQueueOptions { + /** + * Maximum events held per peer. When the queue is full, the oldest + * unacked event is dropped on next enqueue. Default 1000 — at the + * default chunk size (256 KiB plaintext) this caps a single peer's + * outbound buffer at ~256 MiB. Tune up for fewer/bigger streams, + * down for many concurrent small flows. + */ + maxEventsPerPeer?: number; + /** + * After a peer has not polled for this long, the queue's events are + * dropped and any pending waiters are released. Default 10 minutes. + * Setting to `0` disables idle-eviction. + */ + idleEvictionMs?: number; +} + +const DEFAULT_MAX_EVENTS = 1000; +const DEFAULT_IDLE_EVICTION_MS = 10 * 60 * 1000; + +interface PendingWaiter { + resolve(events: QueuedEvent[]): void; + reject(err: Error): void; + timer: ReturnType; + abortHandler?: () => void; + signal?: AbortSignal; +} + +interface PerPeerState { + nextId: number; + events: QueuedEvent[]; + waiters: PendingWaiter[]; + lastTouchedMs: number; +} + +/** + * Per-peer monotonic event log with long-poll semantics. + * + * `enqueue` appends; `drain` returns all events with `id > since`, + * blocking up to `blockMs` if there are none. `since`-based pagination + * is the resume mechanism: a client crashing mid-stream restarts with + * its last-processed id and the queue replays everything after it + * (subject to `maxEventsPerPeer` retention). + */ +export class OutboundQueue { + private peers = new Map(); + private readonly maxEvents: number; + private readonly idleEvictionMs: number; + private evictTimer: ReturnType | null = null; + private destroyed = false; + + constructor(opts: OutboundQueueOptions = {}) { + this.maxEvents = opts.maxEventsPerPeer ?? DEFAULT_MAX_EVENTS; + this.idleEvictionMs = opts.idleEvictionMs ?? DEFAULT_IDLE_EVICTION_MS; + if (this.idleEvictionMs > 0) this.scheduleEviction(); + } + + /** Append an event and wake any waiters for that peer. */ + enqueue(peer: string, ev: QueuedEventInput): QueuedEvent { + if (this.destroyed) throw new Error('OutboundQueue: destroyed'); + const state = this.getOrCreate(peer); + const event: QueuedEvent = + ev.kind === 'chunk' + ? { + id: state.nextId++, + timestampMs: Date.now(), + kind: 'chunk', + bytes: ev.bytes, + meta: ev.meta, + } + : { + id: state.nextId++, + timestampMs: Date.now(), + kind: 'envelope', + bytes: ev.bytes, + }; + state.events.push(event); + state.lastTouchedMs = Date.now(); + // Cap: drop oldest. Lost events trigger receiver-side resume from + // last polled id; the @shade/transfer engine handles missing seqs + // by re-sending on resume. + while (state.events.length > this.maxEvents) state.events.shift(); + // Wake all waiters with whatever has accumulated. + const drained = this.collect(state, 0); + if (drained.length > 0) { + const waiters = state.waiters.splice(0); + for (const w of waiters) { + clearTimeout(w.timer); + if (w.abortHandler !== undefined && w.signal !== undefined) { + w.signal.removeEventListener('abort', w.abortHandler); + } + w.resolve(drained); + } + } + return event; + } + + /** + * Drain events with `id > since`. If none are available, block up + * to `blockMs` until any arrive. `signal` cancels the wait early. + */ + async drain( + peer: string, + since: number, + blockMs: number, + signal?: AbortSignal, + ): Promise { + if (this.destroyed) throw new Error('OutboundQueue: destroyed'); + const state = this.getOrCreate(peer); + state.lastTouchedMs = Date.now(); + const ready = this.collect(state, since); + if (ready.length > 0 || blockMs <= 0) return ready; + if (signal?.aborted) return []; + return await new Promise((resolve, reject) => { + const timer = setTimeout(() => { + const idx = state.waiters.indexOf(waiter); + if (idx >= 0) state.waiters.splice(idx, 1); + if (waiter.abortHandler !== undefined && waiter.signal !== undefined) { + waiter.signal.removeEventListener('abort', waiter.abortHandler); + } + // Empty drain on timeout — that's the "no new events" signal. + resolve([]); + }, blockMs); + const waiter: PendingWaiter = { resolve, reject, timer }; + if (signal !== undefined) { + const handler = () => { + const idx = state.waiters.indexOf(waiter); + if (idx >= 0) state.waiters.splice(idx, 1); + clearTimeout(timer); + resolve([]); + }; + signal.addEventListener('abort', handler, { once: true }); + waiter.abortHandler = handler; + waiter.signal = signal; + } + state.waiters.push(waiter); + }); + } + + /** Drop a peer's queue + reject waiters. */ + evict(peer: string): void { + const state = this.peers.get(peer); + if (state === undefined) return; + this.peers.delete(peer); + for (const w of state.waiters) { + clearTimeout(w.timer); + if (w.abortHandler !== undefined && w.signal !== undefined) { + w.signal.removeEventListener('abort', w.abortHandler); + } + w.reject(new Error('OutboundQueue: peer evicted')); + } + } + + /** Peer-specific snapshot for diagnostics. */ + stats(peer: string): { eventCount: number; nextId: number; waiters: number } | null { + const state = this.peers.get(peer); + if (state === undefined) return null; + return { + eventCount: state.events.length, + nextId: state.nextId, + waiters: state.waiters.length, + }; + } + + /** Tear everything down. Pending waiters are rejected. */ + destroy(): void { + if (this.destroyed) return; + this.destroyed = true; + if (this.evictTimer !== null) clearTimeout(this.evictTimer); + for (const peer of [...this.peers.keys()]) this.evict(peer); + } + + // ─── internals ────────────────────────────────────────────── + + private getOrCreate(peer: string): PerPeerState { + let state = this.peers.get(peer); + if (state === undefined) { + state = { + nextId: 1, + events: [], + waiters: [], + lastTouchedMs: Date.now(), + }; + this.peers.set(peer, state); + } + return state; + } + + private collect(state: PerPeerState, since: number): QueuedEvent[] { + if (state.events.length === 0) return []; + return state.events.filter((e) => e.id > since); + } + + private scheduleEviction(): void { + const interval = Math.max(60_000, Math.floor(this.idleEvictionMs / 4)); + this.evictTimer = setTimeout(() => { + if (this.destroyed) return; + const cutoff = Date.now() - this.idleEvictionMs; + for (const [peer, state] of this.peers.entries()) { + if (state.lastTouchedMs < cutoff) this.evict(peer); + } + this.scheduleEviction(); + }, interval); + (this.evictTimer as unknown as { unref?: () => void }).unref?.(); + } +} + +/** + * Chunk transport that enqueues into an {@link OutboundQueue} instead + * of POSTing. + * + * Returns an optimistic `ChunkAck` immediately because the queue *is* + * the delivery — the receiver polls and dispatches. Browser receivers + * cannot synchronously confirm receipt before the next chunk; the + * engine's stream-protocol catches dropped chunks at finish-time + * integrity check, and chunk-resume restarts the lane from the last + * polled `since`. + */ +export class QueueTransferTransport implements ITransferTransport { + constructor(private readonly queue: OutboundQueue) {} + + async probe(_peer: string): Promise { + // The queue is local. Reachability is "is there a poller?" which + // is decided by `idleEvictionMs`. We don't synchronously check + // here; the engine retries via `withRetry` on `sendChunk` errors. + } + + async sendChunk( + peer: string, + streamId: string, + laneId: number, + seq: number | bigint, + bytes: Uint8Array, + options?: ChunkSendOptions, + ): Promise { + if (options?.signal?.aborted) { + throw new TransferTransportError('sendChunk aborted by caller'); + } + const seqNum = typeof seq === 'bigint' ? Number(seq) : seq; + this.queue.enqueue(peer, { + kind: 'chunk', + bytes, + meta: { streamId, laneId, seq: seqNum }, + }); + return { lastSeq: seqNum }; + } + + async fetchResumeState( + _peer: string, + _streamId: string, + ): Promise { + // Pull-mode receivers report resume state by re-polling with the + // `since` cursor they last successfully processed; the queue does + // not need to query the receiver. Return null so the engine + // restarts from seq 0 (deterministic), and the queue replays from + // `since=0` if the client reconnects fresh. + return null; + } +} diff --git a/packages/shade-transport-bridge/package.json b/packages/shade-transport-bridge/package.json index 47872cd..f07f9e7 100644 --- a/packages/shade-transport-bridge/package.json +++ b/packages/shade-transport-bridge/package.json @@ -1,6 +1,6 @@ { "name": "@shade/transport-bridge", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-transport-webrtc/package.json b/packages/shade-transport-webrtc/package.json index cf8209a..454798b 100644 --- a/packages/shade-transport-webrtc/package.json +++ b/packages/shade-transport-webrtc/package.json @@ -1,6 +1,6 @@ { "name": "@shade/transport-webrtc", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-transport/package.json b/packages/shade-transport/package.json index 39439a4..84f7bfd 100644 --- a/packages/shade-transport/package.json +++ b/packages/shade-transport/package.json @@ -1,6 +1,6 @@ { "name": "@shade/transport", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts", diff --git a/packages/shade-widgets/package.json b/packages/shade-widgets/package.json index 442d168..5c33607 100644 --- a/packages/shade-widgets/package.json +++ b/packages/shade-widgets/package.json @@ -1,6 +1,6 @@ { "name": "@shade/widgets", - "version": "4.1.0", + "version": "4.2.0", "type": "module", "main": "src/index.ts", "types": "src/index.ts",