88 lines
2.9 KiB
TypeScript
88 lines
2.9 KiB
TypeScript
|
|
import { ValidationError } from '@shade/core';
|
||
|
|
import type { LaneInitSpec, LanePartition } from './types.js';
|
||
|
|
|
||
|
|
/**
 * Build a range-partition plan: contiguous, non-overlapping byte ranges, one
 * per lane, that together tile `[0, totalBytes)`.
 *
 * Every lane receives `floor(totalBytes / laneCount)` bytes, and the first
 * `totalBytes % laneCount` lanes receive one extra byte, so lane sizes differ
 * by at most one. When `laneCount > totalBytes`, the trailing lanes get empty
 * ranges (`startByte === endByte`).
 *
 * @param totalBytes - Total length in bytes; a non-negative safe integer.
 * @param laneCount - Number of lanes to plan for; an integer >= 1.
 * @returns One `LaneInitSpec` per lane, in lane order (`laneId` is the index).
 *   Each partition is `{ kind: 'range', startByte, endByte, startChunk: 0 }`.
 * @throws ValidationError when `totalBytes` or `laneCount` is out of range.
 */
export function planRangePartition(
  totalBytes: number,
  laneCount: number,
): LaneInitSpec[] {
  if (!Number.isInteger(totalBytes) || totalBytes < 0) {
    throw new ValidationError(`totalBytes must be a non-negative integer`, 'totalBytes');
  }
  // Above 2^53 - 1 doubles can no longer represent every integer, so the
  // size arithmetic below could produce ranges that do not tile exactly.
  if (!Number.isSafeInteger(totalBytes)) {
    throw new ValidationError(
      `totalBytes must not exceed Number.MAX_SAFE_INTEGER`,
      'totalBytes',
    );
  }
  if (!Number.isInteger(laneCount) || laneCount < 1) {
    throw new ValidationError(`laneCount must be >= 1`, 'laneCount');
  }

  // `%` is exact for safe integers; subtracting the remainder first makes the
  // division exact as well (no reliance on floor() of a rounded quotient).
  const remainder = totalBytes % laneCount;
  const baseSize = (totalBytes - remainder) / laneCount;

  const lanes: LaneInitSpec[] = [];
  // Each lane starts where the previous one ended; the last lane therefore
  // ends exactly at totalBytes.
  let cursor = 0;
  for (let i = 0; i < laneCount; i++) {
    // The first `remainder` lanes absorb the bytes left over by the division.
    const size = i < remainder ? baseSize + 1 : baseSize;
    const startByte = cursor;
    const endByte = cursor + size;
    lanes.push({
      laneId: i,
      partition: { kind: 'range', startByte, endByte, startChunk: 0 },
    });
    cursor = endByte;
  }
  return lanes;
}
|
||
|
|
|
||
|
|
/**
 * Build a round-robin partition plan in which chunk `i` belongs to lane
 * `i mod laneCount`.
 *
 * @param laneCount - Number of lanes; must be an integer >= 1.
 * @returns One `LaneInitSpec` per lane, in lane order. Lane `k` carries the
 *   partition `{ kind: 'round-robin', lane: k, count: laneCount }`.
 * @throws ValidationError when `laneCount` is not an integer or is below 1.
 */
export function planRoundRobinPartition(laneCount: number): LaneInitSpec[] {
  const isValidCount = Number.isInteger(laneCount) && laneCount >= 1;
  if (!isValidCount) {
    throw new ValidationError(`laneCount must be >= 1`, 'laneCount');
  }

  const plan: LaneInitSpec[] = [];
  for (let lane = 0; lane < laneCount; lane += 1) {
    const spec: LaneInitSpec = {
      laneId: lane,
      partition: { kind: 'round-robin', lane, count: laneCount },
    };
    plan.push(spec);
  }
  return plan;
}
|
||
|
|
|
||
|
|
/**
 * Split the half-open byte range `[startByte, endByte)` into consecutive
 * slices of at most `chunkSize` bytes.
 *
 * All slices except possibly the last are exactly `chunkSize` long; the last
 * one is clamped to `endByte`. An empty range (`startByte === endByte`) still
 * yields a single zero-length slice so that callers always have a final chunk
 * on which to carry an "is last" marker.
 *
 * @param startByte - Inclusive start offset of the range.
 * @param endByte - Exclusive end offset of the range; must be >= `startByte`.
 * @param chunkSize - Maximum slice length; must be greater than zero.
 * @returns A non-empty array of `{ start, end }` objects (each `[start, end)`),
 *   in ascending order.
 * @throws ValidationError when `chunkSize` is not a positive number, or when
 *   the bounds are inverted or NaN.
 */
export function chunkRange(
  startByte: number,
  endByte: number,
  chunkSize: number,
): Array<{ start: number; end: number }> {
  // Negated comparison so that NaN is rejected too: `NaN <= 0` is false and
  // would otherwise slip through and produce a chunk whose end is NaN.
  if (!(chunkSize > 0)) {
    throw new ValidationError(`chunkSize must be positive`, 'chunkSize');
  }
  // An inverted (or NaN) range used to fall through the loop and return [],
  // silently breaking the "at least one chunk" guarantee. Fail loudly instead.
  if (!(endByte >= startByte)) {
    throw new ValidationError(`endByte must be >= startByte`, 'endByte');
  }

  const out: Array<{ start: number; end: number }> = [];
  if (endByte === startByte) {
    // Empty range: emit one empty chunk so isLast can be carried.
    out.push({ start: startByte, end: startByte });
    return out;
  }
  for (let off = startByte; off < endByte; off += chunkSize) {
    // Clamp the final slice so it never runs past the end of the range.
    out.push({ start: off, end: Math.min(off + chunkSize, endByte) });
  }
  return out;
}
|
||
|
|
|
||
|
|
/**
 * Field-by-field equality check for two `LanePartition` values.
 * Used by the receiver to detect partition-mismatch on resume.
 *
 * Partitions of different kinds are never equal. Two `range` partitions are
 * equal when `startByte`, `endByte` and `startChunk` all match; two
 * `round-robin` partitions are equal when `lane` and `count` both match.
 * Any other kind compares as not equal.
 *
 * @param a - First partition.
 * @param b - Second partition.
 * @returns `true` when both partitions describe the same slice of the stream.
 */
export function partitionsEqual(a: LanePartition, b: LanePartition): boolean {
  switch (a.kind) {
    case 'range':
      return (
        b.kind === 'range' &&
        b.startByte === a.startByte &&
        b.endByte === a.endByte &&
        b.startChunk === a.startChunk
      );
    case 'round-robin':
      return b.kind === 'round-robin' && b.lane === a.lane && b.count === a.count;
    default:
      return false;
  }
}
|