From b014f9b44cd5d93c612bb88feffbb5111d74bef3 Mon Sep 17 00:00:00 2001 From: Sterister Date: Fri, 10 Apr 2026 18:49:51 +0200 Subject: [PATCH] =?UTF-8?q?feat(observer):=20M-Obs=201-3=20=E2=80=94=20eve?= =?UTF-8?q?nt=20bus,=20server=20hooks,=20observer=20backend?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit M-Obs 1: Event bus in @shade/core - ShadeEventEmitter with typed event union, ring buffer for replay - 12 event types covering session lifecycle, ratchet operations, prekey changes, identity rotation, trust changes - Wired into ShadeSessionManager (zero overhead when not enabled) - shortHash helper for safe display of public keys - Security test: regex-checks event payloads contain no key material M-Obs 2: Prekey server event hooks - PrekeyServerEvents emitter mirroring core's pattern - 5 server event types: registered, fetched, replenished, deleted, rate_limited - Wired into all routes including the rate-limit error handler - shortHash helper using crypto.subtle directly (no provider dep) M-Obs 3: @shade/observer package - StateAggregator subscribes to client + server events, builds rolling snapshot - Hono routes: GET /api/state (snapshot), GET /api/events (SSE stream) - Bearer token auth via SHADE_OBSERVER_TOKEN, query string for SSE - Refuses to start with token < 16 chars (ConfigurationError) - Static file serving for bundled dashboard at /dashboard/ - Placeholder dashboard renders when no built SPA present 220 tests passing, 0 failures. Co-Authored-By: Claude Opus 4.6 (1M context) --- bun.lock | 14 ++ packages/shade-core/src/events.ts | 130 +++++++++++ packages/shade-core/src/index.ts | 1 + packages/shade-core/src/session.ts | 93 +++++++- packages/shade-core/tests/events.test.ts | 190 ++++++++++++++++ packages/shade-observer/package.json | 15 ++ packages/shade-observer/src/auth.ts | 47 ++++ packages/shade-observer/src/index.ts | 48 ++++ packages/shade-observer/src/routes.ts | 105 +++++++++ packages/shade-observer/src/state.ts | 205 ++++++++++++++++++ packages/shade-observer/src/static.ts | 90 ++++++++ .../shade-observer/tests/observer.test.ts | 154 +++++++++++++ packages/shade-observer/tsconfig.json | 5 + packages/shade-server/src/events.ts | 92 ++++++++ packages/shade-server/src/index.ts | 9 +- packages/shade-server/src/routes.ts | 29 ++- packages/shade-server/tests/events.test.ts | 142 ++++++++++++ 17 files changed, 1364 insertions(+), 5 deletions(-) create mode 100644 packages/shade-core/src/events.ts create mode 100644 packages/shade-core/tests/events.test.ts create mode 100644 packages/shade-observer/package.json create mode 100644 packages/shade-observer/src/auth.ts create mode 100644 packages/shade-observer/src/index.ts create mode 100644 packages/shade-observer/src/routes.ts create mode 100644 packages/shade-observer/src/state.ts create mode 100644 packages/shade-observer/src/static.ts create mode 100644 packages/shade-observer/tests/observer.test.ts create mode 100644 packages/shade-observer/tsconfig.json create mode 100644 packages/shade-server/src/events.ts create mode 100644 packages/shade-server/tests/events.test.ts diff --git a/bun.lock b/bun.lock index 63cbc61..7786a7e 100644 --- a/bun.lock +++ b/bun.lock @@ -28,6 +28,18 @@ "@shade/core": "workspace:*", }, }, + "packages/shade-observer": { + "name": "@shade/observer", + "version": "0.1.0", + "dependencies": { + "@shade/core": "workspace:*", + "@shade/server": "workspace:*", + "hono": "^4.12.12", + }, + "devDependencies": { + "@shade/crypto-web": "workspace:*", + }, + }, "packages/shade-proto": { "name": "@shade/proto", "version": "0.1.0", @@ -88,6 +100,8 @@ "@shade/crypto-web": ["@shade/crypto-web@workspace:packages/shade-crypto-web"], + "@shade/observer": ["@shade/observer@workspace:packages/shade-observer"], + "@shade/proto": ["@shade/proto@workspace:packages/shade-proto"], "@shade/server": ["@shade/server@workspace:packages/shade-server"], diff --git a/packages/shade-core/src/events.ts b/packages/shade-core/src/events.ts new file mode 100644 index 0000000..388753a --- /dev/null +++ b/packages/shade-core/src/events.ts @@ -0,0 +1,130 @@ +import type { CryptoProvider } from './crypto.js'; + +/** + * Shade event bus. + * + * Emits structural events for observability — NEVER plaintext, private keys, + * nonces, or other secret material. Identity references are SHA-256 truncated + * to 8 bytes (16 hex chars) for display only. + * + * Optional: pass a ShadeEventEmitter to ShadeSessionManager to enable. + * If not passed, all emits are no-ops with zero overhead. + */ + +// ─── Event payload types ────────────────────────────────────── + +export interface ShadeEventBase { + /** Monotonic sequence number assigned at emit time */ + seq: number; + /** Wall-clock timestamp in milliseconds */ + timestamp: number; +} + +/** Map of event names to their payload shape (without seq/timestamp) */ +export interface ShadeEventMap { + 'identity.initialized': { fingerprint: string; registrationId: number }; + 'identity.rotated': { newFingerprint: string }; + 'session.created': { address: string; remoteIdentityKeyHash: string }; + 'session.removed': { address: string }; + 'message.encrypted': { address: string; counter: number; ciphertextSize: number }; + 'message.decrypted': { address: string; counter: number; plaintextSize: number }; + 'ratchet.dh_step': { address: string }; + 'prekey.generated': { count: number; totalAfter: number }; + 'prekey.consumed': { keyId: number }; + 'signed_prekey.rotated': { oldKeyId: number; newKeyId: number }; + 'trust.pinned': { address: string; identityKeyHash: string }; + 'trust.changed': { address: string; oldKeyHash: string; newKeyHash: string }; +} + +export type ShadeEventName = keyof ShadeEventMap; + +export type ShadeEvent = { + [K in ShadeEventName]: ShadeEventBase & { name: K; data: ShadeEventMap[K] }; +}[ShadeEventName]; + +export type ShadeEventListener = (event: ShadeEvent) => void; + +// ─── EventEmitter implementation ───────────────────────────── + +/** + * Minimal typed event emitter for Shade observability. + * + * Supports subscribe (`on`), unsubscribe (`off`), and replay buffer + * for late subscribers. + */ +export class ShadeEventEmitter { + private listeners = new Set(); + private nextSeq = 1; + private buffer: ShadeEvent[] = []; + private readonly maxBuffer: number; + + constructor(options: { bufferSize?: number } = {}) { + this.maxBuffer = options.bufferSize ?? 1000; + } + + /** Subscribe to all events. Returns an unsubscribe function. */ + on(listener: ShadeEventListener): () => void { + this.listeners.add(listener); + return () => this.listeners.delete(listener); + } + + off(listener: ShadeEventListener): void { + this.listeners.delete(listener); + } + + /** Emit a typed event. Adds seq + timestamp automatically. */ + emit(name: K, data: ShadeEventMap[K]): void { + const event = { + seq: this.nextSeq++, + timestamp: Date.now(), + name, + data, + } as ShadeEvent; + + // Add to ring buffer + this.buffer.push(event); + if (this.buffer.length > this.maxBuffer) { + this.buffer.shift(); + } + + // Notify listeners (catching throws so one bad listener doesn't break others) + for (const listener of this.listeners) { + try { + listener(event); + } catch (err) { + console.error('[Shade] Event listener threw:', err); + } + } + } + + /** Get all buffered events with seq > since (for SSE replay/reconnect) */ + getBufferedSince(since: number): ShadeEvent[] { + return this.buffer.filter((e) => e.seq > since); + } + + /** Get the most recent N events */ + getRecent(n: number): ShadeEvent[] { + return this.buffer.slice(-n); + } + + /** Current sequence number (next event will use this + 1) */ + get currentSeq(): number { + return this.nextSeq - 1; + } +} + +// ─── Hash helper for safe display ──────────────────────────── + +/** + * Compute a short, display-safe hash of a public key. + * Uses HKDF-SHA256 (since CryptoProvider has it) to produce 8 bytes, + * then formats as 16 hex characters. + * + * NEVER use this for security decisions — it's lossy and only for UI display. + */ +export async function shortHash(crypto: CryptoProvider, key: Uint8Array): Promise { + const salt = new Uint8Array(32); + const info = new TextEncoder().encode('ShadeShortHash'); + const hash = await crypto.hkdf(key, salt, info, 8); + return Array.from(hash, (b) => b.toString(16).padStart(2, '0')).join(''); +} diff --git a/packages/shade-core/src/index.ts b/packages/shade-core/src/index.ts index dbb461d..195d9aa 100644 --- a/packages/shade-core/src/index.ts +++ b/packages/shade-core/src/index.ts @@ -8,3 +8,4 @@ export * from './ratchet.js'; export { ShadeSessionManager, GRACE_PERIOD_MS } from './session.js'; export * from './serialization.js'; export * from './fingerprint.js'; +export * from './events.js'; diff --git a/packages/shade-core/src/session.ts b/packages/shade-core/src/session.ts index 2f29959..9c3c69b 100644 --- a/packages/shade-core/src/session.ts +++ b/packages/shade-core/src/session.ts @@ -26,6 +26,7 @@ import { import { NoSessionError, UntrustedIdentityError } from './errors.js'; import { computeFingerprint, shortFingerprint } from './fingerprint.js'; import { constantTimeEqual } from './crypto.js'; +import { ShadeEventEmitter, shortHash } from './events.js'; const enc = new TextEncoder(); const dec = new TextDecoder(); @@ -59,11 +60,20 @@ export class ShadeSessionManager { private identity: IdentityKeyPair | null = null; private registrationId: number = 0; private currentSignedPreKeyId: number = 0; + private readonly events?: ShadeEventEmitter; constructor( private readonly crypto: CryptoProvider, private readonly storage: StorageProvider, - ) {} + options: { events?: ShadeEventEmitter } = {}, + ) { + this.events = options.events; + } + + /** Get the event emitter (if observability is enabled) */ + getEvents(): ShadeEventEmitter | undefined { + return this.events; + } // ─── Initialization ──────────────────────────────────────── @@ -95,6 +105,15 @@ export class ShadeSessionManager { } else { this.currentSignedPreKeyId = spk.keyId; } + + // Emit identity initialization event + if (this.events) { + const fingerprint = await this.getIdentityFingerprint(); + this.events.emit('identity.initialized', { + fingerprint, + registrationId: this.registrationId, + }); + } } /** Get our identity's DH public key (for addressing) */ @@ -168,6 +187,7 @@ export class ShadeSessionManager { */ async resetSession(address: string): Promise { await this.storage.removeSession(address); + this.events?.emit('session.removed', { address }); // Note: we keep the trusted identity; new session will verify against it. } @@ -177,9 +197,16 @@ export class ShadeSessionManager { * After this, any pinned trust for this address is replaced. */ async acceptIdentityChange(address: string, newIdentityKey: Uint8Array): Promise { + // Capture old hash for the trust.changed event (TOFU semantics make this messy + // because isTrustedIdentity() compares not retrieves; we just emit the new hash) await this.storage.saveTrustedIdentity(address, newIdentityKey); - // Also reset the session so the next message triggers a fresh X3DH await this.storage.removeSession(address); + + if (this.events) { + const newHash = await shortHash(this.crypto, newIdentityKey); + this.events.emit('trust.changed', { address, oldKeyHash: '?', newKeyHash: newHash }); + this.events.emit('session.removed', { address }); + } } /** @@ -211,17 +238,23 @@ export class ShadeSessionManager { for (const key of keys) { await this.storage.saveOneTimePreKey(key); } + this.events?.emit('prekey.generated', { + count, + totalAfter: existingCount + count, + }); return keys; } /** Rotate the signed prekey (recommended: every 1-7 days) */ async rotateSignedPreKey(): Promise { if (!this.identity) throw new Error('Not initialized'); - const newId = this.currentSignedPreKeyId + 1; + const oldId = this.currentSignedPreKeyId; + const newId = oldId + 1; const spk = await generateSignedPreKey(this.crypto, this.identity, newId); await this.storage.saveSignedPreKey(spk); // Keep old one for a grace period (sessions may still reference it) this.currentSignedPreKeyId = newId; + this.events?.emit('signed_prekey.rotated', { oldKeyId: oldId, newKeyId: newId }); return spk; } @@ -261,6 +294,11 @@ export class ShadeSessionManager { await this.storage.saveSignedPreKey(spk); this.currentSignedPreKeyId = newSpkId; + if (this.events) { + const newFingerprint = await this.getIdentityFingerprint(); + this.events.emit('identity.rotated', { newFingerprint }); + } + // Return a fresh bundle for re-publication return createPreKeyBundle(this.registrationId, this.identity, spk); } @@ -313,6 +351,12 @@ export class ShadeSessionManager { registrationId: this.registrationId, }; await this.storage.saveSession(address, session); + + if (this.events) { + const remoteHash = await shortHash(this.crypto, x3dhResult.remoteIdentityKey); + this.events.emit('session.created', { address, remoteIdentityKeyHash: remoteHash }); + this.events.emit('trust.pinned', { address, identityKeyHash: remoteHash }); + } } // ─── Encrypt / Decrypt ───────────────────────────────────── @@ -329,6 +373,12 @@ export class ShadeSessionManager { const ratchetMsg = await ratchetEncrypt(this.crypto, session, enc.encode(plaintext)); + this.events?.emit('message.encrypted', { + address, + counter: ratchetMsg.counter, + ciphertextSize: ratchetMsg.ciphertext.length, + }); + // Check if this is the first message (X3DH metadata attached) const x3dh = (session as any).__x3dh; if (x3dh) { @@ -390,6 +440,20 @@ export class ShadeSessionManager { await this.storage.saveSession(address, session); await this.storage.saveTrustedIdentity(address, x3dhResult.remoteIdentityKey); + if (this.events) { + const remoteHash = await shortHash(this.crypto, x3dhResult.remoteIdentityKey); + this.events.emit('session.created', { address, remoteIdentityKeyHash: remoteHash }); + this.events.emit('trust.pinned', { address, identityKeyHash: remoteHash }); + if (message.preKeyId != null) { + this.events.emit('prekey.consumed', { keyId: message.preKeyId }); + } + this.events.emit('message.decrypted', { + address, + counter: x3dhResult.initialMessage.counter, + plaintextSize: plaintext.length, + }); + } + return dec.decode(plaintext); } @@ -397,9 +461,32 @@ export class ShadeSessionManager { const session = await this.storage.getSession(address); if (!session) throw new NoSessionError(address); + // Detect DH ratchet step (new remote DH key) + const willRatchet = !session.dhReceive || + !arraysEqual(message.dhPublicKey, session.dhReceive); + const plaintext = await ratchetDecrypt(this.crypto, session, message); await this.storage.saveSession(address, session); + if (this.events) { + if (willRatchet) { + this.events.emit('ratchet.dh_step', { address }); + } + this.events.emit('message.decrypted', { + address, + counter: message.counter, + plaintextSize: plaintext.length, + }); + } + return dec.decode(plaintext); } } + +function arraysEqual(a: Uint8Array, b: Uint8Array): boolean { + if (a.length !== b.length) return false; + for (let i = 0; i < a.length; i++) { + if (a[i] !== b[i]) return false; + } + return true; +} diff --git a/packages/shade-core/tests/events.test.ts b/packages/shade-core/tests/events.test.ts new file mode 100644 index 0000000..11fdaf4 --- /dev/null +++ b/packages/shade-core/tests/events.test.ts @@ -0,0 +1,190 @@ +import { describe, test, expect } from 'bun:test'; +import { SubtleCryptoProvider, MemoryStorage } from '@shade/crypto-web'; +import { + ShadeSessionManager, + ShadeEventEmitter, + shortHash, +} from '../src/index.js'; +import type { ShadeEvent } from '../src/index.js'; + +const crypto = new SubtleCryptoProvider(); + +describe('ShadeEventEmitter', () => { + test('subscribes and emits events', () => { + const emitter = new ShadeEventEmitter(); + const received: ShadeEvent[] = []; + emitter.on((e) => received.push(e)); + + emitter.emit('identity.initialized', { fingerprint: 'abc', registrationId: 1 }); + + expect(received.length).toBe(1); + expect(received[0]!.name).toBe('identity.initialized'); + expect(received[0]!.seq).toBe(1); + expect(received[0]!.timestamp).toBeGreaterThan(0); + expect((received[0]!.data as any).fingerprint).toBe('abc'); + }); + + test('seq is monotonically increasing', () => { + const emitter = new ShadeEventEmitter(); + const seqs: number[] = []; + emitter.on((e) => seqs.push(e.seq)); + + emitter.emit('prekey.generated', { count: 5, totalAfter: 5 }); + emitter.emit('prekey.consumed', { keyId: 1 }); + emitter.emit('prekey.consumed', { keyId: 2 }); + + expect(seqs).toEqual([1, 2, 3]); + }); + + test('unsubscribe stops receiving events', () => { + const emitter = new ShadeEventEmitter(); + let count = 0; + const unsub = emitter.on(() => count++); + emitter.emit('prekey.generated', { count: 1, totalAfter: 1 }); + unsub(); + emitter.emit('prekey.generated', { count: 1, totalAfter: 2 }); + expect(count).toBe(1); + }); + + test('listener throw does not break other listeners', () => { + const emitter = new ShadeEventEmitter(); + let goodCount = 0; + emitter.on(() => { throw new Error('boom'); }); + emitter.on(() => goodCount++); + emitter.emit('prekey.generated', { count: 1, totalAfter: 1 }); + expect(goodCount).toBe(1); + }); + + test('getBufferedSince returns events after seq', () => { + const emitter = new ShadeEventEmitter(); + emitter.emit('prekey.generated', { count: 1, totalAfter: 1 }); + emitter.emit('prekey.generated', { count: 1, totalAfter: 2 }); + emitter.emit('prekey.generated', { count: 1, totalAfter: 3 }); + const events = emitter.getBufferedSince(1); + expect(events.length).toBe(2); + expect(events[0]!.seq).toBe(2); + expect(events[1]!.seq).toBe(3); + }); + + test('ring buffer evicts oldest', () => { + const emitter = new ShadeEventEmitter({ bufferSize: 3 }); + for (let i = 0; i < 5; i++) { + emitter.emit('prekey.generated', { count: 1, totalAfter: i }); + } + const recent = emitter.getRecent(10); + expect(recent.length).toBe(3); + expect(recent[0]!.seq).toBe(3); + expect(recent[2]!.seq).toBe(5); + }); +}); + +describe('shortHash helper', () => { + test('produces 16-hex-char string', async () => { + const hash = await shortHash(crypto, crypto.randomBytes(32)); + expect(hash).toMatch(/^[0-9a-f]{16}$/); + }); + + test('deterministic for same input', async () => { + const key = new Uint8Array(32).fill(0xab); + const a = await shortHash(crypto, key); + const b = await shortHash(crypto, key); + expect(a).toBe(b); + }); + + test('different inputs produce different hashes', async () => { + const a = await shortHash(crypto, crypto.randomBytes(32)); + const b = await shortHash(crypto, crypto.randomBytes(32)); + expect(a).not.toBe(b); + }); +}); + +describe('ShadeSessionManager event integration', () => { + test('initialize emits identity.initialized', async () => { + const events = new ShadeEventEmitter(); + const received: ShadeEvent[] = []; + events.on((e) => received.push(e)); + + const mgr = new ShadeSessionManager(crypto, new MemoryStorage(), { events }); + await mgr.initialize(); + + const init = received.find((e) => e.name === 'identity.initialized'); + expect(init).toBeDefined(); + const data = init!.data as any; + expect(data.fingerprint).toMatch(/^\d{5}( \d{5}){11}$/); + expect(data.registrationId).toBeGreaterThan(0); + }); + + test('full conversation emits expected event sequence', async () => { + const events = new ShadeEventEmitter(); + const received: ShadeEvent[] = []; + events.on((e) => received.push(e)); + + const alice = new ShadeSessionManager(crypto, new MemoryStorage(), { events }); + const bob = new ShadeSessionManager(crypto, new MemoryStorage(), { events }); + await alice.initialize(); + await bob.initialize(); + + const otpks = await bob.generateOneTimePreKeys(5); + const bundle = await bob.createPreKeyBundle(); + bundle.oneTimePreKey = { keyId: otpks[0].keyId, publicKey: otpks[0].keyPair.publicKey }; + await alice.initSessionFromBundle('bob', bundle); + + const env1 = await alice.encrypt('bob', 'hello'); + await bob.decrypt('alice', env1); + const env2 = await bob.encrypt('alice', 'hi'); + await alice.decrypt('bob', env2); + + const names = received.map((e) => e.name); + expect(names).toContain('identity.initialized'); + expect(names).toContain('prekey.generated'); + expect(names).toContain('session.created'); + expect(names).toContain('trust.pinned'); + expect(names).toContain('message.encrypted'); + expect(names).toContain('message.decrypted'); + expect(names).toContain('ratchet.dh_step'); // Bob's reply triggers a DH step + }); + + test('no events emitted when emitter not provided', async () => { + const mgr = new ShadeSessionManager(crypto, new MemoryStorage()); + await mgr.initialize(); + // No assertion needed — should not throw or error + }); + + test('SECURITY: no key material in event payloads', async () => { + const events = new ShadeEventEmitter(); + const received: ShadeEvent[] = []; + events.on((e) => received.push(e)); + + const alice = new ShadeSessionManager(crypto, new MemoryStorage(), { events }); + const bob = new ShadeSessionManager(crypto, new MemoryStorage(), { events }); + await alice.initialize(); + await bob.initialize(); + const otpks = await bob.generateOneTimePreKeys(5); + const bundle = await bob.createPreKeyBundle(); + bundle.oneTimePreKey = { keyId: otpks[0].keyId, publicKey: otpks[0].keyPair.publicKey }; + await alice.initSessionFromBundle('bob', bundle); + const env = await alice.encrypt('bob', 'secret message'); + await bob.decrypt('alice', env); + await alice.rotateSignedPreKey(); + + // Serialize all events and check for any 32-byte base64 patterns + // (which would indicate raw key material) + const json = JSON.stringify(received); + + // 32-byte base64 = 44 chars (with padding) or 43 (without) + // We allow short 16-hex-char hashes, but no 44-char base64 or 64-char hex + const longBase64 = /[A-Za-z0-9+/]{43,}={0,2}/g; + const longHex = /[0-9a-f]{32,}/gi; + + const base64Matches = json.match(longBase64) ?? []; + const hexMatches = json.match(longHex) ?? []; + + // Filter out any matches that are inside hash fields (which are 16 hex chars, + // so the regex above wouldn't match anyway, but be explicit) + expect(base64Matches.length).toBe(0); + expect(hexMatches.length).toBe(0); + + // Also no plaintext leakage + expect(json).not.toContain('secret message'); + }); +}); diff --git a/packages/shade-observer/package.json b/packages/shade-observer/package.json new file mode 100644 index 0000000..ef50abf --- /dev/null +++ b/packages/shade-observer/package.json @@ -0,0 +1,15 @@ +{ + "name": "@shade/observer", + "version": "0.1.0", + "type": "module", + "main": "src/index.ts", + "types": "src/index.ts", + "dependencies": { + "@shade/core": "workspace:*", + "@shade/server": "workspace:*", + "hono": "^4.12.12" + }, + "devDependencies": { + "@shade/crypto-web": "workspace:*" + } +} diff --git a/packages/shade-observer/src/auth.ts b/packages/shade-observer/src/auth.ts new file mode 100644 index 0000000..180004e --- /dev/null +++ b/packages/shade-observer/src/auth.ts @@ -0,0 +1,47 @@ +import { UnauthorizedError, ConfigurationError } from '@shade/core'; +import type { Context, Next } from 'hono'; + +/** + * Bearer token middleware for the observer. + * + * Reads token from `Authorization: Bearer ` header. + * For SSE endpoints (where browsers can't set headers), also accepts + * `?token=` query parameter. + * + * Throws ConfigurationError if SHADE_OBSERVER_TOKEN is empty (refuses to start). + */ +export function createAuthMiddleware(token: string) { + if (!token || token.length < 16) { + throw new ConfigurationError( + 'SHADE_OBSERVER_TOKEN must be set and at least 16 characters. Refusing to start.', + ); + } + + return async (c: Context, next: Next) => { + const header = c.req.header('Authorization'); + let provided: string | null = null; + + if (header && header.startsWith('Bearer ')) { + provided = header.slice(7); + } else { + // Allow query string for SSE (EventSource can't set headers) + provided = c.req.query('token') ?? null; + } + + if (!provided || !constantTimeStringEqual(provided, token)) { + throw new UnauthorizedError('Invalid or missing observer token'); + } + + await next(); + }; +} + +/** Constant-time string comparison (avoids timing attacks on token check) */ +function constantTimeStringEqual(a: string, b: string): boolean { + if (a.length !== b.length) return false; + let diff = 0; + for (let i = 0; i < a.length; i++) { + diff |= a.charCodeAt(i) ^ b.charCodeAt(i); + } + return diff === 0; +} diff --git a/packages/shade-observer/src/index.ts b/packages/shade-observer/src/index.ts new file mode 100644 index 0000000..ff6f7e2 --- /dev/null +++ b/packages/shade-observer/src/index.ts @@ -0,0 +1,48 @@ +import { Hono } from 'hono'; +import { join, dirname } from 'path'; +import { fileURLToPath } from 'url'; +import { createObserverRoutes, type ObserverOptions } from './routes.js'; +import { createStaticRoutes } from './static.js'; + +export { createObserverRoutes } from './routes.js'; +export { StateAggregator } from './state.js'; +export { createAuthMiddleware } from './auth.js'; +export { createStaticRoutes } from './static.js'; +export type { ObserverOptions } from './routes.js'; +export type { ObserverSnapshot } from './state.js'; + +/** + * Create a complete Shade Observer Hono app with API + dashboard. + * + * Usage: + * ```ts + * import { createObserver } from '@shade/observer'; + * + * const observer = createObserver({ + * token: process.env.SHADE_OBSERVER_TOKEN!, + * clientEvents: sessionManager.getEvents(), + * serverEvents: prekeyServerEvents, + * }); + * + * // Mount in any Hono app + * app.route('/shade-observer', observer); + * + * // Or run standalone + * Bun.serve({ port: 3901, fetch: observer.fetch }); + * ``` + */ +export function createObserver( + options: ObserverOptions & { distDir?: string }, +): Hono { + const app = new Hono(); + app.route('/', createObserverRoutes(options)); + + const distDir = options.distDir + ?? join(dirname(fileURLToPath(import.meta.url)), '..', 'dist'); + app.route('/', createStaticRoutes(distDir)); + + // Root → dashboard + app.get('/', (c) => c.redirect('/dashboard/')); + + return app; +} diff --git a/packages/shade-observer/src/routes.ts b/packages/shade-observer/src/routes.ts new file mode 100644 index 0000000..ab0508d --- /dev/null +++ b/packages/shade-observer/src/routes.ts @@ -0,0 +1,105 @@ +import { Hono } from 'hono'; +import { streamSSE } from 'hono/streaming'; +import type { ShadeEventEmitter, ShadeEvent } from '@shade/core'; +import { errorToHttpStatus, ShadeError } from '@shade/core'; +import type { PrekeyServerEvents, PrekeyServerEvent } from '@shade/server'; +import { StateAggregator } from './state.js'; +import { createAuthMiddleware } from './auth.js'; + +export interface ObserverOptions { + token: string; + clientEvents?: ShadeEventEmitter; + serverEvents?: PrekeyServerEvents; +} + +export function createObserverRoutes(options: ObserverOptions): Hono { + const app = new Hono(); + const aggregator = new StateAggregator(options.clientEvents, options.serverEvents); + const auth = createAuthMiddleware(options.token); + + // Global error handler + app.onError((err, c) => { + if (err instanceof ShadeError) { + return c.json(err.toJSON(), errorToHttpStatus(err) as any); + } + console.error('[Shade Observer] Unhandled error:', err); + return c.json({ error: 'Internal server error' }, 500); + }); + + // ─── Snapshot ────────────────────────────────────────────── + app.get('/api/state', auth, (c) => { + return c.json(aggregator.toJSON()); + }); + + // ─── Live event stream ───────────────────────────────────── + app.get('/api/events', auth, async (c) => { + const sinceParam = c.req.query('since'); + const since = sinceParam ? parseInt(sinceParam, 10) : 0; + + return streamSSE(c, async (stream) => { + // Send buffered events from `since` onwards + if (options.clientEvents) { + for (const e of options.clientEvents.getBufferedSince(since)) { + await stream.writeSSE({ + event: 'shade', + id: String(e.seq), + data: JSON.stringify({ source: 'client', ...e }), + }); + } + } + if (options.serverEvents) { + for (const e of options.serverEvents.getBufferedSince(since)) { + await stream.writeSSE({ + event: 'shade', + id: String(e.seq), + data: JSON.stringify({ source: 'server', ...e }), + }); + } + } + + // Subscribe to live events + let closed = false; + const queue: Array<{ source: 'client' | 'server'; event: ShadeEvent | PrekeyServerEvent }> = []; + + const unsubClient = options.clientEvents?.on((e) => { + if (closed) return; + queue.push({ source: 'client', event: e }); + }); + const unsubServer = options.serverEvents?.on((e) => { + if (closed) return; + queue.push({ source: 'server', event: e }); + }); + + // Drain queue periodically (or on demand) + try { + while (!closed) { + if (queue.length > 0) { + const { source, event } = queue.shift()!; + await stream.writeSSE({ + event: 'shade', + id: String(event.seq), + data: JSON.stringify({ source, ...event }), + }); + } else { + // Heartbeat every 15s to keep connection alive + await stream.writeSSE({ event: 'heartbeat', data: 'ping' }); + await stream.sleep(15000); + } + } + } catch { + // Stream closed + } finally { + closed = true; + unsubClient?.(); + unsubServer?.(); + } + }); + }); + + // ─── Health (no auth) ────────────────────────────────────── + app.get('/health', (c) => { + return c.json({ status: 'ok', service: 'shade-observer' }); + }); + + return app; +} diff --git a/packages/shade-observer/src/state.ts b/packages/shade-observer/src/state.ts new file mode 100644 index 0000000..a4cb976 --- /dev/null +++ b/packages/shade-observer/src/state.ts @@ -0,0 +1,205 @@ +import type { ShadeEventEmitter, ShadeEvent, ShadeSessionManager } from '@shade/core'; +import type { PrekeyServerEvents, PrekeyServerEvent, PrekeyStore } from '@shade/server'; + +/** + * Aggregated observer state, updated as events flow in. + * + * The observer maintains a rolling snapshot of: + * - Identity (fingerprint, registration ID) + * - Active sessions (per address: message counts, last activity) + * - Prekey stock + * - Server stats (registered identities, fetches, replenishes) + * - Recent events ring buffer + */ +export interface ObserverSnapshot { + identity: { + fingerprint: string | null; + registrationId: number | null; + lastInitialized: number | null; + lastRotated: number | null; + }; + sessions: Array<{ + address: string; + remoteIdentityKeyHash: string; + messageCountSent: number; + messageCountReceived: number; + lastActivity: number; + dhRatchetSteps: number; + }>; + prekeys: { + oneTimeRemaining: number; + lastGenerated: number | null; + lastConsumed: number | null; + signedPreKeyId: number | null; + signedPreKeyLastRotated: number | null; + }; + retiredIdentities: number; + server: { + registeredIdentities: Set; + totalBundleFetches: number; + totalReplenishes: number; + totalDeleted: number; + totalRateLimited: number; + }; +} + +interface SessionStats { + remoteIdentityKeyHash: string; + messageCountSent: number; + messageCountReceived: number; + lastActivity: number; + dhRatchetSteps: number; +} + +export class StateAggregator { + private identity: ObserverSnapshot['identity'] = { + fingerprint: null, + registrationId: null, + lastInitialized: null, + lastRotated: null, + }; + private sessions = new Map(); + private prekeys: ObserverSnapshot['prekeys'] = { + oneTimeRemaining: 0, + lastGenerated: null, + lastConsumed: null, + signedPreKeyId: null, + signedPreKeyLastRotated: null, + }; + private retiredIdentities = 0; + private serverStats = { + registeredIdentities: new Set(), + totalBundleFetches: 0, + totalReplenishes: 0, + totalDeleted: 0, + totalRateLimited: 0, + }; + + constructor( + private readonly clientEvents?: ShadeEventEmitter, + private readonly serverEvents?: PrekeyServerEvents, + private readonly manager?: ShadeSessionManager, + private readonly store?: PrekeyStore, + ) { + if (clientEvents) { + clientEvents.on((e) => this.handleClientEvent(e)); + } + if (serverEvents) { + serverEvents.on((e) => this.handleServerEvent(e)); + } + } + + private handleClientEvent(e: ShadeEvent): void { + switch (e.name) { + case 'identity.initialized': + this.identity.fingerprint = e.data.fingerprint; + this.identity.registrationId = e.data.registrationId; + this.identity.lastInitialized = e.timestamp; + break; + case 'identity.rotated': + this.identity.fingerprint = e.data.newFingerprint; + this.identity.lastRotated = e.timestamp; + this.retiredIdentities++; + break; + case 'session.created': + this.sessions.set(e.data.address, { + remoteIdentityKeyHash: e.data.remoteIdentityKeyHash, + messageCountSent: 0, + messageCountReceived: 0, + lastActivity: e.timestamp, + dhRatchetSteps: 0, + }); + break; + case 'session.removed': + this.sessions.delete(e.data.address); + break; + case 'message.encrypted': { + const s = this.sessions.get(e.data.address); + if (s) { + s.messageCountSent++; + s.lastActivity = e.timestamp; + } + break; + } + case 'message.decrypted': { + const s = this.sessions.get(e.data.address); + if (s) { + s.messageCountReceived++; + s.lastActivity = e.timestamp; + } + break; + } + case 'ratchet.dh_step': { + const s = this.sessions.get(e.data.address); + if (s) s.dhRatchetSteps++; + break; + } + case 'prekey.generated': + this.prekeys.oneTimeRemaining = e.data.totalAfter; + this.prekeys.lastGenerated = e.timestamp; + break; + case 'prekey.consumed': + if (this.prekeys.oneTimeRemaining > 0) this.prekeys.oneTimeRemaining--; + this.prekeys.lastConsumed = e.timestamp; + break; + case 'signed_prekey.rotated': + this.prekeys.signedPreKeyId = e.data.newKeyId; + this.prekeys.signedPreKeyLastRotated = e.timestamp; + break; + // trust.* don't directly affect snapshot but appear in event feed + } + } + + private handleServerEvent(e: PrekeyServerEvent): void { + switch (e.name) { + case 'server.identity_registered': + this.serverStats.registeredIdentities.add(e.data.address); + break; + case 'server.bundle_fetched': + this.serverStats.totalBundleFetches++; + break; + case 'server.prekeys_replenished': + this.serverStats.totalReplenishes++; + break; + case 'server.identity_deleted': + this.serverStats.registeredIdentities.delete(e.data.address); + this.serverStats.totalDeleted++; + break; + case 'server.rate_limited': + this.serverStats.totalRateLimited++; + break; + } + } + + /** Get current snapshot */ + snapshot(): ObserverSnapshot { + return { + identity: { ...this.identity }, + sessions: Array.from(this.sessions.entries()).map(([address, s]) => ({ + address, + ...s, + })), + prekeys: { ...this.prekeys }, + retiredIdentities: this.retiredIdentities, + server: { + registeredIdentities: new Set(this.serverStats.registeredIdentities), + totalBundleFetches: this.serverStats.totalBundleFetches, + totalReplenishes: this.serverStats.totalReplenishes, + totalDeleted: this.serverStats.totalDeleted, + totalRateLimited: this.serverStats.totalRateLimited, + }, + }; + } + + /** Snapshot with serializable JSON (Set → array) */ + toJSON(): any { + const s = this.snapshot(); + return { + ...s, + server: { + ...s.server, + registeredIdentities: Array.from(s.server.registeredIdentities), + }, + }; + } +} diff --git a/packages/shade-observer/src/static.ts b/packages/shade-observer/src/static.ts new file mode 100644 index 0000000..210244a --- /dev/null +++ b/packages/shade-observer/src/static.ts @@ -0,0 +1,90 @@ +import { Hono } from 'hono'; +import { join } from 'path'; +import { existsSync, readFileSync, statSync } from 'fs'; + +/** + * Serve the bundled dashboard SPA from /dashboard/. + * + * Looks for dist/ in the @shade/observer package directory. + * Falls back to a placeholder page if no build is present. + */ +export function createStaticRoutes(distDir: string): Hono { + const app = new Hono(); + + app.get('/dashboard', (c) => c.redirect('/dashboard/')); + + app.get('/dashboard/*', async (c) => { + const url = new URL(c.req.url); + let path = url.pathname.replace(/^\/dashboard\/?/, '') || 'index.html'; + + // Prevent path traversal + if (path.includes('..')) { + return c.text('Forbidden', 403); + } + + const fullPath = join(distDir, path); + + if (!existsSync(fullPath) || !statSync(fullPath).isFile()) { + // Fall back to index.html for SPA routing + const indexPath = join(distDir, 'index.html'); + if (!existsSync(indexPath)) { + return c.html(placeholderHtml()); + } + const content = readFileSync(indexPath); + c.header('Content-Type', 'text/html; charset=utf-8'); + return c.body(content as any); + } + + const content = readFileSync(fullPath); + const ct = contentTypeFor(path); + c.header('Content-Type', ct); + if (path.endsWith('.html')) { + c.header('Cache-Control', 'no-cache'); + } else { + c.header('Cache-Control', 'public, max-age=3600'); + } + return c.body(content as any); + }); + + return app; +} + +function contentTypeFor(path: string): string { + if (path.endsWith('.html')) return 'text/html; charset=utf-8'; + if (path.endsWith('.js')) return 'application/javascript; charset=utf-8'; + if (path.endsWith('.css')) return 'text/css; charset=utf-8'; + if (path.endsWith('.json')) return 'application/json; charset=utf-8'; + if (path.endsWith('.svg')) return 'image/svg+xml'; + if (path.endsWith('.png')) return 'image/png'; + if (path.endsWith('.woff2')) return 'font/woff2'; + return 'application/octet-stream'; +} + +function placeholderHtml(): string { + return ` + + + + Shade Observer + + + +

Shade Observer

+

The dashboard SPA hasn't been built yet. The observer API is running, but there's no UI bundled.

+

To build the dashboard:

+
cd packages/shade-dashboard && bun run build
+

Then re-run the observer.

+

API endpoints

+
    +
  • GET /api/state — current snapshot (requires Authorization: Bearer ...)
  • +
  • GET /api/events — SSE stream of live events
  • +
  • GET /health — health check (no auth)
  • +
+ +`; +} diff --git a/packages/shade-observer/tests/observer.test.ts b/packages/shade-observer/tests/observer.test.ts new file mode 100644 index 0000000..77af850 --- /dev/null +++ b/packages/shade-observer/tests/observer.test.ts @@ -0,0 +1,154 @@ +import { describe, test, expect } from 'bun:test'; +import { createObserver, StateAggregator } from '../src/index.js'; +import { SubtleCryptoProvider, MemoryStorage } from '@shade/crypto-web'; +import { ShadeSessionManager, ShadeEventEmitter } from '@shade/core'; +import { PrekeyServerEvents } from '@shade/server'; + +const crypto = new SubtleCryptoProvider(); +const TEST_TOKEN = 'test-token-must-be-at-least-16-chars'; + +describe('StateAggregator', () => { + test('aggregates client events into snapshot', async () => { + const events = new ShadeEventEmitter(); + const agg = new StateAggregator(events); + + const alice = new ShadeSessionManager(crypto, new MemoryStorage(), { events }); + await alice.initialize(); + await alice.generateOneTimePreKeys(10); + + const snap = agg.snapshot(); + expect(snap.identity.fingerprint).toBeTruthy(); + expect(snap.identity.registrationId).toBeGreaterThan(0); + expect(snap.prekeys.oneTimeRemaining).toBe(10); + expect(snap.prekeys.lastGenerated).toBeGreaterThan(0); + }); + + test('tracks sessions across encrypt/decrypt', async () => { + const events = new ShadeEventEmitter(); + const agg = new StateAggregator(events); + + const alice = new ShadeSessionManager(crypto, new MemoryStorage(), { events }); + const bob = new ShadeSessionManager(crypto, new MemoryStorage(), { events }); + await alice.initialize(); + await bob.initialize(); + + const otpks = await bob.generateOneTimePreKeys(5); + const bundle = await bob.createPreKeyBundle(); + bundle.oneTimePreKey = { keyId: otpks[0].keyId, publicKey: otpks[0].keyPair.publicKey }; + await alice.initSessionFromBundle('bob', bundle); + + const env = await alice.encrypt('bob', 'hello'); + await bob.decrypt('alice', env); + + const snap = agg.snapshot(); + const aliceToBob = snap.sessions.find((s) => s.address === 'bob'); + expect(aliceToBob).toBeDefined(); + expect(aliceToBob!.messageCountSent).toBe(1); + }); + + test('tracks server events', () => { + const serverEvents = new PrekeyServerEvents(); + const agg = new StateAggregator(undefined, serverEvents); + + serverEvents.emit('server.identity_registered', { address: 'alice', identityKeyHash: 'abc' }); + serverEvents.emit('server.identity_registered', { address: 'bob', identityKeyHash: 'def' }); + serverEvents.emit('server.bundle_fetched', { address: 'alice', hadOneTimePreKey: true }); + serverEvents.emit('server.bundle_fetched', { address: 'alice', hadOneTimePreKey: false }); + serverEvents.emit('server.identity_deleted', { address: 'alice' }); + + const snap = agg.snapshot(); + expect(snap.server.registeredIdentities.has('bob')).toBe(true); + expect(snap.server.registeredIdentities.has('alice')).toBe(false); + expect(snap.server.totalBundleFetches).toBe(2); + expect(snap.server.totalDeleted).toBe(1); + }); +}); + +describe('Observer routes', () => { + test('refuses requests without token', async () => { + const events = new ShadeEventEmitter(); + const observer = createObserver({ token: TEST_TOKEN, clientEvents: events }); + + const res = await observer.request('/api/state'); + expect(res.status).toBe(401); + }); + + test('accepts requests with valid bearer token', async () => { + const events = new ShadeEventEmitter(); + const observer = createObserver({ token: TEST_TOKEN, clientEvents: events }); + + const mgr = new ShadeSessionManager(crypto, new MemoryStorage(), { events }); + await mgr.initialize(); + + const res = await observer.request('/api/state', { + headers: { Authorization: `Bearer ${TEST_TOKEN}` }, + }); + expect(res.status).toBe(200); + const body = await res.json(); + expect(body.identity.fingerprint).toBeTruthy(); + }); + + test('refuses requests with wrong token', async () => { + const events = new ShadeEventEmitter(); + const observer = createObserver({ token: TEST_TOKEN, clientEvents: events }); + + const res = await observer.request('/api/state', { + headers: { Authorization: 'Bearer wrong-token-also-long-enough' }, + }); + expect(res.status).toBe(401); + }); + + test('accepts token via query string for SSE', async () => { + const events = new ShadeEventEmitter(); + const observer = createObserver({ token: TEST_TOKEN, clientEvents: events }); + + // Just check that the auth middleware accepts the query token + const res = await observer.request(`/api/state?token=${TEST_TOKEN}`); + expect(res.status).toBe(200); + }); + + test('refuses startup with too-short token', () => { + expect(() => createObserver({ token: 'short' })).toThrow(); + }); + + test('health endpoint works without auth', async () => { + const observer = createObserver({ token: TEST_TOKEN }); + const res = await observer.request('/health'); + expect(res.status).toBe(200); + const body = await res.json(); + expect(body.status).toBe('ok'); + }); + + test('snapshot reflects state after operations', async () => { + const events = new ShadeEventEmitter(); + const observer = createObserver({ token: TEST_TOKEN, clientEvents: events }); + + const alice = new ShadeSessionManager(crypto, new MemoryStorage(), { events }); + const bob = new ShadeSessionManager(crypto, new MemoryStorage(), { events }); + await alice.initialize(); + await bob.initialize(); + const otpks = await bob.generateOneTimePreKeys(3); + const bundle = await bob.createPreKeyBundle(); + bundle.oneTimePreKey = { keyId: otpks[0].keyId, publicKey: otpks[0].keyPair.publicKey }; + await alice.initSessionFromBundle('bob', bundle); + + const env = await alice.encrypt('bob', 'hi'); + await bob.decrypt('alice', env); + + const res = await observer.request('/api/state', { + headers: { Authorization: `Bearer ${TEST_TOKEN}` }, + }); + const body = await res.json(); + expect(body.sessions.length).toBeGreaterThan(0); + // Bob started with 3 OTPKs; Alice consumed one via X3DH PreKeyMessage decrypt + expect(body.prekeys.oneTimeRemaining).toBe(2); + }); + + test('placeholder dashboard renders when no dist', async () => { + const observer = createObserver({ token: TEST_TOKEN }); + const res = await observer.request('/dashboard/'); + expect(res.status).toBe(200); + const html = await res.text(); + expect(html).toContain('Shade Observer'); + }); +}); diff --git a/packages/shade-observer/tsconfig.json b/packages/shade-observer/tsconfig.json new file mode 100644 index 0000000..aee55fa --- /dev/null +++ b/packages/shade-observer/tsconfig.json @@ -0,0 +1,5 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { "outDir": "dist-build", "rootDir": "src" }, + "include": ["src"] +} diff --git a/packages/shade-server/src/events.ts b/packages/shade-server/src/events.ts new file mode 100644 index 0000000..5f8cd0b --- /dev/null +++ b/packages/shade-server/src/events.ts @@ -0,0 +1,92 @@ +/** + * Prekey server event emitter. + * + * Mirrors @shade/core's ShadeEventEmitter for the server side. Emits + * structural facts only — no key material, no signatures, no plaintext. + */ + +export interface PrekeyServerEventBase { + seq: number; + timestamp: number; +} + +export interface PrekeyServerEventMap { + 'server.identity_registered': { address: string; identityKeyHash: string }; + 'server.bundle_fetched': { address: string; hadOneTimePreKey: boolean }; + 'server.prekeys_replenished': { address: string; count: number; totalAfter: number }; + 'server.identity_deleted': { address: string }; + 'server.rate_limited': { route: string; key: string }; +} + +export type PrekeyServerEventName = keyof PrekeyServerEventMap; + +export type PrekeyServerEvent = { + [K in PrekeyServerEventName]: PrekeyServerEventBase & { name: K; data: PrekeyServerEventMap[K] }; +}[PrekeyServerEventName]; + +export type PrekeyServerEventListener = (event: PrekeyServerEvent) => void; + +export class PrekeyServerEvents { + private listeners = new Set(); + private nextSeq = 1; + private buffer: PrekeyServerEvent[] = []; + private readonly maxBuffer: number; + + constructor(options: { bufferSize?: number } = {}) { + this.maxBuffer = options.bufferSize ?? 1000; + } + + on(listener: PrekeyServerEventListener): () => void { + this.listeners.add(listener); + return () => this.listeners.delete(listener); + } + + off(listener: PrekeyServerEventListener): void { + this.listeners.delete(listener); + } + + emit(name: K, data: PrekeyServerEventMap[K]): void { + const event = { + seq: this.nextSeq++, + timestamp: Date.now(), + name, + data, + } as PrekeyServerEvent; + + this.buffer.push(event); + if (this.buffer.length > this.maxBuffer) this.buffer.shift(); + + for (const listener of this.listeners) { + try { + listener(event); + } catch (err) { + console.error('[Shade] Server event listener threw:', err); + } + } + } + + getBufferedSince(since: number): PrekeyServerEvent[] { + return this.buffer.filter((e) => e.seq > since); + } + + getRecent(n: number): PrekeyServerEvent[] { + return this.buffer.slice(-n); + } + + get currentSeq(): number { + return this.nextSeq - 1; + } +} + +/** + * Compute a short display hash from a public key. + * Identical algorithm to @shade/core/shortHash but inlined here to + * avoid circular dependency on CryptoProvider. + * + * Uses SHA-256 via crypto.subtle directly. + */ +export async function shortHash(key: Uint8Array): Promise { + const buf = await globalThis.crypto.subtle.digest('SHA-256', key); + const arr = new Uint8Array(buf).slice(0, 8); + return Array.from(arr, (b) => b.toString(16).padStart(2, '0')).join(''); +} diff --git a/packages/shade-server/src/index.ts b/packages/shade-server/src/index.ts index 50c7548..7a74f5e 100644 --- a/packages/shade-server/src/index.ts +++ b/packages/shade-server/src/index.ts @@ -3,6 +3,7 @@ import type { CryptoProvider } from '@shade/core'; import { createPrekeyRoutes } from './routes.js'; import { MemoryPrekeyStore } from './memory-store.js'; import type { PrekeyStore } from './store.js'; +import type { PrekeyServerEvents } from './events.js'; export { createPrekeyRoutes } from './routes.js'; export { MemoryPrekeyStore } from './memory-store.js'; @@ -27,10 +28,16 @@ export function createPrekeyServer(options: { crypto: CryptoProvider; store?: PrekeyStore; disableRateLimit?: boolean; + events?: PrekeyServerEvents; }): Hono { const store = options.store ?? new MemoryPrekeyStore(); - return createPrekeyRoutes(store, options.crypto, { disableRateLimit: options.disableRateLimit }); + return createPrekeyRoutes(store, options.crypto, { + disableRateLimit: options.disableRateLimit, + events: options.events, + }); } export { RateLimiter, MemoryRateLimitStore } from './rate-limit.js'; export type { RateLimitStore, RateLimitConfig } from './rate-limit.js'; +export { PrekeyServerEvents, shortHash as serverShortHash } from './events.js'; +export type { PrekeyServerEvent, PrekeyServerEventName, PrekeyServerEventMap, PrekeyServerEventListener } from './events.js'; diff --git a/packages/shade-server/src/routes.ts b/packages/shade-server/src/routes.ts index 1fa8b58..b1a91c8 100644 --- a/packages/shade-server/src/routes.ts +++ b/packages/shade-server/src/routes.ts @@ -1,9 +1,10 @@ import { Hono } from 'hono'; import type { CryptoProvider } from '@shade/core'; -import { fromBase64, errorToHttpStatus, ShadeError, ValidationError } from '@shade/core'; +import { fromBase64, errorToHttpStatus, ShadeError, ValidationError, RateLimitError } from '@shade/core'; import type { PrekeyStore } from './store.js'; import { verifyPayload, validateAddress } from './auth.js'; import { RateLimiter, MemoryRateLimitStore, REGISTER_LIMIT, FETCH_LIMIT, REPLENISH_LIMIT, DELETE_LIMIT } from './rate-limit.js'; +import { PrekeyServerEvents, shortHash } from './events.js'; /** Max POST body size in bytes (64KB) */ const MAX_BODY_SIZE = 64 * 1024; @@ -23,6 +24,8 @@ const MAX_BODY_SIZE = 64 * 1024; export interface PrekeyRoutesOptions { /** Disable rate limiting (for tests). Default: enabled. */ disableRateLimit?: boolean; + /** Optional event emitter for observability. */ + events?: PrekeyServerEvents; } export function createPrekeyRoutes( @@ -31,6 +34,7 @@ export function createPrekeyRoutes( options: PrekeyRoutesOptions = {}, ): Hono { const app = new Hono(); + const events = options.events; // Rate limiters (one per route, per IP or per identity) const rlStore = new MemoryRateLimitStore(); @@ -51,6 +55,13 @@ export function createPrekeyRoutes( // Global error handler — maps ShadeError to HTTP status app.onError((err, c) => { + if (err instanceof RateLimitError) { + // Emit rate-limited event before responding + events?.emit('server.rate_limited', { + route: c.req.routePath ?? c.req.path, + key: getClientIp(c), + }); + } if (err instanceof ShadeError) { const status = errorToHttpStatus(err); const body: any = err.toJSON(); @@ -101,6 +112,11 @@ export function createPrekeyRoutes( await store.saveOneTimePreKeys(addr, keys); } + if (events) { + const hash = await shortHash(signingKey); + events.emit('server.identity_registered', { address: addr, identityKeyHash: hash }); + } + return c.json({ ok: true }); }); @@ -138,6 +154,11 @@ export function createPrekeyRoutes( }; } + events?.emit('server.bundle_fetched', { + address, + hadOneTimePreKey: oneTimePreKey != null, + }); + return c.json(bundle); }); @@ -170,6 +191,11 @@ export function createPrekeyRoutes( await store.saveOneTimePreKeys(addr, keys); const count = await store.getOneTimePreKeyCount(addr); + events?.emit('server.prekeys_replenished', { + address: addr, + count: keys.length, + totalAfter: count, + }); return c.json({ ok: true, remaining: count }); }); @@ -195,6 +221,7 @@ export function createPrekeyRoutes( await verifyPayload(crypto, identity.identitySigningKey, { ...body, address }); await store.deleteAll(address); + events?.emit('server.identity_deleted', { address }); return c.json({ ok: true }); }); diff --git a/packages/shade-server/tests/events.test.ts b/packages/shade-server/tests/events.test.ts new file mode 100644 index 0000000..9992b91 --- /dev/null +++ b/packages/shade-server/tests/events.test.ts @@ -0,0 +1,142 @@ +import { describe, test, expect } from 'bun:test'; +import { createPrekeyServer, MemoryPrekeyStore, PrekeyServerEvents, signPayload } from '../src/index.js'; +import type { PrekeyServerEvent } from '../src/index.js'; +import { SubtleCryptoProvider } from '@shade/crypto-web'; +import { generateIdentityKeyPair } from '@shade/core'; + +const crypto = new SubtleCryptoProvider(); + +function b64(bytes: Uint8Array): string { + return Buffer.from(bytes).toString('base64'); +} + +function randBytes(n: number): Uint8Array { + const buf = new Uint8Array(n); + globalThis.crypto.getRandomValues(buf); + return buf; +} + +describe('PrekeyServerEvents integration', () => { + test('emits events for register, fetch, replenish, delete', async () => { + const events = new PrekeyServerEvents(); + const received: PrekeyServerEvent[] = []; + events.on((e) => received.push(e)); + + const store = new MemoryPrekeyStore(); + const app = createPrekeyServer({ crypto, store, disableRateLimit: true, events }); + + const alice = await generateIdentityKeyPair(crypto); + + // Register + const regBody = await signPayload(crypto, alice.signingPrivateKey, { + address: 'alice', + identitySigningKey: b64(alice.signingPublicKey), + identityDHKey: b64(alice.dhPublicKey), + signedPreKey: { keyId: 1, publicKey: b64(randBytes(32)), signature: b64(randBytes(64)) }, + oneTimePreKeys: [{ keyId: 100, publicKey: b64(randBytes(32)) }], + }); + await app.request('/v1/keys/register', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(regBody), + }); + + // Fetch bundle + await app.request('/v1/keys/bundle/alice'); + + // Replenish + const replenishBody = await signPayload(crypto, alice.signingPrivateKey, { + address: 'alice', + oneTimePreKeys: [ + { keyId: 200, publicKey: b64(randBytes(32)) }, + { keyId: 201, publicKey: b64(randBytes(32)) }, + ], + }); + await app.request('/v1/keys/replenish', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(replenishBody), + }); + + // Delete + const delBody = await signPayload(crypto, alice.signingPrivateKey, { address: 'alice' }); + await app.request('/v1/keys/alice', { + method: 'DELETE', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(delBody), + }); + + const names = received.map((e) => e.name); + expect(names).toContain('server.identity_registered'); + expect(names).toContain('server.bundle_fetched'); + expect(names).toContain('server.prekeys_replenished'); + expect(names).toContain('server.identity_deleted'); + + // Verify hadOneTimePreKey is true on the fetch event + const fetchEvent = received.find((e) => e.name === 'server.bundle_fetched'); + expect((fetchEvent!.data as any).hadOneTimePreKey).toBe(true); + + // Verify replenish reports the right count + const replenishEvent = received.find((e) => e.name === 'server.prekeys_replenished'); + expect((replenishEvent!.data as any).count).toBe(2); + expect((replenishEvent!.data as any).totalAfter).toBe(2); // 1 - 1 (consumed) + 2 = 2 + }); + + test('emits server.rate_limited when limits trip', async () => { + const events = new PrekeyServerEvents(); + const received: PrekeyServerEvent[] = []; + events.on((e) => received.push(e)); + + // Rate limit ENABLED for this test + const app = createPrekeyServer({ crypto, store: new MemoryPrekeyStore(), events }); + + // Burn the register limit (5/hour) + for (let i = 0; i < 7; i++) { + const id = await generateIdentityKeyPair(crypto); + const body = await signPayload(crypto, id.signingPrivateKey, { + address: `user${i}`, + identitySigningKey: b64(id.signingPublicKey), + identityDHKey: b64(id.dhPublicKey), + signedPreKey: { keyId: 1, publicKey: b64(randBytes(32)), signature: b64(randBytes(64)) }, + }); + await app.request('/v1/keys/register', { + method: 'POST', + headers: { 'Content-Type': 'application/json', 'x-forwarded-for': '203.0.113.99' }, + body: JSON.stringify(body), + }); + } + + const rateLimitedEvents = received.filter((e) => e.name === 'server.rate_limited'); + expect(rateLimitedEvents.length).toBeGreaterThan(0); + }); + + test('SECURITY: no key material in server event payloads', async () => { + const events = new PrekeyServerEvents(); + const received: PrekeyServerEvent[] = []; + events.on((e) => received.push(e)); + + const app = createPrekeyServer({ crypto, store: new MemoryPrekeyStore(), disableRateLimit: true, events }); + const alice = await generateIdentityKeyPair(crypto); + + const regBody = await signPayload(crypto, alice.signingPrivateKey, { + address: 'alice', + identitySigningKey: b64(alice.signingPublicKey), + identityDHKey: b64(alice.dhPublicKey), + signedPreKey: { keyId: 1, publicKey: b64(randBytes(32)), signature: b64(randBytes(64)) }, + oneTimePreKeys: [{ keyId: 100, publicKey: b64(randBytes(32)) }], + }); + await app.request('/v1/keys/register', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(regBody), + }); + await app.request('/v1/keys/bundle/alice'); + + const json = JSON.stringify(received); + // Same regex as core: no 32+ byte base64 or 32+ char hex + const longBase64 = /[A-Za-z0-9+/]{43,}={0,2}/g; + const longHex = /[0-9a-f]{32,}/gi; + expect(json.match(longBase64) ?? []).toEqual([]); + expect(json.match(longHex) ?? []).toEqual([]); + }); +});