/**
 * Bridge routes — V3.7.
 *
 * Three transports, one delivery semantic. Each one streams the same
 * not-yet-acked inbox blobs for an authenticated address:
 *
 *   GET /v1/bridge/stream — SSE feed, one envelope per `event: envelope`
 *   GET /v1/bridge/poll   — long-poll, returns at most one batch then closes
 *   GET /v1/bridge/ws     — WebSocket, JSON frame per envelope
 *
 * Auth: signed query string (`address`, `kind`, `since`, `signedAt`,
 * `signature`). The signature is verified against the address's owner key
 * registered via `/v1/inbox/register`. The `kind` field is bound into the
 * canonical signed payload to prevent cross-endpoint replay.
 *
 * Cursor semantics: `since` is the highest `receivedAt` the client already
 * processed. The server returns blobs strictly greater than that cursor and
 * advances the client's cursor by emitting a fresh `id:` (SSE) or by
 * including the highest seen `receivedAt` in the JSON response (poll/ws).
 *
 * The implementations subscribe to {@link InboxServerEvents} so that newly
 * stored blobs land on connected clients without polling the store. A small
 * in-process polling timer with a configurable interval re-checks the store
 * as well; it is the only delivery trigger when no events emitter is
 * configured.
 */
import { Hono, type Context } from 'hono';
import { streamSSE } from 'hono/streaming';
import { createBunWebSocket } from 'hono/bun';
import type { CryptoProvider } from '@shade/core';
import {
  errorToHttpStatus,
  ShadeError,
  ValidationError,
  UnauthorizedError,
  toBase64,
} from '@shade/core';
import { verifyPayload, validateAddress } from '@shade/server';
import type { InboxStore } from './store.js';
import type { InboxServerEvents } from './events.js';
import { PresenceTracker, type TrackedBridgeKind } from './presence.js';
import { BridgeDeliveryLog } from './bridge-delivery-log.js';

export type BridgeKind = 'stream' | 'poll' | 'ws';

/**
 * Wire-protocol kind tag for `/v1/bridge/presence`. Distinct from
 * `BridgeKind` because the canonical signed payload is shaped
 * differently (`watched: string[]` instead of `since: number`).
 */
export type PresenceKind = 'presence';

export interface BridgeRoutesOptions {
  store: InboxStore;
  crypto: CryptoProvider;
  /** Optional events emitter — enables push-style delivery. */
  events?: InboxServerEvents;
  /** Maximum blobs returned per fetch page. Default 50. */
  pageLimit?: number;
  /** Default long-poll hold (ms). Default 25_000 (under typical proxy cutoffs). */
  longPollTimeoutMs?: number;
  /** Maximum long-poll hold (ms). Hard cap. Default 55_000. */
  longPollMaxTimeoutMs?: number;
  /** SSE heartbeat interval (ms). Default 15_000. */
  heartbeatIntervalMs?: number;
  /**
   * Poll interval (ms) at which the bridge re-checks the store to detect
   * new blobs. This is the only trigger when no `events` emitter is wired
   * in. Default 1_000.
   */
  fallbackPollIntervalMs?: number;
  /**
   * Inject an existing presence tracker. Useful when multiple
   * `createBridgeRoutes` calls need to share state (e.g. mounting the
   * routes under several hostnames in a single process). When omitted,
   * the bridge auto-creates an internal tracker bound to `events`.
   */
  presenceTracker?: PresenceTracker;
  /**
   * V4.8.4 — shared bridge delivery log. After every successful WS /
   * SSE push we record `(address, msgId, now)` here so the inbox-fetch
   * route can suppress the same blob from a subsequent inbox-poll
   * within the log's grace window. Pass the same instance to
   * `createInboxRoutes` (or use the auto-created one returned in
   * `bridgeRoutes.bridgeDeliveryLog`). When omitted, the bridge
   * auto-creates its own log.
   */
  bridgeDeliveryLog?: BridgeDeliveryLog;
}

interface VerifiedBridgeRequest {
  address: string;
  kind: BridgeKind;
  since: number;
}

interface VerifiedPresenceRequest {
  /** The watcher's address (signer of the request). */
  address: string;
  /** Addresses whose presence the watcher is asking to track. */
  watched: string[];
}

/**
 * Build the bridge Hono router and a paired Bun-WebSocket handler.
 *
 * The HTTP routes (`/v1/bridge/stream`, `/v1/bridge/poll`,
 * `/v1/bridge/presence`) are plain Hono handlers. The `/v1/bridge/ws`
 * route is built with `createBunWebSocket` from `hono/bun`, which this
 * module imports statically.
 *
 * @param opts Store, crypto provider and optional tuning knobs.
 * @returns The router plus the shared state objects the routes write to.
 */
export function createBridgeRoutes(opts: BridgeRoutesOptions): {
  app: Hono;
  /** Pass to `Bun.serve({ websocket })`. */
  websocket: unknown;
  /** Live presence tracker. Tests + observers can read it; routes update it. */
  presence: PresenceTracker;
  /**
   * V4.8.4 — the shared bridge-delivery log this router writes to on
   * every successful push. Wire the same instance into
   * `createInboxRoutes({ bridgeDeliveryLog })` so the inbox-fetch route
   * can suppress recently-pushed blobs.
   */
  bridgeDeliveryLog: BridgeDeliveryLog;
} {
  const app = new Hono();
  const pageLimit = opts.pageLimit ?? 50;
  const heartbeatIntervalMs = opts.heartbeatIntervalMs ?? 15_000;
  const longPollDefault = opts.longPollTimeoutMs ?? 25_000;
  const longPollMax = opts.longPollMaxTimeoutMs ?? 55_000;
  const fallbackPollIntervalMs = opts.fallbackPollIntervalMs ?? 1_000;
  const presence = opts.presenceTracker ?? new PresenceTracker(opts.events ?? null);
  const bridgeDeliveryLog = opts.bridgeDeliveryLog ?? new BridgeDeliveryLog();

  app.onError((err, c) => {
    if (err instanceof ShadeError) {
      const status = errorToHttpStatus(err);
      return c.json(err.toJSON(), status as any);
    }
    console.error('[Shade] Unhandled bridge error:', err);
    return c.json({ error: 'Internal server error' }, 500);
  });

  // ─── SSE ──────────────────────────────────────────────────────
  app.get('/v1/bridge/stream', async (c) => {
    const verified = await verifyBridgeAuth(c, opts, 'stream');
    return streamSSE(c, async (stream) => {
      const address = verified.address;
      const connId = presence.newConnectionId();
      const writer = makeBlobWriter(opts.store, pageLimit);
      const delivered = new DeliveredIdLru();
      let cursor = verified.since;

      // Single emit path shared by the backlog drain and live pushes.
      const emitEnvelope = async (blob: BlobRow): Promise<void> => {
        await stream.writeSSE({
          id: String(blob.receivedAt),
          event: 'envelope',
          data: JSON.stringify(serializeBlob(blob)),
        });
        bridgeDeliveryLog.recordDelivered(address, blob.msgId, Date.now());
      };

      let cleanupSubscription: (() => void) | null = null;
      let fallbackTimer: ReturnType<typeof setInterval> | null = null;
      let heartbeat: ReturnType<typeof setInterval> | null = null;
      let pendingFlushPromise: Promise<void> = Promise.resolve();
      let signalled = false;
      let disconnectReason: 'closed' | 'error' = 'closed';

      presence.markConnected(address, 'sse', connId);
      // Everything after `markConnected` runs under try/finally so a failing
      // backlog drain cannot leave the address marked as connected forever.
      try {
        // Initial backlog drain.
        const flushed = await flushTo(writer, address, cursor, emitEnvelope, delivered);
        cursor = Math.max(cursor, flushed);

        const triggerFlush = (): void => {
          signalled = true;
          // Serialize fan-in so concurrent triggers don't double-fetch.
          // `.catch(() => {})` keeps the chain alive across transient
          // emit failures (e.g. a closed SSE write throws) — without it
          // one rejection silently kills every future flush on this
          // connection.
          pendingFlushPromise = pendingFlushPromise
            .then(async () => {
              while (signalled) {
                signalled = false;
                const drained = await flushTo(writer, address, cursor, emitEnvelope, delivered);
                if (drained > cursor) cursor = drained;
              }
            })
            .catch(() => {});
        };

        // Event-driven push when an emitter is configured.
        if (opts.events) {
          cleanupSubscription = opts.events.on((e) => {
            if (e.name === 'inbox.blob_stored' && e.data.address === address) {
              triggerFlush();
            }
          });
        }
        // The poll timer always runs and performs the same scan; it is the
        // only trigger when no emitter is configured.
        fallbackTimer = setInterval(triggerFlush, fallbackPollIntervalMs);
        heartbeat = setInterval(() => {
          // Comment lines are valid SSE keepalives.
          stream.write(`: ping ${Date.now()}\n\n`).catch(() => {});
        }, heartbeatIntervalMs);

        // Wait for the request to abort (client disconnect).
        await waitForAbort(c.req.raw.signal);
      } catch (err) {
        disconnectReason = 'error';
        throw err;
      } finally {
        cleanupSubscription?.();
        if (fallbackTimer) clearInterval(fallbackTimer);
        if (heartbeat) clearInterval(heartbeat);
        await pendingFlushPromise.catch(() => {});
        presence.markDisconnected(address, 'sse', connId, disconnectReason);
      }
    });
  });

  // ─── Presence (V4.7) ──────────────────────────────────────────
  // SSE feed of `peer_connected` / `peer_disconnected` events filtered
  // by a watcher-supplied address list. Subscribing does NOT count as
  // a peer-bridge connection (it doesn't call `markConnected`) so
  // monitoring presence doesn't make you appear online to others.
  app.get('/v1/bridge/presence', async (c) => {
    const verified = await verifyPresenceAuth(c, opts);
    return streamSSE(c, async (stream) => {
      const watched = new Set<string>(verified.watched);

      // Initial snapshot — one frame per watched address with current
      // status. Lets subscribers render UI immediately rather than
      // waiting for the next state change.
      const now = Date.now();
      for (const addr of verified.watched) {
        await stream.writeSSE({
          event: 'presence',
          data: JSON.stringify({
            address: addr,
            status: presence.isOnline(addr) ? 'online' : 'offline',
            at: now,
          }),
        });
      }

      let unsubscribe: (() => void) | null = null;
      if (opts.events) {
        unsubscribe = opts.events.on((e) => {
          if (e.name !== 'inbox.peer_connected' && e.name !== 'inbox.peer_disconnected') return;
          const data = e.data as { address: string; bridgeKind: TrackedBridgeKind };
          if (!watched.has(data.address)) return;
          const status = e.name === 'inbox.peer_connected' ? 'online' : 'offline';
          // Fire-and-forget: drop the frame if the stream has gone away.
          void stream
            .writeSSE({
              event: 'presence',
              data: JSON.stringify({
                address: data.address,
                status,
                at: e.timestamp,
                via: data.bridgeKind,
              }),
            })
            .catch(() => {});
        });
      }
      const heartbeat = setInterval(() => {
        stream.write(`: ping ${Date.now()}\n\n`).catch(() => {});
      }, heartbeatIntervalMs);

      try {
        await waitForAbort(c.req.raw.signal);
      } finally {
        unsubscribe?.();
        clearInterval(heartbeat);
      }
    });
  });

  // ─── Long-poll ────────────────────────────────────────────────
  app.get('/v1/bridge/poll', async (c) => {
    const verified = await verifyBridgeAuth(c, opts, 'poll');
    const requestedTimeout = Number(c.req.query('timeoutMs') ?? longPollDefault);
    const timeoutMs = Math.min(
      Math.max(0, Number.isFinite(requestedTimeout) ? requestedTimeout : longPollDefault),
      longPollMax,
    );

    // Record every returned blob in the delivery log, then build the response.
    const respond = (rows: BlobRow[]) => {
      const now = Date.now();
      for (const row of rows) bridgeDeliveryLog.recordDelivered(verified.address, row.msgId, now);
      return c.json(buildPollResponse(rows, verified.since));
    };

    // Try immediate fetch first.
    const immediate = await opts.store.fetchBlobs({
      address: verified.address,
      sinceCursor: verified.since,
      now: Date.now(),
      limit: pageLimit,
    });
    if (immediate.length > 0) return respond(immediate);

    // Otherwise, wait for either a new event or the timeout.
    const awaited = await waitForBlobs({
      events: opts.events ?? null,
      store: opts.store,
      address: verified.address,
      since: verified.since,
      timeoutMs,
      pageLimit,
      fallbackPollIntervalMs,
      abortSignal: c.req.raw.signal,
    });
    return respond(awaited);
  });

  // ─── WebSocket ────────────────────────────────────────────────
  // Hono's Bun adapter resolves `getBunServer` from the request's `env`
  // (the second argument of Bun.serve's fetch). On non-Bun runtimes the
  // upgrade simply fails at runtime; the SSE/long-poll routes still work.
  const { upgradeWebSocket, websocket } = createBunWebSocket();
  app.get(
    '/v1/bridge/ws',
    upgradeWebSocket(async (c: Context) => {
      let verified: VerifiedBridgeRequest;
      try {
        verified = await verifyBridgeAuth(c, opts, 'ws');
      } catch (err) {
        // Hono's API doesn't let us reject the upgrade with a status code
        // before opening the socket; close immediately on open with a 4xxx
        // policy code so the client can fall back to a different bridge.
        let status = 500;
        let reason = 'internal error';
        if (err instanceof ShadeError) {
          status = errorToHttpStatus(err);
          reason = err.message;
        } else {
          // Mirror the HTTP `onError` handler: log unexpected failures
          // server-side instead of echoing their message to the client.
          console.error('[Shade] Unhandled bridge error:', err);
        }
        return {
          onOpen(_evt: unknown, ws: { close: (code?: number, reason?: string) => void }) {
            ws.close(4000 + (status % 1000), reason);
          },
        };
      }

      const address = verified.address;
      const connId = presence.newConnectionId();
      const writer = makeBlobWriter(opts.store, pageLimit);
      const delivered = new DeliveredIdLru();
      let cursor = verified.since;
      let unsubscribe: (() => void) | null = null;
      let fallbackTimer: ReturnType<typeof setInterval> | null = null;
      let pendingFlushPromise: Promise<void> = Promise.resolve();
      let signalled = false;
      let connected = true;
      let presenceClosed = false;

      // Shared by `onClose` and `onError`. Safe to call more than once: the
      // subscription and timer are released once and presence is reported
      // with the first reason only.
      const teardown = (reason: 'closed' | 'error'): void => {
        connected = false;
        unsubscribe?.();
        unsubscribe = null;
        if (fallbackTimer) {
          clearInterval(fallbackTimer);
          fallbackTimer = null;
        }
        if (presenceClosed) return;
        presenceClosed = true;
        presence.markDisconnected(address, 'ws', connId, reason);
      };

      return {
        onOpen(
          _evt: unknown,
          ws: {
            send: (data: string) => void;
            close: (code?: number, reason?: string) => void;
          },
        ) {
          presence.markConnected(address, 'ws', connId);

          const emitFrame = async (blob: BlobRow): Promise<void> => {
            ws.send(JSON.stringify(serializeBlob(blob)));
            bridgeDeliveryLog.recordDelivered(address, blob.msgId, Date.now());
          };

          const triggerFlush = (): void => {
            signalled = true;
            // `.catch(() => {})` mirrors the SSE chain — keeps the
            // pending-flush queue alive across transient ws.send errors
            // (e.g. partial close, backpressure overflow).
            pendingFlushPromise = pendingFlushPromise
              .then(async () => {
                while (signalled && connected) {
                  signalled = false;
                  const drained = await flushTo(writer, address, cursor, emitFrame, delivered);
                  if (drained > cursor) cursor = drained;
                }
              })
              .catch(() => {});
          };

          if (opts.events) {
            unsubscribe = opts.events.on((e) => {
              if (e.name === 'inbox.blob_stored' && e.data.address === address) {
                triggerFlush();
              }
            });
          }
          fallbackTimer = setInterval(triggerFlush, fallbackPollIntervalMs);
          // Drain the backlog right away.
          triggerFlush();
        },
        onClose() {
          teardown('closed');
        },
        onError() {
          teardown('error');
        },
      };
    }),
  );

  return { app, websocket, presence, bridgeDeliveryLog };
}

// ─── helpers ──────────────────────────────────────────────────

/**
 * Resolve once `signal` aborts (immediately if it already has).
 * Never rejects.
 */
function waitForAbort(signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve) => {
    if (signal.aborted) {
      resolve();
      return;
    }
    signal.addEventListener('abort', () => resolve(), { once: true });
  });
}

/**
 * Verify a signed bridge request (`stream`, `poll` or `ws`).
 *
 * Reads `address`, `kind`, `since`, `signedAt` and `signature` from the
 * query string and checks the signature against the owner registered for
 * `address`.
 *
 * @param c Request context whose URL carries the signed query string.
 * @param opts Supplies the store (owner lookup) and crypto provider.
 * @param expectedKind The kind the calling route serves; any other `kind`
 *   value in the query string is rejected.
 * @returns The verified address, kind and `since` cursor.
 * @throws ValidationError when a parameter is missing or malformed.
 * @throws UnauthorizedError when the address has no registered owner.
 */
async function verifyBridgeAuth(
  c: Context,
  opts: BridgeRoutesOptions,
  expectedKind: BridgeKind,
): Promise<VerifiedBridgeRequest> {
  const url = new URL(c.req.url);
  const qs = url.searchParams;
  const address = validateAddress(qs.get('address'));
  const kind = qs.get('kind');
  if (kind !== expectedKind) {
    throw new ValidationError(`bridge kind mismatch: expected ${expectedKind}`, 'kind');
  }
  const sinceStr = qs.get('since');
  const signedAtStr = qs.get('signedAt');
  const signature = qs.get('signature');
  if (sinceStr === null) throw new ValidationError('missing since', 'since');
  if (signedAtStr === null) throw new ValidationError('missing signedAt', 'signedAt');
  if (!signature) throw new ValidationError('missing signature', 'signature');
  const since = Number(sinceStr);
  const signedAt = Number(signedAtStr);
  if (!Number.isFinite(since) || since < 0) {
    throw new ValidationError('since must be a non-negative number', 'since');
  }
  if (!Number.isFinite(signedAt)) {
    throw new ValidationError('signedAt must be a number', 'signedAt');
  }

  const owner = await opts.store.getAddressOwner(address);
  if (!owner) {
    throw new UnauthorizedError(`address ${address} is not registered`);
  }

  // `kind` equals `expectedKind` here (checked above), so the typed value is
  // used for both the signed payload and the result — no assertion needed.
  await verifyPayload(opts.crypto, owner, {
    address,
    kind: expectedKind,
    since,
    signedAt,
    signature,
  });

  return { address, kind: expectedKind, since };
}

const MAX_WATCHED_ADDRESSES = 64;

/**
 * Verify a `/v1/bridge/presence` request.
 *
 * Signed canonical payload: `{address, kind: 'presence', watched: string[],
 * signedAt}`. The watcher's address must be a registered inbox; the
 * signature is verified against the registered owner key for that
 * address. The `watched` list bounds what the subscription will
 * receive — server-side filtering is enforced inside the handler.
 *
 * @throws ValidationError when a parameter is missing, malformed, or the
 *   watched list exceeds `MAX_WATCHED_ADDRESSES`.
 * @throws UnauthorizedError when the address has no registered owner.
 */
async function verifyPresenceAuth(
  c: Context,
  opts: BridgeRoutesOptions,
): Promise<VerifiedPresenceRequest> {
  const url = new URL(c.req.url);
  const qs = url.searchParams;
  const address = validateAddress(qs.get('address'));
  const kind = qs.get('kind');
  if (kind !== 'presence') {
    throw new ValidationError(`bridge kind mismatch: expected presence`, 'kind');
  }
  const watchedRaw = qs.get('watched');
  if (watchedRaw === null) throw new ValidationError('missing watched', 'watched');
  // Empty subscription is allowed (subscribe to nothing — useful for a
  // client that intends to call addPeer right after open). A null/
  // missing param is still rejected so the canonicalization is
  // unambiguous.
  const watchedEntries = watchedRaw === '' ? [] : watchedRaw.split(',');
  // Enforce the size bound before validating entries so an oversized list
  // is rejected without per-address work.
  if (watchedEntries.length > MAX_WATCHED_ADDRESSES) {
    throw new ValidationError(
      `watched list too large: ${watchedEntries.length} > ${MAX_WATCHED_ADDRESSES}`,
      'watched',
    );
  }
  const watched = watchedEntries.map((entry) => validateAddress(entry));

  const signedAtStr = qs.get('signedAt');
  const signature = qs.get('signature');
  if (signedAtStr === null) throw new ValidationError('missing signedAt', 'signedAt');
  if (!signature) throw new ValidationError('missing signature', 'signature');
  const signedAt = Number(signedAtStr);
  if (!Number.isFinite(signedAt)) {
    throw new ValidationError('signedAt must be a number', 'signedAt');
  }

  const owner = await opts.store.getAddressOwner(address);
  if (!owner) {
    throw new UnauthorizedError(`address ${address} is not registered`);
  }

  await verifyPayload(opts.crypto, owner, {
    address,
    kind,
    watched,
    signedAt,
    signature,
  });

  return { address, watched };
}

interface BlobRow {
  msgId: string;
  ciphertext: Uint8Array;
  receivedAt: number;
  expiresAt: number;
  /** V4.8 — relay-captured sender fingerprint. Optional for legacy rows. */
  senderFp?: string;
}

interface BlobWriter {
  fetchPage(address: string, cursor: number): Promise<BlobRow[]>;
}

/**
 * Wrap `store.fetchBlobs` as a pager that returns up to `pageLimit` blobs
 * for `address` past `cursor`.
 */
function makeBlobWriter(store: InboxStore, pageLimit: number): BlobWriter {
  return {
    async fetchPage(address, cursor) {
      return store.fetchBlobs({
        address,
        sinceCursor: cursor,
        now: Date.now(),
        limit: pageLimit,
      });
    },
  };
}

/**
 * Per-connection bounded msgId tracker — defense in depth against duplicate
 * delivery of the same blob to the same bridge socket. Cursor pagination
 * already guarantees uniqueness in the happy path, but a dedup gate at the
 * emit boundary catches any subtle bug (e.g. a flushTo race, a future
 * refactor, an event-emit retry) without changing wire semantics.
 *
 * The cap is intentionally large enough to cover any realistic bridge
 * pageLimit and small enough to bound memory under long-running streams.
 */
const DELIVERED_LRU_CAP = 4096;

class DeliveredIdLru {
  // A `Set` iterates in insertion order, so its first entry is always the
  // oldest id; no parallel order array is needed.
  private readonly seen = new Set<string>();

  /** Returns true if `msgId` has not been seen on this connection yet. */
  add(msgId: string): boolean {
    if (this.seen.has(msgId)) return false;
    this.seen.add(msgId);
    if (this.seen.size > DELIVERED_LRU_CAP) {
      const oldest = this.seen.values().next().value;
      if (oldest !== undefined) this.seen.delete(oldest);
    }
    return true;
  }

  /** Drop `msgId` so a later `add` succeeds again (used to undo a failed emit). */
  forget(msgId: string): void {
    this.seen.delete(msgId);
  }
}

/**
 * Emit every blob stored for `address` past `startCursor`, page by page.
 *
 * @param writer Pager used to fetch blobs.
 * @param address Inbox address being drained.
 * @param startCursor Cursor to start from.
 * @param emit Called once per blob that passes the dedup gate. If it
 *   rejects, the drain stops and the rejection propagates.
 * @param delivered Optional per-connection dedup gate.
 * @returns The highest `receivedAt` seen, or `startCursor` if nothing was
 *   fetched.
 */
async function flushTo(
  writer: BlobWriter,
  address: string,
  startCursor: number,
  emit: (blob: BlobRow) => Promise<void>,
  delivered?: DeliveredIdLru,
): Promise<number> {
  let cursor = startCursor;
  // Drain page-by-page so a backlog larger than `pageLimit` still flushes.
  for (;;) {
    const page = await writer.fetchPage(address, cursor);
    if (page.length === 0) break;
    const cursorBeforePage = cursor;
    for (const row of page) {
      // Per-connection dedup gate — prevents the same msgId from being
      // emitted twice if flushTo is somehow re-entered before the cursor
      // catches up. The id is reserved before the emit and released again
      // if the emit fails, so a transient failure is retried by the next
      // flush instead of being skipped for the rest of the connection.
      if (!delivered || delivered.add(row.msgId)) {
        try {
          await emit(row);
        } catch (err) {
          delivered?.forget(row.msgId);
          throw err;
        }
      }
      if (row.receivedAt > cursor) cursor = row.receivedAt;
    }
    // A non-empty page that did not advance the cursor would be fetched
    // again unchanged; stop rather than spin.
    if (cursor === cursorBeforePage) break;
  }
  return cursor;
}

/** JSON wire shape of a blob as sent over every bridge transport. */
interface SerializedBlob {
  msgId: string;
  ciphertext: string;
  receivedAt: number;
  expiresAt: number;
  from?: string;
}

/**
 * Convert a stored blob to its wire shape: the ciphertext is base64-encoded
 * and `senderFp`, when present, is exposed as `from`.
 */
function serializeBlob(blob: BlobRow): SerializedBlob {
  const out: SerializedBlob = {
    msgId: blob.msgId,
    ciphertext: toBase64(blob.ciphertext),
    receivedAt: blob.receivedAt,
    expiresAt: blob.expiresAt,
  };
  // V4.8 — relay-captured sender fingerprint. The transport-bridge
  // wire format already accepts `from`; populating it lets receivers
  // bootstrap unknown-sender first-contact via `shade.receive('fp:',
  // env)` without requiring an out-of-band sender hint.
  if (blob.senderFp) out.from = blob.senderFp;
  return out;
}

/**
 * Build the long-poll JSON body.
 *
 * @param blobs Blobs to return, in the order the store produced them.
 * @param sinceFallback Cursor to echo back when `blobs` is empty.
 * @returns Serialized blobs plus the cursor taken from the last blob.
 */
function buildPollResponse(blobs: BlobRow[], sinceFallback: number): {
  blobs: ReturnType<typeof serializeBlob>[];
  cursor: number;
  hasMore: boolean;
} {
  const out = blobs.map(serializeBlob);
  const last = blobs[blobs.length - 1];
  const cursor = last ? last.receivedAt : sinceFallback;
  // NOTE(review): `hasMore` is hard-coded to false even when the page is
  // full — confirm clients do not rely on it to detect further pages.
  return { blobs: out, cursor, hasMore: false };
}

interface WaitForBlobsArgs {
  events: InboxServerEvents | null;
  store: InboxStore;
  address: string;
  since: number;
  timeoutMs: number;
  pageLimit: number;
  fallbackPollIntervalMs: number;
  abortSignal?: AbortSignal;
}

/**
 * Hold until blobs past `since` exist for `address`, the timeout elapses,
 * or the request aborts.
 *
 * Re-checks the store on every `inbox.blob_stored` event for the address
 * (when an emitter is given) and on every fallback-interval tick.
 *
 * @returns The first non-empty page found, or `[]` on timeout / abort / a
 *   non-positive timeout. Never rejects; store errors are swallowed and the
 *   wait continues until the timeout.
 */
async function waitForBlobs(args: WaitForBlobsArgs): Promise<BlobRow[]> {
  // Covers 0 as well as negative / NaN timeouts.
  if (!(args.timeoutMs > 0)) return [];
  // Nothing to wait for if the client is already gone.
  if (args.abortSignal?.aborted) return [];

  return new Promise<BlobRow[]>((resolve) => {
    let resolved = false;
    let unsubscribe: (() => void) | null = null;
    let timer: ReturnType<typeof setTimeout> | null = null;
    let fallback: ReturnType<typeof setInterval> | null = null;
    let abortHandler: (() => void) | null = null;

    // Settle exactly once and release every timer / listener.
    const finish = (blobs: BlobRow[]): void => {
      if (resolved) return;
      resolved = true;
      if (timer) clearTimeout(timer);
      if (fallback) clearInterval(fallback);
      if (unsubscribe) unsubscribe();
      if (abortHandler && args.abortSignal) {
        args.abortSignal.removeEventListener('abort', abortHandler);
      }
      resolve(blobs);
    };

    const tryFetch = async (): Promise<void> => {
      try {
        const rows = await args.store.fetchBlobs({
          address: args.address,
          sinceCursor: args.since,
          now: Date.now(),
          limit: args.pageLimit,
        });
        if (rows.length > 0) finish(rows);
      } catch {
        // swallow — let timeout handle it
      }
    };

    if (args.events) {
      unsubscribe = args.events.on((e) => {
        if (e.name === 'inbox.blob_stored' && e.data.address === args.address) {
          void tryFetch();
        }
      });
    }
    fallback = setInterval(() => {
      void tryFetch();
    }, args.fallbackPollIntervalMs);
    timer = setTimeout(() => finish([]), args.timeoutMs);

    if (args.abortSignal) {
      abortHandler = () => finish([]);
      args.abortSignal.addEventListener('abort', abortHandler, { once: true });
    }
  });
}