feat(observer): M-Obs 1-3 — event bus, server hooks, observer backend

M-Obs 1: Event bus in @shade/core
- ShadeEventEmitter with typed event union, ring buffer for replay
- 12 event types covering session lifecycle, ratchet operations,
  prekey changes, identity rotation, trust changes
- Wired into ShadeSessionManager (zero overhead when not enabled)
- shortHash helper for safe display of public keys
- Security test: regex-checks event payloads contain no key material

M-Obs 2: Prekey server event hooks
- PrekeyServerEvents emitter mirroring core's pattern
- 5 server event types: registered, fetched, replenished, deleted, rate_limited
- Wired into all routes including the rate-limit error handler
- shortHash helper using crypto.subtle directly (no provider dep)

M-Obs 3: @shade/observer package
- StateAggregator subscribes to client + server events, builds rolling snapshot
- Hono routes: GET /api/state (snapshot), GET /api/events (SSE stream)
- Bearer token auth via SHADE_OBSERVER_TOKEN, query string for SSE
- Refuses to start with token < 16 chars (ConfigurationError)
- Static file serving for bundled dashboard at /dashboard/
- Placeholder dashboard renders when no built SPA present

220 tests passing, 0 failures.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-10 18:49:51 +02:00
parent 75008b623a
commit b014f9b44c
17 changed files with 1364 additions and 5 deletions

View File

@@ -0,0 +1,47 @@
import { UnauthorizedError, ConfigurationError } from '@shade/core';
import type { Context, Next } from 'hono';
/**
* Bearer token middleware for the observer.
*
* Reads token from `Authorization: Bearer <token>` header.
* For SSE endpoints (where browsers can't set headers), also accepts
* `?token=<token>` query parameter.
*
* Throws ConfigurationError if SHADE_OBSERVER_TOKEN is empty (refuses to start).
*/
export function createAuthMiddleware(token: string) {
if (!token || token.length < 16) {
throw new ConfigurationError(
'SHADE_OBSERVER_TOKEN must be set and at least 16 characters. Refusing to start.',
);
}
return async (c: Context, next: Next) => {
const header = c.req.header('Authorization');
let provided: string | null = null;
if (header && header.startsWith('Bearer ')) {
provided = header.slice(7);
} else {
// Allow query string for SSE (EventSource can't set headers)
provided = c.req.query('token') ?? null;
}
if (!provided || !constantTimeStringEqual(provided, token)) {
throw new UnauthorizedError('Invalid or missing observer token');
}
await next();
};
}
/** Constant-time string comparison (avoids timing attacks on token check) */
function constantTimeStringEqual(a: string, b: string): boolean {
if (a.length !== b.length) return false;
let diff = 0;
for (let i = 0; i < a.length; i++) {
diff |= a.charCodeAt(i) ^ b.charCodeAt(i);
}
return diff === 0;
}

View File

@@ -0,0 +1,48 @@
import { Hono } from 'hono';
import { join, dirname } from 'path';
import { fileURLToPath } from 'url';
import { createObserverRoutes, type ObserverOptions } from './routes.js';
import { createStaticRoutes } from './static.js';
export { createObserverRoutes } from './routes.js';
export { StateAggregator } from './state.js';
export { createAuthMiddleware } from './auth.js';
export { createStaticRoutes } from './static.js';
export type { ObserverOptions } from './routes.js';
export type { ObserverSnapshot } from './state.js';
/**
* Create a complete Shade Observer Hono app with API + dashboard.
*
* Usage:
* ```ts
* import { createObserver } from '@shade/observer';
*
* const observer = createObserver({
* token: process.env.SHADE_OBSERVER_TOKEN!,
* clientEvents: sessionManager.getEvents(),
* serverEvents: prekeyServerEvents,
* });
*
* // Mount in any Hono app
* app.route('/shade-observer', observer);
*
* // Or run standalone
* Bun.serve({ port: 3901, fetch: observer.fetch });
* ```
*/
export function createObserver(
options: ObserverOptions & { distDir?: string },
): Hono {
const app = new Hono();
app.route('/', createObserverRoutes(options));
const distDir = options.distDir
?? join(dirname(fileURLToPath(import.meta.url)), '..', 'dist');
app.route('/', createStaticRoutes(distDir));
// Root → dashboard
app.get('/', (c) => c.redirect('/dashboard/'));
return app;
}

View File

@@ -0,0 +1,105 @@
import { Hono } from 'hono';
import { streamSSE } from 'hono/streaming';
import type { ShadeEventEmitter, ShadeEvent } from '@shade/core';
import { errorToHttpStatus, ShadeError } from '@shade/core';
import type { PrekeyServerEvents, PrekeyServerEvent } from '@shade/server';
import { StateAggregator } from './state.js';
import { createAuthMiddleware } from './auth.js';
export interface ObserverOptions {
token: string;
clientEvents?: ShadeEventEmitter;
serverEvents?: PrekeyServerEvents;
}
export function createObserverRoutes(options: ObserverOptions): Hono {
const app = new Hono();
const aggregator = new StateAggregator(options.clientEvents, options.serverEvents);
const auth = createAuthMiddleware(options.token);
// Global error handler
app.onError((err, c) => {
if (err instanceof ShadeError) {
return c.json(err.toJSON(), errorToHttpStatus(err) as any);
}
console.error('[Shade Observer] Unhandled error:', err);
return c.json({ error: 'Internal server error' }, 500);
});
// ─── Snapshot ──────────────────────────────────────────────
app.get('/api/state', auth, (c) => {
return c.json(aggregator.toJSON());
});
// ─── Live event stream ─────────────────────────────────────
app.get('/api/events', auth, async (c) => {
const sinceParam = c.req.query('since');
const since = sinceParam ? parseInt(sinceParam, 10) : 0;
return streamSSE(c, async (stream) => {
// Send buffered events from `since` onwards
if (options.clientEvents) {
for (const e of options.clientEvents.getBufferedSince(since)) {
await stream.writeSSE({
event: 'shade',
id: String(e.seq),
data: JSON.stringify({ source: 'client', ...e }),
});
}
}
if (options.serverEvents) {
for (const e of options.serverEvents.getBufferedSince(since)) {
await stream.writeSSE({
event: 'shade',
id: String(e.seq),
data: JSON.stringify({ source: 'server', ...e }),
});
}
}
// Subscribe to live events
let closed = false;
const queue: Array<{ source: 'client' | 'server'; event: ShadeEvent | PrekeyServerEvent }> = [];
const unsubClient = options.clientEvents?.on((e) => {
if (closed) return;
queue.push({ source: 'client', event: e });
});
const unsubServer = options.serverEvents?.on((e) => {
if (closed) return;
queue.push({ source: 'server', event: e });
});
// Drain queue periodically (or on demand)
try {
while (!closed) {
if (queue.length > 0) {
const { source, event } = queue.shift()!;
await stream.writeSSE({
event: 'shade',
id: String(event.seq),
data: JSON.stringify({ source, ...event }),
});
} else {
// Heartbeat every 15s to keep connection alive
await stream.writeSSE({ event: 'heartbeat', data: 'ping' });
await stream.sleep(15000);
}
}
} catch {
// Stream closed
} finally {
closed = true;
unsubClient?.();
unsubServer?.();
}
});
});
// ─── Health (no auth) ──────────────────────────────────────
app.get('/health', (c) => {
return c.json({ status: 'ok', service: 'shade-observer' });
});
return app;
}

View File

@@ -0,0 +1,205 @@
import type { ShadeEventEmitter, ShadeEvent, ShadeSessionManager } from '@shade/core';
import type { PrekeyServerEvents, PrekeyServerEvent, PrekeyStore } from '@shade/server';
/**
* Aggregated observer state, updated as events flow in.
*
* The observer maintains a rolling snapshot of:
* - Identity (fingerprint, registration ID)
* - Active sessions (per address: message counts, last activity)
* - Prekey stock
* - Server stats (registered identities, fetches, replenishes)
* - Recent events ring buffer
*/
export interface ObserverSnapshot {
identity: {
fingerprint: string | null;
registrationId: number | null;
lastInitialized: number | null;
lastRotated: number | null;
};
sessions: Array<{
address: string;
remoteIdentityKeyHash: string;
messageCountSent: number;
messageCountReceived: number;
lastActivity: number;
dhRatchetSteps: number;
}>;
prekeys: {
oneTimeRemaining: number;
lastGenerated: number | null;
lastConsumed: number | null;
signedPreKeyId: number | null;
signedPreKeyLastRotated: number | null;
};
retiredIdentities: number;
server: {
registeredIdentities: Set<string>;
totalBundleFetches: number;
totalReplenishes: number;
totalDeleted: number;
totalRateLimited: number;
};
}
interface SessionStats {
remoteIdentityKeyHash: string;
messageCountSent: number;
messageCountReceived: number;
lastActivity: number;
dhRatchetSteps: number;
}
export class StateAggregator {
private identity: ObserverSnapshot['identity'] = {
fingerprint: null,
registrationId: null,
lastInitialized: null,
lastRotated: null,
};
private sessions = new Map<string, SessionStats>();
private prekeys: ObserverSnapshot['prekeys'] = {
oneTimeRemaining: 0,
lastGenerated: null,
lastConsumed: null,
signedPreKeyId: null,
signedPreKeyLastRotated: null,
};
private retiredIdentities = 0;
private serverStats = {
registeredIdentities: new Set<string>(),
totalBundleFetches: 0,
totalReplenishes: 0,
totalDeleted: 0,
totalRateLimited: 0,
};
constructor(
private readonly clientEvents?: ShadeEventEmitter,
private readonly serverEvents?: PrekeyServerEvents,
private readonly manager?: ShadeSessionManager,
private readonly store?: PrekeyStore,
) {
if (clientEvents) {
clientEvents.on((e) => this.handleClientEvent(e));
}
if (serverEvents) {
serverEvents.on((e) => this.handleServerEvent(e));
}
}
private handleClientEvent(e: ShadeEvent): void {
switch (e.name) {
case 'identity.initialized':
this.identity.fingerprint = e.data.fingerprint;
this.identity.registrationId = e.data.registrationId;
this.identity.lastInitialized = e.timestamp;
break;
case 'identity.rotated':
this.identity.fingerprint = e.data.newFingerprint;
this.identity.lastRotated = e.timestamp;
this.retiredIdentities++;
break;
case 'session.created':
this.sessions.set(e.data.address, {
remoteIdentityKeyHash: e.data.remoteIdentityKeyHash,
messageCountSent: 0,
messageCountReceived: 0,
lastActivity: e.timestamp,
dhRatchetSteps: 0,
});
break;
case 'session.removed':
this.sessions.delete(e.data.address);
break;
case 'message.encrypted': {
const s = this.sessions.get(e.data.address);
if (s) {
s.messageCountSent++;
s.lastActivity = e.timestamp;
}
break;
}
case 'message.decrypted': {
const s = this.sessions.get(e.data.address);
if (s) {
s.messageCountReceived++;
s.lastActivity = e.timestamp;
}
break;
}
case 'ratchet.dh_step': {
const s = this.sessions.get(e.data.address);
if (s) s.dhRatchetSteps++;
break;
}
case 'prekey.generated':
this.prekeys.oneTimeRemaining = e.data.totalAfter;
this.prekeys.lastGenerated = e.timestamp;
break;
case 'prekey.consumed':
if (this.prekeys.oneTimeRemaining > 0) this.prekeys.oneTimeRemaining--;
this.prekeys.lastConsumed = e.timestamp;
break;
case 'signed_prekey.rotated':
this.prekeys.signedPreKeyId = e.data.newKeyId;
this.prekeys.signedPreKeyLastRotated = e.timestamp;
break;
// trust.* don't directly affect snapshot but appear in event feed
}
}
private handleServerEvent(e: PrekeyServerEvent): void {
switch (e.name) {
case 'server.identity_registered':
this.serverStats.registeredIdentities.add(e.data.address);
break;
case 'server.bundle_fetched':
this.serverStats.totalBundleFetches++;
break;
case 'server.prekeys_replenished':
this.serverStats.totalReplenishes++;
break;
case 'server.identity_deleted':
this.serverStats.registeredIdentities.delete(e.data.address);
this.serverStats.totalDeleted++;
break;
case 'server.rate_limited':
this.serverStats.totalRateLimited++;
break;
}
}
/** Get current snapshot */
snapshot(): ObserverSnapshot {
return {
identity: { ...this.identity },
sessions: Array.from(this.sessions.entries()).map(([address, s]) => ({
address,
...s,
})),
prekeys: { ...this.prekeys },
retiredIdentities: this.retiredIdentities,
server: {
registeredIdentities: new Set(this.serverStats.registeredIdentities),
totalBundleFetches: this.serverStats.totalBundleFetches,
totalReplenishes: this.serverStats.totalReplenishes,
totalDeleted: this.serverStats.totalDeleted,
totalRateLimited: this.serverStats.totalRateLimited,
},
};
}
/** Snapshot with serializable JSON (Set → array) */
toJSON(): any {
const s = this.snapshot();
return {
...s,
server: {
...s.server,
registeredIdentities: Array.from(s.server.registeredIdentities),
},
};
}
}

View File

@@ -0,0 +1,90 @@
import { Hono } from 'hono';
import { join } from 'path';
import { existsSync, readFileSync, statSync } from 'fs';
/**
* Serve the bundled dashboard SPA from /dashboard/.
*
* Looks for dist/ in the @shade/observer package directory.
* Falls back to a placeholder page if no build is present.
*/
export function createStaticRoutes(distDir: string): Hono {
const app = new Hono();
app.get('/dashboard', (c) => c.redirect('/dashboard/'));
app.get('/dashboard/*', async (c) => {
const url = new URL(c.req.url);
let path = url.pathname.replace(/^\/dashboard\/?/, '') || 'index.html';
// Prevent path traversal
if (path.includes('..')) {
return c.text('Forbidden', 403);
}
const fullPath = join(distDir, path);
if (!existsSync(fullPath) || !statSync(fullPath).isFile()) {
// Fall back to index.html for SPA routing
const indexPath = join(distDir, 'index.html');
if (!existsSync(indexPath)) {
return c.html(placeholderHtml());
}
const content = readFileSync(indexPath);
c.header('Content-Type', 'text/html; charset=utf-8');
return c.body(content as any);
}
const content = readFileSync(fullPath);
const ct = contentTypeFor(path);
c.header('Content-Type', ct);
if (path.endsWith('.html')) {
c.header('Cache-Control', 'no-cache');
} else {
c.header('Cache-Control', 'public, max-age=3600');
}
return c.body(content as any);
});
return app;
}
function contentTypeFor(path: string): string {
if (path.endsWith('.html')) return 'text/html; charset=utf-8';
if (path.endsWith('.js')) return 'application/javascript; charset=utf-8';
if (path.endsWith('.css')) return 'text/css; charset=utf-8';
if (path.endsWith('.json')) return 'application/json; charset=utf-8';
if (path.endsWith('.svg')) return 'image/svg+xml';
if (path.endsWith('.png')) return 'image/png';
if (path.endsWith('.woff2')) return 'font/woff2';
return 'application/octet-stream';
}
function placeholderHtml(): string {
return `<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Shade Observer</title>
<style>
body { font-family: system-ui; max-width: 600px; margin: 80px auto; padding: 0 20px; color: #d4d4d4; background: #0a0a0a; }
h1 { color: #f7c948; }
code { background: #1a1a1a; padding: 2px 6px; border-radius: 4px; }
a { color: #f7c948; }
</style>
</head>
<body>
<h1>Shade Observer</h1>
<p>The dashboard SPA hasn't been built yet. The observer API is running, but there's no UI bundled.</p>
<p>To build the dashboard:</p>
<pre><code>cd packages/shade-dashboard && bun run build</code></pre>
<p>Then re-run the observer.</p>
<h2>API endpoints</h2>
<ul>
<li><code>GET /api/state</code> — current snapshot (requires <code>Authorization: Bearer ...</code>)</li>
<li><code>GET /api/events</code> — SSE stream of live events</li>
<li><code>GET /health</code> — health check (no auth)</li>
</ul>
</body>
</html>`;
}