Shade/packages/shade-files/src/server/handler.ts

feat(files): @shade/files 0.3.0 - E2EE filesystem RPC primitive

M-Files-1..6 land the full files-RPC layer + everything 0.3.0 needs to ship. Apps keep their own UI; this layer ships the typed RPC, the streams bridge for content I/O, and production hooks (rate limit, retention, fingerprint gate, metrics).

@shade/files (NEW)
- Standard ops: list/stat/mkdir/delete/move/read/write/getThumbnail with Zod-validated wire schemas + clean user-handler types.
- Custom ops: typed via TypeScript declaration merging on CustomOpsMap + per-op Zod schemas; client.custom('app.foo', {...}) is fully typed.
- Content I/O: inline (≤ 256 KiB plaintext) base64-in-RPC; streams (> 256 KiB) ride @shade/transfer via userMetadata.shadeFilesWriteId / shadeFilesReadStreamId correlation. Server-side TransformStream bridges accept inbound transfers immediately (engine rejects chunks that arrive before accept) and park the readable for the matching RPC.
- Directory ops: walk(path, opts) async-iterable depth-first walker; uploadDirectory()/downloadDirectory() with bounded concurrency pool (default 4, cap 16), aggregated progress, abort.
- Production hooks (callback-based, vendor-neutral): rate-limit (op + byte), idempotency cache (LRU + TTL + in-flight de-dupe), path policy (traversal + percent-decode hardening), fingerprint gate (required/optional/reject), pluggable Ed25519 sig verification with ±5 min replay window, onMetric sink (standard names).
- React hooks (subpath @shade/files/react): ShadeFilesProvider, useShadeFiles, useFileList, useFileTransfer/Upload/Download.
- Shade.files.serve(handler) + Shade.files.client(peer) high-level entrypoint in @shade/sdk; lazy + memoized; one handler per Shade.

Wire format bump
- @shade/proto wire VERSION 0x01 → 0x02. Length prefixes changed from u16 to u32. The previous u16 silently truncated payloads above 64 KiB, a hard correctness ceiling that blocked inline file ops up to 256 KiB. Wire-incompatible with 0.2.x peers; new sessions only.
- Cross-platform Kotlin port (android/shade-android) updated to match; test-vectors/wire-format.json regenerated.

Concurrency safety
- ShadeSessionManager.encrypt/.decrypt now run under per-peer mutex. Concurrent decryptions of the same peer raced ratchet state (manifested as sporadic "Failed to decrypt" errors citing a wrong key or tampered data under load; surfaced once concurrent uploadDirectory pumped many writes in flight). Encrypt was already serialized via Shade.send's encryptChains; decrypt is now serialized at the manager layer too.

@shade/streams extension
- StreamMetadata.userMetadata?: Record<string, string> for application-level key/value pairs that round-trip verbatim through stream-init plaintext. Used by @shade/files for write/read correlation; available to any consumer.

@shade/sdk extension
- Shade.files getter (lazy + memoized).
- BackgroundHooks.onPruneFiles + periodic timer (default 5 min) + BackgroundTasks.setHook(name, fn) for runtime hook registration.

Bundles in-flight 0.2.0 work
- packages/shade-streams/, packages/shade-transfer/, related shade-sdk streams-bridge + shade-widgets transfer hooks were uncommitted prior to this session. Including them keeps the workspace consistent at 0.3.0 since @shade/files depends on them.

Tests
- 74 new tests in @shade/files (572 → 646 workspace pass; 0 fail; 3× stable). Coverage spans unit (inline-threshold + concurrency), integration (read-write inline + streams up to 1 MiB, walk + upload/download directory, custom-op, metrics, SDK namespace end-to-end), and security (tampered-envelope sig verification, replay window, fingerprint gate, rate-limit + quota).

Release artifacts
- All packages bumped to 0.3.0 via scripts/bump-version.ts.
- scripts/publish-all.ts PACKAGES updated with shade-files in topological order (after shade-transfer, before shade-sdk).
- bun run publish:dry clean (14 packed, 0 failed).
- examples/08-files-browser/: three-process CLI demo (prekey + Bob server + Alice CLI) covering list/stat/mkdir/delete/upload/download.
- docs/files.md: full API + design doc.
- CHANGELOG.md 0.3.0 entry.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 14:00:01 +02:00

import type { Shade } from '@shade/sdk';
import type { ZodTypeAny } from 'zod';
import {
MUTATION_OPS,
opOfKind,
responseKindOf,
type StandardOp,
} from '../protocol/kinds.js';
import {
DeleteArgsSchema,
DeleteResultSchema,
GetThumbnailArgsSchema,
GetThumbnailResultSchema,
ListArgsSchema,
ListResultSchema,
MkdirArgsSchema,
MkdirResultSchema,
MoveArgsSchema,
MoveResultSchema,
ReadArgsSchema,
ReadResultSchema,
StatArgsSchema,
StatResultSchema,
WriteArgsSchema,
WriteResultSchema,
type DeleteArgs,
type DeleteResult,
type GetThumbnailArgs,
type ListArgs,
type ListResult,
type MkdirArgs,
type MkdirResult,
type MoveArgs,
type MoveResult,
type ReadArgs,
type StatArgs,
type StatResult,
type WriteArgs,
type WriteResult,
} from '../schemas/ops.js';
import {
CancelledError,
CustomOpRejectedError,
FileError,
FingerprintRequiredError,
InvalidArgsError,
InvalidSignatureError,
NotImplementedError,
PathValidationError,
payloadFromError,
} from '../schemas/errors.js';
import type { RpcCancel, RpcError, RpcRequest, RpcResponse } from '../schemas/envelope.js';
import { buildOpContext, type OpContext } from './handler-context.js';
import { IdempotencyCache, type IdempotencyCacheOptions } from './idempotency-cache.js';
import { RateLimiter, type RateLimitConfig } from './rate-limiter.js';
import { validatePath, type PathPolicy } from './path-policy.js';
import {
adaptReadResult,
adaptThumbnailResult,
adaptWriteArgs,
} from './io-adapters.js';
import type {
UserReadResult,
UserThumbnailResult,
UserWriteArgs,
} from './io-types.js';
import type { ServerStreamsBridge } from './streams-bridge.js';
import type { CustomOpRegistrations } from './custom-ops.js';
import {
METRIC_BYTES_IN,
METRIC_BYTES_OUT,
METRIC_FINGERPRINT_REJECT_TOTAL,
METRIC_IDEMPOTENCY_CONFLICT_TOTAL,
METRIC_IDEMPOTENCY_HIT_TOTAL,
METRIC_OP_DURATION_MS,
METRIC_OP_TOTAL,
METRIC_RATE_LIMIT_REJECT_TOTAL,
METRIC_SIGNATURE_REJECT_TOTAL,
NOOP_METRIC_SINK,
type MetricSink,
} from './metrics.js';
import {
CustomArgsSchema,
CustomResultSchema,
type CustomArgs,
} from '../schemas/ops.js';
import { canonicalRpcBytes, hashArgs } from '../protocol/canonical.js';
/** Replay window for the `signedAt` field on inbound RPC envelopes. */
export const MAX_SIGNATURE_AGE_MS = 5 * 60 * 1000;
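/*
Illustrative sketch, not part of this module: the replay window is symmetric,
so a `signedAt` that is too old and one that is too far in the future are both
rejected, while a skew of exactly the limit still passes (the handler rejects
only when the skew is strictly greater). `isWithinReplayWindow` and
`MAX_AGE_MS` are hypothetical names used only for this example.

```typescript
// Hypothetical stand-in for MAX_SIGNATURE_AGE_MS (5 minutes in milliseconds).
const MAX_AGE_MS = 5 * 60 * 1000;

// Returns true when `signedAt` lies within ±maxAgeMs of `now`.
function isWithinReplayWindow(
  signedAt: number,
  now: number,
  maxAgeMs: number = MAX_AGE_MS,
): boolean {
  // Math.abs makes the window symmetric: stale and future-dated
  // timestamps are treated the same way.
  return Math.abs(now - signedAt) <= maxAgeMs;
}

const exampleNow = 1_700_000_000_000;
const freshOk = isWithinReplayWindow(exampleNow - 60_000, exampleNow); // 1 min old
const staleOk = isWithinReplayWindow(exampleNow - 6 * 60_000, exampleNow); // 6 min old
const futureOk = isWithinReplayWindow(exampleNow + 6 * 60_000, exampleNow); // 6 min ahead
```

Because the check compares against the server's clock, peers with badly skewed
clocks would be rejected even when their requests are otherwise valid.
*/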
// ─── Public types ────────────────────────────────────────────
export interface FileHandlerOps {
list?: (ctx: OpContext<ListArgs>) => Promise<ListResult>;
stat?: (ctx: OpContext<StatArgs>) => Promise<StatResult>;
mkdir?: (ctx: OpContext<MkdirArgs>) => Promise<MkdirResult>;
delete?: (ctx: OpContext<DeleteArgs>) => Promise<DeleteResult>;
move?: (ctx: OpContext<MoveArgs>) => Promise<MoveResult>;
/**
* User-supplied read handler. Returns either an `inline` payload (≤ 256
* KiB) or a `streams` payload with a precomputed sha256. The dispatcher
* adapts to the wire shape.
*/
read?: (ctx: OpContext<ReadArgs>) => Promise<UserReadResult>;
/**
* User-supplied write handler. Receives `UserWriteArgs` with a clean
* `Uint8Array` (inline) or `ReadableStream` + sha256-promise (streams)
* shape; the dispatcher hides the base64 / writeId wire details.
*/
write?: (ctx: OpContext<UserWriteArgs>) => Promise<WriteResult>;
/**
* User-supplied thumbnail handler. Bytes are validated for format magic
* before they're shipped to prevent format misclassification attacks.
*/
getThumbnail?: (ctx: OpContext<GetThumbnailArgs>) => Promise<UserThumbnailResult>;
}
export interface FileHandlerConfig extends FileHandlerOps {
pathPolicy?: PathPolicy;
rateLimits?: RateLimitConfig;
idempotency?: IdempotencyCacheOptions;
/**
* Required for read/write `streams` ops. Coordinates the inbound/outbound
* `@shade/transfer` transfers via `userMetadata.shadeFiles*Id` keys.
*/
streamsBridge?: ServerStreamsBridge;
/** Custom ops registry — see `CustomOpsMap` declaration-merging. */
custom?: CustomOpRegistrations;
/**
* Verify the Ed25519 signature on inbound RPC envelopes. Pluggable so
* apps can plug their own peer-identity store. Returning `false` rejects
* with `InvalidSignatureError`. Default: skip (Double Ratchet AEAD on
* the underlying envelope already authenticates the sender).
*
* The `signedAt` replay-window check (±5 min) is enforced regardless.
*/
verifySender?: (
sender: string,
canonicalBytes: Uint8Array,
sig: string,
) => boolean | Promise<boolean>;
/**
* Per-op fingerprint-verification gate. Return `'required'` to demand
* the peer's fingerprint has been verified out-of-band (via
* `isFingerprintVerified`); `'reject'` to deny outright;
* `'optional'` (default) to allow.
*/
requireFingerprintVerifiedFor?: (
ctx: OpContext<unknown>,
) => 'required' | 'optional' | 'reject' | Promise<'required' | 'optional' | 'reject'>;
/** Look up whether the consumer has verified the peer out-of-band. */
isFingerprintVerified?: (sender: string) => boolean | Promise<boolean>;
/** Vendor-neutral metrics sink. */
onMetric?: MetricSink;
/** Called BEFORE the handler runs. Throw to deny. */
beforeOp?: (ctx: OpContext<unknown>) => void | Promise<void>;
/** Called AFTER the handler returns. Result is the validated response. */
afterOp?: (ctx: OpContext<unknown>, result: unknown) => void | Promise<void>;
/** Called when an op fails. Receives the error and the context. */
onError?: (err: unknown, ctx: OpContext<unknown>) => void;
/** Default per-op timeout in ms. Default 60_000. */
defaultTimeoutMs?: number;
/** Hard deadline for streams-bridge awaits / outbound transfers. Default 60_000. */
ioTimeoutMs?: number;
}
export interface FileHandler {
/** Handle an incoming request envelope. Returns the envelope to send back. */
handleRequest(from: string, request: RpcRequest): Promise<RpcResponse | RpcError>;
/** Handle an incoming cancel envelope. */
handleCancel(from: string, cancel: RpcCancel): void;
/** Free up internal state (timers, abort listeners). */
destroy(): void;
}
interface OpSchemaPair {
args: ZodTypeAny;
result: ZodTypeAny;
}
const OP_SCHEMAS: Record<StandardOp, OpSchemaPair> = {
list: { args: ListArgsSchema, result: ListResultSchema },
stat: { args: StatArgsSchema, result: StatResultSchema },
mkdir: { args: MkdirArgsSchema, result: MkdirResultSchema },
delete: { args: DeleteArgsSchema, result: DeleteResultSchema },
move: { args: MoveArgsSchema, result: MoveResultSchema },
read: { args: ReadArgsSchema, result: ReadResultSchema },
write: { args: WriteArgsSchema, result: WriteResultSchema },
getThumbnail: { args: GetThumbnailArgsSchema, result: GetThumbnailResultSchema },
};
// ─── createFileHandler ───────────────────────────────────────
/**
* Build a `FileHandler` for the server side. The returned object is
* registered with `ShadeFileRpcChannel.setHooks({ onRequest })` (typically
* via `Shade.files.serve(...)` in the SDK).
*/
export function createFileHandler(
shade: Shade,
config: FileHandlerConfig,
): FileHandler {
const idempotency = new IdempotencyCache(config.idempotency);
const rateLimiter = new RateLimiter(config.rateLimits);
const inflightCancellers = new Map<string, AbortController>();
const defaultTimeoutMs = config.defaultTimeoutMs ?? 60_000;
const ioTimeoutMs = config.ioTimeoutMs ?? 60_000;
const metrics: MetricSink = config.onMetric ?? NOOP_METRIC_SINK;
const customRegistrations = config.custom ?? {};
const isCustomKind = (kind: string): boolean => kind === 'shade.fs.custom/v1';
async function handleRequest(
from: string,
request: RpcRequest,
): Promise<RpcResponse | RpcError> {
// 0. Replay-window check (independent of sig — defends against
// intercept-and-resend even when sig verification is disabled).
const skewMs = Math.abs(Date.now() - request.signedAt);
if (skewMs > MAX_SIGNATURE_AGE_MS) {
metrics(METRIC_SIGNATURE_REJECT_TOTAL, 1, { reason: 'skew' });
return makeErrorEnvelope(
request,
new InvalidSignatureError(
`signedAt is ${skewMs}ms outside the ±${MAX_SIGNATURE_AGE_MS}ms replay window`,
),
);
}
// 0b. Optional Ed25519 sig verification — pluggable. The canonical
// bytes bind sender + kind + id + signedAt + sha256(args).
if (config.verifySender !== undefined) {
const argsHashBytes = hashArgs(request.args);
const canonical = canonicalRpcBytes({
address: from,
signedAt: request.signedAt,
kind: request.kind,
id: request.id,
argsHash: argsHashBytes,
});
let ok = false;
try {
ok = await config.verifySender(from, canonical, request.sig);
} catch (err) {
metrics(METRIC_SIGNATURE_REJECT_TOTAL, 1, { reason: 'throw' });
return makeErrorEnvelope(request, err);
}
if (!ok) {
metrics(METRIC_SIGNATURE_REJECT_TOTAL, 1, { reason: 'mismatch' });
return makeErrorEnvelope(request, new InvalidSignatureError());
}
}
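/*
Illustrative sketch of a `verifySender` callback, assuming a Node.js runtime
and an in-memory identity store. `knownPeers` and the base64 encoding of `sig`
are assumptions made for this example; the real wire encoding of the signature
and the layout of the canonical bytes are not shown in this file, so the
message below is only a placeholder.

```typescript
import { generateKeyPairSync, sign, verify } from 'node:crypto';
import type { KeyObject } from 'node:crypto';

// Hypothetical identity store: peer address -> Ed25519 public key.
const knownPeers = new Map<string, KeyObject>();

// Matches the callback shape (sender, canonicalBytes, sig) => boolean.
function verifySenderSketch(
  sender: string,
  canonicalBytes: Uint8Array,
  sig: string,
): boolean {
  const publicKey = knownPeers.get(sender);
  if (publicKey === undefined) return false; // unknown peer: reject
  // Assumption: `sig` is base64-encoded. Ed25519 takes `null` as the digest
  // algorithm because the scheme hashes internally.
  return verify(null, canonicalBytes, publicKey, Buffer.from(sig, 'base64'));
}

// Demo wiring with a freshly generated key pair.
const demoKeys = generateKeyPairSync('ed25519');
knownPeers.set('alice', demoKeys.publicKey);
const demoMessage = new TextEncoder().encode('canonical-bytes-placeholder');
const demoSig = sign(null, demoMessage, demoKeys.privateKey).toString('base64');
```

Returning `false` for an unknown sender maps to the 'mismatch' rejection path
above, while throwing would be reported with reason 'throw'.
*/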
// 1. Resolve op + handler. Custom ops route through `config.custom`.
let op: StandardOp | 'custom';
let argSchema: ZodTypeAny;
let resultSchema: ZodTypeAny;
let customHandler: CustomOpRegistrations[string] | undefined;
if (isCustomKind(request.kind)) {
op = 'custom';
argSchema = CustomArgsSchema;
resultSchema = CustomResultSchema;
} else {
const std = opOfKind(request.kind);
if (std === null) {
return makeErrorEnvelope(request, new NotImplementedError(request.kind));
}
op = std;
argSchema = OP_SCHEMAS[std].args;
resultSchema = OP_SCHEMAS[std].result;
const handler = config[std];
if (handler === undefined) {
return makeErrorEnvelope(request, new NotImplementedError(std));
}
}
// 2. Args validation (wire shape)
const argParse = argSchema.safeParse(request.args);
if (!argParse.success) {
const issue = argParse.error.issues[0];
return makeErrorEnvelope(
request,
new InvalidArgsError(
issue?.message ?? 'invalid arguments',
issue?.path.join('.') || undefined,
),
);
}
const parsedArgs = argParse.data as unknown;
// 2b. Custom op resolution
let resolvedOpKind: string = op;
if (op === 'custom') {
const customArgs = parsedArgs as CustomArgs;
customHandler = customRegistrations[customArgs.name];
if (customHandler === undefined) {
return makeErrorEnvelope(
request,
new NotImplementedError(`custom:${customArgs.name}`),
);
}
resolvedOpKind = `custom:${customArgs.name}`;
// Validate inner payload against the custom op's args schema
const payloadParse = customHandler.args.safeParse(customArgs.payload);
if (!payloadParse.success) {
const issue = payloadParse.error.issues[0];
return makeErrorEnvelope(
request,
new InvalidArgsError(
issue?.message ?? 'invalid custom-op payload',
issue?.path.join('.') || undefined,
),
);
}
// Replace payload with validated value (Zod may apply defaults).
(parsedArgs as CustomArgs).payload = payloadParse.data;
}
// 3. Path validation (skip ops without a path)
let primaryPath = '';
if (op === 'move') {
primaryPath = (parsedArgs as MoveArgs).src;
} else if (op !== 'custom' && 'path' in (parsedArgs as object)) {
primaryPath = (parsedArgs as { path: string }).path;
}
let normalizedPath = primaryPath;
if (primaryPath !== '') {
const validated = validatePath(primaryPath, config.pathPolicy);
if (!validated.ok) {
return makeErrorEnvelope(request, new PathValidationError(validated.reason));
}
normalizedPath = validated.normalized;
if (op === 'move') {
const dstValid = validatePath((parsedArgs as MoveArgs).dst, config.pathPolicy);
if (!dstValid.ok) {
return makeErrorEnvelope(
request,
new PathValidationError(dstValid.reason, 'dst'),
);
}
(parsedArgs as MoveArgs).src = normalizedPath;
(parsedArgs as MoveArgs).dst = dstValid.normalized;
} else {
(parsedArgs as { path: string }).path = normalizedPath;
}
}
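/*
Illustrative sketch of a path check that returns the same result shape the
dispatcher consumes above (`ok` plus `normalized`, or `ok: false` plus
`reason`). The actual rules of `validatePath` live in path-policy.ts and are
not shown here; `checkPath` and every rule in it are assumptions chosen to
demonstrate traversal and percent-decode hardening in general.

```typescript
type PathCheck =
  | { ok: true; normalized: string }
  | { ok: false; reason: string };

// Hypothetical validator: decode first, then inspect the decoded segments.
function checkPath(raw: string): PathCheck {
  let decoded: string;
  try {
    // Decoding before inspection stops "%2e%2e" from hiding a "..".
    decoded = decodeURIComponent(raw);
  } catch {
    return { ok: false, reason: 'malformed percent-encoding' };
  }
  if (decoded.includes('\u0000')) {
    return { ok: false, reason: 'NUL byte in path' };
  }
  if (!decoded.startsWith('/')) {
    return { ok: false, reason: 'path must be absolute' };
  }
  // Drop empty and "." segments so "/a//b/./c" normalizes to "/a/b/c".
  const segments = decoded.split('/').filter((s) => s !== '' && s !== '.');
  if (segments.includes('..')) {
    return { ok: false, reason: 'path traversal' };
  }
  return { ok: true, normalized: '/' + segments.join('/') };
}
```

Writing the normalized value back into the parsed args, as the dispatcher does,
means user handlers never see the raw, untrusted spelling of a path.
*/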
// 4. Rate-limit acquire
const opCostKey = op === 'custom' ? 'custom' : op;
const customCost = customHandler?.cost;
const estimatedBytes = estimateBytes(op === 'custom' ? 'custom' : op, parsedArgs);
try {
if (customCost !== undefined) {
// The custom op declares its own cost. Acquire the first op-token
// together with the byte estimate from the shared `custom` bucket.
rateLimiter.acquire(from, opCostKey, estimatedBytes);
// Then take the remaining (cost - 1) op-tokens from the same bucket,
// each with a zero byte charge so bytes are only counted once.
for (let i = 1; i < customCost; i++) {
rateLimiter.acquire(from, opCostKey, 0);
}
} else {
rateLimiter.acquire(from, opCostKey, estimatedBytes);
}
} catch (err) {
metrics(METRIC_RATE_LIMIT_REJECT_TOTAL, 1, { op: resolvedOpKind });
return makeErrorEnvelope(request, err);
}
// 5. Idempotency (mutations only)
const isMutation = MUTATION_OPS.has(opCostKey);
let commitIdem: ((response: unknown) => void) | null = null;
let abandonIdem: (() => void) | null = null;
if (isMutation && request.idempotencyKey !== undefined) {
try {
const begin = idempotency.begin(from, request.idempotencyKey, parsedArgs);
if (begin.status === 'replay') {
rateLimiter.release(from, opCostKey, estimatedBytes);
metrics(METRIC_IDEMPOTENCY_HIT_TOTAL, 1, { op: resolvedOpKind });
return makeResponseEnvelope(request, begin.response);
}
if (begin.status === 'wait') {
const cached = await begin.promise;
rateLimiter.release(from, opCostKey, estimatedBytes);
metrics(METRIC_IDEMPOTENCY_HIT_TOTAL, 1, { op: resolvedOpKind });
return makeResponseEnvelope(request, cached);
}
commitIdem = begin.commit;
abandonIdem = begin.abandon;
} catch (err) {
rateLimiter.release(from, opCostKey, estimatedBytes);
if (
err !== null &&
typeof err === 'object' &&
(err as { code?: string }).code === 'SHADE_FS_IDEMPOTENCY_CONFLICT'
) {
metrics(METRIC_IDEMPOTENCY_CONFLICT_TOTAL, 1, { op: resolvedOpKind });
}
return makeErrorEnvelope(request, err);
}
}
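/*
Illustrative sketch of the begin/commit/abandon protocol used in step 5. The
real `IdempotencyCache` (LRU + TTL) is defined in idempotency-cache.ts and is
not shown here; `TinyIdempotencyCache`, the 'new' status name, and the
JSON.stringify comparison of args are assumptions made for this example. Only
the 'replay' and 'wait' statuses and the conflict error code appear in the
code above.

```typescript
type Begin =
  | { status: 'replay'; response: unknown }
  | { status: 'wait'; promise: Promise<unknown> }
  | { status: 'new'; commit: (response: unknown) => void; abandon: () => void };

interface Entry {
  argsKey: string;
  done: boolean;
  response?: unknown;
  promise: Promise<unknown>;
}

class TinyIdempotencyCache {
  private readonly entries = new Map<string, Entry>();

  begin(sender: string, key: string, args: unknown): Begin {
    const id = `${sender}\u0000${key}`; // keys are scoped per sender
    const argsKey = JSON.stringify(args); // naive: sensitive to key order
    const existing = this.entries.get(id);
    if (existing !== undefined) {
      if (existing.argsKey !== argsKey) {
        // Same key, different args: refuse instead of replaying.
        throw Object.assign(new Error('idempotency key reused with different args'), {
          code: 'SHADE_FS_IDEMPOTENCY_CONFLICT',
        });
      }
      return existing.done
        ? { status: 'replay', response: existing.response }
        : { status: 'wait', promise: existing.promise };
    }
    let resolve!: (value: unknown) => void;
    let reject!: (reason: unknown) => void;
    const promise = new Promise<unknown>((res, rej) => {
      resolve = res;
      reject = rej;
    });
    promise.catch(() => {}); // no unhandled rejection when nobody is waiting
    const entry: Entry = { argsKey, done: false, promise };
    this.entries.set(id, entry);
    return {
      status: 'new',
      commit: (response) => {
        entry.done = true;
        entry.response = response;
        resolve(response);
      },
      abandon: () => {
        this.entries.delete(id); // a later retry starts from scratch
        reject(new Error('original attempt failed'));
      },
    };
  }
}
```

The in-flight case is what lets two concurrent retries of one mutation share a
single execution: the second caller awaits the first caller's promise.
*/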
// 6. Build context + abort controller
const controller = new AbortController();
inflightCancellers.set(request.id, controller);
const ctx = buildOpContext({
op: op === 'custom' ? (resolvedOpKind as `custom:${string}`) : op,
path: normalizedPath,
parsedArgs,
sender: from,
signal: controller.signal,
idempotencyKey: request.idempotencyKey,
attemptNumber: request.attempt ?? 1,
shade,
});
// 7. Fingerprint gate
if (config.requireFingerprintVerifiedFor !== undefined) {
let gate: 'required' | 'optional' | 'reject';
try {
gate = await config.requireFingerprintVerifiedFor(ctx as OpContext<unknown>);
} catch (err) {
cleanup({ release: true });
return makeErrorEnvelope(request, err);
}
if (gate === 'reject') {
cleanup({ release: true });
metrics(METRIC_FINGERPRINT_REJECT_TOTAL, 1, { op: resolvedOpKind, gate: 'reject' });
return makeErrorEnvelope(
request,
new FingerprintRequiredError('operation rejected by fingerprint policy'),
);
}
if (gate === 'required') {
let verified = false;
try {
verified = config.isFingerprintVerified !== undefined
? Boolean(await config.isFingerprintVerified(from))
: false;
} catch (err) {
cleanup({ release: true });
return makeErrorEnvelope(request, err);
}
if (!verified) {
cleanup({ release: true });
metrics(METRIC_FINGERPRINT_REJECT_TOTAL, 1, { op: resolvedOpKind, gate: 'required' });
return makeErrorEnvelope(request, new FingerprintRequiredError());
}
}
}
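/*
Illustrative sketch of a `requireFingerprintVerifiedFor` policy. The `GateCtx`
shape is a simplified assumption (the real `OpContext` is defined in
handler-context.ts), and both the '/system/' prefix and the set of mutating
ops are hypothetical choices for this example rather than values taken from
`MUTATION_OPS`.

```typescript
type Gate = 'required' | 'optional' | 'reject';

// Simplified, assumed subset of the op context.
interface GateCtx {
  op: string;
  path: string;
  sender: string;
}

// Hypothetical list of ops that change state on the server.
const GATED_MUTATIONS = new Set(['write', 'delete', 'move', 'mkdir']);

function fingerprintPolicy(ctx: GateCtx): Gate {
  // Deny everything under a protected prefix, verified or not.
  if (ctx.path.startsWith('/system/')) return 'reject';
  // Mutations need an out-of-band verified peer.
  if (GATED_MUTATIONS.has(ctx.op)) return 'required';
  // Read-only ops are allowed for any authenticated peer.
  return 'optional';
}
```

Note that with 'required', a missing `isFingerprintVerified` callback is
treated as "not verified" by the gate above, so the op is refused.
*/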
// 8. beforeOp
try {
if (config.beforeOp !== undefined) {
await config.beforeOp(ctx as OpContext<unknown>);
}
} catch (err) {
cleanup({ release: true });
return makeErrorEnvelope(request, err);
}
// 9. Run handler with timeout race — adapting I/O ops as needed.
const timeoutMs = Math.min(
request.deadlineMs ?? defaultTimeoutMs,
defaultTimeoutMs,
);
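/*
Illustrative sketch of the clamp computed just above: a client-supplied
`deadlineMs` can shorten the per-op timeout but can never extend it past the
server default. `effectiveTimeout` is a hypothetical helper name used only for
this example.

```typescript
function effectiveTimeout(
  deadlineMs: number | undefined,
  defaultTimeoutMs: number,
): number {
  // A missing deadline falls back to the default; Math.min then caps
  // any client value at the server default.
  return Math.min(deadlineMs ?? defaultTimeoutMs, defaultTimeoutMs);
}

const noDeadline = effectiveTimeout(undefined, 60_000); // server default applies
const shorter = effectiveTimeout(5_000, 60_000); // client may shorten
const longer = effectiveTimeout(120_000, 60_000); // client may not extend
```
*/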
let wireResult: unknown;
const startedAt = Date.now();
try {
wireResult = await runWithTimeout(
() => invokeOpHandler({
op,
stdHandler: op === 'custom' ? undefined : (config[op] as unknown),
customHandler,
ctx: ctx as OpContext<unknown>,
parsedArgs,
sender: from,
signal: controller.signal,
streamsBridge: config.streamsBridge,
ioTimeoutMs,
}),
timeoutMs,
controller,
);
} catch (err) {
const durationMs = Date.now() - startedAt;
metrics(METRIC_OP_DURATION_MS, durationMs, { op: resolvedOpKind, result: 'error' });
metrics(METRIC_OP_TOTAL, 1, { op: resolvedOpKind, result: 'error' });
cleanup({ release: true });
if (config.onError !== undefined) {
try {
config.onError(err, ctx as OpContext<unknown>);
} catch (hookErr) {
console.error('[FileHandler] onError hook threw:', hookErr);
}
}
return makeErrorEnvelope(request, err);
}
// 10. Custom-op response validation against the registered schema.
if (op === 'custom' && customHandler !== undefined) {
const innerParse = customHandler.response.safeParse(wireResult);
if (!innerParse.success) {
cleanup({ release: true });
return makeErrorEnvelope(
request,
new CustomOpRejectedError(
`custom-op response shape rejected by registered schema: ${innerParse.error.issues[0]?.message ?? 'unknown'}`,
),
);
}
wireResult = { result: innerParse.data };
}
// 11. Defensive response validation against the wire schema.
const resultParse = resultSchema.safeParse(wireResult);
if (!resultParse.success) {
cleanup({ release: true });
return makeErrorEnvelope(
request,
new InvalidArgsError(
`handler for ${resolvedOpKind} returned invalid response shape`,
),
);
}
// 12. afterOp
try {
if (config.afterOp !== undefined) {
await config.afterOp(ctx as OpContext<unknown>, resultParse.data);
}
} catch (err) {
cleanup({ release: true });
return makeErrorEnvelope(request, err);
}
// 13. Commit idempotency + emit metrics
commitIdem?.(resultParse.data);
cleanup({ release: false });
const durationMs = Date.now() - startedAt;
metrics(METRIC_OP_DURATION_MS, durationMs, { op: resolvedOpKind, result: 'ok' });
metrics(METRIC_OP_TOTAL, 1, { op: resolvedOpKind, result: 'ok' });
if (estimatedBytes > 0) {
// Inbound bytes (write) vs outbound (read) — both reuse the same
// pre-call `estimatedBytes`, since post-execution reconciliation
// would require deeper plumbing.
const direction = op === 'write' ? METRIC_BYTES_IN : op === 'read' ? METRIC_BYTES_OUT : null;
if (direction !== null) {
metrics(direction, estimatedBytes, { op: resolvedOpKind });
}
}
return makeResponseEnvelope(request, resultParse.data);
function cleanup(opts: { release: boolean }): void {
inflightCancellers.delete(request.id);
if (opts.release) {
abandonIdem?.();
rateLimiter.release(from, opCostKey, estimatedBytes);
}
}
}
function handleCancel(_from: string, cancel: RpcCancel): void {
const controller = inflightCancellers.get(cancel.id);
if (controller !== undefined) {
controller.abort(new CancelledError(cancel.reason ?? 'cancelled by sender'));
inflightCancellers.delete(cancel.id);
}
}
function destroy(): void {
for (const c of inflightCancellers.values()) {
c.abort(new CancelledError('handler destroyed'));
}
inflightCancellers.clear();
}
return Object.assign({ handleRequest, handleCancel, destroy }, {
[INTERNAL_SYMBOL]: { idempotency, rateLimiter },
});
}
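/**
* Symbol key under which `createFileHandler` attaches its idempotency cache
* and rate limiter to the returned handler object.
*/
export const INTERNAL_SYMBOL = Symbol.for('@shade/files/internal');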
// ─── Op invoker (handles I/O adapters) ───────────────────────
interface InvokeArgs {
op: StandardOp | 'custom';
stdHandler: unknown;
customHandler: CustomOpRegistrations[string] | undefined;
ctx: OpContext<unknown>;
parsedArgs: unknown;
sender: string;
signal: AbortSignal;
streamsBridge: ServerStreamsBridge | undefined;
ioTimeoutMs: number;
}
async function invokeOpHandler(args: InvokeArgs): Promise<unknown> {
const { op, stdHandler, customHandler, ctx, parsedArgs, sender, signal, streamsBridge, ioTimeoutMs } = args;
const adapterDeps = { streamsBridge, sender, signal, ioTimeoutMs };
switch (op) {
case 'write': {
const wireArgs = parsedArgs as WriteArgs;
const { userArgs, awaitTransferDone } = await adaptWriteArgs(wireArgs, adapterDeps);
const userCtx = { ...ctx, args: userArgs } as OpContext<UserWriteArgs>;
const userResult = await (stdHandler as (c: OpContext<UserWriteArgs>) => Promise<WriteResult>)(userCtx);
await awaitTransferDone();
return userResult;
}
case 'read': {
const readArgs = parsedArgs as ReadArgs;
const userResult = await (stdHandler as (c: OpContext<ReadArgs>) => Promise<UserReadResult>)(ctx as OpContext<ReadArgs>);
return await adaptReadResult(userResult, readArgs, adapterDeps);
}
case 'getThumbnail': {
const userResult = await (stdHandler as (c: OpContext<GetThumbnailArgs>) => Promise<UserThumbnailResult>)(ctx as OpContext<GetThumbnailArgs>);
return adaptThumbnailResult(userResult);
}
case 'custom': {
if (customHandler === undefined) {
throw new NotImplementedError('custom op without registration');
}
const customArgs = parsedArgs as CustomArgs;
const innerCtx = { ...ctx, args: customArgs } as OpContext<{ name: string; payload: unknown }>;
// Pass the validated inner payload as the first arg, the OpContext as the second.
return await customHandler.handler(
customArgs.payload,
innerCtx as OpContext<{ name: string; payload: unknown }>,
);
}
default:
// Pass-through for list/stat/mkdir/delete/move.
return await (stdHandler as (c: OpContext<unknown>) => Promise<unknown>)(ctx);
}
}
// ─── Helpers ─────────────────────────────────────────────────
function makeResponseEnvelope(req: RpcRequest, result: unknown): RpcResponse {
return {
kind: responseKindOf(req.kind),
id: req.id,
result,
};
}
function makeErrorEnvelope(req: RpcRequest, err: unknown): RpcError {
return {
kind: 'shade.fs.error/v1',
id: req.id,
error: payloadFromError(err),
};
}
function estimateBytes(op: StandardOp | 'custom', args: unknown): number {
if (op === 'write') {
const w = args as { kind: 'inline' | 'streams'; bytesB64?: string; size?: number };
if (w.kind === 'inline' && typeof w.bytesB64 === 'string') {
return Math.floor((w.bytesB64.length * 3) / 4);
}
return w.size ?? 0;
}
if (op === 'read') {
const r = args as { range?: { start: number; end: number } };
// Clamp so an inverted range can never yield a negative byte estimate.
if (r.range !== undefined) return Math.max(0, r.range.end - r.range.start);
return 0;
}
return 0;
}
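/*
Illustrative sketch: `estimateBytes` uses floor(length * 3 / 4) for inline
writes, which over-counts by one byte per '=' padding character (at most two).
That is fine for a rate-limit estimate. If an exact figure were wanted, the
padding could be subtracted as below. `decodedLength` is a hypothetical helper
and assumes standard padded base64 whose length is a multiple of four.

```typescript
function decodedLength(b64: string): number {
  // Each '=' at the end stands for one byte that is not really there.
  const padding = b64.endsWith('==') ? 2 : b64.endsWith('=') ? 1 : 0;
  return Math.floor((b64.length * 3) / 4) - padding;
}

const helloEstimate = Math.floor(('aGVsbG8='.length * 3) / 4); // estimate for "hello"
const helloExact = decodedLength('aGVsbG8='); // "hello" is 5 bytes
```
*/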
async function runWithTimeout<T>(
fn: () => Promise<T>,
timeoutMs: number,
controller: AbortController,
): Promise<T> {
let timer: ReturnType<typeof setTimeout> | null = null;
const timeout = new Promise<never>((_, reject) => {
timer = setTimeout(() => {
controller.abort(new Error('timeout'));
reject(new (FileError as unknown as { new (p: { code: string; message: string }): FileError })({
code: 'OPERATION_TIMEOUT',
message: `operation timed out after ${timeoutMs}ms`,
}));
}, timeoutMs);
});
try {
return await Promise.race([fn(), timeout]);
} finally {
if (timer !== null) clearTimeout(timer);
}
}
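/*
Illustrative standalone sketch of the timeout race used by `runWithTimeout`,
written without the package's `FileError` so it can run on its own.
`raceWithTimeout` is a hypothetical name, and unlike the helper above it
creates its own AbortController and hands the signal to the work function.

```typescript
async function raceWithTimeout<T>(
  fn: (signal: AbortSignal) => Promise<T>,
  timeoutMs: number,
): Promise<T> {
  const controller = new AbortController();
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => {
      controller.abort(); // lets cooperative work stop early
      reject(new Error(`operation timed out after ${timeoutMs}ms`));
    }, timeoutMs);
  });
  try {
    // Whichever settles first wins; the loser is simply ignored.
    return await Promise.race([fn(controller.signal), timeout]);
  } finally {
    // Always clear the timer so a fast result does not leave it pending.
    clearTimeout(timer);
  }
}
```

Promise.race does not cancel the losing promise, so the abort signal is the
only way the work function learns that it should stop.
*/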