import { type Router as ExpressRouter, Router, Response } from 'express'; import { nanoid } from 'nanoid'; import { logger } from '../logger.js'; import { db } from '../db/index.js'; import { integrationTokens, tipFeedback, tipViews, tipScores, userPreferences } from '../db/schema.js'; import { eq, and, desc } from 'drizzle-orm'; import { requireAuth, AuthenticatedRequest } from '../middleware/session.js'; import { config } from '../config.js'; import { bus } from '../events/bus.js'; import type { Tip, Signal } from '@oo/shared-types'; import { todoistSource, dueAgeDays } from '../signals/todoist.js'; export { dueAgeDays }; import { googleHealthSource } from '../signals/google-health.js'; import { SignalAggregator } from '../signals/aggregator.js'; import { getActiveAgentOutputs } from './agent-outputs.js'; import { getEligibleAgentIds } from '../profile/eligibility.js'; const router: ExpressRouter = Router(); // --------------------------------------------------------------------------- // Signal aggregator — register sources here as new integrations are added // --------------------------------------------------------------------------- export const aggregator = new SignalAggregator().register(todoistSource).register(googleHealthSource); export const _clearSignalCacheForTests = () => { todoistSource.clearCache(); googleHealthSource.clearCache(); }; // --------------------------------------------------------------------------- // Orchestrator: fetch agent snippets + call ml/serving /recommend // --------------------------------------------------------------------------- interface OrchestratorResult { tip: Tip; model: string | null; agentIds: string[]; } async function loadOrchestratorPref(userId: string, key: string): Promise { const rows = await db .select({ valueJson: userPreferences.valueJson }) .from(userPreferences) .where(and(eq(userPreferences.userId, userId), eq(userPreferences.scope, 'orchestrator'), eq(userPreferences.key, key))) .limit(1); if (!rows.length) return undefined; try { return JSON.parse(rows[0].valueJson) as T; } catch { return undefined; } } async function fetchOrchestratorTip( userId: string, signals: Signal[], hour: number, dayOfWeek: number, traceparent?: string, recentTip?: string, ): Promise { const [allAgentRows, eligibleIds, scienceDestiny] = await Promise.all([ getActiveAgentOutputs(userId), getEligibleAgentIds(userId), loadOrchestratorPref(userId, 'science_destiny'), ]); const agentOutputs = allAgentRows .filter((r) => eligibleIds.has(r.agentId)) .map((r) => ({ agent_id: r.agentId, prompt_text: r.promptText })); const tasks = signals.slice(0, 10).map((s) => ({ content: s.content, priority: s.features.priority, is_overdue: s.features.is_overdue, task_age_days: s.features.task_age_days, })); try { const res = await fetch(`${config.ML_SERVING_URL}/recommend`, { method: 'POST', headers: { 'Content-Type': 'application/json', ...(traceparent ? { traceparent } : {}) }, body: JSON.stringify({ user_id: userId, agent_outputs: agentOutputs, tasks, hour_of_day: hour, day_of_week: dayOfWeek, science_destiny: scienceDestiny ?? 50, recent_tip: recentTip ?? null }), signal: AbortSignal.timeout(15_000), }); if (!res.ok) return null; const data = (await res.json()) as { tip: { id: string; content: string; rationale?: string }; model?: string; }; const now = new Date().toISOString(); return { tip: { id: `llm:${data.tip.id}`, content: data.tip.content, source: 'llm' as const, kind: 'advice' as const, rationale: data.tip.rationale, createdAt: now, }, model: data.model ?? null, agentIds: agentOutputs.map((a) => a.agent_id), }; } catch { return null; } } // --------------------------------------------------------------------------- // POST /api/recommend // Pipeline: fetch signals → orchestrator → serve; random fallback on failure // --------------------------------------------------------------------------- router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Response) => { const hour = new Date().getHours(); const dayOfWeek = new Date().getDay(); const { recent_tip: recentTip } = req.body as { recent_tip?: string }; const anyToken = await db .select({ id: integrationTokens.id }) .from(integrationTokens) .where(eq(integrationTokens.userId, req.userId!)) .limit(1); if (!anyToken.length) { res.status(422).json({ error: 'No integrations connected' }); return; } const signals = await aggregator.fetchAll(req.userId!); const t0 = Date.now(); const orchestrated = await fetchOrchestratorTip(req.userId!, signals, hour, dayOfWeek, req.traceparent, recentTip); const latencyMs = Date.now() - t0; if (!orchestrated) { res.status(204).end(); return; } const tip = orchestrated.tip; const policy = 'orchestrator'; const servedAt = new Date().toISOString(); await db.insert(tipViews).values({ id: nanoid(), userId: req.userId!, tipId: tip.id, servedAt }); await db.insert(tipScores).values({ id: nanoid(), userId: req.userId!, tipId: tip.id, policy, mlScore: null, featuresJson: JSON.stringify({ agent_ids: orchestrated.agentIds, hour_of_day: hour, day_of_week: dayOfWeek }), candidateCount: 1, latencyMs, servedAt, promptVersion: 'v4-orchestrator', llmModel: orchestrated.model, tipKind: tip.kind ?? null, }); bus.publish('signals.tip.served', { userId: req.userId!, tipId: tip.id, policy, servedAt, }); res.json({ tip }); }); // --------------------------------------------------------------------------- // Reward inference from action + dwell time // // dismiss → -1.0 (clear rejection) // snooze → +0.1 (tip noticed, timing off — mild positive) // helpful → +0.5 (explicit positive signal) // not_helpful → -0.5 (explicit negative signal) // done < 15 s → -0.3 (almost certainly a stale task, not magic) // done 15 s – 2 min → +1.0 (magic zone: user saw tip and acted) // done 2 – 10 min → +0.6 (good: user engaged, acted in same session) // done > 10 min → +0.3 (eventually done; tip may have helped, unclear) // --------------------------------------------------------------------------- export function inferReward(action: string, dwellMs: number | null): number { if (action === 'dismiss') return -1.0; if (action === 'snooze') return 0.1; if (action === 'helpful') return 0.5; if (action === 'not_helpful') return -0.5; if (dwellMs === null || dwellMs < 0) return 0.5; if (dwellMs < 15_000) return -0.3; if (dwellMs < 120_000) return 1.0; if (dwellMs < 600_000) return 0.6; return 0.3; } // --------------------------------------------------------------------------- // POST /api/tip/:id/feedback // --------------------------------------------------------------------------- router.post('/tip/:id/feedback', requireAuth, async (req: AuthenticatedRequest, res: Response) => { const { action } = req.body as { action: string }; const tipId = String(req.params.id); const now = new Date(); const validActions = ['done', 'dismiss', 'snooze', 'helpful', 'not_helpful']; if (!validActions.includes(action)) { res.status(400).json({ error: 'Invalid action' }); return; } let dwellMs: number | null = null; const [lastView] = await db .select({ servedAt: tipViews.servedAt }) .from(tipViews) .where(and(eq(tipViews.userId, req.userId!), eq(tipViews.tipId, tipId))) .orderBy(desc(tipViews.servedAt)) .limit(1); if (lastView?.servedAt) { dwellMs = now.getTime() - new Date(lastView.servedAt).getTime(); } const reward = inferReward(action, dwellMs); await db.insert(tipFeedback).values({ id: nanoid(), userId: req.userId!, tipId, action, sourceId: tipId.startsWith('todoist:') ? tipId.slice(8) : null, dwellMs: dwellMs !== null ? Math.round(dwellMs) : null, rewardMilli: Math.round(reward * 1000), createdAt: now.toISOString(), }); bus.publish('signals.tip.feedback', { userId: req.userId!, tipId, action: action as 'done' | 'dismiss' | 'snooze' | 'helpful' | 'not_helpful', reward, dwellMs, createdAt: now.toISOString(), }); await aggregator.act(req.userId!, tipId, action); res.json({ ok: true }); }); export { router as recommenderRouter };