feat: M2 AI tips — LiteLLM gateway, context assembler, end-to-end generation pipeline

Issues closed: #86, #87, #88, #89, #90, #91, #79, #80, #82

infra:
- docker-compose `ai` profile: Ollama + LiteLLM services
- infra/litellm/litellm_config.yaml: tip-generator / embedder / judge aliases
- .env.example: LITELLM_URL, LITELLM_MASTER_KEY, OLLAMA_URL

ml/serving:
- POST /generate: calls LiteLLM tip-generator alias, returns TipCandidate[]
- JSON retry loop (2 retries with correction prompt on malformed response)
- _parse_llm_json strips markdown fences

ml/features:
- context.py: build_context() assembles user signals → PromptContext
  (sorts overdue/high-priority tasks first for LLM prompt quality)

shared-types:
- TipKind, TipSource, TipCandidate types
- Tip gains kind + rationale fields

services/api:
- recommender: 3-stage pipeline (assemble → score → serve)
  Stage 1: Todoist tasks + LLM candidates fetched in parallel
  Stage 2: egreedy bandit scores merged candidate pool
  Stage 3: serve + log with prompt_version, llm_model, tip_kind
- tip_scores: prompt_version, llm_model, tip_kind columns + migrations
- config: LITELLM_URL added
- integrations: surface token_status in /integrations response

tests:
- ml/serving/tests/test_generate.py: 13 tests (retry, 502/503, fence variants)
- ml/features/test_context.py: 9 tests (sorting, edge cases)
- services/api recommender.unit.test.ts: 16 pure-function tests (inferReward, dueAgeDays)
- services/api recommender.test.ts: 4 integration tests (tip_scores columns, LLM fallback)
- shared-types: TipCandidate, rationale, full TipFeedback action set

docs:
- ADR-0008: LiteLLM AI gateway decision
- overview.md: M2 pipeline description updated
- ml/README.md: serving + features roles updated

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-17 14:09:02 +00:00
parent 85367aeaa0
commit ffdf70733f
22 changed files with 1017 additions and 45 deletions

View File

@@ -6,23 +6,15 @@ import { eq, and, desc } from 'drizzle-orm';
import { requireAuth, AuthenticatedRequest } from '../middleware/session.js';
import { config } from '../config.js';
import { bus } from '../events/bus.js';
import type { Tip } from '@oo/shared-types';
import type { TipCandidate } from '@oo/shared-types';
const router: ExpressRouter = Router();
const CACHE_TTL_MS = 30_000;
const PROMPT_VERSION = 'v1';
interface TaskFeatures {
is_overdue: boolean;
task_age_days: number;
priority: number;
}
interface CachedTask extends Tip {
features: TaskFeatures;
}
const taskCache = new Map<string, { tasks: CachedTask[]; fetchedAt: number }>();
const taskCache = new Map<string, { tasks: TipCandidate[]; fetchedAt: number }>();
export const _clearTaskCacheForTests = () => taskCache.clear();
// ---------------------------------------------------------------------------
// Shadow-policy registry
@@ -49,7 +41,7 @@ export function setPolicyActive(name: string, active: boolean): boolean {
// Todoist helpers
// ---------------------------------------------------------------------------
function dueAgeDays(due: { date?: string; datetime?: string } | null | undefined): number {
export function dueAgeDays(due: { date?: string; datetime?: string } | null | undefined): number {
if (!due) return 0;
const dateStr = due.datetime ?? due.date;
if (!dateStr) return 0;
@@ -57,7 +49,7 @@ function dueAgeDays(due: { date?: string; datetime?: string } | null | undefined
return Math.max(0, (Date.now() - dueMs) / (1000 * 60 * 60 * 24));
}
async function fetchTodoistTasks(userId: string, accessToken: string): Promise<CachedTask[]> {
async function fetchTodoistTasks(userId: string, accessToken: string): Promise<TipCandidate[]> {
const cached = taskCache.get(userId);
if (cached && Date.now() - cached.fetchedAt < CACHE_TTL_MS) return cached.tasks;
@@ -73,6 +65,10 @@ async function fetchTodoistTasks(userId: string, accessToken: string): Promise<C
provider: 'todoist',
detectedAt: new Date().toISOString(),
});
await db
.update(integrationTokens)
.set({ tokenStatus: 'needs_reconnect' })
.where(and(eq(integrationTokens.userId, userId), eq(integrationTokens.provider, 'todoist')));
}
return cached?.tasks ?? [];
}
@@ -87,13 +83,14 @@ async function fetchTodoistTasks(userId: string, accessToken: string): Promise<C
};
const now = new Date();
const tasks: CachedTask[] = (body.results ?? []).map((t) => {
const tasks: TipCandidate[] = (body.results ?? []).map((t) => {
const ageDays = dueAgeDays(t.due);
const isOverdue = ageDays > 0;
return {
id: `todoist:${t.id}`,
content: t.content,
source: 'todoist' as const,
kind: 'task' as const,
sourceId: t.id,
createdAt: now.toISOString(),
features: {
@@ -111,10 +108,14 @@ async function fetchTodoistTasks(userId: string, accessToken: string): Promise<C
return tasks;
}
// ---------------------------------------------------------------------------
// Stage 2: score candidates via ml/serving bandit
// ---------------------------------------------------------------------------
/** Call ml/serving for scored selection; returns { tip_id, score } or null on failure */
async function remotePolicy(
userId: string,
tasks: CachedTask[],
tasks: TipCandidate[],
): Promise<{ tipId: string; score: number; policy: string } | null> {
const hour = new Date().getHours();
const dayOfWeek = new Date().getDay();
@@ -147,13 +148,64 @@ async function remotePolicy(
}
}
function randomPolicy(candidates: CachedTask[]): CachedTask | null {
function randomPolicy(candidates: TipCandidate[]): TipCandidate | null {
if (!candidates.length) return null;
return candidates[Math.floor(Math.random() * candidates.length)];
}
// ---------------------------------------------------------------------------
// Stage 1b: fetch LLM candidates from ml/serving /generate
// ---------------------------------------------------------------------------
interface LlmCandidate {
id: string;
content: string;
rationale?: string;
}
async function fetchLlmCandidates(
userId: string,
todoistTasks: TipCandidate[],
hour: number,
dayOfWeek: number,
): Promise<TipCandidate[]> {
try {
const tasks = todoistTasks.slice(0, 10).map((t) => ({
content: t.content,
priority: t.features.priority,
is_overdue: t.features.is_overdue,
task_age_days: t.features.task_age_days,
}));
const res = await fetch(`${config.ML_SERVING_URL}/generate`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
user_id: userId,
context: { tasks, hour_of_day: hour, day_of_week: dayOfWeek },
n: 3,
}),
signal: AbortSignal.timeout(15_000),
});
if (!res.ok) return [];
const data = (await res.json()) as { candidates: LlmCandidate[]; model?: string };
const now = new Date().toISOString();
return data.candidates.map((c) => ({
id: `llm:${c.id}`,
content: c.content,
source: 'llm' as const,
kind: 'advice' as const,
rationale: c.rationale,
createdAt: now,
features: { is_overdue: false, task_age_days: 0, priority: 1 },
}));
} catch {
return [];
}
}
// ---------------------------------------------------------------------------
// POST /api/recommend
// Pipeline: [Stage 1] assemble candidates → [Stage 2] score → [Stage 3] serve
// ---------------------------------------------------------------------------
router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Response) => {
const [token] = await db
@@ -167,34 +219,42 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re
return;
}
const tasks = await fetchTodoistTasks(req.userId!, token.accessToken);
if (!tasks.length) {
const hour = new Date().getHours();
const dayOfWeek = new Date().getDay();
// Stage 1: assemble candidates — Todoist tasks + LLM-generated advice (parallel)
const [todoistTasks, llmCandidates] = await Promise.all([
fetchTodoistTasks(req.userId!, token.accessToken),
fetchLlmCandidates(req.userId!, taskCache.get(req.userId!)?.tasks ?? [], hour, dayOfWeek),
]);
const allCandidates: TipCandidate[] = [...todoistTasks, ...llmCandidates];
if (!allCandidates.length) {
res.status(204).end();
return;
}
const hour = new Date().getHours();
const dayOfWeek = new Date().getDay();
const t0 = Date.now();
// RemotePolicy with RandomPolicy fallback
const scored = await remotePolicy(req.userId!, tasks);
// Stage 2: score — egreedy bandit with random fallback
const scored = await remotePolicy(req.userId!, allCandidates);
const latencyMs = Date.now() - t0;
const tip = scored
? (tasks.find((t) => t.id === scored.tipId) ?? randomPolicy(tasks))
: randomPolicy(tasks);
? (allCandidates.find((t) => t.id === scored.tipId) ?? randomPolicy(allCandidates))
: randomPolicy(allCandidates);
if (!tip) {
res.status(204).end();
return;
}
// Stage 3: serve + log
const policy = scored ? scored.policy : 'random';
const isLlmTip = tip.source === 'llm';
const servedAt = new Date().toISOString();
await db.insert(tipViews).values({ id: nanoid(), userId: req.userId!, tipId: tip.id, servedAt });
// Log recommendation explainability
await db.insert(tipScores).values({
id: nanoid(),
userId: req.userId!,
@@ -208,9 +268,12 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re
hour_of_day: hour,
day_of_week: dayOfWeek,
}),
candidateCount: tasks.length,
candidateCount: allCandidates.length,
latencyMs,
servedAt,
promptVersion: isLlmTip ? PROMPT_VERSION : null,
llmModel: isLlmTip ? 'tip-generator' : null,
tipKind: tip.kind ?? null,
});
bus.publish('signals.tip.served', {
@@ -224,7 +287,7 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re
for (const [name, s] of shadowPolicies) {
if (!s.active) continue;
if (name.startsWith('random')) {
const shadowTip = randomPolicy(tasks);
const shadowTip = randomPolicy(allCandidates);
bus.publish('signals.tip.served', {
userId: req.userId!,
tipId: shadowTip?.id ?? 'none',
@@ -249,7 +312,7 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re
// done 2 10 min → +0.6 (good: user engaged, acted in same session)
// done > 10 min → +0.3 (eventually done; tip may have helped, unclear)
// ---------------------------------------------------------------------------
function inferReward(action: string, dwellMs: number | null): number {
export function inferReward(action: string, dwellMs: number | null): number {
if (action === 'dismiss') return -1.0;
if (action === 'snooze') return 0.1;
if (action === 'helpful') return 0.5;
@@ -269,7 +332,7 @@ async function sendRewardWithRetry(
userId: string,
tipId: string,
reward: number,
features: TaskFeatures,
features: TipCandidate['features'],
): Promise<void> {
const body = JSON.stringify({
user_id: userId,
@@ -347,7 +410,7 @@ router.post('/tip/:id/feedback', requireAuth, async (req: AuthenticatedRequest,
createdAt: now.toISOString(),
});
const task = taskCache.get(req.userId!)?.tasks.find((t) => t.id === tipId);
const task: TipCandidate | undefined = taskCache.get(req.userId!)?.tasks.find((t) => t.id === tipId);
taskCache.delete(req.userId!);