5 Commits

Author SHA1 Message Date
8c606ad498 release(v4.8.2): per-from receive serialization + per-connection bridge dedup
Some checks failed
Test / test (push) Has been cancelled
Two interlocking robustness fixes for the duplicate-fan-out / first-contact
class of failures Prism reported.

1. `Shade.receive(from, env)` now queues its `manager.decrypt` step
   per `from` so concurrent dispatches can't race the SessionManager
   ratchet or the StorageProvider (sqlite "database is locked", IDB
   transaction conflicts). User message handlers run *outside* the
   queue so streams + file-RPC's nested `shade.receive` calls don't
   self-deadlock.

2. Bridge WS + SSE handlers now run a per-connection bounded msgId
   LRU as defense-in-depth against any flushTo re-entry (event-storm,
   future refactor). Pending-flush chains are wrapped in `.catch(() =>
   {})` so a transient `ws.send` rejection no longer poisons the
   connection's flush loop.

Tests: storming `inbox.blob_stored` 10× per PUT yields exactly one WS/
SSE frame; 8 concurrent `bob.receive('alice', envelope)` calls keep
the ratchet intact and never surface "database is locked".

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 12:13:46 +02:00
680d6386f3 release(v4.8.1): SHADE_DISABLE_RATE_LIMIT env var for single-tenant deploys
Plumbing fix only — both createPrekeyRoutes and createInboxRoutes
already accepted disableRateLimit; standalone.ts just didn't read
the env. Now SHADE_DISABLE_RATE_LIMIT=1 turns off IP rate-limits on
every prekey + inbox route, with a WARN log on startup so operators
see it.

Single-tenant deployments only — multi-tenant relays must leave it
unset. Documented in docs/DEPLOYMENT.md.

Reported by Prism: ~6 pair attempts/hour from a single dev IP +
the sidecar's register call tripped the 5/hour REGISTER_LIMIT every
dev iteration.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 00:55:57 +02:00
1fb59a7076 release(v4.8.0): sender-fingerprint attribution + Inbox.start race fix
Two unblocking changes for first-contact flows.

Sender attribution: relay captures shortHash(senderSigningKey) at
PUT time (after signature verification, no new trust surface) and
surfaces it on bridge push (IncomingMessage.from) + inbox-fetch
(FetchedBlob.from) + DecryptHandler raw arg. Apps receiving a prekey
envelope from a never-before-seen peer can now bootstrap X3DH via
shade.receive('fp:<hex>', env) — pre-4.8 the wire envelope didn't
authenticate the sender and there was no out-of-band hint to use.
Idempotent ALTER TABLE migrations for SQLite + Postgres add a
sender_fp TEXT column; legacy rows surface as from=undefined
(inter-version compat).

Inbox.start() race: pre-4.8 start() called register() fire-and-forget
AND schedulePoll(0) synchronously, so the first poll on a fresh
address often beat the register HTTP RTT and got SHADE_NOT_FOUND.
start() now defers; register() success kicks schedulePoll(0). Manual
tick() is unaffected (deliberate user action, no gating).

Both reported by Prism. Tests cover all five acceptance criteria
from the sender-attribution request (PUT capture, bridge surface,
fetch surface, inter-version compat, end-to-end pair smoke) plus
the three from the race-fix request.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 00:11:59 +02:00
594992a183 release(v4.7.0): peer-presence events for instant BroadcastChannel revoke
Adds the bridge-connection-lifecycle signal that closes Prism's
~45s revoke window down to one server→client round-trip (~50ms).

Server (`@shade/inbox-server`):
- `inbox.peer_connected` / `inbox.peer_disconnected` events on the
  0↔1 boundary across WS + SSE bridges. Long-poll deliberately not
  tracked (every poll boundary would flap; push transports are also
  the only ones where instant revoke matters).
- `PresenceTracker` collapses two parallel bridges (e.g. WS + SSE
  during fallback handover) into one connect/disconnect pair.
- `GET /v1/bridge/presence` SSE endpoint: signed query with
  `kind: 'presence'`, `watched: string[]`; on open streams a
  per-address snapshot, then change frames filtered server-side.
  MAX_WATCHED_ADDRESSES = 64. Subscribing does not itself count as
  a peer-bridge connection.
- `createBridgeRoutes` now returns `{ app, websocket, presence }`.

Client (`@shade/transport-bridge`):
- `PresenceBridge.subscribe({ watch, onPresenceChange })` →
  `{ addPeer, removePeer, watching, unsubscribe }`. addPeer/removePeer
  mutate via reconnect with a fresh signed query.
- `signPresenceQuery` helper for non-PresenceBridge consumers.

Tests cover all four acceptance criteria from the Prism request:
server-event smoke, online→offline subscription, address scoping
(carol invisible to a [alice]-only sub), reconnect, plus an
addPeer/removePeer regression.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 23:16:35 +02:00
8746571d2a release(v4.6.1): bind globalThis.fetch in browser-receiver-sensitive call sites
Browsers' Window.fetch is a WebIDL bound operation; storing it as
this.fetchImpl / this.fetchFn and calling via the instance receiver
threw "Illegal invocation" on the first request. Bind once at
construction in InboxClient, LongPollBridge, and SseBridge. Reported
by Prism (multi-device E2EE terminal), blocking every browser
consumer of the v4.6 transport stack on inbox.start() / bridge.connect().

WsBridge unaffected (uses WebSocket). Node/Bun fetch tolerates a free
receiver, so the bug never surfaced server-side — added regression
tests that install a strict-receiver globalThis.fetch to catch the
issue without an actual browser harness.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 23:00:58 +02:00
51 changed files with 2386 additions and 105 deletions

View File

@@ -5,6 +5,425 @@ All notable changes to Shade are documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [4.8.2] — 2026-05-08 — Per-`from` decrypt serialization + per-connection bridge dedup
Two interlocking robustness fixes for the first-contact / duplicate-fan-out
class of failures Prism reported. Either fix on its own would help; together
they make the receiver path tolerant of any combination of relay duplicates
and concurrent dispatchers.
**(1) `Shade.receive(from, env)` now serializes its ratchet/storage
step per `from`.** The send path has had a per-address `encryptChains`
mutex since V1 — receive did not. Concurrent decrypts for the same peer
raced the `SessionManager` ratchet (mutated in place) and the
`StorageProvider` (which is not required to be a concurrent-safe
writer — `bun:sqlite` throws `database is locked`, IndexedDB throws
transaction conflicts). Symptom in production: a single relay PUT that
fans out 8× over a WS bridge gets dispatched as 8 parallel
`shade.receive` calls; one wins the X3DH prekey race, the other 7 fail
with `database is locked` or `one-time prekey not found: <id>`, and the
post-decrypt side effects (`markPeerVerified`,
`BroadcastChannel.addMember`, paired-reply `inbox.send`) get lost in
the rubble. The decrypt step is now chained off a per-`from` promise
queue. Crucially, the user-facing **message handlers run outside the
queue** — streams + file-RPC issue nested `shade.receive` calls for the
same peer from inside their handlers (e.g. `stream-end` arrives while a
write-RPC is still waiting on chunks), and holding the queue across the
handler would self-deadlock. Only the atomic ratchet+storage step is
protected.
**(2) Bridge handlers (WS + SSE) now run a per-connection msgId
LRU dedup.** Cursor-based delivery already de-duplicates in the happy
path, but the gate is a defense-in-depth against any subtle re-entry of
`flushTo` (event-storm, future refactor, fallback-timer race). The chain
that drives flush is now also wrapped in `.catch(() => {})` so a
transient `ws.send` / SSE write rejection doesn't poison every future
push on the connection.
Both reported by Prism (multi-device E2EE terminal). Wave-3 pair
handshake is unblocked even when the receiver runs multiple bridges or
the relay double-fires `inbox.blob_stored`.
### Fixed
#### `@shade/sdk` — `Shade.receive` per-`from` serialization
- `Shade` gains a private `decryptChains: Map<string, Promise<unknown>>`
mirroring the existing `encryptChains` on the send path.
- `Shade.receive(from, env)` chains its `manager.decrypt(from, env)`
call off the prior decrypt promise for the same `from`. The
post-decrypt control-plaintext check and user `messageHandlers` run
*outside* the chain so nested `shade.receive` calls from inside a
handler don't self-deadlock (streams + file-RPC depend on this).
- The stored chain is `decryptPromise.catch(() => undefined)` so a
rejection in one decrypt doesn't sabotage the next; this caller
still sees its own rejection through the original promise.
- External signature unchanged.
#### `@shade/inbox-server` — bridge per-connection msgId dedup
- New internal `DeliveredIdLru` (4096-entry bounded set, FIFO eviction)
per WS / SSE connection. `flushTo` skips emit when a row's `msgId` is
already in the LRU. Long-poll handlers don't need it (each request is
isolated).
- `pendingFlushPromise` chains in both WS and SSE handlers now
terminate in `.catch(() => {})` so a transient emit failure doesn't
silently kill the connection's flush loop.
### Tests
- `packages/shade-transport-bridge/tests/bridge.test.ts` — new
"Bridge dedup" describe block: storms `inbox.blob_stored` 10× for one
PUT and asserts WS / SSE both deliver exactly one frame.
- `packages/shade-sdk/tests/sdk.test.ts` — new
"concurrent receive(from, env) for same `from` does not race the
ratchet" exercises 8 parallel `bob.receive('alice', env)` for the
same envelope and asserts:
1. at least one fulfills with the right plaintext;
2. no rejection mentions `database is locked`;
3. the next legitimate message still decrypts (ratchet intact).
### Migration
None. Drop-in. Bridges and receivers behave identically on non-
duplicate paths; the new gates only kick in when a duplicate would
otherwise have been emitted / dispatched.
## [4.8.1] — 2026-05-08 — `SHADE_DISABLE_RATE_LIMIT` env var for single-tenant deploys
The standalone server's `routes.ts` and `inbox-server`'s
`createInboxRoutes` already accepted a `disableRateLimit?: boolean`
option, but the standalone entry just didn't read it from environment.
Self-hosted single-tenant deploys (Prism's relay is a typical case —
only Prism PC clients + their paired browsers) tripped the
`REGISTER_LIMIT` (5/hour per IP) every dev iteration: ~6 pair attempts
in an hour from the same IP plus the sidecar's register call killed
the dev loop until the bucket refilled (~1 token per 12 minutes).
Reported by Prism. Two-line plumbing fix: `standalone.ts` now reads
`SHADE_DISABLE_RATE_LIMIT=1` and forwards `disableRateLimit` to both
`createPrekeyRoutes` and `createInboxRoutes`.
### Added
#### `@shade/server`
- `SHADE_DISABLE_RATE_LIMIT=1` env var disables IP rate-limits on every
prekey + inbox route in `standalone.ts`. Logged as a `WARN` on startup
(`SHADE_DISABLE_RATE_LIMIT=1 — IP rate limits OFF on prekey + inbox
routes`) so operators see it in stderr/log aggregation.
- **Single-tenant deployments only** — multi-tenant relays must leave
this unset. The rate-limit defends multi-tenant relays against abuse;
flipping it off is appropriate for self-hosted single-team setups
where every caller is a known client. Documented in
[`docs/DEPLOYMENT.md`](./docs/DEPLOYMENT.md) under "Environment variable
reference".
### Tests
- `packages/shade-server/tests/rate-limit.test.ts` — the existing
"register endpoint rate-limits per IP" test verifies the default-on
path; a new sister test exercises
`createPrekeyServer({ disableRateLimit: true })` and confirms 12
consecutive register calls from the same IP all return 200 (no 429).
The env-var → option conversion in `standalone.ts` is a one-liner
verified by inspection.
### Migration
None. Default is unchanged (rate limits stay ON). Self-hosted
single-tenant operators add `SHADE_DISABLE_RATE_LIMIT=1` to their
deployment env to flip it off.
## [4.8.0] — 2026-05-08 — Sender-fingerprint attribution + `Inbox.start()` race fix
Two unblocking changes for first-contact flows. First, the relay now
captures the sender's signing-key fingerprint at PUT time and surfaces
it on every downstream delivery — bridge push (`IncomingMessage.from`)
and inbox-fetch response (`FetchedBlob.from`). Without it, an app
receiving a prekey envelope from a never-before-seen peer cannot
decrypt it: `shade.receive(from, env)` requires a sender address and
the wire envelope itself doesn't authenticate the sender. The
fingerprint is the same 8-byte hex of SHA-256(senderSigningKey) that
`IncomingMessage.from` was already documented as carrying; the field
just wasn't populated.
Second, `Inbox.start()` no longer races register vs the first poll.
Pre-fix, a fresh address calling `start()` saw the very first
`/v1/inbox/{addr}/fetch` POST race the register HTTP RTT and return
`SHADE_NOT_FOUND` — confusing 404 in DevTools, ~30s gap until the next
scheduled poll, and inbox-fetch silently dark for the gap (bridge push
covered for it, which is why this slipped through). `start()` now
defers the first poll; `register()` success kicks `schedulePoll(0)`.
Both reported by Prism (multi-device E2EE terminal). Wave-3 pair
handshake is unblocked: web POSTs pair frame to PC inbox, PC's
`onIncoming` gets `raw.from = "fp:<hex>"`, calls
`shade.receive('fp:<hex>', env)`, parses plaintext, learns real
address, sends paired-reply.
### Added
#### `@shade/inbox-server`
- `InboxStore.putBlob({ ..., senderFp? })` — store interface accepts an
optional 8-byte hex fingerprint. `MemoryInboxStore`,
`SqliteInboxStore` (`@shade/storage-sqlite`), and `PostgresInboxStore`
(`@shade/storage-postgres`) all persist + return it.
- `InboxStore.fetchBlobs(...)` rows expose `senderFp?: string`.
Undefined for legacy rows persisted by a pre-4.8 relay.
- `POST /v1/inbox/:address` route computes `shortHash(senderSigningKey)`
after the sender's signature is verified and forwards it to
`store.putBlob({ ..., senderFp })`. The signature verification path
authorizes the same fingerprint that gets persisted — no new trust
surface.
- `POST /v1/inbox/:address/fetch` response includes `from` per blob
when the row has a fingerprint. Absent on legacy rows.
- Bridge endpoints (`/v1/bridge/{stream,poll,ws}`) now populate
`BridgeWireMessage.from` from the row's `senderFp`. The
`transport-bridge` wire format already accepted `from`; v4.7 just
never filled it.
#### `@shade/inbox`
- `FetchedBlob.from?: string` — relay-supplied sender fingerprint hint,
parsed from the fetch response.
- `DecryptHandler` raw arg gains `from?: string`. Apps that ignore it
keep working unchanged (back-compat: the field is optional).
### Fixed
#### `@shade/inbox` — `Inbox.start()` register/poll race
`start()` no longer schedules the first poll synchronously alongside
the fire-and-forget `register()`. Instead, `register()` success kicks
`schedulePoll(0)`, so the first poll fires after the server has
acknowledged the address. Already-registered instances (where the
local `this.registered` flag is true at `start()` time, e.g. after a
restart that hydrated state) get an immediate poll as before.
### Storage migrations
Idempotent ALTER TABLE for live deployments:
- **SQLite** (`@shade/storage-sqlite`): on open, the store does
`PRAGMA table_info(inbox_blobs)` and runs
`ALTER TABLE inbox_blobs ADD COLUMN sender_fp TEXT` if the column is
missing. Fresh databases get the column from the `CREATE TABLE IF
NOT EXISTS` directly.
- **Postgres** (`@shade/storage-postgres`): `ensureInboxServerTables`
runs `ALTER TABLE shade_inbox_blobs ADD COLUMN IF NOT EXISTS
sender_fp TEXT`.
Both leave existing rows with `sender_fp = NULL`. The fetch path emits
`from` only when the column is non-empty, so legacy blobs surface as
`from: undefined` (acceptance criterion (4): inter-version compat).
### Tests
- `packages/shade-inbox/tests/client.test.ts`:
- **Race fix**: spy fetch records the order of `register` and `fetch`
requests; first `fetch` (if any) must follow `register`. Pre-fix
the recording fetch threw "fetch fired before register completed
(race not fixed)".
- **Fetch attribution**: `FetchedBlob.from` matches
`SHA-256(senderSigningKey)[:8]` in hex.
- **DecryptHandler propagation**: `raw.from` arrives in the app's
handler.
- `packages/shade-transport-bridge/tests/bridge.test.ts`: same
fingerprint regression for SSE, WS, and long-poll bridges
(`IncomingMessage.from` non-empty + matches the expected digest).
- `packages/shade-storage-sqlite/tests/sqlite-inbox-store.test.ts`:
- senderFp round-trip through put + fetch.
- senderFp omitted on put → fetched row has `senderFp: undefined`.
- **Pre-4.8 schema migration**: open a DB seeded with a v4.7
`inbox_blobs` schema (no `sender_fp` column), reopen via
`SqliteInboxStore`, verify the legacy row survives + new writes
carry the new field.
### Migration
None required for app code. Existing handlers that ignore
`raw.from` / `IncomingMessage.from` keep working unchanged. Apps that
want sender-attributed first-contact:
```ts
inbox.onIncoming(async (raw) => {
const tentativeAddr = raw.from ? `fp:${raw.from}` : null;
if (!tentativeAddr) return null; // legacy relay; drop
const env = decodeEnvelope(raw.ciphertext);
const plaintext = await shade.receive(tentativeAddr, env);
// pair frame announces real address; reconcile fp:<hex> → real
return null;
});
```
For Prism specifically: drop the `await this.inbox.register()`
workaround in `apps/web/src/shade/transport.ts` and
`packages/shade-sidecar/src/transport.ts`. `inbox.start()` on 4.8+
no longer races and the explicit pre-register is redundant.
## [4.7.0] — 2026-05-07 — Peer-presence events for instant `BroadcastChannel` revoke
`BroadcastChannel.removeMember` (v4.6) is the right primitive for revoking a
paired peer's sender-key membership when, say, a tab closes or a laptop
locks — but until now there was no signal saying "this peer's bridge just
went away". Apps had to fall back to client-side heartbeats:
`apps/web/src/shade/heartbeat.ts`-style 20s pings + a 10s GC sweep, with a
~45s worst-case revoke window. For a terminal-mirroring product whose
threat model includes *"someone takes the unattended laptop"*, 45s of
legitimate broadcast access for the attacker is too long.
This release surfaces the bridge-connection-lifecycle signal that
`createBridgeRoutes` already had internally. The inbox event bus now emits
`inbox.peer_connected` / `inbox.peer_disconnected` on the 0↔1 boundary
across WS + SSE bridges, and a new `/v1/bridge/presence` SSE endpoint plus
the `PresenceBridge` client class let any authenticated SDK subscribe to
presence transitions for a watcher-declared address list. The SDK glue
collapses to ~5 lines:
```ts
const sub = await new PresenceBridge({ baseUrl, crypto, signingPrivateKey, address }).subscribe({
watch: paired_peers,
onPresenceChange: (e) => {
if (e.status === 'offline') void channel.removeMember(e.address);
},
});
```
Reported by Prism — collapses Prism's wave-3 heartbeat-based revoke from
~45s to ~50ms (one network round-trip) for the overwhelmingly common case
of a clean WS close.
### Added
#### `@shade/inbox-server`
- `InboxServerEventMap` gains two new event names:
- `inbox.peer_connected``{ address, bridgeKind: 'ws' | 'sse' }`
fires when an address transitions from zero to ≥1 active push-bridge
connections.
- `inbox.peer_disconnected``{ address, bridgeKind, reason: 'closed' | 'error' }`
— fires when the last push-bridge connection for the address closes.
- New `PresenceTracker` class (`packages/shade-inbox-server/src/presence.ts`)
— per-address connection-count map; emits transitions into a wired
`InboxServerEvents`. Two parallel bridges (WS + SSE during a fallback
handover) collapse into one `peer_connected` / `peer_disconnected`
pair so consumers don't see flicker.
- `createBridgeRoutes` now returns `{ app, websocket, presence }` so
operators / tests can read the live presence map. A `presenceTracker`
option lets multiple route mounts share state.
- New `GET /v1/bridge/presence` SSE endpoint:
- Auth: signed query `{ address, kind: 'presence', watched: string[],
signedAt, signature }` against the watcher's registered owner key.
`kind: 'presence'` is bound into the canonical signed payload to
prevent cross-endpoint replay against `/v1/bridge/{stream,poll,ws}`.
- On open: emits one `event: presence` SSE frame per watched address
with the current online/offline snapshot.
- On change: streams `{ address, status, at, via: 'ws'|'sse' }` frames
filtered server-side to the watcher's address list.
- Subscribing does NOT itself count as a peer-bridge connection — a
PresenceBridge open will not make the watcher appear online to
other watchers.
- `MAX_WATCHED_ADDRESSES = 64` per subscription.
#### `@shade/transport-bridge`
- New `PresenceBridge` class with `subscribe({ watch, onPresenceChange,
onError? })` returning `{ addPeer, removePeer, watching, unsubscribe }`.
- `addPeer` / `removePeer` mutate the watched set by aborting the
current SSE connection so the run loop reopens with a fresh signed
query. Mutations are expected to be rare (only on pair / unpair) so
the brief reconnect gap is acceptable.
- Auto-reconnect with exponential backoff (250ms → 10s, same defaults
as `SseBridge`); `disableAutoReconnect: true` for tests.
- `signPresenceQuery` helper exported from `@shade/transport-bridge/auth`
for non-PresenceBridge consumers (manual EventSource, observability
scrapers, etc.).
### Why long-poll is NOT tracked
A long-poll client toggles in/out of `/v1/bridge/poll` every few seconds,
and treating each request boundary as a presence transition would
dominate the event stream with flapping. Push transports are also the
only ones where a ~50ms revoke window matters — long-poll users are
already on a slow path. Apps that need presence over long-poll continue
to use client-side heartbeats.
### Tests
- `packages/shade-transport-bridge/tests/bridge.test.ts` — four blocks
covering all acceptance criteria from the request:
- **(1)** `WsBridge.connect()` then `disconnect()` → operator's
`events.on(...)` sees `inbox.peer_connected` then
`inbox.peer_disconnected` with `address: 'alice'`, `bridgeKind: 'ws'`.
- **(2A)** Bob subscribes presence on `[alice]`; alice opens a
WsBridge → bob's `onPresenceChange` fires `online` within 2s.
- **(3)** Bob's `[alice]` subscription must NOT receive frames for
an unrelated `carol` address opening her own bridge.
- **(4)** Alice's bridge reopens after a drop → bob sees `online`
again on the same subscription.
- Plus an `addPeer` / `removePeer` regression that verifies the
reconnect-on-mutation path delivers a fresh snapshot for the new
address and stops delivering for the removed one.
### Migration
None. Strict additive — existing `InboxServerEvents` consumers keep
working unchanged. `createBridgeRoutes`'s return type added a
`presence` field; destructuring code that names only `app, websocket`
keeps compiling.
For Prism specifically: drop the wave-3 heartbeat module
(`apps/web/src/shade/heartbeat.ts`) on the PC sidecar and replace with
a `PresenceBridge` subscription on the paired-peer set. Keep the
heartbeat as a network-partition fallback if you want a belt-and-
braces revoke story; with presence-events the worst-case revoke window
drops from ~45s to one server→PC round-trip.
## [4.6.1] — 2026-05-07 — Browser `fetch` receiver lost in `Inbox` and HTTP bridges
Every browser consumer of the v4.6.0 transport stack crashed on the
*first* network call with:
```
Failed to execute 'fetch' on 'Window': Illegal invocation
```
`@shade/inbox`, `@shade/transport-bridge` (`SseBridge`, `LongPollBridge`)
each cached the default `globalThis.fetch` reference as a class property
and later invoked it as `this.fetchImpl(url, …)` / `this.fetchFn(url, …)`.
The browser's `fetch` is a WebIDL bound operation: calling it as a
method on any object other than the `Window` rejects with the error
above. Node/Bun `fetch` tolerates a free receiver, so the bug only
manifested in actual browsers and slipped through the SDK test suite.
Reported by Prism (multi-device E2EE terminal) — `inbox.start()` →
`register()` → `client.register()` → `this.fetchImpl(url, …)` threw on
the first `/v1/inbox/register` POST, so `transport.start()` never sent
the pair handshake and the web side timed out after 30s with "PC did
not reply".
### Fixed
#### `@shade/inbox` — `InboxClient` constructor
`fetchImpl` is now `(options.fetch ?? globalThis.fetch).bind(globalThis)`.
A consumer-supplied `options.fetch` is bound too — a custom fetch with
its own receiver requirements must bind itself; binding to `globalThis`
is otherwise a no-op for free functions.
#### `@shade/transport-bridge` — `LongPollBridge` and `SseBridge` constructors
Same binding fix in both. `WsBridge` was unaffected (uses `WebSocket`).
### Tests
- `packages/shade-inbox/tests/client.test.ts` — installs a strict-receiver
`globalThis.fetch` that mimics the WebIDL "Illegal invocation" check,
constructs `InboxClient` with no `fetch` override, runs `register()`,
and asserts the strict fetch saw `globalThis` as `this`. Pre-fix this
throws; post-fix it passes.
- `packages/shade-transport-bridge/tests/bridge.test.ts` — same regression
for both `LongPollBridge.connect()` (probe call) and `SseBridge.connect()`
(open-once call).
### Migration
None. Existing `options.fetch` overrides keep working unchanged. Apps
shipping a workaround like
```ts
new Inbox({ ..., fetch: globalThis.fetch.bind(globalThis) });
```
can drop the `.bind(globalThis)` and the redundant `fetch:` option once
they're on `4.6.1`.
## [4.6.0] — 2026-05-07 — Broadcast channels (Signal sender-keys for one-to-many fan-out)
Prism's PC desktop is the *sender* in a one-to-many fan-out — one PTY

View File

@@ -81,6 +81,7 @@ Tables will be created automatically with the `shade_server_*` prefix, so they c
| `SHADE_CLEANUP_INTERVAL_HOURS` | `24` | Cleanup cycle interval |
| `SHADE_LOG_LEVEL` | `info` | `debug` / `info` / `warn` / `error` |
| `SHADE_OTEL_ENABLED` | unset | Set to `1`/`true` to enable OpenTelemetry tracing on `withTracer()`-configured deployments. See [`observability.md`](./observability.md). |
| `SHADE_DISABLE_RATE_LIMIT` | unset | Set to `1` to disable IP rate-limits on every prekey + inbox route. **Single-tenant deployments only** — multi-tenant relays must leave this unset to keep the abuse defenses on. |
## Health and observability

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/cli",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/cli.ts",
"bin": {

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/core",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/crypto-web",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/dashboard",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"scripts": {
"dev": "vite",

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/files",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/inbox-server",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -38,8 +38,15 @@ import {
import { verifyPayload, validateAddress } from '@shade/server';
import type { InboxStore } from './store.js';
import type { InboxServerEvents } from './events.js';
import { PresenceTracker, type TrackedBridgeKind } from './presence.js';
export type BridgeKind = 'stream' | 'poll' | 'ws';
/**
* Wire-protocol kind tag for `/v1/bridge/presence`. Distinct from
* `BridgeKind` because the canonical signed payload is shaped
* differently (`watched: string[]` instead of `since: number`).
*/
export type PresenceKind = 'presence';
export interface BridgeRoutesOptions {
store: InboxStore;
@@ -60,6 +67,13 @@ export interface BridgeRoutesOptions {
* Default 1_000.
*/
fallbackPollIntervalMs?: number;
/**
* Inject an existing presence tracker. Useful when multiple
* `createBridgeRoutes` calls need to share state (e.g. mounting the
* routes under several hostnames in a single process). When omitted,
* the bridge auto-creates an internal tracker bound to `events`.
*/
presenceTracker?: PresenceTracker;
}
interface VerifiedBridgeRequest {
@@ -68,6 +82,13 @@ interface VerifiedBridgeRequest {
since: number;
}
interface VerifiedPresenceRequest {
/** The watcher's address (signer of the request). */
address: string;
/** Addresses whose presence the watcher is asking to track. */
watched: string[];
}
/**
* Build the bridge Hono router and a paired Bun-WebSocket handler.
*
@@ -80,6 +101,8 @@ export function createBridgeRoutes(opts: BridgeRoutesOptions): {
app: Hono;
/** Pass to `Bun.serve({ websocket })`. Undefined if Bun adapter is missing. */
websocket: unknown;
/** Live presence tracker. Tests + observers can read it; routes update it. */
presence: PresenceTracker;
} {
const app = new Hono();
const pageLimit = opts.pageLimit ?? 50;
@@ -87,6 +110,7 @@ export function createBridgeRoutes(opts: BridgeRoutesOptions): {
const longPollDefault = opts.longPollTimeoutMs ?? 25_000;
const longPollMax = opts.longPollMaxTimeoutMs ?? 55_000;
const fallbackPollIntervalMs = opts.fallbackPollIntervalMs ?? 1_000;
const presence = opts.presenceTracker ?? new PresenceTracker(opts.events ?? null);
app.onError((err, c) => {
if (err instanceof ShadeError) {
@@ -102,17 +126,32 @@ export function createBridgeRoutes(opts: BridgeRoutesOptions): {
const verified = await verifyBridgeAuth(c, opts, 'stream');
return streamSSE(c, async (stream) => {
const address = verified.address;
const connId = presence.newConnectionId();
presence.markConnected(address, 'sse', connId);
let presenceClosed = false;
const closePresence = (reason: 'closed' | 'error'): void => {
if (presenceClosed) return;
presenceClosed = true;
presence.markDisconnected(address, 'sse', connId, reason);
};
let cursor = verified.since;
const writer = makeBlobWriter(opts.store, pageLimit);
const delivered = new DeliveredIdLru();
// Initial backlog drain.
const flushed = await flushTo(writer, address, cursor, async (blob) => {
const flushed = await flushTo(
writer,
address,
cursor,
async (blob) => {
await stream.writeSSE({
id: String(blob.receivedAt),
event: 'envelope',
data: JSON.stringify(serializeBlob(blob)),
});
});
},
delivered,
);
cursor = Math.max(cursor, flushed);
// Hook up event-driven push if available, else fall back to a poll
@@ -124,19 +163,31 @@ export function createBridgeRoutes(opts: BridgeRoutesOptions): {
const triggerFlush = (): void => {
signalled = true;
// Serialize fan-in so concurrent triggers don't double-fetch.
pendingFlushPromise = pendingFlushPromise.then(async () => {
// `.catch(() => {})` keeps the chain alive across transient
// emit failures (e.g. a closed SSE write throws) — without it
// one rejection silently kills every future flush on this
// connection.
pendingFlushPromise = pendingFlushPromise
.then(async () => {
while (signalled) {
signalled = false;
const drained = await flushTo(writer, address, cursor, async (blob) => {
const drained = await flushTo(
writer,
address,
cursor,
async (blob) => {
await stream.writeSSE({
id: String(blob.receivedAt),
event: 'envelope',
data: JSON.stringify(serializeBlob(blob)),
});
});
},
delivered,
);
if (drained > cursor) cursor = drained;
}
});
})
.catch(() => {});
};
if (opts.events) {
@@ -163,6 +214,68 @@ export function createBridgeRoutes(opts: BridgeRoutesOptions): {
clearInterval(fallbackTimer);
clearInterval(heartbeat);
await pendingFlushPromise.catch(() => {});
closePresence('closed');
});
});
// ─── Presence (V4.7) ──────────────────────────────────────────
// SSE feed of `peer_connected` / `peer_disconnected` events filtered
// by a watcher-supplied address list. Subscribing does NOT count as
// a peer-bridge connection (it doesn't call `markConnected`) so
// monitoring presence doesn't make you appear online to others.
app.get('/v1/bridge/presence', async (c) => {
const verified = await verifyPresenceAuth(c, opts);
return streamSSE(c, async (stream) => {
const watched = new Set(verified.watched);
// Initial snapshot — one frame per watched address with current
// status. Lets subscribers render UI immediately rather than
// waiting for the next state change.
const now = Date.now();
for (const addr of verified.watched) {
await stream.writeSSE({
event: 'presence',
data: JSON.stringify({
address: addr,
status: presence.isOnline(addr) ? 'online' : 'offline',
at: now,
}),
});
}
let unsubscribe: (() => void) | null = null;
if (opts.events) {
unsubscribe = opts.events.on((e) => {
if (e.name !== 'inbox.peer_connected' && e.name !== 'inbox.peer_disconnected') return;
const data = e.data as { address: string; bridgeKind: TrackedBridgeKind };
if (!watched.has(data.address)) return;
const status = e.name === 'inbox.peer_connected' ? 'online' : 'offline';
// Fire-and-forget: drop the frame if the stream has gone away.
void stream
.writeSSE({
event: 'presence',
data: JSON.stringify({
address: data.address,
status,
at: e.timestamp,
via: data.bridgeKind,
}),
})
.catch(() => {});
});
}
const heartbeat = setInterval(() => {
stream.write(`: ping ${Date.now()}\n\n`).catch(() => {});
}, heartbeatIntervalMs);
await new Promise<void>((resolve) => {
const sig = c.req.raw.signal;
if (sig.aborted) return resolve();
sig.addEventListener('abort', () => resolve(), { once: true });
});
unsubscribe?.();
clearInterval(heartbeat);
});
});
@@ -230,30 +343,50 @@ export function createBridgeRoutes(opts: BridgeRoutesOptions): {
}
const address = verified.address;
const connId = presence.newConnectionId();
let cursor = verified.since;
const writer = makeBlobWriter(opts.store, pageLimit);
const delivered = new DeliveredIdLru();
let unsubscribe: (() => void) | null = null;
let fallbackTimer: ReturnType<typeof setInterval> | null = null;
let pendingFlushPromise: Promise<void> = Promise.resolve();
let signalled = false;
let connected = true;
let presenceClosed = false;
const closePresence = (reason: 'closed' | 'error'): void => {
if (presenceClosed) return;
presenceClosed = true;
presence.markDisconnected(address, 'ws', connId, reason);
};
return {
onOpen(_evt: unknown, ws: {
send: (data: string) => void;
close: (code?: number, reason?: string) => void;
}) {
presence.markConnected(address, 'ws', connId);
const triggerFlush = (): void => {
signalled = true;
pendingFlushPromise = pendingFlushPromise.then(async () => {
// `.catch(() => {})` mirrors the SSE chain — keeps the
// pending-flush queue alive across transient ws.send errors
// (e.g. partial close, backpressure overflow).
pendingFlushPromise = pendingFlushPromise
.then(async () => {
while (signalled && connected) {
signalled = false;
const drained = await flushTo(writer, address, cursor, async (blob) => {
const drained = await flushTo(
writer,
address,
cursor,
async (blob) => {
ws.send(JSON.stringify(serializeBlob(blob)));
});
},
delivered,
);
if (drained > cursor) cursor = drained;
}
});
})
.catch(() => {});
};
if (opts.events) {
unsubscribe = opts.events.on((e) => {
@@ -269,12 +402,19 @@ export function createBridgeRoutes(opts: BridgeRoutesOptions): {
connected = false;
unsubscribe?.();
if (fallbackTimer) clearInterval(fallbackTimer);
closePresence('closed');
},
onError() {
connected = false;
unsubscribe?.();
if (fallbackTimer) clearInterval(fallbackTimer);
closePresence('error');
},
};
}),
);
return { app, websocket };
return { app, websocket, presence };
}
// ─── helpers ──────────────────────────────────────────────────
@@ -321,11 +461,75 @@ async function verifyBridgeAuth(
return { address, kind: kind as BridgeKind, since };
}
const MAX_WATCHED_ADDRESSES = 64;
/**
* Verify a `/v1/bridge/presence` request.
*
* Signed canonical payload: `{address, kind: 'presence', watched: string[],
* signedAt}`. The watcher's address must be a registered inbox; the
* signature is verified against the registered owner key for that
* address. The `watched` list bounds what the subscription will
* receive — server-side filtering is enforced inside the handler.
*/
async function verifyPresenceAuth(
c: Context,
opts: BridgeRoutesOptions,
): Promise<VerifiedPresenceRequest> {
const url = new URL(c.req.url);
const qs = url.searchParams;
const address = validateAddress(qs.get('address'));
const kind = qs.get('kind');
if (kind !== 'presence') {
throw new ValidationError(`bridge kind mismatch: expected presence`, 'kind');
}
const watchedRaw = qs.get('watched');
if (watchedRaw === null) throw new ValidationError('missing watched', 'watched');
// Empty subscription is allowed (subscribe to nothing — useful for a
// client that intends to call addPeer right after open). A null/
// missing param is still rejected so the canonicalization is
// unambiguous.
const watched =
watchedRaw === ''
? []
: watchedRaw.split(',').map((a) => validateAddress(a));
if (watched.length > MAX_WATCHED_ADDRESSES) {
throw new ValidationError(
`watched list too large: ${watched.length} > ${MAX_WATCHED_ADDRESSES}`,
'watched',
);
}
const signedAtStr = qs.get('signedAt');
const signature = qs.get('signature');
if (signedAtStr === null) throw new ValidationError('missing signedAt', 'signedAt');
if (!signature) throw new ValidationError('missing signature', 'signature');
const signedAt = Number(signedAtStr);
if (!Number.isFinite(signedAt)) {
throw new ValidationError('signedAt must be a number', 'signedAt');
}
const owner = await opts.store.getAddressOwner(address);
if (!owner) {
throw new UnauthorizedError(`address ${address} is not registered`);
}
await verifyPayload(opts.crypto, owner, {
address,
kind,
watched,
signedAt,
signature,
});
return { address, watched };
}
interface BlobRow {
msgId: string;
ciphertext: Uint8Array;
receivedAt: number;
expiresAt: number;
/** V4.8 — relay-captured sender fingerprint. Optional for legacy rows. */
senderFp?: string;
}
interface BlobWriter {
@@ -345,11 +549,41 @@ function makeBlobWriter(store: InboxStore, pageLimit: number): BlobWriter {
};
}
/**
* Per-connection bounded msgId tracker — defense in depth against duplicate
* delivery of the same blob to the same bridge socket. Cursor pagination
* already guarantees uniqueness in the happy path, but a dedup gate at the
* emit boundary catches any subtle bug (e.g. a flushTo race, a future
* refactor, an event-emit retry) without changing wire semantics.
*
* The cap is intentionally large enough to cover any realistic bridge
* pageLimit and small enough to bound memory under long-running streams.
*/
const DELIVERED_LRU_CAP = 4096;
class DeliveredIdLru {
private readonly seen = new Set<string>();
private readonly order: string[] = [];
/** Returns true if `msgId` has not been seen on this connection yet. */
add(msgId: string): boolean {
if (this.seen.has(msgId)) return false;
this.seen.add(msgId);
this.order.push(msgId);
if (this.order.length > DELIVERED_LRU_CAP) {
const evicted = this.order.shift()!;
this.seen.delete(evicted);
}
return true;
}
}
async function flushTo(
writer: BlobWriter,
address: string,
startCursor: number,
emit: (blob: BlobRow) => Promise<void>,
delivered?: DeliveredIdLru,
): Promise<number> {
let cursor = startCursor;
// Drain page-by-page so a backlog larger than `pageLimit` still flushes.
@@ -358,7 +592,12 @@ async function flushTo(
const page = await writer.fetchPage(address, cursor);
if (page.length === 0) break;
for (const row of page) {
// Per-connection dedup gate — prevents the same msgId from being
// emitted twice if flushTo is somehow re-entered before the cursor
// catches up. See comment on `DeliveredIdLru`.
if (!delivered || delivered.add(row.msgId)) {
await emit(row);
}
if (row.receivedAt > cursor) cursor = row.receivedAt;
}
if (page.length === 0) break;
@@ -371,13 +610,26 @@ function serializeBlob(blob: BlobRow): {
ciphertext: string;
receivedAt: number;
expiresAt: number;
from?: string;
} {
return {
const out: {
msgId: string;
ciphertext: string;
receivedAt: number;
expiresAt: number;
from?: string;
} = {
msgId: blob.msgId,
ciphertext: toBase64(blob.ciphertext),
receivedAt: blob.receivedAt,
expiresAt: blob.expiresAt,
};
// V4.8 — relay-captured sender fingerprint. The transport-bridge
// wire format already accepts `from`; populating it lets receivers
// bootstrap unknown-sender first-contact via `shade.receive('fp:<hex>',
// env)` without requiring an out-of-band sender hint.
if (blob.senderFp) out.from = blob.senderFp;
return out;
}
function buildPollResponse(blobs: BlobRow[], sinceFallback: number): {

View File

@@ -21,6 +21,19 @@ export interface InboxServerEventMap {
'inbox.expired_purged': { count: number };
'inbox.rate_limited': { route: string; key: string };
'inbox.quota_rejected': { address: string; reason: 'address-quota' | 'sender-quota' | 'body-too-large' };
// V4.7 — bridge presence transitions. Emitted on the 0↔1 boundary
// across tracked transports for a given address. Long-poll is
// intentionally NOT tracked: an LP client toggles in/out of a request
// every few seconds, and the resulting flapping would dominate the
// event stream. Push transports (WS, SSE) are also the only ones
// where the ~50ms revoke window for `BroadcastChannel.removeMember`
// matters — long-poll users are already on a slow path.
'inbox.peer_connected': { address: string; bridgeKind: 'ws' | 'sse' };
'inbox.peer_disconnected': {
address: string;
bridgeKind: 'ws' | 'sse';
reason: 'closed' | 'error';
};
}
export type InboxServerEventName = keyof InboxServerEventMap;

View File

@@ -32,6 +32,8 @@ export {
export type { InboxQuotaConfig } from './quota.js';
export { createBridgeRoutes } from './bridge.js';
export type { BridgeRoutesOptions, BridgeKind } from './bridge.js';
export { PresenceTracker } from './presence.js';
export type { TrackedBridgeKind } from './presence.js';
/**
* Create a standalone Shade Inbox Server.

View File

@@ -5,6 +5,7 @@ interface BlobRow {
ciphertext: Uint8Array;
receivedAt: number;
expiresAt: number;
senderFp?: string;
}
/**
@@ -33,6 +34,7 @@ export class MemoryInboxStore implements InboxStore {
msgId: string;
ciphertext: Uint8Array;
expiresAt: number;
senderFp?: string;
}): Promise<{ created: boolean; receivedAt: number }> {
const list = this.blobs.get(args.address) ?? [];
const existing = list.find((r) => r.msgId === args.msgId);
@@ -41,12 +43,14 @@ export class MemoryInboxStore implements InboxStore {
// multiple blobs land in the same millisecond.
const receivedAt = Math.max(this.nextReceivedAt + 1, Date.now());
this.nextReceivedAt = receivedAt;
list.push({
const row: BlobRow = {
msgId: args.msgId,
ciphertext: new Uint8Array(args.ciphertext),
receivedAt,
expiresAt: args.expiresAt,
});
};
if (args.senderFp !== undefined) row.senderFp = args.senderFp;
list.push(row);
this.blobs.set(args.address, list);
return { created: true, receivedAt };
}
@@ -56,18 +60,36 @@ export class MemoryInboxStore implements InboxStore {
sinceCursor: number;
now: number;
limit: number;
}): Promise<Array<{ msgId: string; ciphertext: Uint8Array; receivedAt: number; expiresAt: number }>> {
}): Promise<
Array<{
msgId: string;
ciphertext: Uint8Array;
receivedAt: number;
expiresAt: number;
senderFp?: string;
}>
> {
const list = this.blobs.get(args.address) ?? [];
return list
.filter((r) => r.receivedAt > args.sinceCursor && r.expiresAt > args.now)
.sort((a, b) => a.receivedAt - b.receivedAt)
.slice(0, args.limit)
.map((r) => ({
.map((r) => {
const out: {
msgId: string;
ciphertext: Uint8Array;
receivedAt: number;
expiresAt: number;
senderFp?: string;
} = {
msgId: r.msgId,
ciphertext: new Uint8Array(r.ciphertext),
receivedAt: r.receivedAt,
expiresAt: r.expiresAt,
}));
};
if (r.senderFp !== undefined) out.senderFp = r.senderFp;
return out;
});
}
async deleteBlob(address: string, msgId: string): Promise<boolean> {

View File

@@ -0,0 +1,75 @@
/**
* V4.7 — bridge-connection presence tracking.
*
* The bridge handlers (`/v1/bridge/stream` and `/v1/bridge/ws`) call
* `markConnected` on open and `markDisconnected` on close. The tracker
* keeps a per-address set of connection ids; the `inbox.peer_connected`
* / `inbox.peer_disconnected` events fire only on the 0↔1 boundary so
* that two simultaneous bridges (e.g. SSE + WS during a transport-
* fallback handover) collapse into a single connected/disconnected
* pair from the consumer's point of view.
*
* Long-poll (`/v1/bridge/poll`) is intentionally NOT tracked — see the
* note on `InboxServerEventMap` in `events.ts`.
*/
import type { InboxServerEvents } from './events.js';
export type TrackedBridgeKind = 'ws' | 'sse';
export class PresenceTracker {
private readonly connections = new Map<string, Set<string>>();
private nextConnId = 1;
constructor(private readonly events: InboxServerEvents | null) {}
/** Allocate a fresh connection id for `markConnected` / `markDisconnected`. */
newConnectionId(): string {
return `c${this.nextConnId++}`;
}
/**
* Snapshot: is `address` currently connected over any tracked transport?
* Used by `/v1/bridge/presence` to push the initial state to a new
* subscriber.
*/
isOnline(address: string): boolean {
const set = this.connections.get(address);
return set !== undefined && set.size > 0;
}
markConnected(address: string, bridgeKind: TrackedBridgeKind, connectionId: string): void {
let set = this.connections.get(address);
const wasOnline = set !== undefined && set.size > 0;
if (!set) {
set = new Set();
this.connections.set(address, set);
}
set.add(connectionId);
if (!wasOnline) {
this.events?.emit('inbox.peer_connected', { address, bridgeKind });
}
}
markDisconnected(
address: string,
bridgeKind: TrackedBridgeKind,
connectionId: string,
reason: 'closed' | 'error',
): void {
const set = this.connections.get(address);
if (!set) return;
if (!set.delete(connectionId)) return;
if (set.size === 0) {
this.connections.delete(address);
this.events?.emit('inbox.peer_disconnected', { address, bridgeKind, reason });
}
}
/** Inspect the underlying map. Test/observability use only. */
snapshot(): Map<string, ReadonlySet<string>> {
return new Map(
Array.from(this.connections.entries(), ([k, v]) => [k, v as ReadonlySet<string>]),
);
}
}

View File

@@ -255,11 +255,19 @@ export function createInboxRoutes(
);
}
// V4.8: capture sender fingerprint at PUT time. The sender's
// signing key was just verified for this request, so the fingerprint
// is bound to the same authentication path that authorized the
// store. Surfaced on bridge push + inbox-fetch responses to
// bootstrap unknown-sender first-contact (X3DH pair handshake).
const senderFp = await shortHash(senderKey);
const result = await store.putBlob({
address,
msgId,
ciphertext: ctBytes,
expiresAt,
senderFp,
});
if (result.created) {
events?.emit('inbox.blob_stored', {
@@ -319,12 +327,22 @@ export function createInboxRoutes(
let bytes = 0;
const blobs = rows.map((r) => {
bytes += r.ciphertext.length;
return {
const out: {
msgId: string;
ciphertext: string;
receivedAt: number;
expiresAt: number;
from?: string;
} = {
msgId: r.msgId,
ciphertext: toBase64(r.ciphertext),
receivedAt: r.receivedAt,
expiresAt: r.expiresAt,
};
// V4.8: surface sender fingerprint when present. Empty for blobs
// persisted by a pre-4.8 relay that didn't track sender provenance.
if (r.senderFp) out.from = r.senderFp;
return out;
});
const nextCursor = rows.length > 0 ? rows[rows.length - 1]!.receivedAt : sinceCursor;

View File

@@ -36,12 +36,20 @@ export interface InboxStore {
* **Idempotent**: if a row already exists for `(address, msgId)` the
* implementation MUST return `{ created: false }` and leave the existing
* row untouched. A fresh insert returns `{ created: true, receivedAt }`.
*
* `senderFp` (V4.8+) is the 8-byte hex of SHA-256(senderSigningKey)
* — captured at PUT time when the relay verified the sender's
* signature. Optional so legacy 4.7 callers compile, but populated by
* `createInboxRoutes` from 4.8 onward and surfaced on bridge push +
* inbox-fetch responses to bootstrap unknown-sender first-contact
* (X3DH pair handshake).
*/
putBlob(args: {
address: string;
msgId: string;
ciphertext: Uint8Array;
expiresAt: number;
senderFp?: string;
}): Promise<{ created: boolean; receivedAt: number }>;
/**
@@ -57,7 +65,20 @@ export interface InboxStore {
sinceCursor: number;
now: number;
limit: number;
}): Promise<Array<{ msgId: string; ciphertext: Uint8Array; receivedAt: number; expiresAt: number }>>;
}): Promise<
Array<{
msgId: string;
ciphertext: Uint8Array;
receivedAt: number;
expiresAt: number;
/**
* Sender fingerprint — 8-byte hex of SHA-256(senderSigningKey)
* captured at PUT time. Empty/undefined for blobs persisted by a
* pre-4.8 relay that didn't track sender provenance.
*/
senderFp?: string;
}>
>;
/**
* Delete a single blob by `(address, msgId)`. Returns true if a row was

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/inbox",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -40,6 +40,16 @@ export interface FetchedBlob {
receivedAt: number;
/** Absolute expiry time (ms since epoch) reported by the server. */
expiresAt: number;
/**
* Sender fingerprint — 8-byte hex of SHA-256(senderSigningKey),
* captured by the relay at PUT time when the sender's signature was
* verified. Empty/undefined when the relay is pre-4.8 or the blob
* predates sender-fingerprint tracking. Use as an unknown-sender
* bootstrap label (`fp:<hex>`) for X3DH first-contact; the
* authoritative sender identity is recovered post-decrypt from the
* envelope itself, so `from` is a hint, not a trust anchor.
*/
from?: string;
}
export interface FetchResult {
@@ -52,7 +62,14 @@ export class InboxClient {
private readonly fetchImpl: typeof fetch;
constructor(private readonly options: InboxClientOptions) {
this.fetchImpl = options.fetch ?? globalThis.fetch;
// Bind once. The browser's `globalThis.fetch` is a WebIDL bound
// operation that throws "Illegal invocation" when called as a method
// on another object (which is what `this.fetchImpl(...)` does).
// Node/Bun fetch tolerates a free receiver, but binding is harmless.
// A consumer-supplied `options.fetch` is bound to the global too —
// a fetch that requires a specific receiver must bind itself.
const f = options.fetch ?? globalThis.fetch;
this.fetchImpl = f.bind(globalThis);
}
/**
@@ -144,12 +161,18 @@ export class InboxClient {
);
const blobs = Array.isArray(json.blobs) ? json.blobs : [];
return {
blobs: blobs.map((b: any) => ({
blobs: blobs.map((b: any): FetchedBlob => {
const out: FetchedBlob = {
msgId: String(b.msgId),
ciphertext: fromBase64(String(b.ciphertext)),
receivedAt: Number(b.receivedAt),
expiresAt: Number(b.expiresAt),
})),
};
// V4.8 — relay-supplied sender fingerprint hint. Optional; absent
// on pre-4.8 relays or for blobs persisted before tracking landed.
if (typeof b.from === 'string' && b.from.length > 0) out.from = b.from;
return out;
}),
cursor: Number(json.cursor ?? sinceCursor),
hasMore: Boolean(json.hasMore),
};

View File

@@ -16,9 +16,20 @@ import { InboxClientEvents, type InboxClientListener } from './events.js';
* decrypt path it owns) and either return a sender-hint for telemetry
* (the address the SDK extracted, or `null`) or throw to keep the blob
* on the server for a later retry.
*
* V4.8: `raw.from` is the relay-captured sender fingerprint (8-byte hex
* of SHA-256 over the sender's signing key). Empty when the relay is
* pre-4.8 or didn't track the sender. Use it as the `fp:<hex>` bootstrap
* label for X3DH first-contact handshakes.
*/
export type DecryptHandler = (
raw: { msgId: string; ciphertext: Uint8Array; receivedAt: number; expiresAt: number },
raw: {
msgId: string;
ciphertext: Uint8Array;
receivedAt: number;
expiresAt: number;
from?: string;
},
) => Promise<string | null | undefined> | string | null | undefined;
export interface InboxOptions {
@@ -149,6 +160,14 @@ export class Inbox {
signingKey: this.options.signingPublicKey,
});
this.registered = true;
// V4.8: gate the first poll on register success. `start()` calls
// `register()` fire-and-forget; without this kick, the very first
// `pollOnce()` (scheduled synchronously alongside register) would
// race the register HTTP RTT and return SHADE_NOT_FOUND. The
// pollOnce() guard skips polls until `registered === true`; this
// immediate schedule ensures we don't wait the full pollIntervalMs
// for the next attempt once register lands.
if (this.started) this.schedulePoll(0);
}
/** Drop the address from the server. Local queue/cursor are preserved. */
@@ -217,7 +236,11 @@ export class Inbox {
this.scheduleRegisterRetry();
});
this.scheduleFlush();
this.schedulePoll(0);
// V4.8: do NOT schedule the first poll synchronously. `register()`
// success kicks `schedulePoll(0)` so the first poll fires after the
// server has acknowledged the address. Pre-fix this raced register
// and burned a 404 on every fresh-address `start()`.
if (this.registered) this.schedulePoll(0);
}
/** Stop background timers. Pending entries remain in the queue. */
@@ -375,12 +398,20 @@ export class Inbox {
let senderHint: string | null = null;
try {
const result = await this.incomingHandler({
const raw: {
msgId: string;
ciphertext: Uint8Array;
receivedAt: number;
expiresAt: number;
from?: string;
} = {
msgId: blob.msgId,
ciphertext: blob.ciphertext,
receivedAt: blob.receivedAt,
expiresAt: blob.expiresAt,
});
};
if (blob.from !== undefined) raw.from = blob.from;
const result = await this.incomingHandler(raw);
senderHint = result ?? null;
} catch (err) {
this.events.emit('inbox.message_decrypt_failed', {

View File

@@ -281,3 +281,195 @@ describe('tamper detection', () => {
expect(result.received).toBe(0);
});
});
describe('InboxClient — default fetch is bound to globalThis', () => {
// Regression: browsers' `fetch` is a WebIDL bound operation that throws
// "Illegal invocation" when called as a method on another object. The
// class stores `fetchImpl` and calls `this.fetchImpl(...)`, which strips
// the Window receiver. Constructor must `bind(globalThis)`.
test('default path passes globalThis as `this` (no Illegal invocation)', async () => {
const realFetch = globalThis.fetch;
let observedReceiver: unknown = 'unset';
function strictFetch(this: unknown, _input: unknown, _init?: unknown): Promise<Response> {
observedReceiver = this;
if (this !== globalThis) {
throw new TypeError("Failed to execute 'fetch' on 'Window': Illegal invocation");
}
return Promise.resolve(
new Response('{}', {
status: 200,
headers: { 'content-type': 'application/json' },
}),
);
}
Object.defineProperty(globalThis, 'fetch', {
configurable: true,
writable: true,
value: strictFetch,
});
try {
const id = await makeIdentity();
const client = new InboxClient({
baseUrl: 'http://example.invalid',
crypto,
signingPrivateKey: id.signingPrivateKey,
// No `fetch` override on purpose — this exercises the default path.
});
await client.register({ address: 'whoever', signingKey: id.signingPublicKey });
expect(observedReceiver).toBe(globalThis);
} finally {
Object.defineProperty(globalThis, 'fetch', {
configurable: true,
writable: true,
value: realFetch,
});
}
});
});
describe('Inbox.start() — fresh-address register/poll race (V4.8)', () => {
// Regression: pre-4.8 `start()` called `register()` fire-and-forget AND
// `schedulePoll(0)` synchronously, so the first poll often beat the
// register HTTP RTT and got SHADE_NOT_FOUND on a fresh address. Fix:
// start() defers the first poll; register() success kicks it.
test('fresh address: no fetch fires before register completes', async () => {
const store = new MemoryInboxStore();
const app = createInboxServer({ crypto, store, disableRateLimit: true });
const alice = await makeIdentity();
// Order observed by the server: must be register-then-fetch, never
// fetch-then-register.
const calls: Array<'register' | 'fetch' | 'put'> = [];
let registerArrived = false;
const recordingFetch: typeof fetch = (async (input, init) => {
const u =
typeof input === 'string'
? input
: input instanceof URL
? input.toString()
: (input as Request).url;
if (u.includes('/v1/inbox/register')) {
calls.push('register');
// Hold register for a tick to widen the race window.
await new Promise((r) => setTimeout(r, 25));
registerArrived = true;
} else if (u.endsWith('/fetch')) {
// Any fetch arriving before register is the race we're guarding
// against.
if (!registerArrived) {
throw new Error('fetch fired before register completed (race not fixed)');
}
calls.push('fetch');
} else if (u.includes('/v1/inbox/')) {
calls.push('put');
}
return honoFetch(app)(input, init);
}) as typeof fetch;
const inbox = new Inbox({
baseUrl: 'http://localhost',
ownAddress: 'alice',
crypto,
signingPrivateKey: alice.signingPrivateKey,
signingPublicKey: alice.signingPublicKey,
pollIntervalMs: 30_000, // Long enough that only register's kick triggers.
fetch: recordingFetch,
});
inbox.onIncoming(() => null);
inbox.start();
// Wait until register has completed and the success-kick poll lands.
await new Promise((r) => setTimeout(r, 100));
inbox.stop();
expect(calls[0]).toBe('register');
// First fetch (if any) must be after register.
const firstFetchIdx = calls.indexOf('fetch');
if (firstFetchIdx !== -1) {
expect(firstFetchIdx).toBeGreaterThan(calls.indexOf('register'));
}
});
});
describe('FetchedBlob.from — relay-supplied sender fingerprint (V4.8)', () => {
test('inbox-fetch response carries from = 8-byte hex of SHA-256(senderSigningKey)', async () => {
const store = new MemoryInboxStore();
const app = createInboxServer({ crypto, store, disableRateLimit: true });
const bob = await makeIdentity();
const alice = await makeIdentity();
const bobClient = new InboxClient({
baseUrl: 'http://localhost',
crypto,
signingPrivateKey: bob.signingPrivateKey,
fetch: honoFetch(app),
});
const aliceClient = new InboxClient({
baseUrl: 'http://localhost',
crypto,
signingPrivateKey: alice.signingPrivateKey,
fetch: honoFetch(app),
});
await bobClient.register({ address: 'bob', signingKey: bob.signingPublicKey });
await aliceClient.put({
recipientAddress: 'bob',
senderSigningKey: alice.signingPublicKey,
envelope: randBytes(64),
});
const fetched = await bobClient.fetch({ address: 'bob' });
expect(fetched.blobs.length).toBe(1);
const fp = fetched.blobs[0]!.from;
expect(fp).toBeDefined();
expect(fp).toMatch(/^[0-9a-f]{16}$/);
// Must be reproducible: SHA-256(alice.signingPublicKey) → first 8 bytes hex.
const digest = await globalThis.crypto.subtle.digest(
'SHA-256',
alice.signingPublicKey as unknown as ArrayBuffer,
);
const expected = Array.from(new Uint8Array(digest).slice(0, 8), (b) =>
b.toString(16).padStart(2, '0'),
).join('');
expect(fp).toBe(expected);
});
test('DecryptHandler raw arg propagates from to the app', async () => {
const store = new MemoryInboxStore();
const app = createInboxServer({ crypto, store, disableRateLimit: true });
const bob = await makeIdentity();
const alice = await makeIdentity();
const aliceClient = new InboxClient({
baseUrl: 'http://localhost',
crypto,
signingPrivateKey: alice.signingPrivateKey,
fetch: honoFetch(app),
});
const bobInbox = new Inbox({
baseUrl: 'http://localhost',
ownAddress: 'bob',
crypto,
signingPrivateKey: bob.signingPrivateKey,
signingPublicKey: bob.signingPublicKey,
pollIntervalMs: 0,
fetch: honoFetch(app),
});
await bobInbox.register();
await aliceClient.put({
recipientAddress: 'bob',
senderSigningKey: alice.signingPublicKey,
envelope: randBytes(40),
});
let observed: string | undefined = undefined;
bobInbox.onIncoming((raw) => {
observed = raw.from;
return null;
});
await bobInbox.tick();
expect(observed).toMatch(/^[0-9a-f]{16}$/);
});
});

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/key-transparency",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/keychain",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/observability",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/observer",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/proto",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/recovery",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/sdk",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -153,6 +153,13 @@ export class Shade {
private establishing = new Map<string, Promise<void>>();
// Per-address encrypt queue to serialize ratchet mutations
private encryptChains = new Map<string, Promise<unknown>>();
// Per-`from` decrypt queue: serializes incoming receives so two concurrent
// shade.receive(from, env) calls can't race the ratchet/storage. Without
// this, parallel deliveries (relay duplicate fan-out, fast pipelined
// sends) hit `database is locked` (sqlite) or transaction conflicts (IDB)
// because the underlying StorageProvider isn't required to be a
// concurrent-safe writer. See V4.8.2 changelog.
private decryptChains = new Map<string, Promise<unknown>>();
// Message handlers — may be sync or async; receive() awaits each. The
// optional third arg distinguishes direct vs broadcast plaintexts;
@@ -436,7 +443,35 @@ export class Shade {
*/
async receive(from: string, envelope: ShadeEnvelope): Promise<string> {
if (!this.initialized) throw new Error('Not initialized');
const plaintext = await this.manager.decrypt(from, envelope);
// Serialize ONLY the ratchet/storage write portion of receive (the
// call into `manager.decrypt`). Concurrent decrypts race the
// SessionManager ratchet (mutated in place) and the StorageProvider
// (not required to be a concurrent-safe writer — `bun:sqlite`
// throws `database is locked`, IDB throws transaction conflicts).
// The Prism FR called this out: a relay-duplicated WS fan-out
// dispatched 8 parallel `shade.receive(from, env)` calls, one won
// the X3DH prekey race and the other 7 failed with
// `database is locked` / `one-time prekey not found`. The fix is
// to queue per-`from` decrypts so the ratchet step is sequential.
//
// Crucially the user-visible MESSAGE HANDLERS run *outside* the
// queue. Streams + file-RPC issue nested `shade.receive` calls for
// the same peer from inside their handlers (e.g. `stream-end`
// arrives while a write-RPC is still waiting on chunks); holding
// the queue across the handler would self-deadlock. The atomic
// unit we have to protect is just the ratchet+storage step, not
// the consumer's reaction to it.
const previous = this.decryptChains.get(from) ?? Promise.resolve();
const decryptPromise = previous
.catch(() => undefined) // don't propagate upstream failures
.then(() => this.manager.decrypt(from, envelope));
// Store a never-rejecting copy so the next chained receive doesn't
// see a rejection from this one (we still surface our own rejection
// to *this* caller via the original `decryptPromise`).
this.decryptChains.set(from, decryptPromise.catch(() => undefined));
const plaintext = await decryptPromise;
const consumed = await maybeHandleControlPlaintext(
this.broadcastHooks(),
from,

View File

@@ -131,6 +131,53 @@ describe('createShade — happy path', () => {
await expect(alice.send('nobody', 'ghost')).rejects.toThrow();
});
test('concurrent receive(from, env) for same `from` does not race the ratchet (V4.8.2)', async () => {
// Reproduces the Prism FR scenario: a single PUT is fanned out
// multiple times by the relay (or any duplicating transport), the
// receiver dispatches several `shade.receive(from, env)` in
// parallel, and the underlying SessionManager + StorageProvider
// would race on the ratchet (and on storage writes — sqlite throws
// "database is locked", IDB throws transaction conflicts) without
// per-`from` serialization. We pre-establish a session, then fire
// the same envelope at `bob.receive` from many concurrent callers
// and verify all of them either decrypt to the same plaintext or
// surface a benign "already-consumed" error. Crucially: no
// unhandled storage races, no ratchet corruption, and the next
// legitimate message still decrypts.
alice = await createShade({ prekeyServer: server.url, address: 'alice' });
bob = await createShade({ prekeyServer: server.url, address: 'bob' });
const env1 = await alice.send('bob', 'first');
expect(await bob.receive('alice', env1)).toBe('first');
const env2 = await alice.send('bob', 'second');
// Fan the same envelope out to 8 concurrent receives — exactly the
// shape of the relay duplicate fan-out described in the FR.
const dispatches = await Promise.allSettled(
Array.from({ length: 8 }, () => bob.receive('alice', env2)),
);
// At least one must have succeeded with the right plaintext; the
// others may legitimately reject (replay protection / OTPK
// already-consumed) but MUST NOT corrupt the ratchet or throw
// "database is locked".
const fulfilled = dispatches.filter((d) => d.status === 'fulfilled') as Array<
PromiseFulfilledResult<string>
>;
expect(fulfilled.length).toBeGreaterThan(0);
expect(fulfilled[0]!.value).toBe('second');
for (const d of dispatches) {
if (d.status === 'rejected') {
const msg = String((d.reason as Error)?.message ?? d.reason);
expect(msg).not.toMatch(/database is locked/i);
}
}
// Ratchet must still advance — the next legitimate message decrypts.
const env3 = await alice.send('bob', 'third');
expect(await bob.receive('alice', env3)).toBe('third');
});
test('verify fingerprint matches pinned identity', async () => {
alice = await createShade({ prekeyServer: server.url, address: 'alice' });
bob = await createShade({ prekeyServer: server.url, address: 'bob' });

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/server",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -154,6 +154,20 @@ const inboxEvents = new InboxServerEvents();
// SHADE_KT_PG_URL (or SHADE_PREKEY_PG_URL) is set, else memory.
const kt = await maybeCreateKT();
// V4.8.1 — `SHADE_DISABLE_RATE_LIMIT=1` turns off the IP-based
// register/replenish/fetch token-buckets on every prekey + inbox
// route. INTENDED FOR SELF-HOSTED SINGLE-TEAM (DEV / SINGLE-TENANT)
// DEPLOYMENTS ONLY — the rate-limit defends multi-tenant relays
// against abuse, so a public/shared deployment must leave this
// unset. Without it, the existing 5/hour REGISTER_LIMIT etc. apply
// unchanged.
const disableRateLimit = process.env.SHADE_DISABLE_RATE_LIMIT === '1';
if (disableRateLimit) {
logger.warn(
'SHADE_DISABLE_RATE_LIMIT=1 — IP rate limits OFF on prekey + inbox routes. Use only for single-tenant deployments.',
);
}
// Compose the full app: metrics middleware + health + metrics + prekey routes
const app = new Hono();
app.use('*', metricsMiddleware());
@@ -164,10 +178,17 @@ app.route(
'/',
createPrekeyRoutes(store, crypto, {
events,
disableRateLimit,
...(kt ? { keyTransparency: kt } : {}),
}),
);
app.route('/', createInboxRoutes(inboxStore, crypto, { events: inboxEvents }));
app.route(
'/',
createInboxRoutes(inboxStore, crypto, {
events: inboxEvents,
disableRateLimit,
}),
);
// V3.7 transport-bridge — SSE / long-poll / WS fallbacks for the inbox.
// Held as a top-level reference so the WebSocket handler can be passed to

View File

@@ -102,6 +102,48 @@ describe('Rate limiting integration with routes', () => {
expect(results.filter((s) => s === 429).length).toBeGreaterThanOrEqual(1);
});
// V4.8.1 — `SHADE_DISABLE_RATE_LIMIT=1` in standalone.ts is plumbed
// through to `createPrekeyServer({ disableRateLimit })`. This test
// covers the "what happens when the flag is true" path; the env-var
// → option conversion in standalone.ts is a one-liner verified by
// inspection.
test('register endpoint allows >5/hour from a single IP when disableRateLimit is set', async () => {
const app = createPrekeyServer({
crypto,
store: new MemoryPrekeyStore(),
disableRateLimit: true,
});
async function doRegister(addressSuffix: number) {
const identity = await generateIdentityKeyPair(crypto);
const body: any = {
address: `user${addressSuffix}`,
identitySigningKey: Buffer.from(identity.signingPublicKey).toString('base64'),
identityDHKey: Buffer.from(identity.dhPublicKey).toString('base64'),
signedPreKey: {
keyId: 1,
publicKey: Buffer.from(crypto.randomBytes(32)).toString('base64'),
signature: Buffer.from(crypto.randomBytes(64)).toString('base64'),
},
};
const signed = await signPayload(crypto, identity.signingPrivateKey, body);
return app.request('/v1/keys/register', {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'x-forwarded-for': '203.0.113.1' },
body: JSON.stringify(signed),
});
}
const results: number[] = [];
for (let i = 0; i < 12; i++) {
const res = await doRegister(i);
results.push(res.status);
}
// No 429 anywhere — the limit is OFF.
expect(results.filter((s) => s === 429).length).toBe(0);
expect(results.filter((s) => s === 200).length).toBe(12);
});
test('rate limit returns Retry-After header', async () => {
const app = createPrekeyServer({ crypto, store: new MemoryPrekeyStore() });

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/storage-encrypted",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/storage-indexeddb",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/storage-postgres",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -223,9 +223,16 @@ export async function ensureInboxServerTables(sql: Sql): Promise<void> {
ciphertext TEXT NOT NULL,
received_at BIGINT NOT NULL,
expires_at BIGINT NOT NULL,
sender_fp TEXT,
PRIMARY KEY (address, msg_id)
)
`;
// V4.8 — sender fingerprint column. Idempotent ADD COLUMN for live
// databases that came up under a 4.7-or-earlier schema.
await sql`
ALTER TABLE shade_inbox_blobs
ADD COLUMN IF NOT EXISTS sender_fp TEXT
`;
await sql`
CREATE INDEX IF NOT EXISTS shade_inbox_addr_expires_idx
ON shade_inbox_blobs(address, expires_at)

View File

@@ -56,18 +56,20 @@ export class PostgresInboxStore implements InboxStore {
msgId: string;
ciphertext: Uint8Array;
expiresAt: number;
senderFp?: string;
}): Promise<{ created: boolean; receivedAt: number }> {
// ON CONFLICT DO NOTHING + RETURNING keeps it idempotent and atomic.
// When a row already exists, we look up its received_at in a follow-up
// SELECT.
const inserted = await this.sql<Array<{ received_at: string }>>`
INSERT INTO shade_inbox_blobs (address, msg_id, ciphertext, received_at, expires_at)
INSERT INTO shade_inbox_blobs (address, msg_id, ciphertext, received_at, expires_at, sender_fp)
VALUES (
${args.address},
${args.msgId},
${toBase64(args.ciphertext)},
nextval('shade_inbox_seq'),
${args.expiresAt}
${args.expiresAt},
${args.senderFp ?? null}
)
ON CONFLICT (address, msg_id) DO NOTHING
RETURNING received_at::text
@@ -90,14 +92,23 @@ export class PostgresInboxStore implements InboxStore {
sinceCursor: number;
now: number;
limit: number;
}): Promise<Array<{ msgId: string; ciphertext: Uint8Array; receivedAt: number; expiresAt: number }>> {
}): Promise<
Array<{
msgId: string;
ciphertext: Uint8Array;
receivedAt: number;
expiresAt: number;
senderFp?: string;
}>
> {
const rows = await this.sql<Array<{
msg_id: string;
ciphertext: string;
received_at: string;
expires_at: string;
sender_fp: string | null;
}>>`
SELECT msg_id, ciphertext, received_at::text, expires_at::text
SELECT msg_id, ciphertext, received_at::text, expires_at::text, sender_fp
FROM shade_inbox_blobs
WHERE address = ${args.address}
AND received_at > ${args.sinceCursor}
@@ -105,12 +116,22 @@ export class PostgresInboxStore implements InboxStore {
ORDER BY received_at ASC
LIMIT ${args.limit}
`;
return rows.map((r) => ({
return rows.map((r) => {
const out: {
msgId: string;
ciphertext: Uint8Array;
receivedAt: number;
expiresAt: number;
senderFp?: string;
} = {
msgId: r.msg_id,
ciphertext: fromBase64(r.ciphertext),
receivedAt: parseInt(r.received_at, 10),
expiresAt: parseInt(r.expires_at, 10),
}));
};
if (r.sender_fp) out.senderFp = r.sender_fp;
return out;
});
}
async deleteBlob(address: string, msgId: string): Promise<boolean> {

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/storage-sqlite",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -53,6 +53,7 @@ export class SqliteInboxStore implements InboxStore {
ciphertext TEXT NOT NULL,
received_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL,
sender_fp TEXT,
PRIMARY KEY (address, msg_id)
);
CREATE INDEX IF NOT EXISTS idx_inbox_addr_expires
@@ -62,6 +63,14 @@ export class SqliteInboxStore implements InboxStore {
CREATE INDEX IF NOT EXISTS idx_inbox_expires
ON inbox_blobs(expires_at);
`);
// V4.8 — sender fingerprint column. Idempotent ALTER for live
// databases that came up under a 4.7-or-earlier schema.
const cols = this.db
.prepare(`PRAGMA table_info(inbox_blobs)`)
.all() as Array<{ name: string }>;
if (!cols.some((c) => c.name === 'sender_fp')) {
this.db.exec(`ALTER TABLE inbox_blobs ADD COLUMN sender_fp TEXT`);
}
}
private prepareStatements() {
@@ -73,13 +82,13 @@ export class SqliteInboxStore implements InboxStore {
deleteOwner: this.db.prepare('DELETE FROM inbox_owners WHERE address = ?'),
deleteOwnerBlobs: this.db.prepare('DELETE FROM inbox_blobs WHERE address = ?'),
insertBlob: this.db.prepare(
'INSERT INTO inbox_blobs (address, msg_id, ciphertext, received_at, expires_at) VALUES (?, ?, ?, ?, ?)',
'INSERT INTO inbox_blobs (address, msg_id, ciphertext, received_at, expires_at, sender_fp) VALUES (?, ?, ?, ?, ?, ?)',
),
findBlob: this.db.prepare(
'SELECT received_at FROM inbox_blobs WHERE address = ? AND msg_id = ?',
),
fetchSince: this.db.prepare(
`SELECT msg_id, ciphertext, received_at, expires_at
`SELECT msg_id, ciphertext, received_at, expires_at, sender_fp
FROM inbox_blobs
WHERE address = ? AND received_at > ? AND expires_at > ?
ORDER BY received_at ASC
@@ -124,6 +133,7 @@ export class SqliteInboxStore implements InboxStore {
msgId: string;
ciphertext: Uint8Array;
expiresAt: number;
senderFp?: string;
}): Promise<{ created: boolean; receivedAt: number }> {
const existing = this.stmts.findBlob.get(args.address, args.msgId) as
| { received_at: number }
@@ -139,6 +149,7 @@ export class SqliteInboxStore implements InboxStore {
toBase64(args.ciphertext),
receivedAt,
args.expiresAt,
args.senderFp ?? null,
);
return { created: true, receivedAt };
}
@@ -148,19 +159,43 @@ export class SqliteInboxStore implements InboxStore {
sinceCursor: number;
now: number;
limit: number;
}): Promise<Array<{ msgId: string; ciphertext: Uint8Array; receivedAt: number; expiresAt: number }>> {
}): Promise<
Array<{
msgId: string;
ciphertext: Uint8Array;
receivedAt: number;
expiresAt: number;
senderFp?: string;
}>
> {
const rows = this.stmts.fetchSince.all(
args.address,
args.sinceCursor,
args.now,
args.limit,
) as Array<{ msg_id: string; ciphertext: string; received_at: number; expires_at: number }>;
return rows.map((r) => ({
) as Array<{
msg_id: string;
ciphertext: string;
received_at: number;
expires_at: number;
sender_fp: string | null;
}>;
return rows.map((r) => {
const out: {
msgId: string;
ciphertext: Uint8Array;
receivedAt: number;
expiresAt: number;
senderFp?: string;
} = {
msgId: r.msg_id,
ciphertext: fromBase64(r.ciphertext),
receivedAt: r.received_at,
expiresAt: r.expires_at,
}));
};
if (r.sender_fp) out.senderFp = r.sender_fp;
return out;
});
}
async deleteBlob(address: string, msgId: string): Promise<boolean> {

View File

@@ -178,6 +178,104 @@ describe('SqliteInboxStore', () => {
expect(blobs.length).toBe(0);
});
test('senderFp round-trips through put + fetch (V4.8)', async () => {
const ct = randBytes(40);
const fp = '0123456789abcdef';
await store.putBlob({
address: 'bob',
msgId: 'a'.repeat(64),
ciphertext: ct,
expiresAt: Date.now() + 60_000,
senderFp: fp,
});
const blobs = await store.fetchBlobs({
address: 'bob',
sinceCursor: 0,
now: Date.now(),
limit: 10,
});
expect(blobs.length).toBe(1);
expect(blobs[0]!.senderFp).toBe(fp);
});
test('senderFp omitted on put → fetched row has senderFp undefined (V4.8 backward-compat)', async () => {
const ct = randBytes(40);
await store.putBlob({
address: 'bob',
msgId: 'b'.repeat(64),
ciphertext: ct,
expiresAt: Date.now() + 60_000,
});
const blobs = await store.fetchBlobs({
address: 'bob',
sinceCursor: 0,
now: Date.now(),
limit: 10,
});
expect(blobs.length).toBe(1);
expect(blobs[0]!.senderFp).toBeUndefined();
});
test('ALTER TABLE migration adds sender_fp to a pre-4.8 schema (V4.8)', async () => {
// Reproduce a pre-4.8 schema in a fresh DB, then reopen via
// SqliteInboxStore which should run the idempotent ALTER without
// dropping the existing rows.
store.close();
try {
unlinkSync(dbPath);
} catch {}
try {
unlinkSync(dbPath + '-wal');
} catch {}
try {
unlinkSync(dbPath + '-shm');
} catch {}
const { Database } = await import('bun:sqlite');
const legacy = new Database(dbPath, { create: true });
legacy.exec(`
CREATE TABLE inbox_blobs (
address TEXT NOT NULL,
msg_id TEXT NOT NULL,
ciphertext TEXT NOT NULL,
received_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL,
PRIMARY KEY (address, msg_id)
);
`);
legacy.prepare(
'INSERT INTO inbox_blobs (address, msg_id, ciphertext, received_at, expires_at) VALUES (?, ?, ?, ?, ?)',
).run('bob', 'c'.repeat(64), 'AAAA', Date.now(), Date.now() + 60_000);
legacy.close();
store = new SqliteInboxStore(dbPath);
const blobs = await store.fetchBlobs({
address: 'bob',
sinceCursor: 0,
now: Date.now(),
limit: 10,
});
expect(blobs.length).toBe(1);
expect(blobs[0]!.senderFp).toBeUndefined();
// New writes after migration carry senderFp.
await store.putBlob({
address: 'bob',
msgId: 'd'.repeat(64),
ciphertext: randBytes(8),
expiresAt: Date.now() + 60_000,
senderFp: 'feedfacedeadbeef',
});
const after = await store.fetchBlobs({
address: 'bob',
sinceCursor: 0,
now: Date.now(),
limit: 10,
});
const newer = after.find((b) => b.msgId === 'd'.repeat(64));
expect(newer?.senderFp).toBe('feedfacedeadbeef');
});
test('countBlobs ignores expired entries', async () => {
const now = Date.now();
await store.putBlob({

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/streams",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/transfer",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/transport-bridge",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -62,3 +62,39 @@ export function bridgeQueryToCanonical(qs: URLSearchParams): {
if (!Number.isFinite(signedAt)) return null;
return { address, kind, since, signedAt, signature };
}
// ─── V4.7 — presence subscription auth ────────────────────────────
export interface PresenceAuthInput {
crypto: CryptoProvider;
signingPrivateKey: Uint8Array;
/** The watcher's own address (signer of the request). */
address: string;
/** Addresses to subscribe presence updates for. May be empty. */
watched: readonly string[];
}
/**
* Build the signed query string for `GET /v1/bridge/presence`. The
* `kind: 'presence'` field is bound into the canonical payload to
* prevent cross-endpoint replay against `/v1/bridge/{stream,poll,ws}`.
*
* The `watched` array is sorted to give the signed bytes a canonical
* order; the wire form encodes it as a single comma-separated
* `watched=` parameter (address syntax disallows `,`).
*/
export async function signPresenceQuery(input: PresenceAuthInput): Promise<URLSearchParams> {
const watched = [...input.watched].sort();
const signed = await signPayload(input.crypto, input.signingPrivateKey, {
address: input.address,
kind: 'presence',
watched,
});
const qs = new URLSearchParams();
qs.set('address', input.address);
qs.set('kind', 'presence');
qs.set('watched', watched.join(','));
qs.set('signedAt', String(signed.signedAt));
qs.set('signature', signed.signature);
return qs;
}

View File

@@ -23,8 +23,17 @@ export { decodeWireMessage } from './types.js';
export { BridgeError } from './errors.js';
export { signBridgeQuery, bridgeQueryToCanonical } from './auth.js';
export type { BridgeKind, BridgeAuthInput } from './auth.js';
export { signBridgeQuery, bridgeQueryToCanonical, signPresenceQuery } from './auth.js';
export type { BridgeKind, BridgeAuthInput, PresenceAuthInput } from './auth.js';
export { PresenceBridge } from './presence-bridge.js';
export type {
PresenceBridgeOptions,
PresenceSubscribeOptions,
PresenceSubscription,
PresenceChange,
PresenceVia,
} from './presence-bridge.js';
export { SseBridge } from './sse-bridge.js';
export type { SseBridgeOptions } from './sse-bridge.js';

View File

@@ -51,7 +51,10 @@ export class LongPollBridge implements BridgeTransport {
private loopPromise: Promise<void> | null = null;
constructor(private readonly options: LongPollBridgeOptions) {
this.fetchFn = options.fetch ?? globalThis.fetch;
// Bind to globalThis: browser `fetch` is a WebIDL bound operation
// and throws "Illegal invocation" when called as `this.fetchFn(...)`.
const f = options.fetch ?? globalThis.fetch;
this.fetchFn = f.bind(globalThis);
this.cursor = options.startCursor ?? 0;
}

View File

@@ -0,0 +1,337 @@
/**
* V4.7 — presence subscription client.
*
* Consumes the SSE feed at `<base>/v1/bridge/presence?…` and fires
* `onPresenceChange` whenever a watched address transitions
* online/offline. Tracking is server-side: the inbox-server emits
* presence events on the 0↔1 boundary across WS + SSE bridge
* connections, and this client filters by the watcher's declared
* address list.
*
* Threat model context: the typical consumer (Prism, password
* managers, anything sender-key-broadcasting) wires this to
* `BroadcastChannel.removeMember` so a clean WS/SSE close on a
* paired-peer device revokes its sender-key membership within
* ~50ms. Long-poll bridges are deliberately NOT tracked on the
* server (see `inbox-server` `events.ts`); presence here is
* push-transport only.
*
* Watched-list mutations (`addPeer` / `removePeer`) trigger a
* reconnect with a fresh signed query so the server-side filter
* reflects the new set. Mutations are expected to be rare (only on
* pair / unpair, not on every message), so the brief reconnect gap
* is acceptable.
*/
import type { CryptoProvider } from '@shade/core';
import { signPresenceQuery } from './auth.js';
import { BridgeError } from './errors.js';
export type PresenceVia = 'ws' | 'sse';
export interface PresenceChange {
address: string;
status: 'online' | 'offline';
/** Server's wall-clock time (ms since epoch) when the change happened. */
at: number;
/** Which transport carried the connection. Absent on the initial snapshot. */
via?: PresenceVia;
}
export interface PresenceBridgeOptions {
/** Bridge base URL — same as `LongPollBridge` / `SseBridge`. */
baseUrl: string;
crypto: CryptoProvider;
/** Watcher's Ed25519 signing key (the address must be a registered inbox). */
signingPrivateKey: Uint8Array;
/** Watcher's address (the registered inbox owner). */
address: string;
/** Override `fetch` (tests). */
fetch?: typeof fetch;
/** Initial reconnect backoff (ms). Default 250. */
initialBackoffMs?: number;
/** Max reconnect backoff (ms). Default 10_000. */
maxBackoffMs?: number;
/** Disable automatic reconnect. Default false. */
disableAutoReconnect?: boolean;
}
export interface PresenceSubscribeOptions {
/** Initial set of addresses to watch. May be empty. */
watch: readonly string[];
/** Fired whenever a watched address transitions, plus once per address on initial open. */
onPresenceChange: (change: PresenceChange) => void | Promise<void>;
/** Optional reconnect / parse error reporter. */
onError?: (err: Error) => void;
}
export interface PresenceSubscription {
/** Add an address to the watched set. Triggers a reconnect. */
addPeer(address: string): Promise<void>;
/** Remove an address from the watched set. Triggers a reconnect. */
removePeer(address: string): Promise<void>;
/** Snapshot of the currently-watched addresses. */
watching(): readonly string[];
/** Tear down. Idempotent. */
unsubscribe(): Promise<void>;
}
const DEFAULT_INITIAL_BACKOFF = 250;
const DEFAULT_MAX_BACKOFF = 10_000;
export class PresenceBridge {
private readonly fetchFn: typeof fetch;
constructor(private readonly options: PresenceBridgeOptions) {
const f = options.fetch ?? globalThis.fetch;
this.fetchFn = f.bind(globalThis);
}
async subscribe(opts: PresenceSubscribeOptions): Promise<PresenceSubscription> {
const session = new PresenceSession(this.options, this.fetchFn, opts);
await session.start();
return session;
}
}
class PresenceSession implements PresenceSubscription {
private watched: string[];
private abortController: AbortController | null = null;
private currentReader: ReadableStreamDefaultReader<Uint8Array> | null = null;
private disposed = false;
private readLoopPromise: Promise<void> | null = null;
private readonly onPresenceChange: PresenceSubscribeOptions['onPresenceChange'];
private readonly onError: NonNullable<PresenceSubscribeOptions['onError']>;
private firstOpenResolve: (() => void) | null = null;
private firstOpenReject: ((err: Error) => void) | null = null;
private firstOpenSettled = false;
constructor(
private readonly options: PresenceBridgeOptions,
private readonly fetchFn: typeof fetch,
opts: PresenceSubscribeOptions,
) {
this.watched = [...opts.watch];
this.onPresenceChange = opts.onPresenceChange;
this.onError =
opts.onError ?? ((err) => console.warn('[shade-bridge:presence]', err.message));
}
watching(): readonly string[] {
return [...this.watched];
}
async start(): Promise<void> {
return this.openAndPump();
}
/**
* Open one SSE connection and resolve once the first response has
* been received (so that callers can `await subscribe()` and know
* the connection is established before the first state change).
* The read loop continues in the background.
*/
private openAndPump(): Promise<void> {
return new Promise<void>((resolve, reject) => {
this.firstOpenSettled = false;
this.firstOpenResolve = () => {
if (this.firstOpenSettled) return;
this.firstOpenSettled = true;
resolve();
};
this.firstOpenReject = (err: Error) => {
if (this.firstOpenSettled) return;
this.firstOpenSettled = true;
reject(err);
};
this.readLoopPromise = this.runLoop();
});
}
private async runLoop(): Promise<void> {
let backoff = this.options.initialBackoffMs ?? DEFAULT_INITIAL_BACKOFF;
const maxBackoff = this.options.maxBackoffMs ?? DEFAULT_MAX_BACKOFF;
let firstAttempt = true;
while (!this.disposed) {
try {
await this.openOnce();
if (firstAttempt) {
firstAttempt = false;
this.firstOpenResolve?.();
}
// Reset backoff on a successful open.
backoff = this.options.initialBackoffMs ?? DEFAULT_INITIAL_BACKOFF;
await this.consume();
} catch (err) {
if (this.disposed) return;
if (firstAttempt) {
// Failed before we ever got a 200 — surface to the caller of subscribe().
this.firstOpenReject?.(err as Error);
return;
}
this.onError(err as Error);
}
this.currentReader = null;
if (this.disposed || this.options.disableAutoReconnect) return;
await sleep(backoff);
backoff = Math.min(backoff * 2, maxBackoff);
}
}
private async openOnce(): Promise<void> {
const qs = await signPresenceQuery({
crypto: this.options.crypto,
signingPrivateKey: this.options.signingPrivateKey,
address: this.options.address,
watched: this.watched,
});
const url = `${stripTrailingSlash(this.options.baseUrl)}/v1/bridge/presence?${qs.toString()}`;
this.abortController = new AbortController();
let res: Response;
try {
res = await this.fetchFn(url, {
method: 'GET',
headers: { accept: 'text/event-stream', 'cache-control': 'no-cache' },
signal: this.abortController.signal,
});
} catch (err) {
throw new BridgeError(`presence connect failed: ${(err as Error).message}`);
}
if (!res.ok) {
throw new BridgeError(`presence connect failed: HTTP ${res.status}`, res.status);
}
if (!res.body) {
throw new BridgeError('presence response has no body');
}
this.currentReader = res.body.getReader() as ReadableStreamDefaultReader<Uint8Array>;
}
private async consume(): Promise<void> {
const reader = this.currentReader;
if (!reader) return;
const decoder = new TextDecoder();
let buf = '';
let dataLines: string[] = [];
let eventName: string | null = null;
while (true) {
let chunk: Awaited<ReturnType<typeof reader.read>>;
try {
chunk = await reader.read();
} catch (err) {
// Reader cancelled (mutation / unsubscribe) — exit cleanly.
if (this.disposed || (err as Error).name === 'AbortError') return;
throw err;
}
if (chunk.done) return;
buf += decoder.decode(chunk.value, { stream: true });
let idx;
while ((idx = buf.indexOf('\n')) !== -1) {
const rawLine = buf.slice(0, idx);
buf = buf.slice(idx + 1);
const line = rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine;
if (line === '') {
if (dataLines.length > 0) {
await this.dispatch(eventName, dataLines.join('\n'));
}
dataLines = [];
eventName = null;
continue;
}
if (line.startsWith(':')) continue;
const colon = line.indexOf(':');
const field = colon === -1 ? line : line.slice(0, colon);
let val = colon === -1 ? '' : line.slice(colon + 1);
if (val.startsWith(' ')) val = val.slice(1);
if (field === 'data') dataLines.push(val);
else if (field === 'event') eventName = val;
}
}
}
private async dispatch(name: string | null, data: string): Promise<void> {
if (name !== null && name !== '' && name !== 'presence') return;
let parsed: unknown;
try {
parsed = JSON.parse(data);
} catch (err) {
this.onError(new BridgeError(`malformed presence data: ${(err as Error).message}`));
return;
}
const change = parsed as PresenceChange;
if (
typeof change.address !== 'string' ||
(change.status !== 'online' && change.status !== 'offline') ||
typeof change.at !== 'number'
) {
this.onError(new BridgeError('presence frame missing required fields'));
return;
}
try {
await this.onPresenceChange(change);
} catch (err) {
this.onError(err as Error);
}
}
async addPeer(address: string): Promise<void> {
if (this.disposed) throw new BridgeError('PresenceBridge subscription disposed');
if (this.watched.includes(address)) return;
this.watched = [...this.watched, address];
await this.reconnect();
}
async removePeer(address: string): Promise<void> {
if (this.disposed) throw new BridgeError('PresenceBridge subscription disposed');
if (!this.watched.includes(address)) return;
this.watched = this.watched.filter((a) => a !== address);
await this.reconnect();
}
/**
* Tear down the current SSE connection so the run loop reopens with
* the new watched list. Cancels via abort + reader.cancel — both are
* tolerated by the consume() catch path.
*/
private async reconnect(): Promise<void> {
const reader = this.currentReader;
this.currentReader = null;
this.abortController?.abort();
if (reader) {
try {
await reader.cancel();
} catch {
/* ignore */
}
}
}
async unsubscribe(): Promise<void> {
if (this.disposed) return;
this.disposed = true;
const reader = this.currentReader;
this.currentReader = null;
this.abortController?.abort();
if (reader) {
try {
await reader.cancel();
} catch {
/* ignore */
}
}
if (this.readLoopPromise) {
try {
await this.readLoopPromise;
} catch {
/* ignore */
}
}
}
}
function stripTrailingSlash(s: string): string {
return s.endsWith('/') ? s.slice(0, -1) : s;
}
function sleep(ms: number): Promise<void> {
return new Promise((r) => setTimeout(r, ms));
}

View File

@@ -58,7 +58,10 @@ export class SseBridge implements BridgeTransport {
console.warn('[shade-bridge:sse]', err.message);
constructor(private readonly options: SseBridgeOptions) {
this.fetchFn = options.fetch ?? globalThis.fetch;
// Bind to globalThis: browser `fetch` is a WebIDL bound operation
// and throws "Illegal invocation" when called as `this.fetchFn(...)`.
const f = options.fetch ?? globalThis.fetch;
this.fetchFn = f.bind(globalThis);
this.cursor = options.startCursor ?? 0;
}

View File

@@ -28,7 +28,9 @@ import {
WsBridge,
FallbackBridgeTransport,
signBridgeQuery,
PresenceBridge,
type IncomingMessage,
type PresenceChange,
} from '../src/index.js';
const crypto = new SubtleCryptoProvider();
@@ -510,3 +512,519 @@ describe('Bridge cursor resume', () => {
await sseB.disconnect();
});
});
describe('Bridges — default fetch is bound to globalThis', () => {
// Regression: browsers' `fetch` is a WebIDL bound operation that throws
// "Illegal invocation" when called via `this.fetchFn(...)`. Constructors
// for LongPollBridge / SseBridge must `bind(globalThis)`.
function installStrictFetch(): { restore: () => void; getReceiver: () => unknown } {
const realFetch = globalThis.fetch;
let observedReceiver: unknown = 'unset';
function strictFetch(this: unknown, _input: unknown, _init?: unknown): Promise<Response> {
observedReceiver = this;
if (this !== globalThis) {
throw new TypeError("Failed to execute 'fetch' on 'Window': Illegal invocation");
}
return Promise.resolve(
new Response('{"blobs":[]}', {
status: 200,
headers: { 'content-type': 'application/json' },
}),
);
}
Object.defineProperty(globalThis, 'fetch', {
configurable: true,
writable: true,
value: strictFetch,
});
return {
restore: () =>
Object.defineProperty(globalThis, 'fetch', {
configurable: true,
writable: true,
value: realFetch,
}),
getReceiver: () => observedReceiver,
};
}
test('LongPollBridge default path passes globalThis as `this`', async () => {
const { restore, getReceiver } = installStrictFetch();
try {
const id = await generateIdentityKeyPair(crypto);
const bridge = new LongPollBridge({
baseUrl: 'http://example.invalid',
auth: { crypto, signingPrivateKey: id.signingPrivateKey, address: 'foo' },
pollTimeoutMs: 100,
requestTimeoutMs: 200,
errorBackoffMs: 50,
disableLoop: true,
});
await bridge.connect({ onMessage: () => {} });
expect(getReceiver()).toBe(globalThis);
await bridge.disconnect();
} finally {
restore();
}
});
test('SseBridge default path passes globalThis as `this`', async () => {
// For SSE we only need to know the very first fetch was bound; the
// 200-with-empty-stream response will let openOnce return cleanly,
// and disableAutoReconnect prevents an infinite reconnect loop.
const realFetch = globalThis.fetch;
let observedReceiver: unknown = 'unset';
function strictFetch(this: unknown, _input: unknown, _init?: unknown): Promise<Response> {
observedReceiver = this;
if (this !== globalThis) {
throw new TypeError("Failed to execute 'fetch' on 'Window': Illegal invocation");
}
// Empty SSE-shaped body: stream closes immediately.
return Promise.resolve(
new Response('', {
status: 200,
headers: { 'content-type': 'text/event-stream' },
}),
);
}
Object.defineProperty(globalThis, 'fetch', {
configurable: true,
writable: true,
value: strictFetch,
});
try {
const id = await generateIdentityKeyPair(crypto);
const bridge = new SseBridge({
baseUrl: 'http://example.invalid',
auth: { crypto, signingPrivateKey: id.signingPrivateKey, address: 'foo' },
initialBackoffMs: 50,
maxBackoffMs: 100,
disableAutoReconnect: true,
});
await bridge.connect({ onMessage: () => {} });
expect(observedReceiver).toBe(globalThis);
await bridge.disconnect();
} finally {
Object.defineProperty(globalThis, 'fetch', {
configurable: true,
writable: true,
value: realFetch,
});
}
});
});
// ─── V4.7 — presence events ────────────────────────────────────────
async function registerAddress(
baseUrl: string,
address: string,
identity: Awaited<ReturnType<typeof generateIdentityKeyPair>>,
): Promise<void> {
const body = await signPayload(crypto, identity.signingPrivateKey, {
address,
signingKey: toBase64(identity.signingPublicKey),
});
const res = await fetch(`${baseUrl}/v1/inbox/register`, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify(body),
});
expect(res.status).toBe(200);
}
describe('Presence — server emits peer_connected / peer_disconnected', () => {
let h: Harness;
beforeAll(async () => {
h = await bootstrap();
// bootstrap registers bob; alice needs registration too so her bridge
// can authenticate against /v1/bridge/ws.
await registerAddress(h.baseUrl, 'alice', h.alice);
});
afterAll(() => {
h.server.stop(true);
});
test('open + close a WsBridge → events fire on the inbox event bus (acceptance 1)', async () => {
const seen: Array<{ name: string; address: string; bridgeKind: string }> = [];
const off = h.events.on((e) => {
if (e.name === 'inbox.peer_connected' || e.name === 'inbox.peer_disconnected') {
seen.push({
name: e.name,
address: e.data.address,
bridgeKind: e.data.bridgeKind,
});
}
});
try {
const ws = new WsBridge({
baseUrl: h.baseUrl,
auth: { crypto, signingPrivateKey: h.alice.signingPrivateKey, address: 'alice' },
connectTimeoutMs: 2_000,
disableAutoReconnect: true,
});
await ws.connect({ onMessage: () => {} });
await waitFor(() => seen.some((s) => s.name === 'inbox.peer_connected'), 2_000);
await ws.disconnect();
await waitFor(() => seen.some((s) => s.name === 'inbox.peer_disconnected'), 2_000);
expect(seen[0]).toEqual({ name: 'inbox.peer_connected', address: 'alice', bridgeKind: 'ws' });
expect(seen[seen.length - 1]).toEqual({
name: 'inbox.peer_disconnected',
address: 'alice',
bridgeKind: 'ws',
});
} finally {
off();
}
});
});
describe('PresenceBridge — subscribe to remote presence changes', () => {
let h: Harness;
beforeAll(async () => {
h = await bootstrap();
await registerAddress(h.baseUrl, 'alice', h.alice);
});
afterAll(() => {
h.server.stop(true);
});
test('online → offline → online over a single subscription (acceptance 2A + 4)', async () => {
const presence = new PresenceBridge({
baseUrl: h.baseUrl,
crypto,
signingPrivateKey: h.bob.signingPrivateKey,
address: 'bob',
initialBackoffMs: 50,
maxBackoffMs: 200,
});
const changes: PresenceChange[] = [];
const sub = await presence.subscribe({
watch: ['alice'],
onPresenceChange: (e) => {
changes.push(e);
},
});
try {
// Initial snapshot — alice not yet connected.
await waitFor(() => changes.length >= 1, 2_000);
expect(changes[0]!.address).toBe('alice');
expect(changes[0]!.status).toBe('offline');
expect(changes[0]!.via).toBeUndefined();
// Alice opens a WsBridge → bob must see online (acceptance 2A).
const aliceWs = new WsBridge({
baseUrl: h.baseUrl,
auth: { crypto, signingPrivateKey: h.alice.signingPrivateKey, address: 'alice' },
connectTimeoutMs: 2_000,
disableAutoReconnect: true,
});
await aliceWs.connect({ onMessage: () => {} });
await waitFor(
() => changes.some((c) => c.status === 'online' && c.address === 'alice'),
2_000,
);
const onlineFrame = changes.find((c) => c.status === 'online' && c.address === 'alice')!;
expect(onlineFrame.via).toBe('ws');
// Alice's bridge drops → bob must see offline.
await aliceWs.disconnect();
await waitFor(
() =>
changes.filter((c) => c.address === 'alice' && c.status === 'offline').length >= 2,
2_000,
);
// Reconnect: alice reopens → bob sees online again (acceptance 4).
const aliceWs2 = new WsBridge({
baseUrl: h.baseUrl,
auth: { crypto, signingPrivateKey: h.alice.signingPrivateKey, address: 'alice' },
connectTimeoutMs: 2_000,
disableAutoReconnect: true,
});
await aliceWs2.connect({ onMessage: () => {} });
await waitFor(
() => changes.filter((c) => c.address === 'alice' && c.status === 'online').length >= 2,
2_000,
);
await aliceWs2.disconnect();
} finally {
await sub.unsubscribe();
}
});
test('subscription on [alice] does not leak carol (acceptance 3)', async () => {
const carol = await generateIdentityKeyPair(crypto);
await registerAddress(h.baseUrl, 'carol', carol);
const presence = new PresenceBridge({
baseUrl: h.baseUrl,
crypto,
signingPrivateKey: h.bob.signingPrivateKey,
address: 'bob',
initialBackoffMs: 50,
maxBackoffMs: 200,
});
const changes: PresenceChange[] = [];
const sub = await presence.subscribe({
watch: ['alice'],
onPresenceChange: (e) => {
changes.push(e);
},
});
try {
// Drain the initial snapshot for alice.
await waitFor(() => changes.length >= 1, 2_000);
const baseline = changes.length;
// Carol opens a bridge — bob's alice-only subscription must NOT see her.
const carolWs = new WsBridge({
baseUrl: h.baseUrl,
auth: { crypto, signingPrivateKey: carol.signingPrivateKey, address: 'carol' },
connectTimeoutMs: 2_000,
disableAutoReconnect: true,
});
await carolWs.connect({ onMessage: () => {} });
// Give the server time to emit + filter.
await new Promise((r) => setTimeout(r, 250));
await carolWs.disconnect();
await new Promise((r) => setTimeout(r, 100));
const newFrames = changes.slice(baseline);
for (const f of newFrames) {
expect(f.address).toBe('alice');
expect(f.address).not.toBe('carol');
}
} finally {
await sub.unsubscribe();
}
});
test('addPeer / removePeer mutate the watched set via reconnect', async () => {
const carol = await generateIdentityKeyPair(crypto);
await registerAddress(h.baseUrl, 'carol-2', carol);
const presence = new PresenceBridge({
baseUrl: h.baseUrl,
crypto,
signingPrivateKey: h.bob.signingPrivateKey,
address: 'bob',
initialBackoffMs: 50,
maxBackoffMs: 200,
});
const changes: PresenceChange[] = [];
const sub = await presence.subscribe({
watch: ['alice'],
onPresenceChange: (e) => {
changes.push(e);
},
});
try {
await waitFor(() => changes.length >= 1, 2_000);
expect(sub.watching()).toEqual(['alice']);
// Add carol-2 — reconnect should deliver a fresh snapshot that
// includes the new address.
await sub.addPeer('carol-2');
await waitFor(() => changes.some((c) => c.address === 'carol-2'), 2_000);
expect(sub.watching().sort()).toEqual(['alice', 'carol-2']);
const carolWs = new WsBridge({
baseUrl: h.baseUrl,
auth: { crypto, signingPrivateKey: carol.signingPrivateKey, address: 'carol-2' },
connectTimeoutMs: 2_000,
disableAutoReconnect: true,
});
await carolWs.connect({ onMessage: () => {} });
await waitFor(
() => changes.some((c) => c.address === 'carol-2' && c.status === 'online'),
2_000,
);
await carolWs.disconnect();
// removePeer → carol-2 events must stop arriving.
await sub.removePeer('carol-2');
const baseline = changes.filter((c) => c.address === 'carol-2').length;
const carolWs2 = new WsBridge({
baseUrl: h.baseUrl,
auth: { crypto, signingPrivateKey: carol.signingPrivateKey, address: 'carol-2' },
connectTimeoutMs: 2_000,
disableAutoReconnect: true,
});
await carolWs2.connect({ onMessage: () => {} });
await new Promise((r) => setTimeout(r, 250));
await carolWs2.disconnect();
await new Promise((r) => setTimeout(r, 100));
const after = changes.filter((c) => c.address === 'carol-2').length;
expect(after).toBe(baseline);
expect(sub.watching()).toEqual(['alice']);
} finally {
await sub.unsubscribe();
}
});
});
// ─── V4.8 — sender-fingerprint propagation on bridge push ──────────
describe('Sender attribution — bridge push surfaces IncomingMessage.from', () => {
test('SSE push carries from = 8-byte hex of SHA-256(senderSigningKey)', async () => {
const h = await bootstrap();
try {
const received: IncomingMessage[] = [];
const bridge = new SseBridge({
baseUrl: h.baseUrl,
auth: bobAuth(h),
initialBackoffMs: 50,
maxBackoffMs: 200,
disableAutoReconnect: true,
});
await bridge.connect({ onMessage: (m) => received.push(m) });
try {
await putBlob(h, rand(48));
await waitFor(() => received.length === 1, 5_000);
const fp = received[0]!.from;
expect(fp).toMatch(/^[0-9a-f]{16}$/);
const digest = await globalThis.crypto.subtle.digest(
'SHA-256',
h.alice.signingPublicKey as unknown as ArrayBuffer,
);
const expected = Array.from(new Uint8Array(digest).slice(0, 8), (b) =>
b.toString(16).padStart(2, '0'),
).join('');
expect(fp).toBe(expected);
} finally {
await bridge.disconnect();
}
} finally {
h.server.stop(true);
}
});
test('WS push carries from likewise', async () => {
const h = await bootstrap();
try {
const received: IncomingMessage[] = [];
const bridge = new WsBridge({
baseUrl: h.baseUrl,
auth: bobAuth(h),
connectTimeoutMs: 2_000,
disableAutoReconnect: true,
});
await bridge.connect({ onMessage: (m) => received.push(m) });
try {
await putBlob(h, rand(48));
await waitFor(() => received.length === 1, 5_000);
expect(received[0]!.from).toMatch(/^[0-9a-f]{16}$/);
} finally {
await bridge.disconnect();
}
} finally {
h.server.stop(true);
}
});
test('long-poll push carries from likewise', async () => {
const h = await bootstrap();
try {
const received: IncomingMessage[] = [];
const bridge = new LongPollBridge({
baseUrl: h.baseUrl,
auth: bobAuth(h),
pollTimeoutMs: 500,
requestTimeoutMs: 1_500,
errorBackoffMs: 50,
});
await bridge.connect({ onMessage: (m) => received.push(m) });
try {
await putBlob(h, rand(48));
await waitFor(() => received.length === 1, 5_000);
expect(received[0]!.from).toMatch(/^[0-9a-f]{16}$/);
} finally {
await bridge.disconnect();
}
} finally {
h.server.stop(true);
}
});
});
// ─── V4.8.2 — per-connection msgId dedup (Prism FR: duplicate fan-out) ─
describe('Bridge dedup — single PUT yields exactly one push per connection', () => {
test('WS: storming inbox.blob_stored does not duplicate frames for one msgId', async () => {
const h = await bootstrap();
try {
const received: IncomingMessage[] = [];
const bridge = new WsBridge({
baseUrl: h.baseUrl,
auth: bobAuth(h),
connectTimeoutMs: 2_000,
disableAutoReconnect: true,
});
await bridge.connect({ onMessage: (m) => received.push(m) });
try {
// One real PUT + replay the inbox.blob_stored event ten times to
// simulate any future code path (or external bug) that double-
// fires the trigger. The cursor in flushTo would already cover
// the happy case, but the per-connection LRU is the explicit
// dedup gate that survives even if cursor logic regresses.
const msgId = await putBlob(h, rand(48));
for (let i = 0; i < 10; i++) {
h.events.emit('inbox.blob_stored', {
address: 'bob',
msgId,
bytes: 48,
ttlSeconds: 60,
});
}
await waitFor(() => received.length >= 1, 2_000);
// Give any stragglers a chance to arrive and inflate the count.
await new Promise((r) => setTimeout(r, 250));
expect(received.length).toBe(1);
expect(received[0]!.msgId).toBe(msgId);
} finally {
await bridge.disconnect();
}
} finally {
h.server.stop(true);
}
});
test('SSE: same dedup contract', async () => {
const h = await bootstrap();
try {
const received: IncomingMessage[] = [];
const bridge = new SseBridge({
baseUrl: h.baseUrl,
auth: bobAuth(h),
initialBackoffMs: 50,
maxBackoffMs: 200,
disableAutoReconnect: true,
});
await bridge.connect({ onMessage: (m) => received.push(m) });
try {
const msgId = await putBlob(h, rand(48));
for (let i = 0; i < 10; i++) {
h.events.emit('inbox.blob_stored', {
address: 'bob',
msgId,
bytes: 48,
ttlSeconds: 60,
});
}
await waitFor(() => received.length >= 1, 2_000);
await new Promise((r) => setTimeout(r, 250));
expect(received.length).toBe(1);
expect(received[0]!.msgId).toBe(msgId);
} finally {
await bridge.disconnect();
}
} finally {
h.server.stop(true);
}
});
});

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/transport-webrtc",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/transport",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/widgets",
"version": "4.6.0",
"version": "4.8.2",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",