Files
oO/services/api/src/routes/recommender.ts
alvis e62c726ea4 feat: M1 admin console — all 10 remaining pages + signal/quality/ops infrastructure
Admin console (issues #63–72):
- Event stream viewer: live-tail ring buffer (500 events) with subject/user filters
- Feature store browser: per-user feature vector history from ml/serving
- Model registry panel: MLflow embed at /admin/models
- Experiment dashboard: LinUCB per-user stats (pulls, reward, θ) + bandit reset
- Recommendation log: per-tip explainability (policy, score, features, latency)
- Reward analytics: daily reaction breakdown + per-policy compare
- Data quality widget: missing-feature rate, stale-token rate, daily completeness
- Ops actions: replay-signal, policy enable/disable; user actions link to Users page
- SQL runner: read-only SELECT runner with saved queries
- Health rollup: fan-out to api/ml/sqlite/event-bus with auto-refresh

Backend:
- tip_scores table: logs features+policy+score+latency at every scoring call (#67)
- saved_queries table: per-admin saved SQL (#71)
- Event bus: 500-event ring buffer + tail() API (#63)
- Admin routes: /events, /tips, /reward-analytics, /data-quality, /health,
  /policies, /replay-signal, /sql, /saved-queries endpoints
- /api/ml/* admin-gated proxy to ml/serving (#64, #66)
- Shadow-policy registry in recommender (#56)

ML serving:
- /reset/{user_id}: clear bandit state + feature history (#66)
- /stats/{user_id}: pulls, cumulative reward, estimated mean, θ (#66)
- /features/{user_id}: last 100 feature vectors logged at scoring time (#64)
- Meta (pulls, rewards) persisted alongside A/b matrices

Web:
- Tip action sheet adds Helpful / Not helpful buttons (#62)
- TipFeedback type extended with helpful/not_helpful actions
- Rewards mapped: helpful=+0.5, not_helpful=−0.5

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-16 03:56:48 +00:00

310 lines
9.2 KiB
TypeScript

import { type Router as ExpressRouter, Router, Response } from 'express';
import { nanoid } from 'nanoid';
import { db } from '../db/index.js';
import { integrationTokens, tipFeedback, tipViews, tipScores } from '../db/schema.js';
import { eq, and } from 'drizzle-orm';
import { requireAuth, AuthenticatedRequest } from '../middleware/session.js';
import { config } from '../config.js';
import { bus } from '../events/bus.js';
import type { Tip } from '@oo/shared-types';
/** Express router carrying the recommendation and tip-feedback endpoints defined in this file. */
const router: ExpressRouter = Router();

/** Age (ms) below which a user's cached task list is returned without refetching. */
const CACHE_TTL_MS = 30_000;

/**
 * Per-task signals computed at fetch time. The object is forwarded as-is to
 * ml/serving and logged with each served tip, so the snake_case keys are
 * part of the wire format.
 */
interface TaskFeatures {
  is_overdue: boolean;
  task_age_days: number;
  priority: number;
}

/** A tip together with the features computed for it. */
interface CachedTask extends Tip {
  features: TaskFeatures;
}

/** One user's task list plus the epoch-ms instant it was fetched. */
interface TaskCacheEntry {
  tasks: CachedTask[];
  fetchedAt: number;
}

/** In-memory task cache keyed by user id. */
const taskCache = new Map<string, TaskCacheEntry>();
// ---------------------------------------------------------------------------
// Shadow-policy registry
// ---------------------------------------------------------------------------
// A shadow policy runs next to the active policy and has its picks logged,
// but it never changes what the user is shown. Promoting one to A/B or live
// is a manual step.
// Shape: Map<policyName, { active: boolean }>
//
// The registry starts empty. To seed a random baseline, pass entry tuples to
// the constructor, e.g.:
//   new Map([['random-shadow', { active: true }]])
const shadowPolicies: Map<string, { active: boolean }> = new Map();

/** Snapshot of every registered shadow policy as `{ name, active }` records. */
export function getShadowPolicies() {
  const listing: Array<{ name: string; active: boolean }> = [];
  for (const [name, state] of shadowPolicies) {
    listing.push({ name, ...state });
  }
  return listing;
}

/**
 * Switch a registered shadow policy on or off.
 *
 * @param name   Registry key of the policy.
 * @param active Desired state.
 * @returns `true` when the policy exists and was updated; `false` when the
 *          name is not registered (nothing is created in that case).
 */
export function setPolicyActive(name: string, active: boolean): boolean {
  const known = shadowPolicies.has(name);
  if (known) {
    shadowPolicies.set(name, { active });
  }
  return known;
}
// ---------------------------------------------------------------------------
// Todoist helpers
// ---------------------------------------------------------------------------
/**
 * Number of days (fractional) that have elapsed since a task's due moment.
 *
 * `datetime` is preferred over `date` when both are present. Tasks that are
 * not yet due, have no due information, or carry an unparseable due string
 * all report 0.
 *
 * NOTE(review): a date-only value such as "2026-04-16" is parsed as UTC
 * midnight at the start of that day, so a task due "today" already reports a
 * positive age. Confirm this is the intended meaning for the overdue feature.
 *
 * @param due Due descriptor as delivered by the task source, or null/undefined.
 * @returns Non-negative, finite age in days.
 */
function dueAgeDays(due: { date?: string; datetime?: string } | null | undefined): number {
  const msPerDay = 1000 * 60 * 60 * 24;
  const dateStr = due?.datetime ?? due?.date;
  if (!dateStr) return 0;
  const dueMs = Date.parse(dateStr);
  // Math.max(0, NaN) is NaN, so an invalid date must be rejected explicitly;
  // otherwise NaN would flow into the feature vector.
  if (Number.isNaN(dueMs)) return 0;
  return Math.max(0, (Date.now() - dueMs) / msPerDay);
}
/** Upper bound (ms) on one Todoist task fetch before it is abandoned. */
const TODOIST_FETCH_TIMEOUT_MS = 10_000;

/** Subset of the Todoist task payload that this module reads. */
interface TodoistTask {
  id: string;
  content: string;
  priority: number;
  due: { date?: string; datetime?: string; is_recurring?: boolean } | null;
}

/**
 * Load the user's Todoist tasks and convert them into scored-candidate form.
 *
 * A cache entry younger than `CACHE_TTL_MS` is returned directly. Otherwise
 * Todoist is queried. On any failure (non-2xx status, network error, timeout,
 * unparseable body) the previously cached list is returned, or an empty list
 * when nothing was cached. A failed fetch neither refreshes the cache nor
 * publishes an event.
 *
 * On success the cache entry is replaced and `signals.task.synced` is
 * published with the task count.
 *
 * NOTE(review): verify against the Todoist API v1 reference that the `filter`
 * query parameter is honoured on this endpoint and whether `results` is
 * paginated; only the first response page is read here.
 *
 * @param userId      Cache key and event attribution.
 * @param accessToken Todoist bearer token for this user.
 * @returns Candidate tasks with their computed features.
 */
async function fetchTodoistTasks(userId: string, accessToken: string): Promise<CachedTask[]> {
  const cached = taskCache.get(userId);
  if (cached && Date.now() - cached.fetchedAt < CACHE_TTL_MS) return cached.tasks;

  // Stale data beats no data: every failure path below degrades to this.
  const fallback = cached?.tasks ?? [];

  let results: TodoistTask[];
  try {
    const res = await fetch('https://api.todoist.com/api/v1/tasks?filter=today%7Coverdue', {
      headers: { Authorization: `Bearer ${accessToken}` },
      signal: AbortSignal.timeout(TODOIST_FETCH_TIMEOUT_MS),
    });
    if (!res.ok) return fallback;
    const body = (await res.json()) as { results?: unknown } | null;
    // Anything other than an array is treated as "no tasks" rather than crashing in map().
    results = Array.isArray(body?.results) ? (body.results as TodoistTask[]) : [];
  } catch {
    // fetch() rejects on network errors and timeouts; json() rejects on a bad body.
    return fallback;
  }

  const now = new Date();
  const tasks: CachedTask[] = results.map((t) => {
    const ageDays = dueAgeDays(t.due);
    return {
      id: `todoist:${t.id}`,
      content: t.content,
      source: 'todoist' as const,
      sourceId: t.id,
      createdAt: now.toISOString(),
      features: {
        is_overdue: ageDays > 0,
        task_age_days: ageDays,
        priority: t.priority ?? 1,
      },
    };
  });

  taskCache.set(userId, { tasks, fetchedAt: Date.now() });
  bus.publish('signals.task.synced', { userId, count: tasks.length, syncedAt: now.toISOString() });
  return tasks;
}
/**
 * Ask ml/serving to choose one of `tasks` for `userId`.
 *
 * Sends every candidate with its features plus the current hour and weekday
 * (server-local time) to `POST {ML_SERVING_URL}/score`, with a 3 s timeout.
 *
 * @param userId User the selection is made for.
 * @param tasks  Candidate tasks; forwarded in the given order.
 * @returns The chosen tip id and its score, or `null` when the call fails,
 *          times out, returns a non-2xx status, or returns a body without a
 *          string `tip_id` and a finite numeric `score`.
 */
async function remotePolicy(
  userId: string,
  tasks: CachedTask[],
): Promise<{ tipId: string; score: number } | null> {
  // One clock sample, so hour and weekday cannot disagree across midnight.
  const now = new Date();
  const payload = {
    user_id: userId,
    candidates: tasks.map((t) => ({
      id: t.id,
      content: t.content,
      source: t.source,
      source_id: t.sourceId ?? null,
      features: t.features,
    })),
    context: { hour_of_day: now.getHours(), day_of_week: now.getDay() },
  };
  try {
    const res = await fetch(`${config.ML_SERVING_URL}/score`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload),
      signal: AbortSignal.timeout(3000),
    });
    if (!res.ok) return null;
    const data = (await res.json()) as { tip_id?: unknown; score?: unknown } | null;
    const tipId = data?.tip_id;
    const score = data?.score;
    // Treat a malformed body like any other failure, so callers never see an
    // undefined id or a NaN score.
    if (typeof tipId !== 'string' || typeof score !== 'number' || !Number.isFinite(score)) {
      return null;
    }
    return { tipId, score };
  } catch {
    // Network error, timeout or invalid JSON: the caller falls back to another policy.
    return null;
  }
}
/**
 * Baseline policy: choose one candidate at a random index.
 *
 * @param candidates Tasks to choose from.
 * @returns One element of `candidates`, or `null` when the list is empty.
 */
function randomPolicy(candidates: CachedTask[]): CachedTask | null {
  const count = candidates.length;
  if (count === 0) return null;
  const pick = Math.floor(Math.random() * count);
  return candidates[pick];
}
// ---------------------------------------------------------------------------
// POST /api/recommend
// ---------------------------------------------------------------------------
/**
 * Serve one tip to the authenticated user.
 *
 * Responses:
 *  - 422 `{ error }` when the user has no Todoist integration token.
 *  - 204 when there are no candidate tasks.
 *  - 200 `{ tip }` otherwise.
 *
 * Selection uses ml/serving when it answers with a tip id that is present in
 * the candidate list; in every other case a random candidate is served. Each
 * served tip is recorded in `tipViews` and `tipScores` and announced on the
 * bus as `signals.tip.served`.
 */
router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Response) => {
  // presumably populated by requireAuth; verify in middleware/session
  const userId = req.userId!;

  const [token] = await db
    .select()
    .from(integrationTokens)
    .where(and(eq(integrationTokens.userId, userId), eq(integrationTokens.provider, 'todoist')))
    .limit(1);
  if (!token) {
    res.status(422).json({ error: 'No integrations connected' });
    return;
  }

  const tasks = await fetchTodoistTasks(userId, token.accessToken);
  if (!tasks.length) {
    res.status(204).end();
    return;
  }

  // Context features logged with the score; taken from a single clock sample.
  const now = new Date();
  const hour = now.getHours();
  const dayOfWeek = now.getDay();

  // latencyMs covers the ml/serving round trip only.
  const t0 = Date.now();
  const scored = await remotePolicy(userId, tasks);
  const latencyMs = Date.now() - t0;

  // RemotePolicy with RandomPolicy fallback. The ML pick only counts when it
  // names a task we actually offered; an unknown id is treated as a miss.
  const mlTip = scored ? tasks.find((t) => t.id === scored.tipId) : undefined;
  const tip = mlTip ?? randomPolicy(tasks);
  if (!tip) {
    res.status(204).end();
    return;
  }

  // Attribute the serve to the policy that really produced it. Logging
  // 'linucb-v1' (and its score) for a randomly chosen tip would corrupt the
  // per-policy analytics built on tipScores.
  const policy = mlTip ? 'linucb-v1' : 'random';
  // Score is stored as an integer in thousandths.
  const mlScore = scored && mlTip ? Math.round(scored.score * 1000) : null;
  const servedAt = new Date().toISOString();

  await db.insert(tipViews).values({ id: nanoid(), userId, tipId: tip.id, servedAt });

  // Log recommendation explainability
  await db.insert(tipScores).values({
    id: nanoid(),
    userId,
    tipId: tip.id,
    policy,
    mlScore,
    featuresJson: JSON.stringify({
      is_overdue: tip.features.is_overdue,
      task_age_days: tip.features.task_age_days,
      priority: tip.features.priority,
      hour_of_day: hour,
      day_of_week: dayOfWeek,
    }),
    candidateCount: tasks.length,
    latencyMs,
    servedAt,
  });

  bus.publish('signals.tip.served', {
    userId,
    tipId: tip.id,
    policy,
    servedAt,
  });

  // Run shadow policies: their picks are only published, never returned.
  // Only names starting with 'random' have an implementation here.
  for (const [name, s] of shadowPolicies) {
    if (!s.active) continue;
    if (name.startsWith('random')) {
      const shadowTip = randomPolicy(tasks);
      bus.publish('signals.tip.served', {
        userId,
        tipId: shadowTip?.id ?? 'none',
        policy: `shadow:${name}`,
        servedAt,
      });
    }
  }

  res.json({ tip });
});
// ---------------------------------------------------------------------------
// POST /api/tip/:id/feedback
// ---------------------------------------------------------------------------
/**
 * Reward assigned to each accepted feedback action. helpful/not_helpful are
 * explicit ratings that supplement the behavioural signals.
 */
const FEEDBACK_REWARDS = {
  done: 1.0,
  helpful: 0.5,
  snooze: 0.0,
  not_helpful: -0.5,
  dismiss: -1.0,
} as const;

type FeedbackAction = keyof typeof FEEDBACK_REWARDS;

/** Behavioural actions: these drop the user's cached task list. */
const CACHE_INVALIDATING_ACTIONS: ReadonlySet<FeedbackAction> = new Set<FeedbackAction>([
  'done',
  'dismiss',
  'snooze',
]);

/** Prefix that marks a tip id as originating from Todoist. */
const TODOIST_TIP_PREFIX = 'todoist:';

/** Timeout (ms) for outbound calls made while handling feedback. */
const FEEDBACK_OUTBOUND_TIMEOUT_MS = 3000;

/** True when `value` is one of the accepted feedback action strings. */
function isFeedbackAction(value: unknown): value is FeedbackAction {
  return typeof value === 'string' && Object.prototype.hasOwnProperty.call(FEEDBACK_REWARDS, value);
}

/**
 * Record the user's reaction to a tip.
 *
 * Responses:
 *  - 400 `{ error }` when the body is missing or `action` is not accepted.
 *  - 200 `{ ok: true }` otherwise.
 *
 * Effects, in order: insert into `tipFeedback`; drop the user's task cache
 * for behavioural actions; publish `signals.tip.feedback`; send the reward to
 * ml/serving when the tip's features are still cached (not awaited); for
 * `done` on a Todoist tip, ask Todoist to close the task (best effort — a
 * failure there does not change the response).
 */
router.post('/tip/:id/feedback', requireAuth, async (req: AuthenticatedRequest, res: Response) => {
  // presumably populated by requireAuth; verify in middleware/session
  const userId = req.userId!;
  const tipId = String(req.params.id);

  // req.body can be undefined when no body was parsed, so read it defensively
  // instead of destructuring.
  const action: unknown = (req.body as { action?: unknown } | null | undefined)?.action;
  if (!isFeedbackAction(action)) {
    res.status(400).json({ error: 'Invalid action' });
    return;
  }

  const todoistId = tipId.startsWith(TODOIST_TIP_PREFIX)
    ? tipId.slice(TODOIST_TIP_PREFIX.length)
    : null;
  const createdAt = new Date().toISOString();

  await db.insert(tipFeedback).values({
    id: nanoid(),
    userId,
    tipId,
    action,
    sourceId: todoistId,
    createdAt,
  });

  const reward = FEEDBACK_REWARDS[action];

  // Look the task up before the cache may be cleared below: its features are
  // needed for the reward call.
  const task = taskCache.get(userId)?.tasks.find((t) => t.id === tipId);

  // Clear cache on behavioural actions (not on explicit helpful/not_helpful)
  if (CACHE_INVALIDATING_ACTIONS.has(action)) {
    taskCache.delete(userId);
  }

  bus.publish('signals.tip.feedback', {
    userId,
    tipId,
    action,
    reward,
    createdAt,
  });

  if (task) {
    // Fire-and-forget: the response does not wait for ml/serving, and a
    // failed or timed-out reward call is deliberately ignored.
    void fetch(`${config.ML_SERVING_URL}/reward`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        user_id: userId,
        tip_id: tipId,
        reward,
        features: task.features,
      }),
      signal: AbortSignal.timeout(FEEDBACK_OUTBOUND_TIMEOUT_MS),
    }).catch(() => {});
  }

  // Mark complete in Todoist if done. An empty id (tip id was just the
  // prefix) is skipped because there is nothing to close.
  if (action === 'done' && todoistId) {
    const [tok] = await db
      .select()
      .from(integrationTokens)
      .where(and(eq(integrationTokens.userId, userId), eq(integrationTokens.provider, 'todoist')))
      .limit(1);
    if (tok) {
      try {
        // The id comes from the request path, so escape it before placing it
        // in the URL; otherwise '/', '?' or '..' could redirect the call.
        await fetch(`https://api.todoist.com/api/v1/tasks/${encodeURIComponent(todoistId)}/close`, {
          method: 'POST',
          headers: { Authorization: `Bearer ${tok.accessToken}` },
          signal: AbortSignal.timeout(FEEDBACK_OUTBOUND_TIMEOUT_MS),
        });
      } catch {
        // Best effort: feedback is already stored; a Todoist outage must not fail the request.
      }
    }
  }

  res.json({ ok: true });
});
// Public handle for this module's router. The route banners in this file
// refer to /api/... paths, so it is presumably mounted under /api — confirm
// in the application entry point.
export { router as recommenderRouter };