/** * SSE (Server-Sent Events) bridge. * * Connects to `/v1/bridge/stream?…` with a signed query string and * parses an SSE feed manually using `fetch` + `ReadableStream`. We intentionally * do NOT use the platform `EventSource` because: * * 1. `EventSource` does not let callers attach custom headers — query-param * auth is the only portable route, but custom retry / reconnect knobs * are also useful and the platform API hides them. * 2. `EventSource` is not available in Node by default; rolling our own * reader keeps the package zero-dep across runtimes. * * The wire format follows the standard SSE spec: * `id: \nevent: envelope\ndata: \n\n` * * Only the `envelope` event carries payload; comment lines (`: ping`) are * tolerated as keepalives. The `id` field is fed back to the server as * `Last-Event-ID` (also encoded as the `since` query param) on reconnect to * resume from the highest-seen cursor. */ import type { BridgeConnectOptions, BridgeTransport, IncomingMessage } from './types.js'; import { decodeWireMessage } from './types.js'; import type { BridgeAuthInput } from './auth.js'; import { signBridgeQuery } from './auth.js'; import { BridgeError } from './errors.js'; export interface SseBridgeOptions { baseUrl: string; auth: Omit; /** Override `fetch` (tests). */ fetch?: typeof fetch; /** Initial reconnect backoff (ms). Default 250. */ initialBackoffMs?: number; /** Max backoff (ms). Default 10_000. */ maxBackoffMs?: number; /** Initial cursor — start of stream by default. */ startCursor?: number; /** Disable auto-reconnect (tests). Default false. */ disableAutoReconnect?: boolean; } const DEFAULT_INITIAL_BACKOFF = 250; const DEFAULT_MAX_BACKOFF = 10_000; export class SseBridge implements BridgeTransport { readonly kind = 'sse'; private readonly fetchFn: typeof fetch; private cursor: number; private abortController: AbortController | null = null; private connected = false; private disposed = false; private connectStarted = false; private currentReader: ReadableStreamDefaultReader | null = null; private onMessage: BridgeConnectOptions['onMessage'] | null = null; private onError: NonNullable = (err) => console.warn('[shade-bridge:sse]', err.message); constructor(private readonly options: SseBridgeOptions) { this.fetchFn = options.fetch ?? globalThis.fetch; this.cursor = options.startCursor ?? 0; } async connect(opts: BridgeConnectOptions): Promise { if (this.connectStarted) throw new BridgeError('SseBridge.connect already called'); this.connectStarted = true; this.onMessage = opts.onMessage; if (opts.onError) this.onError = opts.onError; // Open the first connection; throw if it fails immediately so callers // can fall back to a different transport. await this.openOnce(); // Spawn the read loop; subsequent reconnects happen in the background. void this.readLoop(); } async disconnect(): Promise { this.disposed = true; if (this.currentReader) { try { await this.currentReader.cancel(); } catch { /* ignore */ } } this.abortController?.abort(); this.connected = false; } /** Public so tests / observability can read the latest cursor. */ getCursor(): number { return this.cursor; } /** * Opens a single SSE connection and stores the reader on the instance. * Throws on hard errors (network refused, non-200 status). Caller drives * the read loop. */ private async openOnce(): Promise { const qs = await signBridgeQuery({ crypto: this.options.auth.crypto, signingPrivateKey: this.options.auth.signingPrivateKey, address: this.options.auth.address, kind: 'stream', since: this.cursor, }); const url = `${stripTrailingSlash(this.options.baseUrl)}/v1/bridge/stream?${qs.toString()}`; this.abortController = new AbortController(); let res: Response; try { res = await this.fetchFn(url, { method: 'GET', headers: { accept: 'text/event-stream', 'cache-control': 'no-cache', 'last-event-id': String(this.cursor), }, signal: this.abortController.signal, }); } catch (err) { throw new BridgeError(`SSE connect failed: ${(err as Error).message}`); } if (!res.ok) { throw new BridgeError(`SSE connect failed: HTTP ${res.status}`, res.status); } if (!res.body) { throw new BridgeError('SSE response has no body'); } this.currentReader = res.body.getReader() as ReadableStreamDefaultReader; this.connected = true; } private async readLoop(): Promise { let backoff = this.options.initialBackoffMs ?? DEFAULT_INITIAL_BACKOFF; const maxBackoff = this.options.maxBackoffMs ?? DEFAULT_MAX_BACKOFF; while (!this.disposed) { try { if (!this.currentReader) { await this.openOnce(); } await this.consume(this.currentReader!); // Stream closed cleanly — server-side close. Reconnect. } catch (err) { if (this.disposed) return; this.onError(err as Error); } this.currentReader = null; this.connected = false; if (this.disposed || this.options.disableAutoReconnect) return; await sleep(backoff); backoff = Math.min(backoff * 2, maxBackoff); } } private async consume(reader: ReadableStreamDefaultReader): Promise { const decoder = new TextDecoder(); let buf = ''; let dataLines: string[] = []; let eventName: string | null = null; let eventId: string | null = null; while (true) { const { value, done } = await reader.read(); if (done) return; buf += decoder.decode(value, { stream: true }); let idx; while ((idx = buf.indexOf('\n')) !== -1) { const rawLine = buf.slice(0, idx); buf = buf.slice(idx + 1); const line = rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine; if (line === '') { // dispatch if (dataLines.length > 0) { const dataStr = dataLines.join('\n'); await this.handleEvent(eventName, eventId, dataStr); } dataLines = []; eventName = null; eventId = null; continue; } if (line.startsWith(':')) continue; // comment / keepalive const colon = line.indexOf(':'); const field = colon === -1 ? line : line.slice(0, colon); let val = colon === -1 ? '' : line.slice(colon + 1); if (val.startsWith(' ')) val = val.slice(1); switch (field) { case 'data': dataLines.push(val); break; case 'event': eventName = val; break; case 'id': eventId = val; break; // 'retry' ignored; we drive backoff ourselves. } } } } private async handleEvent(name: string | null, id: string | null, data: string): Promise { if (id !== null) { const n = Number(id); if (Number.isFinite(n) && n > this.cursor) this.cursor = n; } if (name && name !== 'envelope' && name !== 'message' && name !== '') { // Ignore non-payload events (e.g. ready, heartbeat). return; } let parsed: unknown; try { parsed = JSON.parse(data); } catch (err) { this.onError(new BridgeError(`malformed SSE data: ${(err as Error).message}`)); return; } const wire = parsed as { msgId: string; ciphertext: string; receivedAt: number; from?: string }; if (typeof wire.ciphertext !== 'string' || typeof wire.receivedAt !== 'number') { this.onError(new BridgeError('SSE event missing required fields')); return; } const msg: IncomingMessage = decodeWireMessage(wire); if (this.onMessage !== null) { try { await this.onMessage(msg); } catch (err) { this.onError(err as Error); } } } /** True between connect()'s first successful open and disconnect/error. */ get isConnected(): boolean { return this.connected; } } function stripTrailingSlash(s: string): string { return s.endsWith('/') ? s.slice(0, -1) : s; } function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); }