diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 2077df5..ae5b2e2 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -25,6 +25,7 @@ import { requireAdmin } from './middleware/admin.js'; import type { Request, Response } from 'express'; import { connectNats } from './events/nats.js'; import { startTodoistSyncScheduler } from './signals/scheduler.js'; +import { startAgentPrecomputeScheduler } from './signals/agent-scheduler.js'; import { bus } from './events/bus.js'; import { registerProfileSubscriptions } from './profile/subscriber.js'; @@ -110,6 +111,7 @@ if (config.NATS_URL) { } startTodoistSyncScheduler(config.TODOIST_SYNC_INTERVAL_MS); +void startAgentPrecomputeScheduler(); // Profile features are invalidated on relevant signals (#81 phase B.2); // TTL stays as a safety net for clock drift / dropped events. diff --git a/services/api/src/routes/agent-outputs.ts b/services/api/src/routes/agent-outputs.ts index 8dbc1b3..1a95d9d 100644 --- a/services/api/src/routes/agent-outputs.ts +++ b/services/api/src/routes/agent-outputs.ts @@ -4,7 +4,7 @@ import { db } from '../db/index.js'; import { agentOutputs, tipFeedback, tipViews } from '../db/schema.js'; import { eq, and, gt, lt } from 'drizzle-orm'; import { config } from '../config.js'; -import { getProfile } from '../profile/builder.js'; +import { getProfile, type Profile } from '../profile/builder.js'; import { todoistSource } from '../signals/todoist.js'; import { SignalAggregator } from '../signals/aggregator.js'; import type { Request, Response } from 'express'; @@ -77,9 +77,63 @@ router.get('/active-users', async (req: Request, res: Response) => { } }); +// ── Core compute logic (used by route + scheduler) ─────────────────────────── + +export async function computeAndStore(userId: string, agentId: string): Promise { + let tasks: object[] = []; + try { + const signals = await _agentAggregator.fetchAll(userId); + tasks = signals.map((s) => ({ + id: s.id, + content: s.content, + priority: (s.features.priority as number) ?? 1, + is_overdue: Boolean(s.features.is_overdue), + task_age_days: (s.features.task_age_days as number) ?? 0, + project_id: (s.metadata as Record).project_id ?? null, + })); + } catch { + // No integration or fetch error — agents that need tasks will report "no tasks" + } + + let profile: Profile = {}; + try { + profile = await getProfile(userId); + } catch {} + + const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000).toISOString(); + const feedbackRows = await db + .select({ action: tipFeedback.action, dwellMs: tipFeedback.dwellMs, createdAt: tipFeedback.createdAt }) + .from(tipFeedback) + .where(and(eq(tipFeedback.userId, userId), gt(tipFeedback.createdAt, sevenDaysAgo))); + + const feedbackHistory = feedbackRows.map((f) => ({ + action: f.action, + dwell_ms: f.dwellMs, + created_at: f.createdAt, + })); + + const mlResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/compute`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ user_id: userId, tasks, profile, feedback_history: feedbackHistory }), + signal: AbortSignal.timeout(15_000), + }); + + if (!mlResp.ok) { + const detail = await mlResp.text().catch(() => ''); + throw new Error(`ml/serving /agents/${agentId}/compute returned ${mlResp.status}: ${detail}`); + } + + const output = await mlResp.json() as { + user_id: string; agent_id: string; prompt_text: string; + signals_snapshot: unknown; computed_at: string; expires_at: string; agent_version: string; + }; + + await storeAgentOutput(output); +} + // ── POST /api/agents/:agentId/compute ───────────────────────────────────────── // Orchestrating endpoint for per-(user, agent) compute tasks. -// Fetches all signals, calls ml/serving /agents/{agentId}/compute, stores result. // Body: { user_id: string } router.post('/:agentId/compute', async (req: Request, res: Response) => { @@ -94,64 +148,11 @@ router.post('/:agentId/compute', async (req: Request, res: Response) => { } try { - // Fetch tasks via Todoist integration (gracefully empty if not connected). - let tasks: object[] = []; - try { - const signals = await _agentAggregator.fetchAll(user_id); - tasks = signals.map((s) => ({ - id: s.id, - content: s.content, - priority: (s.features.priority as number) ?? 1, - is_overdue: Boolean(s.features.is_overdue), - task_age_days: (s.features.task_age_days as number) ?? 0, - project_id: (s.metadata as Record).project_id ?? null, - })); - } catch { - // No integration or fetch error — agents that need tasks will report "no tasks" - } - - // Fetch profile features (lazy-refreshed from DB). - let profile: Record = {}; - try { - profile = await getProfile(user_id); - } catch {} - - // Fetch last 7 days of feedback for RecentPatternsAgent. - const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000).toISOString(); - const feedbackRows = await db - .select({ action: tipFeedback.action, dwellMs: tipFeedback.dwellMs, createdAt: tipFeedback.createdAt }) - .from(tipFeedback) - .where(and(eq(tipFeedback.userId, user_id), gt(tipFeedback.createdAt, sevenDaysAgo))); - - const feedbackHistory = feedbackRows.map((f) => ({ - action: f.action, - dwell_ms: f.dwellMs, - created_at: f.createdAt, - })); - - // Call ml/serving to run the agent. - const mlResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/compute`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ user_id, tasks, profile, feedback_history: feedbackHistory }), - signal: AbortSignal.timeout(15_000), - }); - - if (!mlResp.ok) { - const detail = await mlResp.text().catch(() => ''); - res.status(502).json({ error: `ml/serving returned ${mlResp.status}`, detail }); - return; - } - - const output = await mlResp.json() as { - user_id: string; agent_id: string; prompt_text: string; - signals_snapshot: unknown; computed_at: string; expires_at: string; agent_version: string; - }; - - await storeAgentOutput(output); - res.json({ ok: true, agent_id: output.agent_id, user_id: output.user_id, expires_at: output.expires_at }); + await computeAndStore(user_id, agentId); + res.json({ ok: true, agent_id: agentId, user_id }); } catch (err: any) { - res.status(500).json({ error: err.message }); + const status = err.message?.includes('returned 4') ? 422 : 500; + res.status(status).json({ error: err.message }); } }); diff --git a/services/api/src/signals/agent-scheduler.ts b/services/api/src/signals/agent-scheduler.ts new file mode 100644 index 0000000..36c35a8 --- /dev/null +++ b/services/api/src/signals/agent-scheduler.ts @@ -0,0 +1,105 @@ +/** + * Agent pre-compute scheduler (ADR-0013, Step 5). + * + * Every 15 minutes: for each user who viewed a tip in the last 48 hours, + * run all sub-agents and store their prompt snippets in agent_outputs. + * Also purges rows expired more than 24 hours ago. + * + * Agent IDs are fetched from ml/serving /health at start, falling back to + * a hardcoded list if ml/serving is not yet reachable. + */ + +import { db } from '../db/index.js'; +import { agentOutputs, tipViews } from '../db/schema.js'; +import { gt, lt } from 'drizzle-orm'; +import { logger } from '../logger.js'; +import { config } from '../config.js'; +import { computeAndStore } from '../routes/agent-outputs.js'; + +const FALLBACK_AGENT_IDS = [ + 'overdue-task', + 'momentum', + 'time-of-day', + 'recent-patterns', + 'focus-area', +]; + +const DEFAULT_INTERVAL_MS = 15 * 60 * 1000; + +async function fetchAgentIds(): Promise { + try { + const res = await fetch(`${config.ML_SERVING_URL}/health`, { + signal: AbortSignal.timeout(5_000), + }); + if (!res.ok) return FALLBACK_AGENT_IDS; + const data = (await res.json()) as { agents?: string[] }; + return data.agents?.length ? data.agents : FALLBACK_AGENT_IDS; + } catch { + return FALLBACK_AGENT_IDS; + } +} + +async function getActiveUserIds(): Promise { + const cutoff = new Date(Date.now() - 48 * 60 * 60 * 1000).toISOString(); + const rows = await db + .selectDistinct({ userId: tipViews.userId }) + .from(tipViews) + .where(gt(tipViews.servedAt, cutoff)); + return rows.map((r) => r.userId); +} + +async function purgeExpired(): Promise { + const cutoff = new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString(); + await db.delete(agentOutputs).where(lt(agentOutputs.expiresAt, cutoff)); +} + +async function runCycle(agentIds: string[]): Promise { + let userIds: string[]; + try { + userIds = await getActiveUserIds(); + } catch (err: any) { + logger.error({ err }, 'agent-scheduler: failed to query active users'); + return; + } + + if (!userIds.length) return; + + let ok = 0; + let failed = 0; + + for (const userId of userIds) { + const results = await Promise.allSettled( + agentIds.map((agentId) => computeAndStore(userId, agentId)), + ); + for (const r of results) { + if (r.status === 'fulfilled') ok++; + else { + failed++; + logger.error({ err: r.reason, userId }, 'agent-scheduler: compute error'); + } + } + } + + try { + await purgeExpired(); + } catch (err: any) { + logger.error({ err }, 'agent-scheduler: purge failed'); + } + + logger.info( + { ok, failed, users: userIds.length, agents: agentIds.length }, + 'agent-scheduler: cycle complete', + ); +} + +export async function startAgentPrecomputeScheduler( + intervalMs = DEFAULT_INTERVAL_MS, +): Promise { + const agentIds = await fetchAgentIds(); + logger.info({ agentIds }, 'agent-scheduler: starting'); + + setTimeout(() => { + void runCycle(agentIds); + setInterval(() => void runCycle(agentIds), intervalMs); + }, 15_000); +}