3 Commits

Author SHA1 Message Date
b77b7e771c release(v4.2.1): fix concurrent-ratchet desync via OutboundQueue waiter cursor
Some checks failed
Publish / publish (push) Has been cancelled
Docker build and publish / docker (push) Has been cancelled
Pull-mode httpClient + drainer + parallel RPCs against the same peer
deteriorated after ~10s with `DecryptionError`. Two bugs combined:

- `OutboundQueue.enqueue` woke `drain` waiters with a `since=0`
  snapshot, replaying already-processed events into
  `Shade.acceptTransferEnvelope` → `manager.decrypt` twice. The
  duplicate consumed an already-used skipped key and corrupted the
  Double Ratchet receive chain.

- `ratchetDecrypt` then propagated the corruption: a same-DH
  message behind the chain with no cached skipped key fell through
  to `kdfChainKey` on the ahead state and rewound `chain.counter`,
  permanently desyncing the chain.

Fix `OutboundQueue` to honor each waiter's `since`, and harden
`ratchetDecrypt` so any future duplicate fails cleanly without
mutating state. Adds regression coverage at all three layers.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 22:58:26 +02:00
7520b11b25 release(v4.2.0): pull-mode streams for browser @shade/files
Some checks failed
Test / test (push) Has been cancelled
Docker build and publish / docker (push) Has been cancelled
Publish / publish (push) Has been cancelled
4.1.0's HTTP RPC for browsers capped at inline payloads (≤ 256 KiB).
4.2.0 unlocks streams: server queues outbound chunks + control
envelopes per peer, browser long-polls the queue. Browser-to-server
writes ride the existing /v1/transfer/<id>/chunk POST routes
unchanged.

For Dispatch this unlocks mod-jar uploads (50 MB) and world-backup
downloads (100+ MB) — the actual reason browser-side @shade/files
matters.

### New API

@shade/sdk:
- shade.transferQueueRoute(opts?) — Hono app with /queue +
  /v1/transfer/* routes. Auto-configures the queue transport.
- shade.configureTransfers extended: transport + envelopeTransport
  override slots; resolveBaseUrl optional when both supplied.

@shade/transfer:
- OutboundQueue — per-peer monotonic event log with long-poll
  semantics, idle-eviction GC, ring-buffered to maxEventsPerPeer.
- QueueTransferTransport — enqueues instead of POSTing.

@shade/files:
- httpClient({ outboundQueueUrl, transferBaseUrl }) — when set,
  starts a long-poll drainer + builds a streams-bridge. fs.read /
  fs.write of >256 KiB work end-to-end.
- startQueueDrainer(shade, opts) — exported helper for advanced
  consumers driving their own drainer.

### Implementation notes

- ClientStreamsBridge's TransformStream had HWM=0 by default which
  stalled the drainer's await chain at chunk 4 (writer.write pended
  before the consumer's reader was attached). Bumped to HWM=64 so
  the receive loop can buffer ahead of the consumer.

### Tests

3 new integration tests in tests/integration/http-rpc-streams.test.ts:
4 MiB streamed read round-trip, inline-only error path, idle-timeout
long-poll behaviour.

Wire-compatible. Source-compatible. Lockstep bump to 4.2.0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 23:27:06 +02:00
da93b97cce release(v4.1.0): browser-friendly HTTP RPC for @shade/files
Some checks failed
Test / test (push) Has been cancelled
Docker build and publish / docker (push) Has been cancelled
Publish / publish (push) Has been cancelled
Default shade.files.client(peer) requires both peers to be mutually
addressable over HTTP — the response round-trips through
Shade.deliverControlEnvelope (POST to peer's /v1/transfer/control).
Browser tabs can't host an HTTP server, so they couldn't consume
@shade/files at all. Dispatch's filutforsker (admin-panel browser UI)
is the canonical use-case.

This release adds a parallel request-response transport: one POST per
RPC, encrypted envelope in the body, encrypted response in the same
HTTP response. No inbound channel needed on the client.

### New API

- shade.files.rpcRoute(opts?) — Hono app exposing POST /rpc.
- shade.files.httpClient(peer, opts) — request-response FileClient.
- FilesNamespace.serve(handler, { inlineOnly: true }) — skip streams-
  bridge (and its configureTransfers pre-condition); also skip
  channel-based dispatch so requests aren't double-dispatched.

### Limitations (v1)

Inline only (≤ 256 KiB). Streamed reads/writes throw clear errors
directing to shade.files.client(peer) on a server-to-server deploy.

### Tests

7 integration tests in tests/integration/http-rpc.test.ts covering
round-trip + negative cases (sender header, empty/garbage body,
maxBodyBytes, rpcRoute-without-serve).

### Symmetry

Mirrors @shade/server's shade-auth-middleware: encrypted envelope in
request body, decrypted via existing ratchet, response in same HTTP
roundtrip. No WebSocket, no SSE, no outbound from server.

Wire-compatible. Source-compatible. Lockstep bump to 4.1.0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 22:08:14 +02:00
43 changed files with 3042 additions and 44 deletions

View File

@@ -5,6 +5,291 @@ All notable changes to Shade are documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [4.2.1] — 2026-05-04 — Concurrent-ratchet desync under pull-mode drainer
A consumer running `shade.files.httpClient(server, { outboundQueueUrl, ... })`
alongside parallel RPC traffic against the same peer would, after ~10s of
load, see every subsequent message fail with
`DecryptionError: Failed to decrypt message — wrong key or tampered data`.
Two bugs combined to cause this; both are fixed in `4.2.1` with regression
coverage.
### Fixed
#### `@shade/transfer` — `OutboundQueue` waiter cursor
`enqueue` woke pending `drain` waiters with a `since=0` snapshot — the
full event log — instead of using the waiter's own `since`. A poll that
parked at the head and was woken by a fresh enqueue therefore replayed
every event the waiter had already processed. Downstream the queue
fed `Shade.acceptTransferEnvelope`, so the duplicate replayed an
envelope into `manager.decrypt` twice. The second decrypt consumed an
already-used skipped key and corrupted the Double Ratchet receive
chain. Each `PendingWaiter` now records its `since` cursor and is
delivered only events with `id > since`.
#### `@shade/core` — `ratchetDecrypt` defense-in-depth
A same-DH message whose `counter` was already behind the chain — and
that did NOT match a cached skipped key — fell through to a path that
called `kdfChainKey` on the *current* (ahead) chain key and then set
`chain.counter = message.counter + 1`, permanently desyncing the
ratchet so every subsequent decrypt returned wrong-key. Such messages
are now rejected with `DecryptionError` without any state mutation, so
a downstream replay (transport bug, retry, intermitent network) cannot
poison the session.
### Tests
- `packages/shade-files/tests/integration/concurrent-ratchet.test.ts`
100 parallel `httpClient` RPCs while the drainer runs, plus a mixed
workload of 50 RPCs + 50 raw `shade.send` deliveries with Bob
echoing replies through the queue. Both surface the bug pre-fix.
- `packages/shade-transfer/tests/outbound-queue.test.ts` — direct
regression on the waiter `since` cursor.
- `packages/shade-core/tests/ratchet.test.ts` — replay of an
already-decrypted message must throw cleanly without breaking
subsequent decrypts on the same chain.
## [4.2.0] — 2026-05-03 — Pull-mode streams for browser @shade/files
`4.1.0` shipped HTTP RPC for browser clients but capped them at inline
payloads (≤ 256 KiB). Larger reads/writes — mod-jars (150 MB),
world-backups (100+ MB), the things that actually need streaming —
threw `ConflictError` directing callers to the server-to-server
pathway. That made browser-side `@shade/files` insufficient for
admin-panel-style apps where the client is a browser tab and the
server is a Bun process.
`4.2.0` flips the direction: when the browser supplies
`outboundQueueUrl` + `transferBaseUrl`, server-to-browser chunks +
control envelopes ride a per-peer queue that the browser long-polls,
and browser-to-server chunks POST directly to the server's existing
chunk-receive routes. No WebSockets, no SSE, no inbound listener on
the browser. Long-polling + a request-response inbound queue is
the entire wire surface.
### Added
#### `@shade/transfer`
- `OutboundQueue` — per-peer monotonic event log with long-poll
semantics. `enqueue(peer, event)` appends, `drain(peer, since,
blockMs, signal)` returns events with `id > since` (blocking up
to `blockMs` if none are ready). Idle-eviction GC drops peers
that haven't polled in `idleEvictionMs` (default 10 min). Ring-
buffered to `maxEventsPerPeer` (default 1000) — overflow drops
oldest, receivers pick up the gap via re-resume from `since=0`.
- `QueuedEvent` discriminated union: `{ kind: 'envelope', bytes }`
or `{ kind: 'chunk', bytes, meta: { streamId, laneId, seq } }`.
- `QueueTransferTransport` (implements `ITransferTransport`) —
enqueues outbound chunks instead of POSTing. Returns optimistic
`ChunkAck` because the queue *is* the delivery; chunk-resume picks
up dropped events on receiver-side reconnect.
#### `@shade/sdk`
- `Shade.transferQueueRoute(opts?)` — Hono app with all five routes a
pull-mode receiver needs:
- `POST /queue` — long-poll the per-peer outbound queue.
- `POST /v1/transfer/:streamId/chunk` — receive incoming chunks
(browser → server writes).
- `GET /v1/transfer/:streamId/state` — resume-state lookup.
- `POST /v1/transfer/control` — receive incoming control envelopes
(browser → server stream-init / abort).
- `GET /v1/transfer/health` — peer reachability probe.
Auto-configures `shade.configureTransfers(...)` with the queue
transport + `QueueEnvelopeTransport` if not already configured.
- `Shade.configureTransfers(opts)` extended: `resolveBaseUrl` is now
optional when `transport` and `envelopeTransport` are both supplied
(lets pure-queue servers omit the baseUrl entirely). New
`transport?: ITransferTransport` override slot.
- `QueueEnvelopeTransport``ControlEnvelopeTransport` impl that
enqueues outbound envelopes for browser receivers.
#### `@shade/files`
- `createFilesHttpClient` (and `shade.files.httpClient`) accept new
options:
- `outboundQueueUrl``/queue` endpoint to long-poll.
- `transferBaseUrl` — base URL for outbound chunk POSTs and control
envelope POSTs (browser → server writes).
- `queueBlockMs` — long-poll timeout (default 30 s; server clamps
at `maxBlockMs`).
When set, the client:
1. Configures `shade.configureTransfers({ resolveBaseUrl })` so
outbound chunks POST to `<transferBaseUrl>/v1/transfer/...`.
2. Builds a `ClientStreamsBridge` eagerly so the engine's
incoming-transfer subscription is in place before the drainer
dispatches the first envelope.
3. Starts a long-poll `startQueueDrainer(...)` that pulls queued
events and dispatches them via `shade.acceptTransferEnvelope`.
- Streamed reads (`fs.read` of files > 256 KiB) and streamed writes
(`fs.write` of large inputs) now work end-to-end on the browser
client when the queue options are set.
- `startQueueDrainer(shade, opts)` exported for advanced consumers
that want to drive their own drainer (e.g. service-worker setups
that want a single shared drainer across multiple `httpClient`s).
- `client.close()` now stops the drainer and tears down the streams-
bridge — important on tab unload to free the long-poll socket.
#### `@shade/files` (internal)
- `ClientStreamsBridge` uses a TransformStream with `highWaterMark:
64` instead of the default `0` so the receive-side write loop
doesn't stall on backpressure before the consumer attaches its
reader (default HWM stalled at chunk 4 in pull-mode where the
drainer races the consumer's `getReader()` call).
### Wire contract
```
POST <base>/queue HTTP/1.1
X-Shade-Sender-Address: alice@example.com
{ "since": 42, "blockMs": 30000 }
────
200 OK
{
"events": [
{ "id": 43, "kind": "envelope", "bytesB64": "...", "timestampMs": 1730... },
{ "id": 44, "kind": "chunk", "bytesB64": "...", "meta": { "streamId": "...", "laneId": 0, "seq": 0 } },
...
],
"nextSince": 47
}
```
### Tests
`tests/integration/http-rpc-streams.test.ts` — three integration tests:
- 4 MiB streamed read end-to-end via long-poll queue (verifies bytes
match the source).
- Inline-only client throws clear error on streamed read.
- Long-poll returns empty events on idle timeout (verifies the
`blockMs` pathway).
### Migration
`4.1.0 → 4.2.0` is wire-compatible and source-compatible — the
queue route is purely additive. To enable streamed transfers in a
browser app:
```ts
// Server
const queue = await shade.transferQueueRoute({ blockMs: 30_000 });
await shade.files.serve(handler);
const rpc = shade.files.rpcRoute({ acceptFirstMessage: true });
const app = new Hono();
app.route('/api/v1/shade-files', queue);
app.route('/api/v1/shade-files', rpc);
// Browser
const fs = shade.files.httpClient(serverAddress, {
rpcUrl: 'https://server/api/v1/shade-files/rpc',
outboundQueueUrl: 'https://server/api/v1/shade-files/queue',
transferBaseUrl: 'https://server/api/v1/shade-files',
});
await fs.write('/mods/some-mod.jar', new Uint8Array(/* 50 MB */));
const result = await fs.read('/backups/world.tar.gz'); // streamed
```
`shade.files.serve(handler, { inlineOnly: true })` is still supported
for HTTP-RPC-without-streams deployments — it skips the streams-bridge
setup entirely.
## [4.1.0] — 2026-05-03 — Browser-friendly HTTP RPC for @shade/files
The default `shade.files.client(peer)` requires both peers to be
mutually addressable over HTTP — the response to a `list` / `read` /
etc. round-trips through `Shade.deliverControlEnvelope`, which POSTs
to the peer's `/v1/transfer/control` endpoint. **That doesn't work
for browsers** — a tab can't host an HTTP server, so the server
cannot call back outbound.
This release ships a parallel request-response transport. One POST per
RPC, encrypted envelope in the request body, encrypted response in the
same HTTP response. Mirrors the way `@shade/server`'s
`shade-auth-middleware` works for prekey writes.
### Added
#### `@shade/files`
- `createFilesRpcRoute(shade, handler, options?)` — Hono app exposing
`POST /rpc`. Reads `X-Shade-Sender-Address`, decrypts the envelope
via the existing ratchet session, dispatches through the attached
`FileHandler`, encrypts the result, and returns it in the same HTTP
response. Transport-level failures (no session, undecryptable, body
too big) return JSON `{ error }` with appropriate 4xx; application-
level failures ship encrypted `RpcError` envelopes.
- `createFilesHttpClient(shade, peer, options)` — request-response
`FileClient` for browser-style consumers. Each method (list / stat /
mkdir / delete / move / getThumbnail / custom / write inline / read
inline) does one HTTP POST and parses the encrypted response. No
inbound channel required.
- `shade.files.rpcRoute(opts?)` — namespace-side getter for the route.
Throws if no handler has been attached via `shade.files.serve(...)`
first.
- `shade.files.httpClient(peer, opts)` — namespace-side getter for the
client.
- `FilesNamespace.serve(handler, { inlineOnly: true })` — opt-out flag
that skips the streams-bridge setup. Required for HTTP-RPC-only
servers (which don't need `configureTransfers({ resolveBaseUrl })`).
In `inlineOnly` mode the channel-based dispatcher is also not
attached, so requests are dispatched only by the rpc-route — avoids
double-dispatch when a browser client and a server-to-server client
share the same Shade instance.
- `ShadeBridge` (exported) gains a `receive(peer, envelope)` member
matching `Shade.receive` so server-side rpc-route can decrypt
inbound envelopes through the structural surface.
### Wire contract
```
POST /rpc HTTP/1.1
Content-Type: application/octet-stream
X-Shade-Sender-Address: alice@example.com
<wire-encoded ShadeEnvelope wrapping JSON-encoded RpcRequest>
────
200 OK
Content-Type: application/octet-stream
<wire-encoded ShadeEnvelope wrapping JSON-encoded RpcResponse | RpcError>
```
### Limitations (v1)
- **Inline payloads only** (≤ 256 KiB). `write` of larger inputs
throws `ConflictError` directing callers to `shade.files.client(peer)`
on a server-to-server deployment. Streamed `read` results throw
`InternalFileError` for the same reason.
- The X3DH first-message must ride the same RPC route — set
`acceptFirstMessage: true` on `rpcRoute({ acceptFirstMessage: true })`
when the browser client's first-ever call doubles as the handshake.
### Tests
- `tests/integration/http-rpc.test.ts` — round-trip via HTTP
(list / mkdir / stat / write / read / delete) plus negative cases
(streamed write rejected, missing sender header, empty body, garbage
body, body past `maxBodyBytes`, `rpcRoute()` without `serve()`).
### Migration
`4.0.x → 4.1.0` is wire-compatible and source-compatible. The HTTP
RPC route is purely additive — no existing code path changes. To
adopt:
```ts
// server (was)
await shade.files.serve(handlerConfig);
// server (HTTP-RPC mode)
await shade.files.serve(handlerConfig, { inlineOnly: true });
app.route('/api/v1/shade-files', shade.files.rpcRoute());
// browser client
const fs = shade.files.httpClient(serverAddress, { rpcUrl: '...' });
```
## [4.0.2] — 2026-05-03 — Consumer-strict reader-shape fixes ## [4.0.2] — 2026-05-03 — Consumer-strict reader-shape fixes
`4.0.1` shipped the `tsc --noEmit` gate that compiles each package `4.0.1` shipped the `tsc --noEmit` gate that compiles each package

View File

@@ -53,6 +53,111 @@ if (result.kind === 'inline') console.log(result.bytes.byteLength);
else for await (const _chunk of /* result.stream */) { /* ... */ } else for await (const _chunk of /* result.stream */) { /* ... */ }
``` ```
## HTTP RPC — browser-friendly request-response (4.1+)
The default `shade.files.client(peer)` requires both peers to be mutually
addressable over HTTP — the response to a `list`/`read` etc. round-trips
through `Shade.deliverControlEnvelope`, which POSTs to the peer's
`/v1/transfer/control` endpoint. **That doesn't work for browsers**
a tab can't host an HTTP server, so the server cannot call back outbound.
`@shade/files` 4.1 ships a parallel **request-response** transport that
lets browser-style clients fully consume the file-RPC surface without
any inbound channel. It mirrors the way `@shade/server`'s
`shade-auth-middleware` works: one POST per RPC, encrypted envelope in
the request body, encrypted response in the same HTTP response.
### Server side — mount the RPC route
```ts
// 1. Register the file handler. `inlineOnly: true` skips the
// streams-bridge (which would require @shade/transfer).
await shade.files.serve(handlerConfig, { inlineOnly: true });
// 2. Mount the route on your Hono app under any base path.
app.route('/api/v1/shade-files', shade.files.rpcRoute());
// ^^^^^^^^^^^^^^
// POST <base>/rpc
```
`rpcRoute()` accepts:
| Option | Default | Purpose |
|---------------------|---------|----------------------------------------------------------------------------------------------------|
| `maxBodyBytes` | 1 MiB | Max request body. The protocol caps inline payloads at 256 KiB; the headroom is for base64 inflation + custom-op envelopes. |
| `acceptFirstMessage`| `false` | Accept `0x01` PreKeyMessage envelopes — required when the RPC route also doubles as the X3DH handshake (browser's first-ever request). |
### Browser client
```ts
import { createShade } from '@shade/sdk';
const shade = await createShade({
prekeyServer: 'https://shade.example.com',
storage: 'memory',
address: 'alice@example.com',
});
const fs = shade.files.httpClient('bob@example.com', {
rpcUrl: 'https://dispatch.example.com/api/v1/shade-files/rpc',
// Optional: thread CSRF / auth tokens, override fetch, etc.
headers: { 'X-CSRF-Token': csrfToken },
});
await fs.mkdir('/photos');
await fs.write('/photos/cover.png', new Uint8Array([/* ... */]), {
contentType: 'image/png',
});
const result = await fs.read('/photos/cover.png');
```
### What works in HTTP-RPC mode
- `list`, `stat`, `mkdir`, `delete`, `move`, `getThumbnail`, `custom<K>` — full parity.
- `write`**inline only** (≤ 256 KiB plaintext). Larger inputs throw `ConflictError`.
- `read`**inline only**. If the server returns a streamed `read` result, the client throws `InternalFileError` directing callers to the stateful pathway.
### Wire contract
```
POST /rpc HTTP/1.1
Content-Type: application/octet-stream
X-Shade-Sender-Address: alice@example.com
<wire-encoded ShadeEnvelope (0x01 first-time, 0x02 after) wrapping
JSON-encoded RpcRequest>
────
200 OK
Content-Type: application/octet-stream
<wire-encoded ShadeEnvelope (0x02) wrapping JSON-encoded RpcResponse | RpcError>
```
Transport-level failures (no session, undecryptable envelope, body too
big) return JSON `{ "error": "..." }` with appropriate 4xx status.
Application-level failures (file not found, permission denied) ship
encrypted `RpcError` envelopes — the client maps them back to typed
`FileError` subclasses (`NotFoundError`, `ConflictError`, etc.).
### Symmetry with `@shade/server`
The shape mirrors `@shade/server`'s shade-auth-middleware: encrypted
envelope rides the request body, server decrypts via the existing
ratchet session, performs the protected operation, returns an encrypted
envelope in the response. No bidirectional channel required, no
WebSocket, no SSE.
### When to use which
| Setup | Use |
|-----------------------------------------------|-----------------------------------------------|
| Browser client ↔ Bun/Hono server | `httpClient()` + `rpcRoute()` |
| Server ↔ server (both can host HTTP) | `client()` (default) — supports streams |
| Service-worker / extension ↔ server | `httpClient()` (no inbound listener) |
| CLI / daemon ↔ daemon | Either; `client()` if you need streams |
## Op surface ## Op surface
| Op | Args | Result | | Op | Args | Result |

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/cli", "name": "@shade/cli",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/cli.ts", "main": "src/cli.ts",
"bin": { "bin": {

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/core", "name": "@shade/core",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -185,6 +185,22 @@ export async function ratchetDecrypt(
if (!session.receiveChain) { if (!session.receiveChain) {
throw new DecryptionError('No receiving chain available'); throw new DecryptionError('No receiving chain available');
} }
// Defense-in-depth: a same-DH message whose counter is already
// behind the chain — and that did NOT match a cached skipped key —
// is either a duplicate we already decrypted (skipped key was
// consumed) or one whose key was evicted under cache pressure.
// Falling through would call kdfChainKey on the *current* (ahead)
// chainKey and then rewind `chain.counter = message.counter + 1`,
// permanently desyncing the chain so every subsequent decrypt
// returns wrong-key. Reject without mutating state instead.
if (
!isNewRatchet &&
message.counter < session.receiveChain.counter
) {
throw new DecryptionError(
'Failed to decrypt message — wrong key or tampered data',
);
}
await skipMessageKeys(crypto, session, message.dhPublicKey, session.receiveChain, message.counter); await skipMessageKeys(crypto, session, message.dhPublicKey, session.receiveChain, message.counter);
// Advance the receiving chain one more step to get this message's key // Advance the receiving chain one more step to get this message's key

View File

@@ -281,6 +281,41 @@ describe('Double Ratchet', () => {
expect(ratchetDecrypt(crypto, bob, msg)).rejects.toThrow(); expect(ratchetDecrypt(crypto, bob, msg)).rejects.toThrow();
}); });
/**
* Regression — the v4.2.0 OutboundQueue waiter-since bug delivered
* the same envelope twice to `manager.decrypt`. The first decrypt
* succeeded via a cached skipped key; the second one fell into the
* `message.counter < chain.counter` path with no skipped key
* available, advanced the chainKey ONCE and rewound `chain.counter`
* to `message.counter + 1`, leaving the ratchet permanently
* desynced. ratchetDecrypt now rejects without mutating state when
* a same-DH message is behind the chain and not in skippedKeys, so
* a downstream replay (transport bug, retry, etc.) cannot poison
* the session for everyone else.
*/
test('same-DH stale message after consumed skipped key fails without corrupting state', async () => {
const { alice, bob } = await setupPair();
// Alice sends 3 messages on the same DH chain.
const m0 = await ratchetEncrypt(crypto, alice, enc.encode('m0'));
const m1 = await ratchetEncrypt(crypto, alice, enc.encode('m1'));
const m2 = await ratchetEncrypt(crypto, alice, enc.encode('m2'));
// Bob receives m1 first, caching m0's key. Then m0 (delivered
// via the cache). After this, m0's skipped key is consumed.
expect(dec.decode(await ratchetDecrypt(crypto, bob, m1))).toBe('m1');
expect(dec.decode(await ratchetDecrypt(crypto, bob, m0))).toBe('m0');
// Replay of m0: skippedKey is gone, chain.counter is past m0.
// Pre-fix: this would corrupt Bob's chain state; post-fix it
// throws cleanly.
await expect(ratchetDecrypt(crypto, bob, m0)).rejects.toThrow(DecryptionError);
// Bob can still decrypt the remaining valid message — chain
// state was NOT mutated by the rejected replay.
expect(dec.decode(await ratchetDecrypt(crypto, bob, m2))).toBe('m2');
});
}); });
// ─── Long Conversation ──────────────────────────────────── // ─── Long Conversation ────────────────────────────────────

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/crypto-web", "name": "@shade/crypto-web",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/dashboard", "name": "@shade/dashboard",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"scripts": { "scripts": {
"dev": "vite", "dev": "vite",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/files", "name": "@shade/files",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -0,0 +1,590 @@
/**
* Browser-friendly request-response `FileClient` for `@shade/files`.
*
* The default `shade.files.client(peer)` ships RPC envelopes via
* `Shade.send` + `Shade.deliverControlEnvelope`, which means the
* server has to be able to call back outbound to the client. That
* doesn't work for browser tabs (no inbound HTTP listener). This
* client posts each RPC envelope to a single server endpoint and
* reads the encrypted response from the same HTTP response — pure
* request-response, no inbound channel required.
*
* Inline payloads only (≤ 256 KiB). For larger reads/writes, use the
* stateful path: `shade.files.client(peer)` server-to-server, with
* `@shade/transfer` chunk routes for content I/O.
*
* @see {@link createFilesRpcRoute} for the matching server-side route.
*/
import type { ZodTypeAny } from 'zod';
import { decodeEnvelope, encodeEnvelope as encodeWireEnvelope } from '@shade/proto';
import type { ShadeBridge } from '../integration/shade-bridge.js';
import {
encodeEnvelope as encodeRpcEnvelope,
tryParseEnvelope,
} from '../protocol/envelope-codec.js';
import {
KIND_CUSTOM_V1,
KIND_DELETE_V1,
KIND_GET_THUMBNAIL_V1,
KIND_LIST_V1,
KIND_MKDIR_V1,
KIND_MOVE_V1,
KIND_READ_V1,
KIND_STAT_V1,
KIND_WRITE_V1,
} from '../protocol/kinds.js';
import {
CustomArgsSchema,
CustomResultSchema,
DeleteArgsSchema,
DeleteResultSchema,
GetThumbnailArgsSchema,
GetThumbnailResultSchema,
ListArgsSchema,
ListResultSchema,
MkdirArgsSchema,
MkdirResultSchema,
MoveArgsSchema,
MoveResultSchema,
ReadArgsSchema,
ReadResultSchema,
StatArgsSchema,
StatResultSchema,
WriteArgsSchema,
WriteResultSchema,
type ListResult,
type MkdirResult,
type DeleteResult,
type MoveResult,
type StatResult,
type ThumbnailSize,
type WriteResult,
} from '../schemas/ops.js';
import {
fileErrorFromPayload,
CancelledError,
InternalFileError,
ConflictError,
} from '../schemas/errors.js';
import { buildRpcRequest } from '../protocol/rpc-builder.js';
import { decideInline, INLINE_THRESHOLD, type WriteSource } from './inline-threshold.js';
import { base64ToBytes, bytesToBase64 } from '../protocol/canonical.js';
import { startQueueDrainer, type QueueDrainerHandle } from './queue-drainer.js';
import {
createClientStreamsBridge,
type ClientStreamsBridge,
} from './streams-bridge.js';
import type {
FileClient,
ReadOpts,
ReadOutput,
ThumbnailResult,
WriteOpts,
CreateFileClientOptions,
BaseOpts,
} from './client.js';
export interface FilesHttpClientOptions
extends Omit<CreateFileClientOptions, 'streamsBridge'> {
/**
* Server endpoint that hosts `createFilesRpcRoute(...)`. Typically:
* `https://server.example.com/api/v1/shade-files/rpc`.
*/
rpcUrl: string;
/**
* Optional `fetch` override. Defaults to `globalThis.fetch`. Wire a
* custom `fetch` to thread auth-cookies, CSRF tokens, or
* service-worker interception.
*/
fetch?: typeof globalThis.fetch;
/**
* Extra HTTP headers applied to every RPC POST. Useful for app-level
* auth (CSRF, session cookies via custom header, etc.) — these are
* orthogonal to the ratchet authentication on the envelope itself.
*/
headers?: Record<string, string>;
/**
* Server endpoint that hosts `transferQueueRoute()`'s long-poll
* endpoint. Typically:
* `https://server.example.com/api/v1/shade-files/queue`.
*
* When supplied, the client starts a background long-poll that
* drains queued envelopes + chunks from the server and dispatches
* them via `shade.acceptTransferEnvelope`. This unlocks
* **streamed reads** (>256 KiB) for browser-style consumers.
*/
outboundQueueUrl?: string;
/**
* Base URL for outbound transfer routes (browser → server). Required
* alongside `outboundQueueUrl` to enable streamed writes. Typically:
* `https://server.example.com/api/v1/shade-files`.
*
* The client POSTs:
* - chunks to `<base>/v1/transfer/<streamId>/chunk`
* - control envelopes to `<base>/v1/transfer/control`
*/
transferBaseUrl?: string;
/**
* Long-poll block timeout, milliseconds. Default 30_000. Server
* clamps to its own `maxBlockMs` (default 55_000).
*/
queueBlockMs?: number;
}
interface RoundTripOpts {
signal?: AbortSignal;
timeoutMs?: number;
idempotencyKey?: string;
}
/**
* Create a request-response `FileClient` bound to `peerAddress` and a
* server-side RPC URL. The session must already be established
* (via `shade.initSessionFromBundle(peerAddress, bundle)` or an
* incoming first-message). Otherwise the first RPC will fail with
* "decrypt failed: no session for peer".
*
* When `outboundQueueUrl` + `transferBaseUrl` are supplied, the
* client also unlocks **streamed reads/writes** for files larger than
* the inline threshold (256 KiB). The browser polls the server's
* outbound queue for chunks/envelopes and POSTs its own outbound
* chunks to the server's transfer-receive routes.
*/
export function createFilesHttpClient(
shade: ShadeBridge,
peerAddress: string,
options: FilesHttpClientOptions,
): FileClient {
const rpcUrl = options.rpcUrl;
const fetchFn = options.fetch ?? globalThis.fetch.bind(globalThis);
const extraHeaders = options.headers ?? {};
const defaultTimeoutMs = options.defaultTimeoutMs ?? 30_000;
const ioTimeoutMs = options.ioTimeoutMs ?? 60_000;
const signRequest = options.signRequest;
const senderAddress = shade.myAddress;
// ─── Streamed-mode bootstrap ─────────────────────────────────
//
// When `outboundQueueUrl` is supplied, the client:
// 1. Configures `shade.configureTransfers(...)` so outbound
// chunks POST to `<transferBaseUrl>/v1/transfer/<streamId>/chunk`
// and outbound control envelopes POST to
// `<transferBaseUrl>/v1/transfer/control`.
// 2. Spawns a streams-bridge so streamed reads can be awaited.
// 3. Starts a long-poll drainer that pulls queued envelopes +
// chunks from the server and dispatches via
// `shade.acceptTransferEnvelope`.
let drainer: QueueDrainerHandle | null = null;
let streamsBridgePromise: Promise<ClientStreamsBridge> | null = null;
let streamsBridge: ClientStreamsBridge | null = null;
if (options.outboundQueueUrl !== undefined) {
const outboundQueueUrl = options.outboundQueueUrl;
if (options.transferBaseUrl === undefined) {
throw new Error(
'createFilesHttpClient: outboundQueueUrl was supplied without transferBaseUrl. Pass `transferBaseUrl` (the server prefix that hosts /v1/transfer/...) so outbound chunks have a destination.',
);
}
if (shade.configureTransfers === undefined) {
throw new Error(
'createFilesHttpClient: shade.configureTransfers is required for streamed mode (the underlying ShadeBridge must surface it).',
);
}
const transferBaseUrl = options.transferBaseUrl.replace(/\/$/, '');
shade.configureTransfers({
resolveBaseUrl: async (peer) => {
if (peer !== peerAddress) {
throw new Error(
`httpClient is bound to peer "${peerAddress}" — refusing to resolve outgoing chunks for "${peer}" without a multi-peer registry. Use shade.files.client(peer) for server-to-server multi-peer.`,
);
}
return transferBaseUrl;
},
});
// Build the streams-bridge eagerly. The engine's incoming-transfer
// subscription has to be in place BEFORE the drainer dispatches the
// first stream-init envelope, otherwise the engine emits the
// IncomingTransfer to zero handlers and the read silently never
// accepts. We kick off the drainer once the bridge has subscribed.
streamsBridgePromise = createClientStreamsBridge(shade).then((bridge) => {
streamsBridge = bridge;
drainer = startQueueDrainer(shade, {
outboundQueueUrl,
peerAddress,
senderAddress,
...(options.fetch !== undefined ? { fetch: options.fetch } : {}),
...(options.headers !== undefined ? { headers: options.headers } : {}),
...(options.queueBlockMs !== undefined ? { blockMs: options.queueBlockMs } : {}),
});
return bridge;
});
// Surface bridge-construction failures eagerly via a rejected
// promise the next read/write picks up.
streamsBridgePromise.catch(() => {
/* observed via getStreamsBridge() */
});
}
async function getStreamsBridge(): Promise<ClientStreamsBridge> {
if (streamsBridge !== null) return streamsBridge;
if (streamsBridgePromise === null) {
throw new ConflictError(
`http RPC client supports inline writes/reads only (≤ ${INLINE_THRESHOLD} bytes) — pass { outboundQueueUrl, transferBaseUrl } to enable streamed transfers.`,
);
}
streamsBridge = await streamsBridgePromise;
return streamsBridge;
}
/**
* Encrypt + POST + decrypt + parse one RPC round-trip.
*
* Throws a typed `FileError` subclass when the server returns an
* encrypted `RpcError`, or `InternalFileError` for transport-level
* failures (network, 4xx/5xx, malformed body).
*/
async function roundTrip<TResult>(
kind: string,
op: 'list' | 'stat' | 'mkdir' | 'delete' | 'move' | 'read' | 'write' | 'getThumbnail' | 'custom',
args: unknown,
resultSchema: ZodTypeAny,
opts: RoundTripOpts | undefined,
): Promise<TResult> {
const requestEnv = await buildRpcRequest({
senderAddress,
kind,
op,
args,
...(opts?.idempotencyKey !== undefined ? { idempotencyKey: opts.idempotencyKey } : {}),
...(signRequest !== undefined ? { signRequest } : {}),
});
const plaintext = encodeRpcEnvelope(requestEnv);
const ratchetEnvelope = await shade.send(peerAddress, plaintext);
const wireBytes = encodeWireEnvelope(ratchetEnvelope);
const ac = new AbortController();
const timeoutMs = opts?.timeoutMs ?? defaultTimeoutMs;
const timer = setTimeout(
() => ac.abort(new Error(`RPC timeout after ${timeoutMs}ms`)),
timeoutMs,
);
(timer as unknown as { unref?: () => void }).unref?.();
if (opts?.signal !== undefined) {
const userSignal = opts.signal;
if (userSignal.aborted) ac.abort(userSignal.reason);
else userSignal.addEventListener('abort', () => ac.abort(userSignal.reason), { once: true });
}
let response: Response;
try {
// Wrap the wire bytes in a Blob so the body type satisfies the
// common-denominator `BodyInit` across DOM, Bun, and node-fetch
// (some runtimes accept `Uint8Array` directly, others don't).
// Cast through `unknown` because TS's `bun-types` and `lib.dom`
// disagree about whether `Uint8Array<ArrayBufferLike>` is itself
// a `BlobPart`; the runtime accepts it on every platform.
response = await fetchFn(rpcUrl, {
method: 'POST',
body: new Blob([wireBytes as unknown as ArrayBuffer]),
signal: ac.signal,
headers: {
'Content-Type': 'application/octet-stream',
'X-Shade-Sender-Address': senderAddress,
...extraHeaders,
},
});
} catch (err) {
clearTimeout(timer);
if ((err as Error).name === 'AbortError') {
throw new CancelledError(`RPC ${kind} aborted: ${(err as Error).message}`);
}
throw new InternalFileError(`RPC ${kind} fetch failed: ${(err as Error).message}`);
}
clearTimeout(timer);
if (!response.ok) {
let body: { error?: string } | null = null;
try {
body = (await response.json()) as { error?: string };
} catch {
/* server emitted non-JSON body */
}
throw new InternalFileError(
`RPC ${kind}${response.status} ${response.statusText}: ${
body?.error ?? '(no error body)'
}`,
);
}
const ab = await response.arrayBuffer();
if (ab.byteLength === 0) {
throw new InternalFileError(`RPC ${kind}: empty response body`);
}
let responseRatchet;
try {
responseRatchet = decodeEnvelope(new Uint8Array(ab));
} catch (err) {
throw new InternalFileError(
`RPC ${kind}: response body is not a valid wire envelope: ${(err as Error).message}`,
);
}
let responsePlaintext: string;
try {
responsePlaintext = await shade.receive(peerAddress, responseRatchet);
} catch (err) {
throw new InternalFileError(
`RPC ${kind}: response decrypt failed: ${(err as Error).message}`,
);
}
const classified = tryParseEnvelope(responsePlaintext);
if (classified === null) {
throw new InternalFileError(
`RPC ${kind}: response plaintext is not a valid @shade/files envelope`,
);
}
if (classified.kind === 'error') {
throw fileErrorFromPayload(classified.envelope.error);
}
if (classified.kind !== 'response') {
throw new InternalFileError(
`RPC ${kind}: unexpected response envelope kind: ${classified.kind}`,
);
}
if (classified.envelope.id !== requestEnv.id) {
throw new InternalFileError(
`RPC ${kind}: response correlation id mismatch (got ${classified.envelope.id}, expected ${requestEnv.id})`,
);
}
return resultSchema.parse(classified.envelope.result) as TResult;
}
return {
async list(path, opts): Promise<ListResult> {
const args = ListArgsSchema.parse({
path,
...(opts?.cursor !== undefined ? { cursor: opts.cursor } : {}),
...(opts?.pageSize !== undefined ? { pageSize: opts.pageSize } : {}),
...(opts?.filter !== undefined ? { filter: opts.filter } : {}),
});
return await roundTrip<ListResult>(
KIND_LIST_V1,
'list',
args,
ListResultSchema,
opts,
);
},
async stat(path, opts): Promise<StatResult> {
const args = StatArgsSchema.parse({ path });
return await roundTrip<StatResult>(KIND_STAT_V1, 'stat', args, StatResultSchema, opts);
},
async mkdir(path, opts): Promise<MkdirResult> {
const args = MkdirArgsSchema.parse({
path,
...(opts?.recursive !== undefined ? { recursive: opts.recursive } : {}),
});
return await roundTrip<MkdirResult>(
KIND_MKDIR_V1,
'mkdir',
args,
MkdirResultSchema,
opts,
);
},
async delete(path, opts): Promise<DeleteResult> {
const args = DeleteArgsSchema.parse({
path,
...(opts?.recursive !== undefined ? { recursive: opts.recursive } : {}),
});
return await roundTrip<DeleteResult>(
KIND_DELETE_V1,
'delete',
args,
DeleteResultSchema,
opts,
);
},
async move(src, dst, opts): Promise<MoveResult> {
const args = MoveArgsSchema.parse({
src,
dst,
...(opts?.overwrite !== undefined ? { overwrite: opts.overwrite } : {}),
});
return await roundTrip<MoveResult>(KIND_MOVE_V1, 'move', args, MoveResultSchema, opts);
},
async read(path, opts: ReadOpts = {}): Promise<ReadOutput> {
const args = ReadArgsSchema.parse({
path,
...(opts.range !== undefined ? { range: opts.range } : {}),
...(opts.preferInline !== undefined ? { preferInline: opts.preferInline } : {}),
});
const wire = await roundTrip<import('../schemas/ops.js').ReadResult>(
KIND_READ_V1,
'read',
args,
ReadResultSchema,
opts,
);
if (wire.kind === 'inline') {
const bytes = base64ToBytes(wire.bytesB64);
const out: ReadOutput = {
kind: 'inline',
bytes,
size: wire.size,
sha256: wire.sha256,
...(wire.contentType !== undefined ? { contentType: wire.contentType } : {}),
};
return out;
}
// Streamed read — only supported when the queue drainer is wired.
if (drainer === null) {
throw new InternalFileError(
`http RPC client received a streamed read (size ${wire.size}) but is in inline-only mode. Pass { outboundQueueUrl, transferBaseUrl } when constructing the client to enable streamed reads.`,
);
}
const bridge = await getStreamsBridge();
const bridgeSignal = opts.signal ?? new AbortController().signal;
const parked = await bridge.awaitRead(wire.streamId, {
expectedFrom: peerAddress,
signal: bridgeSignal,
timeoutMs: ioTimeoutMs,
});
const out: ReadOutput = {
kind: 'streams',
stream: parked.readable,
size: wire.size,
sha256: wire.sha256,
...(wire.contentType !== undefined ? { contentType: wire.contentType } : {}),
done: async () => {
await parked.done;
},
};
return out;
},
async write(path, input: WriteSource, opts: WriteOpts = {}): Promise<WriteResult> {
const decision = await decideInline(input);
const overwrite = opts.overwrite ?? false;
const contentType = opts.contentType ?? decision.contentType;
if (decision.kind === 'inline' || opts.forceInline === true) {
const bytes = decision.kind === 'inline' ? decision.bytes : null;
if (bytes === null) {
// forceInline === true with a streams-typed decision —
// decideInline always produced a `streams` shape because the
// input was a bare ReadableStream. We can't drain a stream
// synchronously here without a streams-bridge.
throw new ConflictError(
'http RPC client cannot forceInline a streamed input — pass a Uint8Array / Blob, or pre-buffer the stream.',
);
}
if (bytes.byteLength > INLINE_THRESHOLD) {
throw new ConflictError(
`inline write exceeds ${INLINE_THRESHOLD}-byte threshold (got ${bytes.byteLength}); pass forceInline=true to override`,
);
}
const args = WriteArgsSchema.parse({
kind: 'inline',
path,
bytesB64: bytesToBase64(bytes),
...(contentType !== undefined ? { contentType } : {}),
overwrite,
});
return await roundTrip<WriteResult>(
KIND_WRITE_V1,
'write',
args,
WriteResultSchema,
opts,
);
}
// Streamed write — requires the queue drainer + streams-bridge.
if (drainer === null) {
throw new ConflictError(
`http RPC client supports inline writes only (≤ ${INLINE_THRESHOLD} bytes). The supplied input was promoted to streams (size ${decision.size ?? 'unknown'}). Pass { outboundQueueUrl, transferBaseUrl } to enable streamed writes.`,
);
}
const bridge = await getStreamsBridge();
const size = decision.size;
if (size === undefined) {
throw new ConflictError(
'streams write requires a known plaintext size; pass `{ stream, size }` instead of a bare ReadableStream',
);
}
const { writeId, handle } = await bridge.initiateWrite({
peer: peerAddress,
stream: decision.stream,
size,
...(contentType !== undefined ? { contentType } : {}),
name: path,
...(opts.signal !== undefined ? { signal: opts.signal } : {}),
});
const args = WriteArgsSchema.parse({
kind: 'streams',
path,
size,
...(contentType !== undefined ? { contentType } : {}),
overwrite,
writeId,
});
try {
const [result] = await Promise.all([
roundTrip<WriteResult>(KIND_WRITE_V1, 'write', args, WriteResultSchema, opts),
handle.done(),
]);
return result;
} catch (err) {
await handle.abort('rpc-failed').catch(() => undefined);
throw err;
}
},
async getThumbnail(path, size: ThumbnailSize, opts): Promise<ThumbnailResult> {
const args = GetThumbnailArgsSchema.parse({
path,
size,
...(opts?.format !== undefined ? { format: opts.format } : {}),
});
const raw = await roundTrip<import('../schemas/ops.js').GetThumbnailResult>(
KIND_GET_THUMBNAIL_V1,
'getThumbnail',
args,
GetThumbnailResultSchema,
opts,
);
return {
bytes: base64ToBytes(raw.bytesB64),
format: raw.format,
width: raw.width,
height: raw.height,
sha256: raw.sha256,
};
},
async custom(name, args, opts?: BaseOpts): Promise<unknown> {
const wireArgs = CustomArgsSchema.parse({ name, args });
return await roundTrip(KIND_CUSTOM_V1, 'custom', wireArgs, CustomResultSchema, opts);
},
close(): void {
// Stop the long-poll drainer + tear down the streams-bridge if
// we built one. Idempotent — safe to call multiple times.
drainer?.stop();
drainer = null;
if (streamsBridge !== null) {
void streamsBridge.destroy().catch(() => undefined);
streamsBridge = null;
}
streamsBridgePromise = null;
},
} as FileClient;
}

View File

@@ -0,0 +1,172 @@
/**
* Browser-side drainer for the pull-mode outbound queue.
*
* Background task that long-polls the server's `/queue` endpoint,
* decodes each event, and dispatches it into the consumer's Shade
* instance via `shade.acceptTransferEnvelope`. Same wire-shape as the
* server-to-server case where the engine receives chunks via direct
* HTTP POSTs — we just flip the direction so the browser pulls
* instead of accepts.
*/
import type { ShadeBridge } from '../integration/shade-bridge.js';
export interface QueueDrainerOptions {
/**
* Server endpoint that hosts `transferQueueRoute()`. Typically:
* `https://server.example.com/api/v1/shade-files/queue`.
*/
outboundQueueUrl: string;
/** Peer the queue is pulled FROM (the server's address). */
peerAddress: string;
/** Address we identify ourselves with on the long-poll. */
senderAddress: string;
/** Optional `fetch` override. Default `globalThis.fetch`. */
fetch?: typeof globalThis.fetch;
/** Extra headers applied to every poll request. */
headers?: Record<string, string>;
/**
* Long-poll request timeout (server-side block). Default 30_000.
* Server clamps to its own `maxBlockMs` (default 55_000).
*/
blockMs?: number;
/**
* Backoff after a network error before re-polling. Default 2_000.
* Doubles up to `maxBackoffMs` on consecutive failures.
*/
initialBackoffMs?: number;
maxBackoffMs?: number;
/**
* Called when a poll cycle fails. Defaults to logging via `console.error`.
* Throwing from this hook does NOT stop the drainer — it backs off
* and retries.
*/
onError?: (err: unknown) => void;
}
export interface QueueDrainerHandle {
/** Stop the drainer. Pending fetch is aborted; the loop exits. */
stop(): void;
/** Promise that resolves once the drainer has fully stopped. */
stopped: Promise<void>;
}
interface PolledEvent {
id: number;
timestampMs: number;
kind: 'envelope' | 'chunk';
bytesB64: string;
meta?: { streamId: string; laneId: number; seq: number };
}
interface PollResponseBody {
events: PolledEvent[];
nextSince: number;
}
const DEFAULT_BLOCK_MS = 30_000;
const DEFAULT_INITIAL_BACKOFF_MS = 2_000;
const DEFAULT_MAX_BACKOFF_MS = 30_000;
/**
* Start a long-poll loop that drains queued envelopes + chunks from
* the server and dispatches them into the local Shade instance.
*
* Returns a handle the caller can use to stop the drainer when the
* `httpClient` is closed (e.g. on tab unload).
*/
export function startQueueDrainer(
shade: ShadeBridge,
options: QueueDrainerOptions,
): QueueDrainerHandle {
if (shade.acceptTransferEnvelope === undefined) {
throw new Error(
'startQueueDrainer: shade.acceptTransferEnvelope is required for pull-mode streams. The supplied ShadeBridge implementation must surface it.',
);
}
const accept = shade.acceptTransferEnvelope.bind(shade);
const fetchFn = options.fetch ?? globalThis.fetch.bind(globalThis);
const blockMs = options.blockMs ?? DEFAULT_BLOCK_MS;
const onError = options.onError ?? ((err: unknown) => console.error('[shade.files queue-drainer]', err));
const initialBackoffMs = options.initialBackoffMs ?? DEFAULT_INITIAL_BACKOFF_MS;
const maxBackoffMs = options.maxBackoffMs ?? DEFAULT_MAX_BACKOFF_MS;
const ac = new AbortController();
let stopped = false;
let resolveStopped!: () => void;
const stoppedPromise = new Promise<void>((r) => {
resolveStopped = r;
});
void (async () => {
let since = 0;
let backoff = initialBackoffMs;
while (!stopped) {
try {
const response = await fetchFn(options.outboundQueueUrl, {
method: 'POST',
signal: ac.signal,
headers: {
'Content-Type': 'application/json',
'X-Shade-Sender-Address': options.senderAddress,
...(options.headers ?? {}),
},
body: JSON.stringify({ since, blockMs }),
});
if (!response.ok) {
throw new Error(`queue poll → ${response.status} ${response.statusText}`);
}
const body = (await response.json()) as PollResponseBody;
if (Array.isArray(body.events) && body.events.length > 0) {
for (const event of body.events) {
if (stopped) break;
try {
const bytes = base64ToBytes(event.bytesB64);
await accept(options.peerAddress, bytes);
} catch (err) {
// Per-event dispatch failure should not kill the loop —
// resume picks up missing chunks via @shade/transfer's
// built-in lane-resume protocol.
onError(err);
}
}
}
since = typeof body.nextSince === 'number' ? body.nextSince : since;
backoff = initialBackoffMs;
} catch (err) {
if (stopped || ac.signal.aborted) break;
onError(err);
// Exponential backoff with jitter — caps at maxBackoffMs.
const jitter = Math.floor(Math.random() * Math.min(backoff, 1_000));
await new Promise<void>((r) => {
const t = setTimeout(r, backoff + jitter);
(t as unknown as { unref?: () => void }).unref?.();
ac.signal.addEventListener(
'abort',
() => {
clearTimeout(t);
r();
},
{ once: true },
);
});
backoff = Math.min(maxBackoffMs, backoff * 2);
}
}
resolveStopped();
})();
return {
stop(): void {
if (stopped) return;
stopped = true;
ac.abort(new Error('queue drainer stopped'));
},
stopped: stoppedPromise,
};
}
function base64ToBytes(b64: string): Uint8Array {
const bin = atob(b64);
const out = new Uint8Array(bin.length);
for (let i = 0; i < bin.length; i++) out[i] = bin.charCodeAt(i);
return out;
}

View File

@@ -102,7 +102,16 @@ export async function createClientStreamsBridge(
const readStreamId = incoming.metadata.userMetadata?.[META_KEY_READ_STREAM_ID]; const readStreamId = incoming.metadata.userMetadata?.[META_KEY_READ_STREAM_ID];
if (readStreamId === undefined) return; if (readStreamId === undefined) return;
const ts = new TransformStream<Uint8Array, Uint8Array>(); // Generous HWM so the receiver-side write loop (drainer →
// engine.receiveChunk → sink.write) doesn't stall on backpressure
// before the consumer's reader is wired up. The reader still
// applies its own backpressure once it's consuming, but we no
// longer race fs.read's await on stream-init against the consumer
// attaching its reader.
const ts = new TransformStream<Uint8Array, Uint8Array>(undefined, undefined, {
highWaterMark: 64,
size: (chunk?: Uint8Array) => (chunk === undefined ? 0 : 1),
});
let handle: TransferHandle; let handle: TransferHandle;
try { try {
handle = await incoming.accept({ handle = await incoming.accept({

View File

@@ -183,6 +183,24 @@ export { MAX_SIGNATURE_AGE_MS } from './server/handler.js';
export { createFilesNamespace } from './integration/files-namespace.js'; export { createFilesNamespace } from './integration/files-namespace.js';
export type { FilesNamespace } from './integration/files-namespace.js'; export type { FilesNamespace } from './integration/files-namespace.js';
// Request-response HTTP transport — for browser-style consumers
// (one HTTP POST per RPC, no inbound channel needed). See
// `docs/files.md § HTTP RPC`.
export { createFilesRpcRoute } from './server/rpc-route.js';
export type { FilesRpcRouteOptions } from './server/rpc-route.js';
export { createFilesHttpClient } from './client/http-client.js';
export type { FilesHttpClientOptions } from './client/http-client.js';
export { startQueueDrainer } from './client/queue-drainer.js';
export type {
QueueDrainerHandle,
QueueDrainerOptions,
} from './client/queue-drainer.js';
// Shared structural surface @shade/files needs from a Shade instance —
// exposed so consumers building custom Shade-shaped bridges can verify
// they implement every required member.
export type { ShadeBridge } from './integration/shade-bridge.js';
// Integration helpers — wire handler + pending registry onto a channel // Integration helpers — wire handler + pending registry onto a channel
export { attachFileHandler } from './integration/wire-server.js'; export { attachFileHandler } from './integration/wire-server.js';
export { attachClientRouting } from './integration/wire-client.js'; export { attachClientRouting } from './integration/wire-client.js';

View File

@@ -4,6 +4,7 @@
* so a single Shade can simultaneously serve files AND consume them from * so a single Shade can simultaneously serve files AND consume them from
* peers without paying the setup cost twice. * peers without paying the setup cost twice.
*/ */
import type { Hono } from 'hono';
import type { ShadeBridge } from './shade-bridge.js'; import type { ShadeBridge } from './shade-bridge.js';
import { import {
attachClientRouting, attachClientRouting,
@@ -11,6 +12,8 @@ import {
createClientStreamsBridge, createClientStreamsBridge,
createFileClient, createFileClient,
createFileHandler, createFileHandler,
createFilesHttpClient,
createFilesRpcRoute,
createServerStreamsBridge, createServerStreamsBridge,
PendingRpcRegistry, PendingRpcRegistry,
ShadeFileRpcChannel, ShadeFileRpcChannel,
@@ -19,22 +22,79 @@ import {
type FileClient, type FileClient,
type FileHandler, type FileHandler,
type FileHandlerConfig, type FileHandlerConfig,
type FilesHttpClientOptions,
type FilesRpcRouteOptions,
type ServerStreamsBridge, type ServerStreamsBridge,
} from '../index.js'; } from '../index.js';
import { IdempotencyCache } from '../server/idempotency-cache.js'; import { IdempotencyCache } from '../server/idempotency-cache.js';
export interface ServeOptions {
/**
* Skip the streams bridge setup. Required for deployments that only
* use the HTTP RPC route ({@link FilesNamespace.rpcRoute}) — those
* deployments don't need to configure `@shade/transfer` because the
* RPC route only services inline payloads (≤ 256 KiB). Without this
* flag, `serve()` calls `createServerStreamsBridge(shade)` which
* eagerly instantiates the transfer engine and fails when
* `configureTransfers({ resolveBaseUrl })` has not been called.
*
* Default: `false` (build the streams bridge — full server-to-server
* stack with streamed reads/writes).
*/
inlineOnly?: boolean;
}
export interface FilesNamespace { export interface FilesNamespace {
/** /**
* Register a file handler. Throws if a handler is already attached on * Register a file handler. Throws if a handler is already attached on
* this Shade — only one server per Shade. The returned function detaches * this Shade — only one server per Shade. The returned function detaches
* the handler and tears down its idempotency / retention timers. * the handler and tears down its idempotency / retention timers.
*
* Pass `{ inlineOnly: true }` for HTTP-RPC-only deployments to skip
* the streams-bridge setup (and the implied `configureTransfers`
* pre-condition).
*/ */
serve(handler: FileHandlerConfig): Promise<() => Promise<void>>; serve(
handler: FileHandlerConfig,
options?: ServeOptions,
): Promise<() => Promise<void>>;
/** /**
* Build a typed file client for `peer`. Multiple concurrent clients to * Build a typed file client for `peer`. Multiple concurrent clients to
* different peers share the same channel + streams bridge. * different peers share the same channel + streams bridge.
*
* Use this for **server-to-server** deployments where both peers can
* receive inbound HTTP. For browser clients (no inbound listener),
* use {@link httpClient} instead.
*/ */
client(peer: string, opts?: Omit<CreateFileClientOptions, 'streamsBridge'>): Promise<FileClient>; client(peer: string, opts?: Omit<CreateFileClientOptions, 'streamsBridge'>): Promise<FileClient>;
/**
* Build a request-response `FileClient` for browser-style consumers.
* Each RPC is one HTTP POST to the supplied `rpcUrl`; the encrypted
* response rides back in the same response body. No inbound channel
* required on the client side.
*
* Inline payloads only (≤ 256 KiB). Streamed reads/writes throw a
* clear error directing callers to {@link client} instead.
*
* Pre-condition: the session for `peer` must already be established
* (typically via `shade.initSessionFromBundle(peer, bundle)`).
*/
httpClient(peer: string, opts: FilesHttpClientOptions): FileClient;
/**
* Mount the server-side request-response RPC route. Returns a Hono
* app exposing `POST /rpc` that accepts encrypted file-RPC envelopes
* and returns encrypted responses in the same HTTP roundtrip.
*
* Mount under any base path:
* ```ts
* app.route('/api/v1/shade-files', shade.files.rpcRoute());
* ```
*
* Requires `shade.files.serve(...)` to have been called first —
* the route dispatches incoming requests through the attached
* handler.
*/
rpcRoute(opts?: FilesRpcRouteOptions): Hono;
/** Tear down channel + bridges. After destroy(), serve()/client() throw. */ /** Tear down channel + bridges. After destroy(), serve()/client() throw. */
destroy(): Promise<void>; destroy(): Promise<void>;
} }
@@ -71,24 +131,39 @@ export function createFilesNamespace(shade: ShadeBridge): FilesNamespace {
} }
return { return {
async serve(handlerConfig) { async serve(handlerConfig, options = {}) {
ensureAlive(); ensureAlive();
if (state.serverHandler !== null) { if (state.serverHandler !== null) {
throw new Error('FilesNamespace: a handler is already registered (one per Shade)'); throw new Error('FilesNamespace: a handler is already registered (one per Shade)');
} }
// Lazy server-side streams bridge. // Lazy server-side streams bridge — skip when the deployment is
if (state.serverBridge === null) { // HTTP-RPC-only and does not need `@shade/transfer` wired up.
if (!options.inlineOnly && state.serverBridge === null) {
state.serverBridge = await createServerStreamsBridge(shade); state.serverBridge = await createServerStreamsBridge(shade);
} }
const inheritedObservability = shade.getObservability?.(); const inheritedObservability = shade.getObservability?.();
const handler = createFileHandler(shade, { const handler = createFileHandler(shade, {
...handlerConfig, ...handlerConfig,
streamsBridge: state.serverBridge, ...(state.serverBridge !== null
? { streamsBridge: state.serverBridge }
: {}),
...(handlerConfig.observability === undefined && inheritedObservability !== undefined ...(handlerConfig.observability === undefined && inheritedObservability !== undefined
? { observability: inheritedObservability } ? { observability: inheritedObservability }
: {}), : {}),
}); });
const detach = attachFileHandler(state.channel, handler); // In inlineOnly mode, the rpc-route is the sole inbound path —
// do NOT also subscribe the channel's onMessage handler to this
// file handler, because that would cause every incoming request
// to be dispatched twice (once by the rpc-route's direct call,
// once by the channel's onMessage handler) and the channel-side
// response would attempt an outbound POST via
// `deliverControlEnvelope`, which is exactly the path that fails
// for browser clients.
const detach: () => void = options.inlineOnly
? () => {
/* no channel subscription to detach */
}
: attachFileHandler(state.channel, handler);
state.serverHandler = handler; state.serverHandler = handler;
state.serverDetach = detach; state.serverDetach = detach;
@@ -131,6 +206,21 @@ export function createFilesNamespace(shade: ShadeBridge): FilesNamespace {
}); });
}, },
httpClient(peer, opts) {
ensureAlive();
return createFilesHttpClient(shade, peer, opts);
},
rpcRoute(opts = {}) {
ensureAlive();
if (state.serverHandler === null) {
throw new Error(
'FilesNamespace.rpcRoute(): no handler attached. Call shade.files.serve(...) before mounting the RPC route.',
);
}
return createFilesRpcRoute(shade, state.serverHandler, opts);
},
async destroy() { async destroy() {
if (state.destroyed) return; if (state.destroyed) return;
state.destroyed = true; state.destroyed = true;

View File

@@ -32,6 +32,12 @@ export interface ShadeBridge {
/** Encrypt + send `plaintext` to `peer`; returns the wire envelope. */ /** Encrypt + send `plaintext` to `peer`; returns the wire envelope. */
send(peer: string, plaintext: string): Promise<ShadeEnvelope>; send(peer: string, plaintext: string): Promise<ShadeEnvelope>;
/**
* Decrypt an inbound envelope from `peer` and return the plaintext.
* Used by the request-response RPC route on the server side.
*/
receive(peer: string, envelope: ShadeEnvelope): Promise<string>;
/** /**
* Subscribe to incoming ratchet plaintext. Returns an unsubscribe. * Subscribe to incoming ratchet plaintext. Returns an unsubscribe.
* Handlers may be sync or async; async handlers are awaited in * Handlers may be sync or async; async handlers are awaited in
@@ -64,4 +70,25 @@ export interface ShadeBridge {
/** Optional control-envelope passthrough used by the WebRTC bridge. */ /** Optional control-envelope passthrough used by the WebRTC bridge. */
deliverControlEnvelope?(peer: string, envelope: ShadeEnvelope): Promise<void>; deliverControlEnvelope?(peer: string, envelope: ShadeEnvelope): Promise<void>;
/**
* Hand a freshly-decoded wire envelope (control or chunk) to the
* transfer engine. Required by the pull-mode HTTP client when it
* drains queued events from the server: each polled chunk / control
* envelope is dispatched here so the engine sees it just as if it
* had arrived via an HTTP POST on `/v1/transfer/...`.
*/
acceptTransferEnvelope?(from: string, env: ShadeEnvelope | Uint8Array): Promise<void>;
/**
* Configure the transfer stack. Called by the pull-mode HTTP client
* to point the browser's outgoing chunks + control envelopes at the
* server's transferQueueRoute mount. Optional because the
* server-to-server path uses a separate, app-driven configuration.
*/
configureTransfers?(opts: {
resolveBaseUrl?: (peerAddress: string) => Promise<string>;
transport?: unknown;
envelopeTransport?: unknown;
}): void;
} }

View File

@@ -0,0 +1,126 @@
/**
* Shared RPC-request construction.
*
* Both the channel-based `FileClient` (`createFileClient`) and the
* HTTP-based `FilesHttpClient` build identical `RpcRequest` envelopes
* — they differ only in *transport* (channel.send → ratchet via
* Shade.send/onMessage vs HTTP POST → ratchet via single
* request-response). This module is the single source of truth for
* the wire shape so the two clients can never drift.
*/
import type { ZodTypeAny } from 'zod';
import {
KIND_DELETE_V1,
KIND_GET_THUMBNAIL_V1,
KIND_LIST_V1,
KIND_MKDIR_V1,
KIND_MOVE_V1,
KIND_READ_V1,
KIND_STAT_V1,
KIND_WRITE_V1,
MUTATION_OPS,
type StandardOp,
} from './kinds.js';
import { generateIdempotencyKey, generateRequestId } from './correlate.js';
import { canonicalRpcBytes, hashArgs } from './canonical.js';
import type { RpcRequest } from '../schemas/envelope.js';
export const KIND_BY_OP: Record<StandardOp, string> = {
list: KIND_LIST_V1,
stat: KIND_STAT_V1,
mkdir: KIND_MKDIR_V1,
delete: KIND_DELETE_V1,
move: KIND_MOVE_V1,
read: KIND_READ_V1,
write: KIND_WRITE_V1,
getThumbnail: KIND_GET_THUMBNAIL_V1,
};
export type SignRequest = (canonicalBytes: Uint8Array) => Promise<string> | string;
export interface BuildRpcRequestOptions {
/** Address that this RPC call originates from. */
senderAddress: string;
/** RPC kind — `KIND_*_V1` constants for standard ops, `KIND_CUSTOM_V1` otherwise. */
kind: string;
/** Op classifier so mutations get an auto-generated idempotency key. */
op: StandardOp | 'custom';
/** Validated args object. The caller is responsible for `Zod.parse(args)`. */
args: unknown;
/** Caller-supplied idempotency key. Mutations get one auto-generated. */
idempotencyKey?: string;
/** Optional Ed25519-style signer over the canonical bytes. */
signRequest?: SignRequest;
}
/**
* Build a single `RpcRequest` envelope. Generates `id`, `signedAt`,
* idempotency key (mutations only), and the canonical signature.
*/
export async function buildRpcRequest(
options: BuildRpcRequestOptions,
): Promise<RpcRequest> {
const { senderAddress, kind, op, args, signRequest } = options;
const requestId = generateRequestId();
const isMutation = MUTATION_OPS.has(op);
const idempotencyKey =
options.idempotencyKey ?? (isMutation ? generateIdempotencyKey() : undefined);
const signedAt = Date.now();
let sig = 'unsigned';
if (signRequest !== undefined) {
const canonical = canonicalRpcBytes({
address: senderAddress,
signedAt,
kind,
id: requestId,
argsHash: hashArgs(args),
});
sig = await signRequest(canonical);
}
const env: RpcRequest = {
kind,
id: requestId,
args,
...(idempotencyKey !== undefined ? { idempotencyKey } : {}),
sig,
signedAt,
};
return env;
}
/**
* Helper for the typed standard ops: validates args via the supplied Zod
* schema, calls {@link buildRpcRequest}, and forwards the result.
*
* The HTTP and channel clients both call this from each op-method to
* guarantee identical wire-shape. Custom ops use {@link buildRpcRequest}
* directly because they ship `KIND_CUSTOM_V1` plus runtime-validated args.
*/
export async function buildStandardRpcRequest<TArgs>(
schema: { parse(input: unknown): TArgs },
rawArgs: unknown,
options: Omit<BuildRpcRequestOptions, 'kind' | 'op' | 'args'> & {
op: StandardOp;
},
): Promise<{ request: RpcRequest; args: TArgs }> {
const args = schema.parse(rawArgs) as TArgs;
const kind = KIND_BY_OP[options.op];
const request = await buildRpcRequest({
...options,
kind,
op: options.op,
args,
});
return { request, args };
}
/**
* Validate a returned RpcResponse `result` against the supplied schema.
* Centralised so the HTTP and channel clients fail-loudly with the same
* error type when a server returns an off-spec response.
*/
export function parseResult<T>(schema: ZodTypeAny, raw: unknown): T {
return schema.parse(raw) as T;
}

View File

@@ -0,0 +1,218 @@
/**
* Request-response RPC route for `@shade/files`.
*
* Mounts a single `POST /rpc` Hono endpoint that accepts an encrypted
* `RpcRequest` envelope, dispatches it through the file handler, and
* returns the encrypted `RpcResponse` (or `RpcError`) envelope in the
* SAME HTTP response.
*
* This is the browser-friendly transport: the server never needs to
* make outbound calls back to the client, so a browser tab — which
* cannot host an HTTP server — can fully consume `@shade/files`.
*
* ### Wire contract
*
* Request:
* ```
* POST <mount>/rpc HTTP/1.1
* Content-Type: application/octet-stream
* X-Shade-Sender-Address: <peer address>
*
* <wire-encoded ShadeEnvelope (0x01 PreKeyMessage or 0x02 RatchetMessage)
* containing JSON-encoded RpcRequest>
* ```
*
* Response (success):
* ```
* 200 OK
* Content-Type: application/octet-stream
*
* <wire-encoded ShadeEnvelope (0x02 RatchetMessage)
* containing JSON-encoded RpcResponse | RpcError>
* ```
*
* Response (transport-level failure — no session, undecryptable, etc.):
* ```
* 4xx
* Content-Type: application/json
*
* { "error": "..." }
* ```
*
* ### Symmetry with shade-auth-middleware
*
* The shape mirrors `@shade/server`'s shade-auth-middleware: an
* encrypted envelope rides the request body, the server decrypts via
* the existing ratchet session, performs the protected operation,
* and returns an encrypted envelope in the response. No bidirectional
* channel required.
*
* @see {@link createFilesHttpClient} for the matching browser client.
*/
import { Hono } from 'hono';
import { decodeEnvelope, encodeEnvelope } from '@shade/proto';
import type { ShadeBridge } from '../integration/shade-bridge.js';
import {
encodeEnvelope as encodeRpcEnvelope,
tryParseEnvelope,
} from '../protocol/envelope-codec.js';
import type { FileHandler } from './handler.js';
import type { RpcError, RpcRequest, RpcResponse } from '../schemas/envelope.js';
import { KIND_ERROR_V1 } from '../protocol/kinds.js';
export interface FilesRpcRouteOptions {
/**
* Maximum request body size in bytes. Default 1 MiB. Inline payloads
* are capped at 256 KiB by the protocol; the headroom is for
* custom-op payloads and base64 inflation.
*/
maxBodyBytes?: number;
/**
* Allow this server to accept the very first message (PreKeyMessage,
* `0x01`) over the RPC route. Disabled by default — most browser
* clients establish a session via `shade.initSessionFromBundle`
* before the first RPC. Enable when you want the RPC route to also
* be the X3DH carrier (uncommon but supported).
*/
acceptFirstMessage?: boolean;
}
const DEFAULT_MAX_BODY_BYTES = 1 * 1024 * 1024;
/**
* Build a Hono app with a single `POST /rpc` route. Mount under any
* base path: `app.route('/api/v1/shade-files', shade.files.rpcRoute())`.
*
* The `handler` must already be attached (typically via
* `shade.files.serve(handlerConfig)`); this route only ships the
* transport — it does not register a new file handler.
*/
export function createFilesRpcRoute(
shade: ShadeBridge,
handler: FileHandler,
options: FilesRpcRouteOptions = {},
): Hono {
const maxBodyBytes = options.maxBodyBytes ?? DEFAULT_MAX_BODY_BYTES;
const app = new Hono();
app.post('/rpc', async (c) => {
const senderAddress = c.req.header('X-Shade-Sender-Address');
if (senderAddress === undefined || senderAddress === '') {
return c.json({ error: 'missing X-Shade-Sender-Address header' }, 400);
}
const contentLengthHeader = c.req.header('Content-Length');
if (contentLengthHeader !== undefined) {
const contentLength = Number.parseInt(contentLengthHeader, 10);
if (Number.isFinite(contentLength) && contentLength > maxBodyBytes) {
return c.json(
{ error: `body exceeds maxBodyBytes (${contentLength} > ${maxBodyBytes})` },
413,
);
}
}
let bodyBytes: Uint8Array;
try {
const ab = await c.req.arrayBuffer();
if (ab.byteLength > maxBodyBytes) {
return c.json(
{ error: `body exceeds maxBodyBytes (${ab.byteLength} > ${maxBodyBytes})` },
413,
);
}
bodyBytes = new Uint8Array(ab);
} catch (err) {
return c.json({ error: `failed to read request body: ${(err as Error).message}` }, 400);
}
if (bodyBytes.byteLength === 0) {
return c.json({ error: 'empty request body' }, 400);
}
// Decode the wire envelope. `decodeEnvelope` handles both `0x01`
// PreKeyMessage and `0x02` RatchetMessage shapes.
let plaintext: string;
try {
const envelope = decodeEnvelope(bodyBytes);
// First-message gate: only allow `prekey` envelopes when the
// operator has explicitly opted in.
if (options.acceptFirstMessage !== true && envelope.type === 'prekey') {
return c.json(
{
error:
'PreKeyMessage envelopes are not accepted on this RPC route — establish the session first via shade.initSessionFromBundle, or set acceptFirstMessage: true',
},
400,
);
}
plaintext = await shade.receive(senderAddress, envelope);
} catch (err) {
// Decryption failure — could be no session, corrupted envelope,
// or sender address mismatch. Treat as 401 since the envelope is
// self-authenticating: a valid sender would decrypt cleanly.
return c.json({ error: `decrypt failed: ${(err as Error).message}` }, 401);
}
// Parse the plaintext as an RpcRequest.
const classified = tryParseEnvelope(plaintext);
if (classified === null) {
return c.json({ error: 'plaintext is not a valid @shade/files envelope' }, 400);
}
if (classified.kind !== 'request') {
// Cancel envelopes are silently dropped — RPC route is request/
// response only. Cancellation across HTTP is achieved via
// AbortController on the client side, not protocol-level.
if (classified.kind === 'cancel') {
handler.handleCancel(senderAddress, classified.envelope);
// No response body — the cancel was best-effort.
return new Response(null, { status: 204 });
}
return c.json(
{ error: `unexpected envelope kind on RPC route: ${classified.kind}` },
400,
);
}
const request: RpcRequest = classified.envelope;
// Dispatch through the file handler.
let result: RpcResponse | RpcError;
try {
result = await handler.handleRequest(senderAddress, request);
} catch (err) {
// Should never happen — handler.handleRequest catches its own
// errors and returns RpcError. If it didn't, that's a bug; emit
// a generic transport-level RpcError so the client can surface
// it deterministically.
result = {
kind: KIND_ERROR_V1,
id: request.id,
error: {
code: 'INTERNAL',
message: `handler raised: ${(err as Error).message}`,
},
};
}
// Encrypt the response and return it as wire bytes.
let responseBytes: Uint8Array;
try {
const responsePlaintext = encodeRpcEnvelope(result);
const responseEnvelope = await shade.send(senderAddress, responsePlaintext);
responseBytes = encodeEnvelope(responseEnvelope);
} catch (err) {
return c.json(
{ error: `failed to encrypt response: ${(err as Error).message}` },
500,
);
}
return new Response(new Blob([responseBytes as unknown as ArrayBuffer]), {
status: 200,
headers: { 'Content-Type': 'application/octet-stream' },
});
});
return app;
}

View File

@@ -0,0 +1,184 @@
import { describe, expect, test } from 'bun:test';
import { createShade } from '@shade/sdk';
import {
createPrekeyServer,
MemoryPrekeyStore,
PrekeyServerEvents,
} from '@shade/server';
import { SubtleCryptoProvider } from '@shade/crypto-web';
import { Hono } from 'hono';
const crypto = new SubtleCryptoProvider();
/**
* Concurrent-ratchet hardening tests.
*
* Reproduces the scenario described in the v4.2.0 ratchet-desync bug
* report: with the queue-drainer running on Alice and many concurrent
* `shade.send`/RPC operations against the same peer, do
* encrypt/decrypt paths share the per-peer mutex on
* `ShadeSessionManager` so that no path observes a stale ratchet
* state?
*
* If the lock coverage regresses (a future change re-introduces a
* sidekanal bypass), one of these tests will fail with
* `DecryptionError: Failed to decrypt message — wrong key or
* tampered data`.
*/
async function setupPullRig() {
const prekey = createPrekeyServer({
crypto,
store: new MemoryPrekeyStore(),
disableRateLimit: true,
events: new PrekeyServerEvents(),
});
const prekeyServer = Bun.serve({ port: 0, fetch: prekey.fetch });
const prekeyUrl = `http://localhost:${prekeyServer.port}`;
const alice = await createShade({ prekeyServer: prekeyUrl, address: 'alice' });
const bob = await createShade({ prekeyServer: prekeyUrl, address: 'bob' });
const queueRoute = await bob.transferQueueRoute({ blockMs: 500 });
await bob.files.serve({
stat: async () => ({
name: '_',
kind: 'dir' as const,
size: 0,
mtime: 0,
metadata: {},
}),
});
const rpcRoute = bob.files.rpcRoute({ acceptFirstMessage: true });
const app = new Hono();
app.route('/', queueRoute);
app.route('/', rpcRoute);
const bobServer = Bun.serve({ port: 0, fetch: app.fetch });
const baseUrl = `http://localhost:${bobServer.port}`;
const fs = alice.files.httpClient('bob', {
rpcUrl: `${baseUrl}/rpc`,
outboundQueueUrl: `${baseUrl}/queue`,
transferBaseUrl: baseUrl,
defaultTimeoutMs: 10_000,
queueBlockMs: 500,
});
return {
alice,
bob,
fs,
baseUrl,
teardown: async () => {
fs.close();
await alice.shutdown();
await bob.shutdown();
bobServer.stop();
prekeyServer.stop();
},
};
}
describe('@shade/files — concurrent ratchet under drainer', () => {
test('100 parallel httpClient RPCs while drainer runs — no DecryptionError', async () => {
const rig = await setupPullRig();
try {
// Warm-up: establishes the X3DH session (Alice → Bob first message
// is a PreKeyMessage; subsequent messages are pure ratchet).
const first = await rig.fs.stat('/');
expect(first.kind).toBe('dir');
// Fire 100 concurrent stat RPCs. Each one is a full ratchet
// round-trip: encrypt request, POST, decrypt response. They all
// contend for `manager.peerOpChains["bob"]` on Alice's side
// (encrypt + decrypt) and `manager.peerOpChains["alice"]` on
// Bob's side. Drainer is running in the background polling
// Bob's queue — its decrypt path also funnels through the same
// per-peer lock.
// 100 concurrent — minimal repro (after warm-up only).
const N = 100;
const results = await Promise.allSettled(
Array.from({ length: N }, () => rig.fs.stat('/')),
);
const failures = results.filter((s) => s.status === 'rejected') as Array<
PromiseRejectedResult
>;
if (failures.length > 0) {
const sample = failures.slice(0, 1).map((f) => String(f.reason));
throw new Error(`${failures.length}/${N} concurrent RPCs failed: ${sample[0]}`);
}
} finally {
await rig.teardown();
}
}, 30_000);
test('parallel shade.send + drainer + RPCs — ratchet stays in sync', async () => {
const rig = await setupPullRig();
try {
// Establish session via one warm-up RPC.
await rig.fs.stat('/');
// Subscribe Bob to inbound plaintext from Alice — when Alice's
// raw `shade.send` plaintext arrives, Bob echoes a reply back
// through `shade.send` + `deliverControlEnvelope`, which the
// pull-mode envelope transport enqueues for Alice's drainer.
// This injects extra inbound traffic into Alice's drainer in
// parallel with her ongoing RPCs.
const echoes: string[] = [];
rig.bob.onMessage(async (from, plaintext) => {
if (from !== 'alice') return;
if (!plaintext.startsWith('ping:')) return;
echoes.push(plaintext);
const reply = `pong:${plaintext.slice('ping:'.length)}`;
const env = await rig.bob.send('alice', reply);
await rig.bob.deliverControlEnvelope('alice', env);
});
const inboundDrained: string[] = [];
rig.alice.onMessage((from, plaintext) => {
if (from !== 'bob') return;
if (plaintext.startsWith('pong:')) inboundDrained.push(plaintext);
});
// Mix three concurrent workloads against the same peer:
// - 50 inline file RPCs through httpClient (encrypt + decrypt)
// - 50 raw `shade.send` deliveries via control envelope
// - drainer pulling Bob's responses + echoes
const N = 50;
const rpcs = Array.from({ length: N }, () => rig.fs.stat('/'));
const sends = Array.from({ length: N }, async (_, i) => {
const env = await rig.alice.send('bob', `ping:${i}`);
await rig.alice.deliverControlEnvelope('bob', env);
});
const settled = await Promise.allSettled([...rpcs, ...sends]);
const failures = settled.filter((s) => s.status === 'rejected') as Array<
PromiseRejectedResult
>;
if (failures.length > 0) {
const sample = failures.slice(0, 3).map((f) => String(f.reason));
throw new Error(
`${failures.length}/${settled.length} concurrent ops failed: ${sample.join(' | ')}`,
);
}
// Give Bob's queue + Alice's drainer a beat to drain pongs back.
// Echoes round-trip Alice → Bob (control envelope) → Bob's
// onMessage → Bob.send + deliver (queue) → Alice's drainer →
// Alice's onMessage. We just verify some make it back without
// any DecryptionError surfacing.
const deadline = Date.now() + 5_000;
while (inboundDrained.length < N && Date.now() < deadline) {
await new Promise((r) => setTimeout(r, 50));
}
// Don't gate on every echo arriving — the long-poll cadence and
// bun's serve/abort timing can lag a few. We only care that the
// ratchet didn't desync; if it had, every subsequent op would
// throw DecryptionError above.
expect(echoes.length).toBe(N);
expect(inboundDrained.length).toBeGreaterThan(0);
} finally {
await rig.teardown();
}
}, 30_000);
});

View File

@@ -0,0 +1,208 @@
import { describe, expect, test } from 'bun:test';
import { createShade } from '@shade/sdk';
import {
createPrekeyServer,
MemoryPrekeyStore,
PrekeyServerEvents,
} from '@shade/server';
import { SubtleCryptoProvider } from '@shade/crypto-web';
import { Hono } from 'hono';
const crypto = new SubtleCryptoProvider();
/**
* Stand up the full pull-mode rig:
* - Prekey server (for X3DH)
* - Bob: file handler + rpcRoute + transferQueueRoute, all on one server
* - Alice: httpClient with outboundQueueUrl + transferBaseUrl wired
*
* Returns Alice's `FileClient`, which speaks browser-style: ONE base URL,
* no inbound listener, streams supported via long-poll.
*/
async function setupPullRig(opts: {
bobHandler: Parameters<NonNullable<Awaited<ReturnType<typeof createShade>>['files']>['serve']>[0];
}) {
const prekey = createPrekeyServer({
crypto,
store: new MemoryPrekeyStore(),
disableRateLimit: true,
events: new PrekeyServerEvents(),
});
const prekeyServer = Bun.serve({ port: 0, fetch: prekey.fetch });
const prekeyUrl = `http://localhost:${prekeyServer.port}`;
const alice = await createShade({ prekeyServer: prekeyUrl, address: 'alice' });
const bob = await createShade({ prekeyServer: prekeyUrl, address: 'bob' });
// Bob: queue-route FIRST (configures bob's transports), then files.serve.
const queueRoute = await bob.transferQueueRoute({ blockMs: 1_500 });
await bob.files.serve(opts.bobHandler);
const rpcRoute = bob.files.rpcRoute({ acceptFirstMessage: true });
const app = new Hono();
app.route('/', queueRoute);
app.route('/', rpcRoute);
const bobServer = Bun.serve({ port: 0, fetch: app.fetch });
const baseUrl = `http://localhost:${bobServer.port}`;
const fs = alice.files.httpClient('bob', {
rpcUrl: `${baseUrl}/rpc`,
outboundQueueUrl: `${baseUrl}/queue`,
transferBaseUrl: baseUrl,
defaultTimeoutMs: 10_000,
queueBlockMs: 1_000,
});
return {
alice,
bob,
fs,
baseUrl,
teardown: async () => {
fs.close();
await alice.shutdown();
await bob.shutdown();
bobServer.stop();
prekeyServer.stop();
},
};
}
describe('@shade/files HTTP RPC — pull-mode streams', () => {
test('streamed read (4 MiB) via long-poll queue', async () => {
const payload = new Uint8Array(4 * 1024 * 1024);
for (let i = 0; i < payload.length; i++) payload[i] = (i * 97) & 0xff;
const rig = await setupPullRig({
bobHandler: {
read: async () => {
// Return the payload as a streamed read so the rpc-handler
// promotes it via the streams-bridge into a transfer.
const stream = new ReadableStream<Uint8Array>({
start(controller) {
const CHUNK = 256 * 1024;
for (let off = 0; off < payload.byteLength; off += CHUNK) {
controller.enqueue(payload.slice(off, Math.min(off + CHUNK, payload.byteLength)));
}
controller.close();
},
});
// Need a precomputed sha256 for streamed reads. Use the
// crypto provider's sha256 directly.
const digest = new Uint8Array(await globalThis.crypto.subtle.digest('SHA-256', payload));
const sha256Hex = Array.from(digest, (b) => b.toString(16).padStart(2, '0')).join('');
return {
kind: 'streams' as const,
stream,
size: payload.byteLength,
sha256: sha256Hex,
contentType: 'application/octet-stream',
};
},
},
});
try {
const result = await rig.fs.read('/big.bin');
expect(result.kind).toBe('streams');
if (result.kind !== 'streams') return;
// Drain the stream and compare.
const reader = result.stream.getReader();
const got = new Uint8Array(payload.byteLength);
let offset = 0;
while (true) {
const { value, done } = await reader.read();
if (done) break;
if (value !== undefined) {
got.set(value, offset);
offset += value.byteLength;
}
}
reader.releaseLock();
await result.done();
expect(offset).toBe(payload.byteLength);
// Compare in 64KiB strides for speed.
let mismatch = -1;
for (let i = 0; i < payload.byteLength; i++) {
if (got[i] !== payload[i]) {
mismatch = i;
break;
}
}
expect(mismatch).toBe(-1);
} finally {
await rig.teardown();
}
}, 30_000);
test('streamed read fails with clear error when outboundQueueUrl is omitted', async () => {
const rig = await setupPullRig({
bobHandler: {
read: async () => {
const stream = new ReadableStream<Uint8Array>({
start(c) {
c.enqueue(new Uint8Array(512 * 1024));
c.close();
},
});
const digest = new Uint8Array(await globalThis.crypto.subtle.digest('SHA-256', new Uint8Array(512 * 1024)));
const sha256Hex = Array.from(digest, (b) => b.toString(16).padStart(2, '0')).join('');
return {
kind: 'streams' as const,
stream,
size: 512 * 1024,
sha256: sha256Hex,
};
},
},
});
// Tear down the rig's drainer so we can construct an inline-only client
rig.fs.close();
const inlineOnly = rig.alice.files.httpClient('bob', {
rpcUrl: `${rig.baseUrl}/rpc`,
defaultTimeoutMs: 10_000,
});
try {
await expect(inlineOnly.read('/big.bin')).rejects.toThrow(/streamed read/);
} finally {
inlineOnly.close();
await rig.teardown();
}
}, 15_000);
test('long-poll returns empty events on idle timeout', async () => {
const rig = await setupPullRig({
bobHandler: {
stat: async () => ({
name: '_',
kind: 'dir' as const,
size: 0,
mtime: 0,
metadata: {},
}),
},
});
try {
// Direct poll without any pending events — should return after blockMs.
const start = Date.now();
const res = await fetch(`${rig.baseUrl}/queue`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Shade-Sender-Address': 'alice',
},
body: JSON.stringify({ since: 0, blockMs: 500 }),
});
expect(res.status).toBe(200);
const body = (await res.json()) as { events: unknown[]; nextSince: number };
expect(body.events).toHaveLength(0);
expect(Date.now() - start).toBeGreaterThanOrEqual(400);
} finally {
await rig.teardown();
}
}, 10_000);
});

View File

@@ -0,0 +1,321 @@
import { describe, expect, test } from 'bun:test';
import { createShade } from '@shade/sdk';
import {
createPrekeyServer,
MemoryPrekeyStore,
PrekeyServerEvents,
} from '@shade/server';
import { SubtleCryptoProvider } from '@shade/crypto-web';
const crypto = new SubtleCryptoProvider();
/**
* Stand up a prekey server + two Shades + Bob's file handler + RPC route
* mounted on Bun.serve, then return Alice's HTTP-only `FileClient`.
*
* Mirrors the request-response setup a browser client would use against a
* Bun-style server.
*/
async function setupHttpRig(opts: {
bobHandler: Parameters<NonNullable<Awaited<ReturnType<typeof createShade>>['files']>['serve']>[0];
}) {
// 1. Prekey server.
const prekeyEvents = new PrekeyServerEvents();
const prekey = createPrekeyServer({
crypto,
store: new MemoryPrekeyStore(),
disableRateLimit: true,
events: prekeyEvents,
});
const prekeyServer = Bun.serve({ port: 0, fetch: prekey.fetch });
const prekeyUrl = `http://localhost:${prekeyServer.port}`;
// 2. Two Shades. Alice plays the browser client (no transferRoute);
// Bob is the server.
const alice = await createShade({ prekeyServer: prekeyUrl, address: 'alice' });
const bob = await createShade({ prekeyServer: prekeyUrl, address: 'bob' });
// 3. Bob: register file handler (HTTP-only — no streams) + mount
// the RPC route.
await bob.files.serve(opts.bobHandler, { inlineOnly: true });
const rpcRoute = bob.files.rpcRoute({ acceptFirstMessage: true });
const bobServer = Bun.serve({ port: 0, fetch: rpcRoute.fetch });
const rpcUrl = `http://localhost:${bobServer.port}/rpc`;
// 5. Alice: build the HTTP-only file client.
const fs = alice.files.httpClient('bob', { rpcUrl, defaultTimeoutMs: 5000 });
return {
alice,
bob,
fs,
rpcUrl,
teardown: async () => {
await alice.shutdown();
await bob.shutdown();
bobServer.stop();
prekeyServer.stop();
},
};
}
describe('@shade/files HTTP RPC — round-trip', () => {
test('list → mkdir → stat → write inline → read inline → delete via httpClient', async () => {
interface VfsEntry {
kind: 'file' | 'dir';
bytes?: Uint8Array;
contentType?: string;
}
const vfs = new Map<string, VfsEntry>([
['/', { kind: 'dir' }],
['/photos', { kind: 'dir' }],
]);
const rig = await setupHttpRig({
bobHandler: {
list: async (ctx) => {
const prefix = ctx.path.endsWith('/') ? ctx.path : `${ctx.path}/`;
const entries = Array.from(vfs.entries())
.filter(([p]) => p.startsWith(prefix) && p !== ctx.path && !p.slice(prefix.length).includes('/'))
.map(([p, e]) => ({
name: p.slice(prefix.length) || p,
kind: e.kind,
size: e.bytes?.byteLength ?? 0,
mtime: 0,
metadata: {},
}));
return { entries, hasMore: false };
},
stat: async (ctx) => {
const e = vfs.get(ctx.path);
if (!e) throw new (await import('../../src/index.js')).NotFoundError(`stat ${ctx.path}`);
return {
name: ctx.path.split('/').pop() ?? ctx.path,
kind: e.kind,
size: e.bytes?.byteLength ?? 0,
mtime: 0,
metadata: {},
...(e.contentType !== undefined ? { contentType: e.contentType } : {}),
};
},
mkdir: async (ctx) => {
vfs.set(ctx.path, { kind: 'dir' });
return { entry: { name: ctx.path.split('/').pop() ?? ctx.path, kind: 'dir' as const, size: 0, mtime: 0, metadata: {} } };
},
delete: async (ctx) => {
if (!vfs.has(ctx.path)) {
throw new (await import('../../src/index.js')).NotFoundError(`delete ${ctx.path}`);
}
vfs.delete(ctx.path);
return { deletedCount: 1 };
},
read: async (ctx) => {
const e = vfs.get(ctx.path);
if (!e || e.kind !== 'file' || !e.bytes) {
throw new (await import('../../src/index.js')).NotFoundError(`read ${ctx.path}`);
}
// Omit sha256 — dispatcher computes it from the bytes.
return {
kind: 'inline' as const,
bytes: e.bytes,
...(e.contentType !== undefined ? { contentType: e.contentType } : {}),
};
},
write: async (ctx) => {
if (ctx.args.content.kind !== 'inline') {
throw new (await import('../../src/index.js')).ConflictError('streams not supported in this test handler');
}
vfs.set(ctx.args.path, {
kind: 'file',
bytes: ctx.args.content.bytes,
...(ctx.args.contentType !== undefined ? { contentType: ctx.args.contentType } : {}),
});
return {
entry: {
name: ctx.args.path.split('/').pop() ?? ctx.args.path,
kind: 'file' as const,
size: ctx.args.content.bytes.byteLength,
mtime: 0,
metadata: {},
...(ctx.args.contentType !== undefined ? { contentType: ctx.args.contentType } : {}),
},
};
},
},
});
try {
// list
const listed = await rig.fs.list('/');
expect(listed.entries.map((e) => e.name).sort()).toContain('photos');
// mkdir
await rig.fs.mkdir('/docs');
const stat = await rig.fs.stat('/docs');
expect(stat.kind).toBe('dir');
// write inline
const payload = new TextEncoder().encode('hello browser-friendly world');
const writeResult = await rig.fs.write('/docs/greeting.txt', payload, {
contentType: 'text/plain',
});
expect(writeResult.entry.size).toBe(payload.byteLength);
// read inline
const readResult = await rig.fs.read('/docs/greeting.txt');
expect(readResult.kind).toBe('inline');
if (readResult.kind === 'inline') {
expect(new TextDecoder().decode(readResult.bytes)).toBe('hello browser-friendly world');
expect(readResult.contentType).toBe('text/plain');
}
// delete
const del = await rig.fs.delete('/docs/greeting.txt');
expect(del.deletedCount).toBe(1);
// stat the deleted path → typed NotFoundError
const { NotFoundError } = await import('../../src/index.js');
await expect(rig.fs.stat('/docs/greeting.txt')).rejects.toBeInstanceOf(NotFoundError);
} finally {
await rig.teardown();
}
});
test('streamed write (> 256 KiB) is rejected with a clear error', async () => {
const rig = await setupHttpRig({
bobHandler: {
write: async () => ({
entry: { name: 'unused', kind: 'file' as const, size: 0, mtime: 0, metadata: {} },
}),
},
});
try {
const big = new Uint8Array(257 * 1024);
const { ConflictError } = await import('../../src/index.js');
await expect(rig.fs.write('/big.bin', big)).rejects.toBeInstanceOf(ConflictError);
} finally {
await rig.teardown();
}
});
test('rpcRoute() throws when no handler is attached', async () => {
// Don't call shade.files.serve(...) — rpcRoute() should refuse.
const prekeyEvents = new PrekeyServerEvents();
const prekey = createPrekeyServer({
crypto,
store: new MemoryPrekeyStore(),
disableRateLimit: true,
events: prekeyEvents,
});
const prekeyServer = Bun.serve({ port: 0, fetch: prekey.fetch });
const bob = await createShade({
prekeyServer: `http://localhost:${prekeyServer.port}`,
address: 'bob',
});
try {
expect(() => bob.files.rpcRoute()).toThrow(/no handler attached/);
} finally {
await bob.shutdown();
prekeyServer.stop();
}
});
test('missing X-Shade-Sender-Address header → 400', async () => {
const rig = await setupHttpRig({
bobHandler: {
stat: async () => ({
name: '_', kind: 'dir' as const, size: 0, mtime: 0, metadata: {},
}),
},
});
try {
const res = await fetch(rig.rpcUrl, {
method: 'POST',
body: new Uint8Array([0]),
});
expect(res.status).toBe(400);
const body = (await res.json()) as { error: string };
expect(body.error).toMatch(/X-Shade-Sender-Address/);
} finally {
await rig.teardown();
}
});
test('empty body → 400', async () => {
const rig = await setupHttpRig({
bobHandler: {
stat: async () => ({
name: '_', kind: 'dir' as const, size: 0, mtime: 0, metadata: {},
}),
},
});
try {
const res = await fetch(rig.rpcUrl, {
method: 'POST',
headers: { 'X-Shade-Sender-Address': 'alice' },
});
expect(res.status).toBe(400);
} finally {
await rig.teardown();
}
});
test('garbage body → 401 decrypt failure', async () => {
const rig = await setupHttpRig({
bobHandler: {
stat: async () => ({
name: '_', kind: 'dir' as const, size: 0, mtime: 0, metadata: {},
}),
},
});
try {
const res = await fetch(rig.rpcUrl, {
method: 'POST',
headers: { 'X-Shade-Sender-Address': 'alice' },
body: new Uint8Array([0x02, 0xff, 0xff, 0xff]),
});
// 400 from envelope decode failure or 401 from decrypt failure.
expect([400, 401]).toContain(res.status);
} finally {
await rig.teardown();
}
});
test('body past maxBodyBytes → 413', async () => {
const prekeyEvents = new PrekeyServerEvents();
const prekey = createPrekeyServer({
crypto,
store: new MemoryPrekeyStore(),
disableRateLimit: true,
events: prekeyEvents,
});
const prekeyServer = Bun.serve({ port: 0, fetch: prekey.fetch });
const bob = await createShade({
prekeyServer: `http://localhost:${prekeyServer.port}`,
address: 'bob',
});
await bob.files.serve(
{
stat: async () => ({
name: '_', kind: 'dir' as const, size: 0, mtime: 0, metadata: {},
}),
},
{ inlineOnly: true },
);
const route = bob.files.rpcRoute({ maxBodyBytes: 1024 });
const server = Bun.serve({ port: 0, fetch: route.fetch });
try {
const big = new Uint8Array(2048);
const res = await fetch(`http://localhost:${server.port}/rpc`, {
method: 'POST',
headers: { 'X-Shade-Sender-Address': 'alice' },
body: big,
});
expect(res.status).toBe(413);
} finally {
await bob.shutdown();
server.stop();
prekeyServer.stop();
}
});
});

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/inbox-server", "name": "@shade/inbox-server",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/inbox", "name": "@shade/inbox",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/key-transparency", "name": "@shade/key-transparency",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/keychain", "name": "@shade/keychain",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/observability", "name": "@shade/observability",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/observer", "name": "@shade/observer",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/proto", "name": "@shade/proto",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/recovery", "name": "@shade/recovery",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/sdk", "name": "@shade/sdk",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -31,6 +31,8 @@ import {
type TransferHandle, type TransferHandle,
type TransferOptions, type TransferOptions,
type TransferSummary, type TransferSummary,
type OutboundQueue as OutboundQueueLike,
type QueuedEventInput,
} from '@shade/transfer'; } from '@shade/transfer';
import type { Hono } from 'hono'; import type { Hono } from 'hono';
import { BackgroundTasks } from './background.js'; import { BackgroundTasks } from './background.js';
@@ -151,6 +153,8 @@ export class Shade {
private controlChannel: ShadeControlChannel | null = null; private controlChannel: ShadeControlChannel | null = null;
private peerBaseUrlResolver: ((peerAddress: string) => Promise<string>) | null = null; private peerBaseUrlResolver: ((peerAddress: string) => Promise<string>) | null = null;
private envelopeOutboxes: ControlEnvelopeTransport | null = null; private envelopeOutboxes: ControlEnvelopeTransport | null = null;
private transferTransportOverride: ITransferTransport | null = null;
private transferQueue: OutboundQueueLike | null = null;
// `@shade/files` namespace, lazy + memoized. // `@shade/files` namespace, lazy + memoized.
private filesNamespace: FilesNamespace | null = null; private filesNamespace: FilesNamespace | null = null;
@@ -746,12 +750,52 @@ export class Shade {
* HTTP POSTs to `<base>/v1/transfer/control`). * HTTP POSTs to `<base>/v1/transfer/control`).
*/ */
configureTransfers(opts: { configureTransfers(opts: {
resolveBaseUrl: (peerAddress: string) => Promise<string>; /**
* Resolver for the peer's HTTP base URL (used by the default
* `ShadeTransferHttpTransport` to POST chunks). Optional when a
* custom `transport` and `envelopeTransport` are supplied — e.g.
* for pull-mode browser servers (`@shade/files transferQueueRoute`)
* which never POST chunks anywhere.
*/
resolveBaseUrl?: (peerAddress: string) => Promise<string>;
/**
* Override the chunk-level transport. Defaults to
* `ShadeTransferHttpTransport` (HTTP POSTs per chunk) when
* `resolveBaseUrl` is supplied. Required when `resolveBaseUrl`
* is omitted.
*/
transport?: ITransferTransport;
/**
* Override the control-envelope transport. Defaults to HTTP POSTs
* to `<base>/v1/transfer/control` when `resolveBaseUrl` is
* supplied. Required when `resolveBaseUrl` is omitted.
*/
envelopeTransport?: ControlEnvelopeTransport; envelopeTransport?: ControlEnvelopeTransport;
}): void { }): void {
this.peerBaseUrlResolver = opts.resolveBaseUrl; if (opts.resolveBaseUrl === undefined) {
this.envelopeOutboxes = if (opts.transport === undefined || opts.envelopeTransport === undefined) {
opts.envelopeTransport ?? new HttpEnvelopeTransport(opts.resolveBaseUrl, this.address); throw new Error(
'configureTransfers: resolveBaseUrl is required unless both `transport` and `envelopeTransport` are supplied (e.g. for pull-mode queue servers).',
);
}
this.peerBaseUrlResolver = async () => {
throw new Error(
'resolveBaseUrl was not configured — this Shade is in queue/pull mode and does not POST chunks. Configure a custom transport instead.',
);
};
} else {
this.peerBaseUrlResolver = opts.resolveBaseUrl;
}
this.transferTransportOverride = opts.transport ?? null;
if (opts.envelopeTransport !== undefined) {
this.envelopeOutboxes = opts.envelopeTransport;
} else if (opts.resolveBaseUrl !== undefined) {
this.envelopeOutboxes = new HttpEnvelopeTransport(opts.resolveBaseUrl, this.address);
} else {
throw new Error(
'configureTransfers: envelopeTransport is required when resolveBaseUrl is omitted.',
);
}
} }
/** /**
@@ -898,6 +942,109 @@ export class Shade {
return (await this.engine()).onIncomingTransfer(handler); return (await this.engine()).onIncomingTransfer(handler);
} }
/**
* Mount the **pull-mode** transfer routes on a Hono app. Mount under
* any base path: `app.route('/api/v1/shade-files', shade.transferQueueRoute())`.
*
* Configures this Shade instance to queue all outbound chunks +
* control envelopes per peer instead of POSTing them. Browser-style
* receivers drain the queue via long-polling — no inbound HTTP
* listener required on the receiver.
*
* Routes mounted (relative to the base path):
* POST /queue — long-poll the per-peer outbound queue
* POST /v1/transfer/:streamId/chunk — receive incoming chunks (browser → server)
* GET /v1/transfer/:streamId/state — resume-state lookup
* POST /v1/transfer/control — receive incoming control envelopes
* GET /v1/transfer/health — peer reachability probe
*
* **Idempotent**: calling twice returns a fresh `Hono` app each
* time but reuses the underlying queue + transport (so the engine
* stays single).
*
* **Ordering**: must be called **before** `shade.files.serve(...)`
* (or any other path that builds the engine), because configuring
* the queue transport mutates the transfer stack. Calling after the
* engine is built throws.
*/
async transferQueueRoute(opts: TransferQueueRouteOptions = {}): Promise<Hono> {
if (this.transferEngine !== null && this.transferTransportOverride === null) {
throw new Error(
'transferQueueRoute(): the transfer engine has already been built with the default HTTP transport. Call transferQueueRoute() before any upload()/onIncomingTransfer()/configureTransfers().',
);
}
const { OutboundQueue, QueueTransferTransport } = await import('@shade/transfer');
if (this.transferQueue === null) {
this.transferQueue = new OutboundQueue({
...(opts.maxEventsPerPeer !== undefined ? { maxEventsPerPeer: opts.maxEventsPerPeer } : {}),
...(opts.idleEvictionMs !== undefined ? { idleEvictionMs: opts.idleEvictionMs } : {}),
});
}
if (this.transferTransportOverride === null) {
const queueTransport = new QueueTransferTransport(this.transferQueue);
const queueEnvelopeTransport = new QueueEnvelopeTransport(this.transferQueue);
this.configureTransfers({
transport: queueTransport,
envelopeTransport: queueEnvelopeTransport,
});
}
const queue = this.transferQueue;
const blockMs = opts.blockMs ?? 30_000;
const maxBlockMs = opts.maxBlockMs ?? 55_000;
const engine = await this.engine();
const { createTransferRoutes, PermissiveAuthenticator } = await import('@shade/transfer');
const app = await createTransferRoutes(engine, {
authenticator: PermissiveAuthenticator,
});
app.post('/v1/transfer/control', async (c) => {
const senderAddress = c.req.header('X-Shade-Sender-Address');
if (senderAddress === undefined || senderAddress === '') {
return c.json({ error: 'missing X-Shade-Sender-Address' }, 400);
}
const ab = await c.req.arrayBuffer();
const bytes = new Uint8Array(ab);
try {
await this.acceptTransferEnvelope(senderAddress, bytes);
} catch (err) {
return c.json({ error: (err as Error).message }, 400);
}
return c.json({ ok: true });
});
// Long-poll endpoint.
app.post('/queue', async (c) => {
const senderAddress = c.req.header('X-Shade-Sender-Address');
if (senderAddress === undefined || senderAddress === '') {
return c.json({ error: 'missing X-Shade-Sender-Address' }, 400);
}
let body: { since?: unknown; blockMs?: unknown };
try {
body = (await c.req.json()) as { since?: unknown; blockMs?: unknown };
} catch {
return c.json({ error: 'invalid JSON body' }, 400);
}
const since = typeof body.since === 'number' && Number.isFinite(body.since) ? body.since : 0;
const requestedBlockMs =
typeof body.blockMs === 'number' && Number.isFinite(body.blockMs)
? Math.max(0, Math.min(maxBlockMs, body.blockMs))
: blockMs;
// Bun-side short-circuit if the request was aborted while we
// were holding the long-poll. AbortSignal from the request body
// is already surfaced via `c.req.raw.signal` in Hono.
const events = await queue.drain(senderAddress, since, requestedBlockMs, c.req.raw.signal);
return c.json({
events: events.map((e) => ({
id: e.id,
timestampMs: e.timestampMs,
kind: e.kind,
bytesB64: bytesToBase64Std(e.bytes),
...(e.kind === 'chunk' ? { meta: e.meta } : {}),
})),
nextSince: events.length > 0 ? events[events.length - 1]!.id : since,
});
});
return app;
}
/** /**
* Mount the receiver-side HTTP routes on a Hono app. Mount under any * Mount the receiver-side HTTP routes on a Hono app. Mount under any
* base path: `app.route('/shade', await shade.transferRoute())`. * base path: `app.route('/shade', await shade.transferRoute())`.
@@ -1019,16 +1166,23 @@ export class Shade {
); );
} }
this.controlChannel = new ShadeControlChannel(this, this.envelopeOutboxes); this.controlChannel = new ShadeControlChannel(this, this.envelopeOutboxes);
const httpTransport: ITransferTransport = new ShadeTransferHttpTransport({ let transport: ITransferTransport;
resolveBaseUrl: this.peerBaseUrlResolver,
authenticator: await this.makeAuthenticator(),
});
let transport: ITransferTransport = httpTransport;
let webrtcRuntime: ShadeWebRtcRuntime | null = null; let webrtcRuntime: ShadeWebRtcRuntime | null = null;
if (this.webrtcConfig !== null) { if (this.transferTransportOverride !== null) {
webrtcRuntime = await this.buildWebRtcRuntime(this.webrtcConfig, httpTransport); // Custom transport (queue, in-memory, custom adapter) — used as-is.
transport = webrtcRuntime.fallback; // WebRTC fallback only attaches when the default HTTP transport is
// active because WebRTC's `MultiTransportFallback` is HTTP-shaped.
transport = this.transferTransportOverride;
} else {
const httpTransport: ITransferTransport = new ShadeTransferHttpTransport({
resolveBaseUrl: this.peerBaseUrlResolver,
authenticator: await this.makeAuthenticator(),
});
transport = httpTransport;
if (this.webrtcConfig !== null) {
webrtcRuntime = await this.buildWebRtcRuntime(this.webrtcConfig, httpTransport);
transport = webrtcRuntime.fallback;
}
} }
this.transferEngine = new TransferEngine({ this.transferEngine = new TransferEngine({
@@ -1256,6 +1410,53 @@ function parseChunkHeader(bytes: Uint8Array): {
return { streamId, laneId, seq }; return { streamId, laneId, seq };
} }
// ─── Queue-mode (pull) envelope transport ─────────────────────
/**
* Configuration for {@link Shade.transferQueueRoute}. All fields are
* optional with sensible production defaults.
*/
export interface TransferQueueRouteOptions {
/**
* Long-poll timeout in milliseconds. Server holds the request open
* up to this long before returning an empty `events` array. Default
* 30_000.
*/
blockMs?: number;
/**
* Hard cap on long-poll timeout (clamps client-supplied `blockMs`).
* Default 55_000 — under typical reverse-proxy idle thresholds (60s
* on most CDNs).
*/
maxBlockMs?: number;
/**
* Per-peer ring-buffer size. When the queue is full, oldest events
* are dropped on enqueue. Receivers detect the gap via missing
* sequence numbers and re-resume from `since=0`. Default 1000.
*/
maxEventsPerPeer?: number;
/**
* Drop a peer's queue + reject pending pollers after this much
* silence. Default 10 minutes. Setting to `0` disables idle-eviction.
*/
idleEvictionMs?: number;
}
/**
* `ControlEnvelopeTransport` that enqueues outbound envelopes into an
* `OutboundQueue` for browser-style receivers to long-poll. Mirrors
* `HttpEnvelopeTransport` shape (one `send(peer, envelope)` method);
* the difference is the destination — local queue, not remote HTTP.
*/
class QueueEnvelopeTransport implements ControlEnvelopeTransport {
constructor(private readonly queue: OutboundQueueLike) {}
async send(peerAddress: string, envelope: ShadeEnvelope): Promise<void> {
const bytes = encodeEnvelope(envelope);
const event: QueuedEventInput = { kind: 'envelope', bytes };
this.queue.enqueue(peerAddress, event);
}
}
// ─── Default HTTP envelope transport ────────────────────────── // ─── Default HTTP envelope transport ──────────────────────────
class HttpEnvelopeTransport implements ControlEnvelopeTransport { class HttpEnvelopeTransport implements ControlEnvelopeTransport {

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/server", "name": "@shade/server",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/storage-encrypted", "name": "@shade/storage-encrypted",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/storage-postgres", "name": "@shade/storage-postgres",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/storage-sqlite", "name": "@shade/storage-sqlite",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/streams", "name": "@shade/streams",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/transfer", "name": "@shade/transfer",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -7,6 +7,7 @@ export * from './transport/memory.js';
export * from './transport/http-transport.js'; export * from './transport/http-transport.js';
export * from './transport/ws-transport.js'; export * from './transport/ws-transport.js';
export * from './transport/multi-fallback.js'; export * from './transport/multi-fallback.js';
export * from './transport/queue-transport.js';
export * from './engine.js'; export * from './engine.js';
export { export {
createTransferRoutes, createTransferRoutes,

View File

@@ -0,0 +1,332 @@
/**
* Per-peer outbound queue + queue-transport for browser-friendly
* pull-mode streams.
*
* The default `ShadeTransferHttpTransport` POSTs each chunk directly
* to the receiver's `/v1/transfer/<streamId>/chunk` route. That
* requires the receiver to host an HTTP server, which a browser tab
* cannot. `QueueTransferTransport` flips the direction: it queues
* chunks per peer and lets the receiver pull them via a long-poll
* endpoint.
*
* The companion `OutboundQueue` data structure is plain server-side
* state — wired up by `@shade/files`'s `transferQueueRoute()` (and any
* future consumer) to expose the long-poll surface and feed envelopes
* + chunks into the browser receiver.
*/
import type {
ChunkAck,
ChunkSendOptions,
ITransferTransport,
TransferResumeState,
} from './transport.js';
import { TransferTransportError } from '../errors.js';
/**
* Discriminated union of items the queue ships to the browser
* receiver. Both kinds carry **wire-encoded bytes** of an envelope —
* the receiver decodes via `decodeEnvelope` (control envelopes) or
* forwards directly to `engine.receiveChunk` (chunk envelopes).
*/
export type QueuedEvent =
| {
id: number;
timestampMs: number;
kind: 'envelope';
/** Wire-encoded `0x02` ratchet envelope (or `0x01` first-message). */
bytes: Uint8Array;
}
| {
id: number;
timestampMs: number;
kind: 'chunk';
/** Wire-encoded `0x11` stream-chunk envelope. */
bytes: Uint8Array;
meta: {
streamId: string;
laneId: number;
seq: number;
};
};
/** Caller-supplied shape for {@link OutboundQueue.enqueue} — the queue assigns `id` + `timestampMs`. */
export type QueuedEventInput =
| { kind: 'envelope'; bytes: Uint8Array }
| {
kind: 'chunk';
bytes: Uint8Array;
meta: { streamId: string; laneId: number; seq: number };
};
export interface OutboundQueueOptions {
/**
* Maximum events held per peer. When the queue is full, the oldest
* unacked event is dropped on next enqueue. Default 1000 — at the
* default chunk size (256 KiB plaintext) this caps a single peer's
* outbound buffer at ~256 MiB. Tune up for fewer/bigger streams,
* down for many concurrent small flows.
*/
maxEventsPerPeer?: number;
/**
* After a peer has not polled for this long, the queue's events are
* dropped and any pending waiters are released. Default 10 minutes.
* Setting to `0` disables idle-eviction.
*/
idleEvictionMs?: number;
}
const DEFAULT_MAX_EVENTS = 1000;
const DEFAULT_IDLE_EVICTION_MS = 10 * 60 * 1000;
interface PendingWaiter {
resolve(events: QueuedEvent[]): void;
reject(err: Error): void;
/**
* The waiter's `since` cursor — only events with `id > since` should
* be delivered when this waiter is resolved. Without this, an
* enqueue that arrives while a poller is waiting would replay
* already-processed events, causing the receiver to double-decrypt
* (and corrupt ratchet state).
*/
since: number;
timer: ReturnType<typeof setTimeout>;
abortHandler?: () => void;
signal?: AbortSignal;
}
interface PerPeerState {
nextId: number;
events: QueuedEvent[];
waiters: PendingWaiter[];
lastTouchedMs: number;
}
/**
* Per-peer monotonic event log with long-poll semantics.
*
* `enqueue` appends; `drain` returns all events with `id > since`,
* blocking up to `blockMs` if there are none. `since`-based pagination
* is the resume mechanism: a client crashing mid-stream restarts with
* its last-processed id and the queue replays everything after it
* (subject to `maxEventsPerPeer` retention).
*/
export class OutboundQueue {
private peers = new Map<string, PerPeerState>();
private readonly maxEvents: number;
private readonly idleEvictionMs: number;
private evictTimer: ReturnType<typeof setTimeout> | null = null;
private destroyed = false;
constructor(opts: OutboundQueueOptions = {}) {
this.maxEvents = opts.maxEventsPerPeer ?? DEFAULT_MAX_EVENTS;
this.idleEvictionMs = opts.idleEvictionMs ?? DEFAULT_IDLE_EVICTION_MS;
if (this.idleEvictionMs > 0) this.scheduleEviction();
}
/** Append an event and wake any waiters for that peer. */
enqueue(peer: string, ev: QueuedEventInput): QueuedEvent {
if (this.destroyed) throw new Error('OutboundQueue: destroyed');
const state = this.getOrCreate(peer);
const event: QueuedEvent =
ev.kind === 'chunk'
? {
id: state.nextId++,
timestampMs: Date.now(),
kind: 'chunk',
bytes: ev.bytes,
meta: ev.meta,
}
: {
id: state.nextId++,
timestampMs: Date.now(),
kind: 'envelope',
bytes: ev.bytes,
};
state.events.push(event);
state.lastTouchedMs = Date.now();
// Cap: drop oldest. Lost events trigger receiver-side resume from
// last polled id; the @shade/transfer engine handles missing seqs
// by re-sending on resume.
while (state.events.length > this.maxEvents) state.events.shift();
// Wake each waiter with events newer than ITS OWN `since`. Using a
// shared snapshot from `since=0` would replay events the waiter has
// already processed once a fresh enqueue arrived mid-poll, which on
// the receiver side double-dispatches an envelope into shade.receive
// → manager.decrypt and consumes the same skipped-key twice (the
// second dispatch corrupts the ratchet chain).
if (state.waiters.length > 0) {
const waiters = state.waiters.splice(0);
for (const w of waiters) {
clearTimeout(w.timer);
if (w.abortHandler !== undefined && w.signal !== undefined) {
w.signal.removeEventListener('abort', w.abortHandler);
}
const wDrained = this.collect(state, w.since);
w.resolve(wDrained);
}
}
return event;
}
/**
* Drain events with `id > since`. If none are available, block up
* to `blockMs` until any arrive. `signal` cancels the wait early.
*/
async drain(
peer: string,
since: number,
blockMs: number,
signal?: AbortSignal,
): Promise<QueuedEvent[]> {
if (this.destroyed) throw new Error('OutboundQueue: destroyed');
const state = this.getOrCreate(peer);
state.lastTouchedMs = Date.now();
const ready = this.collect(state, since);
if (ready.length > 0 || blockMs <= 0) return ready;
if (signal?.aborted) return [];
return await new Promise<QueuedEvent[]>((resolve, reject) => {
const timer = setTimeout(() => {
const idx = state.waiters.indexOf(waiter);
if (idx >= 0) state.waiters.splice(idx, 1);
if (waiter.abortHandler !== undefined && waiter.signal !== undefined) {
waiter.signal.removeEventListener('abort', waiter.abortHandler);
}
// Empty drain on timeout — that's the "no new events" signal.
resolve([]);
}, blockMs);
const waiter: PendingWaiter = { resolve, reject, since, timer };
if (signal !== undefined) {
const handler = () => {
const idx = state.waiters.indexOf(waiter);
if (idx >= 0) state.waiters.splice(idx, 1);
clearTimeout(timer);
resolve([]);
};
signal.addEventListener('abort', handler, { once: true });
waiter.abortHandler = handler;
waiter.signal = signal;
}
state.waiters.push(waiter);
});
}
/** Drop a peer's queue + reject waiters. */
evict(peer: string): void {
const state = this.peers.get(peer);
if (state === undefined) return;
this.peers.delete(peer);
for (const w of state.waiters) {
clearTimeout(w.timer);
if (w.abortHandler !== undefined && w.signal !== undefined) {
w.signal.removeEventListener('abort', w.abortHandler);
}
w.reject(new Error('OutboundQueue: peer evicted'));
}
}
/** Peer-specific snapshot for diagnostics. */
stats(peer: string): { eventCount: number; nextId: number; waiters: number } | null {
const state = this.peers.get(peer);
if (state === undefined) return null;
return {
eventCount: state.events.length,
nextId: state.nextId,
waiters: state.waiters.length,
};
}
/** Tear everything down. Pending waiters are rejected. */
destroy(): void {
if (this.destroyed) return;
this.destroyed = true;
if (this.evictTimer !== null) clearTimeout(this.evictTimer);
for (const peer of [...this.peers.keys()]) this.evict(peer);
}
// ─── internals ──────────────────────────────────────────────
private getOrCreate(peer: string): PerPeerState {
let state = this.peers.get(peer);
if (state === undefined) {
state = {
nextId: 1,
events: [],
waiters: [],
lastTouchedMs: Date.now(),
};
this.peers.set(peer, state);
}
return state;
}
private collect(state: PerPeerState, since: number): QueuedEvent[] {
if (state.events.length === 0) return [];
return state.events.filter((e) => e.id > since);
}
private scheduleEviction(): void {
const interval = Math.max(60_000, Math.floor(this.idleEvictionMs / 4));
this.evictTimer = setTimeout(() => {
if (this.destroyed) return;
const cutoff = Date.now() - this.idleEvictionMs;
for (const [peer, state] of this.peers.entries()) {
if (state.lastTouchedMs < cutoff) this.evict(peer);
}
this.scheduleEviction();
}, interval);
(this.evictTimer as unknown as { unref?: () => void }).unref?.();
}
}
/**
* Chunk transport that enqueues into an {@link OutboundQueue} instead
* of POSTing.
*
* Returns an optimistic `ChunkAck` immediately because the queue *is*
* the delivery — the receiver polls and dispatches. Browser receivers
* cannot synchronously confirm receipt before the next chunk; the
* engine's stream-protocol catches dropped chunks at finish-time
* integrity check, and chunk-resume restarts the lane from the last
* polled `since`.
*/
export class QueueTransferTransport implements ITransferTransport {
constructor(private readonly queue: OutboundQueue) {}
async probe(_peer: string): Promise<void> {
// The queue is local. Reachability is "is there a poller?" which
// is decided by `idleEvictionMs`. We don't synchronously check
// here; the engine retries via `withRetry` on `sendChunk` errors.
}
async sendChunk(
peer: string,
streamId: string,
laneId: number,
seq: number | bigint,
bytes: Uint8Array,
options?: ChunkSendOptions,
): Promise<ChunkAck> {
if (options?.signal?.aborted) {
throw new TransferTransportError('sendChunk aborted by caller');
}
const seqNum = typeof seq === 'bigint' ? Number(seq) : seq;
this.queue.enqueue(peer, {
kind: 'chunk',
bytes,
meta: { streamId, laneId, seq: seqNum },
});
return { lastSeq: seqNum };
}
async fetchResumeState(
_peer: string,
_streamId: string,
): Promise<TransferResumeState | null> {
// Pull-mode receivers report resume state by re-polling with the
// `since` cursor they last successfully processed; the queue does
// not need to query the receiver. Return null so the engine
// restarts from seq 0 (deterministic), and the queue replays from
// `since=0` if the client reconnects fresh.
return null;
}
}

View File

@@ -0,0 +1,60 @@
import { describe, expect, test } from 'bun:test';
import { OutboundQueue } from '../src/index.js';
/**
* Regression coverage for the long-poll waiter `since` cursor.
*
* The bug being guarded against: when `enqueue` woke a pending
* `drain` waiter, it used a `since=0` snapshot and replayed every
* event that had ever been queued — including the ones the waiter
* had already processed in a previous poll. Downstream the queue
* fed `Shade.acceptTransferEnvelope`, so the duplicate replay
* dispatched the same envelope into `manager.decrypt` twice. The
* second decrypt consumed an already-used skipped key, fell into
* the stale-counter branch of `ratchetDecrypt`, and corrupted the
* Double Ratchet receive chain — surfacing as
* `DecryptionError: wrong key or tampered data` on every
* subsequent message.
*/
describe('OutboundQueue — waiter since cursor', () => {
test('mid-poll enqueue must not replay events the waiter already saw', async () => {
const queue = new OutboundQueue({ idleEvictionMs: 0 });
const peer = 'alice';
const e1 = queue.enqueue(peer, { kind: 'envelope', bytes: new Uint8Array([1]) });
const e2 = queue.enqueue(peer, { kind: 'envelope', bytes: new Uint8Array([2]) });
// First poll drains both events (no blocking — they're already there).
const first = await queue.drain(peer, 0, 0);
expect(first.map((e) => e.id)).toEqual([e1.id, e2.id]);
// Now the waiter polls past the last seen id. It blocks because
// there are no events newer than `since`. Concurrently a fresh
// event gets enqueued — that's the path the bug fired on.
const blockMs = 5_000;
const polling = queue.drain(peer, e2.id, blockMs);
// Yield so `drain` actually parks on the waiter list before we
// race the enqueue against it.
await Promise.resolve();
const e3 = queue.enqueue(peer, { kind: 'envelope', bytes: new Uint8Array([3]) });
const woken = await polling;
// Pre-fix: would resolve with [e1, e2, e3] (a `since=0` snapshot
// drained verbatim). Post-fix: only the events newer than the
// waiter's recorded `since` come through.
expect(woken.map((e) => e.id)).toEqual([e3.id]);
});
test('parked waiter at the head still gets the new event when others have polled past it', async () => {
const queue = new OutboundQueue({ idleEvictionMs: 0 });
const peer = 'alice';
const e1 = queue.enqueue(peer, { kind: 'envelope', bytes: new Uint8Array([1]) });
// A waiter that parks past the head — there are no events newer
// than e1.id, so it has to block.
const polling = queue.drain(peer, e1.id, 5_000);
await Promise.resolve();
const e2 = queue.enqueue(peer, { kind: 'envelope', bytes: new Uint8Array([2]) });
const woken = await polling;
expect(woken.map((e) => e.id)).toEqual([e2.id]);
});
});

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/transport-bridge", "name": "@shade/transport-bridge",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/transport-webrtc", "name": "@shade/transport-webrtc",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/transport", "name": "@shade/transport",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@shade/widgets", "name": "@shade/widgets",
"version": "4.0.2", "version": "4.2.1",
"type": "module", "type": "module",
"main": "src/index.ts", "main": "src/index.ts",
"types": "src/index.ts", "types": "src/index.ts",