feat(profile): user-profile feature registry + builder (phase A)

Centralizes user-level features (completion_rate_30d, dismiss_rate_30d,
mean_dwell_ms_30d, preferred_hour, tip_volume_30d) in a TS registry that
owns both definition and SQL aggregation, since the data lives in the
TS-owned SQLite tables (tip_views/tip_feedback). Lazy TTL refresh keeps
recommend latency bounded; values persist in user_profile_features (KV).

ml/serving accepts profile_features on /score + /generate but does not
yet consume them — extending the bandit feature vector changes D and
resets every user's learned state, so that's a deliberate phase-B step.

Includes ml/features/profile_schema.py as a contract mirror with a sync
test that diffs name sets against registry.ts.

ADR-0011 records the data-locality reasoning (registry in TS, not Python
as the issue originally suggested).

Phase B (deferred): event-driven incremental updates, bandit consumption
with state migration, admin per-user profile page, staleness alerts.

Refs #81.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-04-25 00:22:22 +00:00
parent 430804e9a5
commit 7d4c29e137
13 changed files with 636 additions and 2 deletions

View File

@@ -100,6 +100,16 @@ export function runMigrations() {
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS user_profile_features (
user_id TEXT NOT NULL REFERENCES users(id),
name TEXT NOT NULL,
value REAL,
value_text TEXT,
updated_at TEXT NOT NULL,
ttl_sec INTEGER NOT NULL,
PRIMARY KEY (user_id, name)
);
CREATE TABLE IF NOT EXISTS sim_runs (
id TEXT PRIMARY KEY,
policy_a TEXT NOT NULL,

View File

@@ -87,6 +87,20 @@ export const tipScores = sqliteTable('tip_scores', {
tipKind: text('tip_kind'), // 'task' | 'advice' | 'insight' | 'reminder'
});
// ── User profile features (#81 phase A) ────────────────────────────────────
// One row per (userId, name). KV store for aggregated user-level features
// computed from tip_views/tip_feedback/tip_scores. Numeric values land in
// `value`; categorical/string values use `value_text` (never both). Entries
// are recomputed lazily by the profile builder when older than `ttl_sec`.
export const userProfileFeatures = sqliteTable('user_profile_features', {
userId: text('user_id').notNull().references(() => users.id),
name: text('name').notNull(), // e.g. 'completion_rate_30d'
value: integer('value', { mode: 'number' }), // numeric (REAL stored as number); null if categorical
valueText: text('value_text'), // categorical/string; null if numeric
updatedAt: text('updated_at').notNull(),
ttlSec: integer('ttl_sec').notNull(), // staleness threshold; 0 = never auto-refresh
});
// ── Simulation runs ──────────────────────────────────────────────────────────
// One row per offline simulation run (two-policy comparison).
export const simRuns = sqliteTable('sim_runs', {

View File

@@ -0,0 +1,159 @@
/**
* Profile builder tests — exercise lazy refresh, TTL skip, and rebuild.
* Uses an in-memory SQLite with the profile + tip tables; mocks db/index.js
* so getProfile/rebuildProfile see the test DB.
*/
import { describe, it, expect, vi, beforeAll, beforeEach } from 'vitest';
import { makeTestDb } from '../../test/db.js';
import {
users,
tipViews,
tipFeedback,
userProfileFeatures,
} from '../../db/schema.js';
import { eq } from 'drizzle-orm';
const testDb = makeTestDb();
vi.mock('../../db/index.js', () => ({ db: testDb, rawSqlite: testDb.rawSqlite }));
const { getProfile, rebuildProfile, readProfile } = await import('../builder.js');
const NOW = new Date().toISOString();
// 30s ago — magic-zone dwell when paired with a 'done'
const SHORT_AGO = new Date(Date.now() - 30_000).toISOString();
beforeAll(async () => {
await testDb.insert(users).values([
{ id: 'pf-user-1', email: 'pf1@test.com', role: 'user', consentGiven: true, consentAt: NOW, createdAt: NOW },
{ id: 'pf-user-empty', email: 'pfempty@test.com', role: 'user', consentGiven: true, consentAt: NOW, createdAt: NOW },
]);
});
beforeEach(async () => {
// Re-seed tip data each test so mutations in one test don't leak into the next.
// pf-user-1: 4 tips served, 2 done, 1 dismiss, 1 no reaction.
await testDb.delete(userProfileFeatures);
await testDb.delete(tipFeedback);
await testDb.delete(tipViews);
await testDb.insert(tipViews).values([
{ id: 'pv-1', userId: 'pf-user-1', tipId: 'tip:1', servedAt: SHORT_AGO },
{ id: 'pv-2', userId: 'pf-user-1', tipId: 'tip:2', servedAt: SHORT_AGO },
{ id: 'pv-3', userId: 'pf-user-1', tipId: 'tip:3', servedAt: SHORT_AGO },
{ id: 'pv-4', userId: 'pf-user-1', tipId: 'tip:4', servedAt: SHORT_AGO },
]);
await testDb.insert(tipFeedback).values([
{ id: 'pf-1', userId: 'pf-user-1', tipId: 'tip:1', action: 'done', dwellMs: 30_000, rewardMilli: 1000, createdAt: NOW },
{ id: 'pf-2', userId: 'pf-user-1', tipId: 'tip:2', action: 'done', dwellMs: 60_000, rewardMilli: 1000, createdAt: NOW },
{ id: 'pf-3', userId: 'pf-user-1', tipId: 'tip:3', action: 'dismiss', dwellMs: 10_000, rewardMilli: -1000, createdAt: NOW },
]);
});
describe('getProfile — first call computes + stores all features', () => {
it('returns numeric values matching the seeded data', async () => {
const profile = await getProfile('pf-user-1');
// 2 done out of 4 served = 0.5
expect(profile.completion_rate_30d).toBeCloseTo(0.5, 5);
// 1 dismiss out of 4 = 0.25
expect(profile.dismiss_rate_30d).toBeCloseTo(0.25, 5);
// mean dwell over 3 reacted: (30 + 60 + 10) / 3 = 33.33s
expect(profile.mean_dwell_ms_30d).toBeCloseTo(33_333.33, 0);
// tip volume served = 4
expect(profile.tip_volume_30d).toBe(4);
// preferred_hour = hour of NOW (only 'done' reactions seeded)
expect(typeof profile.preferred_hour).toBe('number');
});
it('persists rows with updated_at + ttl_sec', async () => {
await getProfile('pf-user-1');
const rows = await testDb
.select()
.from(userProfileFeatures)
.where(eq(userProfileFeatures.userId, 'pf-user-1'));
// 5 features in the registry → 5 rows
expect(rows).toHaveLength(5);
for (const row of rows) {
expect(row.updatedAt).toBeTruthy();
expect(row.ttlSec).toBeGreaterThan(0);
}
});
});
describe('getProfile — fresh row is returned without recompute', () => {
it('does not change updated_at on the second call within TTL', async () => {
await getProfile('pf-user-1');
const before = await testDb
.select()
.from(userProfileFeatures)
.where(eq(userProfileFeatures.userId, 'pf-user-1'));
const beforeTimestamps = before.map((r) => `${r.name}:${r.updatedAt}`).sort();
// Bend reality: change underlying tip data after the first compute.
// If the cache is honored, the returned value should NOT reflect this change.
await testDb.delete(tipFeedback);
const profile = await getProfile('pf-user-1');
const after = await testDb
.select()
.from(userProfileFeatures)
.where(eq(userProfileFeatures.userId, 'pf-user-1'));
const afterTimestamps = after.map((r) => `${r.name}:${r.updatedAt}`).sort();
expect(afterTimestamps).toEqual(beforeTimestamps);
// Cached completion_rate_30d should still be 0.5, not 0
expect(profile.completion_rate_30d).toBeCloseTo(0.5, 5);
});
});
describe('getProfile — stale row triggers recompute', () => {
it('refreshes when updated_at is older than ttl_sec', async () => {
// Pre-seed a stale row for completion_rate_30d (1 hour old, ttl = 6h is the registry default).
// To make it stale we need an updated_at older than ttl_sec — pick way-back.
const yearAgo = new Date(Date.now() - 365 * 86_400_000).toISOString();
await testDb.insert(userProfileFeatures).values({
userId: 'pf-user-1',
name: 'completion_rate_30d',
value: 0.99, // nonsense value to prove a refresh happened
valueText: null,
updatedAt: yearAgo,
ttlSec: 6 * 3600,
});
const profile = await getProfile('pf-user-1');
expect(profile.completion_rate_30d).toBeCloseTo(0.5, 5); // recomputed, not cached
});
});
describe('rebuildProfile + readProfile', () => {
it('rebuildProfile recomputes everything regardless of TTL', async () => {
await rebuildProfile('pf-user-1');
// Wipe reactions; views remain. SUM(done)/COUNT(*) = 0/4 = 0, not null.
await testDb.delete(tipFeedback);
const refreshed = await rebuildProfile('pf-user-1');
expect(refreshed.completion_rate_30d).toBe(0);
expect(refreshed.dismiss_rate_30d).toBe(0);
});
it('readProfile returns whatever is stored without computing', async () => {
expect(readProfile('pf-user-1')).toEqual({});
await getProfile('pf-user-1');
const stored = readProfile('pf-user-1');
expect(Object.keys(stored).sort()).toEqual([
'completion_rate_30d',
'dismiss_rate_30d',
'mean_dwell_ms_30d',
'preferred_hour',
'tip_volume_30d',
]);
});
});
describe('getProfile — empty user', () => {
it('returns nulls / zeros without throwing', async () => {
const profile = await getProfile('pf-user-empty');
expect(profile.completion_rate_30d).toBeNull(); // 0/0 → NULLIF guard
expect(profile.dismiss_rate_30d).toBeNull();
expect(profile.mean_dwell_ms_30d).toBeNull();
expect(profile.preferred_hour).toBeNull();
expect(profile.tip_volume_30d).toBe(0);
});
});

View File

@@ -0,0 +1,107 @@
/**
* User-profile builder (#81 phase A).
*
* Lazy refresh: `getProfile(userId)` returns every registered feature as a
* dict, recomputing any entry whose stored row is older than its `ttlSec`
* (or doesn't exist yet). All compute calls go against the raw better-sqlite3
* client because the SQL aggregations live in the registry.
*
* Phase B will add an event subscription so reactions update the relevant
* features incrementally instead of waiting for the next TTL window.
*/
import type Database from 'better-sqlite3';
import { db, rawSqlite } from '../db/index.js';
import { userProfileFeatures } from '../db/schema.js';
import { and, eq } from 'drizzle-orm';
import { FEATURES, type FeatureDefinition, type FeatureValue } from './registry.js';
export type Profile = Record<string, number | string | null>;
interface StoredRow {
name: string;
value: number | null;
value_text: string | null;
updated_at: string;
ttl_sec: number;
}
function readStored(userId: string): Map<string, StoredRow> {
const rows = rawSqlite
.prepare(`SELECT name, value, value_text, updated_at, ttl_sec FROM user_profile_features WHERE user_id = ?`)
.all(userId) as StoredRow[];
return new Map(rows.map((r) => [r.name, r]));
}
function isFresh(row: StoredRow, now: number): boolean {
if (row.ttl_sec === 0) return true;
const ageSec = (now - new Date(row.updated_at).getTime()) / 1000;
return ageSec < row.ttl_sec;
}
function valueOf(row: StoredRow): number | string | null {
return row.value ?? row.value_text;
}
async function upsertFeature(
userId: string,
feature: FeatureDefinition,
result: FeatureValue,
updatedAt: string,
): Promise<void> {
const value = result.kind === 'numeric' ? result.value : null;
const valueText = result.kind === 'categorical' ? result.value : null;
await db
.insert(userProfileFeatures)
.values({
userId,
name: feature.name,
value,
valueText,
updatedAt,
ttlSec: feature.ttlSec,
})
.onConflictDoUpdate({
target: [userProfileFeatures.userId, userProfileFeatures.name],
set: { value, valueText, updatedAt, ttlSec: feature.ttlSec },
});
}
/** Lazily refresh + return the full profile as `{ feature_name: value }`. */
export async function getProfile(userId: string, sqlite: Database.Database = rawSqlite): Promise<Profile> {
const stored = readStored(userId);
const now = Date.now();
const nowIso = new Date(now).toISOString();
const out: Profile = {};
for (const feature of FEATURES) {
const row = stored.get(feature.name);
if (row && isFresh(row, now)) {
out[feature.name] = valueOf(row);
continue;
}
const result = feature.compute(userId, sqlite);
await upsertFeature(userId, feature, result, nowIso);
out[feature.name] = result.value;
}
return out;
}
/** Force-recompute every feature. Useful for tests + future admin "rebuild" button. */
export async function rebuildProfile(userId: string, sqlite: Database.Database = rawSqlite): Promise<Profile> {
const nowIso = new Date().toISOString();
const out: Profile = {};
for (const feature of FEATURES) {
const result = feature.compute(userId, sqlite);
await upsertFeature(userId, feature, result, nowIso);
out[feature.name] = result.value;
}
return out;
}
/** Read-only fetch — no compute, no upsert. Returns whatever is stored (possibly stale, possibly empty). */
export function readProfile(userId: string): Profile {
const stored = readStored(userId);
const out: Profile = {};
for (const [name, row] of stored) out[name] = valueOf(row);
return out;
}

View File

@@ -0,0 +1,133 @@
/**
* User-profile feature registry (#81 phase A).
*
* Each entry declares a feature that can be aggregated from existing tables
* (tip_views, tip_feedback, tip_scores) and stored in user_profile_features.
* The Python schema mirror in `ml/features/profile_schema.py` documents the
* same contract so ml/serving knows what fields to expect.
*
* Compute functions take the raw better-sqlite3 client (rather than drizzle)
* because every feature here is a single SQL aggregation — query builder
* overhead and type yoga aren't worth it for one-line SELECTs.
*/
import type Database from 'better-sqlite3';
export type FeatureValue =
| { kind: 'numeric'; value: number | null }
| { kind: 'categorical'; value: string | null };
export interface FeatureDefinition {
name: string;
dtype: 'numeric' | 'categorical';
/** Seconds before the stored value is considered stale and recomputed. */
ttlSec: number;
/** Human-readable purpose; surfaced in admin UI eventually. */
description: string;
/** Compute the feature for one user against the live DB. */
compute(userId: string, db: Database.Database): FeatureValue;
}
const HOUR = 3600;
const DAY = 86_400;
// 30-day window pinned to "now" each compute. Using `datetime('now','-30 days')`
// keeps the SQL portable between the prod DB and in-memory test DBs.
const SINCE_30D = `datetime('now', '-30 days')`;
export const FEATURES: readonly FeatureDefinition[] = [
{
name: 'completion_rate_30d',
dtype: 'numeric',
ttlSec: 6 * HOUR,
description: 'Fraction of tips served in the last 30 days that received a "done" reaction.',
compute(userId, db) {
const row = db
.prepare(`
SELECT
CAST(SUM(CASE WHEN tf.action = 'done' THEN 1 ELSE 0 END) AS REAL)
/ NULLIF(COUNT(*), 0) AS rate
FROM tip_views tv
LEFT JOIN tip_feedback tf ON tf.tip_id = tv.tip_id
WHERE tv.user_id = ? AND tv.served_at >= ${SINCE_30D}
`)
.get(userId) as { rate: number | null } | undefined;
return { kind: 'numeric', value: row?.rate ?? null };
},
},
{
name: 'dismiss_rate_30d',
dtype: 'numeric',
ttlSec: 6 * HOUR,
description: 'Fraction of tips served in the last 30 days that received a "dismiss" reaction.',
compute(userId, db) {
const row = db
.prepare(`
SELECT
CAST(SUM(CASE WHEN tf.action = 'dismiss' THEN 1 ELSE 0 END) AS REAL)
/ NULLIF(COUNT(*), 0) AS rate
FROM tip_views tv
LEFT JOIN tip_feedback tf ON tf.tip_id = tv.tip_id
WHERE tv.user_id = ? AND tv.served_at >= ${SINCE_30D}
`)
.get(userId) as { rate: number | null } | undefined;
return { kind: 'numeric', value: row?.rate ?? null };
},
},
{
name: 'mean_dwell_ms_30d',
dtype: 'numeric',
ttlSec: 6 * HOUR,
description: 'Average dwell time (ms between served and reacted) over the last 30 days.',
compute(userId, db) {
const row = db
.prepare(`
SELECT AVG(tf.dwell_ms) AS avg_ms
FROM tip_feedback tf
WHERE tf.user_id = ? AND tf.created_at >= ${SINCE_30D} AND tf.dwell_ms IS NOT NULL
`)
.get(userId) as { avg_ms: number | null } | undefined;
return { kind: 'numeric', value: row?.avg_ms ?? null };
},
},
{
name: 'preferred_hour',
dtype: 'numeric',
ttlSec: DAY,
description: 'Hour-of-day with the most "done" reactions in the last 30 days (023).',
compute(userId, db) {
const row = db
.prepare(`
SELECT CAST(strftime('%H', tf.created_at) AS INTEGER) AS hour, COUNT(*) AS n
FROM tip_feedback tf
WHERE tf.user_id = ?
AND tf.action = 'done'
AND tf.created_at >= ${SINCE_30D}
GROUP BY hour
ORDER BY n DESC
LIMIT 1
`)
.get(userId) as { hour: number; n: number } | undefined;
return { kind: 'numeric', value: row?.hour ?? null };
},
},
{
name: 'tip_volume_30d',
dtype: 'numeric',
ttlSec: HOUR,
description: 'Number of tips served to the user in the last 30 days.',
compute(userId, db) {
const row = db
.prepare(`
SELECT COUNT(*) AS n
FROM tip_views
WHERE user_id = ? AND served_at >= ${SINCE_30D}
`)
.get(userId) as { n: number } | undefined;
return { kind: 'numeric', value: row?.n ?? 0 };
},
},
] as const;
export function findFeature(name: string): FeatureDefinition | undefined {
return FEATURES.find((f) => f.name === name);
}

View File

@@ -13,7 +13,7 @@ import { users, integrationTokens, tipScores } from '../../db/schema.js';
const testDb = makeTestDb();
vi.mock('../../db/index.js', () => ({ db: testDb }));
vi.mock('../../db/index.js', () => ({ db: testDb, rawSqlite: testDb.rawSqlite }));
vi.mock('../../middleware/session.js', () => ({
sessionMiddleware: (_req: express.Request, _res: express.Response, next: express.NextFunction) => next(),
requireAuth: (req: express.Request, _res: express.Response, next: express.NextFunction) => {

View File

@@ -10,6 +10,7 @@ import type { TipCandidate, Signal } from '@oo/shared-types';
import { todoistSource, dueAgeDays } from '../signals/todoist.js';
export { dueAgeDays };
import { SignalAggregator } from '../signals/aggregator.js';
import { getProfile, type Profile } from '../profile/builder.js';
const router: ExpressRouter = Router();
@@ -82,6 +83,7 @@ function signalToCandidate(signal: Signal): TipCandidate {
async function remotePolicy(
userId: string,
tasks: TipCandidate[],
profile: Profile,
): Promise<{ tipId: string; score: number; policy: string } | null> {
const hour = new Date().getHours();
const dayOfWeek = new Date().getDay();
@@ -96,6 +98,7 @@ async function remotePolicy(
features: t.features,
})),
context: { hour_of_day: hour, day_of_week: dayOfWeek },
profile_features: profile,
};
// Active policy: egreedy-v1 (selected over linucb-v1 after offline sim — ADR-0007)
@@ -141,6 +144,7 @@ async function fetchLlmCandidates(
hour: number,
dayOfWeek: number,
promptVersion: string | null,
profile: Profile,
): Promise<LlmGenerateResult> {
try {
const tasks = signals.slice(0, 10).map((s) => ({
@@ -156,6 +160,7 @@ async function fetchLlmCandidates(
user_id: userId,
context: { tasks, hour_of_day: hour, day_of_week: dayOfWeek },
n: 3,
profile_features: profile,
...(promptVersion ? { prompt_version: promptVersion } : {}),
}),
signal: AbortSignal.timeout(15_000),
@@ -208,6 +213,8 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re
// Stage 1: assemble candidates — aggregated signals + LLM-generated advice (parallel)
const signals = await aggregator.fetchAll(req.userId!);
// Refresh + load the user-level profile feature dict (lazy TTL refresh).
const profile = await getProfile(req.userId!);
const signalCandidates = signals.map(signalToCandidate);
const requestedPromptVersion = pickPromptVersion();
@@ -217,6 +224,7 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re
hour,
dayOfWeek,
requestedPromptVersion,
profile,
);
const allCandidates: TipCandidate[] = [...signalCandidates, ...llmResult.candidates];
@@ -231,7 +239,7 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re
const t0 = Date.now();
// Stage 2: score — egreedy bandit with random fallback
const scored = await remotePolicy(req.userId!, allCandidates);
const scored = await remotePolicy(req.userId!, allCandidates, profile);
const latencyMs = Date.now() - t0;
const tip = scored
? (allCandidates.find((t) => t.id === scored.tipId) ?? randomPolicy(allCandidates))

View File

@@ -105,6 +105,16 @@ export function makeTestDb(): DrizzleDb & { rawSqlite: BetterSqlite3Database } {
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS user_profile_features (
user_id TEXT NOT NULL REFERENCES users(id),
name TEXT NOT NULL,
value REAL,
value_text TEXT,
updated_at TEXT NOT NULL,
ttl_sec INTEGER NOT NULL,
PRIMARY KEY (user_id, name)
);
CREATE TABLE IF NOT EXISTS sim_runs (
id TEXT PRIMARY KEY,
policy_a TEXT NOT NULL,