feat(profile): event-driven invalidation (#81 phase B.2)
Features now declare invalidatedBy subjects in the registry; the new profile/subscriber.ts subscribes to each unique subject and drops matching stored rows for the userId in the payload. Next getProfile call recomputes from current data instead of waiting up to ttlSec. Wiring: completion_rate_30d, dismiss_rate_30d, mean_dwell_ms_30d, preferred_hour ← signals.tip.feedback tip_volume_30d ← signals.tip.served TTL stays as a safety net for clock drift and dropped events. Registration validates each declared subject against KNOWN_SUBJECTS (mirror of EventMap) so typos throw at startup, not silently. ADR-0011 updated. Refs #81. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -60,13 +60,18 @@ Initial features: `completion_rate_30d`, `dismiss_rate_30d`,
|
|||||||
Phase B replaces this with event-driven incremental updates subscribing to
|
Phase B replaces this with event-driven incremental updates subscribing to
|
||||||
`signals.tip.feedback`.
|
`signals.tip.feedback`.
|
||||||
|
|
||||||
## Phase B (deferred)
|
## Phase B
|
||||||
|
|
||||||
- Subscribe to `signals.tip.feedback` for incremental updates instead of TTL.
|
- ✅ **B.1** — Per-user profile view + rebuild action in `/admin/users/:id`.
|
||||||
- Extend the bandit feature vector to include profile features (deliberate
|
- ✅ **B.2** — Event-driven invalidation: features declare `invalidatedBy`
|
||||||
`D` change with state-migration plan).
|
subjects in the registry; `profile/subscriber.ts` deletes the affected stored
|
||||||
- Admin page: per-user profile view + manual rebuild button.
|
rows on publish so the next `getProfile` call recomputes immediately rather
|
||||||
- Staleness/data-quality alerts in `/admin/data-quality`.
|
than waiting up to `ttlSec`. TTL stays as a safety net for clock drift /
|
||||||
|
dropped events.
|
||||||
|
- ✅ **B.4** — Staleness panel in `/admin/data-quality` (counts missing + stale
|
||||||
|
per feature across eligible users).
|
||||||
|
- ⏳ **B.3** — Extend the bandit feature vector to include profile features
|
||||||
|
(deliberate `D` change with state-migration plan + shadow rollout per ADR-0002).
|
||||||
|
|
||||||
## Alternatives considered
|
## Alternatives considered
|
||||||
|
|
||||||
|
|||||||
@@ -20,6 +20,8 @@ import { requireAdmin } from './middleware/admin.js';
|
|||||||
import type { Request, Response } from 'express';
|
import type { Request, Response } from 'express';
|
||||||
import { connectNats } from './events/nats.js';
|
import { connectNats } from './events/nats.js';
|
||||||
import { startTodoistSyncScheduler } from './signals/scheduler.js';
|
import { startTodoistSyncScheduler } from './signals/scheduler.js';
|
||||||
|
import { bus } from './events/bus.js';
|
||||||
|
import { registerProfileSubscriptions } from './profile/subscriber.js';
|
||||||
|
|
||||||
await mkdir(dirname(config.DATABASE_PATH), { recursive: true });
|
await mkdir(dirname(config.DATABASE_PATH), { recursive: true });
|
||||||
runMigrations();
|
runMigrations();
|
||||||
@@ -96,3 +98,7 @@ if (config.NATS_URL) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
startTodoistSyncScheduler(config.TODOIST_SYNC_INTERVAL_MS);
|
startTodoistSyncScheduler(config.TODOIST_SYNC_INTERVAL_MS);
|
||||||
|
|
||||||
|
// Profile features are invalidated on relevant signals (#81 phase B.2);
|
||||||
|
// TTL stays as a safety net for clock drift / dropped events.
|
||||||
|
registerProfileSubscriptions(bus);
|
||||||
|
|||||||
122
services/api/src/profile/__tests__/subscriber.test.ts
Normal file
122
services/api/src/profile/__tests__/subscriber.test.ts
Normal file
@@ -0,0 +1,122 @@
|
|||||||
|
/**
|
||||||
|
* Profile event subscriber tests (#81 phase B.2).
|
||||||
|
* Constructs a fresh Bus per test, subscribes the profile invalidator,
|
||||||
|
* publishes events, asserts that the corresponding stored rows are deleted.
|
||||||
|
*/
|
||||||
|
import { describe, it, expect, vi, beforeAll, beforeEach } from 'vitest';
|
||||||
|
import { Bus } from '../../events/bus.js';
|
||||||
|
import { makeTestDb } from '../../test/db.js';
|
||||||
|
import { users, userProfileFeatures } from '../../db/schema.js';
|
||||||
|
import { eq } from 'drizzle-orm';
|
||||||
|
|
||||||
|
const testDb = makeTestDb();
|
||||||
|
vi.mock('../../db/index.js', () => ({ db: testDb, rawSqlite: testDb.rawSqlite }));
|
||||||
|
|
||||||
|
const { registerProfileSubscriptions } = await import('../subscriber.js');
|
||||||
|
|
||||||
|
const NOW = new Date().toISOString();
|
||||||
|
const STALE_BASE = {
|
||||||
|
value: 0.42,
|
||||||
|
valueText: null as string | null,
|
||||||
|
updatedAt: NOW,
|
||||||
|
};
|
||||||
|
|
||||||
|
beforeAll(async () => {
|
||||||
|
await testDb.insert(users).values([
|
||||||
|
{ id: 'sub-user-1', email: 'sub1@test.com', role: 'user', consentGiven: true, consentAt: NOW, createdAt: NOW },
|
||||||
|
{ id: 'sub-user-2', email: 'sub2@test.com', role: 'user', consentGiven: true, consentAt: NOW, createdAt: NOW },
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
await testDb.delete(userProfileFeatures);
|
||||||
|
// Pre-seed all 5 features for both users so we can prove which got invalidated.
|
||||||
|
for (const userId of ['sub-user-1', 'sub-user-2']) {
|
||||||
|
for (const name of [
|
||||||
|
'completion_rate_30d',
|
||||||
|
'dismiss_rate_30d',
|
||||||
|
'mean_dwell_ms_30d',
|
||||||
|
'preferred_hour',
|
||||||
|
'tip_volume_30d',
|
||||||
|
]) {
|
||||||
|
await testDb.insert(userProfileFeatures).values({
|
||||||
|
userId,
|
||||||
|
name,
|
||||||
|
...STALE_BASE,
|
||||||
|
ttlSec: 6 * 3600,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
async function namesFor(userId: string): Promise<string[]> {
|
||||||
|
const rows = await testDb
|
||||||
|
.select({ name: userProfileFeatures.name })
|
||||||
|
.from(userProfileFeatures)
|
||||||
|
.where(eq(userProfileFeatures.userId, userId));
|
||||||
|
return rows.map((r) => r.name).sort();
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('registerProfileSubscriptions', () => {
|
||||||
|
it('signals.tip.feedback invalidates the 4 reaction-derived features for the affected user only', async () => {
|
||||||
|
const bus = new Bus();
|
||||||
|
registerProfileSubscriptions(bus);
|
||||||
|
|
||||||
|
bus.publish('signals.tip.feedback', {
|
||||||
|
userId: 'sub-user-1',
|
||||||
|
tipId: 'tip:x',
|
||||||
|
action: 'done',
|
||||||
|
reward: 1,
|
||||||
|
dwellMs: 60_000,
|
||||||
|
createdAt: NOW,
|
||||||
|
});
|
||||||
|
|
||||||
|
// user-1 should have only tip_volume_30d remaining; others wiped
|
||||||
|
expect(await namesFor('sub-user-1')).toEqual(['tip_volume_30d']);
|
||||||
|
// user-2 untouched
|
||||||
|
expect((await namesFor('sub-user-2')).length).toBe(5);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('signals.tip.served invalidates only tip_volume_30d', async () => {
|
||||||
|
const bus = new Bus();
|
||||||
|
registerProfileSubscriptions(bus);
|
||||||
|
|
||||||
|
bus.publish('signals.tip.served', {
|
||||||
|
userId: 'sub-user-1',
|
||||||
|
tipId: 'tip:y',
|
||||||
|
policy: 'egreedy',
|
||||||
|
servedAt: NOW,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(await namesFor('sub-user-1')).toEqual([
|
||||||
|
'completion_rate_30d',
|
||||||
|
'dismiss_rate_30d',
|
||||||
|
'mean_dwell_ms_30d',
|
||||||
|
'preferred_hour',
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('events without userId in payload are ignored silently', async () => {
|
||||||
|
const bus = new Bus();
|
||||||
|
registerProfileSubscriptions(bus);
|
||||||
|
|
||||||
|
// emit a feedback subject with no userId
|
||||||
|
(bus as unknown as { emit: (s: string, p: unknown) => void }).emit('signals.tip.feedback', {});
|
||||||
|
|
||||||
|
expect((await namesFor('sub-user-1')).length).toBe(5);
|
||||||
|
expect((await namesFor('sub-user-2')).length).toBe(5);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('throws at registration if a registry entry references an unknown subject', async () => {
|
||||||
|
// Spike a bogus subject into one feature, attempt registration, expect throw.
|
||||||
|
const { FEATURES } = await import('../registry.js');
|
||||||
|
const original = FEATURES[0]?.invalidatedBy;
|
||||||
|
(FEATURES[0] as { invalidatedBy: readonly string[] }).invalidatedBy = ['signals.bogus'];
|
||||||
|
try {
|
||||||
|
const bus = new Bus();
|
||||||
|
expect(() => registerProfileSubscriptions(bus)).toThrow(/unknown subject/);
|
||||||
|
} finally {
|
||||||
|
(FEATURES[0] as { invalidatedBy: readonly string[] | undefined }).invalidatedBy = original;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -23,6 +23,13 @@ export interface FeatureDefinition {
|
|||||||
ttlSec: number;
|
ttlSec: number;
|
||||||
/** Human-readable purpose; surfaced in admin UI eventually. */
|
/** Human-readable purpose; surfaced in admin UI eventually. */
|
||||||
description: string;
|
description: string;
|
||||||
|
/**
|
||||||
|
* Bus subjects whose payloads invalidate this feature for the affected user
|
||||||
|
* (#81 phase B.2). Subjects must be `keyof EventMap` from `events/bus.ts` —
|
||||||
|
* the subscriber validates this at registration. Each event payload is
|
||||||
|
* required to carry a `userId` field. Empty/omitted = TTL-only refresh.
|
||||||
|
*/
|
||||||
|
invalidatedBy?: readonly string[];
|
||||||
/** Compute the feature for one user against the live DB. */
|
/** Compute the feature for one user against the live DB. */
|
||||||
compute(userId: string, db: Database.Database): FeatureValue;
|
compute(userId: string, db: Database.Database): FeatureValue;
|
||||||
}
|
}
|
||||||
@@ -39,6 +46,7 @@ export const FEATURES: readonly FeatureDefinition[] = [
|
|||||||
name: 'completion_rate_30d',
|
name: 'completion_rate_30d',
|
||||||
dtype: 'numeric',
|
dtype: 'numeric',
|
||||||
ttlSec: 6 * HOUR,
|
ttlSec: 6 * HOUR,
|
||||||
|
invalidatedBy: ['signals.tip.feedback'],
|
||||||
description: 'Fraction of tips served in the last 30 days that received a "done" reaction.',
|
description: 'Fraction of tips served in the last 30 days that received a "done" reaction.',
|
||||||
compute(userId, db) {
|
compute(userId, db) {
|
||||||
const row = db
|
const row = db
|
||||||
@@ -58,6 +66,7 @@ export const FEATURES: readonly FeatureDefinition[] = [
|
|||||||
name: 'dismiss_rate_30d',
|
name: 'dismiss_rate_30d',
|
||||||
dtype: 'numeric',
|
dtype: 'numeric',
|
||||||
ttlSec: 6 * HOUR,
|
ttlSec: 6 * HOUR,
|
||||||
|
invalidatedBy: ['signals.tip.feedback'],
|
||||||
description: 'Fraction of tips served in the last 30 days that received a "dismiss" reaction.',
|
description: 'Fraction of tips served in the last 30 days that received a "dismiss" reaction.',
|
||||||
compute(userId, db) {
|
compute(userId, db) {
|
||||||
const row = db
|
const row = db
|
||||||
@@ -77,6 +86,7 @@ export const FEATURES: readonly FeatureDefinition[] = [
|
|||||||
name: 'mean_dwell_ms_30d',
|
name: 'mean_dwell_ms_30d',
|
||||||
dtype: 'numeric',
|
dtype: 'numeric',
|
||||||
ttlSec: 6 * HOUR,
|
ttlSec: 6 * HOUR,
|
||||||
|
invalidatedBy: ['signals.tip.feedback'],
|
||||||
description: 'Average dwell time (ms between served and reacted) over the last 30 days.',
|
description: 'Average dwell time (ms between served and reacted) over the last 30 days.',
|
||||||
compute(userId, db) {
|
compute(userId, db) {
|
||||||
const row = db
|
const row = db
|
||||||
@@ -93,6 +103,7 @@ export const FEATURES: readonly FeatureDefinition[] = [
|
|||||||
name: 'preferred_hour',
|
name: 'preferred_hour',
|
||||||
dtype: 'numeric',
|
dtype: 'numeric',
|
||||||
ttlSec: DAY,
|
ttlSec: DAY,
|
||||||
|
invalidatedBy: ['signals.tip.feedback'],
|
||||||
description: 'Hour-of-day with the most "done" reactions in the last 30 days (0–23).',
|
description: 'Hour-of-day with the most "done" reactions in the last 30 days (0–23).',
|
||||||
compute(userId, db) {
|
compute(userId, db) {
|
||||||
const row = db
|
const row = db
|
||||||
@@ -114,6 +125,7 @@ export const FEATURES: readonly FeatureDefinition[] = [
|
|||||||
name: 'tip_volume_30d',
|
name: 'tip_volume_30d',
|
||||||
dtype: 'numeric',
|
dtype: 'numeric',
|
||||||
ttlSec: HOUR,
|
ttlSec: HOUR,
|
||||||
|
invalidatedBy: ['signals.tip.served'],
|
||||||
description: 'Number of tips served to the user in the last 30 days.',
|
description: 'Number of tips served to the user in the last 30 days.',
|
||||||
compute(userId, db) {
|
compute(userId, db) {
|
||||||
const row = db
|
const row = db
|
||||||
|
|||||||
68
services/api/src/profile/subscriber.ts
Normal file
68
services/api/src/profile/subscriber.ts
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
/**
|
||||||
|
* Profile event subscriber (#81 phase B.2).
|
||||||
|
*
|
||||||
|
* Replaces (well — complements) TTL-based refresh with event-driven
|
||||||
|
* invalidation. When a relevant signal fires (e.g. `signals.tip.feedback`),
|
||||||
|
* the affected feature's stored row is deleted so the next `getProfile`
|
||||||
|
* call recomputes from current data instead of waiting up to `ttlSec`.
|
||||||
|
*
|
||||||
|
* The TTL itself stays as a safety net for clock drift and event drops.
|
||||||
|
*/
|
||||||
|
import type { Bus } from '../events/bus.js';
|
||||||
|
import { rawSqlite } from '../db/index.js';
|
||||||
|
import { FEATURES } from './registry.js';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Subjects we know how to handle. Anything declared in a feature's
|
||||||
|
* `invalidatedBy` that isn't here triggers a registration-time error so typos
|
||||||
|
* don't silently disable invalidation. Keep in sync with `EventMap` in
|
||||||
|
* `events/bus.ts`.
|
||||||
|
*/
|
||||||
|
const KNOWN_SUBJECTS = [
|
||||||
|
'signals.tip.served',
|
||||||
|
'signals.tip.feedback',
|
||||||
|
'signals.tip.reward_failed',
|
||||||
|
'signals.task.synced',
|
||||||
|
'signals.integration.token_expired',
|
||||||
|
] as const;
|
||||||
|
type KnownSubject = (typeof KNOWN_SUBJECTS)[number];
|
||||||
|
|
||||||
|
function isKnown(subject: string): subject is KnownSubject {
|
||||||
|
return (KNOWN_SUBJECTS as readonly string[]).includes(subject);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Drop the stored row for one feature so the next read recomputes it. */
|
||||||
|
function invalidate(userId: string, featureName: string): void {
|
||||||
|
rawSqlite
|
||||||
|
.prepare(`DELETE FROM user_profile_features WHERE user_id = ? AND name = ?`)
|
||||||
|
.run(userId, featureName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Walk the registry, group features by the subjects that should invalidate
|
||||||
|
* them, and subscribe one handler per subject (so we don't fire multiple
|
||||||
|
* deletes per event when several features share a subject).
|
||||||
|
*/
|
||||||
|
export function registerProfileSubscriptions(bus: Bus): void {
|
||||||
|
const bySubject = new Map<KnownSubject, string[]>();
|
||||||
|
for (const feature of FEATURES) {
|
||||||
|
for (const subject of feature.invalidatedBy ?? []) {
|
||||||
|
if (!isKnown(subject)) {
|
||||||
|
throw new Error(
|
||||||
|
`profile/subscriber: feature "${feature.name}" lists unknown subject "${subject}". ` +
|
||||||
|
`Add it to KNOWN_SUBJECTS or fix the registry.`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if (!bySubject.has(subject)) bySubject.set(subject, []);
|
||||||
|
bySubject.get(subject)!.push(feature.name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const [subject, featureNames] of bySubject) {
|
||||||
|
bus.subscribe(subject, (payload: unknown) => {
|
||||||
|
const userId = (payload as { userId?: string } | null | undefined)?.userId;
|
||||||
|
if (!userId) return; // not all event shapes carry userId; skip silently
|
||||||
|
for (const name of featureNames) invalidate(userId, name);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user