/**
 * Bridge routes — V3.7.
 *
 * Three transports, one delivery semantic. Each one streams the same
 * not-yet-acked inbox blobs for an authenticated address:
 *
 *   GET /v1/bridge/stream — SSE feed, one envelope per `event: envelope`
 *   GET /v1/bridge/poll   — long-poll, returns at most one batch then closes
 *   GET /v1/bridge/ws     — WebSocket, JSON frame per envelope
 *
 * Auth: signed query string (`address`, `kind`, `since`, `signedAt`,
 * `signature`). The signature is verified against the address's owner key
 * registered via `/v1/inbox/register`. The `kind` field is bound into the
 * canonical signed payload to prevent cross-endpoint replay.
 *
 * Cursor semantics: `since` is the highest `receivedAt` the client already
 * processed. The server returns blobs strictly greater than that cursor and
 * advances the client's cursor by emitting a fresh `id:` (SSE) or by
 * including the highest seen `receivedAt` in the JSON response (poll). Over
 * WebSocket each frame carries its own `receivedAt`.
 *
 * The implementations subscribe to {@link InboxServerEvents} (when provided)
 * so that newly stored blobs land on connected clients without waiting for a
 * store poll. Independently of that, a small in-process polling timer with a
 * configurable interval always re-checks the store; it is the only delivery
 * path when no events emitter is configured and a safety net otherwise.
 */

import { Hono, type Context } from 'hono';
import { streamSSE } from 'hono/streaming';
import { createBunWebSocket } from 'hono/bun';
import type { CryptoProvider } from '@shade/core';
import {
  errorToHttpStatus,
  ShadeError,
  ValidationError,
  UnauthorizedError,
  toBase64,
} from '@shade/core';
import { verifyPayload, validateAddress } from '@shade/server';
import type { InboxStore } from './store.js';
import type { InboxServerEvents } from './events.js';

export type BridgeKind = 'stream' | 'poll' | 'ws';

export interface BridgeRoutesOptions {
  store: InboxStore;
  crypto: CryptoProvider;
  /** Optional events emitter — enables push-style delivery. */
  events?: InboxServerEvents;
  /** Maximum blobs returned per fetch page. Default 50. */
  pageLimit?: number;
  /** Default long-poll hold (ms). Default 25_000 (under typical proxy cutoffs). */
  longPollTimeoutMs?: number;
  /** Maximum long-poll hold (ms). Hard cap. Default 55_000. */
  longPollMaxTimeoutMs?: number;
  /** SSE heartbeat interval (ms). Default 15_000. */
  heartbeatIntervalMs?: number;
  /**
   * Store re-check interval (ms). The bridge re-scans the store at this
   * cadence to detect new blobs. The timer runs whether or not `events` is
   * wired in; without `events` it is the only way new blobs are noticed.
   * Default 1_000.
   */
  fallbackPollIntervalMs?: number;
}

interface VerifiedBridgeRequest {
  address: string;
  kind: BridgeKind;
  since: number;
}

/**
 * Upper bound on a WebSocket close reason: control-frame payloads are capped
 * at 125 bytes (RFC 6455 §5.5) and the status code takes two of them.
 */
const MAX_CLOSE_REASON_BYTES = 123;

/**
 * Build the bridge Hono router and a paired Bun-WebSocket handler.
 *
 * The HTTP routes (`/v1/bridge/stream`, `/v1/bridge/poll`) only use Hono's
 * generic APIs. The `/v1/bridge/ws` route is built with `createBunWebSocket`
 * from `hono/bun`, which this module imports unconditionally; on non-Bun
 * runtimes the upgrade is expected to fail at request time while the other
 * two routes keep working.
 *
 * @param opts Store, crypto provider, optional events emitter and tuning knobs.
 * @returns The router plus the `websocket` handler object produced by
 *   `createBunWebSocket()`.
 */
export function createBridgeRoutes(opts: BridgeRoutesOptions): {
  app: Hono;
  /** Pass to `Bun.serve({ websocket })`. */
  websocket: unknown;
} {
  const app = new Hono();
  const pageLimit = opts.pageLimit ?? 50;
  const heartbeatIntervalMs = opts.heartbeatIntervalMs ?? 15_000;
  const longPollDefault = opts.longPollTimeoutMs ?? 25_000;
  const longPollMax = opts.longPollMaxTimeoutMs ?? 55_000;
  const fallbackPollIntervalMs = opts.fallbackPollIntervalMs ?? 1_000;

  app.onError((err, c) => {
    if (err instanceof ShadeError) {
      const status = errorToHttpStatus(err);
      // `errorToHttpStatus` yields a plain number; Hono wants its status-code union.
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      return c.json(err.toJSON(), status as any);
    }
    console.error('[Shade] Unhandled bridge error:', err);
    return c.json({ error: 'Internal server error' }, 500);
  });

  // ─── SSE ──────────────────────────────────────────────────────
  app.get('/v1/bridge/stream', async (c) => {
    const verified = await verifyBridgeAuth(c, opts, 'stream');

    return streamSSE(c, async (stream) => {
      const address = verified.address;
      const abortSignal = c.req.raw.signal;
      const isOpen = (): boolean => !abortSignal.aborted;
      const writer = makeBlobWriter(opts.store, pageLimit);
      let cursor = verified.since;

      // Single emit path for both the initial drain and later pushes. The
      // cursor moves per delivered row so a failure mid-page does not cause
      // already-written envelopes to be re-sent on the next flush.
      const emitEnvelope = async (blob: BlobRow): Promise<void> => {
        await stream.writeSSE({
          id: String(blob.receivedAt),
          event: 'envelope',
          data: JSON.stringify(serializeBlob(blob)),
        });
        if (blob.receivedAt > cursor) cursor = blob.receivedAt;
      };

      const drain = async (): Promise<void> => {
        const drained = await flushTo(writer, address, cursor, emitEnvelope, isOpen);
        cursor = Math.max(cursor, drained);
      };

      // Initial backlog drain. Errors here propagate to streamSSE, exactly as
      // before; only the background flushes below are isolated.
      await drain();

      // Serialized background flushes, triggered by store events (if any) and
      // by the periodic re-check timer.
      const queue = createFlushQueue(drain, isOpen);

      const fallbackTimer = setInterval(() => queue.trigger(), fallbackPollIntervalMs);
      const heartbeat = setInterval(() => {
        // Comment lines are valid SSE keepalives.
        stream.write(`: ping ${Date.now()}\n\n`).catch(() => {});
      }, heartbeatIntervalMs);

      let unsubscribe: (() => void) | null = null;
      try {
        if (opts.events) {
          unsubscribe = opts.events.on((e) => {
            if (e.name === 'inbox.blob_stored' && e.data.address === address) {
              queue.trigger();
            }
          });
        }

        // Wait for the request to abort (client disconnect).
        await new Promise<void>((resolve) => {
          if (abortSignal.aborted) {
            resolve();
            return;
          }
          abortSignal.addEventListener('abort', () => resolve(), { once: true });
        });
      } finally {
        unsubscribe?.();
        clearInterval(fallbackTimer);
        clearInterval(heartbeat);
        // Let any in-flight flush finish before the stream callback returns.
        await queue.settled();
      }
    });
  });

  // ─── Long-poll ────────────────────────────────────────────────
  app.get('/v1/bridge/poll', async (c) => {
    const verified = await verifyBridgeAuth(c, opts, 'poll');

    // Clamp the client-requested hold into [0, longPollMax]; a non-numeric
    // value falls back to the configured default.
    const requestedTimeout = Number(c.req.query('timeoutMs') ?? longPollDefault);
    const timeoutMs = Math.min(
      Math.max(0, Number.isFinite(requestedTimeout) ? requestedTimeout : longPollDefault),
      longPollMax,
    );

    // Try immediate fetch first.
    const immediate = await opts.store.fetchBlobs({
      address: verified.address,
      sinceCursor: verified.since,
      now: Date.now(),
      limit: pageLimit,
    });
    if (immediate.length > 0) {
      return c.json(buildPollResponse(immediate, verified.since, pageLimit));
    }

    // Otherwise, wait for either a new event or the timeout.
    const waited = await waitForBlobs({
      events: opts.events ?? null,
      store: opts.store,
      address: verified.address,
      since: verified.since,
      timeoutMs,
      pageLimit,
      fallbackPollIntervalMs,
      abortSignal: c.req.raw.signal,
    });
    return c.json(buildPollResponse(waited, verified.since, pageLimit));
  });

  // ─── WebSocket ────────────────────────────────────────────────
  // Hono's Bun adapter resolves `getBunServer` from the request's `env`
  // (the second argument of Bun.serve's fetch). On non-Bun runtimes the
  // upgrade simply fails at runtime; the SSE/long-poll routes still work.
  const { upgradeWebSocket, websocket } = createBunWebSocket();

  app.get(
    '/v1/bridge/ws',
    upgradeWebSocket(async (c: Context) => {
      let verified: VerifiedBridgeRequest | null = null;
      let upgradeError: Error | null = null;
      try {
        verified = await verifyBridgeAuth(c, opts, 'ws');
      } catch (err) {
        upgradeError = err as Error;
      }

      if (!verified) {
        // Hono's API doesn't let us reject the upgrade with a status code
        // before opening the socket; close immediately on open with a 4xxx
        // policy code so the client can fall back to a different bridge.
        return {
          onOpen(_evt: unknown, ws: { close: (code?: number, reason?: string) => void }) {
            const status =
              upgradeError instanceof ShadeError ? errorToHttpStatus(upgradeError) : 500;
            ws.close(
              4000 + (status % 1000),
              clampCloseReason(upgradeError?.message ?? 'unauthorized'),
            );
          },
        };
      }

      const address = verified.address;
      const writer = makeBlobWriter(opts.store, pageLimit);
      let cursor = verified.since;
      let unsubscribe: (() => void) | null = null;
      let fallbackTimer: ReturnType<typeof setInterval> | null = null;
      let connected = true;
      const isConnected = (): boolean => connected;

      return {
        onOpen(
          _evt: unknown,
          ws: {
            send: (data: string) => void;
            close: (code?: number, reason?: string) => void;
          },
        ) {
          const queue = createFlushQueue(async () => {
            const drained = await flushTo(
              writer,
              address,
              cursor,
              async (blob) => {
                ws.send(JSON.stringify(serializeBlob(blob)));
                if (blob.receivedAt > cursor) cursor = blob.receivedAt;
              },
              isConnected,
            );
            cursor = Math.max(cursor, drained);
          }, isConnected);

          if (opts.events) {
            unsubscribe = opts.events.on((e) => {
              if (e.name === 'inbox.blob_stored' && e.data.address === address) {
                queue.trigger();
              }
            });
          }
          fallbackTimer = setInterval(() => queue.trigger(), fallbackPollIntervalMs);

          // Kick off the initial backlog drain.
          queue.trigger();
        },
        onClose() {
          connected = false;
          unsubscribe?.();
          if (fallbackTimer) clearInterval(fallbackTimer);
        },
      };
    }),
  );

  return { app, websocket };
}

// ─── helpers ──────────────────────────────────────────────────

/**
 * Validate and authenticate a bridge request from its query string.
 *
 * Checks that `kind` matches the endpoint being hit, that `since`, `signedAt`
 * and `signature` are present and well-formed, that the address has a
 * registered owner, and finally delegates signature verification to
 * `verifyPayload`.
 *
 * NOTE(review): this function only checks that `signedAt` is a finite number;
 * any freshness/replay window is presumably enforced inside `verifyPayload` —
 * confirm.
 *
 * @throws ValidationError when a parameter is missing, malformed, or `kind`
 *   does not equal `expectedKind`.
 * @throws UnauthorizedError when the address has no registered owner.
 */
async function verifyBridgeAuth(
  c: Context,
  opts: BridgeRoutesOptions,
  expectedKind: BridgeKind,
): Promise<VerifiedBridgeRequest> {
  const qs = new URL(c.req.url).searchParams;

  const address = validateAddress(qs.get('address'));
  if (qs.get('kind') !== expectedKind) {
    throw new ValidationError(`bridge kind mismatch: expected ${expectedKind}`, 'kind');
  }

  const sinceStr = qs.get('since');
  const signedAtStr = qs.get('signedAt');
  const signature = qs.get('signature');
  if (sinceStr === null) throw new ValidationError('missing since', 'since');
  if (signedAtStr === null) throw new ValidationError('missing signedAt', 'signedAt');
  if (!signature) throw new ValidationError('missing signature', 'signature');

  const since = Number(sinceStr);
  const signedAt = Number(signedAtStr);
  if (!Number.isFinite(since) || since < 0) {
    throw new ValidationError('since must be a non-negative number', 'since');
  }
  if (!Number.isFinite(signedAt)) {
    throw new ValidationError('signedAt must be a number', 'signedAt');
  }

  const owner = await opts.store.getAddressOwner(address);
  if (!owner) {
    throw new UnauthorizedError(`address ${address} is not registered`);
  }

  // `kind` has just been proven equal to `expectedKind`, so the latter is
  // what gets bound into the signed payload.
  await verifyPayload(opts.crypto, owner, {
    address,
    kind: expectedKind,
    since,
    signedAt,
    signature,
  });

  return { address, kind: expectedKind, since };
}

interface BlobRow {
  msgId: string;
  ciphertext: Uint8Array;
  receivedAt: number;
  expiresAt: number;
}

interface BlobWriter {
  fetchPage(address: string, cursor: number): Promise<BlobRow[]>;
}

/** Bind a store and page size into a one-method page fetcher. */
function makeBlobWriter(store: InboxStore, pageLimit: number): BlobWriter {
  return {
    async fetchPage(address, cursor) {
      return store.fetchBlobs({
        address,
        sinceCursor: cursor,
        now: Date.now(),
        limit: pageLimit,
      });
    },
  };
}

interface FlushQueue {
  /** Request a flush; coalesces with any flush that has not started yet. */
  trigger(): void;
  /** Resolves once every flush requested so far has finished. Never rejects. */
  settled(): Promise<void>;
}

/**
 * Serialize fan-in so concurrent triggers don't double-fetch.
 *
 * Each `trigger()` appends a step to a promise chain; a step runs `drain`
 * repeatedly while a trigger is pending and `isActive()` holds. A failing
 * `drain` is logged and swallowed so that (a) the chain stays usable for the
 * next trigger instead of remaining rejected forever and (b) no unhandled
 * promise rejection escapes from a fire-and-forget trigger.
 */
function createFlushQueue(drain: () => Promise<void>, isActive: () => boolean): FlushQueue {
  let signalled = false;
  let tail: Promise<void> = Promise.resolve();

  return {
    trigger(): void {
      signalled = true;
      tail = tail
        .then(async () => {
          while (signalled && isActive()) {
            signalled = false;
            await drain();
          }
        })
        .catch((err: unknown) => {
          console.error('[Shade] Bridge flush failed:', err);
        });
    },
    settled(): Promise<void> {
      return tail;
    },
  };
}

/**
 * Emit every blob newer than `startCursor`, page by page.
 *
 * @param writer Page fetcher for the store.
 * @param address Inbox address to drain.
 * @param startCursor Highest `receivedAt` already delivered.
 * @param emit Called once per row, in page order; awaited before the next row.
 * @param isActive Checked before each page fetch and each emit; draining stops
 *   as soon as it returns `false` (e.g. the client disconnected). Defaults to
 *   always active.
 * @returns The highest `receivedAt` emitted, or `startCursor` if nothing was.
 */
async function flushTo(
  writer: BlobWriter,
  address: string,
  startCursor: number,
  emit: (blob: BlobRow) => Promise<void>,
  isActive: () => boolean = () => true,
): Promise<number> {
  let cursor = startCursor;

  // Drain page-by-page so a backlog larger than `pageLimit` still flushes.
  while (isActive()) {
    const page = await writer.fetchPage(address, cursor);
    if (page.length === 0) break;

    const cursorBeforePage = cursor;
    for (const row of page) {
      if (!isActive()) return cursor;
      await emit(row);
      if (row.receivedAt > cursor) cursor = row.receivedAt;
    }

    // A non-empty page that did not move the cursor would be fetched again
    // verbatim; stop instead of spinning forever.
    if (cursor === cursorBeforePage) break;
  }

  return cursor;
}

/** Convert a stored row to its wire form (ciphertext as base64). */
function serializeBlob(blob: BlobRow): {
  msgId: string;
  ciphertext: string;
  receivedAt: number;
  expiresAt: number;
} {
  return {
    msgId: blob.msgId,
    ciphertext: toBase64(blob.ciphertext),
    receivedAt: blob.receivedAt,
    expiresAt: blob.expiresAt,
  };
}

/**
 * Build the long-poll JSON body.
 *
 * @param blobs Rows to return (possibly empty).
 * @param sinceFallback Cursor the client sent; echoed back when no row is newer.
 * @param pageLimit Page size used for the fetch. When given and positive, a
 *   full page sets `hasMore` because further rows may be waiting. When
 *   omitted, `hasMore` is `false`.
 */
function buildPollResponse(
  blobs: BlobRow[],
  sinceFallback: number,
  pageLimit?: number,
): {
  blobs: ReturnType<typeof serializeBlob>[];
  cursor: number;
  hasMore: boolean;
} {
  // Highest `receivedAt` seen, never below what the client already had.
  const cursor = blobs.reduce((max, blob) => Math.max(max, blob.receivedAt), sinceFallback);
  const hasMore = pageLimit !== undefined && pageLimit > 0 && blobs.length >= pageLimit;
  return { blobs: blobs.map(serializeBlob), cursor, hasMore };
}

/**
 * Trim a WebSocket close reason so its UTF-8 encoding fits in a close frame.
 * Cuts on code-point boundaries so the result stays valid UTF-8.
 */
function clampCloseReason(reason: string): string {
  const encoder = new TextEncoder();
  if (encoder.encode(reason).byteLength <= MAX_CLOSE_REASON_BYTES) return reason;

  let clamped = '';
  let used = 0;
  for (const ch of reason) {
    const size = encoder.encode(ch).byteLength;
    if (used + size > MAX_CLOSE_REASON_BYTES) break;
    clamped += ch;
    used += size;
  }
  return clamped;
}

interface WaitForBlobsArgs {
  events: InboxServerEvents | null;
  store: InboxStore;
  address: string;
  since: number;
  timeoutMs: number;
  pageLimit: number;
  fallbackPollIntervalMs: number;
  abortSignal?: AbortSignal;
}

/**
 * Hold until blobs newer than `args.since` exist, the timeout elapses, or the
 * request is aborted — whichever comes first.
 *
 * The store is re-checked on every matching `inbox.blob_stored` event (when an
 * emitter is supplied) and on every `fallbackPollIntervalMs` tick.
 *
 * @returns The first non-empty page found, or `[]` on timeout/abort or when
 *   `timeoutMs` is 0. The returned promise never rejects.
 */
async function waitForBlobs(args: WaitForBlobsArgs): Promise<BlobRow[]> {
  if (args.timeoutMs === 0) return [];

  return new Promise<BlobRow[]>((resolve) => {
    let resolved = false;
    let unsubscribe: (() => void) | null = null;
    let timer: ReturnType<typeof setTimeout> | null = null;
    let fallback: ReturnType<typeof setInterval> | null = null;
    let abortHandler: (() => void) | null = null;

    // Settle exactly once and release every timer/listener.
    const finish = (blobs: BlobRow[]): void => {
      if (resolved) return;
      resolved = true;
      if (timer) clearTimeout(timer);
      if (fallback) clearInterval(fallback);
      if (unsubscribe) unsubscribe();
      if (abortHandler && args.abortSignal) {
        args.abortSignal.removeEventListener('abort', abortHandler);
      }
      resolve(blobs);
    };

    const tryFetch = async (): Promise<void> => {
      try {
        const rows = await args.store.fetchBlobs({
          address: args.address,
          sinceCursor: args.since,
          now: Date.now(),
          limit: args.pageLimit,
        });
        if (rows.length > 0) finish(rows);
      } catch {
        // Deliberately best-effort: a failed probe is retried on the next
        // tick/event, and the timeout bounds the overall wait.
      }
    };

    if (args.events) {
      unsubscribe = args.events.on((e) => {
        if (e.name === 'inbox.blob_stored' && e.data.address === args.address) {
          void tryFetch();
        }
      });
    }
    fallback = setInterval(() => void tryFetch(), args.fallbackPollIntervalMs);
    timer = setTimeout(() => finish([]), args.timeoutMs);

    if (args.abortSignal) {
      if (args.abortSignal.aborted) {
        finish([]);
        return;
      }
      abortHandler = () => finish([]);
      args.abortSignal.addEventListener('abort', abortHandler, { once: true });
    }
  });
}