diff --git a/docs/adr/0011-user-profile-features.md b/docs/adr/0011-user-profile-features.md index cc9e340..8023a8e 100644 --- a/docs/adr/0011-user-profile-features.md +++ b/docs/adr/0011-user-profile-features.md @@ -60,13 +60,18 @@ Initial features: `completion_rate_30d`, `dismiss_rate_30d`, Phase B replaces this with event-driven incremental updates subscribing to `signals.tip.feedback`. -## Phase B (deferred) +## Phase B -- Subscribe to `signals.tip.feedback` for incremental updates instead of TTL. -- Extend the bandit feature vector to include profile features (deliberate - `D` change with state-migration plan). -- Admin page: per-user profile view + manual rebuild button. -- Staleness/data-quality alerts in `/admin/data-quality`. +- ✅ **B.1** — Per-user profile view + rebuild action in `/admin/users/:id`. +- ✅ **B.2** — Event-driven invalidation: features declare `invalidatedBy` + subjects in the registry; `profile/subscriber.ts` deletes the affected stored + rows on publish so the next `getProfile` call recomputes immediately rather + than waiting up to `ttlSec`. TTL stays as a safety net for clock drift / + dropped events. +- ✅ **B.4** — Staleness panel in `/admin/data-quality` (counts missing + stale + per feature across eligible users). +- ⏳ **B.3** — Extend the bandit feature vector to include profile features + (deliberate `D` change with state-migration plan + shadow rollout per ADR-0002). 
## Alternatives considered diff --git a/services/api/src/index.ts b/services/api/src/index.ts index e67c280..f68719c 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -20,6 +20,8 @@ import { requireAdmin } from './middleware/admin.js'; import type { Request, Response } from 'express'; import { connectNats } from './events/nats.js'; import { startTodoistSyncScheduler } from './signals/scheduler.js'; +import { bus } from './events/bus.js'; +import { registerProfileSubscriptions } from './profile/subscriber.js'; await mkdir(dirname(config.DATABASE_PATH), { recursive: true }); runMigrations(); @@ -96,3 +98,7 @@ if (config.NATS_URL) { } startTodoistSyncScheduler(config.TODOIST_SYNC_INTERVAL_MS); + +// Profile features are invalidated on relevant signals (#81 phase B.2); +// TTL stays as a safety net for clock drift / dropped events. +registerProfileSubscriptions(bus); diff --git a/services/api/src/profile/__tests__/subscriber.test.ts b/services/api/src/profile/__tests__/subscriber.test.ts new file mode 100644 index 0000000..d03bf06 --- /dev/null +++ b/services/api/src/profile/__tests__/subscriber.test.ts @@ -0,0 +1,122 @@ +/** + * Profile event subscriber tests (#81 phase B.2). + * Constructs a fresh Bus per test, subscribes the profile invalidator, + * publishes events, asserts that the corresponding stored rows are deleted. 
+ */ +import { describe, it, expect, vi, beforeAll, beforeEach } from 'vitest'; +import { Bus } from '../../events/bus.js'; +import { makeTestDb } from '../../test/db.js'; +import { users, userProfileFeatures } from '../../db/schema.js'; +import { eq } from 'drizzle-orm'; + +const testDb = makeTestDb(); +vi.mock('../../db/index.js', () => ({ db: testDb, rawSqlite: testDb.rawSqlite })); + +const { registerProfileSubscriptions } = await import('../subscriber.js'); + +const NOW = new Date().toISOString(); +const STALE_BASE = { + value: 0.42, + valueText: null as string | null, + updatedAt: NOW, +}; + +beforeAll(async () => { + await testDb.insert(users).values([ + { id: 'sub-user-1', email: 'sub1@test.com', role: 'user', consentGiven: true, consentAt: NOW, createdAt: NOW }, + { id: 'sub-user-2', email: 'sub2@test.com', role: 'user', consentGiven: true, consentAt: NOW, createdAt: NOW }, + ]); +}); + +beforeEach(async () => { + await testDb.delete(userProfileFeatures); + // Pre-seed all 5 features for both users so we can prove which got invalidated. 
+ for (const userId of ['sub-user-1', 'sub-user-2']) { + for (const name of [ + 'completion_rate_30d', + 'dismiss_rate_30d', + 'mean_dwell_ms_30d', + 'preferred_hour', + 'tip_volume_30d', + ]) { + await testDb.insert(userProfileFeatures).values({ + userId, + name, + ...STALE_BASE, + ttlSec: 6 * 3600, + }); + } + } +}); + +async function namesFor(userId: string): Promise { + const rows = await testDb + .select({ name: userProfileFeatures.name }) + .from(userProfileFeatures) + .where(eq(userProfileFeatures.userId, userId)); + return rows.map((r) => r.name).sort(); +} + +describe('registerProfileSubscriptions', () => { + it('signals.tip.feedback invalidates the 4 reaction-derived features for the affected user only', async () => { + const bus = new Bus(); + registerProfileSubscriptions(bus); + + bus.publish('signals.tip.feedback', { + userId: 'sub-user-1', + tipId: 'tip:x', + action: 'done', + reward: 1, + dwellMs: 60_000, + createdAt: NOW, + }); + + // user-1 should have only tip_volume_30d remaining; others wiped + expect(await namesFor('sub-user-1')).toEqual(['tip_volume_30d']); + // user-2 untouched + expect((await namesFor('sub-user-2')).length).toBe(5); + }); + + it('signals.tip.served invalidates only tip_volume_30d', async () => { + const bus = new Bus(); + registerProfileSubscriptions(bus); + + bus.publish('signals.tip.served', { + userId: 'sub-user-1', + tipId: 'tip:y', + policy: 'egreedy', + servedAt: NOW, + }); + + expect(await namesFor('sub-user-1')).toEqual([ + 'completion_rate_30d', + 'dismiss_rate_30d', + 'mean_dwell_ms_30d', + 'preferred_hour', + ]); + }); + + it('events without userId in payload are ignored silently', async () => { + const bus = new Bus(); + registerProfileSubscriptions(bus); + + // emit a feedback subject with no userId + (bus as unknown as { emit: (s: string, p: unknown) => void }).emit('signals.tip.feedback', {}); + + expect((await namesFor('sub-user-1')).length).toBe(5); + expect((await namesFor('sub-user-2')).length).toBe(5); + 
}); + + it('throws at registration if a registry entry references an unknown subject', async () => { + // Spike a bogus subject into one feature, attempt registration, expect throw. + const { FEATURES } = await import('../registry.js'); + const original = FEATURES[0]?.invalidatedBy; + (FEATURES[0] as { invalidatedBy: readonly string[] }).invalidatedBy = ['signals.bogus']; + try { + const bus = new Bus(); + expect(() => registerProfileSubscriptions(bus)).toThrow(/unknown subject/); + } finally { + (FEATURES[0] as { invalidatedBy: readonly string[] | undefined }).invalidatedBy = original; + } + }); +}); diff --git a/services/api/src/profile/registry.ts b/services/api/src/profile/registry.ts index f38e01d..33e1ffd 100644 --- a/services/api/src/profile/registry.ts +++ b/services/api/src/profile/registry.ts @@ -23,6 +23,13 @@ export interface FeatureDefinition { ttlSec: number; /** Human-readable purpose; surfaced in admin UI eventually. */ description: string; + /** + * Bus subjects whose payloads invalidate this feature for the affected user + * (#81 phase B.2). Subjects must be `keyof EventMap` from `events/bus.ts` — + * the subscriber validates this at registration. Each event payload is + * required to carry a `userId` field. Empty/omitted = TTL-only refresh. + */ + invalidatedBy?: readonly string[]; /** Compute the feature for one user against the live DB. 
*/ compute(userId: string, db: Database.Database): FeatureValue; } @@ -39,6 +46,7 @@ export const FEATURES: readonly FeatureDefinition[] = [ name: 'completion_rate_30d', dtype: 'numeric', ttlSec: 6 * HOUR, + invalidatedBy: ['signals.tip.feedback'], description: 'Fraction of tips served in the last 30 days that received a "done" reaction.', compute(userId, db) { const row = db @@ -58,6 +66,7 @@ export const FEATURES: readonly FeatureDefinition[] = [ name: 'dismiss_rate_30d', dtype: 'numeric', ttlSec: 6 * HOUR, + invalidatedBy: ['signals.tip.feedback'], description: 'Fraction of tips served in the last 30 days that received a "dismiss" reaction.', compute(userId, db) { const row = db @@ -77,6 +86,7 @@ export const FEATURES: readonly FeatureDefinition[] = [ name: 'mean_dwell_ms_30d', dtype: 'numeric', ttlSec: 6 * HOUR, + invalidatedBy: ['signals.tip.feedback'], description: 'Average dwell time (ms between served and reacted) over the last 30 days.', compute(userId, db) { const row = db @@ -93,6 +103,7 @@ export const FEATURES: readonly FeatureDefinition[] = [ name: 'preferred_hour', dtype: 'numeric', ttlSec: DAY, + invalidatedBy: ['signals.tip.feedback'], description: 'Hour-of-day with the most "done" reactions in the last 30 days (0–23).', compute(userId, db) { const row = db @@ -114,6 +125,7 @@ export const FEATURES: readonly FeatureDefinition[] = [ name: 'tip_volume_30d', dtype: 'numeric', ttlSec: HOUR, + invalidatedBy: ['signals.tip.served'], description: 'Number of tips served to the user in the last 30 days.', compute(userId, db) { const row = db diff --git a/services/api/src/profile/subscriber.ts b/services/api/src/profile/subscriber.ts new file mode 100644 index 0000000..e72e529 --- /dev/null +++ b/services/api/src/profile/subscriber.ts @@ -0,0 +1,68 @@ +/** + * Profile event subscriber (#81 phase B.2). + * + * Replaces (well — complements) TTL-based refresh with event-driven + * invalidation. When a relevant signal fires (e.g. 
`signals.tip.feedback`),
 * the affected feature's stored row is deleted so the next `getProfile`
 * call recomputes from current data instead of waiting up to `ttlSec`.
 *
 * The TTL itself stays as a safety net for clock drift and event drops.
 */
import type { Bus } from '../events/bus.js';
import { rawSqlite } from '../db/index.js';
import { FEATURES } from './registry.js';

/**
 * Subjects we know how to handle. Anything declared in a feature's
 * `invalidatedBy` that isn't here triggers a registration-time error so typos
 * don't silently disable invalidation. Keep in sync with `EventMap` in
 * `events/bus.ts`.
 */
const KNOWN_SUBJECTS = [
  'signals.tip.served',
  'signals.tip.feedback',
  'signals.tip.reward_failed',
  'signals.task.synced',
  'signals.integration.token_expired',
] as const;
type KnownSubject = (typeof KNOWN_SUBJECTS)[number];

/** Type guard: true when `subject` is one of `KNOWN_SUBJECTS`. */
function isKnown(subject: string): subject is KnownSubject {
  return (KNOWN_SUBJECTS as readonly string[]).includes(subject);
}

/**
 * Drop the stored rows for the given features of one user so the next read
 * recomputes them.
 *
 * All names are removed with a single parameterized `DELETE`, so one event
 * costs one statement regardless of how many features it invalidates.
 *
 * @param userId - Owner of the rows to delete.
 * @param featureNames - Feature names to delete; an empty list is a no-op.
 */
function invalidate(userId: string, featureNames: readonly string[]): void {
  // `name IN ()` is a syntax error in SQLite, so bail out before building it.
  if (featureNames.length === 0) return;
  // Only `?` placeholders are interpolated; every value is bound, never inlined.
  const placeholders = featureNames.map(() => '?').join(', ');
  rawSqlite
    .prepare(`DELETE FROM user_profile_features WHERE user_id = ? AND name IN (${placeholders})`)
    .run(userId, ...featureNames);
}

/**
 * Walk the registry, group features by the subjects that should invalidate
 * them, and subscribe one handler per subject. Each handler issues a single
 * delete per event, even when several features share that subject.
 *
 * Validation of every `invalidatedBy` entry finishes before the first
 * `bus.subscribe` call, so a bad registry entry leaves the bus untouched.
 *
 * @param bus - Event bus to attach the invalidation handlers to.
 * @throws Error if any feature lists a subject that is not in `KNOWN_SUBJECTS`.
 */
export function registerProfileSubscriptions(bus: Bus): void {
  // A Set per subject keeps a feature that lists the same subject twice from
  // being queued for deletion twice.
  const bySubject = new Map<KnownSubject, Set<string>>();
  for (const feature of FEATURES) {
    for (const subject of feature.invalidatedBy ?? []) {
      if (!isKnown(subject)) {
        throw new Error(
          `profile/subscriber: feature "${feature.name}" lists unknown subject "${subject}". ` +
            `Add it to KNOWN_SUBJECTS or fix the registry.`,
        );
      }
      let names = bySubject.get(subject);
      if (names === undefined) {
        names = new Set<string>();
        bySubject.set(subject, names);
      }
      names.add(feature.name);
    }
  }

  for (const [subject, names] of bySubject) {
    // Snapshot once at registration; the handler closes over a fixed list.
    const featureNames = [...names];
    bus.subscribe(subject, (payload: unknown) => {
      const userId = (payload as { userId?: unknown } | null | undefined)?.userId;
      // Not all event shapes carry a userId; skip silently. Non-string values
      // are skipped as well so they are never bound into the DELETE.
      if (typeof userId !== 'string' || userId === '') return;
      invalidate(userId, featureNames);
    });
  }
}