Files
Shade/packages/shade-inbox-server/src/bridge.ts

714 lines
25 KiB
TypeScript
Raw Normal View History

/**
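* Bridge routes (V3.7 delivery bridges; presence feed added in V4.7).
*
* Three delivery transports, one delivery semantic. Each streams the same
* not-yet-acked inbox blobs for an authenticated address:
*
*   GET /v1/bridge/stream   SSE feed, one `event: envelope` frame per envelope
*   GET /v1/bridge/poll     long-poll, returns at most one batch then closes
*   GET /v1/bridge/ws       WebSocket, one JSON frame per envelope
*
* A fourth route, GET /v1/bridge/presence, is an SSE feed of peer
* online/offline changes for a watcher-supplied address list. Its signed
* query carries `kind: 'presence'` and a `watched` list instead of `since`.
*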
* Auth: signed query string (`address`, `kind`, `since`, `signedAt`,
* `signature`). The signature is verified against the address's owner key
* registered via `/v1/inbox/register`. The `kind` field is bound into the
* canonical signed payload to prevent cross-endpoint replay.
*
* Cursor semantics: `since` is the highest `receivedAt` the client already
* processed. The server returns blobs strictly greater than that cursor and
* advances the client's cursor by emitting a fresh `id:` (SSE) or by
* including the highest seen `receivedAt` in the JSON response (poll/ws).
*
* The stream and WebSocket bridges subscribe to {@link InboxServerEvents}
* (when configured) so newly stored blobs reach connected clients without
* waiting for a store scan. Each connection also runs a small in-process
* polling timer (`fallbackPollIntervalMs`) that re-scans the store; it is
* the only new-blob trigger when no events emitter is configured.
*/
import { Hono, type Context } from 'hono';
import { streamSSE } from 'hono/streaming';
import { createBunWebSocket } from 'hono/bun';
import type { CryptoProvider } from '@shade/core';
import {
errorToHttpStatus,
ShadeError,
ValidationError,
UnauthorizedError,
toBase64,
} from '@shade/core';
import { verifyPayload, validateAddress } from '@shade/server';
import type { InboxStore } from './store.js';
import type { InboxServerEvents } from './events.js';
release(v4.7.0): peer-presence events for instant BroadcastChannel revoke Adds the bridge-connection-lifecycle signal that closes Prism's ~45s revoke window down to one server→client round-trip (~50ms). Server (`@shade/inbox-server`): - `inbox.peer_connected` / `inbox.peer_disconnected` events on the 0↔1 boundary across WS + SSE bridges. Long-poll deliberately not tracked (every poll boundary would flap; push transports are also the only ones where instant revoke matters). - `PresenceTracker` collapses two parallel bridges (e.g. WS + SSE during fallback handover) into one connect/disconnect pair. - `GET /v1/bridge/presence` SSE endpoint: signed query with `kind: 'presence'`, `watched: string[]`; on open streams a per-address snapshot, then change frames filtered server-side. MAX_WATCHED_ADDRESSES = 64. Subscribing does not itself count as a peer-bridge connection. - `createBridgeRoutes` now returns `{ app, websocket, presence }`. Client (`@shade/transport-bridge`): - `PresenceBridge.subscribe({ watch, onPresenceChange })` → `{ addPeer, removePeer, watching, unsubscribe }`. addPeer/removePeer mutate via reconnect with a fresh signed query. - `signPresenceQuery` helper for non-PresenceBridge consumers. Tests cover all four acceptance criteria from the Prism request: server-event smoke, online→offline subscription, address scoping (carol invisible to a [alice]-only sub), reconnect, plus an addPeer/removePeer regression. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 23:16:35 +02:00
import { PresenceTracker, type TrackedBridgeKind } from './presence.js';
/**
 * Delivery transport that a bridge query is signed for. Sent as the
 * `kind` query parameter and bound into the signed payload.
 */
export type BridgeKind =
  | 'stream'
  | 'poll'
  | 'ws';
release(v4.7.0): peer-presence events for instant BroadcastChannel revoke Adds the bridge-connection-lifecycle signal that closes Prism's ~45s revoke window down to one server→client round-trip (~50ms). Server (`@shade/inbox-server`): - `inbox.peer_connected` / `inbox.peer_disconnected` events on the 0↔1 boundary across WS + SSE bridges. Long-poll deliberately not tracked (every poll boundary would flap; push transports are also the only ones where instant revoke matters). - `PresenceTracker` collapses two parallel bridges (e.g. WS + SSE during fallback handover) into one connect/disconnect pair. - `GET /v1/bridge/presence` SSE endpoint: signed query with `kind: 'presence'`, `watched: string[]`; on open streams a per-address snapshot, then change frames filtered server-side. MAX_WATCHED_ADDRESSES = 64. Subscribing does not itself count as a peer-bridge connection. - `createBridgeRoutes` now returns `{ app, websocket, presence }`. Client (`@shade/transport-bridge`): - `PresenceBridge.subscribe({ watch, onPresenceChange })` → `{ addPeer, removePeer, watching, unsubscribe }`. addPeer/removePeer mutate via reconnect with a fresh signed query. - `signPresenceQuery` helper for non-PresenceBridge consumers. Tests cover all four acceptance criteria from the Prism request: server-event smoke, online→offline subscription, address scoping (carol invisible to a [alice]-only sub), reconnect, plus an addPeer/removePeer regression. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 23:16:35 +02:00
/**
 * `kind` tag for `/v1/bridge/presence` queries.
 *
 * Kept separate from {@link BridgeKind} because a presence query signs a
 * `watched: string[]` list, whereas delivery queries sign `since: number`.
 */
export type PresenceKind = 'presence';
/** Options for {@link createBridgeRoutes}. */
export interface BridgeRoutesOptions {
  /** Inbox storage: source of blobs and of each address's registered owner key. */
  store: InboxStore;
  /** Crypto provider used to verify signed bridge queries. */
  crypto: CryptoProvider;
  /**
   * Optional events emitter — enables push-style delivery. Also the source
   * of peer presence changes: without it, `/v1/bridge/presence` sends only
   * its initial snapshot.
   */
  events?: InboxServerEvents;
  /** Maximum blobs returned per fetch page. Default 50. */
  pageLimit?: number;
  /**
   * Default long-poll hold (ms). Default 25_000 (under typical proxy cutoffs).
   * Clients may override it per request with `timeoutMs`, clamped to
   * `[0, longPollMaxTimeoutMs]`.
   */
  longPollTimeoutMs?: number;
  /** Maximum long-poll hold (ms). Hard cap. Default 55_000. */
  longPollMaxTimeoutMs?: number;
  /** SSE heartbeat interval (ms). Default 15_000. */
  heartbeatIntervalMs?: number;
  /**
   * Store re-scan interval (ms). Every stream and WebSocket connection runs
   * this timer even when `events` is configured; without `events` it is
   * their only trigger for detecting new blobs. Also passed to the
   * long-poll wait. Default 1_000.
   */
  fallbackPollIntervalMs?: number;
  /**
   * Inject an existing presence tracker. Useful when multiple
   * `createBridgeRoutes` calls need to share state (e.g. mounting the
   * routes under several hostnames in a single process). When omitted,
   * the bridge auto-creates an internal tracker bound to `events`.
   */
  presenceTracker?: PresenceTracker;
}
/** Outcome of a successful {@link verifyBridgeAuth} check. */
interface VerifiedBridgeRequest {
  /** Address whose registered owner key signed the query. */
  address: string;
  /** Transport the signature is bound to; always equals the route's own kind. */
  kind: BridgeKind;
  /** Client cursor: the highest `receivedAt` the client has already processed. */
  since: number;
}
release(v4.7.0): peer-presence events for instant BroadcastChannel revoke Adds the bridge-connection-lifecycle signal that closes Prism's ~45s revoke window down to one server→client round-trip (~50ms). Server (`@shade/inbox-server`): - `inbox.peer_connected` / `inbox.peer_disconnected` events on the 0↔1 boundary across WS + SSE bridges. Long-poll deliberately not tracked (every poll boundary would flap; push transports are also the only ones where instant revoke matters). - `PresenceTracker` collapses two parallel bridges (e.g. WS + SSE during fallback handover) into one connect/disconnect pair. - `GET /v1/bridge/presence` SSE endpoint: signed query with `kind: 'presence'`, `watched: string[]`; on open streams a per-address snapshot, then change frames filtered server-side. MAX_WATCHED_ADDRESSES = 64. Subscribing does not itself count as a peer-bridge connection. - `createBridgeRoutes` now returns `{ app, websocket, presence }`. Client (`@shade/transport-bridge`): - `PresenceBridge.subscribe({ watch, onPresenceChange })` → `{ addPeer, removePeer, watching, unsubscribe }`. addPeer/removePeer mutate via reconnect with a fresh signed query. - `signPresenceQuery` helper for non-PresenceBridge consumers. Tests cover all four acceptance criteria from the Prism request: server-event smoke, online→offline subscription, address scoping (carol invisible to a [alice]-only sub), reconnect, plus an addPeer/removePeer regression. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 23:16:35 +02:00
/** Outcome of a successful presence-query signature check. */
interface VerifiedPresenceRequest {
  /** Address of the watcher, i.e. the signer of the query. */
  address: string;
  /** Peers the watcher wants online/offline updates for. */
  watched: string[];
}
/**
 * Build the bridge Hono router and the paired Bun WebSocket handler.
 *
 * Routes: `/v1/bridge/stream` (SSE delivery), `/v1/bridge/presence` (SSE
 * presence feed), `/v1/bridge/poll` (long-poll delivery) and
 * `/v1/bridge/ws` (WebSocket delivery). The HTTP routes work on any Hono
 * runtime. The WebSocket route is built with `createBunWebSocket` from
 * `hono/bun`, which is imported statically; on non-Bun runtimes the upgrade
 * fails at request time while the other routes keep working.
 *
 * Stream and WebSocket connections are reported to the presence tracker
 * (connected on open, disconnected on close or error). Long-poll requests
 * and presence subscriptions are not.
 *
 * @param opts Store, crypto provider and tuning knobs; see {@link BridgeRoutesOptions}.
 * @returns The router, the WebSocket handler and the live presence tracker.
 */
export function createBridgeRoutes(opts: BridgeRoutesOptions): {
  app: Hono;
  /** Pass to `Bun.serve({ websocket })`. */
  websocket: unknown;
  /** Live presence tracker. Tests + observers can read it; routes update it. */
  presence: PresenceTracker;
} {
  const app = new Hono();
  const pageLimit = opts.pageLimit ?? 50;
  const heartbeatIntervalMs = opts.heartbeatIntervalMs ?? 15_000;
  const longPollDefault = opts.longPollTimeoutMs ?? 25_000;
  const longPollMax = opts.longPollMaxTimeoutMs ?? 55_000;
  const fallbackPollIntervalMs = opts.fallbackPollIntervalMs ?? 1_000;
  const presence = opts.presenceTracker ?? new PresenceTracker(opts.events ?? null);

  app.onError((err, c) => {
    if (err instanceof ShadeError) {
      const status = errorToHttpStatus(err);
      return c.json(err.toJSON(), status as any);
    }
    console.error('[Shade] Unhandled bridge error:', err);
    return c.json({ error: 'Internal server error' }, 500);
  });

  // ─── SSE ──────────────────────────────────────────────────────
  app.get('/v1/bridge/stream', async (c) => {
    const verified = await verifyBridgeAuth(c, opts, 'stream');
    return streamSSE(c, async (stream) => {
      const address = verified.address;
      let cursor = verified.since;
      const writer = makeBlobWriter(opts.store, pageLimit);
      const delivered = new DeliveredIdLru();
      // Drain every stored blob above the current cursor as one
      // `event: envelope` frame each. `cursor` is read at call time.
      const flushOnce = () =>
        flushTo(
          writer,
          address,
          cursor,
          async (blob) => {
            await stream.writeSSE({
              id: String(blob.receivedAt),
              event: 'envelope',
              data: JSON.stringify(serializeBlob(blob)),
            });
          },
          delivered,
        );

      const connId = presence.newConnectionId();
      presence.markConnected(address, 'sse', connId);
      // Everything after markConnected runs under try/finally. That way the
      // matching markDisconnected and the timer/subscription cleanup also
      // happen when the backlog drain throws (e.g. a store failure).
      // Otherwise the address would stay "online" forever.
      let disconnectReason: 'closed' | 'error' = 'closed';
      let cleanupSubscription: (() => void) | null = null;
      let fallbackTimer: ReturnType<typeof setInterval> | null = null;
      let heartbeat: ReturnType<typeof setInterval> | null = null;
      let pendingFlushPromise: Promise<void> = Promise.resolve();
      try {
        // Initial backlog drain.
        const flushed = await flushOnce();
        cursor = Math.max(cursor, flushed);

        let signalled = false;
        const triggerFlush = (): void => {
          signalled = true;
          // Serialize fan-in so concurrent triggers don't double-fetch.
          // `.catch(() => {})` keeps the chain alive across a failed flush.
          // Without it, one rejection silently kills every future flush on
          // this connection.
          pendingFlushPromise = pendingFlushPromise
            .then(async () => {
              while (signalled) {
                signalled = false;
                const drained = await flushOnce();
                if (drained > cursor) cursor = drained;
              }
            })
            .catch(() => {});
        };

        if (opts.events) {
          cleanupSubscription = opts.events.on((e) => {
            if (e.name === 'inbox.blob_stored' && e.data.address === address) {
              triggerFlush();
            }
          });
        }
        // Started whether or not `events` is wired in.
        fallbackTimer = setInterval(() => triggerFlush(), fallbackPollIntervalMs);
        heartbeat = setInterval(() => {
          // Comment lines are valid SSE keepalives.
          stream.write(`: ping ${Date.now()}\n\n`).catch(() => {});
        }, heartbeatIntervalMs);

        // Hold the stream open until the client disconnects.
        await untilAborted(c.req.raw.signal);
      } catch (err) {
        disconnectReason = 'error';
        throw err;
      } finally {
        cleanupSubscription?.();
        if (fallbackTimer !== null) clearInterval(fallbackTimer);
        if (heartbeat !== null) clearInterval(heartbeat);
        await pendingFlushPromise.catch(() => {});
        presence.markDisconnected(address, 'sse', connId, disconnectReason);
      }
    });
  });

  // ─── Presence (V4.7) ──────────────────────────────────────────
  // SSE feed of `peer_connected` / `peer_disconnected` events, filtered
  // by a watcher-supplied address list. Subscribing does NOT count as
  // a peer-bridge connection (it doesn't call `markConnected`), so
  // monitoring presence doesn't make you appear online to others.
  app.get('/v1/bridge/presence', async (c) => {
    const verified = await verifyPresenceAuth(c, opts);
    return streamSSE(c, async (stream) => {
      const watched = new Set(verified.watched);

      // All frames go through one chain. Snapshot frames are therefore
      // written before any change frame, and a failed write (stream gone)
      // is dropped without breaking later writes.
      let writeChain: Promise<void> = Promise.resolve();
      const sendPresence = (frame: Record<string, unknown>): void => {
        writeChain = writeChain
          .then(() => stream.writeSSE({ event: 'presence', data: JSON.stringify(frame) }))
          .catch(() => {});
      };

      // Initial snapshot: one frame per watched address with its current
      // status, so subscribers can render immediately. The snapshot is read
      // and the events subscription is registered with no `await` between
      // them. No presence change can therefore fall into a gap between
      // "snapshot taken" and "listening".
      const now = Date.now();
      for (const addr of watched) {
        sendPresence({
          address: addr,
          status: presence.isOnline(addr) ? 'online' : 'offline',
          at: now,
        });
      }

      let unsubscribe: (() => void) | null = null;
      if (opts.events) {
        unsubscribe = opts.events.on((e) => {
          if (e.name !== 'inbox.peer_connected' && e.name !== 'inbox.peer_disconnected') return;
          const data = e.data as { address: string; bridgeKind: TrackedBridgeKind };
          if (!watched.has(data.address)) return;
          sendPresence({
            address: data.address,
            status: e.name === 'inbox.peer_connected' ? 'online' : 'offline',
            at: e.timestamp,
            via: data.bridgeKind,
          });
        });
      }

      const heartbeat = setInterval(() => {
        stream.write(`: ping ${Date.now()}\n\n`).catch(() => {});
      }, heartbeatIntervalMs);

      await untilAborted(c.req.raw.signal);
      unsubscribe?.();
      clearInterval(heartbeat);
    });
  });

  // ─── Long-poll ────────────────────────────────────────────────
  app.get('/v1/bridge/poll', async (c) => {
    const verified = await verifyBridgeAuth(c, opts, 'poll');
    const requestedTimeout = Number(c.req.query('timeoutMs') ?? longPollDefault);
    const timeoutMs = Math.min(
      Math.max(0, Number.isFinite(requestedTimeout) ? requestedTimeout : longPollDefault),
      longPollMax,
    );
    // Try immediate fetch first.
    let blobs = await opts.store.fetchBlobs({
      address: verified.address,
      sinceCursor: verified.since,
      now: Date.now(),
      limit: pageLimit,
    });
    if (blobs.length > 0) {
      return c.json(buildPollResponse(blobs, verified.since));
    }
    // Otherwise, wait for either a new event or the timeout.
    blobs = await waitForBlobs({
      events: opts.events ?? null,
      store: opts.store,
      address: verified.address,
      since: verified.since,
      timeoutMs,
      pageLimit,
      fallbackPollIntervalMs,
      abortSignal: c.req.raw.signal,
    });
    return c.json(buildPollResponse(blobs, verified.since));
  });

  // ─── WebSocket ────────────────────────────────────────────────
  // Hono's Bun adapter resolves `getBunServer` from the request's `env`
  // (the second argument of Bun.serve's fetch). On non-Bun runtimes the
  // upgrade simply fails at runtime; the SSE/long-poll routes still work.
  const { upgradeWebSocket, websocket } = createBunWebSocket();
  app.get(
    '/v1/bridge/ws',
    upgradeWebSocket(async (c: Context) => {
      let verified: VerifiedBridgeRequest | null = null;
      let upgradeError: unknown = null;
      try {
        verified = await verifyBridgeAuth(c, opts, 'ws');
      } catch (err) {
        upgradeError = err;
      }
      if (!verified) {
        const failure = upgradeError;
        // Mirror the HTTP onError policy: only ShadeError messages reach
        // the client; anything else is logged and reported generically.
        if (!(failure instanceof ShadeError)) {
          console.error('[Shade] Unhandled bridge error:', failure);
        }
        // Hono's API doesn't let us reject the upgrade with a status code
        // before opening the socket. Instead, close immediately on open with
        // a 4xxx policy code so the client can fall back to another bridge.
        return {
          onOpen(_evt: unknown, ws: { close: (code?: number, reason?: string) => void }) {
            const status = failure instanceof ShadeError ? errorToHttpStatus(failure) : 500;
            const reason =
              failure instanceof ShadeError ? failure.message : 'Internal server error';
            ws.close(4000 + (status % 1000), clampWsCloseReason(reason));
          },
        };
      }

      const address = verified.address;
      const connId = presence.newConnectionId();
      let cursor = verified.since;
      const writer = makeBlobWriter(opts.store, pageLimit);
      const delivered = new DeliveredIdLru();
      let unsubscribe: (() => void) | null = null;
      let fallbackTimer: ReturnType<typeof setInterval> | null = null;
      let pendingFlushPromise: Promise<void> = Promise.resolve();
      let signalled = false;
      let connected = true;
      let presenceClosed = false;

      // Shared by onClose and onError. Each step is guarded, so a second
      // call (e.g. onError followed by onClose) is a no-op.
      const teardown = (reason: 'closed' | 'error'): void => {
        connected = false;
        unsubscribe?.();
        unsubscribe = null;
        if (fallbackTimer !== null) {
          clearInterval(fallbackTimer);
          fallbackTimer = null;
        }
        if (presenceClosed) return;
        presenceClosed = true;
        presence.markDisconnected(address, 'ws', connId, reason);
      };

      return {
        onOpen(_evt: unknown, ws: {
          send: (data: string) => void;
          close: (code?: number, reason?: string) => void;
        }) {
          presence.markConnected(address, 'ws', connId);
          const triggerFlush = (): void => {
            signalled = true;
            // `.catch(() => {})` mirrors the SSE chain. It keeps the
            // pending-flush queue alive across transient ws.send errors
            // (e.g. partial close, backpressure overflow).
            pendingFlushPromise = pendingFlushPromise
              .then(async () => {
                while (signalled && connected) {
                  signalled = false;
                  const drained = await flushTo(
                    writer,
                    address,
                    cursor,
                    async (blob) => {
                      ws.send(JSON.stringify(serializeBlob(blob)));
                    },
                    delivered,
                  );
                  if (drained > cursor) cursor = drained;
                }
              })
              .catch(() => {});
          };
          if (opts.events) {
            unsubscribe = opts.events.on((e) => {
              if (e.name === 'inbox.blob_stored' && e.data.address === address) {
                triggerFlush();
              }
            });
          }
          // Started whether or not `events` is wired in.
          fallbackTimer = setInterval(() => triggerFlush(), fallbackPollIntervalMs);
          triggerFlush();
        },
        onClose() {
          teardown('closed');
        },
        onError() {
          teardown('error');
        },
      };
    }),
  );

  return { app, websocket, presence };
}

/** Resolve once `signal` aborts (immediately if it already has). Never rejects. */
function untilAborted(signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve) => {
    if (signal.aborted) {
      resolve();
      return;
    }
    signal.addEventListener('abort', () => resolve(), { once: true });
  });
}

/**
 * Trim `reason` to the 123-byte UTF-8 limit RFC 6455 places on a close
 * frame's reason text (125-byte control payload minus the 2-byte status
 * code). Truncation never splits a code point.
 */
function clampWsCloseReason(reason: string): string {
  const maxBytes = 123;
  const encoder = new TextEncoder();
  if (encoder.encode(reason).length <= maxBytes) return reason;
  let clamped = '';
  let used = 0;
  for (const codePoint of reason) {
    const size = encoder.encode(codePoint).length;
    if (used + size > maxBytes) break;
    clamped += codePoint;
    used += size;
  }
  return clamped;
}
// ─── helpers ──────────────────────────────────────────────────
/**
 * Authenticate a message-bridge request from its signed query string.
 *
 * Required query parameters: `address`, `kind` (must equal `expectedKind`),
 * `since` (non-negative number), `signedAt` (finite number) and a non-empty
 * `signature`. The address must have a registered owner key; the fields
 * `{address, kind, since, signedAt, signature}` are then handed to
 * `verifyPayload` together with that key. Binding `kind` into the signed
 * fields is what stops a signature minted for one endpoint being replayed
 * against another.
 *
 * @param c - Hono request context; only the request URL is read.
 * @param opts - route options supplying the store (owner lookup) and crypto provider.
 * @param expectedKind - the transport kind served by the calling route.
 * @returns the verified address, kind and `since` cursor.
 * @throws ValidationError on a kind mismatch or a missing/malformed parameter.
 * @throws UnauthorizedError when the address has no registered owner.
 */
async function verifyBridgeAuth(
  c: Context,
  opts: BridgeRoutesOptions,
  expectedKind: BridgeKind,
): Promise<VerifiedBridgeRequest> {
  const params = new URL(c.req.url).searchParams;

  const address = validateAddress(params.get('address'));

  const kind = params.get('kind');
  if (kind !== expectedKind) {
    throw new ValidationError(`bridge kind mismatch: expected ${expectedKind}`, 'kind');
  }

  const rawSince = params.get('since');
  const rawSignedAt = params.get('signedAt');
  const signature = params.get('signature');
  if (rawSince === null) throw new ValidationError('missing since', 'since');
  if (rawSignedAt === null) throw new ValidationError('missing signedAt', 'signedAt');
  if (!signature) throw new ValidationError('missing signature', 'signature');

  // Number() (not parseInt) so trailing garbage such as "12abc" becomes NaN
  // and is rejected by the finiteness checks below.
  const since = Number(rawSince);
  const signedAt = Number(rawSignedAt);
  if (!(Number.isFinite(since) && since >= 0)) {
    throw new ValidationError('since must be a non-negative number', 'since');
  }
  if (!Number.isFinite(signedAt)) {
    throw new ValidationError('signedAt must be a number', 'signedAt');
  }

  const owner = await opts.store.getAddressOwner(address);
  if (!owner) throw new UnauthorizedError(`address ${address} is not registered`);

  await verifyPayload(opts.crypto, owner, { address, kind, since, signedAt, signature });
  // `kind` was proven equal to `expectedKind` above.
  return { address, kind: expectedKind, since };
}
release(v4.7.0): peer-presence events for instant BroadcastChannel revoke Adds the bridge-connection-lifecycle signal that closes Prism's ~45s revoke window down to one server→client round-trip (~50ms). Server (`@shade/inbox-server`): - `inbox.peer_connected` / `inbox.peer_disconnected` events on the 0↔1 boundary across WS + SSE bridges. Long-poll deliberately not tracked (every poll boundary would flap; push transports are also the only ones where instant revoke matters). - `PresenceTracker` collapses two parallel bridges (e.g. WS + SSE during fallback handover) into one connect/disconnect pair. - `GET /v1/bridge/presence` SSE endpoint: signed query with `kind: 'presence'`, `watched: string[]`; on open streams a per-address snapshot, then change frames filtered server-side. MAX_WATCHED_ADDRESSES = 64. Subscribing does not itself count as a peer-bridge connection. - `createBridgeRoutes` now returns `{ app, websocket, presence }`. Client (`@shade/transport-bridge`): - `PresenceBridge.subscribe({ watch, onPresenceChange })` → `{ addPeer, removePeer, watching, unsubscribe }`. addPeer/removePeer mutate via reconnect with a fresh signed query. - `signPresenceQuery` helper for non-PresenceBridge consumers. Tests cover all four acceptance criteria from the Prism request: server-event smoke, online→offline subscription, address scoping (carol invisible to a [alice]-only sub), reconnect, plus an addPeer/removePeer regression. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 23:16:35 +02:00
/** Upper bound on how many addresses a single presence subscription may watch. */
const MAX_WATCHED_ADDRESSES = 64;
/**
 * Verify a `/v1/bridge/presence` request.
 *
 * Signed canonical payload: `{address, kind: 'presence', watched: string[],
 * signedAt}`. The watcher's address must be a registered inbox; the
 * signature is verified against the registered owner key for that
 * address. The `watched` list bounds what the subscription will
 * receive — server-side filtering is enforced inside the handler.
 *
 * `watched` is a comma-separated query parameter. An empty string means
 * "watch nothing"; a missing parameter is rejected. At most
 * {@link MAX_WATCHED_ADDRESSES} entries are accepted, and the count is
 * checked before any entry is validated so an oversized (untrusted) list is
 * rejected without per-address work. Entries are not de-duplicated, because
 * the list is part of the signed payload and must match what the client signed.
 *
 * @param c - Hono request context; only the request URL is read.
 * @param opts - route options supplying the store (owner lookup) and crypto provider.
 * @returns the verified watcher address and its watched list.
 * @throws ValidationError on a wrong `kind`, a missing/oversized `watched`
 *   list, or a missing/non-numeric `signedAt` or missing `signature`.
 * @throws UnauthorizedError when `address` has no registered owner.
 * Errors from `validateAddress` and `verifyPayload` propagate unchanged.
 */
async function verifyPresenceAuth(
  c: Context,
  opts: BridgeRoutesOptions,
): Promise<VerifiedPresenceRequest> {
  const url = new URL(c.req.url);
  const qs = url.searchParams;
  const address = validateAddress(qs.get('address'));
  const kind = qs.get('kind');
  if (kind !== 'presence') {
    throw new ValidationError(`bridge kind mismatch: expected presence`, 'kind');
  }
  const watchedRaw = qs.get('watched');
  if (watchedRaw === null) throw new ValidationError('missing watched', 'watched');
  // Empty subscription is allowed (subscribe to nothing — useful for a
  // client that intends to call addPeer right after open). A null/
  // missing param is still rejected so the canonicalization is
  // unambiguous.
  const watchedParts = watchedRaw === '' ? [] : watchedRaw.split(',');
  // Bound the list before validating entries: the query string is
  // attacker-controlled, so reject an oversized list up front.
  if (watchedParts.length > MAX_WATCHED_ADDRESSES) {
    throw new ValidationError(
      `watched list too large: ${watchedParts.length} > ${MAX_WATCHED_ADDRESSES}`,
      'watched',
    );
  }
  const watched = watchedParts.map((a) => validateAddress(a));
  const signedAtStr = qs.get('signedAt');
  const signature = qs.get('signature');
  if (signedAtStr === null) throw new ValidationError('missing signedAt', 'signedAt');
  if (!signature) throw new ValidationError('missing signature', 'signature');
  const signedAt = Number(signedAtStr);
  if (!Number.isFinite(signedAt)) {
    throw new ValidationError('signedAt must be a number', 'signedAt');
  }
  const owner = await opts.store.getAddressOwner(address);
  if (!owner) {
    throw new UnauthorizedError(`address ${address} is not registered`);
  }
  await verifyPayload(opts.crypto, owner, {
    address,
    kind,
    watched,
    signedAt,
    signature,
  });
  return { address, watched };
}
/** One stored inbox blob as read back by the bridge transports. */
interface BlobRow {
  /** Message identifier; also the key used for per-connection dedup. */
  msgId: string;
  /** Opaque ciphertext bytes; base64-encoded when serialized for the wire. */
  ciphertext: Uint8Array;
  /** Cursor key: bridges advance their cursor to the highest value seen. */
  receivedAt: number;
  /** Expiry time; serialized to clients as-is. */
  expiresAt: number;
  /** V4.8 — relay-captured sender fingerprint. Optional for legacy rows. */
  senderFp?: string;
}
/** Source of cursor-paginated blob pages for one inbox address. */
interface BlobWriter {
  /** Fetch the next page of blobs for `address` after `cursor`. */
  fetchPage(address: string, cursor: number): Promise<BlobRow[]>;
}
/**
 * Adapt an `InboxStore` into a {@link BlobWriter}.
 *
 * Each page request asks the store for at most `pageLimit` rows after the
 * given cursor, passing a fresh `Date.now()` as `now` on every call. The
 * page function is `async`, so even a synchronous throw from the store
 * reaches the caller as a rejected promise.
 */
function makeBlobWriter(store: InboxStore, pageLimit: number): BlobWriter {
  const fetchPage = async (address: string, cursor: number): Promise<BlobRow[]> =>
    store.fetchBlobs({ address, sinceCursor: cursor, now: Date.now(), limit: pageLimit });
  return { fetchPage };
}
/**
 * Per-connection bounded msgId tracker — defense in depth against duplicate
 * delivery of the same blob to the same bridge socket. Cursor pagination
 * already guarantees uniqueness in the happy path, but a dedup gate at the
 * emit boundary catches any subtle bug (e.g. a flushTo race, a future
 * refactor, an event-emit retry) without changing wire semantics.
 *
 * The cap is intentionally large enough to cover any realistic bridge
 * pageLimit and small enough to bound memory under long-running streams.
 */
const DELIVERED_LRU_CAP = 4096;
/**
 * Bounded set of msgIds already emitted on one connection.
 *
 * Despite the name, eviction is first-in-first-out: once more than
 * `capacity` ids are held, the earliest-inserted id is dropped, and a
 * duplicate `add` does not refresh an id's position. Eviction relies on
 * `Set` iterating in insertion order, so no separate queue (and no
 * linear-time `Array#shift`) is needed.
 */
class DeliveredIdLru {
  private readonly seen = new Set<string>();
  private readonly capacity: number;

  /**
   * @param capacity - maximum number of ids retained; defaults to
   *   `DELIVERED_LRU_CAP`. A value of 0 or less retains nothing.
   */
  constructor(capacity: number = DELIVERED_LRU_CAP) {
    this.capacity = capacity;
  }

  /** Returns true if `msgId` has not been seen on this connection yet. */
  add(msgId: string): boolean {
    if (this.seen.has(msgId)) return false;
    this.seen.add(msgId);
    if (this.seen.size > this.capacity) {
      // First value in iteration order is the oldest insertion.
      const oldest = this.seen.values().next().value;
      if (oldest !== undefined) this.seen.delete(oldest);
    }
    return true;
  }
}
/**
 * Emit every blob for `address` after `startCursor`, draining the source page
 * by page so a backlog larger than one page still flushes in a single call.
 *
 * @param writer - page source; each `fetchPage` call receives the current cursor.
 * @param address - inbox address being flushed.
 * @param startCursor - highest `receivedAt` the client already processed.
 * @param emit - awaited once per delivered blob, in page order.
 * @param delivered - optional per-connection dedup set; rows whose `msgId`
 *   it has already seen are not emitted but still advance the cursor.
 * @returns the larger of `startCursor` and the highest `receivedAt` observed.
 */
async function flushTo(
  writer: BlobWriter,
  address: string,
  startCursor: number,
  emit: (blob: BlobRow) => Promise<void>,
  delivered?: DeliveredIdLru,
): Promise<number> {
  let cursor = startCursor;
  for (;;) {
    const page = await writer.fetchPage(address, cursor);
    if (page.length === 0) break;
    const cursorBeforePage = cursor;
    for (const row of page) {
      // Per-connection dedup gate — prevents the same msgId from being
      // emitted twice if flushTo is somehow re-entered before the cursor
      // catches up. See comment on `DeliveredIdLru`.
      if (!delivered || delivered.add(row.msgId)) {
        await emit(row);
      }
      if (row.receivedAt > cursor) cursor = row.receivedAt;
    }
    // A non-empty page that did not move the cursor would be requested again
    // with the same cursor; a deterministic source would return the same rows
    // forever. Stop instead of spinning.
    if (cursor === cursorBeforePage) break;
  }
  return cursor;
}
/**
 * Convert a stored blob row into its JSON wire shape.
 *
 * The ciphertext is base64-encoded via `toBase64`; `receivedAt` and
 * `expiresAt` pass through unchanged. `from` is included only when the row
 * carries a non-empty `senderFp` (V4.8 relay-captured sender fingerprint).
 * The transport-bridge wire format already accepts `from`, and populating it
 * lets receivers bootstrap unknown-sender first contact via
 * `shade.receive('fp:<hex>', env)` without an out-of-band sender hint.
 */
function serializeBlob(blob: BlobRow): {
  msgId: string;
  ciphertext: string;
  receivedAt: number;
  expiresAt: number;
  from?: string;
} {
  const { msgId, ciphertext, receivedAt, expiresAt, senderFp } = blob;
  return {
    msgId,
    ciphertext: toBase64(ciphertext),
    receivedAt,
    expiresAt,
    // Truthiness check: legacy rows (undefined) and '' both omit `from`.
    ...(senderFp ? { from: senderFp } : {}),
  };
}
/**
 * Shape a long-poll JSON response from a batch of blobs.
 *
 * The returned cursor is the maximum of `sinceFallback` and every row's
 * `receivedAt`. This mirrors how `flushTo` advances its cursor, so it does
 * not depend on row order and never moves backwards.
 *
 * @param blobs - rows to return; each is serialized with `serializeBlob`.
 * @param sinceFallback - the request's cursor; returned as-is for an empty batch.
 * @param pageLimit - optional page size the batch was fetched with. When it
 *   is given (and positive) and the batch is full, `hasMore` is `true` as a
 *   hint that the client should poll again immediately. When omitted,
 *   `hasMore` is always `false`.
 * @returns serialized blobs, the next cursor, and the `hasMore` hint.
 */
function buildPollResponse(
  blobs: BlobRow[],
  sinceFallback: number,
  pageLimit?: number,
): {
  blobs: ReturnType<typeof serializeBlob>[];
  cursor: number;
  hasMore: boolean;
} {
  const cursor = blobs.reduce((max, blob) => Math.max(max, blob.receivedAt), sinceFallback);
  const hasMore = pageLimit !== undefined && pageLimit > 0 && blobs.length >= pageLimit;
  return { blobs: blobs.map(serializeBlob), cursor, hasMore };
}
/** Inputs for {@link waitForBlobs}. */
interface WaitForBlobsArgs {
  /** Event source for early wake-ups; `null` means rely on polling alone. */
  events: InboxServerEvents | null;
  /** Store queried for blobs after `since`. */
  store: InboxStore;
  /** Inbox address to wait on; `inbox.blob_stored` events for other addresses are ignored. */
  address: string;
  /** Cursor passed to the store as `sinceCursor`. */
  since: number;
  /** Maximum wait in milliseconds; values `<= 0` resolve with `[]` immediately. */
  timeoutMs: number;
  /** Maximum rows returned by one store query. */
  pageLimit: number;
  /** Period of the store polling timer, in milliseconds. */
  fallbackPollIntervalMs: number;
  /** Optional cancellation; aborting resolves the wait with `[]`. */
  abortSignal?: AbortSignal;
}
/**
 * Long-poll helper. It resolves with the first non-empty page of blobs for
 * `address` after `since`. It resolves with `[]` once `timeoutMs` elapses or
 * `abortSignal` fires.
 *
 * The store is queried:
 * - once right after listeners are installed;
 * - on every matching `inbox.blob_stored` event;
 * - every `fallbackPollIntervalMs`.
 *
 * The polling timer runs even when `events` is provided. Store query errors
 * are swallowed; the next trigger or the timeout settles the wait. Timers,
 * the event subscription and the abort listener are all released when the
 * promise settles.
 */
async function waitForBlobs(args: WaitForBlobsArgs): Promise<BlobRow[]> {
  // Nothing to wait for: skip the store, timers and subscriptions entirely.
  if (args.timeoutMs <= 0) return [];
  if (args.abortSignal?.aborted) return [];

  return new Promise<BlobRow[]>((resolve) => {
    let resolved = false;
    let unsubscribe: (() => void) | null = null;
    let timer: ReturnType<typeof setTimeout> | null = null;
    let fallback: ReturnType<typeof setInterval> | null = null;
    let abortHandler: (() => void) | null = null;

    // Settle exactly once and release every resource acquired below.
    const finish = (blobs: BlobRow[]) => {
      if (resolved) return;
      resolved = true;
      if (timer) clearTimeout(timer);
      if (fallback) clearInterval(fallback);
      if (unsubscribe) unsubscribe();
      if (abortHandler && args.abortSignal) {
        args.abortSignal.removeEventListener('abort', abortHandler);
      }
      resolve(blobs);
    };

    const tryFetch = async (): Promise<void> => {
      // Already settled (e.g. a late event or poll tick): skip the query.
      if (resolved) return;
      try {
        const rows = await args.store.fetchBlobs({
          address: args.address,
          sinceCursor: args.since,
          now: Date.now(),
          limit: args.pageLimit,
        });
        if (rows.length > 0) finish(rows);
      } catch {
        // Best-effort: the next event, poll tick, or the timeout settles the wait.
      }
    };

    if (args.events) {
      unsubscribe = args.events.on((e) => {
        if (e.name === 'inbox.blob_stored' && e.data.address === args.address) {
          void tryFetch();
        }
      });
    }
    fallback = setInterval(() => void tryFetch(), args.fallbackPollIntervalMs);
    timer = setTimeout(() => finish([]), args.timeoutMs);
    if (args.abortSignal) {
      abortHandler = () => finish([]);
      args.abortSignal.addEventListener('abort', abortHandler, { once: true });
    }

    // Subscribe-then-check: with listeners in place, look once right away so
    // a blob stored before the subscription existed is returned now instead
    // of after the first poll tick.
    void tryFetch();
  });
}