feat(api): orchestrator cutover — replace bandit with multi-agent pipeline (ADR-0013 step 6)

POST /recommend now calls ml/serving /recommend with pre-computed agent
snippets + task context instead of /generate + /score/egreedy/v2. Falls
back to a random signal candidate when ml/serving is unavailable.

Removes: remotePolicy, fetchLlmCandidates, sendRewardWithRetry,
candidateCache, pickPromptVersion. Feedback handler keeps inferReward +
tipFeedback writes for observability; reward delivery to the bandit is gone.
tipScores.policy is now 'orchestrator'; promptVersion is 'v4-orchestrator'.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-04 10:37:15 +00:00
parent 7e958a779d
commit c65bedcf68
4 changed files with 117 additions and 402 deletions

View File

@@ -4,6 +4,10 @@
* inside beforeAll (same pattern as admin.test.ts) to avoid TDZ issues. * inside beforeAll (same pattern as admin.test.ts) to avoid TDZ issues.
* Uses http.request (not fetch) as the test client so that globalThis.fetch * Uses http.request (not fetch) as the test client so that globalThis.fetch
* mocking doesn't interfere with the test runner itself. * mocking doesn't interfere with the test runner itself.
*
* The orchestrator path (ADR-0013): signals fetched for task context/fallback,
* then ml/serving /recommend called. agent_outputs table is empty in tests so
* the orchestrator always uses the raw-task fallback path.
*/ */
import { describe, it, expect, vi, beforeAll, afterEach } from 'vitest'; import { describe, it, expect, vi, beforeAll, afterEach } from 'vitest';
import express from 'express'; import express from 'express';
@@ -48,7 +52,7 @@ describe('POST /recommend integration', () => {
let server: http.Server; let server: http.Server;
let baseUrl: string; let baseUrl: string;
let savedFetch: typeof globalThis.fetch; let savedFetch: typeof globalThis.fetch;
let clearCache: () => void; let clearSignalCache: () => void;
beforeAll(async () => { beforeAll(async () => {
await testDb.insert(users).values({ await testDb.insert(users).values({
@@ -58,11 +62,12 @@ describe('POST /recommend integration', () => {
await testDb.insert(integrationTokens).values({ await testDb.insert(integrationTokens).values({
id: 'tok-1', userId: 'user-1', provider: 'todoist', id: 'tok-1', userId: 'user-1', provider: 'todoist',
accessToken: 'fake-token', connectedAt: new Date().toISOString(), accessToken: 'fake-token', connectedAt: new Date().toISOString(),
tokenStatus: 'active',
}); });
const mod = await import('../recommender.js'); const mod = await import('../recommender.js');
const { recommenderRouter } = mod; const { recommenderRouter } = mod;
clearCache = (mod as any)._clearCandidateCacheForTests; clearSignalCache = (mod as any)._clearSignalCacheForTests;
const app = express(); const app = express();
app.use(express.json()); app.use(express.json());
app.use('/api', recommenderRouter); app.use('/api', recommenderRouter);
@@ -74,19 +79,22 @@ describe('POST /recommend integration', () => {
afterEach(() => { afterEach(() => {
globalThis.fetch = savedFetch; globalThis.fetch = savedFetch;
clearCache?.(); clearSignalCache?.();
}); });
it('returns 204 when Todoist + LLM both return empty', async () => { it('returns 204 when Todoist is empty and orchestrator fails', async () => {
globalThis.fetch = vi.fn().mockResolvedValue({ globalThis.fetch = vi.fn().mockImplementation((url: string) => {
ok: true, status: 200, if (String(url).includes('todoist.com')) {
json: async () => ({ results: [] }), return Promise.resolve({ ok: true, status: 200, json: async () => ({ results: [] }) } as any);
} as any); }
// /recommend fails → orchestrator returns null, random fallback also empty → 204
return Promise.resolve({ ok: false, status: 503 } as any);
});
const { status } = await post(`${baseUrl}/api/recommend`); const { status } = await post(`${baseUrl}/api/recommend`);
expect(status).toBe(204); expect(status).toBe(204);
}); });
it('serves todoist tip and writes correct tip_scores columns', async () => { it('serves orchestrator tip and writes correct tip_scores columns', async () => {
globalThis.fetch = vi.fn().mockImplementation((url: string) => { globalThis.fetch = vi.fn().mockImplementation((url: string) => {
if (String(url).includes('todoist.com')) { if (String(url).includes('todoist.com')) {
return Promise.resolve({ return Promise.resolve({
@@ -96,55 +104,16 @@ describe('POST /recommend integration', () => {
}), }),
} as any); } as any);
} }
if (String(url).includes('/generate')) { if (String(url).includes('/recommend')) {
return Promise.resolve({ ok: false, status: 503, json: async () => ({}) } as any);
}
if (String(url).includes('/score')) {
return Promise.resolve({
ok: true, status: 200,
json: async () => ({ tip_id: 'todoist:task-1', score: 0.8 }),
} as any);
}
return Promise.resolve({ ok: false, status: 500, json: async () => ({}) } as any);
});
const { status, body } = await post(`${baseUrl}/api/recommend`);
expect(status).toBe(200);
expect(body.tip.source).toBe('todoist');
expect(body.tip.kind).toBe('task');
const rows = await testDb.select().from(tipScores);
const row = rows[rows.length - 1];
expect(row.tipKind).toBe('task');
expect(row.promptVersion).toBeNull();
expect(row.llmModel).toBeNull();
});
it('writes prompt_version + llm_model when LLM tip is served', async () => {
globalThis.fetch = vi.fn().mockImplementation((url: string) => {
if (String(url).includes('todoist.com')) {
return Promise.resolve({
ok: true, status: 200,
json: async () => ({ results: [] }),
} as any);
}
if (String(url).includes('/generate')) {
return Promise.resolve({ return Promise.resolve({
ok: true, status: 200, ok: true, status: 200,
json: async () => ({ json: async () => ({
candidates: [{ id: 'adv-1', content: 'Take a break.', rationale: 'You deserve it.' }], tip: { id: 'adv-1', content: 'Take a break.', rationale: 'You deserve it.' },
model: 'tip-generator', model: 'tip-generator',
prompt_version: 'v1',
}), }),
} as any); } as any);
} }
if (String(url).includes('/score')) { return Promise.resolve({ ok: false, status: 500 } as any);
return Promise.resolve({
ok: true, status: 200,
json: async () => ({ tip_id: 'llm:adv-1', score: 0.9 }),
} as any);
}
return Promise.resolve({ ok: false, status: 500, json: async () => ({}) } as any);
}); });
const { status, body } = await post(`${baseUrl}/api/recommend`); const { status, body } = await post(`${baseUrl}/api/recommend`);
@@ -155,12 +124,14 @@ describe('POST /recommend integration', () => {
const rows = await testDb.select().from(tipScores); const rows = await testDb.select().from(tipScores);
const row = rows[rows.length - 1]; const row = rows[rows.length - 1];
expect(row.promptVersion).toBe('v1'); expect(row.policy).toBe('orchestrator');
expect(row.promptVersion).toBe('v4-orchestrator');
expect(row.llmModel).toBe('tip-generator'); expect(row.llmModel).toBe('tip-generator');
expect(row.mlScore).toBeNull();
expect(row.tipKind).toBe('advice'); expect(row.tipKind).toBe('advice');
}); });
it('falls back to todoist tip when /generate returns non-200', async () => { it('falls back to random signal tip when orchestrator fails', async () => {
globalThis.fetch = vi.fn().mockImplementation((url: string) => { globalThis.fetch = vi.fn().mockImplementation((url: string) => {
if (String(url).includes('todoist.com')) { if (String(url).includes('todoist.com')) {
return Promise.resolve({ return Promise.resolve({
@@ -170,22 +141,18 @@ describe('POST /recommend integration', () => {
}), }),
} as any); } as any);
} }
if (String(url).includes('/generate')) { // /recommend fails → falls back to random signal candidate
return Promise.resolve({ ok: false, status: 502, json: async () => ({}) } as any); return Promise.resolve({ ok: false, status: 502 } as any);
}
if (String(url).includes('/score')) {
return Promise.resolve({
ok: true, status: 200,
json: async () => ({ tip_id: 'todoist:fallback-1', score: 0.5 }),
} as any);
}
return Promise.resolve({ ok: false, status: 500, json: async () => ({}) } as any);
}); });
const { status, body } = await post(`${baseUrl}/api/recommend`); const { status, body } = await post(`${baseUrl}/api/recommend`);
expect([200, 204]).toContain(status); expect(status).toBe(200);
if (status === 200) { expect(body.tip.source).toBe('todoist');
expect(body.tip.source).toBe('todoist');
} const rows = await testDb.select().from(tipScores);
const row = rows[rows.length - 1];
expect(row.policy).toBe('random');
expect(row.promptVersion).toBeNull();
expect(row.llmModel).toBeNull();
}); });
}); });

View File

@@ -3,8 +3,7 @@
* These can import directly from the module without any mocking. * These can import directly from the module without any mocking.
*/ */
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { inferReward, dueAgeDays, pickPromptVersion } from '../recommender.js'; import { inferReward, dueAgeDays } from '../recommender.js';
import { config } from '../../config.js';
describe('inferReward', () => { describe('inferReward', () => {
it('dismiss → -1', () => expect(inferReward('dismiss', null)).toBe(-1.0)); it('dismiss → -1', () => expect(inferReward('dismiss', null)).toBe(-1.0));
@@ -38,45 +37,3 @@ describe('dueAgeDays', () => {
expect(dueAgeDays({ date: yesterday })).toBeGreaterThan(0); expect(dueAgeDays({ date: yesterday })).toBeGreaterThan(0);
}); });
}); });
describe('pickPromptVersion', () => {
// Save + restore the original env-driven config field across tests.
let original: string;
beforeEach(() => { original = config.TIP_PROMPT_VERSION; });
afterEach(() => { (config as { TIP_PROMPT_VERSION: string }).TIP_PROMPT_VERSION = original; });
it('empty config → null (let ml/serving pick its default)', () => {
(config as { TIP_PROMPT_VERSION: string }).TIP_PROMPT_VERSION = '';
expect(pickPromptVersion()).toBeNull();
});
it('whitespace-only config → null', () => {
(config as { TIP_PROMPT_VERSION: string }).TIP_PROMPT_VERSION = ' ';
expect(pickPromptVersion()).toBeNull();
});
it('single value → that value', () => {
(config as { TIP_PROMPT_VERSION: string }).TIP_PROMPT_VERSION = 'v2-mentor';
expect(pickPromptVersion()).toBe('v2-mentor');
});
it('comma-separated → uniformly samples from the set', () => {
(config as { TIP_PROMPT_VERSION: string }).TIP_PROMPT_VERSION = 'v1,v2-mentor,v3-few-shot';
const seen = new Set<string>();
// With 100 trials, the chance of missing any of 3 buckets is (2/3)^100 ≈ 0 — test is reliable.
for (let i = 0; i < 100; i++) {
const picked = pickPromptVersion();
expect(picked).not.toBeNull();
seen.add(picked!);
}
expect(seen).toEqual(new Set(['v1', 'v2-mentor', 'v3-few-shot']));
});
it('trims whitespace around comma-separated entries', () => {
(config as { TIP_PROMPT_VERSION: string }).TIP_PROMPT_VERSION = ' v1 , v2-mentor ';
for (let i = 0; i < 20; i++) {
const picked = pickPromptVersion()!;
expect(['v1', 'v2-mentor']).toContain(picked);
}
});
});

View File

@@ -11,57 +11,15 @@ import type { TipCandidate, Signal } from '@oo/shared-types';
import { todoistSource, dueAgeDays } from '../signals/todoist.js'; import { todoistSource, dueAgeDays } from '../signals/todoist.js';
export { dueAgeDays }; export { dueAgeDays };
import { SignalAggregator } from '../signals/aggregator.js'; import { SignalAggregator } from '../signals/aggregator.js';
import { getProfile, type Profile } from '../profile/builder.js'; import { getActiveAgentOutputs } from './agent-outputs.js';
const router: ExpressRouter = Router(); const router: ExpressRouter = Router();
/**
* Pick a prompt version for this request. `config.TIP_PROMPT_VERSION` is either
* empty (let ml/serving pick its default), a single version, or a comma-separated
* list to rotate uniformly across requests so the #92 dashboard accumulates
* comparable buckets per variant. Exported for testing.
*/
export function pickPromptVersion(): string | null {
const raw = config.TIP_PROMPT_VERSION.trim();
if (!raw) return null;
const versions = raw.split(',').map((v) => v.trim()).filter(Boolean);
if (!versions.length) return null;
return versions[Math.floor(Math.random() * versions.length)] ?? null;
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Signal aggregator — register sources here as new integrations are added // Signal aggregator — register sources here as new integrations are added
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
export const aggregator = new SignalAggregator().register(todoistSource); export const aggregator = new SignalAggregator().register(todoistSource);
export const _clearSignalCacheForTests = () => todoistSource.clearCache();
// ---------------------------------------------------------------------------
// Candidate cache — stores the last assembled candidate set per user so the
// feedback handler can look up features for reward delivery.
// ---------------------------------------------------------------------------
const candidateCache = new Map<string, TipCandidate[]>();
export const _clearCandidateCacheForTests = () => {
candidateCache.clear();
todoistSource.clearCache();
};
// ---------------------------------------------------------------------------
// Shadow-policy registry
// ---------------------------------------------------------------------------
const shadowPolicies = new Map<string, { active: boolean }>([
// egreedy-v2 promoted to active policy (ADR-0012). Shadow entry kept for
// rollback toggle; leave disabled in normal operation.
['egreedy-v2-shadow', { active: false }],
]);
export function getShadowPolicies() {
return Array.from(shadowPolicies.entries()).map(([name, s]) => ({ name, ...s }));
}
export function setPolicyActive(name: string, active: boolean): boolean {
if (!shadowPolicies.has(name)) return false;
shadowPolicies.set(name, { active });
return true;
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Signal → TipCandidate conversion // Signal → TipCandidate conversion
@@ -78,131 +36,97 @@ function signalToCandidate(signal: Signal): TipCandidate {
}; };
} }
// ---------------------------------------------------------------------------
// Stage 2: score candidates via ml/serving bandit
// ---------------------------------------------------------------------------
async function remotePolicy(
userId: string,
tasks: TipCandidate[],
profile: Profile,
traceparent?: string,
): Promise<{ tipId: string; score: number; policy: string } | null> {
const hour = new Date().getHours();
const dayOfWeek = new Date().getDay();
const body = {
user_id: userId,
candidates: tasks.map((t) => ({
id: t.id,
content: t.content,
source: t.source,
source_id: t.sourceId ?? null,
features: t.features,
})),
context: { hour_of_day: hour, day_of_week: dayOfWeek },
profile_features: profile,
};
try {
const res = await fetch(`${config.ML_SERVING_URL}/score/egreedy/v2`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', ...(traceparent ? { traceparent } : {}) },
body: JSON.stringify(body),
signal: AbortSignal.timeout(3000),
});
if (!res.ok) return null;
const data = (await res.json()) as { tip_id: string; score: number };
return { tipId: data.tip_id, score: data.score, policy: 'egreedy-v2' };
} catch {
return null;
}
}
function randomPolicy(candidates: TipCandidate[]): TipCandidate | null { function randomPolicy(candidates: TipCandidate[]): TipCandidate | null {
if (!candidates.length) return null; if (!candidates.length) return null;
return candidates[Math.floor(Math.random() * candidates.length)]; return candidates[Math.floor(Math.random() * candidates.length)];
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Stage 1b: fetch LLM candidates from ml/serving /generate // Shadow-policy registry — kept for step-10 cleanup; no active shadows.
// ---------------------------------------------------------------------------
const shadowPolicies = new Map<string, { active: boolean }>([
['egreedy-v2-shadow', { active: false }],
]);
export function getShadowPolicies() {
return Array.from(shadowPolicies.entries()).map(([name, s]) => ({ name, ...s }));
}
export function setPolicyActive(name: string, active: boolean): boolean {
if (!shadowPolicies.has(name)) return false;
shadowPolicies.set(name, { active });
return true;
}
// ---------------------------------------------------------------------------
// Orchestrator: fetch agent snippets + call ml/serving /recommend
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
interface LlmCandidate { interface OrchestratorResult {
id: string; tip: TipCandidate;
content: string;
rationale?: string;
}
interface LlmGenerateResult {
candidates: TipCandidate[];
promptVersion: string | null;
model: string | null; model: string | null;
agentIds: string[];
} }
async function fetchLlmCandidates( async function fetchOrchestratorTip(
userId: string, userId: string,
signals: Signal[], signals: Signal[],
hour: number, hour: number,
dayOfWeek: number, dayOfWeek: number,
promptVersion: string | null,
profile: Profile,
traceparent?: string, traceparent?: string,
): Promise<LlmGenerateResult> { ): Promise<OrchestratorResult | null> {
const agentRows = await getActiveAgentOutputs(userId);
const agentOutputs = agentRows.map((r) => ({
agent_id: r.agentId,
prompt_text: r.promptText,
}));
const tasks = signals.slice(0, 10).map((s) => ({
content: s.content,
priority: s.features.priority,
is_overdue: s.features.is_overdue,
task_age_days: s.features.task_age_days,
}));
try { try {
const tasks = signals.slice(0, 10).map((s) => ({ const res = await fetch(`${config.ML_SERVING_URL}/recommend`, {
content: s.content,
priority: s.features.priority,
is_overdue: s.features.is_overdue,
task_age_days: s.features.task_age_days,
}));
const res = await fetch(`${config.ML_SERVING_URL}/generate`, {
method: 'POST', method: 'POST',
headers: { 'Content-Type': 'application/json', ...(traceparent ? { traceparent } : {}) }, headers: { 'Content-Type': 'application/json', ...(traceparent ? { traceparent } : {}) },
body: JSON.stringify({ body: JSON.stringify({ user_id: userId, agent_outputs: agentOutputs, tasks, hour_of_day: hour, day_of_week: dayOfWeek }),
user_id: userId,
context: { tasks, hour_of_day: hour, day_of_week: dayOfWeek },
n: 3,
profile_features: profile,
...(promptVersion ? { prompt_version: promptVersion } : {}),
}),
signal: AbortSignal.timeout(15_000), signal: AbortSignal.timeout(15_000),
}); });
if (!res.ok) return { candidates: [], promptVersion: null, model: null }; if (!res.ok) return null;
const data = (await res.json()) as { const data = (await res.json()) as {
candidates: LlmCandidate[]; tip: { id: string; content: string; rationale?: string };
model?: string; model?: string;
prompt_version?: string;
}; };
const now = new Date().toISOString(); const now = new Date().toISOString();
const candidates: TipCandidate[] = data.candidates.map((c) => ({
id: `llm:${c.id}`,
content: c.content,
source: 'llm' as const,
kind: 'advice' as const,
rationale: c.rationale,
createdAt: now,
features: { is_overdue: false, task_age_days: 0, priority: 1 },
}));
return { return {
candidates, tip: {
promptVersion: data.prompt_version ?? null, id: `llm:${data.tip.id}`,
content: data.tip.content,
source: 'llm' as const,
kind: 'advice' as const,
rationale: data.tip.rationale,
createdAt: now,
features: { is_overdue: false, task_age_days: 0, priority: 1 },
},
model: data.model ?? null, model: data.model ?? null,
agentIds: agentOutputs.map((a) => a.agent_id),
}; };
} catch { } catch {
return { candidates: [], promptVersion: null, model: null }; return null;
} }
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// POST /api/recommend // POST /api/recommend
// Pipeline: [Stage 1] assemble candidates → [Stage 2] score → [Stage 3] serve // Pipeline: fetch signals → orchestrator → serve; random fallback on failure
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Response) => { router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Response) => {
const hour = new Date().getHours(); const hour = new Date().getHours();
const dayOfWeek = new Date().getDay(); const dayOfWeek = new Date().getDay();
// Fail fast if no source tokens are connected
const anyToken = await db const anyToken = await db
.select({ id: integrationTokens.id }) .select({ id: integrationTokens.id })
.from(integrationTokens) .from(integrationTokens)
@@ -214,49 +138,19 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re
return; return;
} }
// Stage 1: assemble candidates — aggregated signals + LLM-generated advice (parallel)
const signals = await aggregator.fetchAll(req.userId!); const signals = await aggregator.fetchAll(req.userId!);
// Refresh + load the user-level profile feature dict (lazy TTL refresh).
const profile = await getProfile(req.userId!);
const signalCandidates = signals.map(signalToCandidate);
const requestedPromptVersion = pickPromptVersion();
const llmResult = await fetchLlmCandidates(
req.userId!,
signals,
hour,
dayOfWeek,
requestedPromptVersion,
profile,
req.traceparent,
);
const allCandidates: TipCandidate[] = [...signalCandidates, ...llmResult.candidates];
if (!allCandidates.length) {
res.status(204).end();
return;
}
// Cache candidates so the feedback handler can retrieve features
candidateCache.set(req.userId!, allCandidates);
const t0 = Date.now(); const t0 = Date.now();
const orchestrated = await fetchOrchestratorTip(req.userId!, signals, hour, dayOfWeek, req.traceparent);
// Stage 2: score — egreedy bandit with random fallback
const scored = await remotePolicy(req.userId!, allCandidates, profile, req.traceparent);
const latencyMs = Date.now() - t0; const latencyMs = Date.now() - t0;
const tip = scored
? (allCandidates.find((t) => t.id === scored.tipId) ?? randomPolicy(allCandidates))
: randomPolicy(allCandidates);
const tip = orchestrated?.tip ?? randomPolicy(signals.map(signalToCandidate));
if (!tip) { if (!tip) {
res.status(204).end(); res.status(204).end();
return; return;
} }
// Stage 3: serve + log const policy = orchestrated ? 'orchestrator' : 'random';
const policy = scored ? scored.policy : 'random';
const isLlmTip = tip.source === 'llm';
const servedAt = new Date().toISOString(); const servedAt = new Date().toISOString();
await db.insert(tipViews).values({ id: nanoid(), userId: req.userId!, tipId: tip.id, servedAt }); await db.insert(tipViews).values({ id: nanoid(), userId: req.userId!, tipId: tip.id, servedAt });
@@ -266,19 +160,17 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re
userId: req.userId!, userId: req.userId!,
tipId: tip.id, tipId: tip.id,
policy, policy,
mlScore: scored ? Math.round(scored.score * 1000) : null, mlScore: null,
featuresJson: JSON.stringify({ featuresJson: JSON.stringify(
...tip.features, orchestrated
hour_of_day: hour, ? { agent_ids: orchestrated.agentIds, hour_of_day: hour, day_of_week: dayOfWeek }
day_of_week: dayOfWeek, : { ...tip.features, hour_of_day: hour, day_of_week: dayOfWeek },
}), ),
candidateCount: allCandidates.length, candidateCount: orchestrated ? 1 : signals.length,
latencyMs, latencyMs,
servedAt, servedAt,
// Trust the version/model the generator reports; falls back to whatever promptVersion: orchestrated ? 'v4-orchestrator' : null,
// we asked for so the bucket isn't mislabeled if /generate omits it. llmModel: orchestrated ? orchestrated.model : null,
promptVersion: isLlmTip ? (llmResult.promptVersion ?? requestedPromptVersion ?? null) : null,
llmModel: isLlmTip ? (llmResult.model ?? 'tip-generator') : null,
tipKind: tip.kind ?? null, tipKind: tip.kind ?? null,
}); });
@@ -289,56 +181,6 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re
servedAt, servedAt,
}); });
// Run shadow policies (fire-and-forget, no effect on user)
for (const [name, s] of shadowPolicies) {
if (!s.active) continue;
if (name.startsWith('random')) {
const shadowTip = randomPolicy(allCandidates);
bus.publish('signals.tip.served', {
userId: req.userId!,
tipId: shadowTip?.id ?? 'none',
policy: `shadow:${name}`,
servedAt,
});
} else if (name === 'egreedy-v2-shadow') {
// Call v2 endpoint with the same payload used for the active policy.
// No reward is delivered — offline sim is the reward measurement for shadow.
void (async () => {
try {
const body = {
user_id: req.userId!,
candidates: allCandidates.map((t) => ({
id: t.id,
content: t.content,
source: t.source,
source_id: t.sourceId ?? null,
features: t.features,
})),
context: { hour_of_day: hour, day_of_week: dayOfWeek },
profile_features: profile,
};
const res = await fetch(`${config.ML_SERVING_URL}/score/egreedy/v2`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(body),
signal: AbortSignal.timeout(3000),
});
if (res.ok) {
const data = (await res.json()) as { tip_id: string };
bus.publish('signals.tip.served', {
userId: req.userId!,
tipId: data.tip_id,
policy: `shadow:${name}`,
servedAt,
});
}
} catch {
// shadow is best-effort
}
})();
}
}
res.json({ tip }); res.json({ tip });
}); });
@@ -359,60 +201,11 @@ export function inferReward(action: string, dwellMs: number | null): number {
if (action === 'snooze') return 0.1; if (action === 'snooze') return 0.1;
if (action === 'helpful') return 0.5; if (action === 'helpful') return 0.5;
if (action === 'not_helpful') return -0.5; if (action === 'not_helpful') return -0.5;
// done — use dwell time if (dwellMs === null || dwellMs < 0) return 0.5;
if (dwellMs === null || dwellMs < 0) return 0.5; // unknown dwell: neutral positive if (dwellMs < 15_000) return -0.3;
if (dwellMs < 15_000) return -0.3; // stale / reflex if (dwellMs < 120_000) return 1.0;
if (dwellMs < 120_000) return 1.0; // magic zone if (dwellMs < 600_000) return 0.6;
if (dwellMs < 600_000) return 0.6; // good return 0.3;
return 0.3; // eventually
}
// ---------------------------------------------------------------------------
// Reward delivery with retry (bug #75 — was fire-and-forget)
// ---------------------------------------------------------------------------
async function sendRewardWithRetry(
userId: string,
tipId: string,
reward: number,
features: TipCandidate['features'],
profile: Profile,
traceparent?: string,
): Promise<void> {
const body = JSON.stringify({
user_id: userId,
tip_id: tipId,
reward,
features,
day_of_week: new Date().getDay(),
profile_features: profile,
});
for (let attempt = 1; attempt <= 3; attempt++) {
try {
const res = await fetch(`${config.ML_SERVING_URL}/reward/egreedy/v2`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', ...(traceparent ? { traceparent } : {}) },
body,
signal: AbortSignal.timeout(3000),
});
if (res.ok) return;
throw new Error(`HTTP ${res.status}`);
} catch (err: any) {
if (attempt === 3) {
logger.error({ tipId, err }, 'reward: failed after 3 attempts');
bus.publish('signals.tip.reward_failed', {
userId,
tipId,
reward,
attempts: 3,
error: err.message,
failedAt: new Date().toISOString(),
});
return;
}
await new Promise((r) => setTimeout(r, 250 * Math.pow(2, attempt)));
}
}
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -429,7 +222,6 @@ router.post('/tip/:id/feedback', requireAuth, async (req: AuthenticatedRequest,
return; return;
} }
// Compute dwell time from the most recent tipViews record for this user+tip
let dwellMs: number | null = null; let dwellMs: number | null = null;
const [lastView] = await db const [lastView] = await db
.select({ servedAt: tipViews.servedAt }) .select({ servedAt: tipViews.servedAt })
@@ -455,11 +247,6 @@ router.post('/tip/:id/feedback', requireAuth, async (req: AuthenticatedRequest,
createdAt: now.toISOString(), createdAt: now.toISOString(),
}); });
// Look up cached candidate for reward features; invalidate after
const cached = candidateCache.get(req.userId!);
const candidate = cached?.find((t) => t.id === tipId);
candidateCache.delete(req.userId!);
bus.publish('signals.tip.feedback', { bus.publish('signals.tip.feedback', {
userId: req.userId!, userId: req.userId!,
tipId, tipId,
@@ -469,13 +256,6 @@ router.post('/tip/:id/feedback', requireAuth, async (req: AuthenticatedRequest,
createdAt: now.toISOString(), createdAt: now.toISOString(),
}); });
if (candidate) {
// Re-fetch profile for the v2 ridge update; TTL cache makes this near-instant.
const profile = await getProfile(req.userId!);
sendRewardWithRetry(req.userId!, tipId, reward, candidate.features, profile, req.traceparent);
}
// Delegate action to the owning signal source (e.g. mark done in Todoist)
await aggregator.act(req.userId!, tipId, action); await aggregator.act(req.userId!, tipId, action);
res.json({ ok: true }); res.json({ ok: true });

View File

@@ -131,6 +131,17 @@ export function makeTestDb(): DrizzleDb & { rawSqlite: BetterSqlite3Database } {
finished_at TEXT finished_at TEXT
); );
CREATE TABLE IF NOT EXISTS agent_outputs (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL REFERENCES users(id),
agent_id TEXT NOT NULL,
prompt_text TEXT NOT NULL,
signals_snapshot TEXT,
computed_at TEXT NOT NULL,
expires_at TEXT NOT NULL,
agent_version TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS sim_events ( CREATE TABLE IF NOT EXISTS sim_events (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
run_id TEXT NOT NULL REFERENCES sim_runs(id), run_id TEXT NOT NULL REFERENCES sim_runs(id),