feat(files): @shade/files 0.3.0 — E2EE filesystem RPC primitive
Some checks failed
Test / test (push) Has been cancelled

M-Files-1..6 land the full files-RPC layer + everything 0.3.0 needs to
ship. Apps keep their own UI; this layer ships the typed RPC, the
streams bridge for content I/O, and production hooks (rate limit,
retention, fingerprint gate, metrics).

@shade/files (NEW)
- Standard ops: list/stat/mkdir/delete/move/read/write/getThumbnail with
  Zod-validated wire schemas + clean user-handler types.
- Custom ops: typed via TypeScript declaration merging on CustomOpsMap
  + per-op Zod schemas; client.custom('app.foo', {...}) is fully typed.
- Content I/O: inline (≤ 256 KiB plaintext) base64-in-RPC; streams
  (> 256 KiB) ride @shade/transfer via userMetadata.shadeFilesWriteId
  / shadeFilesReadStreamId correlation. Server-side TransformStream
  bridges accept inbound transfers immediately (engine rejects chunks
  that arrive before accept) and park the readable for the matching
  RPC.
- Directory ops: walk(path, opts) async-iterable depth-first walker;
  uploadDirectory()/downloadDirectory() with bounded concurrency pool
  (default 4, cap 16), aggregated progress, abort.
- Production hooks (callback-based, vendor-neutral): rate-limit (op +
  byte), idempotency cache (LRU + TTL + in-flight de-dupe), path
  policy (traversal + percent-decode hardening), fingerprint gate
  (required/optional/reject), pluggable Ed25519 sig verification with
  ±5 min replay window, onMetric sink (standard names).
- React hooks (subpath @shade/files/react): ShadeFilesProvider,
  useShadeFiles, useFileList, useFileTransfer/Upload/Download.
- Shade.files.serve(handler) + Shade.files.client(peer) high-level
  entrypoint in @shade/sdk; lazy + memoized; one handler per Shade.

Wire format bump
- @shade/proto wire VERSION 0x01 → 0x02. Length prefixes changed from
  u16 to u32. The previous u16 silently truncated payloads above
  64 KiB — a hard correctness ceiling that blocked inline file ops
  up to 256 KiB. Wire-incompatible with 0.2.x peers; new sessions
  only. Cross-platform Kotlin port (android/shade-android) updated to
  match; test-vectors/wire-format.json regenerated.

Concurrency safety
- ShadeSessionManager.encrypt/.decrypt now run under per-peer mutex.
  Concurrent decryptions of the same peer raced ratchet state
  (manifested as sporadic "Failed to decrypt — wrong key or tampered
  data" under load — surfaced once concurrent uploadDirectory pumped
  many writes in flight). Encrypt was already serialized via
  Shade.send's encryptChains; decrypt is now serialized at the
  manager layer too.

@shade/streams extension
- StreamMetadata.userMetadata?: Record<string, string> for
  application-level key/value pairs that round-trip verbatim through
  stream-init plaintext. Used by @shade/files for write/read
  correlation; available to any consumer.

@shade/sdk extension
- Shade.files getter (lazy + memoized).
- BackgroundHooks.onPruneFiles + periodic timer (default 5 min) +
  BackgroundTasks.setHook(name, fn) for runtime hook registration.

Bundles in-flight 0.2.0 work
- packages/shade-streams/, packages/shade-transfer/, related
  shade-sdk streams-bridge + shade-widgets transfer hooks were
  uncommitted prior to this session. Including them keeps the
  workspace consistent at 0.3.0 since @shade/files depends on them.

Tests
- 74 new tests in @shade/files (572 → 646 workspace pass; 0 fail;
  3× stable). Coverage spans unit (inline-threshold + concurrency),
  integration (read-write inline + streams up to 1 MiB, walk +
  upload/download directory, custom-op, metrics, SDK namespace
  end-to-end), and security (tampered-envelope sig verification,
  replay window, fingerprint gate, rate-limit + quota).

Release artifacts
- All packages bumped to 0.3.0 via scripts/bump-version.ts.
- scripts/publish-all.ts PACKAGES updated with shade-files in
  topological order (after shade-transfer, before shade-sdk).
- bun run publish:dry clean (14 packed, 0 failed).
- examples/08-files-browser/ — three-process CLI demo (prekey + Bob
  server + Alice CLI) covering list/stat/mkdir/delete/upload/download.
- docs/files.md — full API + design doc.
- CHANGELOG.md 0.3.0 entry.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-02 14:00:01 +02:00
parent 7e0f7320a9
commit fa770d3063
198 changed files with 20412 additions and 256 deletions

View File

@@ -0,0 +1,21 @@
{
"name": "@shade/transfer",
"version": "0.3.0",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",
"dependencies": {
"@shade/core": "workspace:*",
"@shade/crypto-web": "workspace:*",
"@shade/proto": "workspace:*",
"@shade/streams": "workspace:*"
},
"peerDependencies": {
"hono": "^4"
},
"peerDependenciesMeta": {
"hono": {
"optional": true
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,53 @@
import { ShadeError } from '@shade/core';
export class TransferError extends ShadeError {
constructor(code: string, message: string) {
super(code, message);
this.name = 'TransferError';
}
}
export class TransferAbortError extends TransferError {
constructor(message = 'Transfer aborted') {
super('SHADE_TRANSFER_ABORT', message);
this.name = 'TransferAbortError';
}
}
export class TransferIntegrityError extends TransferError {
constructor(message: string) {
super('SHADE_TRANSFER_INTEGRITY', message);
this.name = 'TransferIntegrityError';
}
}
export class TransferProtocolError extends TransferError {
constructor(message: string) {
super('SHADE_TRANSFER_PROTOCOL', message);
this.name = 'TransferProtocolError';
}
}
export class TransferOfflineError extends TransferError {
constructor(peer: string, message?: string) {
super(
'SHADE_TRANSFER_OFFLINE',
message ?? `Peer ${peer} is offline. Queue/relay support deferred to a later version.`,
);
this.name = 'TransferOfflineError';
}
}
export class TransferResumeError extends TransferError {
constructor(message: string) {
super('SHADE_TRANSFER_RESUME', message);
this.name = 'TransferResumeError';
}
}
export class TransferTransportError extends TransferError {
constructor(message: string, public readonly statusCode?: number) {
super('SHADE_TRANSFER_TRANSPORT', message);
this.name = 'TransferTransportError';
}
}

View File

@@ -0,0 +1,29 @@
export * from './errors.js';
export * from './types.js';
export * from './retry.js';
export * from './progress.js';
export * from './transport/transport.js';
export * from './transport/memory.js';
export * from './transport/http-transport.js';
export * from './transport/ws-transport.js';
export * from './engine.js';
export {
createTransferRoutes,
PermissiveAuthenticator,
} from './receiver/server-handler.js';
export type {
TransferRouteOptions,
TransferRouteAuthenticator,
} from './receiver/server-handler.js';
export { resolveOutputSink } from './receiver/output.js';
export type { OutputSink } from './receiver/output.js';
export { normalizeInput } from './sender/input.js';
export type { NormalizedInput } from './sender/input.js';
export {
MemoryResumeStore,
StorageBackedResumeStore,
deriveDeviceKey,
wrapStreamSecret,
unwrapStreamSecret,
} from './persistence/resume.js';
export type { ResumeStore } from './persistence/resume.js';

View File

@@ -0,0 +1,136 @@
/**
* Resume-store contract for `@shade/transfer`.
*
* The engine persists in-flight transfer state through this interface so a
* crashed sender or receiver can pick up where it left off. v0.2.0 ships
* three implementations:
*
* - `MemoryResumeStore` — never persists; suitable for ephemeral
* transfers where resume is not needed.
* - `StorageBackedResumeStore` — wraps a Shade `StorageProvider`, with
* `deviceKey`-based AES-GCM encryption of the streamSecret at rest.
* - `IndexedDbResumeStore` — browser-side; built on top of the same
* `PersistedStreamState` shape.
*/
import type { CryptoProvider, PersistedStreamState, StorageProvider } from '@shade/core';
export interface ResumeStore {
/** Persist or update a stream's resume record. Idempotent on streamId. */
save(state: PersistedStreamState): Promise<void>;
/** Look up a stream's record by streamId. Returns null when absent. */
get(streamId: string): Promise<PersistedStreamState | null>;
/** Remove a stream's record (e.g. on completion). */
remove(streamId: string): Promise<void>;
/** List all non-final records (status `'active' | 'paused'`). */
listActive(direction?: 'send' | 'receive'): Promise<PersistedStreamState[]>;
}
/** In-memory store; useful for tests and ephemeral transfers. */
export class MemoryResumeStore implements ResumeStore {
private map = new Map<string, PersistedStreamState>();
async save(state: PersistedStreamState): Promise<void> {
this.map.set(state.streamId, { ...state });
}
async get(streamId: string): Promise<PersistedStreamState | null> {
const s = this.map.get(streamId);
return s ? { ...s } : null;
}
async remove(streamId: string): Promise<void> {
this.map.delete(streamId);
}
async listActive(direction?: 'send' | 'receive'): Promise<PersistedStreamState[]> {
const out: PersistedStreamState[] = [];
for (const s of this.map.values()) {
if (s.status !== 'active' && s.status !== 'paused') continue;
if (direction !== undefined && s.direction !== direction) continue;
out.push({ ...s });
}
out.sort((a, b) => b.updatedAt - a.updatedAt);
return out;
}
}
/**
* Storage-backed resume store. Delegates persistence to a `StorageProvider`
* that implements the optional stream-state methods. Throws clearly if the
* provider does not.
*/
export class StorageBackedResumeStore implements ResumeStore {
constructor(private readonly storage: StorageProvider) {
if (
storage.saveStreamState === undefined ||
storage.getStreamState === undefined ||
storage.removeStreamState === undefined ||
storage.listActiveStreamStates === undefined
) {
throw new Error(
'StorageBackedResumeStore requires a StorageProvider with stream-state methods',
);
}
}
async save(state: PersistedStreamState): Promise<void> {
await this.storage.saveStreamState!(state);
}
async get(streamId: string): Promise<PersistedStreamState | null> {
return this.storage.getStreamState!(streamId);
}
async remove(streamId: string): Promise<void> {
await this.storage.removeStreamState!(streamId);
}
async listActive(direction?: 'send' | 'receive'): Promise<PersistedStreamState[]> {
return this.storage.listActiveStreamStates!(direction);
}
}
/**
* Derive a device-specific encryption key from the local identity signing
* key. Used to wrap the streamSecret before persisting it.
*
* deviceKey = HKDF(
* ikm = signingPrivateKey,
* salt = "shade-stream-resume/v1/device",
* info = "device-encryption-key",
* length = 32,
* )
*
* Identity rotation invalidates all existing device-keyed records (resume
* stops working until a fresh transfer is initiated). This is intentional:
* a stolen pre-rotation DB cannot decrypt a post-rotation in-flight stream.
*/
export async function deriveDeviceKey(
crypto: CryptoProvider,
signingPrivateKey: Uint8Array,
): Promise<Uint8Array> {
const enc = new TextEncoder();
return crypto.hkdf(
signingPrivateKey,
enc.encode('shade-stream-resume/v1/device'),
enc.encode('device-encryption-key'),
32,
);
}
/** Encrypt a streamSecret under a deviceKey. Returns ciphertext + nonce. */
export async function wrapStreamSecret(
crypto: CryptoProvider,
deviceKey: Uint8Array,
streamSecret: Uint8Array,
streamId: string,
): Promise<{ ciphertext: Uint8Array; nonce: Uint8Array }> {
const aad = new TextEncoder().encode(`resume:${streamId}`);
return crypto.aesGcmEncrypt(deviceKey, streamSecret, aad);
}
/** Decrypt a streamSecret under a deviceKey. */
export async function unwrapStreamSecret(
crypto: CryptoProvider,
deviceKey: Uint8Array,
ciphertext: Uint8Array,
nonce: Uint8Array,
streamId: string,
): Promise<Uint8Array> {
const aad = new TextEncoder().encode(`resume:${streamId}`);
return crypto.aesGcmDecrypt(deviceKey, ciphertext, nonce, aad);
}

View File

@@ -0,0 +1,93 @@
/**
* Throughput tracking for an in-flight transfer.
*
* Smooths byte-rate via exponential moving average over the last `windowMs`
* window, recomputed on every progress sample. Returns `bytesPerSecond` and
* `etaSeconds` when total size is known.
*/
export interface ProgressSample {
bytesPerSecond: number;
etaSeconds: number | undefined;
percent: number | undefined;
}
export class ProgressTracker {
private bytesProcessed = 0;
private startedAt = -1;
private lastSampleAt = -1;
private lastBytes = 0;
/** EMA of byte rate (bytes per second). */
private rateEma = 0;
/** Smoothing factor in [0, 1]. Higher = more responsive, less smooth. */
private readonly alpha: number;
constructor(
private readonly bytesTotal: number | undefined,
/** Window over which EMA decay equals 1 - 1/e (~63 %). */
windowMs = 5000,
/** Sample period — sampling more often than this returns the cached EMA. */
private readonly sampleEveryMs = 250,
) {
// Convert window to per-sample alpha: alpha = 1 - exp(-dt/window)
// Approximate over `sampleEveryMs` per tick.
this.alpha = 1 - Math.exp(-this.sampleEveryMs / windowMs);
}
/** Record bytes processed since last call. */
add(bytes: number): void {
if (this.startedAt < 0) {
this.startedAt = nowMs();
this.lastSampleAt = this.startedAt;
}
this.bytesProcessed += bytes;
}
/** Total bytes accounted-for so far. */
get totalProcessed(): number {
return this.bytesProcessed;
}
/** Total elapsed wall-clock since first byte. */
get elapsedMs(): number {
if (this.startedAt < 0) return 0;
return nowMs() - this.startedAt;
}
/** Compute a smoothed sample. Cheap to call repeatedly. */
sample(): ProgressSample {
const now = nowMs();
if (this.startedAt < 0) {
return { bytesPerSecond: 0, etaSeconds: undefined, percent: this.percent() };
}
const dt = now - this.lastSampleAt;
if (dt >= this.sampleEveryMs) {
const deltaBytes = this.bytesProcessed - this.lastBytes;
const instantRate = dt > 0 ? (deltaBytes * 1000) / dt : 0;
this.rateEma =
this.rateEma === 0 ? instantRate : this.alpha * instantRate + (1 - this.alpha) * this.rateEma;
this.lastSampleAt = now;
this.lastBytes = this.bytesProcessed;
}
const bps = Math.max(0, this.rateEma);
let etaSeconds: number | undefined;
if (this.bytesTotal !== undefined && bps > 0) {
const remaining = Math.max(0, this.bytesTotal - this.bytesProcessed);
etaSeconds = remaining / bps;
} else if (this.bytesTotal !== undefined && this.bytesProcessed >= this.bytesTotal) {
etaSeconds = 0;
}
return { bytesPerSecond: bps, etaSeconds, percent: this.percent() };
}
private percent(): number | undefined {
if (this.bytesTotal === undefined) return undefined;
if (this.bytesTotal === 0) return 1;
return Math.min(1, this.bytesProcessed / this.bytesTotal);
}
}
function nowMs(): number {
return typeof performance !== 'undefined' ? performance.now() : Date.now();
}

View File

@@ -0,0 +1,166 @@
import type { TransferOutput } from '../types.js';
/**
* Generic chunk sink. The orchestrator writes plaintext chunks IN ORIGINAL
* BYTE ORDER, then calls `finalize()`. `toBytes()` is only valid for
* `'buffer'` sinks.
*/
export interface OutputSink {
write(chunk: Uint8Array): Promise<void>;
finalize(): Promise<void>;
abort(reason?: string): Promise<void>;
toBytes(): Uint8Array | null;
}
/**
* Adapter from a `TransferOutput` (consumer-supplied) to an `OutputSink`.
*
* The `'file'` sink is Bun-specific (uses `Bun.file(path).writer()`) — for
* Node use a `'pipe'` sink with an `fs.createWriteStream`.
*/
export async function resolveOutputSink(out: TransferOutput): Promise<OutputSink> {
switch (out.kind) {
case 'pipe':
return createPipeSink(out.pipeTo);
case 'callback':
return createCallbackSink(out.onChunk);
case 'buffer':
return createBufferSink();
case 'file':
return createBunFileSink(out.path);
case 'fileHandle':
return createFileHandleSink(out.handle);
}
}
function createPipeSink(stream: WritableStream<Uint8Array>): OutputSink {
const writer = stream.getWriter();
return {
async write(chunk) {
await writer.write(chunk);
},
async finalize() {
try {
await writer.close();
} finally {
writer.releaseLock?.();
}
},
async abort(reason) {
try {
await writer.abort(reason);
} catch {
/* swallow */
} finally {
writer.releaseLock?.();
}
},
toBytes() {
return null;
},
};
}
function createCallbackSink(
onChunk: (chunk: Uint8Array) => void | Promise<void>,
): OutputSink {
return {
async write(chunk) {
await onChunk(chunk);
},
async finalize() {
/* no-op */
},
async abort() {
/* no-op */
},
toBytes() {
return null;
},
};
}
function createBufferSink(): OutputSink {
const parts: Uint8Array[] = [];
let totalLen = 0;
return {
async write(chunk) {
parts.push(chunk);
totalLen += chunk.length;
},
async finalize() {
/* no-op */
},
async abort() {
parts.length = 0;
totalLen = 0;
},
toBytes() {
const out = new Uint8Array(totalLen);
let off = 0;
for (const p of parts) {
out.set(p, off);
off += p.length;
}
return out;
},
};
}
async function createBunFileSink(path: string): Promise<OutputSink> {
// Lazy import so non-Bun runtimes don't fail at import time.
const bunGlobal = (globalThis as unknown as { Bun?: { file: (p: string) => { writer: () => unknown } } })
.Bun;
if (bunGlobal === undefined) {
throw new Error("'file' output sink requires Bun runtime");
}
const writer = bunGlobal.file(path).writer() as {
write: (chunk: Uint8Array) => number | Promise<number>;
end: () => Promise<number> | number;
flush: () => Promise<number> | number;
};
return {
async write(chunk) {
await Promise.resolve(writer.write(chunk));
},
async finalize() {
await Promise.resolve(writer.flush());
await Promise.resolve(writer.end());
},
async abort() {
try {
await Promise.resolve(writer.end());
} catch {
/* swallow */
}
},
toBytes() {
return null;
},
};
}
async function createFileHandleSink(handle: {
createWritable(): Promise<unknown>;
}): Promise<OutputSink> {
const writable = (await handle.createWritable()) as {
write: (chunk: Uint8Array) => Promise<void>;
close: () => Promise<void>;
abort?: (reason?: unknown) => Promise<void>;
};
return {
async write(chunk) {
await writable.write(chunk);
},
async finalize() {
await writable.close();
},
async abort(reason) {
if (writable.abort) await writable.abort(reason);
else await writable.close();
},
toBytes() {
return null;
},
};
}

View File

@@ -0,0 +1,162 @@
import type { Hono, Context } from 'hono';
import type { TransferEngine } from '../engine.js';
import { TransferProtocolError } from '../errors.js';
/**
* Auth contract for incoming chunk POSTs / control GETs. Verifies the
* signature attached by the client's `TransferAuthenticator` and returns
* the resolved sender address.
*/
export interface TransferRouteAuthenticator {
verifyChunk(args: {
request: Request;
streamId: string;
laneId: number;
seq: bigint;
bodyHash: Uint8Array;
}): Promise<{ senderAddress: string }>;
verifyControl(args: {
request: Request;
streamId: string;
method: string;
path: string;
}): Promise<{ senderAddress: string }>;
}
export interface TransferRouteOptions {
/** Maximum chunk body size in bytes. Default 16 MiB. */
maxChunkBytes?: number;
/** Server-side authenticator. Defaults to a permissive one for trusted/local nets. */
authenticator?: TransferRouteAuthenticator;
}
/**
* Permissive authenticator: extracts sender address from the
* `X-Shade-Sender-Address` header without verification. Suitable ONLY for
* trusted/local network testing; the SDK's M-Stream-5 integration replaces
* this with an Ed25519-verifying implementation.
*/
export const PermissiveAuthenticator: TransferRouteAuthenticator = {
async verifyChunk({ request }) {
const senderAddress = request.headers.get('X-Shade-Sender-Address');
if (senderAddress === null || senderAddress === '') {
throw new TransferProtocolError('Missing X-Shade-Sender-Address header');
}
return { senderAddress };
},
async verifyControl({ request }) {
const senderAddress = request.headers.get('X-Shade-Sender-Address');
if (senderAddress === null || senderAddress === '') {
throw new TransferProtocolError('Missing X-Shade-Sender-Address header');
}
return { senderAddress };
},
};
const DEFAULT_MAX_CHUNK_BYTES = 16 * 1024 * 1024 + 1024;
/**
* Mount the transfer-receive routes on a Hono router. Returns the same
* Hono instance for fluent composition. The consumer mounts under any
* base path (e.g. `app.route('/shade', createTransferRoutes(engine))`).
*
* Hono is a peer-dep so non-server consumers can omit it.
*/
export async function createTransferRoutes(
engine: TransferEngine,
options: TransferRouteOptions = {},
): Promise<Hono> {
const { Hono: HonoCtor } = (await import('hono')) as { Hono: new () => Hono };
const app = new HonoCtor();
const auth = options.authenticator ?? PermissiveAuthenticator;
const maxBytes = options.maxChunkBytes ?? DEFAULT_MAX_CHUNK_BYTES;
app.get('/v1/transfer/health', (c) => c.json({ ok: true }));
app.post('/v1/transfer/:streamId/chunk', async (c) => {
const streamId = c.req.param('streamId');
const laneIdRaw = c.req.header('X-Shade-Lane-Id');
const seqRaw = c.req.header('X-Shade-Seq');
if (laneIdRaw === undefined || seqRaw === undefined) {
return c.json({ error: 'missing X-Shade-Lane-Id or X-Shade-Seq' }, 400);
}
const laneId = Number(laneIdRaw);
const seq = BigInt(seqRaw);
if (!Number.isInteger(laneId) || laneId < 0) {
return c.json({ error: 'invalid lane id' }, 400);
}
const contentLength = c.req.header('content-length');
if (contentLength !== undefined && Number(contentLength) > maxBytes) {
return c.json({ error: `chunk exceeds maxChunkBytes (${maxBytes})` }, 413);
}
const ab = await c.req.arrayBuffer();
if (ab.byteLength > maxBytes) {
return c.json({ error: `chunk exceeds maxChunkBytes (${maxBytes})` }, 413);
}
const body = new Uint8Array(ab);
const bodyHash = new Uint8Array(
await globalThis.crypto.subtle.digest('SHA-256', ab),
);
let senderAddress: string;
try {
const result = await auth.verifyChunk({
request: c.req.raw,
streamId,
laneId,
seq,
bodyHash,
});
senderAddress = result.senderAddress;
} catch (err) {
return errorResponse(c, err);
}
try {
const ack = await engine.receiveChunk(senderAddress, streamId, laneId, seq, body);
return c.json(ack);
} catch (err) {
return errorResponse(c, err);
}
});
app.get('/v1/transfer/:streamId/state', async (c) => {
const streamId = c.req.param('streamId');
let senderAddress: string;
try {
const result = await auth.verifyControl({
request: c.req.raw,
streamId,
method: 'GET',
path: c.req.path,
});
senderAddress = result.senderAddress;
} catch (err) {
return errorResponse(c, err);
}
const state = await engine.getResumeState(senderAddress, streamId);
if (state === null) return c.json({ error: 'no state' }, 404);
return c.json(state);
});
return app;
}
function errorResponse(c: Context, err: unknown): Response {
const message = err instanceof Error ? err.message : String(err);
const code =
err instanceof Error && (err as unknown as { code?: unknown }).code !== undefined
? String((err as unknown as { code: unknown }).code)
: 'UNKNOWN';
let status = 500;
if (code === 'SHADE_TRANSFER_PROTOCOL') status = 400;
if (code === 'SHADE_VALIDATION') status = 400;
if (code === 'SHADE_UNAUTHORIZED') status = 401;
if (code === 'SHADE_INVALID_SIGNATURE') status = 401;
if (code === 'SHADE_STREAM_REPLAY') status = 409;
if (code === 'SHADE_STREAM_OUT_OF_ORDER') status = 409;
return c.json({ error: message, code }, status as 400 | 401 | 409 | 500);
}

View File

@@ -0,0 +1,105 @@
import { TransferAbortError, TransferTransportError } from './errors.js';
export interface RetryPolicy {
maxAttempts: number;
/** Base delay in ms; doubled per attempt with jitter. */
baseDelayMs: number;
/** Hard cap on a single delay step. */
maxDelayMs: number;
/** Jitter factor in [0, 1); 0 = deterministic, 0.5 = ±25 % spread. */
jitter: number;
}
export const DEFAULT_RETRY: RetryPolicy = {
maxAttempts: 5,
baseDelayMs: 250,
maxDelayMs: 30_000,
jitter: 0.25,
};
export interface RetryContext {
attempt: number;
lastError: unknown;
willRetryInMs: number;
}
/**
* Run an idempotent operation with exponential backoff. Aborts immediately
* via the supplied signal. `onAttempt` runs BEFORE each retry sleep.
*
* The operation is treated as retryable unless it throws an
* `AbortError`/`TransferAbortError` or a non-network error wrapped in
* `TransferTransportError` with a 4xx status code (those are
* deterministic and retrying won't help).
*/
export async function withRetry<T>(
op: (attempt: number) => Promise<T>,
options?: {
policy?: RetryPolicy;
signal?: AbortSignal;
onAttempt?: (ctx: RetryContext) => void;
isRetryable?: (err: unknown) => boolean;
},
): Promise<T> {
const policy = options?.policy ?? DEFAULT_RETRY;
const signal = options?.signal;
const onAttempt = options?.onAttempt;
const isRetryable = options?.isRetryable ?? defaultIsRetryable;
let lastError: unknown;
for (let attempt = 1; attempt <= policy.maxAttempts; attempt++) {
if (signal?.aborted) throw new TransferAbortError('Aborted before attempt');
try {
return await op(attempt);
} catch (err) {
lastError = err;
if (signal?.aborted) throw new TransferAbortError('Aborted during attempt');
if (!isRetryable(err) || attempt === policy.maxAttempts) throw err;
const delay = computeDelay(attempt, policy);
onAttempt?.({ attempt, lastError: err, willRetryInMs: delay });
await sleep(delay, signal);
}
}
throw lastError ?? new Error('withRetry: unreachable');
}
function defaultIsRetryable(err: unknown): boolean {
if (err instanceof TransferAbortError) return false;
if (err instanceof TransferTransportError) {
if (err.statusCode === undefined) return true; // network-level failure
if (err.statusCode >= 500) return true;
if (err.statusCode === 408 || err.statusCode === 429) return true;
return false;
}
// Network errors / DOMException['AbortError']
if (typeof err === 'object' && err !== null) {
const name = (err as { name?: unknown }).name;
if (name === 'AbortError') return false;
if (name === 'TypeError') return true; // fetch network failure
}
return true;
}
function computeDelay(attempt: number, policy: RetryPolicy): number {
const base = Math.min(policy.maxDelayMs, policy.baseDelayMs * 2 ** (attempt - 1));
const half = base * policy.jitter * 0.5;
return Math.max(0, base - half + Math.random() * half * 2);
}
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
return new Promise((resolve, reject) => {
if (signal?.aborted) {
reject(new TransferAbortError('Aborted'));
return;
}
const timer = setTimeout(() => {
signal?.removeEventListener('abort', onAbort);
resolve();
}, ms);
function onAbort() {
clearTimeout(timer);
reject(new TransferAbortError('Aborted'));
}
signal?.addEventListener('abort', onAbort, { once: true });
});
}

View File

@@ -0,0 +1,124 @@
import { ValidationError } from '@shade/core';
import type { TransferInput } from '../types.js';
export interface NormalizedInput {
/** Total plaintext bytes when known. `undefined` for indeterminate streams. */
size: number | undefined;
name?: string;
contentType?: string;
/** Read bytes [start, end). End is exclusive. Must be supported when `size` is known. */
read(start: number, end: number): Promise<Uint8Array>;
/** Sequential reader for unknown-size inputs. Returns `null` at end-of-stream. */
readNext(maxBytes: number): Promise<Uint8Array | null>;
/** Whether the source supports random `read(start, end)` access. */
readonly randomAccess: boolean;
/** Free any underlying resources (no-op when nothing to release). */
close(): Promise<void>;
}
/**
* Normalize any of the supported `TransferInput` shapes into a uniform
* reader. Determines size eagerly when possible.
*/
export async function normalizeInput(input: TransferInput): Promise<NormalizedInput> {
if (input instanceof Uint8Array) return fromUint8Array(input);
if (typeof Blob !== 'undefined' && input instanceof Blob) return fromBlob(input);
if (isReadableStream(input)) return fromReadableStream(input);
throw new ValidationError('Unsupported TransferInput', 'input');
}
function fromUint8Array(buf: Uint8Array): NormalizedInput {
return {
size: buf.length,
randomAccess: true,
async read(start, end) {
return buf.slice(start, end);
},
async readNext() {
throw new Error('readNext is not supported on random-access inputs');
},
async close() {
/* nothing to release */
},
};
}
function fromBlob(blob: Blob): NormalizedInput {
// Blob.slice / Blob.arrayBuffer are not in `lib: ["ES2022"]`. We cast
// through `unknown` to avoid pulling in the full DOM lib.
const blobLike = blob as unknown as {
size: number;
slice(start: number, end: number): { arrayBuffer(): Promise<ArrayBuffer> };
arrayBuffer(): Promise<ArrayBuffer>;
};
const out: NormalizedInput = {
size: blobLike.size,
randomAccess: true,
async read(start, end) {
const slice = blobLike.slice(start, end);
const ab = await slice.arrayBuffer();
return new Uint8Array(ab);
},
async readNext() {
throw new Error('readNext is not supported on random-access inputs');
},
async close() {
/* nothing to release */
},
};
// File extends Blob with name/type
const f = blob as Blob & { name?: string; type?: string };
if (typeof f.name === 'string') out.name = f.name;
if (typeof f.type === 'string' && f.type.length > 0) out.contentType = f.type;
return out;
}
function fromReadableStream(stream: ReadableStream<Uint8Array>): NormalizedInput {
const reader = stream.getReader();
let pending: Uint8Array = new Uint8Array(0);
let done = false;
return {
size: undefined,
randomAccess: false,
async read() {
throw new Error('Random access not supported for ReadableStream input');
},
async readNext(maxBytes) {
while (pending.length < maxBytes && !done) {
const r = await reader.read();
if (r.done) {
done = true;
break;
}
pending = concat(pending, r.value as Uint8Array);
}
if (pending.length === 0 && done) return null;
const out = pending.subarray(0, Math.min(maxBytes, pending.length));
pending = pending.subarray(out.length);
return out;
},
async close() {
try {
await reader.cancel();
} catch {
/* already closed */
}
},
};
}
function isReadableStream(x: unknown): x is ReadableStream<Uint8Array> {
return (
typeof x === 'object' &&
x !== null &&
typeof (x as { getReader?: unknown }).getReader === 'function'
);
}
function concat(a: Uint8Array, b: Uint8Array): Uint8Array {
const out = new Uint8Array(a.length + b.length);
out.set(a, 0);
out.set(b, a.length);
return out as Uint8Array;
}

View File

@@ -0,0 +1,88 @@
/**
* Bounded async FIFO. The producer awaits when the queue is full; the
* consumer awaits when the queue is empty. Used by the upload orchestrator
* to pace per-lane chunk dispatch without unbounded memory growth.
*/
export class BoundedAsyncQueue<T> {
private items: T[] = [];
private waiters: Array<(value: IteratorResult<T>) => void> = [];
private spaceWaiters: Array<() => void> = [];
private closed = false;
private aborted = false;
private abortReason: unknown = null;
constructor(private readonly capacity: number) {
if (capacity < 1) throw new Error(`capacity must be >= 1`);
}
/** Push an item. Awaits if the queue is at capacity. */
async push(item: T): Promise<void> {
if (this.aborted) throw this.abortReason;
if (this.closed) throw new Error('queue closed');
while (this.items.length >= this.capacity) {
await new Promise<void>((resolve) => this.spaceWaiters.push(resolve));
if (this.aborted) throw this.abortReason;
if (this.closed) throw new Error('queue closed mid-push');
}
this.items.push(item);
const waiter = this.waiters.shift();
if (waiter !== undefined) {
const next = this.items.shift()!;
waiter({ value: next, done: false });
this.notifySpace();
}
}
/** Returns next item or `{done: true}` when closed and drained. */
async next(): Promise<IteratorResult<T>> {
if (this.aborted) throw this.abortReason;
if (this.items.length > 0) {
const value = this.items.shift()!;
this.notifySpace();
return { value, done: false };
}
if (this.closed) return { value: undefined as never, done: true };
return new Promise<IteratorResult<T>>((resolve) => {
this.waiters.push(resolve);
});
}
/** Mark the queue as closed; subsequent `next()` calls drain remaining items. */
close(): void {
if (this.closed) return;
this.closed = true;
for (const waiter of this.waiters) {
waiter({ value: undefined as never, done: true });
}
this.waiters = [];
}
abort(reason: unknown): void {
if (this.aborted) return;
this.aborted = true;
this.abortReason = reason;
this.closed = true;
for (const w of this.spaceWaiters) w();
this.spaceWaiters = [];
for (const w of this.waiters) {
// Reject pending consumers via a thrown abort; consumers handle.
// We can't reject here because the next() returns IteratorResult, not a rejected promise.
// So instead, re-resolve as `done`; consumers check `aborted` separately.
w({ value: undefined as never, done: true });
}
this.waiters = [];
}
get size(): number {
return this.items.length;
}
get isClosed(): boolean {
return this.closed;
}
private notifySpace(): void {
const waiter = this.spaceWaiters.shift();
if (waiter !== undefined) waiter();
}
}

View File

@@ -0,0 +1,169 @@
import { TransferAbortError, TransferTransportError } from '../errors.js';
import { withRetry, type RetryPolicy } from '../retry.js';
import type {
ChunkAck,
ChunkSendOptions,
ITransferTransport,
TransferResumeState,
} from './transport.js';
/**
* Resolves the base URL for a given peer address.
*
* In M-Stream-5 the SDK wires this to the prekey-server's directory or to a
* peer-supplied `transfer.baseUrl` field on the prekey bundle. For
* standalone use, the caller can pass a static map.
*/
export type PeerBaseUrlResolver = (peerAddress: string) => Promise<string>;
/**
* Authentication hook. Adds outgoing request headers and produces a
* "request signature" that the receiver verifies. The default is a no-op
* authenticator suitable for trusted/local networks; for production use the
* SDK provides an Ed25519-signing implementation in M-Stream-5.
*/
export interface TransferAuthenticator {
signChunk(args: {
streamId: string;
laneId: number;
seq: bigint;
bodyHash: Uint8Array;
}): Promise<Record<string, string>>;
signControl(args: { streamId: string; method: string; path: string }): Promise<
Record<string, string>
>;
}
export const NoopAuthenticator: TransferAuthenticator = {
async signChunk() {
return {};
},
async signControl() {
return {};
},
};
export interface ShadeTransferHttpTransportOptions {
/** Resolves the peer's HTTP base URL (e.g., `https://server.example.com/shade`). */
resolveBaseUrl: PeerBaseUrlResolver;
/** Optional authenticator (defaults to no-op). */
authenticator?: TransferAuthenticator;
/** Override `fetch` (e.g., for tests). Defaults to `globalThis.fetch`. */
fetch?: typeof fetch;
/** Retry policy for chunk POSTs. */
retryPolicy?: RetryPolicy;
}
/**
* HTTP-based chunk transport. POSTs each `0x11` envelope to
* `<base>/v1/transfer/:streamId/chunk` and parses the JSON ACK response.
*
* Mounting on the receiver side: `createTransferRoutes(engine, options)`.
*/
export class ShadeTransferHttpTransport implements ITransferTransport {
private readonly fetchFn: typeof fetch;
private readonly auth: TransferAuthenticator;
constructor(private readonly options: ShadeTransferHttpTransportOptions) {
this.fetchFn = options.fetch ?? globalThis.fetch;
this.auth = options.authenticator ?? NoopAuthenticator;
}
async probe(peerAddress: string): Promise<void> {
const base = await this.options.resolveBaseUrl(peerAddress);
const url = `${stripTrailingSlash(base)}/v1/transfer/health`;
let res: Response;
try {
res = await this.fetchFn(url, { method: 'GET' });
} catch (err) {
throw new TransferTransportError(`probe failed: ${(err as Error).message}`);
}
if (!res.ok) {
throw new TransferTransportError(`probe failed: HTTP ${res.status}`, res.status);
}
}
async sendChunk(
peerAddress: string,
streamId: string,
laneId: number,
seq: number | bigint,
bytes: Uint8Array,
options?: ChunkSendOptions,
): Promise<ChunkAck> {
const base = await this.options.resolveBaseUrl(peerAddress);
const url = `${stripTrailingSlash(base)}/v1/transfer/${encodeURIComponent(streamId)}/chunk`;
const seqBig = typeof seq === 'bigint' ? seq : BigInt(seq);
const bodyHash = await sha256(bytes);
const headers: Record<string, string> = {
'Content-Type': 'application/octet-stream',
'X-Shade-Lane-Id': String(laneId),
'X-Shade-Seq': seqBig.toString(),
};
Object.assign(
headers,
await this.auth.signChunk({ streamId, laneId, seq: seqBig, bodyHash }),
);
const policy = this.options.retryPolicy;
return withRetry(
async () => {
let res: Response;
try {
res = await this.fetchFn(url, {
method: 'POST',
headers,
// BodyInit isn't in lib:["ES2022"]; cast through unknown.
body: bytes as unknown as never,
...(options?.signal !== undefined ? { signal: options.signal } : {}),
});
} catch (err) {
if ((err as { name?: string }).name === 'AbortError') {
throw new TransferAbortError('signal aborted');
}
throw new TransferTransportError(`sendChunk failed: ${(err as Error).message}`);
}
if (!res.ok) {
const text = await res.text().catch(() => '');
throw new TransferTransportError(
`sendChunk failed: HTTP ${res.status} ${text}`,
res.status,
);
}
const body = (await res.json()) as { lastSeq: number; bytesReceived?: number };
return {
lastSeq: body.lastSeq,
...(body.bytesReceived !== undefined ? { bytesReceived: body.bytesReceived } : {}),
};
},
{
...(policy !== undefined ? { policy } : {}),
...(options?.signal !== undefined ? { signal: options.signal } : {}),
},
);
}
async fetchResumeState(
peerAddress: string,
streamId: string,
): Promise<TransferResumeState | null> {
const base = await this.options.resolveBaseUrl(peerAddress);
const url = `${stripTrailingSlash(base)}/v1/transfer/${encodeURIComponent(streamId)}/state`;
const headers = await this.auth.signControl({ streamId, method: 'GET', path: url });
const res = await this.fetchFn(url, { method: 'GET', headers });
if (res.status === 404) return null;
if (!res.ok) {
throw new TransferTransportError(`fetchResumeState failed: HTTP ${res.status}`, res.status);
}
return (await res.json()) as TransferResumeState;
}
}
function stripTrailingSlash(s: string): string {
return s.endsWith('/') ? s.slice(0, -1) : s;
}
async function sha256(data: Uint8Array): Promise<Uint8Array> {
const buf = await globalThis.crypto.subtle.digest('SHA-256', data as unknown as ArrayBuffer);
return new Uint8Array(buf);
}

View File

@@ -0,0 +1,163 @@
import { TransferTransportError } from '../errors.js';
import type {
ChunkAck,
ChunkSendOptions,
IControlChannel,
ITransferTransport,
TransferResumeState,
} from './transport.js';
import type { StreamControlMessage } from '@shade/streams';
/**
* In-process control channel useful for tests and same-process upload/
* download (e.g., a CLI that uploads to its own embedded server).
*
* Two endpoints are paired via {@link MemoryControlChannel.linked}: a
* `send()` on one delivers to the other's subscribers.
*/
export class MemoryControlChannel implements IControlChannel {
private peer: MemoryControlChannel | null = null;
private myAddress: string;
private handlers = new Set<
(from: string, message: StreamControlMessage) => void | Promise<void>
>();
private constructor(address: string) {
this.myAddress = address;
}
static linked(addressA: string, addressB: string): {
a: MemoryControlChannel;
b: MemoryControlChannel;
} {
const a = new MemoryControlChannel(addressA);
const b = new MemoryControlChannel(addressB);
a.peer = b;
b.peer = a;
return { a, b };
}
async send(peerAddress: string, message: StreamControlMessage): Promise<void> {
if (this.peer === null) {
throw new TransferTransportError('MemoryControlChannel: not linked');
}
if (peerAddress !== this.peer.myAddress) {
throw new TransferTransportError(
`MemoryControlChannel: peer mismatch (expected ${this.peer.myAddress}, got ${peerAddress})`,
);
}
const target = this.peer;
const from = this.myAddress;
// Awaiting handler completion is critical: callers (the engine) rely on
// stream-init being processed BEFORE chunk POSTs go out. Real transports
// (HTTP/WS) await delivery similarly.
for (const handler of [...target.handlers]) {
try {
await handler(from, message);
} catch (err) {
console.error('[MemoryControlChannel] handler error:', err);
throw err;
}
}
}
onMessage(
handler: (from: string, message: StreamControlMessage) => void | Promise<void>,
): () => void {
this.handlers.add(handler);
return () => this.handlers.delete(handler);
}
}
/**
* In-process chunk transport. Routes 0x11 envelopes directly to a paired
* receiver-engine via a registered hook. Used in tests and same-process
* usage to avoid an HTTP roundtrip.
*/
export class MemoryTransferTransport implements ITransferTransport {
private peer: MemoryTransferTransport | null = null;
private chunkHandler:
| ((from: string, streamId: string, laneId: number, seq: bigint, bytes: Uint8Array) => Promise<ChunkAck>)
| null = null;
private resumeProvider:
| ((from: string, streamId: string) => Promise<TransferResumeState | null>)
| null = null;
private myAddress: string;
private constructor(address: string) {
this.myAddress = address;
}
static linked(addressA: string, addressB: string): {
a: MemoryTransferTransport;
b: MemoryTransferTransport;
} {
const a = new MemoryTransferTransport(addressA);
const b = new MemoryTransferTransport(addressB);
a.peer = b;
b.peer = a;
return { a, b };
}
/** Receiver-side: register the handler invoked when a chunk arrives. */
setChunkHandler(
handler: (
from: string,
streamId: string,
laneId: number,
seq: bigint,
bytes: Uint8Array,
) => Promise<ChunkAck>,
): void {
this.chunkHandler = handler;
}
/** Receiver-side: register the resume-state lookup. */
setResumeProvider(
provider: (from: string, streamId: string) => Promise<TransferResumeState | null>,
): void {
this.resumeProvider = provider;
}
async probe(peerAddress: string): Promise<void> {
if (this.peer === null || peerAddress !== this.peer.myAddress) {
throw new TransferTransportError(`Memory peer ${peerAddress} not reachable`);
}
}
async sendChunk(
peerAddress: string,
streamId: string,
laneId: number,
seq: number | bigint,
bytes: Uint8Array,
_options?: ChunkSendOptions,
): Promise<ChunkAck> {
if (this.peer === null) throw new TransferTransportError('Not linked');
if (peerAddress !== this.peer.myAddress) {
throw new TransferTransportError(`Peer mismatch: ${peerAddress} vs ${this.peer.myAddress}`);
}
if (this.peer.chunkHandler === null) {
throw new TransferTransportError('Peer has no chunk handler');
}
return this.peer.chunkHandler(
this.myAddress,
streamId,
laneId,
typeof seq === 'bigint' ? seq : BigInt(seq),
bytes,
);
}
async fetchResumeState(
peerAddress: string,
streamId: string,
): Promise<TransferResumeState | null> {
if (this.peer === null) throw new TransferTransportError('Not linked');
if (peerAddress !== this.peer.myAddress) {
throw new TransferTransportError(`Peer mismatch: ${peerAddress} vs ${this.peer.myAddress}`);
}
if (this.peer.resumeProvider === null) return null;
return this.peer.resumeProvider(this.myAddress, streamId);
}
}

View File

@@ -0,0 +1,117 @@
/**
* Transport contracts for @shade/transfer.
*
* The library splits its wire concerns in two:
*
* - {@link IControlChannel} — carries the FOUR control messages
* (stream-init, stream-finish, stream-abort, stream-resume-*) as
* Double-Ratchet-encrypted plaintext over the application's existing
* Shade transport. The `@shade/sdk` integration (M-Stream-5) provides a
* `ShadeControlChannel` that wraps `Shade.send`/`Shade.receive`.
*
* - {@link ITransferTransport} — carries the encrypted CHUNK envelopes
* (wire type 0x11) directly between sender and receiver. This is where
* parallel-lane throughput happens; the default implementation is HTTP
* POST to a `transferRoute()`-mounted server, with optional WS upgrade.
*
* Splitting along this seam lets resume/probe/health concerns live in the
* chunk transport without leaking into the SDK's per-message ratchet API,
* and lets test code swap either side independently.
*/
import type {
StreamControlMessage,
StreamInitMessage,
} from '@shade/streams';
/** Outcome of a successful chunk POST. */
export interface ChunkAck {
/** Last sequence number the receiver has confirmed for this lane. */
lastSeq: number;
/** Receiver-side observed bytes-received. Useful for sanity checking. */
bytesReceived?: number;
}
export interface ChunkSendOptions {
/** Caller-side request abort signal. */
signal?: AbortSignal;
}
/**
* Chunk transport contract.
*
* Implementations:
* - `ShadeTransferHttpTransport` — POST per chunk, signed via Ed25519.
* - `ShadeTransferWsTransport` — WebSocket framing (M-Stream-7).
* - `MemoryTransferTransport` — in-process pipe for tests.
*/
export interface ITransferTransport {
/** Probe peer reachability. Throws on unreachable. */
probe(peerAddress: string): Promise<void>;
/**
* Ship a single encoded `0x11` envelope. Returns when the receiver has
* accepted it (and updated its persisted state, if any). Idempotent on
* (streamId, laneId, seq) — replays return the same `ChunkAck`.
*/
sendChunk(
peerAddress: string,
streamId: string,
laneId: number,
seq: number | bigint,
bytes: Uint8Array,
options?: ChunkSendOptions,
): Promise<ChunkAck>;
/**
* Ask the peer for its resume state for a given streamId. Returns
* `null` when the peer has no record (sender then restarts from seq 0
* across all lanes — chunks are deterministic).
*/
fetchResumeState(
peerAddress: string,
streamId: string,
): Promise<TransferResumeState | null>;
}
/**
* Receiver-reported resume state. `lastSeqAcked = -1` means no chunk for
* that lane has been received.
*/
export interface TransferResumeState {
streamId: string;
lanes: Array<{ laneId: number; lastSeqAcked: number }>;
}
/**
* Control-channel contract.
*
* Implementations layered on top of `Shade.send`/`Shade.receive` (M-Stream-5
* `ShadeControlChannel`) or in-memory channels (tests).
*/
export interface IControlChannel {
/** Send a plaintext control message to a peer. Returns when delivered. */
send(peerAddress: string, message: StreamControlMessage): Promise<void>;
/**
* Subscribe to incoming control messages for THIS endpoint. The handler
* runs once per incoming message; multiple subscriptions are allowed.
* Returns an unsubscribe function.
*/
onMessage(
handler: (
from: string,
message: StreamControlMessage,
) => void | Promise<void>,
): () => void;
}
/**
* Sender-side handle returned by `IControlChannel.send` for `stream-init`.
*
* Empty in v0.2.0 (the orchestrator tracks state directly), kept as a
* forward-compatible extension point.
*/
export interface InitDispatch {
init: StreamInitMessage;
}

View File

@@ -0,0 +1,331 @@
import { TransferAbortError, TransferTransportError } from '../errors.js';
import type {
ChunkAck,
ChunkSendOptions,
ITransferTransport,
TransferResumeState,
} from './transport.js';
import type { TransferAuthenticator } from './http-transport.js';
import { NoopAuthenticator } from './http-transport.js';
/**
* WebSocket-based chunk transport (opt-in).
*
* One connection per peer, multiplexed across all in-flight streams. Each
* outgoing chunk is sent as a binary frame; the server replies with a JSON
* ACK keyed by an internal request id. Failed connections trigger
* `TransferTransportError`; the wrapping `FallbackTransferTransport` swaps
* to HTTP.
*
* Wire format (client→server):
* 1 byte type (0x01 = chunk, 0x02 = resume-state-query)
* 16 bytes requestId (opaque)
* payload (chunk: streamId(16) + laneId(4 BE) + seq(8 BE) + envelopeBytes;
* resume-query: streamId(16))
*
* Wire format (server→client):
* 1 byte type (0x81 = chunk-ack, 0x82 = resume-state, 0xFE = error)
* 16 bytes requestId (echoes the request)
* payload (chunk-ack: u32 lastSeq + u32 bytesReceived;
* resume-state: JSON bytes;
* error: JSON bytes)
*/
export interface ShadeTransferWsTransportOptions {
resolveWsUrl: (peerAddress: string) => Promise<string>;
authenticator?: TransferAuthenticator;
/** WebSocket constructor override (browsers vs node). */
WebSocketCtor?: typeof WebSocket;
/** Connect timeout in ms. */
connectTimeoutMs?: number;
}
const TYPE_CHUNK = 0x01;
const TYPE_RESUME_QUERY = 0x02;
const TYPE_CHUNK_ACK = 0x81;
const TYPE_RESUME_STATE = 0x82;
const TYPE_ERROR = 0xfe;
export class ShadeTransferWsTransport implements ITransferTransport {
private readonly auth: TransferAuthenticator;
private readonly WebSocketCtor: typeof WebSocket;
private readonly connectTimeoutMs: number;
private readonly connections = new Map<string, Promise<WebSocket>>();
constructor(private readonly options: ShadeTransferWsTransportOptions) {
this.auth = options.authenticator ?? NoopAuthenticator;
const ctor =
options.WebSocketCtor ??
((globalThis as unknown as { WebSocket?: typeof WebSocket }).WebSocket);
if (ctor === undefined) {
throw new TransferTransportError('WebSocket constructor not available');
}
this.WebSocketCtor = ctor;
this.connectTimeoutMs = options.connectTimeoutMs ?? 5000;
}
async probe(peerAddress: string): Promise<void> {
// Probe by opening a connection; close immediately on success.
const ws = await this.connect(peerAddress);
void ws;
}
async sendChunk(
peerAddress: string,
streamId: string,
laneId: number,
seq: number | bigint,
bytes: Uint8Array,
options?: ChunkSendOptions,
): Promise<ChunkAck> {
const ws = await this.connect(peerAddress);
const requestId = randomBytes(16);
const seqBig = typeof seq === 'bigint' ? seq : BigInt(seq);
const sidBytes = base64UrlToBytes(streamId);
if (sidBytes.length !== 16) {
throw new TransferTransportError(`streamId must decode to 16 bytes`);
}
const header = new Uint8Array(1 + 16 + 16 + 4 + 8);
header[0] = TYPE_CHUNK;
header.set(requestId, 1);
header.set(sidBytes, 17);
new DataView(header.buffer).setUint32(33, laneId, false);
new DataView(header.buffer).setBigUint64(37, seqBig, false);
const frame = new Uint8Array(header.length + bytes.length);
frame.set(header, 0);
frame.set(bytes, header.length);
// Sign chunk for auth — included in WS-level metadata frame would be
// ideal, but for v0.2.0 we just call the authenticator and discard,
// matching the HTTP path's contract. Real auth lands in 0.3.
void (await this.auth.signChunk({
streamId,
laneId,
seq: seqBig,
bodyHash: await sha256(bytes),
}));
const response = await this.request(ws, requestId, frame, options?.signal);
const view = new DataView(response);
const type = view.getUint8(0);
if (type === TYPE_ERROR) {
const text = new TextDecoder().decode(new Uint8Array(response, 17));
throw new TransferTransportError(`WS sendChunk error: ${text}`);
}
if (type !== TYPE_CHUNK_ACK) {
throw new TransferTransportError(`unexpected WS response type 0x${type.toString(16)}`);
}
const lastSeq = view.getUint32(17, false);
const bytesReceived = view.getUint32(21, false);
return { lastSeq, bytesReceived };
}
async fetchResumeState(
peerAddress: string,
streamId: string,
): Promise<TransferResumeState | null> {
const ws = await this.connect(peerAddress);
const requestId = randomBytes(16);
const sidBytes = base64UrlToBytes(streamId);
const frame = new Uint8Array(1 + 16 + 16);
frame[0] = TYPE_RESUME_QUERY;
frame.set(requestId, 1);
frame.set(sidBytes, 17);
const response = await this.request(ws, requestId, frame);
const view = new DataView(response);
const type = view.getUint8(0);
if (type === TYPE_ERROR) {
const text = new TextDecoder().decode(new Uint8Array(response, 17));
if (text.includes('not found')) return null;
throw new TransferTransportError(`WS fetchResumeState error: ${text}`);
}
if (type !== TYPE_RESUME_STATE) {
throw new TransferTransportError(`unexpected WS response type 0x${type.toString(16)}`);
}
const json = new TextDecoder().decode(new Uint8Array(response, 17));
return JSON.parse(json) as TransferResumeState;
}
/** Force-close all connections (call from `engine.destroy()`). */
closeAll(): void {
for (const conn of this.connections.values()) {
void conn.then((ws) => ws.close()).catch(() => {});
}
this.connections.clear();
}
private connect(peerAddress: string): Promise<WebSocket> {
let conn = this.connections.get(peerAddress);
if (conn === undefined) {
conn = this.openConnection(peerAddress);
this.connections.set(peerAddress, conn);
conn.catch(() => {
this.connections.delete(peerAddress);
});
}
return conn;
}
private async openConnection(peerAddress: string): Promise<WebSocket> {
const url = await this.options.resolveWsUrl(peerAddress);
const ws = new this.WebSocketCtor(url);
ws.binaryType = 'arraybuffer';
return new Promise<WebSocket>((resolve, reject) => {
const timer = setTimeout(() => {
ws.close();
reject(new TransferTransportError(`WS connect timeout to ${url}`));
}, this.connectTimeoutMs);
ws.addEventListener('open', () => {
clearTimeout(timer);
resolve(ws);
});
ws.addEventListener('error', () => {
clearTimeout(timer);
reject(new TransferTransportError(`WS connect failed to ${url}`));
});
ws.addEventListener('close', () => {
this.connections.delete(peerAddress);
});
});
}
private async request(
ws: WebSocket,
requestId: Uint8Array,
frame: Uint8Array,
signal?: AbortSignal,
): Promise<ArrayBuffer> {
if (signal?.aborted) throw new TransferAbortError('aborted');
return new Promise<ArrayBuffer>((resolve, reject) => {
const onMsg = (ev: MessageEvent): void => {
if (!(ev.data instanceof ArrayBuffer)) return;
const buf = ev.data;
if (buf.byteLength < 17) return;
const idView = new Uint8Array(buf, 1, 16);
if (!byteEqual(idView, requestId)) return;
cleanup();
resolve(buf);
};
const onErr = (): void => {
cleanup();
reject(new TransferTransportError('WS errored mid-request'));
};
const onClose = (): void => {
cleanup();
reject(new TransferTransportError('WS closed mid-request'));
};
const onAbort = (): void => {
cleanup();
reject(new TransferAbortError('aborted'));
};
function cleanup() {
ws.removeEventListener('message', onMsg);
ws.removeEventListener('error', onErr);
ws.removeEventListener('close', onClose);
signal?.removeEventListener('abort', onAbort);
}
ws.addEventListener('message', onMsg);
ws.addEventListener('error', onErr);
ws.addEventListener('close', onClose);
signal?.addEventListener('abort', onAbort, { once: true });
ws.send(frame as unknown as ArrayBuffer);
});
}
}
/**
* Tries the primary transport first, falls back to secondary on
* `TransferTransportError`. Does NOT retry per-chunk between transports —
* the engine's retry layer handles that — but switches sticky after the
* first failure so subsequent calls go to the fallback.
*/
export class FallbackTransferTransport implements ITransferTransport {
private failed = false;
constructor(
private readonly primary: ITransferTransport,
private readonly fallback: ITransferTransport,
) {}
private active(): ITransferTransport {
return this.failed ? this.fallback : this.primary;
}
async probe(peerAddress: string): Promise<void> {
if (this.failed) return this.fallback.probe(peerAddress);
try {
await this.primary.probe(peerAddress);
} catch (err) {
if (err instanceof TransferTransportError) {
this.failed = true;
return this.fallback.probe(peerAddress);
}
throw err;
}
}
async sendChunk(
peerAddress: string,
streamId: string,
laneId: number,
seq: number | bigint,
bytes: Uint8Array,
options?: ChunkSendOptions,
): Promise<ChunkAck> {
if (this.failed) return this.fallback.sendChunk(peerAddress, streamId, laneId, seq, bytes, options);
try {
return await this.primary.sendChunk(peerAddress, streamId, laneId, seq, bytes, options);
} catch (err) {
if (err instanceof TransferTransportError) {
this.failed = true;
return this.fallback.sendChunk(peerAddress, streamId, laneId, seq, bytes, options);
}
throw err;
}
}
async fetchResumeState(
peerAddress: string,
streamId: string,
): Promise<TransferResumeState | null> {
return this.active().fetchResumeState(peerAddress, streamId);
}
/** True if the primary failed and we've fallen back. */
get fellBack(): boolean {
return this.failed;
}
}
// ─── Helpers ─────────────────────────────────────────────────
function randomBytes(n: number): Uint8Array {
const b = new Uint8Array(n);
globalThis.crypto.getRandomValues(b);
return b;
}
async function sha256(data: Uint8Array): Promise<Uint8Array> {
const buf = await globalThis.crypto.subtle.digest('SHA-256', data as unknown as ArrayBuffer);
return new Uint8Array(buf);
}
function byteEqual(a: Uint8Array, b: Uint8Array): boolean {
if (a.length !== b.length) return false;
for (let i = 0; i < a.length; i++) {
if (a[i] !== b[i]) return false;
}
return true;
}
function base64UrlToBytes(s: string): Uint8Array {
const padded = s.replace(/-/g, '+').replace(/_/g, '/');
const pad = padded.length % 4 === 0 ? '' : '='.repeat(4 - (padded.length % 4));
const bin = atob(padded + pad);
const out = new Uint8Array(bin.length);
for (let i = 0; i < bin.length; i++) out[i] = bin.charCodeAt(i);
return out;
}

View File

@@ -0,0 +1,139 @@
import type { StreamMetadata, LaneInitSpec } from '@shade/streams';
/**
* Browser File System Access API handle. Declared locally as an opaque
* type so this package builds without `lib: ["DOM"]`. Consumers in browsers
* pass a real `FileSystemFileHandle`; the runtime structural shape is
* checked at use-sites.
*/
export interface FileSystemFileHandleLike {
readonly kind: 'file';
readonly name: string;
createWritable(): Promise<unknown>;
}
/** Default per-chunk plaintext size (bytes). */
export const DEFAULT_CHUNK_SIZE = 1024 * 1024; // 1 MiB
/** Hard cap on per-chunk plaintext size. */
export const DEFAULT_MAX_CHUNK_SIZE = 16 * 1024 * 1024; // 16 MiB
/** Default lane count for parallel transfers. */
export const DEFAULT_LANE_COUNT = 4;
/**
* Anything we accept as the byte source for an upload. The orchestrator
* normalizes these (see `sender/input.ts`) into a chunk reader.
*/
export type TransferInput =
| Uint8Array
| Blob
| File
| ReadableStream<Uint8Array>;
/** Sink kinds for an incoming transfer. The receiver picks ONE. */
export type TransferOutput =
| { kind: 'pipe'; pipeTo: WritableStream<Uint8Array> }
| { kind: 'callback'; onChunk: (chunk: Uint8Array) => void | Promise<void> }
| { kind: 'buffer' }
| { kind: 'file'; path: string }
| { kind: 'fileHandle'; handle: FileSystemFileHandleLike };
export type TransferDirection = 'send' | 'receive';
export type TransferStatus =
| 'pending'
| 'active'
| 'paused'
| 'finished'
| 'aborted'
| 'failed';
export interface LaneProgress {
laneId: number;
bytesSent: number;
bytesTotal?: number;
seq: number;
state: 'idle' | 'sending' | 'paused' | 'done' | 'error';
}
export interface TransferProgress {
streamId: string;
bytesSent: number;
bytesTotal?: number;
/** EMA-smoothed throughput in bytes/sec. */
bytesPerSecond: number;
etaSeconds?: number;
/** [0, 1] when bytesTotal is known; undefined otherwise. */
percent?: number;
lanes: LaneProgress[];
}
export type TransferEvent =
| { type: 'start'; streamId: string }
| { type: 'progress'; progress: TransferProgress }
| { type: 'lane-done'; laneId: number }
| { type: 'pause'; reason: 'network' | 'manual' }
| { type: 'resume' }
| { type: 'complete'; streamId: string; sha256: string; durationMs: number }
| { type: 'abort'; reason: string }
| { type: 'error'; error: unknown };
export interface TransferResult {
streamId: string;
bytesSent: number;
/** Hex-encoded sha256 over the original plaintext. */
sha256: string;
durationMs: number;
}
export interface TransferOptions {
to: string;
input: TransferInput;
metadata?: Partial<StreamMetadata>;
/** Lane count override; capped at 64 internally. */
lanes?: number;
chunkSize?: number;
maxChunkSize?: number;
partition?: 'auto' | 'range' | 'round-robin';
signal?: AbortSignal;
onProgress?: (p: TransferProgress) => void;
onEvent?: (e: TransferEvent) => void;
}
export interface TransferHandle {
readonly streamId: string;
readonly events: AsyncIterable<TransferEvent>;
pause(): Promise<void>;
resume(): Promise<void>;
abort(reason?: string): Promise<void>;
done(): Promise<TransferResult>;
}
export interface IncomingTransferAcceptOptions {
output: TransferOutput;
onProgress?: (p: TransferProgress) => void;
onEvent?: (e: TransferEvent) => void;
}
export interface IncomingTransfer {
readonly streamId: string;
readonly from: string;
readonly metadata: StreamMetadata;
readonly lanes: LaneInitSpec[];
/** Accept and start receiving. Throws if already accepted/declined. */
accept(options: IncomingTransferAcceptOptions): Promise<TransferHandle>;
/** Reject; sends abort to sender. */
decline(reason?: string): Promise<void>;
}
export interface TransferSummary {
streamId: string;
direction: TransferDirection;
peerAddress: string;
status: TransferStatus;
bytesTotal?: number;
bytesProcessed: number;
createdAt: number;
updatedAt: number;
metadata: StreamMetadata | null;
}

View File

@@ -0,0 +1,204 @@
import { describe, test, expect, afterAll } from 'bun:test';
import { SubtleCryptoProvider } from '@shade/crypto-web';
import { sha256Once } from '@shade/streams';
import {
TransferEngine,
MemoryControlChannel,
ShadeTransferHttpTransport,
createTransferRoutes,
type TransferHandle,
type TransferResult,
} from '../src/index.js';
const crypto = new SubtleCryptoProvider();
function hex(bytes: Uint8Array): string {
return Array.from(bytes, (b) => b.toString(16).padStart(2, '0')).join('');
}
interface IntegrationServers {
baseUrl: string;
senderEngine: TransferEngine;
receiverEngine: TransferEngine;
serverHandle: { stop: () => void };
}
const cleanups: Array<() => void> = [];
afterAll(() => {
for (const c of cleanups) c();
});
async function setup(): Promise<IntegrationServers> {
const { a: ctrlA, b: ctrlB } = MemoryControlChannel.linked('alice', 'bob');
const receiverEngine = new TransferEngine({
crypto,
controlChannel: ctrlB,
transport: {
// Receiver-side transport is unused for outgoing operations; we only
// route incoming chunks via the Hono server.
probe: async () => undefined,
sendChunk: async () => {
throw new Error('receiver-side sendChunk should not be called');
},
fetchResumeState: async () => null,
},
myAddress: 'bob',
});
// Spin up a Hono+Bun server that routes chunks into receiverEngine.
const app = await createTransferRoutes(receiverEngine);
// Bun's `Bun.serve` accepts a Hono app's `fetch` handler.
const bunGlobal = (globalThis as unknown as { Bun?: { serve: (opts: unknown) => { url: URL; stop: () => void } } }).Bun;
if (bunGlobal === undefined) throw new Error('Bun runtime required for this test');
const server = bunGlobal.serve({
port: 0,
fetch: (req: Request) => app.fetch(req),
});
const baseUrl = server.url.toString().replace(/\/$/, '');
const senderEngine = new TransferEngine({
crypto,
controlChannel: ctrlA,
transport: new ShadeTransferHttpTransport({
resolveBaseUrl: async () => baseUrl,
authenticator: {
signChunk: async () => ({ 'X-Shade-Sender-Address': 'alice' }),
signControl: async () => ({ 'X-Shade-Sender-Address': 'alice' }),
},
}),
myAddress: 'alice',
});
cleanups.push(() => {
server.stop();
senderEngine.destroy();
receiverEngine.destroy();
});
return { baseUrl, senderEngine, receiverEngine, serverHandle: server };
}
async function uploadRoundtrip(
servers: IntegrationServers,
input: Uint8Array,
opts?: { lanes?: number; chunkSize?: number; partition?: 'auto' | 'range' | 'round-robin' },
): Promise<{ senderResult: TransferResult; received: Uint8Array }> {
let resolveRecv!: (h: TransferHandle) => void;
const recvHandlePromise = new Promise<TransferHandle>((r) => {
resolveRecv = r;
});
const unsubscribe = servers.receiverEngine.onIncomingTransfer(async (incoming) => {
const h = await incoming.accept({ output: { kind: 'buffer' } });
resolveRecv(h);
});
const handle = await servers.senderEngine.upload({
to: 'bob',
input,
...(opts?.lanes !== undefined ? { lanes: opts.lanes } : {}),
...(opts?.chunkSize !== undefined ? { chunkSize: opts.chunkSize } : {}),
...(opts?.partition !== undefined ? { partition: opts.partition } : {}),
metadata: { name: 'http-test.bin' },
});
const recvHandle = await recvHandlePromise;
const [senderResult, recvResult] = await Promise.all([handle.done(), recvHandle.done()]);
unsubscribe();
const received =
(recvResult as TransferResult & { bytes?: Uint8Array }).bytes ?? new Uint8Array();
return { senderResult, received };
}
describe('HTTP transport — Bun.serve loopback', () => {
test('100 KiB / 1 lane', async () => {
const servers = await setup();
const input = crypto.randomBytes(100 * 1024);
const { senderResult, received } = await uploadRoundtrip(servers, input, {
lanes: 1,
chunkSize: 32 * 1024,
});
expect(received).toEqual(input);
expect(senderResult.sha256).toBe(hex(sha256Once(input)));
expect(senderResult.bytesSent).toBe(input.length);
});
test('1 MiB / 4 lanes range', async () => {
const servers = await setup();
const input = crypto.randomBytes(1024 * 1024);
const { senderResult, received } = await uploadRoundtrip(servers, input, {
lanes: 4,
chunkSize: 64 * 1024,
partition: 'range',
});
expect(received).toEqual(input);
expect(senderResult.sha256).toBe(hex(sha256Once(input)));
});
test('1 MiB / 4 lanes round-robin', async () => {
const servers = await setup();
const input = crypto.randomBytes(1024 * 1024);
const { senderResult, received } = await uploadRoundtrip(servers, input, {
lanes: 4,
chunkSize: 64 * 1024,
partition: 'round-robin',
});
expect(received).toEqual(input);
expect(senderResult.sha256).toBe(hex(sha256Once(input)));
});
test(
'8 MiB / 4 lanes range (ship-gate proxy for 100 MB)',
async () => {
const servers = await setup();
const input = crypto.randomBytes(8 * 1024 * 1024);
const { senderResult, received } = await uploadRoundtrip(servers, input, {
lanes: 4,
chunkSize: 256 * 1024,
partition: 'range',
});
expect(received).toEqual(input);
expect(senderResult.sha256).toBe(hex(sha256Once(input)));
},
30_000,
);
test('upload with ReadableStream input — round-robin auto-pick', async () => {
const servers = await setup();
const input = crypto.randomBytes(512 * 1024);
const stream = new ReadableStream<Uint8Array>({
start(controller) {
for (let off = 0; off < input.length; off += 32 * 1024) {
controller.enqueue(input.subarray(off, Math.min(off + 32 * 1024, input.length)));
}
controller.close();
},
});
const { senderResult, received } = await uploadRoundtrip(servers, stream as unknown as Uint8Array, {
lanes: 4,
chunkSize: 32 * 1024,
});
expect(received).toEqual(input);
expect(senderResult.sha256).toBe(hex(sha256Once(input)));
});
test('peer offline → TransferOfflineError', async () => {
const { a: ctrlA } = MemoryControlChannel.linked('alice', 'bob');
const sender = new TransferEngine({
crypto,
controlChannel: ctrlA,
transport: new ShadeTransferHttpTransport({
resolveBaseUrl: async () => 'http://127.0.0.1:1', // intentionally unreachable
}),
myAddress: 'alice',
});
cleanups.push(() => sender.destroy());
await expect(
sender.upload({
to: 'bob',
input: crypto.randomBytes(64),
}),
).rejects.toThrow(/offline|fetch failed|connection|ECONN|connect/i);
});
});

View File

@@ -0,0 +1,193 @@
import { describe, test, expect } from 'bun:test';
import { SubtleCryptoProvider } from '@shade/crypto-web';
import { sha256Once } from '@shade/streams';
import {
TransferEngine,
MemoryControlChannel,
MemoryTransferTransport,
} from '../src/index.js';
import type { IncomingTransfer, TransferResult } from '../src/index.js';
const crypto = new SubtleCryptoProvider();
function hex(bytes: Uint8Array): string {
return Array.from(bytes, (b) => b.toString(16).padStart(2, '0')).join('');
}
interface PairedEngines {
sender: TransferEngine;
receiver: TransferEngine;
}
function makePair(): PairedEngines {
const { a: ctrlA, b: ctrlB } = MemoryControlChannel.linked('alice', 'bob');
const { a: txA, b: txB } = MemoryTransferTransport.linked('alice', 'bob');
const senderEngine = new TransferEngine({
crypto,
controlChannel: ctrlA,
transport: txA,
myAddress: 'alice',
});
const receiverEngine = new TransferEngine({
crypto,
controlChannel: ctrlB,
transport: txB,
myAddress: 'bob',
});
// Wire receiver-side transport to route chunks into receiver-engine.
txB.setChunkHandler(async (from, streamId, laneId, seq, bytes) =>
receiverEngine.receiveChunk(from, streamId, laneId, seq, bytes),
);
txB.setResumeProvider(async (from, streamId) =>
receiverEngine.getResumeState(from, streamId),
);
return { sender: senderEngine, receiver: receiverEngine };
}
async function uploadAndAwait(
pair: PairedEngines,
input: Uint8Array,
opts?: { lanes?: number; chunkSize?: number; partition?: 'auto' | 'range' | 'round-robin' },
): Promise<{ result: TransferResult; received: Uint8Array }> {
// The handler accepts and PUBLISHES the receive-handle out-of-band so
// it can return promptly (control channel awaits handler completion).
let resolveReceiveHandle!: (h: import('../src/index.js').TransferHandle) => void;
const receiveHandlePromise = new Promise<import('../src/index.js').TransferHandle>(
(r) => { resolveReceiveHandle = r; },
);
const unsubscribe = pair.receiver.onIncomingTransfer(async (incoming: IncomingTransfer) => {
const handle = await incoming.accept({ output: { kind: 'buffer' } });
resolveReceiveHandle(handle);
});
const handle = await pair.sender.upload({
to: 'bob',
input,
...(opts?.lanes !== undefined ? { lanes: opts.lanes } : {}),
...(opts?.chunkSize !== undefined ? { chunkSize: opts.chunkSize } : {}),
...(opts?.partition !== undefined ? { partition: opts.partition } : {}),
metadata: { name: 'test.bin', contentType: 'application/octet-stream' },
});
const recvHandle = await receiveHandlePromise;
const [senderResult, receiverResult] = await Promise.all([handle.done(), recvHandle.done()]);
unsubscribe();
const bytes =
(receiverResult as TransferResult & { bytes?: Uint8Array }).bytes ?? new Uint8Array(0);
return { result: senderResult, received: bytes };
}
describe('TransferEngine (memory loopback)', () => {
test('1 KiB upload — 1 lane (auto-degrade for small file)', async () => {
const pair = makePair();
const input = crypto.randomBytes(1024);
const { result, received } = await uploadAndAwait(pair, input, { lanes: 4, chunkSize: 256 });
expect(received).toEqual(input);
expect(result.sha256).toBe(hex(sha256Once(input)));
});
test('256 KiB upload — 4 lanes range partition', async () => {
const pair = makePair();
const input = crypto.randomBytes(256 * 1024);
const { result, received } = await uploadAndAwait(pair, input, {
lanes: 4,
chunkSize: 16 * 1024,
partition: 'range',
});
expect(received).toEqual(input);
expect(result.sha256).toBe(hex(sha256Once(input)));
});
test('1 MiB upload — 4 lanes round-robin partition', async () => {
const pair = makePair();
const input = crypto.randomBytes(1024 * 1024);
const { result, received } = await uploadAndAwait(pair, input, {
lanes: 4,
chunkSize: 64 * 1024,
partition: 'round-robin',
});
expect(received).toEqual(input);
expect(result.sha256).toBe(hex(sha256Once(input)));
});
test('integrity: same sha256 across lane counts', async () => {
const input = crypto.randomBytes(512 * 1024);
const expected = hex(sha256Once(input));
for (const lanes of [1, 2, 4, 8]) {
const pair = makePair();
const { result, received } = await uploadAndAwait(pair, input, { lanes, chunkSize: 8 * 1024 });
expect(received).toEqual(input);
expect(result.sha256).toBe(expected);
}
});
test('upload with ReadableStream input → round-robin partition', async () => {
const pair = makePair();
const input = crypto.randomBytes(300 * 1024);
const stream = new ReadableStream<Uint8Array>({
start(controller) {
for (let off = 0; off < input.length; off += 64 * 1024) {
controller.enqueue(input.subarray(off, Math.min(off + 64 * 1024, input.length)));
}
controller.close();
},
});
let resolveRecv!: (h: import('../src/index.js').TransferHandle) => void;
const recvHandlePromise = new Promise<import('../src/index.js').TransferHandle>((r) => {
resolveRecv = r;
});
const unsubscribe = pair.receiver.onIncomingTransfer(async (incoming) => {
const h = await incoming.accept({ output: { kind: 'buffer' } });
resolveRecv(h);
});
const handle = await pair.sender.upload({
to: 'bob',
input: stream,
lanes: 4,
chunkSize: 32 * 1024,
});
const recvHandle = await recvHandlePromise;
const [, recvResult] = await Promise.all([handle.done(), recvHandle.done()]);
unsubscribe();
const bytes = (recvResult as TransferResult & { bytes?: Uint8Array }).bytes ?? new Uint8Array();
expect(bytes).toEqual(input);
});
test('progress events fire and end with complete', async () => {
const pair = makePair();
const input = crypto.randomBytes(64 * 1024);
const senderProgressSamples: number[] = [];
const receiverEvents: string[] = [];
let resolveRecv!: (h: import('../src/index.js').TransferHandle) => void;
const recvHandlePromise = new Promise<import('../src/index.js').TransferHandle>((r) => {
resolveRecv = r;
});
const unsub = pair.receiver.onIncomingTransfer(async (incoming) => {
const h = await incoming.accept({
output: { kind: 'buffer' },
onEvent: (e) => {
receiverEvents.push(e.type);
},
});
resolveRecv(h);
});
const handle = await pair.sender.upload({
to: 'bob',
input,
lanes: 2,
chunkSize: 8 * 1024,
onProgress: (p) => senderProgressSamples.push(p.bytesSent),
});
const recvHandle = await recvHandlePromise;
await Promise.all([handle.done(), recvHandle.done()]);
unsub();
expect(senderProgressSamples.length).toBeGreaterThan(0);
expect(senderProgressSamples[senderProgressSamples.length - 1]).toBe(64 * 1024);
expect(receiverEvents).toContain('start');
expect(receiverEvents).toContain('complete');
});
});

View File

@@ -0,0 +1,152 @@
import { describe, test, expect } from 'bun:test';
import { SubtleCryptoProvider } from '@shade/crypto-web';
import { sha256Once } from '@shade/streams';
import {
TransferEngine,
MemoryControlChannel,
MemoryResumeStore,
ShadeTransferHttpTransport,
createTransferRoutes,
type TransferHandle,
type TransferResult,
} from '../src/index.js';
const crypto = new SubtleCryptoProvider();
function hex(b: Uint8Array): string {
return Array.from(b, (x) => x.toString(16).padStart(2, '0')).join('');
}
describe('Resume protocol — kill-restart-verify', () => {
test('sender crash mid-transfer → resumeUpload completes the same stream', async () => {
const senderResumeStore = new MemoryResumeStore();
// Stable deviceKey shared across engine instances — simulates a stable
// identity-derived key. In real use this is `deriveDeviceKey(identity)`.
const deviceKey = crypto.randomBytes(32);
// Receiver is a single, long-lived engine. Its in-memory IncomingState
// already tracks accepted chunks; we don't need to persist receiver state
// for the in-process scenario.
const { a: ctrlA, b: ctrlB } = MemoryControlChannel.linked('alice', 'bob');
const receiverEngine = new TransferEngine({
crypto,
controlChannel: ctrlB,
transport: {
probe: async () => undefined,
sendChunk: async () => {
throw new Error('receiver-side sendChunk should not be called');
},
fetchResumeState: async () => null,
},
myAddress: 'bob',
});
const receiverApp = await createTransferRoutes(receiverEngine);
const port = 22000 + Math.floor(Math.random() * 500);
const server = Bun.serve({ port, fetch: receiverApp.fetch });
const baseUrl = `http://localhost:${port}`;
// Receiver accepts incoming.
let resolveRecv!: (h: TransferHandle) => void;
const recvHandlePromise = new Promise<TransferHandle>((r) => {
resolveRecv = r;
});
receiverEngine.onIncomingTransfer(async (incoming) => {
const h = await incoming.accept({ output: { kind: 'buffer' } });
resolveRecv(h);
});
// Sender #1 — this one will "crash" partway through.
const senderEngine1 = new TransferEngine({
crypto,
controlChannel: ctrlA,
transport: new ShadeTransferHttpTransport({
resolveBaseUrl: async () => baseUrl,
authenticator: {
signChunk: async () => ({ 'X-Shade-Sender-Address': 'alice' }),
signControl: async () => ({ 'X-Shade-Sender-Address': 'alice' }),
},
}),
myAddress: 'alice',
resumeStore: senderResumeStore,
deviceKey,
});
const input = crypto.randomBytes(256 * 1024); // 256 KiB
// Start an upload that will be intentionally interrupted. We pause the
// sender by aborting the upload after ~25% bytes. The receiver still
// holds its IncomingState — chunks already accepted stay tracked.
const abort = new AbortController();
let bytesAtAbort = 0;
const handle1 = await senderEngine1.upload({
to: 'bob',
input,
lanes: 4,
chunkSize: 8 * 1024,
partition: 'range',
signal: abort.signal,
onProgress: (p) => {
if (p.bytesSent > input.length / 4 && bytesAtAbort === 0) {
bytesAtAbort = p.bytesSent;
abort.abort();
}
},
});
const streamId = handle1.streamId;
// Wait for the upload to fail (abort).
let firstErrCaught = false;
try {
await handle1.done();
} catch {
firstErrCaught = true;
}
expect(firstErrCaught).toBe(true);
expect(bytesAtAbort).toBeGreaterThan(0);
// Tear down the first sender engine — simulates a crashed/restarted client.
senderEngine1.destroy();
// Sender #2 — fresh engine, same resume store.
const senderEngine2 = new TransferEngine({
crypto,
controlChannel: ctrlA, // re-use the link; in real life this would
transport: new ShadeTransferHttpTransport({
resolveBaseUrl: async () => baseUrl,
authenticator: {
signChunk: async () => ({ 'X-Shade-Sender-Address': 'alice' }),
signControl: async () => ({ 'X-Shade-Sender-Address': 'alice' }),
},
}),
myAddress: 'alice',
resumeStore: senderResumeStore,
deviceKey,
});
// Verify state is still in the resume store.
const persisted = await senderResumeStore.get(streamId);
expect(persisted).not.toBeNull();
expect(persisted!.status).toBe('active');
// Resume the upload with the same input bytes.
const handle2 = await senderEngine2.resumeUpload(streamId, input);
const senderResult = await handle2.done();
expect(senderResult.streamId).toBe(streamId);
// Receiver finishes too.
const recvHandle = await recvHandlePromise;
const recvResult = await recvHandle.done();
const received =
(recvResult as TransferResult & { bytes?: Uint8Array }).bytes ?? new Uint8Array();
expect(received).toEqual(input);
expect(senderResult.sha256).toBe(hex(sha256Once(input)));
// Persisted state should now be finished.
const after = await senderResumeStore.get(streamId);
expect(after?.status).toBe('finished');
senderEngine2.destroy();
receiverEngine.destroy();
server.stop();
}, 30_000);
});

View File

@@ -0,0 +1,98 @@
import { describe, test, expect, afterAll } from 'bun:test';
import { SubtleCryptoProvider } from '@shade/crypto-web';
import { sha256Once } from '@shade/streams';
import {
TransferEngine,
MemoryControlChannel,
ShadeTransferHttpTransport,
ShadeTransferWsTransport,
FallbackTransferTransport,
createTransferRoutes,
type TransferHandle,
type TransferResult,
} from '../src/index.js';
const crypto = new SubtleCryptoProvider();
function hex(b: Uint8Array): string {
return Array.from(b, (x) => x.toString(16).padStart(2, '0')).join('');
}
const cleanups: Array<() => void> = [];
afterAll(() => {
for (const c of cleanups) c();
});
describe('WS opt-in transport with HTTP fallback', () => {
test('WS connect failure → falls back to HTTP transparently', async () => {
const { a: ctrlA, b: ctrlB } = MemoryControlChannel.linked('alice', 'bob');
const receiverEngine = new TransferEngine({
crypto,
controlChannel: ctrlB,
transport: {
probe: async () => undefined,
sendChunk: async () => {
throw new Error('not used');
},
fetchResumeState: async () => null,
},
myAddress: 'bob',
});
const httpApp = await createTransferRoutes(receiverEngine);
const httpPort = 23000 + Math.floor(Math.random() * 500);
const httpServer = Bun.serve({ port: httpPort, fetch: httpApp.fetch });
const httpBaseUrl = `http://localhost:${httpPort}`;
cleanups.push(() => httpServer.stop());
const ws = new ShadeTransferWsTransport({
// Resolve to a closed port — guaranteed connect failure.
resolveWsUrl: async () => `ws://127.0.0.1:1`,
connectTimeoutMs: 500,
});
const http = new ShadeTransferHttpTransport({
resolveBaseUrl: async () => httpBaseUrl,
authenticator: {
signChunk: async () => ({ 'X-Shade-Sender-Address': 'alice' }),
signControl: async () => ({ 'X-Shade-Sender-Address': 'alice' }),
},
});
const fallback = new FallbackTransferTransport(ws, http);
const senderEngine = new TransferEngine({
crypto,
controlChannel: ctrlA,
transport: fallback,
myAddress: 'alice',
});
cleanups.push(() => senderEngine.destroy());
cleanups.push(() => receiverEngine.destroy());
let resolveRecv!: (h: TransferHandle) => void;
const recvHandlePromise = new Promise<TransferHandle>((r) => {
resolveRecv = r;
});
receiverEngine.onIncomingTransfer(async (incoming) => {
const h = await incoming.accept({ output: { kind: 'buffer' } });
resolveRecv(h);
});
const input = crypto.randomBytes(64 * 1024);
const handle = await senderEngine.upload({
to: 'bob',
input,
lanes: 2,
chunkSize: 8 * 1024,
});
const recvHandle = await recvHandlePromise;
const [senderResult, recvResult] = await Promise.all([
handle.done(),
recvHandle.done(),
]);
expect(fallback.fellBack).toBe(true);
const received =
(recvResult as TransferResult & { bytes?: Uint8Array }).bytes ?? new Uint8Array();
expect(received).toEqual(input);
expect(senderResult.sha256).toBe(hex(sha256Once(input)));
});
});

View File

@@ -0,0 +1,8 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"outDir": "dist",
"rootDir": "src"
},
"include": ["src"]
}