Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 7520b11b25 |
145
CHANGELOG.md
145
CHANGELOG.md
@@ -5,6 +5,151 @@ All notable changes to Shade are documented in this file.
|
|||||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
|
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
|
||||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||||
|
|
||||||
|
## [4.2.0] — 2026-05-03 — Pull-mode streams for browser @shade/files
|
||||||
|
|
||||||
|
`4.1.0` shipped HTTP RPC for browser clients but capped them at inline
|
||||||
|
payloads (≤ 256 KiB). Larger reads/writes — mod-jars (1–50 MB),
|
||||||
|
world-backups (100+ MB), the things that actually need streaming —
|
||||||
|
threw `ConflictError` directing callers to the server-to-server
|
||||||
|
pathway. That made browser-side `@shade/files` insufficient for
|
||||||
|
admin-panel-style apps where the client is a browser tab and the
|
||||||
|
server is a Bun process.
|
||||||
|
|
||||||
|
`4.2.0` flips the direction: when the browser supplies
|
||||||
|
`outboundQueueUrl` + `transferBaseUrl`, server-to-browser chunks +
|
||||||
|
control envelopes ride a per-peer queue that the browser long-polls,
|
||||||
|
and browser-to-server chunks POST directly to the server's existing
|
||||||
|
chunk-receive routes. No WebSockets, no SSE, no inbound listener on
|
||||||
|
the browser. Long-polling + a request-response inbound queue is
|
||||||
|
the entire wire surface.
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
#### `@shade/transfer`
|
||||||
|
- `OutboundQueue` — per-peer monotonic event log with long-poll
|
||||||
|
semantics. `enqueue(peer, event)` appends, `drain(peer, since,
|
||||||
|
blockMs, signal)` returns events with `id > since` (blocking up
|
||||||
|
to `blockMs` if none are ready). Idle-eviction GC drops peers
|
||||||
|
that haven't polled in `idleEvictionMs` (default 10 min). Ring-
|
||||||
|
buffered to `maxEventsPerPeer` (default 1000) — overflow drops
|
||||||
|
oldest, receivers pick up the gap via re-resume from `since=0`.
|
||||||
|
- `QueuedEvent` discriminated union: `{ kind: 'envelope', bytes }`
|
||||||
|
or `{ kind: 'chunk', bytes, meta: { streamId, laneId, seq } }`.
|
||||||
|
- `QueueTransferTransport` (implements `ITransferTransport`) —
|
||||||
|
enqueues outbound chunks instead of POSTing. Returns optimistic
|
||||||
|
`ChunkAck` because the queue *is* the delivery; chunk-resume picks
|
||||||
|
up dropped events on receiver-side reconnect.
|
||||||
|
|
||||||
|
#### `@shade/sdk`
|
||||||
|
- `Shade.transferQueueRoute(opts?)` — Hono app with all five routes a
|
||||||
|
pull-mode receiver needs:
|
||||||
|
- `POST /queue` — long-poll the per-peer outbound queue.
|
||||||
|
- `POST /v1/transfer/:streamId/chunk` — receive incoming chunks
|
||||||
|
(browser → server writes).
|
||||||
|
- `GET /v1/transfer/:streamId/state` — resume-state lookup.
|
||||||
|
- `POST /v1/transfer/control` — receive incoming control envelopes
|
||||||
|
(browser → server stream-init / abort).
|
||||||
|
- `GET /v1/transfer/health` — peer reachability probe.
|
||||||
|
Auto-configures `shade.configureTransfers(...)` with the queue
|
||||||
|
transport + `QueueEnvelopeTransport` if not already configured.
|
||||||
|
- `Shade.configureTransfers(opts)` extended: `resolveBaseUrl` is now
|
||||||
|
optional when `transport` and `envelopeTransport` are both supplied
|
||||||
|
(lets pure-queue servers omit the baseUrl entirely). New
|
||||||
|
`transport?: ITransferTransport` override slot.
|
||||||
|
- `QueueEnvelopeTransport` — `ControlEnvelopeTransport` impl that
|
||||||
|
enqueues outbound envelopes for browser receivers.
|
||||||
|
|
||||||
|
#### `@shade/files`
|
||||||
|
- `createFilesHttpClient` (and `shade.files.httpClient`) accept new
|
||||||
|
options:
|
||||||
|
- `outboundQueueUrl` — `/queue` endpoint to long-poll.
|
||||||
|
- `transferBaseUrl` — base URL for outbound chunk POSTs and control
|
||||||
|
envelope POSTs (browser → server writes).
|
||||||
|
- `queueBlockMs` — long-poll timeout (default 30 s; server clamps
|
||||||
|
at `maxBlockMs`).
|
||||||
|
When set, the client:
|
||||||
|
1. Configures `shade.configureTransfers({ resolveBaseUrl })` so
|
||||||
|
outbound chunks POST to `<transferBaseUrl>/v1/transfer/...`.
|
||||||
|
2. Builds a `ClientStreamsBridge` eagerly so the engine's
|
||||||
|
incoming-transfer subscription is in place before the drainer
|
||||||
|
dispatches the first envelope.
|
||||||
|
3. Starts a long-poll `startQueueDrainer(...)` that pulls queued
|
||||||
|
events and dispatches them via `shade.acceptTransferEnvelope`.
|
||||||
|
- Streamed reads (`fs.read` of files > 256 KiB) and streamed writes
|
||||||
|
(`fs.write` of large inputs) now work end-to-end on the browser
|
||||||
|
client when the queue options are set.
|
||||||
|
- `startQueueDrainer(shade, opts)` exported for advanced consumers
|
||||||
|
that want to drive their own drainer (e.g. service-worker setups
|
||||||
|
that want a single shared drainer across multiple `httpClient`s).
|
||||||
|
- `client.close()` now stops the drainer and tears down the streams-
|
||||||
|
bridge — important on tab unload to free the long-poll socket.
|
||||||
|
|
||||||
|
#### `@shade/files` (internal)
|
||||||
|
- `ClientStreamsBridge` uses a TransformStream with `highWaterMark:
|
||||||
|
64` instead of the default `0` so the receive-side write loop
|
||||||
|
doesn't stall on backpressure before the consumer attaches its
|
||||||
|
reader (default HWM stalled at chunk 4 in pull-mode where the
|
||||||
|
drainer races the consumer's `getReader()` call).
|
||||||
|
|
||||||
|
### Wire contract
|
||||||
|
|
||||||
|
```
|
||||||
|
POST <base>/queue HTTP/1.1
|
||||||
|
X-Shade-Sender-Address: alice@example.com
|
||||||
|
{ "since": 42, "blockMs": 30000 }
|
||||||
|
|
||||||
|
────
|
||||||
|
|
||||||
|
200 OK
|
||||||
|
{
|
||||||
|
"events": [
|
||||||
|
{ "id": 43, "kind": "envelope", "bytesB64": "...", "timestampMs": 1730... },
|
||||||
|
{ "id": 44, "kind": "chunk", "bytesB64": "...", "meta": { "streamId": "...", "laneId": 0, "seq": 0 } },
|
||||||
|
...
|
||||||
|
],
|
||||||
|
"nextSince": 47
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Tests
|
||||||
|
|
||||||
|
`tests/integration/http-rpc-streams.test.ts` — three integration tests:
|
||||||
|
- 4 MiB streamed read end-to-end via long-poll queue (verifies bytes
|
||||||
|
match the source).
|
||||||
|
- Inline-only client throws clear error on streamed read.
|
||||||
|
- Long-poll returns empty events on idle timeout (verifies the
|
||||||
|
`blockMs` pathway).
|
||||||
|
|
||||||
|
### Migration
|
||||||
|
|
||||||
|
`4.1.0 → 4.2.0` is wire-compatible and source-compatible — the
|
||||||
|
queue route is purely additive. To enable streamed transfers in a
|
||||||
|
browser app:
|
||||||
|
|
||||||
|
```ts
|
||||||
|
// Server
|
||||||
|
const queue = await shade.transferQueueRoute({ blockMs: 30_000 });
|
||||||
|
await shade.files.serve(handler);
|
||||||
|
const rpc = shade.files.rpcRoute({ acceptFirstMessage: true });
|
||||||
|
|
||||||
|
const app = new Hono();
|
||||||
|
app.route('/api/v1/shade-files', queue);
|
||||||
|
app.route('/api/v1/shade-files', rpc);
|
||||||
|
|
||||||
|
// Browser
|
||||||
|
const fs = shade.files.httpClient(serverAddress, {
|
||||||
|
rpcUrl: 'https://server/api/v1/shade-files/rpc',
|
||||||
|
outboundQueueUrl: 'https://server/api/v1/shade-files/queue',
|
||||||
|
transferBaseUrl: 'https://server/api/v1/shade-files',
|
||||||
|
});
|
||||||
|
await fs.write('/mods/some-mod.jar', new Uint8Array(/* 50 MB */));
|
||||||
|
const result = await fs.read('/backups/world.tar.gz'); // streamed
|
||||||
|
```
|
||||||
|
|
||||||
|
`shade.files.serve(handler, { inlineOnly: true })` is still supported
|
||||||
|
for HTTP-RPC-without-streams deployments — it skips the streams-bridge
|
||||||
|
setup entirely.
|
||||||
|
|
||||||
## [4.1.0] — 2026-05-03 — Browser-friendly HTTP RPC for @shade/files
|
## [4.1.0] — 2026-05-03 — Browser-friendly HTTP RPC for @shade/files
|
||||||
|
|
||||||
The default `shade.files.client(peer)` requires both peers to be
|
The default `shade.files.client(peer)` requires both peers to be
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/cli",
|
"name": "@shade/cli",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/cli.ts",
|
"main": "src/cli.ts",
|
||||||
"bin": {
|
"bin": {
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/core",
|
"name": "@shade/core",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/crypto-web",
|
"name": "@shade/crypto-web",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/dashboard",
|
"name": "@shade/dashboard",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
"dev": "vite",
|
"dev": "vite",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/files",
|
"name": "@shade/files",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -69,6 +69,11 @@ import {
|
|||||||
import { buildRpcRequest } from '../protocol/rpc-builder.js';
|
import { buildRpcRequest } from '../protocol/rpc-builder.js';
|
||||||
import { decideInline, INLINE_THRESHOLD, type WriteSource } from './inline-threshold.js';
|
import { decideInline, INLINE_THRESHOLD, type WriteSource } from './inline-threshold.js';
|
||||||
import { base64ToBytes, bytesToBase64 } from '../protocol/canonical.js';
|
import { base64ToBytes, bytesToBase64 } from '../protocol/canonical.js';
|
||||||
|
import { startQueueDrainer, type QueueDrainerHandle } from './queue-drainer.js';
|
||||||
|
import {
|
||||||
|
createClientStreamsBridge,
|
||||||
|
type ClientStreamsBridge,
|
||||||
|
} from './streams-bridge.js';
|
||||||
import type {
|
import type {
|
||||||
FileClient,
|
FileClient,
|
||||||
ReadOpts,
|
ReadOpts,
|
||||||
@@ -80,7 +85,7 @@ import type {
|
|||||||
} from './client.js';
|
} from './client.js';
|
||||||
|
|
||||||
export interface FilesHttpClientOptions
|
export interface FilesHttpClientOptions
|
||||||
extends Omit<CreateFileClientOptions, 'streamsBridge' | 'ioTimeoutMs'> {
|
extends Omit<CreateFileClientOptions, 'streamsBridge'> {
|
||||||
/**
|
/**
|
||||||
* Server endpoint that hosts `createFilesRpcRoute(...)`. Typically:
|
* Server endpoint that hosts `createFilesRpcRoute(...)`. Typically:
|
||||||
* `https://server.example.com/api/v1/shade-files/rpc`.
|
* `https://server.example.com/api/v1/shade-files/rpc`.
|
||||||
@@ -98,6 +103,32 @@ export interface FilesHttpClientOptions
|
|||||||
* orthogonal to the ratchet authentication on the envelope itself.
|
* orthogonal to the ratchet authentication on the envelope itself.
|
||||||
*/
|
*/
|
||||||
headers?: Record<string, string>;
|
headers?: Record<string, string>;
|
||||||
|
/**
|
||||||
|
* Server endpoint that hosts `transferQueueRoute()`'s long-poll
|
||||||
|
* endpoint. Typically:
|
||||||
|
* `https://server.example.com/api/v1/shade-files/queue`.
|
||||||
|
*
|
||||||
|
* When supplied, the client starts a background long-poll that
|
||||||
|
* drains queued envelopes + chunks from the server and dispatches
|
||||||
|
* them via `shade.acceptTransferEnvelope`. This unlocks
|
||||||
|
* **streamed reads** (>256 KiB) for browser-style consumers.
|
||||||
|
*/
|
||||||
|
outboundQueueUrl?: string;
|
||||||
|
/**
|
||||||
|
* Base URL for outbound transfer routes (browser → server). Required
|
||||||
|
* alongside `outboundQueueUrl` to enable streamed writes. Typically:
|
||||||
|
* `https://server.example.com/api/v1/shade-files`.
|
||||||
|
*
|
||||||
|
* The client POSTs:
|
||||||
|
* - chunks to `<base>/v1/transfer/<streamId>/chunk`
|
||||||
|
* - control envelopes to `<base>/v1/transfer/control`
|
||||||
|
*/
|
||||||
|
transferBaseUrl?: string;
|
||||||
|
/**
|
||||||
|
* Long-poll block timeout, milliseconds. Default 30_000. Server
|
||||||
|
* clamps to its own `maxBlockMs` (default 55_000).
|
||||||
|
*/
|
||||||
|
queueBlockMs?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface RoundTripOpts {
|
interface RoundTripOpts {
|
||||||
@@ -112,6 +143,12 @@ interface RoundTripOpts {
|
|||||||
* (via `shade.initSessionFromBundle(peerAddress, bundle)` or an
|
* (via `shade.initSessionFromBundle(peerAddress, bundle)` or an
|
||||||
* incoming first-message). Otherwise the first RPC will fail with
|
* incoming first-message). Otherwise the first RPC will fail with
|
||||||
* "decrypt failed: no session for peer".
|
* "decrypt failed: no session for peer".
|
||||||
|
*
|
||||||
|
* When `outboundQueueUrl` + `transferBaseUrl` are supplied, the
|
||||||
|
* client also unlocks **streamed reads/writes** for files larger than
|
||||||
|
* the inline threshold (256 KiB). The browser polls the server's
|
||||||
|
* outbound queue for chunks/envelopes and POSTs its own outbound
|
||||||
|
* chunks to the server's transfer-receive routes.
|
||||||
*/
|
*/
|
||||||
export function createFilesHttpClient(
|
export function createFilesHttpClient(
|
||||||
shade: ShadeBridge,
|
shade: ShadeBridge,
|
||||||
@@ -122,9 +159,84 @@ export function createFilesHttpClient(
|
|||||||
const fetchFn = options.fetch ?? globalThis.fetch.bind(globalThis);
|
const fetchFn = options.fetch ?? globalThis.fetch.bind(globalThis);
|
||||||
const extraHeaders = options.headers ?? {};
|
const extraHeaders = options.headers ?? {};
|
||||||
const defaultTimeoutMs = options.defaultTimeoutMs ?? 30_000;
|
const defaultTimeoutMs = options.defaultTimeoutMs ?? 30_000;
|
||||||
|
const ioTimeoutMs = options.ioTimeoutMs ?? 60_000;
|
||||||
const signRequest = options.signRequest;
|
const signRequest = options.signRequest;
|
||||||
const senderAddress = shade.myAddress;
|
const senderAddress = shade.myAddress;
|
||||||
|
|
||||||
|
// ─── Streamed-mode bootstrap ─────────────────────────────────
|
||||||
|
//
|
||||||
|
// When `outboundQueueUrl` is supplied, the client:
|
||||||
|
// 1. Configures `shade.configureTransfers(...)` so outbound
|
||||||
|
// chunks POST to `<transferBaseUrl>/v1/transfer/<streamId>/chunk`
|
||||||
|
// and outbound control envelopes POST to
|
||||||
|
// `<transferBaseUrl>/v1/transfer/control`.
|
||||||
|
// 2. Spawns a streams-bridge so streamed reads can be awaited.
|
||||||
|
// 3. Starts a long-poll drainer that pulls queued envelopes +
|
||||||
|
// chunks from the server and dispatches via
|
||||||
|
// `shade.acceptTransferEnvelope`.
|
||||||
|
|
||||||
|
let drainer: QueueDrainerHandle | null = null;
|
||||||
|
let streamsBridgePromise: Promise<ClientStreamsBridge> | null = null;
|
||||||
|
let streamsBridge: ClientStreamsBridge | null = null;
|
||||||
|
|
||||||
|
if (options.outboundQueueUrl !== undefined) {
|
||||||
|
const outboundQueueUrl = options.outboundQueueUrl;
|
||||||
|
if (options.transferBaseUrl === undefined) {
|
||||||
|
throw new Error(
|
||||||
|
'createFilesHttpClient: outboundQueueUrl was supplied without transferBaseUrl. Pass `transferBaseUrl` (the server prefix that hosts /v1/transfer/...) so outbound chunks have a destination.',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if (shade.configureTransfers === undefined) {
|
||||||
|
throw new Error(
|
||||||
|
'createFilesHttpClient: shade.configureTransfers is required for streamed mode (the underlying ShadeBridge must surface it).',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
const transferBaseUrl = options.transferBaseUrl.replace(/\/$/, '');
|
||||||
|
shade.configureTransfers({
|
||||||
|
resolveBaseUrl: async (peer) => {
|
||||||
|
if (peer !== peerAddress) {
|
||||||
|
throw new Error(
|
||||||
|
`httpClient is bound to peer "${peerAddress}" — refusing to resolve outgoing chunks for "${peer}" without a multi-peer registry. Use shade.files.client(peer) for server-to-server multi-peer.`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return transferBaseUrl;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
// Build the streams-bridge eagerly. The engine's incoming-transfer
|
||||||
|
// subscription has to be in place BEFORE the drainer dispatches the
|
||||||
|
// first stream-init envelope, otherwise the engine emits the
|
||||||
|
// IncomingTransfer to zero handlers and the read silently never
|
||||||
|
// accepts. We kick off the drainer once the bridge has subscribed.
|
||||||
|
streamsBridgePromise = createClientStreamsBridge(shade).then((bridge) => {
|
||||||
|
streamsBridge = bridge;
|
||||||
|
drainer = startQueueDrainer(shade, {
|
||||||
|
outboundQueueUrl,
|
||||||
|
peerAddress,
|
||||||
|
senderAddress,
|
||||||
|
...(options.fetch !== undefined ? { fetch: options.fetch } : {}),
|
||||||
|
...(options.headers !== undefined ? { headers: options.headers } : {}),
|
||||||
|
...(options.queueBlockMs !== undefined ? { blockMs: options.queueBlockMs } : {}),
|
||||||
|
});
|
||||||
|
return bridge;
|
||||||
|
});
|
||||||
|
// Surface bridge-construction failures eagerly via a rejected
|
||||||
|
// promise the next read/write picks up.
|
||||||
|
streamsBridgePromise.catch(() => {
|
||||||
|
/* observed via getStreamsBridge() */
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async function getStreamsBridge(): Promise<ClientStreamsBridge> {
|
||||||
|
if (streamsBridge !== null) return streamsBridge;
|
||||||
|
if (streamsBridgePromise === null) {
|
||||||
|
throw new ConflictError(
|
||||||
|
`http RPC client supports inline writes/reads only (≤ ${INLINE_THRESHOLD} bytes) — pass { outboundQueueUrl, transferBaseUrl } to enable streamed transfers.`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
streamsBridge = await streamsBridgePromise;
|
||||||
|
return streamsBridge;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Encrypt + POST + decrypt + parse one RPC round-trip.
|
* Encrypt + POST + decrypt + parse one RPC round-trip.
|
||||||
*
|
*
|
||||||
@@ -321,20 +433,39 @@ export function createFilesHttpClient(
|
|||||||
ReadResultSchema,
|
ReadResultSchema,
|
||||||
opts,
|
opts,
|
||||||
);
|
);
|
||||||
if (wire.kind !== 'inline') {
|
if (wire.kind === 'inline') {
|
||||||
// The HTTP RPC route does not service streamed reads — there is
|
const bytes = base64ToBytes(wire.bytesB64);
|
||||||
// no place to stream from in pure request-response.
|
const out: ReadOutput = {
|
||||||
|
kind: 'inline',
|
||||||
|
bytes,
|
||||||
|
size: wire.size,
|
||||||
|
sha256: wire.sha256,
|
||||||
|
...(wire.contentType !== undefined ? { contentType: wire.contentType } : {}),
|
||||||
|
};
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
// Streamed read — only supported when the queue drainer is wired.
|
||||||
|
if (drainer === null) {
|
||||||
throw new InternalFileError(
|
throw new InternalFileError(
|
||||||
`http RPC client received a streamed read (size ${wire.size}). Use shade.files.client(peer) on a server-to-server deployment, or pass { preferInline: true } when the file is known to fit inline.`,
|
`http RPC client received a streamed read (size ${wire.size}) but is in inline-only mode. Pass { outboundQueueUrl, transferBaseUrl } when constructing the client to enable streamed reads.`,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
const bytes = base64ToBytes(wire.bytesB64);
|
const bridge = await getStreamsBridge();
|
||||||
|
const bridgeSignal = opts.signal ?? new AbortController().signal;
|
||||||
|
const parked = await bridge.awaitRead(wire.streamId, {
|
||||||
|
expectedFrom: peerAddress,
|
||||||
|
signal: bridgeSignal,
|
||||||
|
timeoutMs: ioTimeoutMs,
|
||||||
|
});
|
||||||
const out: ReadOutput = {
|
const out: ReadOutput = {
|
||||||
kind: 'inline',
|
kind: 'streams',
|
||||||
bytes,
|
stream: parked.readable,
|
||||||
size: wire.size,
|
size: wire.size,
|
||||||
sha256: wire.sha256,
|
sha256: wire.sha256,
|
||||||
...(wire.contentType !== undefined ? { contentType: wire.contentType } : {}),
|
...(wire.contentType !== undefined ? { contentType: wire.contentType } : {}),
|
||||||
|
done: async () => {
|
||||||
|
await parked.done;
|
||||||
|
},
|
||||||
};
|
};
|
||||||
return out;
|
return out;
|
||||||
},
|
},
|
||||||
@@ -344,25 +475,77 @@ export function createFilesHttpClient(
|
|||||||
const overwrite = opts.overwrite ?? false;
|
const overwrite = opts.overwrite ?? false;
|
||||||
const contentType = opts.contentType ?? decision.contentType;
|
const contentType = opts.contentType ?? decision.contentType;
|
||||||
|
|
||||||
if (decision.kind !== 'inline') {
|
if (decision.kind === 'inline' || opts.forceInline === true) {
|
||||||
throw new ConflictError(
|
const bytes = decision.kind === 'inline' ? decision.bytes : null;
|
||||||
`http RPC client supports inline writes only (≤ ${INLINE_THRESHOLD} bytes). The supplied input was promoted to streams (size ${decision.size ?? 'unknown'}). Use shade.files.client(peer) for streamed writes, or pre-buffer the input below the inline threshold.`,
|
if (bytes === null) {
|
||||||
|
// forceInline === true with a streams-typed decision —
|
||||||
|
// decideInline always produced a `streams` shape because the
|
||||||
|
// input was a bare ReadableStream. We can't drain a stream
|
||||||
|
// synchronously here without a streams-bridge.
|
||||||
|
throw new ConflictError(
|
||||||
|
'http RPC client cannot forceInline a streamed input — pass a Uint8Array / Blob, or pre-buffer the stream.',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if (bytes.byteLength > INLINE_THRESHOLD) {
|
||||||
|
throw new ConflictError(
|
||||||
|
`inline write exceeds ${INLINE_THRESHOLD}-byte threshold (got ${bytes.byteLength}); pass forceInline=true to override`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
const args = WriteArgsSchema.parse({
|
||||||
|
kind: 'inline',
|
||||||
|
path,
|
||||||
|
bytesB64: bytesToBase64(bytes),
|
||||||
|
...(contentType !== undefined ? { contentType } : {}),
|
||||||
|
overwrite,
|
||||||
|
});
|
||||||
|
return await roundTrip<WriteResult>(
|
||||||
|
KIND_WRITE_V1,
|
||||||
|
'write',
|
||||||
|
args,
|
||||||
|
WriteResultSchema,
|
||||||
|
opts,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Streamed write — requires the queue drainer + streams-bridge.
|
||||||
|
if (drainer === null) {
|
||||||
|
throw new ConflictError(
|
||||||
|
`http RPC client supports inline writes only (≤ ${INLINE_THRESHOLD} bytes). The supplied input was promoted to streams (size ${decision.size ?? 'unknown'}). Pass { outboundQueueUrl, transferBaseUrl } to enable streamed writes.`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
const bridge = await getStreamsBridge();
|
||||||
|
const size = decision.size;
|
||||||
|
if (size === undefined) {
|
||||||
|
throw new ConflictError(
|
||||||
|
'streams write requires a known plaintext size; pass `{ stream, size }` instead of a bare ReadableStream',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
const { writeId, handle } = await bridge.initiateWrite({
|
||||||
|
peer: peerAddress,
|
||||||
|
stream: decision.stream,
|
||||||
|
size,
|
||||||
|
...(contentType !== undefined ? { contentType } : {}),
|
||||||
|
name: path,
|
||||||
|
...(opts.signal !== undefined ? { signal: opts.signal } : {}),
|
||||||
|
});
|
||||||
const args = WriteArgsSchema.parse({
|
const args = WriteArgsSchema.parse({
|
||||||
kind: 'inline',
|
kind: 'streams',
|
||||||
path,
|
path,
|
||||||
bytesB64: bytesToBase64(decision.bytes),
|
size,
|
||||||
...(contentType !== undefined ? { contentType } : {}),
|
...(contentType !== undefined ? { contentType } : {}),
|
||||||
overwrite,
|
overwrite,
|
||||||
|
writeId,
|
||||||
});
|
});
|
||||||
return await roundTrip<WriteResult>(
|
try {
|
||||||
KIND_WRITE_V1,
|
const [result] = await Promise.all([
|
||||||
'write',
|
roundTrip<WriteResult>(KIND_WRITE_V1, 'write', args, WriteResultSchema, opts),
|
||||||
args,
|
handle.done(),
|
||||||
WriteResultSchema,
|
]);
|
||||||
opts,
|
return result;
|
||||||
);
|
} catch (err) {
|
||||||
|
await handle.abort('rpc-failed').catch(() => undefined);
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
async getThumbnail(path, size: ThumbnailSize, opts): Promise<ThumbnailResult> {
|
async getThumbnail(path, size: ThumbnailSize, opts): Promise<ThumbnailResult> {
|
||||||
@@ -393,7 +576,15 @@ export function createFilesHttpClient(
|
|||||||
},
|
},
|
||||||
|
|
||||||
close(): void {
|
close(): void {
|
||||||
// Stateless — nothing to release. Exists for FileClient symmetry.
|
// Stop the long-poll drainer + tear down the streams-bridge if
|
||||||
|
// we built one. Idempotent — safe to call multiple times.
|
||||||
|
drainer?.stop();
|
||||||
|
drainer = null;
|
||||||
|
if (streamsBridge !== null) {
|
||||||
|
void streamsBridge.destroy().catch(() => undefined);
|
||||||
|
streamsBridge = null;
|
||||||
|
}
|
||||||
|
streamsBridgePromise = null;
|
||||||
},
|
},
|
||||||
} as FileClient;
|
} as FileClient;
|
||||||
}
|
}
|
||||||
|
|||||||
172
packages/shade-files/src/client/queue-drainer.ts
Normal file
172
packages/shade-files/src/client/queue-drainer.ts
Normal file
@@ -0,0 +1,172 @@
|
|||||||
|
/**
|
||||||
|
* Browser-side drainer for the pull-mode outbound queue.
|
||||||
|
*
|
||||||
|
* Background task that long-polls the server's `/queue` endpoint,
|
||||||
|
* decodes each event, and dispatches it into the consumer's Shade
|
||||||
|
* instance via `shade.acceptTransferEnvelope`. Same wire-shape as the
|
||||||
|
* server-to-server case where the engine receives chunks via direct
|
||||||
|
* HTTP POSTs — we just flip the direction so the browser pulls
|
||||||
|
* instead of accepts.
|
||||||
|
*/
|
||||||
|
import type { ShadeBridge } from '../integration/shade-bridge.js';
|
||||||
|
|
||||||
|
export interface QueueDrainerOptions {
|
||||||
|
/**
|
||||||
|
* Server endpoint that hosts `transferQueueRoute()`. Typically:
|
||||||
|
* `https://server.example.com/api/v1/shade-files/queue`.
|
||||||
|
*/
|
||||||
|
outboundQueueUrl: string;
|
||||||
|
/** Peer the queue is pulled FROM (the server's address). */
|
||||||
|
peerAddress: string;
|
||||||
|
/** Address we identify ourselves with on the long-poll. */
|
||||||
|
senderAddress: string;
|
||||||
|
/** Optional `fetch` override. Default `globalThis.fetch`. */
|
||||||
|
fetch?: typeof globalThis.fetch;
|
||||||
|
/** Extra headers applied to every poll request. */
|
||||||
|
headers?: Record<string, string>;
|
||||||
|
/**
|
||||||
|
* Long-poll request timeout (server-side block). Default 30_000.
|
||||||
|
* Server clamps to its own `maxBlockMs` (default 55_000).
|
||||||
|
*/
|
||||||
|
blockMs?: number;
|
||||||
|
/**
|
||||||
|
* Backoff after a network error before re-polling. Default 2_000.
|
||||||
|
* Doubles up to `maxBackoffMs` on consecutive failures.
|
||||||
|
*/
|
||||||
|
initialBackoffMs?: number;
|
||||||
|
maxBackoffMs?: number;
|
||||||
|
/**
|
||||||
|
* Called when a poll cycle fails. Defaults to logging via `console.error`.
|
||||||
|
* Throwing from this hook does NOT stop the drainer — it backs off
|
||||||
|
* and retries.
|
||||||
|
*/
|
||||||
|
onError?: (err: unknown) => void;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface QueueDrainerHandle {
|
||||||
|
/** Stop the drainer. Pending fetch is aborted; the loop exits. */
|
||||||
|
stop(): void;
|
||||||
|
/** Promise that resolves once the drainer has fully stopped. */
|
||||||
|
stopped: Promise<void>;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface PolledEvent {
|
||||||
|
id: number;
|
||||||
|
timestampMs: number;
|
||||||
|
kind: 'envelope' | 'chunk';
|
||||||
|
bytesB64: string;
|
||||||
|
meta?: { streamId: string; laneId: number; seq: number };
|
||||||
|
}
|
||||||
|
|
||||||
|
interface PollResponseBody {
|
||||||
|
events: PolledEvent[];
|
||||||
|
nextSince: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
const DEFAULT_BLOCK_MS = 30_000;
|
||||||
|
const DEFAULT_INITIAL_BACKOFF_MS = 2_000;
|
||||||
|
const DEFAULT_MAX_BACKOFF_MS = 30_000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start a long-poll loop that drains queued envelopes + chunks from
|
||||||
|
* the server and dispatches them into the local Shade instance.
|
||||||
|
*
|
||||||
|
* Returns a handle the caller can use to stop the drainer when the
|
||||||
|
* `httpClient` is closed (e.g. on tab unload).
|
||||||
|
*/
|
||||||
|
export function startQueueDrainer(
|
||||||
|
shade: ShadeBridge,
|
||||||
|
options: QueueDrainerOptions,
|
||||||
|
): QueueDrainerHandle {
|
||||||
|
if (shade.acceptTransferEnvelope === undefined) {
|
||||||
|
throw new Error(
|
||||||
|
'startQueueDrainer: shade.acceptTransferEnvelope is required for pull-mode streams. The supplied ShadeBridge implementation must surface it.',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
const accept = shade.acceptTransferEnvelope.bind(shade);
|
||||||
|
const fetchFn = options.fetch ?? globalThis.fetch.bind(globalThis);
|
||||||
|
const blockMs = options.blockMs ?? DEFAULT_BLOCK_MS;
|
||||||
|
const onError = options.onError ?? ((err: unknown) => console.error('[shade.files queue-drainer]', err));
|
||||||
|
const initialBackoffMs = options.initialBackoffMs ?? DEFAULT_INITIAL_BACKOFF_MS;
|
||||||
|
const maxBackoffMs = options.maxBackoffMs ?? DEFAULT_MAX_BACKOFF_MS;
|
||||||
|
const ac = new AbortController();
|
||||||
|
let stopped = false;
|
||||||
|
let resolveStopped!: () => void;
|
||||||
|
const stoppedPromise = new Promise<void>((r) => {
|
||||||
|
resolveStopped = r;
|
||||||
|
});
|
||||||
|
|
||||||
|
void (async () => {
|
||||||
|
let since = 0;
|
||||||
|
let backoff = initialBackoffMs;
|
||||||
|
while (!stopped) {
|
||||||
|
try {
|
||||||
|
const response = await fetchFn(options.outboundQueueUrl, {
|
||||||
|
method: 'POST',
|
||||||
|
signal: ac.signal,
|
||||||
|
headers: {
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
'X-Shade-Sender-Address': options.senderAddress,
|
||||||
|
...(options.headers ?? {}),
|
||||||
|
},
|
||||||
|
body: JSON.stringify({ since, blockMs }),
|
||||||
|
});
|
||||||
|
if (!response.ok) {
|
||||||
|
throw new Error(`queue poll → ${response.status} ${response.statusText}`);
|
||||||
|
}
|
||||||
|
const body = (await response.json()) as PollResponseBody;
|
||||||
|
if (Array.isArray(body.events) && body.events.length > 0) {
|
||||||
|
for (const event of body.events) {
|
||||||
|
if (stopped) break;
|
||||||
|
try {
|
||||||
|
const bytes = base64ToBytes(event.bytesB64);
|
||||||
|
await accept(options.peerAddress, bytes);
|
||||||
|
} catch (err) {
|
||||||
|
// Per-event dispatch failure should not kill the loop —
|
||||||
|
// resume picks up missing chunks via @shade/transfer's
|
||||||
|
// built-in lane-resume protocol.
|
||||||
|
onError(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
since = typeof body.nextSince === 'number' ? body.nextSince : since;
|
||||||
|
backoff = initialBackoffMs;
|
||||||
|
} catch (err) {
|
||||||
|
if (stopped || ac.signal.aborted) break;
|
||||||
|
onError(err);
|
||||||
|
// Exponential backoff with jitter — caps at maxBackoffMs.
|
||||||
|
const jitter = Math.floor(Math.random() * Math.min(backoff, 1_000));
|
||||||
|
await new Promise<void>((r) => {
|
||||||
|
const t = setTimeout(r, backoff + jitter);
|
||||||
|
(t as unknown as { unref?: () => void }).unref?.();
|
||||||
|
ac.signal.addEventListener(
|
||||||
|
'abort',
|
||||||
|
() => {
|
||||||
|
clearTimeout(t);
|
||||||
|
r();
|
||||||
|
},
|
||||||
|
{ once: true },
|
||||||
|
);
|
||||||
|
});
|
||||||
|
backoff = Math.min(maxBackoffMs, backoff * 2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
resolveStopped();
|
||||||
|
})();
|
||||||
|
|
||||||
|
return {
|
||||||
|
stop(): void {
|
||||||
|
if (stopped) return;
|
||||||
|
stopped = true;
|
||||||
|
ac.abort(new Error('queue drainer stopped'));
|
||||||
|
},
|
||||||
|
stopped: stoppedPromise,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function base64ToBytes(b64: string): Uint8Array {
|
||||||
|
const bin = atob(b64);
|
||||||
|
const out = new Uint8Array(bin.length);
|
||||||
|
for (let i = 0; i < bin.length; i++) out[i] = bin.charCodeAt(i);
|
||||||
|
return out;
|
||||||
|
}
|
||||||
@@ -102,7 +102,16 @@ export async function createClientStreamsBridge(
|
|||||||
const readStreamId = incoming.metadata.userMetadata?.[META_KEY_READ_STREAM_ID];
|
const readStreamId = incoming.metadata.userMetadata?.[META_KEY_READ_STREAM_ID];
|
||||||
if (readStreamId === undefined) return;
|
if (readStreamId === undefined) return;
|
||||||
|
|
||||||
const ts = new TransformStream<Uint8Array, Uint8Array>();
|
// Generous HWM so the receiver-side write loop (drainer →
|
||||||
|
// engine.receiveChunk → sink.write) doesn't stall on backpressure
|
||||||
|
// before the consumer's reader is wired up. The reader still
|
||||||
|
// applies its own backpressure once it's consuming, but we no
|
||||||
|
// longer race fs.read's await on stream-init against the consumer
|
||||||
|
// attaching its reader.
|
||||||
|
const ts = new TransformStream<Uint8Array, Uint8Array>(undefined, undefined, {
|
||||||
|
highWaterMark: 64,
|
||||||
|
size: (chunk?: Uint8Array) => (chunk === undefined ? 0 : 1),
|
||||||
|
});
|
||||||
let handle: TransferHandle;
|
let handle: TransferHandle;
|
||||||
try {
|
try {
|
||||||
handle = await incoming.accept({
|
handle = await incoming.accept({
|
||||||
|
|||||||
@@ -190,6 +190,11 @@ export { createFilesRpcRoute } from './server/rpc-route.js';
|
|||||||
export type { FilesRpcRouteOptions } from './server/rpc-route.js';
|
export type { FilesRpcRouteOptions } from './server/rpc-route.js';
|
||||||
export { createFilesHttpClient } from './client/http-client.js';
|
export { createFilesHttpClient } from './client/http-client.js';
|
||||||
export type { FilesHttpClientOptions } from './client/http-client.js';
|
export type { FilesHttpClientOptions } from './client/http-client.js';
|
||||||
|
export { startQueueDrainer } from './client/queue-drainer.js';
|
||||||
|
export type {
|
||||||
|
QueueDrainerHandle,
|
||||||
|
QueueDrainerOptions,
|
||||||
|
} from './client/queue-drainer.js';
|
||||||
|
|
||||||
// Shared structural surface @shade/files needs from a Shade instance —
|
// Shared structural surface @shade/files needs from a Shade instance —
|
||||||
// exposed so consumers building custom Shade-shaped bridges can verify
|
// exposed so consumers building custom Shade-shaped bridges can verify
|
||||||
|
|||||||
@@ -70,4 +70,25 @@ export interface ShadeBridge {
|
|||||||
|
|
||||||
/** Optional control-envelope passthrough used by the WebRTC bridge. */
|
/** Optional control-envelope passthrough used by the WebRTC bridge. */
|
||||||
deliverControlEnvelope?(peer: string, envelope: ShadeEnvelope): Promise<void>;
|
deliverControlEnvelope?(peer: string, envelope: ShadeEnvelope): Promise<void>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Hand a freshly-decoded wire envelope (control or chunk) to the
|
||||||
|
* transfer engine. Required by the pull-mode HTTP client when it
|
||||||
|
* drains queued events from the server: each polled chunk / control
|
||||||
|
* envelope is dispatched here so the engine sees it just as if it
|
||||||
|
* had arrived via an HTTP POST on `/v1/transfer/...`.
|
||||||
|
*/
|
||||||
|
acceptTransferEnvelope?(from: string, env: ShadeEnvelope | Uint8Array): Promise<void>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configure the transfer stack. Called by the pull-mode HTTP client
|
||||||
|
* to point the browser's outgoing chunks + control envelopes at the
|
||||||
|
* server's transferQueueRoute mount. Optional because the
|
||||||
|
* server-to-server path uses a separate, app-driven configuration.
|
||||||
|
*/
|
||||||
|
configureTransfers?(opts: {
|
||||||
|
resolveBaseUrl?: (peerAddress: string) => Promise<string>;
|
||||||
|
transport?: unknown;
|
||||||
|
envelopeTransport?: unknown;
|
||||||
|
}): void;
|
||||||
}
|
}
|
||||||
|
|||||||
208
packages/shade-files/tests/integration/http-rpc-streams.test.ts
Normal file
208
packages/shade-files/tests/integration/http-rpc-streams.test.ts
Normal file
@@ -0,0 +1,208 @@
|
|||||||
|
import { describe, expect, test } from 'bun:test';
|
||||||
|
import { createShade } from '@shade/sdk';
|
||||||
|
import {
|
||||||
|
createPrekeyServer,
|
||||||
|
MemoryPrekeyStore,
|
||||||
|
PrekeyServerEvents,
|
||||||
|
} from '@shade/server';
|
||||||
|
import { SubtleCryptoProvider } from '@shade/crypto-web';
|
||||||
|
import { Hono } from 'hono';
|
||||||
|
|
||||||
|
const crypto = new SubtleCryptoProvider();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stand up the full pull-mode rig:
|
||||||
|
* - Prekey server (for X3DH)
|
||||||
|
* - Bob: file handler + rpcRoute + transferQueueRoute, all on one server
|
||||||
|
* - Alice: httpClient with outboundQueueUrl + transferBaseUrl wired
|
||||||
|
*
|
||||||
|
* Returns Alice's `FileClient`, which speaks browser-style: ONE base URL,
|
||||||
|
* no inbound listener, streams supported via long-poll.
|
||||||
|
*/
|
||||||
|
async function setupPullRig(opts: {
|
||||||
|
bobHandler: Parameters<NonNullable<Awaited<ReturnType<typeof createShade>>['files']>['serve']>[0];
|
||||||
|
}) {
|
||||||
|
const prekey = createPrekeyServer({
|
||||||
|
crypto,
|
||||||
|
store: new MemoryPrekeyStore(),
|
||||||
|
disableRateLimit: true,
|
||||||
|
events: new PrekeyServerEvents(),
|
||||||
|
});
|
||||||
|
const prekeyServer = Bun.serve({ port: 0, fetch: prekey.fetch });
|
||||||
|
const prekeyUrl = `http://localhost:${prekeyServer.port}`;
|
||||||
|
|
||||||
|
const alice = await createShade({ prekeyServer: prekeyUrl, address: 'alice' });
|
||||||
|
const bob = await createShade({ prekeyServer: prekeyUrl, address: 'bob' });
|
||||||
|
|
||||||
|
// Bob: queue-route FIRST (configures bob's transports), then files.serve.
|
||||||
|
const queueRoute = await bob.transferQueueRoute({ blockMs: 1_500 });
|
||||||
|
await bob.files.serve(opts.bobHandler);
|
||||||
|
const rpcRoute = bob.files.rpcRoute({ acceptFirstMessage: true });
|
||||||
|
|
||||||
|
const app = new Hono();
|
||||||
|
app.route('/', queueRoute);
|
||||||
|
app.route('/', rpcRoute);
|
||||||
|
|
||||||
|
const bobServer = Bun.serve({ port: 0, fetch: app.fetch });
|
||||||
|
const baseUrl = `http://localhost:${bobServer.port}`;
|
||||||
|
|
||||||
|
const fs = alice.files.httpClient('bob', {
|
||||||
|
rpcUrl: `${baseUrl}/rpc`,
|
||||||
|
outboundQueueUrl: `${baseUrl}/queue`,
|
||||||
|
transferBaseUrl: baseUrl,
|
||||||
|
defaultTimeoutMs: 10_000,
|
||||||
|
queueBlockMs: 1_000,
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
alice,
|
||||||
|
bob,
|
||||||
|
fs,
|
||||||
|
baseUrl,
|
||||||
|
teardown: async () => {
|
||||||
|
fs.close();
|
||||||
|
await alice.shutdown();
|
||||||
|
await bob.shutdown();
|
||||||
|
bobServer.stop();
|
||||||
|
prekeyServer.stop();
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('@shade/files HTTP RPC — pull-mode streams', () => {
|
||||||
|
test('streamed read (4 MiB) via long-poll queue', async () => {
|
||||||
|
const payload = new Uint8Array(4 * 1024 * 1024);
|
||||||
|
for (let i = 0; i < payload.length; i++) payload[i] = (i * 97) & 0xff;
|
||||||
|
|
||||||
|
const rig = await setupPullRig({
|
||||||
|
bobHandler: {
|
||||||
|
read: async () => {
|
||||||
|
// Return the payload as a streamed read so the rpc-handler
|
||||||
|
// promotes it via the streams-bridge into a transfer.
|
||||||
|
const stream = new ReadableStream<Uint8Array>({
|
||||||
|
start(controller) {
|
||||||
|
const CHUNK = 256 * 1024;
|
||||||
|
for (let off = 0; off < payload.byteLength; off += CHUNK) {
|
||||||
|
controller.enqueue(payload.slice(off, Math.min(off + CHUNK, payload.byteLength)));
|
||||||
|
}
|
||||||
|
controller.close();
|
||||||
|
},
|
||||||
|
});
|
||||||
|
// Need a precomputed sha256 for streamed reads. Use the
|
||||||
|
// crypto provider's sha256 directly.
|
||||||
|
const digest = new Uint8Array(await globalThis.crypto.subtle.digest('SHA-256', payload));
|
||||||
|
const sha256Hex = Array.from(digest, (b) => b.toString(16).padStart(2, '0')).join('');
|
||||||
|
return {
|
||||||
|
kind: 'streams' as const,
|
||||||
|
stream,
|
||||||
|
size: payload.byteLength,
|
||||||
|
sha256: sha256Hex,
|
||||||
|
contentType: 'application/octet-stream',
|
||||||
|
};
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
const result = await rig.fs.read('/big.bin');
|
||||||
|
expect(result.kind).toBe('streams');
|
||||||
|
if (result.kind !== 'streams') return;
|
||||||
|
|
||||||
|
// Drain the stream and compare.
|
||||||
|
const reader = result.stream.getReader();
|
||||||
|
const got = new Uint8Array(payload.byteLength);
|
||||||
|
let offset = 0;
|
||||||
|
while (true) {
|
||||||
|
const { value, done } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
if (value !== undefined) {
|
||||||
|
got.set(value, offset);
|
||||||
|
offset += value.byteLength;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
reader.releaseLock();
|
||||||
|
await result.done();
|
||||||
|
|
||||||
|
expect(offset).toBe(payload.byteLength);
|
||||||
|
// Compare in 64KiB strides for speed.
|
||||||
|
let mismatch = -1;
|
||||||
|
for (let i = 0; i < payload.byteLength; i++) {
|
||||||
|
if (got[i] !== payload[i]) {
|
||||||
|
mismatch = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
expect(mismatch).toBe(-1);
|
||||||
|
} finally {
|
||||||
|
await rig.teardown();
|
||||||
|
}
|
||||||
|
}, 30_000);
|
||||||
|
|
||||||
|
test('streamed read fails with clear error when outboundQueueUrl is omitted', async () => {
|
||||||
|
const rig = await setupPullRig({
|
||||||
|
bobHandler: {
|
||||||
|
read: async () => {
|
||||||
|
const stream = new ReadableStream<Uint8Array>({
|
||||||
|
start(c) {
|
||||||
|
c.enqueue(new Uint8Array(512 * 1024));
|
||||||
|
c.close();
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const digest = new Uint8Array(await globalThis.crypto.subtle.digest('SHA-256', new Uint8Array(512 * 1024)));
|
||||||
|
const sha256Hex = Array.from(digest, (b) => b.toString(16).padStart(2, '0')).join('');
|
||||||
|
return {
|
||||||
|
kind: 'streams' as const,
|
||||||
|
stream,
|
||||||
|
size: 512 * 1024,
|
||||||
|
sha256: sha256Hex,
|
||||||
|
};
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
// Tear down the rig's drainer so we can construct an inline-only client
|
||||||
|
rig.fs.close();
|
||||||
|
|
||||||
|
const inlineOnly = rig.alice.files.httpClient('bob', {
|
||||||
|
rpcUrl: `${rig.baseUrl}/rpc`,
|
||||||
|
defaultTimeoutMs: 10_000,
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
await expect(inlineOnly.read('/big.bin')).rejects.toThrow(/streamed read/);
|
||||||
|
} finally {
|
||||||
|
inlineOnly.close();
|
||||||
|
await rig.teardown();
|
||||||
|
}
|
||||||
|
}, 15_000);
|
||||||
|
|
||||||
|
test('long-poll returns empty events on idle timeout', async () => {
|
||||||
|
const rig = await setupPullRig({
|
||||||
|
bobHandler: {
|
||||||
|
stat: async () => ({
|
||||||
|
name: '_',
|
||||||
|
kind: 'dir' as const,
|
||||||
|
size: 0,
|
||||||
|
mtime: 0,
|
||||||
|
metadata: {},
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
// Direct poll without any pending events — should return after blockMs.
|
||||||
|
const start = Date.now();
|
||||||
|
const res = await fetch(`${rig.baseUrl}/queue`, {
|
||||||
|
method: 'POST',
|
||||||
|
headers: {
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
'X-Shade-Sender-Address': 'alice',
|
||||||
|
},
|
||||||
|
body: JSON.stringify({ since: 0, blockMs: 500 }),
|
||||||
|
});
|
||||||
|
expect(res.status).toBe(200);
|
||||||
|
const body = (await res.json()) as { events: unknown[]; nextSince: number };
|
||||||
|
expect(body.events).toHaveLength(0);
|
||||||
|
expect(Date.now() - start).toBeGreaterThanOrEqual(400);
|
||||||
|
} finally {
|
||||||
|
await rig.teardown();
|
||||||
|
}
|
||||||
|
}, 10_000);
|
||||||
|
});
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/inbox-server",
|
"name": "@shade/inbox-server",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/inbox",
|
"name": "@shade/inbox",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/key-transparency",
|
"name": "@shade/key-transparency",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/keychain",
|
"name": "@shade/keychain",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/observability",
|
"name": "@shade/observability",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/observer",
|
"name": "@shade/observer",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/proto",
|
"name": "@shade/proto",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/recovery",
|
"name": "@shade/recovery",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/sdk",
|
"name": "@shade/sdk",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -31,6 +31,8 @@ import {
|
|||||||
type TransferHandle,
|
type TransferHandle,
|
||||||
type TransferOptions,
|
type TransferOptions,
|
||||||
type TransferSummary,
|
type TransferSummary,
|
||||||
|
type OutboundQueue as OutboundQueueLike,
|
||||||
|
type QueuedEventInput,
|
||||||
} from '@shade/transfer';
|
} from '@shade/transfer';
|
||||||
import type { Hono } from 'hono';
|
import type { Hono } from 'hono';
|
||||||
import { BackgroundTasks } from './background.js';
|
import { BackgroundTasks } from './background.js';
|
||||||
@@ -151,6 +153,8 @@ export class Shade {
|
|||||||
private controlChannel: ShadeControlChannel | null = null;
|
private controlChannel: ShadeControlChannel | null = null;
|
||||||
private peerBaseUrlResolver: ((peerAddress: string) => Promise<string>) | null = null;
|
private peerBaseUrlResolver: ((peerAddress: string) => Promise<string>) | null = null;
|
||||||
private envelopeOutboxes: ControlEnvelopeTransport | null = null;
|
private envelopeOutboxes: ControlEnvelopeTransport | null = null;
|
||||||
|
private transferTransportOverride: ITransferTransport | null = null;
|
||||||
|
private transferQueue: OutboundQueueLike | null = null;
|
||||||
|
|
||||||
// `@shade/files` namespace, lazy + memoized.
|
// `@shade/files` namespace, lazy + memoized.
|
||||||
private filesNamespace: FilesNamespace | null = null;
|
private filesNamespace: FilesNamespace | null = null;
|
||||||
@@ -746,12 +750,52 @@ export class Shade {
|
|||||||
* HTTP POSTs to `<base>/v1/transfer/control`).
|
* HTTP POSTs to `<base>/v1/transfer/control`).
|
||||||
*/
|
*/
|
||||||
configureTransfers(opts: {
|
configureTransfers(opts: {
|
||||||
resolveBaseUrl: (peerAddress: string) => Promise<string>;
|
/**
|
||||||
|
* Resolver for the peer's HTTP base URL (used by the default
|
||||||
|
* `ShadeTransferHttpTransport` to POST chunks). Optional when a
|
||||||
|
* custom `transport` and `envelopeTransport` are supplied — e.g.
|
||||||
|
* for pull-mode browser servers (`@shade/files transferQueueRoute`)
|
||||||
|
* which never POST chunks anywhere.
|
||||||
|
*/
|
||||||
|
resolveBaseUrl?: (peerAddress: string) => Promise<string>;
|
||||||
|
/**
|
||||||
|
* Override the chunk-level transport. Defaults to
|
||||||
|
* `ShadeTransferHttpTransport` (HTTP POSTs per chunk) when
|
||||||
|
* `resolveBaseUrl` is supplied. Required when `resolveBaseUrl`
|
||||||
|
* is omitted.
|
||||||
|
*/
|
||||||
|
transport?: ITransferTransport;
|
||||||
|
/**
|
||||||
|
* Override the control-envelope transport. Defaults to HTTP POSTs
|
||||||
|
* to `<base>/v1/transfer/control` when `resolveBaseUrl` is
|
||||||
|
* supplied. Required when `resolveBaseUrl` is omitted.
|
||||||
|
*/
|
||||||
envelopeTransport?: ControlEnvelopeTransport;
|
envelopeTransport?: ControlEnvelopeTransport;
|
||||||
}): void {
|
}): void {
|
||||||
this.peerBaseUrlResolver = opts.resolveBaseUrl;
|
if (opts.resolveBaseUrl === undefined) {
|
||||||
this.envelopeOutboxes =
|
if (opts.transport === undefined || opts.envelopeTransport === undefined) {
|
||||||
opts.envelopeTransport ?? new HttpEnvelopeTransport(opts.resolveBaseUrl, this.address);
|
throw new Error(
|
||||||
|
'configureTransfers: resolveBaseUrl is required unless both `transport` and `envelopeTransport` are supplied (e.g. for pull-mode queue servers).',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
this.peerBaseUrlResolver = async () => {
|
||||||
|
throw new Error(
|
||||||
|
'resolveBaseUrl was not configured — this Shade is in queue/pull mode and does not POST chunks. Configure a custom transport instead.',
|
||||||
|
);
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
this.peerBaseUrlResolver = opts.resolveBaseUrl;
|
||||||
|
}
|
||||||
|
this.transferTransportOverride = opts.transport ?? null;
|
||||||
|
if (opts.envelopeTransport !== undefined) {
|
||||||
|
this.envelopeOutboxes = opts.envelopeTransport;
|
||||||
|
} else if (opts.resolveBaseUrl !== undefined) {
|
||||||
|
this.envelopeOutboxes = new HttpEnvelopeTransport(opts.resolveBaseUrl, this.address);
|
||||||
|
} else {
|
||||||
|
throw new Error(
|
||||||
|
'configureTransfers: envelopeTransport is required when resolveBaseUrl is omitted.',
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -898,6 +942,109 @@ export class Shade {
|
|||||||
return (await this.engine()).onIncomingTransfer(handler);
|
return (await this.engine()).onIncomingTransfer(handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mount the **pull-mode** transfer routes on a Hono app. Mount under
|
||||||
|
* any base path: `app.route('/api/v1/shade-files', shade.transferQueueRoute())`.
|
||||||
|
*
|
||||||
|
* Configures this Shade instance to queue all outbound chunks +
|
||||||
|
* control envelopes per peer instead of POSTing them. Browser-style
|
||||||
|
* receivers drain the queue via long-polling — no inbound HTTP
|
||||||
|
* listener required on the receiver.
|
||||||
|
*
|
||||||
|
* Routes mounted (relative to the base path):
|
||||||
|
* POST /queue — long-poll the per-peer outbound queue
|
||||||
|
* POST /v1/transfer/:streamId/chunk — receive incoming chunks (browser → server)
|
||||||
|
* GET /v1/transfer/:streamId/state — resume-state lookup
|
||||||
|
* POST /v1/transfer/control — receive incoming control envelopes
|
||||||
|
* GET /v1/transfer/health — peer reachability probe
|
||||||
|
*
|
||||||
|
* **Idempotent**: calling twice returns a fresh `Hono` app each
|
||||||
|
* time but reuses the underlying queue + transport (so the engine
|
||||||
|
* stays single).
|
||||||
|
*
|
||||||
|
* **Ordering**: must be called **before** `shade.files.serve(...)`
|
||||||
|
* (or any other path that builds the engine), because configuring
|
||||||
|
* the queue transport mutates the transfer stack. Calling after the
|
||||||
|
* engine is built throws.
|
||||||
|
*/
|
||||||
|
async transferQueueRoute(opts: TransferQueueRouteOptions = {}): Promise<Hono> {
|
||||||
|
if (this.transferEngine !== null && this.transferTransportOverride === null) {
|
||||||
|
throw new Error(
|
||||||
|
'transferQueueRoute(): the transfer engine has already been built with the default HTTP transport. Call transferQueueRoute() before any upload()/onIncomingTransfer()/configureTransfers().',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
const { OutboundQueue, QueueTransferTransport } = await import('@shade/transfer');
|
||||||
|
if (this.transferQueue === null) {
|
||||||
|
this.transferQueue = new OutboundQueue({
|
||||||
|
...(opts.maxEventsPerPeer !== undefined ? { maxEventsPerPeer: opts.maxEventsPerPeer } : {}),
|
||||||
|
...(opts.idleEvictionMs !== undefined ? { idleEvictionMs: opts.idleEvictionMs } : {}),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if (this.transferTransportOverride === null) {
|
||||||
|
const queueTransport = new QueueTransferTransport(this.transferQueue);
|
||||||
|
const queueEnvelopeTransport = new QueueEnvelopeTransport(this.transferQueue);
|
||||||
|
this.configureTransfers({
|
||||||
|
transport: queueTransport,
|
||||||
|
envelopeTransport: queueEnvelopeTransport,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
const queue = this.transferQueue;
|
||||||
|
const blockMs = opts.blockMs ?? 30_000;
|
||||||
|
const maxBlockMs = opts.maxBlockMs ?? 55_000;
|
||||||
|
const engine = await this.engine();
|
||||||
|
const { createTransferRoutes, PermissiveAuthenticator } = await import('@shade/transfer');
|
||||||
|
const app = await createTransferRoutes(engine, {
|
||||||
|
authenticator: PermissiveAuthenticator,
|
||||||
|
});
|
||||||
|
app.post('/v1/transfer/control', async (c) => {
|
||||||
|
const senderAddress = c.req.header('X-Shade-Sender-Address');
|
||||||
|
if (senderAddress === undefined || senderAddress === '') {
|
||||||
|
return c.json({ error: 'missing X-Shade-Sender-Address' }, 400);
|
||||||
|
}
|
||||||
|
const ab = await c.req.arrayBuffer();
|
||||||
|
const bytes = new Uint8Array(ab);
|
||||||
|
try {
|
||||||
|
await this.acceptTransferEnvelope(senderAddress, bytes);
|
||||||
|
} catch (err) {
|
||||||
|
return c.json({ error: (err as Error).message }, 400);
|
||||||
|
}
|
||||||
|
return c.json({ ok: true });
|
||||||
|
});
|
||||||
|
// Long-poll endpoint.
|
||||||
|
app.post('/queue', async (c) => {
|
||||||
|
const senderAddress = c.req.header('X-Shade-Sender-Address');
|
||||||
|
if (senderAddress === undefined || senderAddress === '') {
|
||||||
|
return c.json({ error: 'missing X-Shade-Sender-Address' }, 400);
|
||||||
|
}
|
||||||
|
let body: { since?: unknown; blockMs?: unknown };
|
||||||
|
try {
|
||||||
|
body = (await c.req.json()) as { since?: unknown; blockMs?: unknown };
|
||||||
|
} catch {
|
||||||
|
return c.json({ error: 'invalid JSON body' }, 400);
|
||||||
|
}
|
||||||
|
const since = typeof body.since === 'number' && Number.isFinite(body.since) ? body.since : 0;
|
||||||
|
const requestedBlockMs =
|
||||||
|
typeof body.blockMs === 'number' && Number.isFinite(body.blockMs)
|
||||||
|
? Math.max(0, Math.min(maxBlockMs, body.blockMs))
|
||||||
|
: blockMs;
|
||||||
|
// Bun-side short-circuit if the request was aborted while we
|
||||||
|
// were holding the long-poll. AbortSignal from the request body
|
||||||
|
// is already surfaced via `c.req.raw.signal` in Hono.
|
||||||
|
const events = await queue.drain(senderAddress, since, requestedBlockMs, c.req.raw.signal);
|
||||||
|
return c.json({
|
||||||
|
events: events.map((e) => ({
|
||||||
|
id: e.id,
|
||||||
|
timestampMs: e.timestampMs,
|
||||||
|
kind: e.kind,
|
||||||
|
bytesB64: bytesToBase64Std(e.bytes),
|
||||||
|
...(e.kind === 'chunk' ? { meta: e.meta } : {}),
|
||||||
|
})),
|
||||||
|
nextSince: events.length > 0 ? events[events.length - 1]!.id : since,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
return app;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Mount the receiver-side HTTP routes on a Hono app. Mount under any
|
* Mount the receiver-side HTTP routes on a Hono app. Mount under any
|
||||||
* base path: `app.route('/shade', await shade.transferRoute())`.
|
* base path: `app.route('/shade', await shade.transferRoute())`.
|
||||||
@@ -1019,16 +1166,23 @@ export class Shade {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
this.controlChannel = new ShadeControlChannel(this, this.envelopeOutboxes);
|
this.controlChannel = new ShadeControlChannel(this, this.envelopeOutboxes);
|
||||||
const httpTransport: ITransferTransport = new ShadeTransferHttpTransport({
|
let transport: ITransferTransport;
|
||||||
resolveBaseUrl: this.peerBaseUrlResolver,
|
|
||||||
authenticator: await this.makeAuthenticator(),
|
|
||||||
});
|
|
||||||
|
|
||||||
let transport: ITransferTransport = httpTransport;
|
|
||||||
let webrtcRuntime: ShadeWebRtcRuntime | null = null;
|
let webrtcRuntime: ShadeWebRtcRuntime | null = null;
|
||||||
if (this.webrtcConfig !== null) {
|
if (this.transferTransportOverride !== null) {
|
||||||
webrtcRuntime = await this.buildWebRtcRuntime(this.webrtcConfig, httpTransport);
|
// Custom transport (queue, in-memory, custom adapter) — used as-is.
|
||||||
transport = webrtcRuntime.fallback;
|
// WebRTC fallback only attaches when the default HTTP transport is
|
||||||
|
// active because WebRTC's `MultiTransportFallback` is HTTP-shaped.
|
||||||
|
transport = this.transferTransportOverride;
|
||||||
|
} else {
|
||||||
|
const httpTransport: ITransferTransport = new ShadeTransferHttpTransport({
|
||||||
|
resolveBaseUrl: this.peerBaseUrlResolver,
|
||||||
|
authenticator: await this.makeAuthenticator(),
|
||||||
|
});
|
||||||
|
transport = httpTransport;
|
||||||
|
if (this.webrtcConfig !== null) {
|
||||||
|
webrtcRuntime = await this.buildWebRtcRuntime(this.webrtcConfig, httpTransport);
|
||||||
|
transport = webrtcRuntime.fallback;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this.transferEngine = new TransferEngine({
|
this.transferEngine = new TransferEngine({
|
||||||
@@ -1256,6 +1410,53 @@ function parseChunkHeader(bytes: Uint8Array): {
|
|||||||
return { streamId, laneId, seq };
|
return { streamId, laneId, seq };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ─── Queue-mode (pull) envelope transport ─────────────────────
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration for {@link Shade.transferQueueRoute}. All fields are
|
||||||
|
* optional with sensible production defaults.
|
||||||
|
*/
|
||||||
|
export interface TransferQueueRouteOptions {
|
||||||
|
/**
|
||||||
|
* Long-poll timeout in milliseconds. Server holds the request open
|
||||||
|
* up to this long before returning an empty `events` array. Default
|
||||||
|
* 30_000.
|
||||||
|
*/
|
||||||
|
blockMs?: number;
|
||||||
|
/**
|
||||||
|
* Hard cap on long-poll timeout (clamps client-supplied `blockMs`).
|
||||||
|
* Default 55_000 — under typical reverse-proxy idle thresholds (60s
|
||||||
|
* on most CDNs).
|
||||||
|
*/
|
||||||
|
maxBlockMs?: number;
|
||||||
|
/**
|
||||||
|
* Per-peer ring-buffer size. When the queue is full, oldest events
|
||||||
|
* are dropped on enqueue. Receivers detect the gap via missing
|
||||||
|
* sequence numbers and re-resume from `since=0`. Default 1000.
|
||||||
|
*/
|
||||||
|
maxEventsPerPeer?: number;
|
||||||
|
/**
|
||||||
|
* Drop a peer's queue + reject pending pollers after this much
|
||||||
|
* silence. Default 10 minutes. Setting to `0` disables idle-eviction.
|
||||||
|
*/
|
||||||
|
idleEvictionMs?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* `ControlEnvelopeTransport` that enqueues outbound envelopes into an
|
||||||
|
* `OutboundQueue` for browser-style receivers to long-poll. Mirrors
|
||||||
|
* `HttpEnvelopeTransport` shape (one `send(peer, envelope)` method);
|
||||||
|
* the difference is the destination — local queue, not remote HTTP.
|
||||||
|
*/
|
||||||
|
class QueueEnvelopeTransport implements ControlEnvelopeTransport {
|
||||||
|
constructor(private readonly queue: OutboundQueueLike) {}
|
||||||
|
async send(peerAddress: string, envelope: ShadeEnvelope): Promise<void> {
|
||||||
|
const bytes = encodeEnvelope(envelope);
|
||||||
|
const event: QueuedEventInput = { kind: 'envelope', bytes };
|
||||||
|
this.queue.enqueue(peerAddress, event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ─── Default HTTP envelope transport ──────────────────────────
|
// ─── Default HTTP envelope transport ──────────────────────────
|
||||||
|
|
||||||
class HttpEnvelopeTransport implements ControlEnvelopeTransport {
|
class HttpEnvelopeTransport implements ControlEnvelopeTransport {
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/server",
|
"name": "@shade/server",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/storage-encrypted",
|
"name": "@shade/storage-encrypted",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/storage-postgres",
|
"name": "@shade/storage-postgres",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/storage-sqlite",
|
"name": "@shade/storage-sqlite",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/streams",
|
"name": "@shade/streams",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/transfer",
|
"name": "@shade/transfer",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ export * from './transport/memory.js';
|
|||||||
export * from './transport/http-transport.js';
|
export * from './transport/http-transport.js';
|
||||||
export * from './transport/ws-transport.js';
|
export * from './transport/ws-transport.js';
|
||||||
export * from './transport/multi-fallback.js';
|
export * from './transport/multi-fallback.js';
|
||||||
|
export * from './transport/queue-transport.js';
|
||||||
export * from './engine.js';
|
export * from './engine.js';
|
||||||
export {
|
export {
|
||||||
createTransferRoutes,
|
createTransferRoutes,
|
||||||
|
|||||||
319
packages/shade-transfer/src/transport/queue-transport.ts
Normal file
319
packages/shade-transfer/src/transport/queue-transport.ts
Normal file
@@ -0,0 +1,319 @@
|
|||||||
|
/**
|
||||||
|
* Per-peer outbound queue + queue-transport for browser-friendly
|
||||||
|
* pull-mode streams.
|
||||||
|
*
|
||||||
|
* The default `ShadeTransferHttpTransport` POSTs each chunk directly
|
||||||
|
* to the receiver's `/v1/transfer/<streamId>/chunk` route. That
|
||||||
|
* requires the receiver to host an HTTP server, which a browser tab
|
||||||
|
* cannot. `QueueTransferTransport` flips the direction: it queues
|
||||||
|
* chunks per peer and lets the receiver pull them via a long-poll
|
||||||
|
* endpoint.
|
||||||
|
*
|
||||||
|
* The companion `OutboundQueue` data structure is plain server-side
|
||||||
|
* state — wired up by `@shade/files`'s `transferQueueRoute()` (and any
|
||||||
|
* future consumer) to expose the long-poll surface and feed envelopes
|
||||||
|
* + chunks into the browser receiver.
|
||||||
|
*/
|
||||||
|
import type {
|
||||||
|
ChunkAck,
|
||||||
|
ChunkSendOptions,
|
||||||
|
ITransferTransport,
|
||||||
|
TransferResumeState,
|
||||||
|
} from './transport.js';
|
||||||
|
import { TransferTransportError } from '../errors.js';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Discriminated union of items the queue ships to the browser
|
||||||
|
* receiver. Both kinds carry **wire-encoded bytes** of an envelope —
|
||||||
|
* the receiver decodes via `decodeEnvelope` (control envelopes) or
|
||||||
|
* forwards directly to `engine.receiveChunk` (chunk envelopes).
|
||||||
|
*/
|
||||||
|
export type QueuedEvent =
|
||||||
|
| {
|
||||||
|
id: number;
|
||||||
|
timestampMs: number;
|
||||||
|
kind: 'envelope';
|
||||||
|
/** Wire-encoded `0x02` ratchet envelope (or `0x01` first-message). */
|
||||||
|
bytes: Uint8Array;
|
||||||
|
}
|
||||||
|
| {
|
||||||
|
id: number;
|
||||||
|
timestampMs: number;
|
||||||
|
kind: 'chunk';
|
||||||
|
/** Wire-encoded `0x11` stream-chunk envelope. */
|
||||||
|
bytes: Uint8Array;
|
||||||
|
meta: {
|
||||||
|
streamId: string;
|
||||||
|
laneId: number;
|
||||||
|
seq: number;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
/** Caller-supplied shape for {@link OutboundQueue.enqueue} — the queue assigns `id` + `timestampMs`. */
|
||||||
|
export type QueuedEventInput =
|
||||||
|
| { kind: 'envelope'; bytes: Uint8Array }
|
||||||
|
| {
|
||||||
|
kind: 'chunk';
|
||||||
|
bytes: Uint8Array;
|
||||||
|
meta: { streamId: string; laneId: number; seq: number };
|
||||||
|
};
|
||||||
|
|
||||||
|
export interface OutboundQueueOptions {
|
||||||
|
/**
|
||||||
|
* Maximum events held per peer. When the queue is full, the oldest
|
||||||
|
* unacked event is dropped on next enqueue. Default 1000 — at the
|
||||||
|
* default chunk size (256 KiB plaintext) this caps a single peer's
|
||||||
|
* outbound buffer at ~256 MiB. Tune up for fewer/bigger streams,
|
||||||
|
* down for many concurrent small flows.
|
||||||
|
*/
|
||||||
|
maxEventsPerPeer?: number;
|
||||||
|
/**
|
||||||
|
* After a peer has not polled for this long, the queue's events are
|
||||||
|
* dropped and any pending waiters are released. Default 10 minutes.
|
||||||
|
* Setting to `0` disables idle-eviction.
|
||||||
|
*/
|
||||||
|
idleEvictionMs?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
const DEFAULT_MAX_EVENTS = 1000;
|
||||||
|
const DEFAULT_IDLE_EVICTION_MS = 10 * 60 * 1000;
|
||||||
|
|
||||||
|
interface PendingWaiter {
|
||||||
|
resolve(events: QueuedEvent[]): void;
|
||||||
|
reject(err: Error): void;
|
||||||
|
timer: ReturnType<typeof setTimeout>;
|
||||||
|
abortHandler?: () => void;
|
||||||
|
signal?: AbortSignal;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface PerPeerState {
|
||||||
|
nextId: number;
|
||||||
|
events: QueuedEvent[];
|
||||||
|
waiters: PendingWaiter[];
|
||||||
|
lastTouchedMs: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Per-peer monotonic event log with long-poll semantics.
|
||||||
|
*
|
||||||
|
* `enqueue` appends; `drain` returns all events with `id > since`,
|
||||||
|
* blocking up to `blockMs` if there are none. `since`-based pagination
|
||||||
|
* is the resume mechanism: a client crashing mid-stream restarts with
|
||||||
|
* its last-processed id and the queue replays everything after it
|
||||||
|
* (subject to `maxEventsPerPeer` retention).
|
||||||
|
*/
|
||||||
|
export class OutboundQueue {
|
||||||
|
private peers = new Map<string, PerPeerState>();
|
||||||
|
private readonly maxEvents: number;
|
||||||
|
private readonly idleEvictionMs: number;
|
||||||
|
private evictTimer: ReturnType<typeof setTimeout> | null = null;
|
||||||
|
private destroyed = false;
|
||||||
|
|
||||||
|
constructor(opts: OutboundQueueOptions = {}) {
|
||||||
|
this.maxEvents = opts.maxEventsPerPeer ?? DEFAULT_MAX_EVENTS;
|
||||||
|
this.idleEvictionMs = opts.idleEvictionMs ?? DEFAULT_IDLE_EVICTION_MS;
|
||||||
|
if (this.idleEvictionMs > 0) this.scheduleEviction();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Append an event and wake any waiters for that peer. */
|
||||||
|
enqueue(peer: string, ev: QueuedEventInput): QueuedEvent {
|
||||||
|
if (this.destroyed) throw new Error('OutboundQueue: destroyed');
|
||||||
|
const state = this.getOrCreate(peer);
|
||||||
|
const event: QueuedEvent =
|
||||||
|
ev.kind === 'chunk'
|
||||||
|
? {
|
||||||
|
id: state.nextId++,
|
||||||
|
timestampMs: Date.now(),
|
||||||
|
kind: 'chunk',
|
||||||
|
bytes: ev.bytes,
|
||||||
|
meta: ev.meta,
|
||||||
|
}
|
||||||
|
: {
|
||||||
|
id: state.nextId++,
|
||||||
|
timestampMs: Date.now(),
|
||||||
|
kind: 'envelope',
|
||||||
|
bytes: ev.bytes,
|
||||||
|
};
|
||||||
|
state.events.push(event);
|
||||||
|
state.lastTouchedMs = Date.now();
|
||||||
|
// Cap: drop oldest. Lost events trigger receiver-side resume from
|
||||||
|
// last polled id; the @shade/transfer engine handles missing seqs
|
||||||
|
// by re-sending on resume.
|
||||||
|
while (state.events.length > this.maxEvents) state.events.shift();
|
||||||
|
// Wake all waiters with whatever has accumulated.
|
||||||
|
const drained = this.collect(state, 0);
|
||||||
|
if (drained.length > 0) {
|
||||||
|
const waiters = state.waiters.splice(0);
|
||||||
|
for (const w of waiters) {
|
||||||
|
clearTimeout(w.timer);
|
||||||
|
if (w.abortHandler !== undefined && w.signal !== undefined) {
|
||||||
|
w.signal.removeEventListener('abort', w.abortHandler);
|
||||||
|
}
|
||||||
|
w.resolve(drained);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return event;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Drain events with `id > since`. If none are available, block up
|
||||||
|
* to `blockMs` until any arrive. `signal` cancels the wait early.
|
||||||
|
*/
|
||||||
|
async drain(
|
||||||
|
peer: string,
|
||||||
|
since: number,
|
||||||
|
blockMs: number,
|
||||||
|
signal?: AbortSignal,
|
||||||
|
): Promise<QueuedEvent[]> {
|
||||||
|
if (this.destroyed) throw new Error('OutboundQueue: destroyed');
|
||||||
|
const state = this.getOrCreate(peer);
|
||||||
|
state.lastTouchedMs = Date.now();
|
||||||
|
const ready = this.collect(state, since);
|
||||||
|
if (ready.length > 0 || blockMs <= 0) return ready;
|
||||||
|
if (signal?.aborted) return [];
|
||||||
|
return await new Promise<QueuedEvent[]>((resolve, reject) => {
|
||||||
|
const timer = setTimeout(() => {
|
||||||
|
const idx = state.waiters.indexOf(waiter);
|
||||||
|
if (idx >= 0) state.waiters.splice(idx, 1);
|
||||||
|
if (waiter.abortHandler !== undefined && waiter.signal !== undefined) {
|
||||||
|
waiter.signal.removeEventListener('abort', waiter.abortHandler);
|
||||||
|
}
|
||||||
|
// Empty drain on timeout — that's the "no new events" signal.
|
||||||
|
resolve([]);
|
||||||
|
}, blockMs);
|
||||||
|
const waiter: PendingWaiter = { resolve, reject, timer };
|
||||||
|
if (signal !== undefined) {
|
||||||
|
const handler = () => {
|
||||||
|
const idx = state.waiters.indexOf(waiter);
|
||||||
|
if (idx >= 0) state.waiters.splice(idx, 1);
|
||||||
|
clearTimeout(timer);
|
||||||
|
resolve([]);
|
||||||
|
};
|
||||||
|
signal.addEventListener('abort', handler, { once: true });
|
||||||
|
waiter.abortHandler = handler;
|
||||||
|
waiter.signal = signal;
|
||||||
|
}
|
||||||
|
state.waiters.push(waiter);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Drop a peer's queue + reject waiters. */
|
||||||
|
evict(peer: string): void {
|
||||||
|
const state = this.peers.get(peer);
|
||||||
|
if (state === undefined) return;
|
||||||
|
this.peers.delete(peer);
|
||||||
|
for (const w of state.waiters) {
|
||||||
|
clearTimeout(w.timer);
|
||||||
|
if (w.abortHandler !== undefined && w.signal !== undefined) {
|
||||||
|
w.signal.removeEventListener('abort', w.abortHandler);
|
||||||
|
}
|
||||||
|
w.reject(new Error('OutboundQueue: peer evicted'));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Peer-specific snapshot for diagnostics. */
|
||||||
|
stats(peer: string): { eventCount: number; nextId: number; waiters: number } | null {
|
||||||
|
const state = this.peers.get(peer);
|
||||||
|
if (state === undefined) return null;
|
||||||
|
return {
|
||||||
|
eventCount: state.events.length,
|
||||||
|
nextId: state.nextId,
|
||||||
|
waiters: state.waiters.length,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Tear everything down. Pending waiters are rejected. */
|
||||||
|
destroy(): void {
|
||||||
|
if (this.destroyed) return;
|
||||||
|
this.destroyed = true;
|
||||||
|
if (this.evictTimer !== null) clearTimeout(this.evictTimer);
|
||||||
|
for (const peer of [...this.peers.keys()]) this.evict(peer);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── internals ──────────────────────────────────────────────
|
||||||
|
|
||||||
|
private getOrCreate(peer: string): PerPeerState {
|
||||||
|
let state = this.peers.get(peer);
|
||||||
|
if (state === undefined) {
|
||||||
|
state = {
|
||||||
|
nextId: 1,
|
||||||
|
events: [],
|
||||||
|
waiters: [],
|
||||||
|
lastTouchedMs: Date.now(),
|
||||||
|
};
|
||||||
|
this.peers.set(peer, state);
|
||||||
|
}
|
||||||
|
return state;
|
||||||
|
}
|
||||||
|
|
||||||
|
private collect(state: PerPeerState, since: number): QueuedEvent[] {
|
||||||
|
if (state.events.length === 0) return [];
|
||||||
|
return state.events.filter((e) => e.id > since);
|
||||||
|
}
|
||||||
|
|
||||||
|
private scheduleEviction(): void {
|
||||||
|
const interval = Math.max(60_000, Math.floor(this.idleEvictionMs / 4));
|
||||||
|
this.evictTimer = setTimeout(() => {
|
||||||
|
if (this.destroyed) return;
|
||||||
|
const cutoff = Date.now() - this.idleEvictionMs;
|
||||||
|
for (const [peer, state] of this.peers.entries()) {
|
||||||
|
if (state.lastTouchedMs < cutoff) this.evict(peer);
|
||||||
|
}
|
||||||
|
this.scheduleEviction();
|
||||||
|
}, interval);
|
||||||
|
(this.evictTimer as unknown as { unref?: () => void }).unref?.();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Chunk transport that enqueues into an {@link OutboundQueue} instead
|
||||||
|
* of POSTing.
|
||||||
|
*
|
||||||
|
* Returns an optimistic `ChunkAck` immediately because the queue *is*
|
||||||
|
* the delivery — the receiver polls and dispatches. Browser receivers
|
||||||
|
* cannot synchronously confirm receipt before the next chunk; the
|
||||||
|
* engine's stream-protocol catches dropped chunks at finish-time
|
||||||
|
* integrity check, and chunk-resume restarts the lane from the last
|
||||||
|
* polled `since`.
|
||||||
|
*/
|
||||||
|
export class QueueTransferTransport implements ITransferTransport {
|
||||||
|
constructor(private readonly queue: OutboundQueue) {}
|
||||||
|
|
||||||
|
async probe(_peer: string): Promise<void> {
|
||||||
|
// The queue is local. Reachability is "is there a poller?" which
|
||||||
|
// is decided by `idleEvictionMs`. We don't synchronously check
|
||||||
|
// here; the engine retries via `withRetry` on `sendChunk` errors.
|
||||||
|
}
|
||||||
|
|
||||||
|
async sendChunk(
|
||||||
|
peer: string,
|
||||||
|
streamId: string,
|
||||||
|
laneId: number,
|
||||||
|
seq: number | bigint,
|
||||||
|
bytes: Uint8Array,
|
||||||
|
options?: ChunkSendOptions,
|
||||||
|
): Promise<ChunkAck> {
|
||||||
|
if (options?.signal?.aborted) {
|
||||||
|
throw new TransferTransportError('sendChunk aborted by caller');
|
||||||
|
}
|
||||||
|
const seqNum = typeof seq === 'bigint' ? Number(seq) : seq;
|
||||||
|
this.queue.enqueue(peer, {
|
||||||
|
kind: 'chunk',
|
||||||
|
bytes,
|
||||||
|
meta: { streamId, laneId, seq: seqNum },
|
||||||
|
});
|
||||||
|
return { lastSeq: seqNum };
|
||||||
|
}
|
||||||
|
|
||||||
|
async fetchResumeState(
|
||||||
|
_peer: string,
|
||||||
|
_streamId: string,
|
||||||
|
): Promise<TransferResumeState | null> {
|
||||||
|
// Pull-mode receivers report resume state by re-polling with the
|
||||||
|
// `since` cursor they last successfully processed; the queue does
|
||||||
|
// not need to query the receiver. Return null so the engine
|
||||||
|
// restarts from seq 0 (deterministic), and the queue replays from
|
||||||
|
// `since=0` if the client reconnects fresh.
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/transport-bridge",
|
"name": "@shade/transport-bridge",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/transport-webrtc",
|
"name": "@shade/transport-webrtc",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/transport",
|
"name": "@shade/transport",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@shade/widgets",
|
"name": "@shade/widgets",
|
||||||
"version": "4.1.0",
|
"version": "4.2.0",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"main": "src/index.ts",
|
"main": "src/index.ts",
|
||||||
"types": "src/index.ts",
|
"types": "src/index.ts",
|
||||||
|
|||||||
Reference in New Issue
Block a user