release(v4.8.5): kill flushOnce 15s success-backoff + per-recipient parallel drain
Some checks failed
Test / test (push) Has been cancelled

Prism filed a per-recipient-flush-concurrency FR pointing at
serial-per-flush. Investigation surfaced the actual culprit:
`scheduleFlush` was using a 15 s backoff on **both** the success and
failure paths, so envelopes enqueued *during* an in-flight flush
sat ~15 s behind the next drain — visible as "10 s of silence then
25-frame burst" on the receiving side under sustained sender output.

Two fixes:

1. `scheduleFlush` now uses 0 ms delay when `flushOnce` delivered
   ≥1 envelope and more is queued (network healthy → drain
   remainder immediately). 15 s reserved for the actual failure
   case where every attempt this round failed. `flushOnce` returns
   `{ delivered, remaining } | null` so concurrent-flush early
   returns don't double-schedule.

2. `flushOnce` groups the outgoing queue by `recipientAddress` and
   drains buckets via `Promise.all`. Per-peer order preserved
   (sequential within a bucket); a slow POST to recipient A no
   longer head-of-line-blocks frames bound for B.

`Inbox.tick` public shape unchanged. `OutgoingQueueStore`
implementations see the same per-entry list/remove/bumpAttempts/
size contract; only cross-recipient interleaving changes.

Tests cover (1) 25-envelope burst behind a 100 ms slow PUT drains
within 1 s, and (2) carol's PUT lands within 150 ms even when bob's
PUT stalls 200 ms.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-08 22:56:27 +02:00
parent a98ea8a1bd
commit 3c0db14904
28 changed files with 334 additions and 59 deletions

View File

@@ -247,7 +247,10 @@ export class Inbox {
* after a push-trigger arrives). Does not throw on transient errors.
*/
async tick(): Promise<{ flushed: number; received: number }> {
const flushed = await this.flushOnce();
const flushResult = await this.flushOnce();
// `null` means another flush was concurrent; report 0 newly-flushed
// for this caller (the other flush counted them).
const flushed = flushResult?.delivered ?? 0;
const received = await this.pollOnce();
return { flushed, received };
}
@@ -301,11 +304,27 @@ export class Inbox {
this.flushTimer = setTimeout(() => {
this.flushTimer = null;
this.flushOnce()
.then(() => {
// If anything is still queued, retry with backoff.
this.queueStore.size().then((n) => {
if (n > 0 && this.started) this.scheduleFlush(15_000);
});
.then((result) => {
// `result === null` means another flush was already in flight
// and this call early-returned via the `flushing` guard. The
// already-running flush will reschedule itself when it
// finishes; do not double-schedule from here.
if (result === null) return;
if (result.remaining === 0) return;
// V4.8.5 — distinguish healthy-drain-but-more-queued from
// all-attempts-failed. Pre-fix, both cases used a 15 s
// backoff. Under sustained traffic (Prism's typing-into-a-
// chatty-shell pattern), bursts of envelopes enqueued
// *during* a flush would sit ~1015 s behind the backoff
// timer before the next drain — visible to the receiver as a
// "10 s silence then 25-frame burst" wave. Healthy drain
// (delivered > 0) means the network is fine and we should
// immediately drain whatever piled up; reserve the 15 s
// retry for the case where every attempt this round failed.
if (this.started) {
const delay = result.delivered > 0 ? 0 : 15_000;
this.scheduleFlush(delay);
}
})
.catch(() => {
if (this.started) this.scheduleFlush(15_000);
@@ -324,45 +343,87 @@ export class Inbox {
}, delayMs);
}
private async flushOnce(): Promise<number> {
if (this.flushing) return 0;
/**
* Drain the outgoing queue. Returns `null` if another flush is already
* in flight (the running flush owns the rescheduling); otherwise
* returns the count of newly-delivered envelopes and the queue size
* after the drain so the caller can decide whether to immediately
* re-flush (more piled up during the drain — healthy network) or
* back off (everything failed).
*
* V4.8.5: drain is parallel-per-recipient. Each `recipientAddress`
* gets its own sequential worker (so per-peer order is preserved),
* but distinct recipients run concurrently. Pre-fix, a single slow
* POST head-of-line-blocked the entire queue — including small
* frames bound for unrelated peers. See Prism FR
* `per-recipient-flush-concurrency-v4.8.md`.
*/
private async flushOnce(): Promise<{ delivered: number; remaining: number } | null> {
if (this.flushing) return null;
this.flushing = true;
let delivered = 0;
try {
const entries = await this.queueStore.list();
// Group by recipient. Within a bucket we drain sequentially so
// per-peer message order matches enqueue order (the relay
// assigns `receivedAt` on PUT arrival; concurrent POSTs to the
// same peer would let the second arrive first and the recipient
// would observe out-of-order envelopes). Across buckets, no
// ordering guarantee exists in Shade's wire model anyway, so
// parallel drain is safe.
const buckets = new Map<string, OutgoingEntry[]>();
for (const entry of entries) {
try {
const result = await this.client.put({
recipientAddress: entry.recipientAddress,
senderSigningKey: this.options.signingPublicKey,
envelope: entry.ciphertext,
ttlSeconds: entry.ttlSeconds,
});
await this.queueStore.remove(entry.recipientAddress, entry.msgId);
delivered++;
this.events.emit('inbox.message_delivered', {
recipientAddress: entry.recipientAddress,
msgId: result.msgId,
idempotent: result.idempotent,
});
} catch (err) {
await this.queueStore.bumpAttempts(entry.recipientAddress, entry.msgId);
const attempts = entry.attempts + 1;
this.events.emit('inbox.message_failed', {
recipientAddress: entry.recipientAddress,
msgId: entry.msgId,
attempts,
error: (err as Error).message,
});
if (attempts >= this.maxAttempts) {
let bucket = buckets.get(entry.recipientAddress);
if (!bucket) {
bucket = [];
buckets.set(entry.recipientAddress, bucket);
}
bucket.push(entry);
}
const drainBucket = async (bucket: OutgoingEntry[]): Promise<number> => {
let count = 0;
for (const entry of bucket) {
try {
const result = await this.client.put({
recipientAddress: entry.recipientAddress,
senderSigningKey: this.options.signingPublicKey,
envelope: entry.ciphertext,
ttlSeconds: entry.ttlSeconds,
});
await this.queueStore.remove(entry.recipientAddress, entry.msgId);
count++;
this.events.emit('inbox.message_delivered', {
recipientAddress: entry.recipientAddress,
msgId: result.msgId,
idempotent: result.idempotent,
});
} catch (err) {
await this.queueStore.bumpAttempts(entry.recipientAddress, entry.msgId);
const attempts = entry.attempts + 1;
this.events.emit('inbox.message_failed', {
recipientAddress: entry.recipientAddress,
msgId: entry.msgId,
attempts,
error: (err as Error).message,
});
if (attempts >= this.maxAttempts) {
await this.queueStore.remove(entry.recipientAddress, entry.msgId);
}
}
}
}
return count;
};
const counts = await Promise.all(
Array.from(buckets.values(), drainBucket),
);
delivered = counts.reduce((a, b) => a + b, 0);
} finally {
this.flushing = false;
}
return delivered;
const remaining = await this.queueStore.size();
return { delivered, remaining };
}
private async pollOnce(): Promise<number> {