Files
Shade/packages/shade-transport-bridge/src/long-poll-bridge.ts

162 lines
5.8 KiB
TypeScript
Raw Normal View History

/**
* Long-poll bridge.
*
* Repeatedly issues `GET /v1/bridge/poll?since=<cursor>&…` with a 25-second
* default timeout (under typical proxy idle cutoffs). Each request blocks on
* the server until either a new envelope is available for the address or the
* timeout fires. The contract is simple by design at any time exactly one
* outstanding request per client.
*
* The signed query string is regenerated for every request because the
* inbox auth layer rejects signatures older than 5 minutes — long-poll
* connections may live indefinitely, but each individual request lives at
* most `pollTimeoutMs` (server) + a small buffer.
*/
import type { BridgeConnectOptions, BridgeTransport, BridgeWireMessage } from './types.js';
import { decodeWireMessage } from './types.js';
import type { BridgeAuthInput } from './auth.js';
import { signBridgeQuery } from './auth.js';
import { BridgeError } from './errors.js';
/**
 * Construction options for {@link LongPollBridge}.
 */
export interface LongPollBridgeOptions {
  /**
   * Origin (plus optional path prefix) of the bridge server. A single trailing
   * `/` is removed before `/v1/bridge/poll` is appended.
   */
  baseUrl: string;

  /**
   * Signing material for the poll query string. The bridge reads `crypto`,
   * `signingPrivateKey` and `address` from it and fills in `kind` and `since`
   * itself on every request.
   */
  auth: Omit<BridgeAuthInput, 'kind' | 'since'>;

  /** Fetch implementation to use; `globalThis.fetch` when omitted. */
  fetch?: typeof fetch;

  /**
   * How long the server is asked to hold a request open, in milliseconds
   * (sent as the `timeoutMs` query parameter). Defaults to 25_000.
   */
  pollTimeoutMs?: number;

  /**
   * Client-side budget for one request, in milliseconds; the request is
   * aborted once it elapses. Should be larger than `pollTimeoutMs` (this is
   * not enforced). Defaults to `pollTimeoutMs + 5_000`.
   */
  requestTimeoutMs?: number;

  /** Cursor sent as `since` on the first request. Defaults to 0. */
  startCursor?: number;

  /**
   * Pause before the next attempt after a poll fails inside the background
   * loop, in milliseconds. Defaults to 2_000.
   */
  errorBackoffMs?: number;

  /**
   * When true, `connect()` performs only its initial request and never
   * starts the background loop (intended for tests).
   */
  disableLoop?: boolean;
}
/** Server-side hold time used when `pollTimeoutMs` is not supplied: 25 seconds, in ms. */
const DEFAULT_POLL_TIMEOUT_MS = 25 * 1_000;

/** Retry pause used when `errorBackoffMs` is not supplied: 2 seconds, in ms. */
const DEFAULT_ERROR_BACKOFF_MS = 2 * 1_000;
/**
 * Bridge transport that receives envelopes by long-polling
 * `GET {baseUrl}/v1/bridge/poll`.
 *
 * Lifecycle: `connect()` performs one probe request and, unless
 * `disableLoop` is set, starts a background loop that issues one request
 * after another until `disconnect()` is called. At most one request is in
 * flight at a time.
 *
 * Delivery: for each received wire message the cursor is advanced *before*
 * the `onMessage` handler runs, and a handler failure is reported through
 * `onError` without stopping the rest of the batch. A message whose handler
 * throws is therefore not requested again.
 */
export class LongPollBridge implements BridgeTransport {
  readonly kind = 'long-poll';
  private readonly fetchFn: typeof fetch;
  /** Highest `receivedAt` / server cursor seen so far; sent as `since`. */
  private cursor: number;
  /** Set by `disconnect()`; stops the loop and silences teardown errors. */
  private disposed = false;
  private connectStarted = false;
  /** Abort handle of the request currently in flight, if any. */
  private inflight: AbortController | null = null;
  private onMessage: BridgeConnectOptions['onMessage'] | null = null;
  private onError: NonNullable<BridgeConnectOptions['onError']> = (err) =>
    console.warn('[shade-bridge:long-poll]', err.message);
  private loopPromise: Promise<void> | null = null;
  /** Resolves the current error backoff early; non-null only while backing off. */
  private wakeBackoff: (() => void) | null = null;

  /**
   * @param options Endpoint, signing material and timing knobs. The fetch
   *   implementation is resolved here (`options.fetch`, else the global one).
   */
  constructor(private readonly options: LongPollBridgeOptions) {
    this.fetchFn = options.fetch ?? globalThis.fetch;
    this.cursor = options.startCursor ?? 0;
  }

  /**
   * Registers the handlers, performs a single probe request and then starts
   * the background poll loop (unless `disableLoop` is set). Messages returned
   * by the probe are delivered like any others.
   *
   * Probe failures reject this promise so that a fallback chain can switch
   * to another transport.
   *
   * @param opts Message handler and optional error handler.
   * @throws BridgeError if `connect` was already called, if the probe gets a
   *   non-OK HTTP status, or if the response body has no `blobs` array.
   *   Signing, network and JSON errors from the probe propagate unchanged.
   */
  async connect(opts: BridgeConnectOptions): Promise<void> {
    if (this.connectStarted) throw new BridgeError('LongPollBridge.connect already called');
    this.connectStarted = true;
    this.onMessage = opts.onMessage;
    if (opts.onError) this.onError = opts.onError;

    await this.pollOnce();

    if (this.options.disableLoop) return;
    this.loopPromise = this.runLoop();
  }

  /**
   * Stops polling: aborts the in-flight request, cuts any error backoff short
   * and waits for the background loop to finish. Safe to call repeatedly.
   */
  async disconnect(): Promise<void> {
    this.disposed = true;
    this.inflight?.abort();
    // Do not make the caller wait out the remainder of an error backoff.
    this.wakeBackoff?.();
    if (this.loopPromise) {
      try {
        await this.loopPromise;
      } catch {
        /* swallow loop teardown errors */
      }
    }
  }

  /** Returns the cursor that the next request will send as `since`. */
  getCursor(): number {
    return this.cursor;
  }

  /**
   * Background loop: polls until disposed. A failed poll is reported through
   * `onError` and followed by a backoff pause before the next attempt.
   */
  private async runLoop(): Promise<void> {
    while (!this.disposed) {
      try {
        await this.pollOnce();
      } catch (err) {
        if (this.disposed) return;
        this.reportError(err);
        await this.backoff();
      }
    }
  }

  /**
   * Waits `errorBackoffMs` (default 2 s) or until `disconnect()` wakes it,
   * whichever comes first. The underlying timer is not cancelled on wake-up;
   * it simply expires with nothing left to do.
   */
  private async backoff(): Promise<void> {
    const delayMs = this.options.errorBackoffMs ?? DEFAULT_ERROR_BACKOFF_MS;
    const woken = new Promise<void>((resolve) => {
      this.wakeBackoff = () => resolve();
    });
    try {
      await Promise.race([sleep(delayMs), woken]);
    } finally {
      this.wakeBackoff = null;
    }
  }

  /**
   * Hands an error to the `onError` handler. Non-`Error` throwables are
   * wrapped so the handler always receives an `Error`, and a handler that
   * itself throws is logged instead of being allowed to kill the poll loop.
   */
  private reportError(err: unknown): void {
    const error = err instanceof Error ? err : new Error(String(err));
    try {
      this.onError(error);
    } catch (handlerErr) {
      console.warn('[shade-bridge:long-poll] onError handler threw', handlerErr);
    }
  }

  /**
   * Issues one signed poll request and dispatches every message it returns.
   *
   * The query string is signed afresh on each call with the current cursor.
   * Errors are rethrown to the caller (the probe in `connect()` or the loop)
   * unless the bridge has been disposed, in which case they are treated as
   * teardown noise and dropped.
   *
   * @throws BridgeError on a non-OK HTTP status or a body without a `blobs`
   *   array; signing, network, abort and JSON errors propagate unchanged.
   */
  private async pollOnce(): Promise<void> {
    const qs = await signBridgeQuery({
      crypto: this.options.auth.crypto,
      signingPrivateKey: this.options.auth.signingPrivateKey,
      address: this.options.auth.address,
      kind: 'poll',
      since: this.cursor,
    });
    // disconnect() may have run while we were signing. It could only abort the
    // previous controller, so bail out here instead of starting a request
    // that nobody would cancel.
    if (this.disposed) return;

    const pollTimeoutMs = this.options.pollTimeoutMs ?? DEFAULT_POLL_TIMEOUT_MS;
    qs.set('timeoutMs', String(pollTimeoutMs));
    const url = `${stripTrailingSlash(this.options.baseUrl)}/v1/bridge/poll?${qs.toString()}`;

    // Keep a local reference so the watchdog always aborts *this* request,
    // whatever `this.inflight` points at by the time it fires.
    const controller = new AbortController();
    this.inflight = controller;
    const requestBudget = this.options.requestTimeoutMs ?? pollTimeoutMs + 5_000;
    const watchdog = setTimeout(() => controller.abort(), requestBudget);

    try {
      // Call through a local so the implementation is not invoked with this
      // bridge as `this`; the browser's native fetch rejects a foreign
      // receiver with "Illegal invocation".
      const fetchImpl = this.fetchFn;
      const res = await fetchImpl(url, {
        method: 'GET',
        headers: { accept: 'application/json' },
        signal: controller.signal,
      });
      if (!res.ok) {
        // For a probe call, surface the error so the caller can fall back.
        // For a steady-state call, the loop's catch will rate-limit retries.
        throw new BridgeError(`long-poll failed: HTTP ${res.status}`, res.status);
      }

      const body = (await res.json()) as { blobs: BridgeWireMessage[]; cursor?: number } | null;
      if (!body || !Array.isArray(body.blobs)) {
        throw new BridgeError('long-poll body missing `blobs` array');
      }

      for (const wire of body.blobs) {
        // Advance the cursor before dispatching: a failing handler does not
        // cause the same message to be requested again.
        if (typeof wire.receivedAt === 'number' && wire.receivedAt > this.cursor) {
          this.cursor = wire.receivedAt;
        }
        const msg = decodeWireMessage(wire);
        if (this.onMessage) {
          try {
            await this.onMessage(msg);
          } catch (err) {
            this.reportError(err);
          }
        }
      }

      // The server may report a cursor beyond the last delivered message.
      if (typeof body.cursor === 'number' && body.cursor > this.cursor) {
        this.cursor = body.cursor;
      }
    } catch (err) {
      // After disconnect() any failure (typically the abort) is expected.
      if (this.disposed) return;
      throw err;
    } finally {
      clearTimeout(watchdog);
      if (this.inflight === controller) this.inflight = null;
    }
  }
}
/**
 * Removes one trailing `/` from `s`, if present. Only the final character is
 * considered, so `"a//"` becomes `"a/"`.
 */
function stripTrailingSlash(s: string): string {
  if (!s.endsWith('/')) {
    return s;
  }
  return s.substring(0, s.length - 1);
}
/**
 * Returns a promise that resolves (with no value) once a `setTimeout` of
 * `ms` milliseconds has fired.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(() => done(), ms);
  });
}