feat: M1 — LinUCB bandit, RemotePolicy, Web Push, event bus

ML serving:
- LinUCB contextual bandit (disjoint, d=5 features: hour_sin/cos, is_overdue, task_age, priority)
- /score endpoint replaces stub random; /reward endpoint for online learning
- Per-user model state persisted to disk as JSON (survives restarts)
- venv at ml/serving/.venv; start with pnpm dev from ml/serving

Recommender:
- Todoist fetch now extracts features (is_overdue, task_age_days, priority)
- RemotePolicy calls ml/serving with 3s timeout; falls back to RandomPolicy
- Reward sent to /reward on feedback (done=+1, snooze=0, dismiss=-1)

Web Push:
- VAPID keys in config; push_subscriptions table in DB
- POST/DELETE /api/push/subscribe; GET /api/push/vapid-public-key
- Service worker (public/sw.js): push → showNotification, notificationclick → focus/open
- "notify me" button on tip page; registers SW + subscribes on permission grant

Event bus:
- services/api/src/events/bus.ts: typed EventEmitter wrapper
- Subjects: signals.tip.served, signals.tip.feedback, signals.task.synced
- Same publish/subscribe API NATS JetStream will implement — swap is mechanical

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-15 14:08:00 +00:00
parent 08dfa1d8c9
commit c7edd92e15
16 changed files with 648 additions and 75 deletions

View File

@@ -32,4 +32,8 @@ export const config = {
WEB_BASE_URL: optional('WEB_BASE_URL', 'http://localhost:3000'),
ML_SERVING_URL: optional('ML_SERVING_URL', 'http://localhost:8000'),
VAPID_PUBLIC_KEY: optional('VAPID_PUBLIC_KEY', ''),
VAPID_PRIVATE_KEY: optional('VAPID_PRIVATE_KEY', ''),
VAPID_SUBJECT: optional('VAPID_SUBJECT', 'mailto:admin@localhost'),
};

View File

@@ -39,6 +39,15 @@ export const tipViews = sqliteTable('tip_views', {
servedAt: text('served_at').notNull(),
});
export const pushSubscriptions = sqliteTable('push_subscriptions', {
id: text('id').primaryKey(),
userId: text('user_id').notNull().references(() => users.id),
endpoint: text('endpoint').notNull().unique(),
p256dh: text('p256dh').notNull(),
auth: text('auth').notNull(),
createdAt: text('created_at').notNull(),
});
export const sessions = sqliteTable('sessions', {
id: text('id').primaryKey(),
userId: text('user_id').notNull().references(() => users.id),

View File

@@ -0,0 +1,52 @@
/**
* EventBus — in-process today, NATS JetStream tomorrow.
*
* To swap to NATS: replace the EventEmitter body with a NATS JetStream
* publish call. Subjects and payload shapes are the durable contract.
*
* Subjects follow the pattern: <domain>.<entity>.<verb>
* signals.tip.served — a tip was returned to the client
* signals.tip.feedback — user reacted (done / dismiss / snooze)
* signals.task.synced — Todoist task list refreshed for a user
*/
import { EventEmitter } from 'events';
export type TipServedEvent = {
userId: string;
tipId: string;
policy: string;
servedAt: string;
};
export type TipFeedbackEvent = {
userId: string;
tipId: string;
action: 'done' | 'dismiss' | 'snooze';
reward: number;
createdAt: string;
};
export type TaskSyncedEvent = {
userId: string;
count: number;
syncedAt: string;
};
type EventMap = {
'signals.tip.served': TipServedEvent;
'signals.tip.feedback': TipFeedbackEvent;
'signals.task.synced': TaskSyncedEvent;
};
class Bus extends EventEmitter {
publish<K extends keyof EventMap>(subject: K, payload: EventMap[K]): void {
this.emit(subject, payload);
}
subscribe<K extends keyof EventMap>(subject: K, handler: (payload: EventMap[K]) => void): void {
this.on(subject, handler);
}
}
export const bus = new Bus();

View File

@@ -9,6 +9,7 @@ import { authRouter } from './routes/auth.js';
import { integrationsRouter } from './routes/integrations.js';
import { recommenderRouter } from './routes/recommender.js';
import { userRouter } from './routes/user.js';
import { pushRouter } from './routes/push.js';
import { mkdir } from 'fs/promises';
import { dirname } from 'path';
@@ -33,6 +34,7 @@ app.use('/api/auth', authRouter);
app.use('/api/integrations', integrationsRouter);
app.use('/api', recommenderRouter);
app.use('/api/user', userRouter);
app.use('/api/push', pushRouter);
app.listen(config.PORT, () => {
console.log(`oO API listening on http://localhost:${config.PORT}`);

View File

@@ -0,0 +1,98 @@
import { type Router as ExpressRouter, Router, Response } from 'express';
import webpush from 'web-push';
import { nanoid } from 'nanoid';
import { db } from '../db/index.js';
import { pushSubscriptions } from '../db/schema.js';
import { eq, and } from 'drizzle-orm';
import { requireAuth, AuthenticatedRequest } from '../middleware/session.js';
import { config } from '../config.js';
const router: ExpressRouter = Router();
if (config.VAPID_PUBLIC_KEY && config.VAPID_PRIVATE_KEY) {
webpush.setVapidDetails(
config.VAPID_SUBJECT,
config.VAPID_PUBLIC_KEY,
config.VAPID_PRIVATE_KEY,
);
}
/** GET /api/push/vapid-public-key — client needs this to subscribe */
router.get('/vapid-public-key', (_req, res: Response) => {
res.json({ key: config.VAPID_PUBLIC_KEY });
});
/** POST /api/push/subscribe — save or refresh a push subscription */
router.post('/subscribe', requireAuth, async (req: AuthenticatedRequest, res: Response) => {
const { endpoint, keys } = req.body as {
endpoint: string;
keys: { p256dh: string; auth: string };
};
if (!endpoint || !keys?.p256dh || !keys?.auth) {
res.status(400).json({ error: 'Invalid subscription' });
return;
}
// Upsert by endpoint
const existing = await db
.select()
.from(pushSubscriptions)
.where(eq(pushSubscriptions.endpoint, endpoint))
.limit(1);
if (existing.length) {
await db
.update(pushSubscriptions)
.set({ p256dh: keys.p256dh, auth: keys.auth })
.where(eq(pushSubscriptions.endpoint, endpoint));
} else {
await db.insert(pushSubscriptions).values({
id: nanoid(),
userId: req.userId!,
endpoint,
p256dh: keys.p256dh,
auth: keys.auth,
createdAt: new Date().toISOString(),
});
}
res.json({ ok: true });
});
/** DELETE /api/push/subscribe — unsubscribe */
router.delete('/subscribe', requireAuth, async (req: AuthenticatedRequest, res: Response) => {
const { endpoint } = req.body as { endpoint: string };
if (endpoint) {
await db
.delete(pushSubscriptions)
.where(
and(
eq(pushSubscriptions.userId, req.userId!),
eq(pushSubscriptions.endpoint, endpoint),
),
);
}
res.json({ ok: true });
});
/** Send a push notification to a user — called internally */
export async function sendPushToUser(userId: string, payload: { title: string; body: string; url?: string }) {
if (!config.VAPID_PUBLIC_KEY) return;
const subs = await db
.select()
.from(pushSubscriptions)
.where(eq(pushSubscriptions.userId, userId));
await Promise.allSettled(
subs.map((sub) =>
webpush.sendNotification(
{ endpoint: sub.endpoint, keys: { p256dh: sub.p256dh, auth: sub.auth } },
JSON.stringify(payload),
).catch(() => {}),
),
);
}
export { router as pushRouter };

View File

@@ -4,44 +4,109 @@ import { db } from '../db/index.js';
import { integrationTokens, tipFeedback, tipViews } from '../db/schema.js';
import { eq, and } from 'drizzle-orm';
import { requireAuth, AuthenticatedRequest } from '../middleware/session.js';
import { config } from '../config.js';
import { bus } from '../events/bus.js';
import type { Tip } from '@oo/shared-types';
const router: ExpressRouter = Router();
const CACHE_TTL_MS = 30_000; // 30 seconds
const taskCache = new Map<string, { tips: Tip[]; fetchedAt: number }>();
const CACHE_TTL_MS = 30_000;
/** Fetch active Todoist tasks, with a 30s in-memory cache per user */
async function fetchTodoistTasks(userId: string, accessToken: string): Promise<Tip[]> {
interface TaskFeatures {
is_overdue: boolean;
task_age_days: number;
priority: number;
}
interface CachedTask extends Tip {
features: TaskFeatures;
}
const taskCache = new Map<string, { tasks: CachedTask[]; fetchedAt: number }>();
/** Parse a Todoist due date string into age in days (relative to now) */
function dueAgeDays(due: { date?: string; datetime?: string } | null | undefined): number {
if (!due) return 0;
const dateStr = due.datetime ?? due.date;
if (!dateStr) return 0;
const dueMs = new Date(dateStr).getTime();
return Math.max(0, (Date.now() - dueMs) / (1000 * 60 * 60 * 24));
}
async function fetchTodoistTasks(userId: string, accessToken: string): Promise<CachedTask[]> {
const cached = taskCache.get(userId);
if (cached && Date.now() - cached.fetchedAt < CACHE_TTL_MS) {
return cached.tips;
}
if (cached && Date.now() - cached.fetchedAt < CACHE_TTL_MS) return cached.tasks;
const res = await fetch('https://api.todoist.com/api/v1/tasks?filter=today%7Coverdue', {
headers: { Authorization: `Bearer ${accessToken}` },
});
if (!res.ok) return cached?.tips ?? [];
if (!res.ok) return cached?.tasks ?? [];
const body = (await res.json()) as { results: Array<{ id: string; content: string }> };
const tips: Tip[] = (body.results ?? []).map((t) => ({
id: `todoist:${t.id}`,
content: t.content,
source: 'todoist' as const,
sourceId: t.id,
createdAt: new Date().toISOString(),
}));
const body = (await res.json()) as {
results: Array<{
id: string;
content: string;
priority: number;
due: { date?: string; datetime?: string; is_recurring?: boolean } | null;
}>;
};
taskCache.set(userId, { tips, fetchedAt: Date.now() });
return tips;
const now = new Date();
const tasks: CachedTask[] = (body.results ?? []).map((t) => {
const ageDays = dueAgeDays(t.due);
const isOverdue = ageDays > 0;
return {
id: `todoist:${t.id}`,
content: t.content,
source: 'todoist' as const,
sourceId: t.id,
createdAt: now.toISOString(),
features: {
is_overdue: isOverdue,
task_age_days: ageDays,
priority: t.priority ?? 1,
},
};
});
taskCache.set(userId, { tasks, fetchedAt: Date.now() });
return tasks;
}
/**
* RandomPolicy — picks one task at random from the candidate set.
* Contract: same interface the ML scorer will implement.
*/
function randomPolicy(candidates: Tip[]): Tip | null {
/** Call ml/serving for scored selection; returns tip_id or null on failure */
async function remotePolicy(userId: string, tasks: CachedTask[]): Promise<string | null> {
const hour = new Date().getHours();
const dayOfWeek = new Date().getDay();
const body = {
user_id: userId,
candidates: tasks.map((t) => ({
id: t.id,
content: t.content,
source: t.source,
source_id: t.sourceId ?? null,
features: t.features,
})),
context: { hour_of_day: hour, day_of_week: dayOfWeek },
};
try {
const res = await fetch(`${config.ML_SERVING_URL}/score`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(body),
signal: AbortSignal.timeout(3000),
});
if (!res.ok) return null;
const { tip_id } = (await res.json()) as { tip_id: string };
return tip_id;
} catch {
return null;
}
}
function randomPolicy(candidates: CachedTask[]): CachedTask | null {
if (!candidates.length) return null;
return candidates[Math.floor(Math.random() * candidates.length)];
}
@@ -51,12 +116,7 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re
const [token] = await db
.select()
.from(integrationTokens)
.where(
and(
eq(integrationTokens.userId, req.userId!),
eq(integrationTokens.provider, 'todoist'),
),
)
.where(and(eq(integrationTokens.userId, req.userId!), eq(integrationTokens.provider, 'todoist')))
.limit(1);
if (!token) {
@@ -64,20 +124,31 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re
return;
}
const candidates = await fetchTodoistTasks(req.userId!, token.accessToken);
const tip = randomPolicy(candidates);
const tasks = await fetchTodoistTasks(req.userId!, token.accessToken);
if (!tasks.length) {
res.status(204).end();
return;
}
// RemotePolicy with RandomPolicy fallback
const scoredId = await remotePolicy(req.userId!, tasks);
const tip = scoredId
? (tasks.find((t) => t.id === scoredId) ?? randomPolicy(tasks))
: randomPolicy(tasks);
if (!tip) {
res.status(204).end();
return;
}
// Record metric: tip served
await db.insert(tipViews).values({
id: nanoid(),
const servedAt = new Date().toISOString();
await db.insert(tipViews).values({ id: nanoid(), userId: req.userId!, tipId: tip.id, servedAt });
bus.publish('signals.tip.served', {
userId: req.userId!,
tipId: tip.id,
servedAt: new Date().toISOString(),
policy: scoredId ? 'linucb-v1' : 'random',
servedAt,
});
res.json({ tip });
@@ -102,27 +173,44 @@ router.post('/tip/:id/feedback', requireAuth, async (req: AuthenticatedRequest,
createdAt: new Date().toISOString(),
});
// Invalidate cache so next recommend fetches fresh tasks
// Capture task features before clearing cache
const reward = action === 'done' ? 1.0 : action === 'dismiss' ? -1.0 : 0.0;
const task = taskCache.get(req.userId!)?.tasks.find((t) => t.id === tipId);
taskCache.delete(req.userId!);
// If done, mark complete in Todoist
bus.publish('signals.tip.feedback', {
userId: req.userId!,
tipId,
action: action as 'done' | 'dismiss' | 'snooze',
reward,
createdAt: new Date().toISOString(),
});
if (task) {
fetch(`${config.ML_SERVING_URL}/reward`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
user_id: req.userId!,
tip_id: tipId,
reward,
features: task.features,
}),
}).catch(() => {});
}
// Mark complete in Todoist if done
if (action === 'done' && tipId.startsWith('todoist:')) {
const todoistId = tipId.slice(8);
const [token] = await db
const [tok] = await db
.select()
.from(integrationTokens)
.where(
and(
eq(integrationTokens.userId, req.userId!),
eq(integrationTokens.provider, 'todoist'),
),
)
.where(and(eq(integrationTokens.userId, req.userId!), eq(integrationTokens.provider, 'todoist')))
.limit(1);
if (token) {
if (tok) {
await fetch(`https://api.todoist.com/api/v1/tasks/${todoistId}/close`, {
method: 'POST',
headers: { Authorization: `Bearer ${token.accessToken}` },
headers: { Authorization: `Bearer ${tok.accessToken}` },
}).catch(() => {});
}
}