Files
Shade/packages/shade-inbox/tests/client.test.ts

476 lines
16 KiB
TypeScript
Raw Normal View History

import { describe, test, expect } from 'bun:test';
import { Inbox, InboxClient, computeMsgId, MemoryOutgoingQueueStore } from '../src/index.js';
import {
createInboxServer,
MemoryInboxStore,
} from '@shade/inbox-server';
import { SubtleCryptoProvider } from '@shade/crypto-web';
import { generateIdentityKeyPair } from '@shade/core';
import type { Hono } from 'hono';
const crypto = new SubtleCryptoProvider();
async function makeIdentity() {
return generateIdentityKeyPair(crypto);
}
function randBytes(n: number): Uint8Array {
const buf = new Uint8Array(n);
globalThis.crypto.getRandomValues(buf);
return buf;
}
/**
* Wrap a Hono app as a fetch implementation. Strips the protocol/host so
* `app.request(path, init)` works.
*/
function honoFetch(app: Hono): typeof fetch {
return (async (input: RequestInfo | URL, init?: RequestInit) => {
const url = typeof input === 'string' ? input : input instanceof URL ? input.toString() : input.url;
const path = url.startsWith('http://localhost') ? url.slice('http://localhost'.length) : url;
return app.request(path, init);
}) as typeof fetch;
}
describe('InboxClient', () => {
test('register + put + fetch + ack roundtrip', async () => {
const store = new MemoryInboxStore();
const app = createInboxServer({ crypto, store, disableRateLimit: true });
const bob = await makeIdentity();
const alice = await makeIdentity();
const bobClient = new InboxClient({
baseUrl: 'http://localhost',
crypto,
signingPrivateKey: bob.signingPrivateKey,
fetch: honoFetch(app),
});
const aliceClient = new InboxClient({
baseUrl: 'http://localhost',
crypto,
signingPrivateKey: alice.signingPrivateKey,
fetch: honoFetch(app),
});
await bobClient.register({ address: 'bob', signingKey: bob.signingPublicKey });
const ct = randBytes(64);
const msgId = await computeMsgId(ct);
const result = await aliceClient.put({
recipientAddress: 'bob',
senderSigningKey: alice.signingPublicKey,
envelope: ct,
});
expect(result.msgId).toBe(msgId);
expect(result.idempotent).toBe(false);
const second = await aliceClient.put({
recipientAddress: 'bob',
senderSigningKey: alice.signingPublicKey,
envelope: ct,
});
expect(second.idempotent).toBe(true);
const fetched = await bobClient.fetch({ address: 'bob' });
expect(fetched.blobs.length).toBe(1);
expect(fetched.blobs[0]!.msgId).toBe(msgId);
expect(fetched.blobs[0]!.ciphertext).toEqual(ct);
const acked = await bobClient.ack({ address: 'bob', msgId });
expect(acked).toBe(true);
const second2 = await bobClient.fetch({ address: 'bob' });
expect(second2.blobs.length).toBe(0);
});
});
describe('Inbox orchestrator', () => {
test('queue → flush → server-side blob shows up', async () => {
const store = new MemoryInboxStore();
const app = createInboxServer({ crypto, store, disableRateLimit: true });
const bob = await makeIdentity();
const alice = await makeIdentity();
const aliceInbox = new Inbox({
baseUrl: 'http://localhost',
ownAddress: 'alice',
crypto,
signingPrivateKey: alice.signingPrivateKey,
signingPublicKey: alice.signingPublicKey,
pollIntervalMs: 0,
fetch: honoFetch(app),
});
const bobInbox = new Inbox({
baseUrl: 'http://localhost',
ownAddress: 'bob',
crypto,
signingPrivateKey: bob.signingPrivateKey,
signingPublicKey: bob.signingPublicKey,
pollIntervalMs: 0,
fetch: honoFetch(app),
});
// Bob registers so he can receive.
await bobInbox.register();
// Alice queues a message.
const ct = randBytes(64);
const msgId = await aliceInbox.send({ recipientAddress: 'bob', envelope: ct });
expect(await aliceInbox.pendingCount()).toBe(1);
// Alice ticks: flushes + (no incoming because no handler).
await aliceInbox.tick();
expect(await aliceInbox.pendingCount()).toBe(0);
// Bob ticks: should see the blob via incoming handler.
let received: { msgId: string; bytes: number } | null = null;
bobInbox.onIncoming(async (raw) => {
received = { msgId: raw.msgId, bytes: raw.ciphertext.length };
return 'alice';
});
const result = await bobInbox.tick();
expect(result.received).toBe(1);
expect(received).not.toBeNull();
expect(received!.msgId).toBe(msgId);
expect(received!.bytes).toBe(ct.length);
// No re-delivery on second tick (cursor advanced + ack performed).
const r2 = await bobInbox.tick();
expect(r2.received).toBe(0);
});
test('onMessageQueued hook fires for each enqueue', async () => {
const store = new MemoryInboxStore();
const app = createInboxServer({ crypto, store, disableRateLimit: true });
const alice = await makeIdentity();
const inbox = new Inbox({
baseUrl: 'http://localhost',
ownAddress: 'alice',
crypto,
signingPrivateKey: alice.signingPrivateKey,
signingPublicKey: alice.signingPublicKey,
pollIntervalMs: 0,
fetch: honoFetch(app),
});
const seen: Array<{ to: string; msgId: string }> = [];
inbox.onMessageQueued((to, msgId) => {
seen.push({ to, msgId });
});
await inbox.send({ recipientAddress: 'bob', envelope: randBytes(10) });
await inbox.send({ recipientAddress: 'carol', envelope: randBytes(20) });
// Wait for the (sync) hook to flush.
await new Promise((r) => setTimeout(r, 5));
expect(seen.length).toBe(2);
expect(seen[0]!.to).toBe('bob');
expect(seen[1]!.to).toBe('carol');
});
test('flush retries on transient server failure', async () => {
const store = new MemoryInboxStore();
const app = createInboxServer({ crypto, store, disableRateLimit: true });
const alice = await makeIdentity();
const bob = await makeIdentity();
// Register bob via direct API.
const bobClient = new InboxClient({
baseUrl: 'http://localhost',
crypto,
signingPrivateKey: bob.signingPrivateKey,
fetch: honoFetch(app),
});
await bobClient.register({ address: 'bob', signingKey: bob.signingPublicKey });
// Wrap fetch so first PUT fails, subsequent succeed.
let failsLeft = 1;
const flakyFetch: typeof fetch = (async (input, init) => {
const m = (init as RequestInit | undefined)?.method ?? 'GET';
const u = typeof input === 'string' ? input : input instanceof URL ? input.toString() : (input as Request).url;
if (m === 'POST' && u.includes('/v1/inbox/bob') && !u.includes('/fetch') && failsLeft > 0) {
failsLeft--;
throw new Error('transient network');
}
return honoFetch(app)(input, init);
}) as typeof fetch;
const aliceInbox = new Inbox({
baseUrl: 'http://localhost',
ownAddress: 'alice',
crypto,
signingPrivateKey: alice.signingPrivateKey,
signingPublicKey: alice.signingPublicKey,
pollIntervalMs: 0,
fetch: flakyFetch,
queueStore: new MemoryOutgoingQueueStore(),
});
await aliceInbox.send({ recipientAddress: 'bob', envelope: randBytes(40) });
// First flush fails.
await aliceInbox.tick();
expect(await aliceInbox.pendingCount()).toBe(1);
// Second flush succeeds.
await aliceInbox.tick();
expect(await aliceInbox.pendingCount()).toBe(0);
});
});
describe('tamper detection', () => {
test('client rejects blob whose msgId does not match recomputed hash', async () => {
const store = new MemoryInboxStore();
const app = createInboxServer({ crypto, store, disableRateLimit: true });
const bob = await makeIdentity();
const alice = await makeIdentity();
// Register Bob.
const bobClient = new InboxClient({
baseUrl: 'http://localhost',
crypto,
signingPrivateKey: bob.signingPrivateKey,
fetch: honoFetch(app),
});
await bobClient.register({ address: 'bob', signingKey: bob.signingPublicKey });
// Alice puts a real blob.
const ct = randBytes(64);
const aliceClient = new InboxClient({
baseUrl: 'http://localhost',
crypto,
signingPrivateKey: alice.signingPrivateKey,
fetch: honoFetch(app),
});
await aliceClient.put({
recipientAddress: 'bob',
senderSigningKey: alice.signingPublicKey,
envelope: ct,
});
// Tamper: flip a byte in the in-memory store.
const list: any = (store as any).blobs.get('bob');
list[0].ciphertext[0] ^= 0xff;
const bobInbox = new Inbox({
baseUrl: 'http://localhost',
ownAddress: 'bob',
crypto,
signingPrivateKey: bob.signingPrivateKey,
signingPublicKey: bob.signingPublicKey,
pollIntervalMs: 0,
fetch: honoFetch(app),
});
let decryptCalls = 0;
let failures = 0;
bobInbox.onIncoming(() => {
decryptCalls++;
return null;
});
bobInbox.on((e) => {
if (e.name === 'inbox.message_decrypt_failed') failures++;
});
const result = await bobInbox.tick();
// Tampered blob: handler must NOT be called; decrypt-failed event fires.
expect(decryptCalls).toBe(0);
expect(failures).toBeGreaterThan(0);
expect(result.received).toBe(0);
});
});
describe('InboxClient — default fetch is bound to globalThis', () => {
// Regression: browsers' `fetch` is a WebIDL bound operation that throws
// "Illegal invocation" when called as a method on another object. The
// class stores `fetchImpl` and calls `this.fetchImpl(...)`, which strips
// the Window receiver. Constructor must `bind(globalThis)`.
test('default path passes globalThis as `this` (no Illegal invocation)', async () => {
const realFetch = globalThis.fetch;
let observedReceiver: unknown = 'unset';
function strictFetch(this: unknown, _input: unknown, _init?: unknown): Promise<Response> {
observedReceiver = this;
if (this !== globalThis) {
throw new TypeError("Failed to execute 'fetch' on 'Window': Illegal invocation");
}
return Promise.resolve(
new Response('{}', {
status: 200,
headers: { 'content-type': 'application/json' },
}),
);
}
Object.defineProperty(globalThis, 'fetch', {
configurable: true,
writable: true,
value: strictFetch,
});
try {
const id = await makeIdentity();
const client = new InboxClient({
baseUrl: 'http://example.invalid',
crypto,
signingPrivateKey: id.signingPrivateKey,
// No `fetch` override on purpose — this exercises the default path.
});
await client.register({ address: 'whoever', signingKey: id.signingPublicKey });
expect(observedReceiver).toBe(globalThis);
} finally {
Object.defineProperty(globalThis, 'fetch', {
configurable: true,
writable: true,
value: realFetch,
});
}
});
});
describe('Inbox.start() — fresh-address register/poll race (V4.8)', () => {
// Regression: pre-4.8 `start()` called `register()` fire-and-forget AND
// `schedulePoll(0)` synchronously, so the first poll often beat the
// register HTTP RTT and got SHADE_NOT_FOUND on a fresh address. Fix:
// start() defers the first poll; register() success kicks it.
test('fresh address: no fetch fires before register completes', async () => {
const store = new MemoryInboxStore();
const app = createInboxServer({ crypto, store, disableRateLimit: true });
const alice = await makeIdentity();
// Order observed by the server: must be register-then-fetch, never
// fetch-then-register.
const calls: Array<'register' | 'fetch' | 'put'> = [];
let registerArrived = false;
const recordingFetch: typeof fetch = (async (input, init) => {
const u =
typeof input === 'string'
? input
: input instanceof URL
? input.toString()
: (input as Request).url;
if (u.includes('/v1/inbox/register')) {
calls.push('register');
// Hold register for a tick to widen the race window.
await new Promise((r) => setTimeout(r, 25));
registerArrived = true;
} else if (u.endsWith('/fetch')) {
// Any fetch arriving before register is the race we're guarding
// against.
if (!registerArrived) {
throw new Error('fetch fired before register completed (race not fixed)');
}
calls.push('fetch');
} else if (u.includes('/v1/inbox/')) {
calls.push('put');
}
return honoFetch(app)(input, init);
}) as typeof fetch;
const inbox = new Inbox({
baseUrl: 'http://localhost',
ownAddress: 'alice',
crypto,
signingPrivateKey: alice.signingPrivateKey,
signingPublicKey: alice.signingPublicKey,
pollIntervalMs: 30_000, // Long enough that only register's kick triggers.
fetch: recordingFetch,
});
inbox.onIncoming(() => null);
inbox.start();
// Wait until register has completed and the success-kick poll lands.
await new Promise((r) => setTimeout(r, 100));
inbox.stop();
expect(calls[0]).toBe('register');
// First fetch (if any) must be after register.
const firstFetchIdx = calls.indexOf('fetch');
if (firstFetchIdx !== -1) {
expect(firstFetchIdx).toBeGreaterThan(calls.indexOf('register'));
}
});
});
describe('FetchedBlob.from — relay-supplied sender fingerprint (V4.8)', () => {
test('inbox-fetch response carries from = 8-byte hex of SHA-256(senderSigningKey)', async () => {
const store = new MemoryInboxStore();
const app = createInboxServer({ crypto, store, disableRateLimit: true });
const bob = await makeIdentity();
const alice = await makeIdentity();
const bobClient = new InboxClient({
baseUrl: 'http://localhost',
crypto,
signingPrivateKey: bob.signingPrivateKey,
fetch: honoFetch(app),
});
const aliceClient = new InboxClient({
baseUrl: 'http://localhost',
crypto,
signingPrivateKey: alice.signingPrivateKey,
fetch: honoFetch(app),
});
await bobClient.register({ address: 'bob', signingKey: bob.signingPublicKey });
await aliceClient.put({
recipientAddress: 'bob',
senderSigningKey: alice.signingPublicKey,
envelope: randBytes(64),
});
const fetched = await bobClient.fetch({ address: 'bob' });
expect(fetched.blobs.length).toBe(1);
const fp = fetched.blobs[0]!.from;
expect(fp).toBeDefined();
expect(fp).toMatch(/^[0-9a-f]{16}$/);
// Must be reproducible: SHA-256(alice.signingPublicKey) → first 8 bytes hex.
const digest = await globalThis.crypto.subtle.digest(
'SHA-256',
alice.signingPublicKey as unknown as ArrayBuffer,
);
const expected = Array.from(new Uint8Array(digest).slice(0, 8), (b) =>
b.toString(16).padStart(2, '0'),
).join('');
expect(fp).toBe(expected);
});
test('DecryptHandler raw arg propagates from to the app', async () => {
const store = new MemoryInboxStore();
const app = createInboxServer({ crypto, store, disableRateLimit: true });
const bob = await makeIdentity();
const alice = await makeIdentity();
const aliceClient = new InboxClient({
baseUrl: 'http://localhost',
crypto,
signingPrivateKey: alice.signingPrivateKey,
fetch: honoFetch(app),
});
const bobInbox = new Inbox({
baseUrl: 'http://localhost',
ownAddress: 'bob',
crypto,
signingPrivateKey: bob.signingPrivateKey,
signingPublicKey: bob.signingPublicKey,
pollIntervalMs: 0,
fetch: honoFetch(app),
});
await bobInbox.register();
await aliceClient.put({
recipientAddress: 'bob',
senderSigningKey: alice.signingPublicKey,
envelope: randBytes(40),
});
let observed: string | undefined = undefined;
bobInbox.onIncoming((raw) => {
observed = raw.from;
return null;
});
await bobInbox.tick();
expect(observed).toMatch(/^[0-9a-f]{16}$/);
});
});