release(v4.7.0): peer-presence events for instant BroadcastChannel revoke

Adds the bridge-connection-lifecycle signal that closes Prism's
~45s revoke window down to one server→client round-trip (~50ms).

Server (`@shade/inbox-server`):
- `inbox.peer_connected` / `inbox.peer_disconnected` events on the
  0↔1 boundary across WS + SSE bridges. Long-poll deliberately not
  tracked (every poll boundary would flap; push transports are also
  the only ones where instant revoke matters).
- `PresenceTracker` collapses two parallel bridges (e.g. WS + SSE
  during fallback handover) into one connect/disconnect pair.
- `GET /v1/bridge/presence` SSE endpoint: signed query with
  `kind: 'presence'`, `watched: string[]`; on open streams a
  per-address snapshot, then change frames filtered server-side.
  MAX_WATCHED_ADDRESSES = 64. Subscribing does not itself count as
  a peer-bridge connection.
- `createBridgeRoutes` now returns `{ app, websocket, presence }`.

Client (`@shade/transport-bridge`):
- `PresenceBridge.subscribe({ watch, onPresenceChange })` →
  `{ addPeer, removePeer, watching, unsubscribe }`. addPeer/removePeer
  mutate via reconnect with a fresh signed query.
- `signPresenceQuery` helper for non-PresenceBridge consumers.

Tests cover all four acceptance criteria from the Prism request:
server-event smoke, online→offline subscription, address scoping
(carol invisible to a [alice]-only sub), reconnect, plus an
addPeer/removePeer regression.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-07 23:16:35 +02:00
parent 8746571d2a
commit 594992a183
34 changed files with 1042 additions and 28 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "@shade/transport-bridge",
"version": "4.6.1",
"version": "4.7.0",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",

View File

@@ -62,3 +62,39 @@ export function bridgeQueryToCanonical(qs: URLSearchParams): {
if (!Number.isFinite(signedAt)) return null;
return { address, kind, since, signedAt, signature };
}
// ─── V4.7 — presence subscription auth ────────────────────────────
export interface PresenceAuthInput {
crypto: CryptoProvider;
signingPrivateKey: Uint8Array;
/** The watcher's own address (signer of the request). */
address: string;
/** Addresses to subscribe presence updates for. May be empty. */
watched: readonly string[];
}
/**
* Build the signed query string for `GET /v1/bridge/presence`. The
* `kind: 'presence'` field is bound into the canonical payload to
* prevent cross-endpoint replay against `/v1/bridge/{stream,poll,ws}`.
*
* The `watched` array is sorted to give the signed bytes a canonical
* order; the wire form encodes it as a single comma-separated
* `watched=` parameter (address syntax disallows `,`).
*/
export async function signPresenceQuery(input: PresenceAuthInput): Promise<URLSearchParams> {
const watched = [...input.watched].sort();
const signed = await signPayload(input.crypto, input.signingPrivateKey, {
address: input.address,
kind: 'presence',
watched,
});
const qs = new URLSearchParams();
qs.set('address', input.address);
qs.set('kind', 'presence');
qs.set('watched', watched.join(','));
qs.set('signedAt', String(signed.signedAt));
qs.set('signature', signed.signature);
return qs;
}

View File

@@ -23,8 +23,17 @@ export { decodeWireMessage } from './types.js';
export { BridgeError } from './errors.js';
export { signBridgeQuery, bridgeQueryToCanonical } from './auth.js';
export type { BridgeKind, BridgeAuthInput } from './auth.js';
export { signBridgeQuery, bridgeQueryToCanonical, signPresenceQuery } from './auth.js';
export type { BridgeKind, BridgeAuthInput, PresenceAuthInput } from './auth.js';
export { PresenceBridge } from './presence-bridge.js';
export type {
PresenceBridgeOptions,
PresenceSubscribeOptions,
PresenceSubscription,
PresenceChange,
PresenceVia,
} from './presence-bridge.js';
export { SseBridge } from './sse-bridge.js';
export type { SseBridgeOptions } from './sse-bridge.js';

View File

@@ -0,0 +1,337 @@
/**
* V4.7 — presence subscription client.
*
* Consumes the SSE feed at `<base>/v1/bridge/presence?…` and fires
* `onPresenceChange` whenever a watched address transitions
* online/offline. Tracking is server-side: the inbox-server emits
* presence events on the 0↔1 boundary across WS + SSE bridge
* connections, and this client filters by the watcher's declared
* address list.
*
* Threat model context: the typical consumer (Prism, password
* managers, anything sender-key-broadcasting) wires this to
* `BroadcastChannel.removeMember` so a clean WS/SSE close on a
* paired-peer device revokes its sender-key membership within
* ~50ms. Long-poll bridges are deliberately NOT tracked on the
* server (see `inbox-server` `events.ts`); presence here is
* push-transport only.
*
* Watched-list mutations (`addPeer` / `removePeer`) trigger a
* reconnect with a fresh signed query so the server-side filter
* reflects the new set. Mutations are expected to be rare (only on
* pair / unpair, not on every message), so the brief reconnect gap
* is acceptable.
*/
import type { CryptoProvider } from '@shade/core';
import { signPresenceQuery } from './auth.js';
import { BridgeError } from './errors.js';
export type PresenceVia = 'ws' | 'sse';
export interface PresenceChange {
address: string;
status: 'online' | 'offline';
/** Server's wall-clock time (ms since epoch) when the change happened. */
at: number;
/** Which transport carried the connection. Absent on the initial snapshot. */
via?: PresenceVia;
}
export interface PresenceBridgeOptions {
/** Bridge base URL — same as `LongPollBridge` / `SseBridge`. */
baseUrl: string;
crypto: CryptoProvider;
/** Watcher's Ed25519 signing key (the address must be a registered inbox). */
signingPrivateKey: Uint8Array;
/** Watcher's address (the registered inbox owner). */
address: string;
/** Override `fetch` (tests). */
fetch?: typeof fetch;
/** Initial reconnect backoff (ms). Default 250. */
initialBackoffMs?: number;
/** Max reconnect backoff (ms). Default 10_000. */
maxBackoffMs?: number;
/** Disable automatic reconnect. Default false. */
disableAutoReconnect?: boolean;
}
export interface PresenceSubscribeOptions {
/** Initial set of addresses to watch. May be empty. */
watch: readonly string[];
/** Fired whenever a watched address transitions, plus once per address on initial open. */
onPresenceChange: (change: PresenceChange) => void | Promise<void>;
/** Optional reconnect / parse error reporter. */
onError?: (err: Error) => void;
}
export interface PresenceSubscription {
/** Add an address to the watched set. Triggers a reconnect. */
addPeer(address: string): Promise<void>;
/** Remove an address from the watched set. Triggers a reconnect. */
removePeer(address: string): Promise<void>;
/** Snapshot of the currently-watched addresses. */
watching(): readonly string[];
/** Tear down. Idempotent. */
unsubscribe(): Promise<void>;
}
const DEFAULT_INITIAL_BACKOFF = 250;
const DEFAULT_MAX_BACKOFF = 10_000;
export class PresenceBridge {
private readonly fetchFn: typeof fetch;
constructor(private readonly options: PresenceBridgeOptions) {
const f = options.fetch ?? globalThis.fetch;
this.fetchFn = f.bind(globalThis);
}
async subscribe(opts: PresenceSubscribeOptions): Promise<PresenceSubscription> {
const session = new PresenceSession(this.options, this.fetchFn, opts);
await session.start();
return session;
}
}
class PresenceSession implements PresenceSubscription {
private watched: string[];
private abortController: AbortController | null = null;
private currentReader: ReadableStreamDefaultReader<Uint8Array> | null = null;
private disposed = false;
private readLoopPromise: Promise<void> | null = null;
private readonly onPresenceChange: PresenceSubscribeOptions['onPresenceChange'];
private readonly onError: NonNullable<PresenceSubscribeOptions['onError']>;
private firstOpenResolve: (() => void) | null = null;
private firstOpenReject: ((err: Error) => void) | null = null;
private firstOpenSettled = false;
constructor(
private readonly options: PresenceBridgeOptions,
private readonly fetchFn: typeof fetch,
opts: PresenceSubscribeOptions,
) {
this.watched = [...opts.watch];
this.onPresenceChange = opts.onPresenceChange;
this.onError =
opts.onError ?? ((err) => console.warn('[shade-bridge:presence]', err.message));
}
watching(): readonly string[] {
return [...this.watched];
}
async start(): Promise<void> {
return this.openAndPump();
}
/**
* Open one SSE connection and resolve once the first response has
* been received (so that callers can `await subscribe()` and know
* the connection is established before the first state change).
* The read loop continues in the background.
*/
private openAndPump(): Promise<void> {
return new Promise<void>((resolve, reject) => {
this.firstOpenSettled = false;
this.firstOpenResolve = () => {
if (this.firstOpenSettled) return;
this.firstOpenSettled = true;
resolve();
};
this.firstOpenReject = (err: Error) => {
if (this.firstOpenSettled) return;
this.firstOpenSettled = true;
reject(err);
};
this.readLoopPromise = this.runLoop();
});
}
private async runLoop(): Promise<void> {
let backoff = this.options.initialBackoffMs ?? DEFAULT_INITIAL_BACKOFF;
const maxBackoff = this.options.maxBackoffMs ?? DEFAULT_MAX_BACKOFF;
let firstAttempt = true;
while (!this.disposed) {
try {
await this.openOnce();
if (firstAttempt) {
firstAttempt = false;
this.firstOpenResolve?.();
}
// Reset backoff on a successful open.
backoff = this.options.initialBackoffMs ?? DEFAULT_INITIAL_BACKOFF;
await this.consume();
} catch (err) {
if (this.disposed) return;
if (firstAttempt) {
// Failed before we ever got a 200 — surface to the caller of subscribe().
this.firstOpenReject?.(err as Error);
return;
}
this.onError(err as Error);
}
this.currentReader = null;
if (this.disposed || this.options.disableAutoReconnect) return;
await sleep(backoff);
backoff = Math.min(backoff * 2, maxBackoff);
}
}
private async openOnce(): Promise<void> {
const qs = await signPresenceQuery({
crypto: this.options.crypto,
signingPrivateKey: this.options.signingPrivateKey,
address: this.options.address,
watched: this.watched,
});
const url = `${stripTrailingSlash(this.options.baseUrl)}/v1/bridge/presence?${qs.toString()}`;
this.abortController = new AbortController();
let res: Response;
try {
res = await this.fetchFn(url, {
method: 'GET',
headers: { accept: 'text/event-stream', 'cache-control': 'no-cache' },
signal: this.abortController.signal,
});
} catch (err) {
throw new BridgeError(`presence connect failed: ${(err as Error).message}`);
}
if (!res.ok) {
throw new BridgeError(`presence connect failed: HTTP ${res.status}`, res.status);
}
if (!res.body) {
throw new BridgeError('presence response has no body');
}
this.currentReader = res.body.getReader() as ReadableStreamDefaultReader<Uint8Array>;
}
private async consume(): Promise<void> {
const reader = this.currentReader;
if (!reader) return;
const decoder = new TextDecoder();
let buf = '';
let dataLines: string[] = [];
let eventName: string | null = null;
while (true) {
let chunk: Awaited<ReturnType<typeof reader.read>>;
try {
chunk = await reader.read();
} catch (err) {
// Reader cancelled (mutation / unsubscribe) — exit cleanly.
if (this.disposed || (err as Error).name === 'AbortError') return;
throw err;
}
if (chunk.done) return;
buf += decoder.decode(chunk.value, { stream: true });
let idx;
while ((idx = buf.indexOf('\n')) !== -1) {
const rawLine = buf.slice(0, idx);
buf = buf.slice(idx + 1);
const line = rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine;
if (line === '') {
if (dataLines.length > 0) {
await this.dispatch(eventName, dataLines.join('\n'));
}
dataLines = [];
eventName = null;
continue;
}
if (line.startsWith(':')) continue;
const colon = line.indexOf(':');
const field = colon === -1 ? line : line.slice(0, colon);
let val = colon === -1 ? '' : line.slice(colon + 1);
if (val.startsWith(' ')) val = val.slice(1);
if (field === 'data') dataLines.push(val);
else if (field === 'event') eventName = val;
}
}
}
private async dispatch(name: string | null, data: string): Promise<void> {
if (name !== null && name !== '' && name !== 'presence') return;
let parsed: unknown;
try {
parsed = JSON.parse(data);
} catch (err) {
this.onError(new BridgeError(`malformed presence data: ${(err as Error).message}`));
return;
}
const change = parsed as PresenceChange;
if (
typeof change.address !== 'string' ||
(change.status !== 'online' && change.status !== 'offline') ||
typeof change.at !== 'number'
) {
this.onError(new BridgeError('presence frame missing required fields'));
return;
}
try {
await this.onPresenceChange(change);
} catch (err) {
this.onError(err as Error);
}
}
async addPeer(address: string): Promise<void> {
if (this.disposed) throw new BridgeError('PresenceBridge subscription disposed');
if (this.watched.includes(address)) return;
this.watched = [...this.watched, address];
await this.reconnect();
}
async removePeer(address: string): Promise<void> {
if (this.disposed) throw new BridgeError('PresenceBridge subscription disposed');
if (!this.watched.includes(address)) return;
this.watched = this.watched.filter((a) => a !== address);
await this.reconnect();
}
/**
* Tear down the current SSE connection so the run loop reopens with
* the new watched list. Cancels via abort + reader.cancel — both are
* tolerated by the consume() catch path.
*/
private async reconnect(): Promise<void> {
const reader = this.currentReader;
this.currentReader = null;
this.abortController?.abort();
if (reader) {
try {
await reader.cancel();
} catch {
/* ignore */
}
}
}
async unsubscribe(): Promise<void> {
if (this.disposed) return;
this.disposed = true;
const reader = this.currentReader;
this.currentReader = null;
this.abortController?.abort();
if (reader) {
try {
await reader.cancel();
} catch {
/* ignore */
}
}
if (this.readLoopPromise) {
try {
await this.readLoopPromise;
} catch {
/* ignore */
}
}
}
}
function stripTrailingSlash(s: string): string {
return s.endsWith('/') ? s.slice(0, -1) : s;
}
function sleep(ms: number): Promise<void> {
return new Promise((r) => setTimeout(r, ms));
}

View File

@@ -28,7 +28,9 @@ import {
WsBridge,
FallbackBridgeTransport,
signBridgeQuery,
PresenceBridge,
type IncomingMessage,
type PresenceChange,
} from '../src/index.js';
const crypto = new SubtleCryptoProvider();
@@ -611,3 +613,257 @@ describe('Bridges — default fetch is bound to globalThis', () => {
}
});
});
// ─── V4.7 — presence events ────────────────────────────────────────
async function registerAddress(
baseUrl: string,
address: string,
identity: Awaited<ReturnType<typeof generateIdentityKeyPair>>,
): Promise<void> {
const body = await signPayload(crypto, identity.signingPrivateKey, {
address,
signingKey: toBase64(identity.signingPublicKey),
});
const res = await fetch(`${baseUrl}/v1/inbox/register`, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify(body),
});
expect(res.status).toBe(200);
}
describe('Presence — server emits peer_connected / peer_disconnected', () => {
let h: Harness;
beforeAll(async () => {
h = await bootstrap();
// bootstrap registers bob; alice needs registration too so her bridge
// can authenticate against /v1/bridge/ws.
await registerAddress(h.baseUrl, 'alice', h.alice);
});
afterAll(() => {
h.server.stop(true);
});
test('open + close a WsBridge → events fire on the inbox event bus (acceptance 1)', async () => {
const seen: Array<{ name: string; address: string; bridgeKind: string }> = [];
const off = h.events.on((e) => {
if (e.name === 'inbox.peer_connected' || e.name === 'inbox.peer_disconnected') {
seen.push({
name: e.name,
address: e.data.address,
bridgeKind: e.data.bridgeKind,
});
}
});
try {
const ws = new WsBridge({
baseUrl: h.baseUrl,
auth: { crypto, signingPrivateKey: h.alice.signingPrivateKey, address: 'alice' },
connectTimeoutMs: 2_000,
disableAutoReconnect: true,
});
await ws.connect({ onMessage: () => {} });
await waitFor(() => seen.some((s) => s.name === 'inbox.peer_connected'), 2_000);
await ws.disconnect();
await waitFor(() => seen.some((s) => s.name === 'inbox.peer_disconnected'), 2_000);
expect(seen[0]).toEqual({ name: 'inbox.peer_connected', address: 'alice', bridgeKind: 'ws' });
expect(seen[seen.length - 1]).toEqual({
name: 'inbox.peer_disconnected',
address: 'alice',
bridgeKind: 'ws',
});
} finally {
off();
}
});
});
describe('PresenceBridge — subscribe to remote presence changes', () => {
let h: Harness;
beforeAll(async () => {
h = await bootstrap();
await registerAddress(h.baseUrl, 'alice', h.alice);
});
afterAll(() => {
h.server.stop(true);
});
test('online → offline → online over a single subscription (acceptance 2A + 4)', async () => {
const presence = new PresenceBridge({
baseUrl: h.baseUrl,
crypto,
signingPrivateKey: h.bob.signingPrivateKey,
address: 'bob',
initialBackoffMs: 50,
maxBackoffMs: 200,
});
const changes: PresenceChange[] = [];
const sub = await presence.subscribe({
watch: ['alice'],
onPresenceChange: (e) => {
changes.push(e);
},
});
try {
// Initial snapshot — alice not yet connected.
await waitFor(() => changes.length >= 1, 2_000);
expect(changes[0]!.address).toBe('alice');
expect(changes[0]!.status).toBe('offline');
expect(changes[0]!.via).toBeUndefined();
// Alice opens a WsBridge → bob must see online (acceptance 2A).
const aliceWs = new WsBridge({
baseUrl: h.baseUrl,
auth: { crypto, signingPrivateKey: h.alice.signingPrivateKey, address: 'alice' },
connectTimeoutMs: 2_000,
disableAutoReconnect: true,
});
await aliceWs.connect({ onMessage: () => {} });
await waitFor(
() => changes.some((c) => c.status === 'online' && c.address === 'alice'),
2_000,
);
const onlineFrame = changes.find((c) => c.status === 'online' && c.address === 'alice')!;
expect(onlineFrame.via).toBe('ws');
// Alice's bridge drops → bob must see offline.
await aliceWs.disconnect();
await waitFor(
() =>
changes.filter((c) => c.address === 'alice' && c.status === 'offline').length >= 2,
2_000,
);
// Reconnect: alice reopens → bob sees online again (acceptance 4).
const aliceWs2 = new WsBridge({
baseUrl: h.baseUrl,
auth: { crypto, signingPrivateKey: h.alice.signingPrivateKey, address: 'alice' },
connectTimeoutMs: 2_000,
disableAutoReconnect: true,
});
await aliceWs2.connect({ onMessage: () => {} });
await waitFor(
() => changes.filter((c) => c.address === 'alice' && c.status === 'online').length >= 2,
2_000,
);
await aliceWs2.disconnect();
} finally {
await sub.unsubscribe();
}
});
test('subscription on [alice] does not leak carol (acceptance 3)', async () => {
const carol = await generateIdentityKeyPair(crypto);
await registerAddress(h.baseUrl, 'carol', carol);
const presence = new PresenceBridge({
baseUrl: h.baseUrl,
crypto,
signingPrivateKey: h.bob.signingPrivateKey,
address: 'bob',
initialBackoffMs: 50,
maxBackoffMs: 200,
});
const changes: PresenceChange[] = [];
const sub = await presence.subscribe({
watch: ['alice'],
onPresenceChange: (e) => {
changes.push(e);
},
});
try {
// Drain the initial snapshot for alice.
await waitFor(() => changes.length >= 1, 2_000);
const baseline = changes.length;
// Carol opens a bridge — bob's alice-only subscription must NOT see her.
const carolWs = new WsBridge({
baseUrl: h.baseUrl,
auth: { crypto, signingPrivateKey: carol.signingPrivateKey, address: 'carol' },
connectTimeoutMs: 2_000,
disableAutoReconnect: true,
});
await carolWs.connect({ onMessage: () => {} });
// Give the server time to emit + filter.
await new Promise((r) => setTimeout(r, 250));
await carolWs.disconnect();
await new Promise((r) => setTimeout(r, 100));
const newFrames = changes.slice(baseline);
for (const f of newFrames) {
expect(f.address).toBe('alice');
expect(f.address).not.toBe('carol');
}
} finally {
await sub.unsubscribe();
}
});
test('addPeer / removePeer mutate the watched set via reconnect', async () => {
const carol = await generateIdentityKeyPair(crypto);
await registerAddress(h.baseUrl, 'carol-2', carol);
const presence = new PresenceBridge({
baseUrl: h.baseUrl,
crypto,
signingPrivateKey: h.bob.signingPrivateKey,
address: 'bob',
initialBackoffMs: 50,
maxBackoffMs: 200,
});
const changes: PresenceChange[] = [];
const sub = await presence.subscribe({
watch: ['alice'],
onPresenceChange: (e) => {
changes.push(e);
},
});
try {
await waitFor(() => changes.length >= 1, 2_000);
expect(sub.watching()).toEqual(['alice']);
// Add carol-2 — reconnect should deliver a fresh snapshot that
// includes the new address.
await sub.addPeer('carol-2');
await waitFor(() => changes.some((c) => c.address === 'carol-2'), 2_000);
expect(sub.watching().sort()).toEqual(['alice', 'carol-2']);
const carolWs = new WsBridge({
baseUrl: h.baseUrl,
auth: { crypto, signingPrivateKey: carol.signingPrivateKey, address: 'carol-2' },
connectTimeoutMs: 2_000,
disableAutoReconnect: true,
});
await carolWs.connect({ onMessage: () => {} });
await waitFor(
() => changes.some((c) => c.address === 'carol-2' && c.status === 'online'),
2_000,
);
await carolWs.disconnect();
// removePeer → carol-2 events must stop arriving.
await sub.removePeer('carol-2');
const baseline = changes.filter((c) => c.address === 'carol-2').length;
const carolWs2 = new WsBridge({
baseUrl: h.baseUrl,
auth: { crypto, signingPrivateKey: carol.signingPrivateKey, address: 'carol-2' },
connectTimeoutMs: 2_000,
disableAutoReconnect: true,
});
await carolWs2.connect({ onMessage: () => {} });
await new Promise((r) => setTimeout(r, 250));
await carolWs2.disconnect();
await new Promise((r) => setTimeout(r, 100));
const after = changes.filter((c) => c.address === 'carol-2').length;
expect(after).toBe(baseline);
expect(sub.watching()).toEqual(['alice']);
} finally {
await sub.unsubscribe();
}
});
});