import { Router, type Request, type Response, type IRouter } from 'express'; import { nanoid } from 'nanoid'; import { db } from '../db/index.js'; import { agentOutputs, tipFeedback, tipViews, userPreferences } from '../db/schema.js'; import { eq, and, gt, lt } from 'drizzle-orm'; import { config } from '../config.js'; import { getProfile, type Profile } from '../profile/builder.js'; import { todoistSource } from '../signals/todoist.js'; import { SignalAggregator } from '../signals/aggregator.js'; const router: IRouter = Router(); // Separate aggregator instance — avoids circular dep with recommender.ts. const _agentAggregator = new SignalAggregator().register(todoistSource); // ── Internal auth helper ────────────────────────────────────────────────────── function checkInternalToken(req: Request, res: Response): boolean { const token = req.headers['x-internal-token']; if (!config.INTERNAL_API_TOKEN || token !== config.INTERNAL_API_TOKEN) { res.status(401).json({ error: 'Unauthorized' }); return false; } return true; } // ── DB helpers ──────────────────────────────────────────────────────────────── export async function getActiveAgentOutputs(userId: string) { const now = new Date().toISOString(); return db .select() .from(agentOutputs) .where(and(eq(agentOutputs.userId, userId), gt(agentOutputs.expiresAt, now))); } async function storeAgentOutput(output: { user_id: string; agent_id: string; prompt_text: string; signals_snapshot?: unknown; computed_at: string; expires_at: string; agent_version: string; }) { await db .delete(agentOutputs) .where(and(eq(agentOutputs.userId, output.user_id), eq(agentOutputs.agentId, output.agent_id))); await db.insert(agentOutputs).values({ id: nanoid(), userId: output.user_id, agentId: output.agent_id, promptText: output.prompt_text, signalsSnapshot: output.signals_snapshot ? JSON.stringify(output.signals_snapshot) : null, computedAt: output.computed_at, expiresAt: output.expires_at, agentVersion: output.agent_version, }); } // ── GET /api/agents/active-users ────────────────────────────────────────────── // Returns user IDs that have requested a tip in the last 48 hours. // Returns user IDs for fan-out precompute tasks. router.get('/active-users', async (req: Request, res: Response) => { if (!checkInternalToken(req, res)) return; const cutoff = new Date(Date.now() - 48 * 60 * 60 * 1000).toISOString(); try { const rows = await db .selectDistinct({ userId: tipViews.userId }) .from(tipViews) .where(gt(tipViews.servedAt, cutoff)); res.json({ user_ids: rows.map((r) => r.userId) }); } catch (err: any) { res.status(500).json({ error: err.message }); } }); // ── Core compute logic (used by route + scheduler) ─────────────────────────── /** Load agent prefs for a user from user_preferences, merging user+inferred. * User source wins: if both exist, the 'user' row is returned. */ async function loadAgentPrefs(userId: string, agentId: string): Promise> { const scope = `agent:${agentId}`; const rows = await db .select({ key: userPreferences.key, valueJson: userPreferences.valueJson, source: userPreferences.source }) .from(userPreferences) .where(and(eq(userPreferences.userId, userId), eq(userPreferences.scope, scope))); // Build merged dict: 'user' source takes precedence over 'inferred' const merged: Record = {}; for (const row of rows) { try { const value = JSON.parse(row.valueJson); const existing = merged[row.key]; if (!existing || row.source === 'user') { merged[row.key] = { value, source: row.source }; } } catch { // skip malformed } } return Object.fromEntries(Object.entries(merged).map(([k, v]) => [k, v.value])); } /** Persist inferred prefs to user_preferences, skipping keys the user has explicitly set. */ async function persistInferredPrefs( userId: string, agentId: string, inferredPrefs: Record, ): Promise { if (!Object.keys(inferredPrefs).length) return; const scope = `agent:${agentId}`; const now = new Date().toISOString(); for (const [key, value] of Object.entries(inferredPrefs)) { const valueJson = JSON.stringify(value); await db .insert(userPreferences) .values({ userId, scope, key, valueJson, source: 'inferred', updatedAt: now }) .onConflictDoUpdate({ target: [userPreferences.userId, userPreferences.scope, userPreferences.key], set: { valueJson, updatedAt: now }, // Only overwrite rows already marked inferred; user overrides are untouched. setWhere: eq(userPreferences.source, 'inferred'), }); } } export async function computeAndStore(userId: string, agentId: string): Promise { let tasks: object[] = []; try { const signals = await _agentAggregator.fetchAll(userId); tasks = signals.map((s) => ({ id: s.id, content: s.content, priority: (s.features.priority as number) ?? 1, is_overdue: Boolean(s.features.is_overdue), task_age_days: (s.features.task_age_days as number) ?? 0, project_id: (s.metadata as Record).project_id ?? null, })); } catch { // No integration or fetch error — agents that need tasks will report "no tasks" } let profile: Profile = {}; try { profile = await getProfile(userId); } catch {} const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000).toISOString(); const feedbackRows = await db .select({ action: tipFeedback.action, dwellMs: tipFeedback.dwellMs, createdAt: tipFeedback.createdAt }) .from(tipFeedback) .where(and(eq(tipFeedback.userId, userId), gt(tipFeedback.createdAt, sevenDaysAgo))); const feedbackHistory = feedbackRows.map((f) => ({ action: f.action, dwell_ms: f.dwellMs, created_at: f.createdAt, })); // Load agent prefs (user overrides + previous inferences) to inject into the compute call. const agentPrefs = await loadAgentPrefs(userId, agentId); const mlResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/compute`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ user_id: userId, tasks, profile, feedback_history: feedbackHistory, agent_prefs: agentPrefs }), signal: AbortSignal.timeout(15_000), }); if (!mlResp.ok) { const detail = await mlResp.text().catch(() => ''); throw new Error(`ml/serving /agents/${agentId}/compute returned ${mlResp.status}: ${detail}`); } const output = await mlResp.json() as { user_id: string; agent_id: string; prompt_text: string; signals_snapshot: unknown; computed_at: string; expires_at: string; agent_version: string; }; await storeAgentOutput(output); // Run inference framework for this agent and persist results. // Failures are non-fatal — the compute result is already stored. try { const inferResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/infer`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ user_id: userId, feedback_history: feedbackHistory }), signal: AbortSignal.timeout(10_000), }); if (inferResp.ok) { const inferResult = await inferResp.json() as { inferred_prefs: Record }; await persistInferredPrefs(userId, agentId, inferResult.inferred_prefs); } } catch { // inference failure is non-fatal } } // ── POST /api/agents/:agentId/compute ───────────────────────────────────────── // Orchestrating endpoint for per-(user, agent) compute tasks. // Body: { user_id: string } router.post('/:agentId/compute', async (req: Request, res: Response) => { if (!checkInternalToken(req, res)) return; const { agentId } = req.params as { agentId: string }; const { user_id } = req.body as { user_id: string }; if (!user_id) { res.status(422).json({ error: 'Missing user_id' }); return; } try { await computeAndStore(user_id, agentId); res.json({ ok: true, agent_id: agentId, user_id }); } catch (err: any) { const status = err.message?.includes('returned 4') ? 422 : 500; res.status(status).json({ error: err.message }); } }); // ── POST /api/agents/outputs ────────────────────────────────────────────────── // Stores a pre-computed agent output directly (used if the DAG calls ml/serving // itself and pushes the result separately). router.post('/outputs', async (req: Request, res: Response) => { if (!checkInternalToken(req, res)) return; const { user_id, agent_id, prompt_text, signals_snapshot, computed_at, expires_at, agent_version } = req.body as Record; if (!user_id || !agent_id || !prompt_text || !computed_at || !expires_at || !agent_version) { res.status(422).json({ error: 'Missing required fields: user_id, agent_id, prompt_text, computed_at, expires_at, agent_version', }); return; } try { await storeAgentOutput({ user_id, agent_id, prompt_text, signals_snapshot, computed_at, expires_at, agent_version }); res.json({ ok: true }); } catch (err: any) { res.status(500).json({ error: err.message }); } }); // ── DELETE /api/agents/outputs/expired ─────────────────────────────────────── // Purges rows expired more than 24 hours ago. router.delete('/outputs/expired', async (req: Request, res: Response) => { if (!checkInternalToken(req, res)) return; const cutoff = new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString(); try { await db.delete(agentOutputs).where(lt(agentOutputs.expiresAt, cutoff)); res.json({ ok: true }); } catch (err: any) { res.status(500).json({ error: err.message }); } }); // ── GET /api/agents/:userId/outputs ────────────────────────────────────────── // Returns non-expired agent outputs. Admin observability; recommender calls // getActiveAgentOutputs() directly (no HTTP hop). router.get('/:userId/outputs', async (req: Request, res: Response) => { const { userId } = req.params as { userId: string }; try { const rows = await getActiveAgentOutputs(userId); res.json({ user_id: userId, outputs: rows.map((r) => ({ agent_id: r.agentId, prompt_text: r.promptText, computed_at: r.computedAt, expires_at: r.expiresAt, agent_version: r.agentVersion, })), }); } catch (err: any) { res.status(500).json({ error: err.message }); } }); export default router;