Files
Shade/packages/shade-transport-bridge/src/sse-bridge.ts

247 lines
8.3 KiB
TypeScript
Raw Normal View History

/**
* SSE (Server-Sent Events) bridge.
*
* Connects to `<base>/v1/bridge/stream?…` with a signed query string and
* parses an SSE feed manually using `fetch` + `ReadableStream`. We intentionally
* do NOT use the platform `EventSource` because:
*
* 1. `EventSource` does not let callers attach custom headers (query-param
* auth is the only portable route), and it also hides the retry /
* reconnect knobs that are useful to control directly.
* 2. `EventSource` is not available in Node by default; rolling our own
* reader keeps the package zero-dep across runtimes.
*
* The wire format follows the standard SSE spec:
* `id: <receivedAt>\nevent: envelope\ndata: <json>\n\n`
*
* Payload is carried by `envelope` events (unnamed and `message` events are
* parsed the same way); any other named event is ignored, and comment lines
* (`: ping`) are tolerated as keepalives. The `id` field is fed back to the
* server as `Last-Event-ID` (also encoded as the `since` query param) on
* reconnect to resume from the highest-seen cursor.
*/
import type { BridgeConnectOptions, BridgeTransport, IncomingMessage } from './types.js';
import { decodeWireMessage } from './types.js';
import type { BridgeAuthInput } from './auth.js';
import { signBridgeQuery } from './auth.js';
import { BridgeError } from './errors.js';
/** Construction options for `SseBridge`. */
export interface SseBridgeOptions {
/**
 * Base URL of the bridge server. One trailing `/` is stripped and
 * `/v1/bridge/stream?<signed query>` is appended for every connection.
 */
baseUrl: string;
/**
 * Signing inputs (`crypto`, `signingPrivateKey`, `address`) forwarded to
 * `signBridgeQuery`; `kind` and `since` are filled in by the bridge itself.
 */
auth: Omit<BridgeAuthInput, 'kind' | 'since'>;
/** Override `fetch` (tests). Falls back to `globalThis.fetch` when omitted. */
fetch?: typeof fetch;
/** Initial reconnect backoff (ms). Default 250. */
initialBackoffMs?: number;
/** Max backoff (ms). Default 10_000. */
maxBackoffMs?: number;
/** Initial cursor — defaults to 0 (start of stream). */
startCursor?: number;
/** Disable auto-reconnect (tests). Default false. */
disableAutoReconnect?: boolean;
}
/** Reconnect delay (ms) used when `initialBackoffMs` is not supplied. */
const DEFAULT_INITIAL_BACKOFF = 250;
/** Upper bound (ms) for the reconnect delay when `maxBackoffMs` is not supplied. */
const DEFAULT_MAX_BACKOFF = 10_000;
/**
 * Bridge transport that receives messages over Server-Sent Events.
 *
 * `connect()` may be called once per instance. It awaits the first connection
 * (so an immediate failure surfaces to the caller) and then keeps a background
 * read loop running that re-opens the stream with exponential backoff until
 * `disconnect()` is called or `disableAutoReconnect` is set.
 */
export class SseBridge implements BridgeTransport {
  readonly kind = 'sse';

  /** `fetch` implementation used for every stream request. */
  private readonly fetchFn: typeof fetch;
  /** Highest numeric event id seen so far; sent as `since` / `last-event-id`. */
  private cursor: number;
  /** Controller of the most recent stream request; replaced on every open. */
  private abortController: AbortController | null = null;
  private connected = false;
  /** Set by `disconnect()`; stops the read loop and blocks new connections. */
  private disposed = false;
  /** Guards against a second `connect()` call on the same instance. */
  private connectStarted = false;
  /** Reader of the currently open response body, `null` between connections. */
  private currentReader: ReadableStreamDefaultReader<Uint8Array> | null = null;
  private onMessage: BridgeConnectOptions['onMessage'] | null = null;
  /** Error sink; logs a warning until `connect()` installs the caller's handler. */
  private onError: NonNullable<BridgeConnectOptions['onError']> = (err) =>
    console.warn('[shade-bridge:sse]', err.message);

  constructor(private readonly options: SseBridgeOptions) {
    this.fetchFn = options.fetch ?? globalThis.fetch;
    this.cursor = options.startCursor ?? 0;
  }

  /**
   * Opens the stream and starts delivering messages to `opts.onMessage`.
   *
   * @param opts - `onMessage` receives every decoded message; the optional
   *   `onError` replaces the default console warning.
   * @throws BridgeError if `connect` was already called on this instance, or
   *   if the first connection attempt fails (so callers can fall back to a
   *   different transport).
   */
  async connect(opts: BridgeConnectOptions): Promise<void> {
    if (this.connectStarted) throw new BridgeError('SseBridge.connect already called');
    this.connectStarted = true;
    this.onMessage = opts.onMessage;
    if (opts.onError) this.onError = opts.onError;
    // Open the first connection; throw if it fails immediately so callers
    // can fall back to a different transport.
    await this.openOnce();
    // Spawn the read loop; subsequent reconnects happen in the background.
    void this.readLoop();
  }

  /**
   * Stops the bridge: marks it disposed, cancels the active reader (errors
   * ignored) and aborts the in-flight request.
   */
  async disconnect(): Promise<void> {
    this.disposed = true;
    if (this.currentReader) {
      try {
        await this.currentReader.cancel();
      } catch {
        /* ignore */
      }
    }
    this.abortController?.abort();
    this.connected = false;
  }

  /** Public so tests / observability can read the latest cursor. */
  getCursor(): number {
    return this.cursor;
  }

  /** True while a stream is open; false before connect and after disconnect/error. */
  get isConnected(): boolean {
    return this.connected;
  }

  /**
   * Opens a single SSE connection and stores the reader on the instance.
   * The caller drives the read loop.
   *
   * @throws BridgeError on network failure, non-2xx status, a response without
   *   a body, or when the bridge was disconnected while the query was signed.
   */
  private async openOnce(): Promise<void> {
    const { auth, baseUrl } = this.options;
    const qs = await signBridgeQuery({
      crypto: auth.crypto,
      signingPrivateKey: auth.signingPrivateKey,
      address: auth.address,
      kind: 'stream',
      since: this.cursor,
    });
    // disconnect() may have run while the query was being signed. It could only
    // abort the previous controller, so bail out here instead of opening a
    // stream that nothing would ever close.
    if (this.disposed) {
      throw new BridgeError('SSE connect aborted: bridge disconnected');
    }
    const url = `${stripTrailingSlash(baseUrl)}/v1/bridge/stream?${qs.toString()}`;
    const controller = new AbortController();
    this.abortController = controller;
    // Call through a local so the bridge is not passed as `this`: browsers
    // reject the platform `fetch` with "Illegal invocation" when it is invoked
    // with a receiver other than the global object.
    const doFetch = this.fetchFn;
    let res: Response;
    try {
      res = await doFetch(url, {
        method: 'GET',
        headers: {
          accept: 'text/event-stream',
          'cache-control': 'no-cache',
          'last-event-id': String(this.cursor),
        },
        signal: controller.signal,
      });
    } catch (err) {
      throw new BridgeError(`SSE connect failed: ${SseBridge.toError(err).message}`);
    }
    if (!res.ok) {
      // Release the unread error body instead of leaving the request open.
      controller.abort();
      throw new BridgeError(`SSE connect failed: HTTP ${res.status}`, res.status);
    }
    if (!res.body) {
      throw new BridgeError('SSE response has no body');
    }
    this.currentReader = res.body.getReader() as ReadableStreamDefaultReader<Uint8Array>;
    this.connected = true;
  }

  /**
   * Background loop: consumes the current stream, then re-opens it after a
   * backoff delay. Runs until the bridge is disposed or, when
   * `disableAutoReconnect` is set, until the first stream ends.
   */
  private async readLoop(): Promise<void> {
    const initialBackoff = this.options.initialBackoffMs ?? DEFAULT_INITIAL_BACKOFF;
    const maxBackoff = this.options.maxBackoffMs ?? DEFAULT_MAX_BACKOFF;
    let backoff = initialBackoff;
    while (!this.disposed) {
      try {
        if (this.currentReader === null) {
          await this.openOnce();
        }
        const reader = this.currentReader;
        if (reader !== null) {
          // A stream is open, so earlier failures are over: start the next
          // outage from the initial delay rather than the ratcheted one.
          backoff = initialBackoff;
          await this.consume(reader);
          // Stream closed cleanly — server-side close. Reconnect.
        }
      } catch (err) {
        if (this.disposed) return;
        this.onError(SseBridge.toError(err));
      }
      await this.releaseReader();
      this.connected = false;
      if (this.disposed || this.options.disableAutoReconnect) return;
      await sleep(backoff);
      backoff = Math.min(backoff * 2, maxBackoff);
    }
  }

  /**
   * Detaches the current reader and cancels it so the underlying response is
   * released even when consumption stopped because of a thrown error.
   */
  private async releaseReader(): Promise<void> {
    const reader = this.currentReader;
    this.currentReader = null;
    if (reader === null) return;
    try {
      await reader.cancel();
    } catch {
      /* stream already closed or errored */
    }
  }

  /**
   * Reads the stream to its end, splitting it into lines and dispatching one
   * event per blank line. Lines end in `\n` or `\r\n`; lines starting with `:`
   * are comments; only the `data`, `event` and `id` fields are interpreted.
   * An event without any `data` line is dropped without being dispatched.
   */
  private async consume(reader: ReadableStreamDefaultReader<Uint8Array>): Promise<void> {
    const decoder = new TextDecoder();
    let buf = '';
    let dataLines: string[] = [];
    let eventName: string | null = null;
    let eventId: string | null = null;
    for (;;) {
      const { value, done } = await reader.read();
      if (done) return;
      buf += decoder.decode(value, { stream: true });
      // Walk the complete lines with an offset and trim the buffer once per
      // chunk, rather than re-slicing the whole buffer for every line.
      let lineStart = 0;
      let newline: number;
      while ((newline = buf.indexOf('\n', lineStart)) !== -1) {
        const rawLine = buf.slice(lineStart, newline);
        lineStart = newline + 1;
        const line = rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine;
        if (line === '') {
          // Blank line terminates the event: dispatch it and reset the buffers.
          if (dataLines.length > 0) {
            await this.handleEvent(eventName, eventId, dataLines.join('\n'));
          }
          dataLines = [];
          eventName = null;
          eventId = null;
          continue;
        }
        if (line.startsWith(':')) continue; // comment / keepalive
        const colon = line.indexOf(':');
        const field = colon === -1 ? line : line.slice(0, colon);
        let val = colon === -1 ? '' : line.slice(colon + 1);
        // A single space after the colon is part of the syntax, not the value.
        if (val.startsWith(' ')) val = val.slice(1);
        switch (field) {
          case 'data':
            dataLines.push(val);
            break;
          case 'event':
            eventName = val;
            break;
          case 'id':
            eventId = val;
            break;
          // 'retry' ignored; we drive backoff ourselves.
        }
      }
      // Keep only the trailing partial line for the next chunk.
      buf = buf.slice(lineStart);
    }
  }

  /**
   * Processes one dispatched event: advances the cursor from a numeric `id`,
   * skips events that are not payload events, then parses, validates, decodes
   * and delivers the message. A bad payload is reported through `onError` and
   * skipped; it never ends the stream.
   */
  private async handleEvent(name: string | null, id: string | null, data: string): Promise<void> {
    if (id !== null) {
      const n = Number(id);
      if (Number.isFinite(n) && n > this.cursor) this.cursor = n;
    }
    if (name && name !== 'envelope' && name !== 'message') {
      // Ignore non-payload events (e.g. ready, heartbeat).
      return;
    }
    let parsed: unknown;
    try {
      parsed = JSON.parse(data);
    } catch (err) {
      this.onError(new BridgeError(`malformed SSE data: ${SseBridge.toError(err).message}`));
      return;
    }
    // `null` and primitives are valid JSON; reject them before touching fields
    // so that `data: null` cannot throw and take the connection down.
    if (typeof parsed !== 'object' || parsed === null) {
      this.onError(new BridgeError('SSE event missing required fields'));
      return;
    }
    const wire = parsed as { msgId: string; ciphertext: string; receivedAt: number; from?: string };
    if (typeof wire.ciphertext !== 'string' || typeof wire.receivedAt !== 'number') {
      this.onError(new BridgeError('SSE event missing required fields'));
      return;
    }
    let msg: IncomingMessage;
    try {
      msg = decodeWireMessage(wire);
    } catch (err) {
      // The cursor is already past this event, so reconnecting would not
      // recover it; report the failure and keep the stream alive.
      this.onError(new BridgeError(`SSE event decode failed: ${SseBridge.toError(err).message}`));
      return;
    }
    if (this.onMessage !== null) {
      try {
        await this.onMessage(msg);
      } catch (err) {
        this.onError(SseBridge.toError(err));
      }
    }
  }

  /** Normalises a caught value so error handlers always receive an `Error`. */
  private static toError(err: unknown): Error {
    return err instanceof Error ? err : new Error(String(err));
  }
}
/** Returns `s` without its final character when that character is `/`; otherwise `s` unchanged. */
function stripTrailingSlash(s: string): string {
  const lastIndex = s.length - 1;
  if (lastIndex >= 0 && s[lastIndex] === '/') {
    return s.substring(0, lastIndex);
  }
  return s;
}
/** Returns a promise that resolves (with no value) once a `setTimeout` of `ms` milliseconds fires. */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((wake) => {
    setTimeout(wake, ms);
  });
}