From c65bedcf6875d5d0334185f23701a8ab6bd90fdc Mon Sep 17 00:00:00 2001 From: alvis Date: Mon, 4 May 2026 10:37:15 +0000 Subject: [PATCH] =?UTF-8?q?feat(api):=20orchestrator=20cutover=20=E2=80=94?= =?UTF-8?q?=20replace=20bandit=20with=20multi-agent=20pipeline=20(ADR-0013?= =?UTF-8?q?=20step=206)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit POST /recommend now calls ml/serving /recommend with pre-computed agent snippets + task context instead of /generate + /score/egreedy/v2. Falls back to a random signal candidate when ml/serving is unavailable. Removes: remotePolicy, fetchLlmCandidates, sendRewardWithRetry, candidateCache, pickPromptVersion. Feedback handler keeps inferReward + tipFeedback writes for observability; reward delivery to the bandit is gone. tipScores.policy is now 'orchestrator'; promptVersion is 'v4-orchestrator'. Co-Authored-By: Claude Sonnet 4.6 --- .../src/routes/__tests__/recommender.test.ts | 101 ++--- .../routes/__tests__/recommender.unit.test.ts | 45 +-- services/api/src/routes/recommender.ts | 362 ++++-------------- services/api/src/test/db.ts | 11 + 4 files changed, 117 insertions(+), 402 deletions(-) diff --git a/services/api/src/routes/__tests__/recommender.test.ts b/services/api/src/routes/__tests__/recommender.test.ts index f68475c..b2fee13 100644 --- a/services/api/src/routes/__tests__/recommender.test.ts +++ b/services/api/src/routes/__tests__/recommender.test.ts @@ -4,6 +4,10 @@ * inside beforeAll (same pattern as admin.test.ts) to avoid TDZ issues. * Uses http.request (not fetch) as the test client so that globalThis.fetch * mocking doesn't interfere with the test runner itself. + * + * The orchestrator path (ADR-0013): signals fetched for task context/fallback, + * then ml/serving /recommend called. agent_outputs table is empty in tests so + * the orchestrator always uses the raw-task fallback path. */ import { describe, it, expect, vi, beforeAll, afterEach } from 'vitest'; import express from 'express'; @@ -48,7 +52,7 @@ describe('POST /recommend integration', () => { let server: http.Server; let baseUrl: string; let savedFetch: typeof globalThis.fetch; - let clearCache: () => void; + let clearSignalCache: () => void; beforeAll(async () => { await testDb.insert(users).values({ @@ -58,11 +62,12 @@ describe('POST /recommend integration', () => { await testDb.insert(integrationTokens).values({ id: 'tok-1', userId: 'user-1', provider: 'todoist', accessToken: 'fake-token', connectedAt: new Date().toISOString(), + tokenStatus: 'active', }); const mod = await import('../recommender.js'); const { recommenderRouter } = mod; - clearCache = (mod as any)._clearCandidateCacheForTests; + clearSignalCache = (mod as any)._clearSignalCacheForTests; const app = express(); app.use(express.json()); app.use('/api', recommenderRouter); @@ -74,19 +79,22 @@ describe('POST /recommend integration', () => { afterEach(() => { globalThis.fetch = savedFetch; - clearCache?.(); + clearSignalCache?.(); }); - it('returns 204 when Todoist + LLM both return empty', async () => { - globalThis.fetch = vi.fn().mockResolvedValue({ - ok: true, status: 200, - json: async () => ({ results: [] }), - } as any); + it('returns 204 when Todoist is empty and orchestrator fails', async () => { + globalThis.fetch = vi.fn().mockImplementation((url: string) => { + if (String(url).includes('todoist.com')) { + return Promise.resolve({ ok: true, status: 200, json: async () => ({ results: [] }) } as any); + } + // /recommend fails → orchestrator returns null, random fallback also empty → 204 + return Promise.resolve({ ok: false, status: 503 } as any); + }); const { status } = await post(`${baseUrl}/api/recommend`); expect(status).toBe(204); }); - it('serves todoist tip and writes correct tip_scores columns', async () => { + it('serves orchestrator tip and writes correct tip_scores columns', async () => { globalThis.fetch = vi.fn().mockImplementation((url: string) => { if (String(url).includes('todoist.com')) { return Promise.resolve({ @@ -96,55 +104,16 @@ describe('POST /recommend integration', () => { }), } as any); } - if (String(url).includes('/generate')) { - return Promise.resolve({ ok: false, status: 503, json: async () => ({}) } as any); - } - if (String(url).includes('/score')) { - return Promise.resolve({ - ok: true, status: 200, - json: async () => ({ tip_id: 'todoist:task-1', score: 0.8 }), - } as any); - } - return Promise.resolve({ ok: false, status: 500, json: async () => ({}) } as any); - }); - - const { status, body } = await post(`${baseUrl}/api/recommend`); - expect(status).toBe(200); - expect(body.tip.source).toBe('todoist'); - expect(body.tip.kind).toBe('task'); - - const rows = await testDb.select().from(tipScores); - const row = rows[rows.length - 1]; - expect(row.tipKind).toBe('task'); - expect(row.promptVersion).toBeNull(); - expect(row.llmModel).toBeNull(); - }); - - it('writes prompt_version + llm_model when LLM tip is served', async () => { - globalThis.fetch = vi.fn().mockImplementation((url: string) => { - if (String(url).includes('todoist.com')) { - return Promise.resolve({ - ok: true, status: 200, - json: async () => ({ results: [] }), - } as any); - } - if (String(url).includes('/generate')) { + if (String(url).includes('/recommend')) { return Promise.resolve({ ok: true, status: 200, json: async () => ({ - candidates: [{ id: 'adv-1', content: 'Take a break.', rationale: 'You deserve it.' }], + tip: { id: 'adv-1', content: 'Take a break.', rationale: 'You deserve it.' }, model: 'tip-generator', - prompt_version: 'v1', }), } as any); } - if (String(url).includes('/score')) { - return Promise.resolve({ - ok: true, status: 200, - json: async () => ({ tip_id: 'llm:adv-1', score: 0.9 }), - } as any); - } - return Promise.resolve({ ok: false, status: 500, json: async () => ({}) } as any); + return Promise.resolve({ ok: false, status: 500 } as any); }); const { status, body } = await post(`${baseUrl}/api/recommend`); @@ -155,12 +124,14 @@ describe('POST /recommend integration', () => { const rows = await testDb.select().from(tipScores); const row = rows[rows.length - 1]; - expect(row.promptVersion).toBe('v1'); + expect(row.policy).toBe('orchestrator'); + expect(row.promptVersion).toBe('v4-orchestrator'); expect(row.llmModel).toBe('tip-generator'); + expect(row.mlScore).toBeNull(); expect(row.tipKind).toBe('advice'); }); - it('falls back to todoist tip when /generate returns non-200', async () => { + it('falls back to random signal tip when orchestrator fails', async () => { globalThis.fetch = vi.fn().mockImplementation((url: string) => { if (String(url).includes('todoist.com')) { return Promise.resolve({ @@ -170,22 +141,18 @@ describe('POST /recommend integration', () => { }), } as any); } - if (String(url).includes('/generate')) { - return Promise.resolve({ ok: false, status: 502, json: async () => ({}) } as any); - } - if (String(url).includes('/score')) { - return Promise.resolve({ - ok: true, status: 200, - json: async () => ({ tip_id: 'todoist:fallback-1', score: 0.5 }), - } as any); - } - return Promise.resolve({ ok: false, status: 500, json: async () => ({}) } as any); + // /recommend fails → falls back to random signal candidate + return Promise.resolve({ ok: false, status: 502 } as any); }); const { status, body } = await post(`${baseUrl}/api/recommend`); - expect([200, 204]).toContain(status); - if (status === 200) { - expect(body.tip.source).toBe('todoist'); - } + expect(status).toBe(200); + expect(body.tip.source).toBe('todoist'); + + const rows = await testDb.select().from(tipScores); + const row = rows[rows.length - 1]; + expect(row.policy).toBe('random'); + expect(row.promptVersion).toBeNull(); + expect(row.llmModel).toBeNull(); }); }); diff --git a/services/api/src/routes/__tests__/recommender.unit.test.ts b/services/api/src/routes/__tests__/recommender.unit.test.ts index d8d73ae..ad07aff 100644 --- a/services/api/src/routes/__tests__/recommender.unit.test.ts +++ b/services/api/src/routes/__tests__/recommender.unit.test.ts @@ -3,8 +3,7 @@ * These can import directly from the module without any mocking. */ import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; -import { inferReward, dueAgeDays, pickPromptVersion } from '../recommender.js'; -import { config } from '../../config.js'; +import { inferReward, dueAgeDays } from '../recommender.js'; describe('inferReward', () => { it('dismiss → -1', () => expect(inferReward('dismiss', null)).toBe(-1.0)); @@ -38,45 +37,3 @@ describe('dueAgeDays', () => { expect(dueAgeDays({ date: yesterday })).toBeGreaterThan(0); }); }); - -describe('pickPromptVersion', () => { - // Save + restore the original env-driven config field across tests. - let original: string; - beforeEach(() => { original = config.TIP_PROMPT_VERSION; }); - afterEach(() => { (config as { TIP_PROMPT_VERSION: string }).TIP_PROMPT_VERSION = original; }); - - it('empty config → null (let ml/serving pick its default)', () => { - (config as { TIP_PROMPT_VERSION: string }).TIP_PROMPT_VERSION = ''; - expect(pickPromptVersion()).toBeNull(); - }); - - it('whitespace-only config → null', () => { - (config as { TIP_PROMPT_VERSION: string }).TIP_PROMPT_VERSION = ' '; - expect(pickPromptVersion()).toBeNull(); - }); - - it('single value → that value', () => { - (config as { TIP_PROMPT_VERSION: string }).TIP_PROMPT_VERSION = 'v2-mentor'; - expect(pickPromptVersion()).toBe('v2-mentor'); - }); - - it('comma-separated → uniformly samples from the set', () => { - (config as { TIP_PROMPT_VERSION: string }).TIP_PROMPT_VERSION = 'v1,v2-mentor,v3-few-shot'; - const seen = new Set(); - // With 100 trials, the chance of missing any of 3 buckets is (2/3)^100 ≈ 0 — test is reliable. - for (let i = 0; i < 100; i++) { - const picked = pickPromptVersion(); - expect(picked).not.toBeNull(); - seen.add(picked!); - } - expect(seen).toEqual(new Set(['v1', 'v2-mentor', 'v3-few-shot'])); - }); - - it('trims whitespace around comma-separated entries', () => { - (config as { TIP_PROMPT_VERSION: string }).TIP_PROMPT_VERSION = ' v1 , v2-mentor '; - for (let i = 0; i < 20; i++) { - const picked = pickPromptVersion()!; - expect(['v1', 'v2-mentor']).toContain(picked); - } - }); -}); diff --git a/services/api/src/routes/recommender.ts b/services/api/src/routes/recommender.ts index a949c57..5428bd0 100644 --- a/services/api/src/routes/recommender.ts +++ b/services/api/src/routes/recommender.ts @@ -11,57 +11,15 @@ import type { TipCandidate, Signal } from '@oo/shared-types'; import { todoistSource, dueAgeDays } from '../signals/todoist.js'; export { dueAgeDays }; import { SignalAggregator } from '../signals/aggregator.js'; -import { getProfile, type Profile } from '../profile/builder.js'; +import { getActiveAgentOutputs } from './agent-outputs.js'; const router: ExpressRouter = Router(); -/** - * Pick a prompt version for this request. `config.TIP_PROMPT_VERSION` is either - * empty (let ml/serving pick its default), a single version, or a comma-separated - * list to rotate uniformly across requests so the #92 dashboard accumulates - * comparable buckets per variant. Exported for testing. - */ -export function pickPromptVersion(): string | null { - const raw = config.TIP_PROMPT_VERSION.trim(); - if (!raw) return null; - const versions = raw.split(',').map((v) => v.trim()).filter(Boolean); - if (!versions.length) return null; - return versions[Math.floor(Math.random() * versions.length)] ?? null; -} - // --------------------------------------------------------------------------- // Signal aggregator — register sources here as new integrations are added // --------------------------------------------------------------------------- export const aggregator = new SignalAggregator().register(todoistSource); - -// --------------------------------------------------------------------------- -// Candidate cache — stores the last assembled candidate set per user so the -// feedback handler can look up features for reward delivery. -// --------------------------------------------------------------------------- -const candidateCache = new Map(); -export const _clearCandidateCacheForTests = () => { - candidateCache.clear(); - todoistSource.clearCache(); -}; - -// --------------------------------------------------------------------------- -// Shadow-policy registry -// --------------------------------------------------------------------------- -const shadowPolicies = new Map([ - // egreedy-v2 promoted to active policy (ADR-0012). Shadow entry kept for - // rollback toggle; leave disabled in normal operation. - ['egreedy-v2-shadow', { active: false }], -]); - -export function getShadowPolicies() { - return Array.from(shadowPolicies.entries()).map(([name, s]) => ({ name, ...s })); -} - -export function setPolicyActive(name: string, active: boolean): boolean { - if (!shadowPolicies.has(name)) return false; - shadowPolicies.set(name, { active }); - return true; -} +export const _clearSignalCacheForTests = () => todoistSource.clearCache(); // --------------------------------------------------------------------------- // Signal → TipCandidate conversion @@ -78,131 +36,97 @@ function signalToCandidate(signal: Signal): TipCandidate { }; } -// --------------------------------------------------------------------------- -// Stage 2: score candidates via ml/serving bandit -// --------------------------------------------------------------------------- - -async function remotePolicy( - userId: string, - tasks: TipCandidate[], - profile: Profile, - traceparent?: string, -): Promise<{ tipId: string; score: number; policy: string } | null> { - const hour = new Date().getHours(); - const dayOfWeek = new Date().getDay(); - - const body = { - user_id: userId, - candidates: tasks.map((t) => ({ - id: t.id, - content: t.content, - source: t.source, - source_id: t.sourceId ?? null, - features: t.features, - })), - context: { hour_of_day: hour, day_of_week: dayOfWeek }, - profile_features: profile, - }; - - try { - const res = await fetch(`${config.ML_SERVING_URL}/score/egreedy/v2`, { - method: 'POST', - headers: { 'Content-Type': 'application/json', ...(traceparent ? { traceparent } : {}) }, - body: JSON.stringify(body), - signal: AbortSignal.timeout(3000), - }); - if (!res.ok) return null; - const data = (await res.json()) as { tip_id: string; score: number }; - return { tipId: data.tip_id, score: data.score, policy: 'egreedy-v2' }; - } catch { - return null; - } -} - function randomPolicy(candidates: TipCandidate[]): TipCandidate | null { if (!candidates.length) return null; return candidates[Math.floor(Math.random() * candidates.length)]; } // --------------------------------------------------------------------------- -// Stage 1b: fetch LLM candidates from ml/serving /generate +// Shadow-policy registry — kept for step-10 cleanup; no active shadows. +// --------------------------------------------------------------------------- +const shadowPolicies = new Map([ + ['egreedy-v2-shadow', { active: false }], +]); + +export function getShadowPolicies() { + return Array.from(shadowPolicies.entries()).map(([name, s]) => ({ name, ...s })); +} + +export function setPolicyActive(name: string, active: boolean): boolean { + if (!shadowPolicies.has(name)) return false; + shadowPolicies.set(name, { active }); + return true; +} + +// --------------------------------------------------------------------------- +// Orchestrator: fetch agent snippets + call ml/serving /recommend // --------------------------------------------------------------------------- -interface LlmCandidate { - id: string; - content: string; - rationale?: string; -} - -interface LlmGenerateResult { - candidates: TipCandidate[]; - promptVersion: string | null; +interface OrchestratorResult { + tip: TipCandidate; model: string | null; + agentIds: string[]; } -async function fetchLlmCandidates( +async function fetchOrchestratorTip( userId: string, signals: Signal[], hour: number, dayOfWeek: number, - promptVersion: string | null, - profile: Profile, traceparent?: string, -): Promise { +): Promise { + const agentRows = await getActiveAgentOutputs(userId); + const agentOutputs = agentRows.map((r) => ({ + agent_id: r.agentId, + prompt_text: r.promptText, + })); + + const tasks = signals.slice(0, 10).map((s) => ({ + content: s.content, + priority: s.features.priority, + is_overdue: s.features.is_overdue, + task_age_days: s.features.task_age_days, + })); + try { - const tasks = signals.slice(0, 10).map((s) => ({ - content: s.content, - priority: s.features.priority, - is_overdue: s.features.is_overdue, - task_age_days: s.features.task_age_days, - })); - const res = await fetch(`${config.ML_SERVING_URL}/generate`, { + const res = await fetch(`${config.ML_SERVING_URL}/recommend`, { method: 'POST', headers: { 'Content-Type': 'application/json', ...(traceparent ? { traceparent } : {}) }, - body: JSON.stringify({ - user_id: userId, - context: { tasks, hour_of_day: hour, day_of_week: dayOfWeek }, - n: 3, - profile_features: profile, - ...(promptVersion ? { prompt_version: promptVersion } : {}), - }), + body: JSON.stringify({ user_id: userId, agent_outputs: agentOutputs, tasks, hour_of_day: hour, day_of_week: dayOfWeek }), signal: AbortSignal.timeout(15_000), }); - if (!res.ok) return { candidates: [], promptVersion: null, model: null }; + if (!res.ok) return null; const data = (await res.json()) as { - candidates: LlmCandidate[]; + tip: { id: string; content: string; rationale?: string }; model?: string; - prompt_version?: string; }; const now = new Date().toISOString(); - const candidates: TipCandidate[] = data.candidates.map((c) => ({ - id: `llm:${c.id}`, - content: c.content, - source: 'llm' as const, - kind: 'advice' as const, - rationale: c.rationale, - createdAt: now, - features: { is_overdue: false, task_age_days: 0, priority: 1 }, - })); return { - candidates, - promptVersion: data.prompt_version ?? null, + tip: { + id: `llm:${data.tip.id}`, + content: data.tip.content, + source: 'llm' as const, + kind: 'advice' as const, + rationale: data.tip.rationale, + createdAt: now, + features: { is_overdue: false, task_age_days: 0, priority: 1 }, + }, model: data.model ?? null, + agentIds: agentOutputs.map((a) => a.agent_id), }; } catch { - return { candidates: [], promptVersion: null, model: null }; + return null; } } // --------------------------------------------------------------------------- // POST /api/recommend -// Pipeline: [Stage 1] assemble candidates → [Stage 2] score → [Stage 3] serve +// Pipeline: fetch signals → orchestrator → serve; random fallback on failure // --------------------------------------------------------------------------- router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Response) => { const hour = new Date().getHours(); const dayOfWeek = new Date().getDay(); - // Fail fast if no source tokens are connected const anyToken = await db .select({ id: integrationTokens.id }) .from(integrationTokens) @@ -214,49 +138,19 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re return; } - // Stage 1: assemble candidates — aggregated signals + LLM-generated advice (parallel) const signals = await aggregator.fetchAll(req.userId!); - // Refresh + load the user-level profile feature dict (lazy TTL refresh). - const profile = await getProfile(req.userId!); - - const signalCandidates = signals.map(signalToCandidate); - const requestedPromptVersion = pickPromptVersion(); - const llmResult = await fetchLlmCandidates( - req.userId!, - signals, - hour, - dayOfWeek, - requestedPromptVersion, - profile, - req.traceparent, - ); - - const allCandidates: TipCandidate[] = [...signalCandidates, ...llmResult.candidates]; - if (!allCandidates.length) { - res.status(204).end(); - return; - } - - // Cache candidates so the feedback handler can retrieve features - candidateCache.set(req.userId!, allCandidates); const t0 = Date.now(); - - // Stage 2: score — egreedy bandit with random fallback - const scored = await remotePolicy(req.userId!, allCandidates, profile, req.traceparent); + const orchestrated = await fetchOrchestratorTip(req.userId!, signals, hour, dayOfWeek, req.traceparent); const latencyMs = Date.now() - t0; - const tip = scored - ? (allCandidates.find((t) => t.id === scored.tipId) ?? randomPolicy(allCandidates)) - : randomPolicy(allCandidates); + const tip = orchestrated?.tip ?? randomPolicy(signals.map(signalToCandidate)); if (!tip) { res.status(204).end(); return; } - // Stage 3: serve + log - const policy = scored ? scored.policy : 'random'; - const isLlmTip = tip.source === 'llm'; + const policy = orchestrated ? 'orchestrator' : 'random'; const servedAt = new Date().toISOString(); await db.insert(tipViews).values({ id: nanoid(), userId: req.userId!, tipId: tip.id, servedAt }); @@ -266,19 +160,17 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re userId: req.userId!, tipId: tip.id, policy, - mlScore: scored ? Math.round(scored.score * 1000) : null, - featuresJson: JSON.stringify({ - ...tip.features, - hour_of_day: hour, - day_of_week: dayOfWeek, - }), - candidateCount: allCandidates.length, + mlScore: null, + featuresJson: JSON.stringify( + orchestrated + ? { agent_ids: orchestrated.agentIds, hour_of_day: hour, day_of_week: dayOfWeek } + : { ...tip.features, hour_of_day: hour, day_of_week: dayOfWeek }, + ), + candidateCount: orchestrated ? 1 : signals.length, latencyMs, servedAt, - // Trust the version/model the generator reports; falls back to whatever - // we asked for so the bucket isn't mislabeled if /generate omits it. - promptVersion: isLlmTip ? (llmResult.promptVersion ?? requestedPromptVersion ?? null) : null, - llmModel: isLlmTip ? (llmResult.model ?? 'tip-generator') : null, + promptVersion: orchestrated ? 'v4-orchestrator' : null, + llmModel: orchestrated ? orchestrated.model : null, tipKind: tip.kind ?? null, }); @@ -289,56 +181,6 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re servedAt, }); - // Run shadow policies (fire-and-forget, no effect on user) - for (const [name, s] of shadowPolicies) { - if (!s.active) continue; - if (name.startsWith('random')) { - const shadowTip = randomPolicy(allCandidates); - bus.publish('signals.tip.served', { - userId: req.userId!, - tipId: shadowTip?.id ?? 'none', - policy: `shadow:${name}`, - servedAt, - }); - } else if (name === 'egreedy-v2-shadow') { - // Call v2 endpoint with the same payload used for the active policy. - // No reward is delivered — offline sim is the reward measurement for shadow. - void (async () => { - try { - const body = { - user_id: req.userId!, - candidates: allCandidates.map((t) => ({ - id: t.id, - content: t.content, - source: t.source, - source_id: t.sourceId ?? null, - features: t.features, - })), - context: { hour_of_day: hour, day_of_week: dayOfWeek }, - profile_features: profile, - }; - const res = await fetch(`${config.ML_SERVING_URL}/score/egreedy/v2`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify(body), - signal: AbortSignal.timeout(3000), - }); - if (res.ok) { - const data = (await res.json()) as { tip_id: string }; - bus.publish('signals.tip.served', { - userId: req.userId!, - tipId: data.tip_id, - policy: `shadow:${name}`, - servedAt, - }); - } - } catch { - // shadow is best-effort - } - })(); - } - } - res.json({ tip }); }); @@ -359,60 +201,11 @@ export function inferReward(action: string, dwellMs: number | null): number { if (action === 'snooze') return 0.1; if (action === 'helpful') return 0.5; if (action === 'not_helpful') return -0.5; - // done — use dwell time - if (dwellMs === null || dwellMs < 0) return 0.5; // unknown dwell: neutral positive - if (dwellMs < 15_000) return -0.3; // stale / reflex - if (dwellMs < 120_000) return 1.0; // magic zone - if (dwellMs < 600_000) return 0.6; // good - return 0.3; // eventually -} - -// --------------------------------------------------------------------------- -// Reward delivery with retry (bug #75 — was fire-and-forget) -// --------------------------------------------------------------------------- -async function sendRewardWithRetry( - userId: string, - tipId: string, - reward: number, - features: TipCandidate['features'], - profile: Profile, - traceparent?: string, -): Promise { - const body = JSON.stringify({ - user_id: userId, - tip_id: tipId, - reward, - features, - day_of_week: new Date().getDay(), - profile_features: profile, - }); - - for (let attempt = 1; attempt <= 3; attempt++) { - try { - const res = await fetch(`${config.ML_SERVING_URL}/reward/egreedy/v2`, { - method: 'POST', - headers: { 'Content-Type': 'application/json', ...(traceparent ? { traceparent } : {}) }, - body, - signal: AbortSignal.timeout(3000), - }); - if (res.ok) return; - throw new Error(`HTTP ${res.status}`); - } catch (err: any) { - if (attempt === 3) { - logger.error({ tipId, err }, 'reward: failed after 3 attempts'); - bus.publish('signals.tip.reward_failed', { - userId, - tipId, - reward, - attempts: 3, - error: err.message, - failedAt: new Date().toISOString(), - }); - return; - } - await new Promise((r) => setTimeout(r, 250 * Math.pow(2, attempt))); - } - } + if (dwellMs === null || dwellMs < 0) return 0.5; + if (dwellMs < 15_000) return -0.3; + if (dwellMs < 120_000) return 1.0; + if (dwellMs < 600_000) return 0.6; + return 0.3; } // --------------------------------------------------------------------------- @@ -429,7 +222,6 @@ router.post('/tip/:id/feedback', requireAuth, async (req: AuthenticatedRequest, return; } - // Compute dwell time from the most recent tipViews record for this user+tip let dwellMs: number | null = null; const [lastView] = await db .select({ servedAt: tipViews.servedAt }) @@ -455,11 +247,6 @@ router.post('/tip/:id/feedback', requireAuth, async (req: AuthenticatedRequest, createdAt: now.toISOString(), }); - // Look up cached candidate for reward features; invalidate after - const cached = candidateCache.get(req.userId!); - const candidate = cached?.find((t) => t.id === tipId); - candidateCache.delete(req.userId!); - bus.publish('signals.tip.feedback', { userId: req.userId!, tipId, @@ -469,13 +256,6 @@ router.post('/tip/:id/feedback', requireAuth, async (req: AuthenticatedRequest, createdAt: now.toISOString(), }); - if (candidate) { - // Re-fetch profile for the v2 ridge update; TTL cache makes this near-instant. - const profile = await getProfile(req.userId!); - sendRewardWithRetry(req.userId!, tipId, reward, candidate.features, profile, req.traceparent); - } - - // Delegate action to the owning signal source (e.g. mark done in Todoist) await aggregator.act(req.userId!, tipId, action); res.json({ ok: true }); diff --git a/services/api/src/test/db.ts b/services/api/src/test/db.ts index ca4aceb..f88e8ca 100644 --- a/services/api/src/test/db.ts +++ b/services/api/src/test/db.ts @@ -131,6 +131,17 @@ export function makeTestDb(): DrizzleDb & { rawSqlite: BetterSqlite3Database } { finished_at TEXT ); + CREATE TABLE IF NOT EXISTS agent_outputs ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL REFERENCES users(id), + agent_id TEXT NOT NULL, + prompt_text TEXT NOT NULL, + signals_snapshot TEXT, + computed_at TEXT NOT NULL, + expires_at TEXT NOT NULL, + agent_version TEXT NOT NULL + ); + CREATE TABLE IF NOT EXISTS sim_events ( id TEXT PRIMARY KEY, run_id TEXT NOT NULL REFERENCES sim_runs(id),