/** * Per-peer outbound queue + queue-transport for browser-friendly * pull-mode streams. * * The default `ShadeTransferHttpTransport` POSTs each chunk directly * to the receiver's `/v1/transfer//chunk` route. That * requires the receiver to host an HTTP server, which a browser tab * cannot. `QueueTransferTransport` flips the direction: it queues * chunks per peer and lets the receiver pull them via a long-poll * endpoint. * * The companion `OutboundQueue` data structure is plain server-side * state — wired up by `@shade/files`'s `transferQueueRoute()` (and any * future consumer) to expose the long-poll surface and feed envelopes * + chunks into the browser receiver. */ import type { ChunkAck, ChunkSendOptions, ITransferTransport, TransferResumeState, } from './transport.js'; import { TransferTransportError } from '../errors.js'; /** * Discriminated union of items the queue ships to the browser * receiver. Both kinds carry **wire-encoded bytes** of an envelope — * the receiver decodes via `decodeEnvelope` (control envelopes) or * forwards directly to `engine.receiveChunk` (chunk envelopes). */ export type QueuedEvent = | { id: number; timestampMs: number; kind: 'envelope'; /** Wire-encoded `0x02` ratchet envelope (or `0x01` first-message). */ bytes: Uint8Array; } | { id: number; timestampMs: number; kind: 'chunk'; /** Wire-encoded `0x11` stream-chunk envelope. */ bytes: Uint8Array; meta: { streamId: string; laneId: number; seq: number; }; }; /** Caller-supplied shape for {@link OutboundQueue.enqueue} — the queue assigns `id` + `timestampMs`. */ export type QueuedEventInput = | { kind: 'envelope'; bytes: Uint8Array } | { kind: 'chunk'; bytes: Uint8Array; meta: { streamId: string; laneId: number; seq: number }; }; export interface OutboundQueueOptions { /** * Maximum events held per peer. When the queue is full, the oldest * unacked event is dropped on next enqueue. Default 1000 — at the * default chunk size (256 KiB plaintext) this caps a single peer's * outbound buffer at ~256 MiB. Tune up for fewer/bigger streams, * down for many concurrent small flows. */ maxEventsPerPeer?: number; /** * After a peer has not polled for this long, the queue's events are * dropped and any pending waiters are released. Default 10 minutes. * Setting to `0` disables idle-eviction. */ idleEvictionMs?: number; } const DEFAULT_MAX_EVENTS = 1000; const DEFAULT_IDLE_EVICTION_MS = 10 * 60 * 1000; interface PendingWaiter { resolve(events: QueuedEvent[]): void; reject(err: Error): void; timer: ReturnType; abortHandler?: () => void; signal?: AbortSignal; } interface PerPeerState { nextId: number; events: QueuedEvent[]; waiters: PendingWaiter[]; lastTouchedMs: number; } /** * Per-peer monotonic event log with long-poll semantics. * * `enqueue` appends; `drain` returns all events with `id > since`, * blocking up to `blockMs` if there are none. `since`-based pagination * is the resume mechanism: a client crashing mid-stream restarts with * its last-processed id and the queue replays everything after it * (subject to `maxEventsPerPeer` retention). */ export class OutboundQueue { private peers = new Map(); private readonly maxEvents: number; private readonly idleEvictionMs: number; private evictTimer: ReturnType | null = null; private destroyed = false; constructor(opts: OutboundQueueOptions = {}) { this.maxEvents = opts.maxEventsPerPeer ?? DEFAULT_MAX_EVENTS; this.idleEvictionMs = opts.idleEvictionMs ?? DEFAULT_IDLE_EVICTION_MS; if (this.idleEvictionMs > 0) this.scheduleEviction(); } /** Append an event and wake any waiters for that peer. */ enqueue(peer: string, ev: QueuedEventInput): QueuedEvent { if (this.destroyed) throw new Error('OutboundQueue: destroyed'); const state = this.getOrCreate(peer); const event: QueuedEvent = ev.kind === 'chunk' ? { id: state.nextId++, timestampMs: Date.now(), kind: 'chunk', bytes: ev.bytes, meta: ev.meta, } : { id: state.nextId++, timestampMs: Date.now(), kind: 'envelope', bytes: ev.bytes, }; state.events.push(event); state.lastTouchedMs = Date.now(); // Cap: drop oldest. Lost events trigger receiver-side resume from // last polled id; the @shade/transfer engine handles missing seqs // by re-sending on resume. while (state.events.length > this.maxEvents) state.events.shift(); // Wake all waiters with whatever has accumulated. const drained = this.collect(state, 0); if (drained.length > 0) { const waiters = state.waiters.splice(0); for (const w of waiters) { clearTimeout(w.timer); if (w.abortHandler !== undefined && w.signal !== undefined) { w.signal.removeEventListener('abort', w.abortHandler); } w.resolve(drained); } } return event; } /** * Drain events with `id > since`. If none are available, block up * to `blockMs` until any arrive. `signal` cancels the wait early. */ async drain( peer: string, since: number, blockMs: number, signal?: AbortSignal, ): Promise { if (this.destroyed) throw new Error('OutboundQueue: destroyed'); const state = this.getOrCreate(peer); state.lastTouchedMs = Date.now(); const ready = this.collect(state, since); if (ready.length > 0 || blockMs <= 0) return ready; if (signal?.aborted) return []; return await new Promise((resolve, reject) => { const timer = setTimeout(() => { const idx = state.waiters.indexOf(waiter); if (idx >= 0) state.waiters.splice(idx, 1); if (waiter.abortHandler !== undefined && waiter.signal !== undefined) { waiter.signal.removeEventListener('abort', waiter.abortHandler); } // Empty drain on timeout — that's the "no new events" signal. resolve([]); }, blockMs); const waiter: PendingWaiter = { resolve, reject, timer }; if (signal !== undefined) { const handler = () => { const idx = state.waiters.indexOf(waiter); if (idx >= 0) state.waiters.splice(idx, 1); clearTimeout(timer); resolve([]); }; signal.addEventListener('abort', handler, { once: true }); waiter.abortHandler = handler; waiter.signal = signal; } state.waiters.push(waiter); }); } /** Drop a peer's queue + reject waiters. */ evict(peer: string): void { const state = this.peers.get(peer); if (state === undefined) return; this.peers.delete(peer); for (const w of state.waiters) { clearTimeout(w.timer); if (w.abortHandler !== undefined && w.signal !== undefined) { w.signal.removeEventListener('abort', w.abortHandler); } w.reject(new Error('OutboundQueue: peer evicted')); } } /** Peer-specific snapshot for diagnostics. */ stats(peer: string): { eventCount: number; nextId: number; waiters: number } | null { const state = this.peers.get(peer); if (state === undefined) return null; return { eventCount: state.events.length, nextId: state.nextId, waiters: state.waiters.length, }; } /** Tear everything down. Pending waiters are rejected. */ destroy(): void { if (this.destroyed) return; this.destroyed = true; if (this.evictTimer !== null) clearTimeout(this.evictTimer); for (const peer of [...this.peers.keys()]) this.evict(peer); } // ─── internals ────────────────────────────────────────────── private getOrCreate(peer: string): PerPeerState { let state = this.peers.get(peer); if (state === undefined) { state = { nextId: 1, events: [], waiters: [], lastTouchedMs: Date.now(), }; this.peers.set(peer, state); } return state; } private collect(state: PerPeerState, since: number): QueuedEvent[] { if (state.events.length === 0) return []; return state.events.filter((e) => e.id > since); } private scheduleEviction(): void { const interval = Math.max(60_000, Math.floor(this.idleEvictionMs / 4)); this.evictTimer = setTimeout(() => { if (this.destroyed) return; const cutoff = Date.now() - this.idleEvictionMs; for (const [peer, state] of this.peers.entries()) { if (state.lastTouchedMs < cutoff) this.evict(peer); } this.scheduleEviction(); }, interval); (this.evictTimer as unknown as { unref?: () => void }).unref?.(); } } /** * Chunk transport that enqueues into an {@link OutboundQueue} instead * of POSTing. * * Returns an optimistic `ChunkAck` immediately because the queue *is* * the delivery — the receiver polls and dispatches. Browser receivers * cannot synchronously confirm receipt before the next chunk; the * engine's stream-protocol catches dropped chunks at finish-time * integrity check, and chunk-resume restarts the lane from the last * polled `since`. */ export class QueueTransferTransport implements ITransferTransport { constructor(private readonly queue: OutboundQueue) {} async probe(_peer: string): Promise { // The queue is local. Reachability is "is there a poller?" which // is decided by `idleEvictionMs`. We don't synchronously check // here; the engine retries via `withRetry` on `sendChunk` errors. } async sendChunk( peer: string, streamId: string, laneId: number, seq: number | bigint, bytes: Uint8Array, options?: ChunkSendOptions, ): Promise { if (options?.signal?.aborted) { throw new TransferTransportError('sendChunk aborted by caller'); } const seqNum = typeof seq === 'bigint' ? Number(seq) : seq; this.queue.enqueue(peer, { kind: 'chunk', bytes, meta: { streamId, laneId, seq: seqNum }, }); return { lastSeq: seqNum }; } async fetchResumeState( _peer: string, _streamId: string, ): Promise { // Pull-mode receivers report resume state by re-polling with the // `since` cursor they last successfully processed; the queue does // not need to query the receiver. Return null so the engine // restarts from seq 0 (deterministic), and the queue replays from // `since=0` if the client reconnects fresh. return null; } }