diff --git a/docs/adr/0009-signal-normalization.md b/docs/adr/0009-signal-normalization.md new file mode 100644 index 0000000..312cc95 --- /dev/null +++ b/docs/adr/0009-signal-normalization.md @@ -0,0 +1,53 @@ +# ADR-0009 — Signal normalization strategy + +**Status:** Accepted +**Date:** 2026-04-18 +**Issue:** #78 + +## Context + +The recommender was hard-wired to Todoist: task fetch, cache, and feature extraction lived inside `recommender.ts` with no abstraction boundary. Adding Google Calendar, Apple Health, or manual input sources would have required forking the pipeline per source. + +## Decision + +Introduce two abstractions in `packages/shared-types`: + +```typescript +interface Signal { + id: string; + source: string; + kind: 'task' | 'event' | 'habit' | 'insight'; + content: string; + metadata: Record; // raw source fields, not used by bandit + features: Record; // bandit-ready features + timestamp: string; +} + +interface SignalSource { + readonly id: string; + fetchSignals(userId: string): Promise; + act?(userId: string, signalId: string, action: string): Promise; +} +``` + +`SignalAggregator` calls all registered sources in parallel, isolating failures per source. +`TodoistSignalSource` moves all Todoist-specific logic (fetch, 401 handling, cache, bus events) out of the recommender route. +The recommender maps `Signal[]` → `TipCandidate[]` via a thin adapter and registers action dispatch through the aggregator. + +## Consequences + +**Good:** +- Adding a new signal source is a single `aggregator.register(new MySource())` call. +- `TipCandidate.features` is now `Record`, matching `Signal.features`. Sources control their own feature names; the bandit serialises them as-is. +- Source failures are isolated: a broken Google Calendar connector does not prevent Todoist signals from reaching the bandit. +- `act()` on the aggregator routes actions back to the owning source (e.g. marking a Todoist task done), replacing ad-hoc source-specific logic in the feedback handler. + +**Trade-offs:** +- Feature names are no longer compile-time typed. Convention: sources document their feature keys in their class JSDoc. The Python bandit already treated features as an opaque dict. +- Each source is responsible for its own token lookup (DB access injected via module-level `db`). This is acceptable in a modular monolith; extract to a token vault interface if sources move to separate processes. + +## Alternatives considered + +**Typed feature schema per source kind** — rejected: would require union types across all sources and a discriminant on every consumer. The bandit doesn't benefit from TypeScript types at runtime. + +**Aggregator holds tokens, passes to sources** — rejected: leaks auth concerns into the aggregator. Sources know their own auth requirements. diff --git a/packages/shared-types/src/http/signal.ts b/packages/shared-types/src/http/signal.ts new file mode 100644 index 0000000..ce899da --- /dev/null +++ b/packages/shared-types/src/http/signal.ts @@ -0,0 +1,18 @@ +/** A normalized signal from any connected source */ +export interface Signal { + id: string; + source: string; // e.g. 'todoist', 'google-calendar', 'manual' + kind: 'task' | 'event' | 'habit' | 'insight'; + content: string; + metadata: Record; // source-specific raw fields + features: Record; // bandit-ready numeric/boolean features + timestamp: string; // ISO 8601 +} + +/** A pluggable data source that produces normalized signals */ +export interface SignalSource { + readonly id: string; + fetchSignals(userId: string): Promise; + /** Optional: perform an action on the originating system (e.g. mark task done) */ + act?(userId: string, signalId: string, action: string): Promise; +} diff --git a/packages/shared-types/src/http/tip.ts b/packages/shared-types/src/http/tip.ts index 3100c0b..05a254a 100644 --- a/packages/shared-types/src/http/tip.ts +++ b/packages/shared-types/src/http/tip.ts @@ -18,13 +18,11 @@ export interface Tip { /** * A scored tip candidate flowing through the bandit pipeline. * Extends Tip with features needed for scoring. + * features is a flexible map so new signal sources can contribute without + * schema changes — the bandit serialises them as-is. */ export interface TipCandidate extends Tip { - features: { - is_overdue: boolean; - task_age_days: number; - priority: number; - }; + features: Record; } /** POST /recommend response */ diff --git a/packages/shared-types/src/index.ts b/packages/shared-types/src/index.ts index 01ee464..5165daa 100644 --- a/packages/shared-types/src/index.ts +++ b/packages/shared-types/src/index.ts @@ -2,3 +2,4 @@ export * from './http/tip.js'; export * from './http/auth.js'; export * from './http/integrations.js'; export * from './http/user.js'; +export * from './http/signal.js'; diff --git a/services/api/src/routes/__tests__/recommender.test.ts b/services/api/src/routes/__tests__/recommender.test.ts index fda6e2b..962d65d 100644 --- a/services/api/src/routes/__tests__/recommender.test.ts +++ b/services/api/src/routes/__tests__/recommender.test.ts @@ -62,7 +62,7 @@ describe('POST /recommend integration', () => { const mod = await import('../recommender.js'); const { recommenderRouter } = mod; - clearCache = (mod as any)._clearTaskCacheForTests; + clearCache = (mod as any)._clearCandidateCacheForTests; const app = express(); app.use(express.json()); app.use('/api', recommenderRouter); diff --git a/services/api/src/routes/recommender.ts b/services/api/src/routes/recommender.ts index 53955a2..77bb743 100644 --- a/services/api/src/routes/recommender.ts +++ b/services/api/src/routes/recommender.ts @@ -6,22 +6,33 @@ import { eq, and, desc } from 'drizzle-orm'; import { requireAuth, AuthenticatedRequest } from '../middleware/session.js'; import { config } from '../config.js'; import { bus } from '../events/bus.js'; -import type { TipCandidate } from '@oo/shared-types'; +import type { TipCandidate, Signal } from '@oo/shared-types'; +import { todoistSource, dueAgeDays } from '../signals/todoist.js'; +export { dueAgeDays }; +import { SignalAggregator } from '../signals/aggregator.js'; const router: ExpressRouter = Router(); -const CACHE_TTL_MS = 30_000; const PROMPT_VERSION = 'v1'; -const taskCache = new Map(); -export const _clearTaskCacheForTests = () => taskCache.clear(); +// --------------------------------------------------------------------------- +// Signal aggregator — register sources here as new integrations are added +// --------------------------------------------------------------------------- +export const aggregator = new SignalAggregator().register(todoistSource); + +// --------------------------------------------------------------------------- +// Candidate cache — stores the last assembled candidate set per user so the +// feedback handler can look up features for reward delivery. +// --------------------------------------------------------------------------- +const candidateCache = new Map(); +export const _clearCandidateCacheForTests = () => { + candidateCache.clear(); + todoistSource.clearCache(); +}; // --------------------------------------------------------------------------- // Shadow-policy registry // --------------------------------------------------------------------------- -// A shadow policy runs alongside the active policy, logs its picks, but does -// NOT affect what the user sees. Promotion to A/B or live is a manual step. -// Structure: Map const shadowPolicies = new Map([ // Example: enable random as a shadow baseline // ('random-shadow', { active: true }), @@ -38,81 +49,24 @@ export function setPolicyActive(name: string, active: boolean): boolean { } // --------------------------------------------------------------------------- -// Todoist helpers +// Signal → TipCandidate conversion // --------------------------------------------------------------------------- - -export function dueAgeDays(due: { date?: string; datetime?: string } | null | undefined): number { - if (!due) return 0; - const dateStr = due.datetime ?? due.date; - if (!dateStr) return 0; - const dueMs = new Date(dateStr).getTime(); - return Math.max(0, (Date.now() - dueMs) / (1000 * 60 * 60 * 24)); -} - -async function fetchTodoistTasks(userId: string, accessToken: string): Promise { - const cached = taskCache.get(userId); - if (cached && Date.now() - cached.fetchedAt < CACHE_TTL_MS) return cached.tasks; - - const res = await fetch('https://api.todoist.com/api/v1/tasks?filter=today%7Coverdue', { - headers: { Authorization: `Bearer ${accessToken}` }, - }); - - if (!res.ok) { - if (res.status === 401) { - console.error(`[todoist] token expired for user ${userId}`); - bus.publish('signals.integration.token_expired', { - userId, - provider: 'todoist', - detectedAt: new Date().toISOString(), - }); - await db - .update(integrationTokens) - .set({ tokenStatus: 'needs_reconnect' }) - .where(and(eq(integrationTokens.userId, userId), eq(integrationTokens.provider, 'todoist'))); - } - return cached?.tasks ?? []; - } - - const body = (await res.json()) as { - results: Array<{ - id: string; - content: string; - priority: number; - due: { date?: string; datetime?: string; is_recurring?: boolean } | null; - }>; +function signalToCandidate(signal: Signal): TipCandidate { + return { + id: signal.id, + content: signal.content, + source: signal.source as TipCandidate['source'], + kind: signal.kind as TipCandidate['kind'], + sourceId: (signal.metadata.todoistId as string | undefined) ?? undefined, + createdAt: signal.timestamp, + features: signal.features, }; - - const now = new Date(); - const tasks: TipCandidate[] = (body.results ?? []).map((t) => { - const ageDays = dueAgeDays(t.due); - const isOverdue = ageDays > 0; - return { - id: `todoist:${t.id}`, - content: t.content, - source: 'todoist' as const, - kind: 'task' as const, - sourceId: t.id, - createdAt: now.toISOString(), - features: { - is_overdue: isOverdue, - task_age_days: ageDays, - priority: t.priority ?? 1, - }, - }; - }); - - taskCache.set(userId, { tasks, fetchedAt: Date.now() }); - - bus.publish('signals.task.synced', { userId, count: tasks.length, syncedAt: now.toISOString() }); - - return tasks; } // --------------------------------------------------------------------------- // Stage 2: score candidates via ml/serving bandit // --------------------------------------------------------------------------- -/** Call ml/serving for scored selection; returns { tip_id, score } or null on failure */ async function remotePolicy( userId: string, tasks: TipCandidate[], @@ -165,16 +119,16 @@ interface LlmCandidate { async function fetchLlmCandidates( userId: string, - todoistTasks: TipCandidate[], + signals: Signal[], hour: number, dayOfWeek: number, ): Promise { try { - const tasks = todoistTasks.slice(0, 10).map((t) => ({ - content: t.content, - priority: t.features.priority, - is_overdue: t.features.is_overdue, - task_age_days: t.features.task_age_days, + const tasks = signals.slice(0, 10).map((s) => ({ + content: s.content, + priority: s.features.priority, + is_overdue: s.features.is_overdue, + task_age_days: s.features.task_age_days, })); const res = await fetch(`${config.ML_SERVING_URL}/generate`, { method: 'POST', @@ -208,32 +162,36 @@ async function fetchLlmCandidates( // Pipeline: [Stage 1] assemble candidates → [Stage 2] score → [Stage 3] serve // --------------------------------------------------------------------------- router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Response) => { - const [token] = await db - .select() + const hour = new Date().getHours(); + const dayOfWeek = new Date().getDay(); + + // Fail fast if no source tokens are connected + const anyToken = await db + .select({ id: integrationTokens.id }) .from(integrationTokens) - .where(and(eq(integrationTokens.userId, req.userId!), eq(integrationTokens.provider, 'todoist'))) + .where(eq(integrationTokens.userId, req.userId!)) .limit(1); - if (!token) { + if (!anyToken.length) { res.status(422).json({ error: 'No integrations connected' }); return; } - const hour = new Date().getHours(); - const dayOfWeek = new Date().getDay(); + // Stage 1: assemble candidates — aggregated signals + LLM-generated advice (parallel) + const signals = await aggregator.fetchAll(req.userId!); - // Stage 1: assemble candidates — Todoist tasks + LLM-generated advice (parallel) - const [todoistTasks, llmCandidates] = await Promise.all([ - fetchTodoistTasks(req.userId!, token.accessToken), - fetchLlmCandidates(req.userId!, taskCache.get(req.userId!)?.tasks ?? [], hour, dayOfWeek), - ]); + const signalCandidates = signals.map(signalToCandidate); + const llmCandidates = await fetchLlmCandidates(req.userId!, signals, hour, dayOfWeek); - const allCandidates: TipCandidate[] = [...todoistTasks, ...llmCandidates]; + const allCandidates: TipCandidate[] = [...signalCandidates, ...llmCandidates]; if (!allCandidates.length) { res.status(204).end(); return; } + // Cache candidates so the feedback handler can retrieve features + candidateCache.set(req.userId!, allCandidates); + const t0 = Date.now(); // Stage 2: score — egreedy bandit with random fallback @@ -262,9 +220,7 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re policy, mlScore: scored ? Math.round(scored.score * 1000) : null, featuresJson: JSON.stringify({ - is_overdue: tip.features.is_overdue, - task_age_days: tip.features.task_age_days, - priority: tip.features.priority, + ...tip.features, hour_of_day: hour, day_of_week: dayOfWeek, }), @@ -410,9 +366,10 @@ router.post('/tip/:id/feedback', requireAuth, async (req: AuthenticatedRequest, createdAt: now.toISOString(), }); - const task: TipCandidate | undefined = taskCache.get(req.userId!)?.tasks.find((t) => t.id === tipId); - - taskCache.delete(req.userId!); + // Look up cached candidate for reward features; invalidate after + const cached = candidateCache.get(req.userId!); + const candidate = cached?.find((t) => t.id === tipId); + candidateCache.delete(req.userId!); bus.publish('signals.tip.feedback', { userId: req.userId!, @@ -423,26 +380,12 @@ router.post('/tip/:id/feedback', requireAuth, async (req: AuthenticatedRequest, createdAt: now.toISOString(), }); - if (task) { - sendRewardWithRetry(req.userId!, tipId, reward, task.features); + if (candidate) { + sendRewardWithRetry(req.userId!, tipId, reward, candidate.features); } - // Mark complete in Todoist if done - if (action === 'done' && tipId.startsWith('todoist:')) { - const todoistId = tipId.slice(8); - const [tok] = await db - .select() - .from(integrationTokens) - .where(and(eq(integrationTokens.userId, req.userId!), eq(integrationTokens.provider, 'todoist'))) - .limit(1); - - if (tok) { - await fetch(`https://api.todoist.com/api/v1/tasks/${todoistId}/close`, { - method: 'POST', - headers: { Authorization: `Bearer ${tok.accessToken}` }, - }).catch(() => {}); - } - } + // Delegate action to the owning signal source (e.g. mark done in Todoist) + await aggregator.act(req.userId!, tipId, action); res.json({ ok: true }); }); diff --git a/services/api/src/signals/aggregator.ts b/services/api/src/signals/aggregator.ts new file mode 100644 index 0000000..e622d40 --- /dev/null +++ b/services/api/src/signals/aggregator.ts @@ -0,0 +1,39 @@ +import type { Signal, SignalSource } from '@oo/shared-types'; + +/** + * Merges signals from all registered sources for a user. + * Sources run in parallel; failures are isolated — a broken source + * does not prevent other sources from contributing. + */ +export class SignalAggregator { + private sources: SignalSource[] = []; + + register(source: SignalSource): this { + this.sources.push(source); + return this; + } + + async fetchAll(userId: string): Promise { + const results = await Promise.allSettled( + this.sources.map((s) => s.fetchSignals(userId)), + ); + + const signals: Signal[] = []; + for (let i = 0; i < results.length; i++) { + const r = results[i]; + if (r.status === 'fulfilled') { + signals.push(...r.value); + } else { + console.error(`[aggregator] source '${this.sources[i].id}' failed:`, r.reason); + } + } + return signals; + } + + /** Dispatch an action to whichever source owns the signal */ + async act(userId: string, signalId: string, action: string): Promise { + const sourceId = signalId.split(':')[0]; + const source = this.sources.find((s) => s.id === sourceId); + await source?.act?.(userId, signalId, action); + } +} diff --git a/services/api/src/signals/todoist.ts b/services/api/src/signals/todoist.ts new file mode 100644 index 0000000..16d7f6c --- /dev/null +++ b/services/api/src/signals/todoist.ts @@ -0,0 +1,115 @@ +import type { Signal, SignalSource } from '@oo/shared-types'; +import { db } from '../db/index.js'; +import { integrationTokens } from '../db/schema.js'; +import { eq, and } from 'drizzle-orm'; +import { bus } from '../events/bus.js'; + +const CACHE_TTL_MS = 30_000; + +export function dueAgeDays(due: { date?: string; datetime?: string } | null | undefined): number { + if (!due) return 0; + const dateStr = due.datetime ?? due.date; + if (!dateStr) return 0; + return Math.max(0, (Date.now() - new Date(dateStr).getTime()) / (1000 * 60 * 60 * 24)); +} + +export class TodoistSignalSource implements SignalSource { + readonly id = 'todoist'; + + private cache = new Map(); + + /** Exposed for tests */ + clearCache(userId?: string): void { + if (userId) this.cache.delete(userId); + else this.cache.clear(); + } + + getCached(userId: string): Signal[] { + return this.cache.get(userId)?.signals ?? []; + } + + async fetchSignals(userId: string): Promise { + const entry = this.cache.get(userId); + if (entry && Date.now() - entry.fetchedAt < CACHE_TTL_MS) return entry.signals; + + const [token] = await db + .select() + .from(integrationTokens) + .where(and(eq(integrationTokens.userId, userId), eq(integrationTokens.provider, 'todoist'))) + .limit(1); + + if (!token) return []; + + const res = await fetch('https://api.todoist.com/api/v1/tasks?filter=today%7Coverdue', { + headers: { Authorization: `Bearer ${token.accessToken}` }, + }); + + if (!res.ok) { + if (res.status === 401) { + console.error(`[todoist] token expired for user ${userId}`); + bus.publish('signals.integration.token_expired', { + userId, + provider: 'todoist', + detectedAt: new Date().toISOString(), + }); + await db + .update(integrationTokens) + .set({ tokenStatus: 'needs_reconnect' }) + .where(and(eq(integrationTokens.userId, userId), eq(integrationTokens.provider, 'todoist'))); + } + return entry?.signals ?? []; + } + + const body = (await res.json()) as { + results: Array<{ + id: string; + content: string; + priority: number; + due: { date?: string; datetime?: string; is_recurring?: boolean } | null; + }>; + }; + + const now = new Date().toISOString(); + const signals: Signal[] = (body.results ?? []).map((t) => { + const ageDays = dueAgeDays(t.due); + return { + id: `todoist:${t.id}`, + source: 'todoist', + kind: 'task', + content: t.content, + metadata: { todoistId: t.id, due: t.due, priority: t.priority }, + features: { + is_overdue: ageDays > 0, + task_age_days: ageDays, + priority: t.priority ?? 1, + }, + timestamp: now, + }; + }); + + this.cache.set(userId, { signals, fetchedAt: Date.now() }); + bus.publish('signals.task.synced', { userId, count: signals.length, syncedAt: now }); + + return signals; + } + + async act(userId: string, signalId: string, action: string): Promise { + if (action !== 'done' || !signalId.startsWith('todoist:')) return; + const todoistId = signalId.slice(8); + + const [tok] = await db + .select() + .from(integrationTokens) + .where(and(eq(integrationTokens.userId, userId), eq(integrationTokens.provider, 'todoist'))) + .limit(1); + + if (!tok) return; + + await fetch(`https://api.todoist.com/api/v1/tasks/${todoistId}/close`, { + method: 'POST', + headers: { Authorization: `Bearer ${tok.accessToken}` }, + }).catch(() => {}); + } +} + +export const todoistSource = new TodoistSignalSource();