feat(api): agent pre-compute scheduler (ADR-0013 step 5)
Extracts computeAndStore() from the /agents/:agentId/compute route so it can be called without an HTTP round-trip. startAgentPrecomputeScheduler() runs every 15 min: fetches active users (tip view in 48h), runs all agents in parallel per user, then purges outputs expired >24h. Agent IDs are resolved from ml/serving /health at startup with a fallback hardcoded list. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -25,6 +25,7 @@ import { requireAdmin } from './middleware/admin.js';
|
|||||||
import type { Request, Response } from 'express';
|
import type { Request, Response } from 'express';
|
||||||
import { connectNats } from './events/nats.js';
|
import { connectNats } from './events/nats.js';
|
||||||
import { startTodoistSyncScheduler } from './signals/scheduler.js';
|
import { startTodoistSyncScheduler } from './signals/scheduler.js';
|
||||||
|
import { startAgentPrecomputeScheduler } from './signals/agent-scheduler.js';
|
||||||
import { bus } from './events/bus.js';
|
import { bus } from './events/bus.js';
|
||||||
import { registerProfileSubscriptions } from './profile/subscriber.js';
|
import { registerProfileSubscriptions } from './profile/subscriber.js';
|
||||||
|
|
||||||
@@ -110,6 +111,7 @@ if (config.NATS_URL) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
startTodoistSyncScheduler(config.TODOIST_SYNC_INTERVAL_MS);
|
startTodoistSyncScheduler(config.TODOIST_SYNC_INTERVAL_MS);
|
||||||
|
void startAgentPrecomputeScheduler();
|
||||||
|
|
||||||
// Profile features are invalidated on relevant signals (#81 phase B.2);
|
// Profile features are invalidated on relevant signals (#81 phase B.2);
|
||||||
// TTL stays as a safety net for clock drift / dropped events.
|
// TTL stays as a safety net for clock drift / dropped events.
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import { db } from '../db/index.js';
|
|||||||
import { agentOutputs, tipFeedback, tipViews } from '../db/schema.js';
|
import { agentOutputs, tipFeedback, tipViews } from '../db/schema.js';
|
||||||
import { eq, and, gt, lt } from 'drizzle-orm';
|
import { eq, and, gt, lt } from 'drizzle-orm';
|
||||||
import { config } from '../config.js';
|
import { config } from '../config.js';
|
||||||
import { getProfile } from '../profile/builder.js';
|
import { getProfile, type Profile } from '../profile/builder.js';
|
||||||
import { todoistSource } from '../signals/todoist.js';
|
import { todoistSource } from '../signals/todoist.js';
|
||||||
import { SignalAggregator } from '../signals/aggregator.js';
|
import { SignalAggregator } from '../signals/aggregator.js';
|
||||||
import type { Request, Response } from 'express';
|
import type { Request, Response } from 'express';
|
||||||
@@ -77,9 +77,63 @@ router.get('/active-users', async (req: Request, res: Response) => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// ── Core compute logic (used by route + scheduler) ───────────────────────────
|
||||||
|
|
||||||
|
export async function computeAndStore(userId: string, agentId: string): Promise<void> {
|
||||||
|
let tasks: object[] = [];
|
||||||
|
try {
|
||||||
|
const signals = await _agentAggregator.fetchAll(userId);
|
||||||
|
tasks = signals.map((s) => ({
|
||||||
|
id: s.id,
|
||||||
|
content: s.content,
|
||||||
|
priority: (s.features.priority as number) ?? 1,
|
||||||
|
is_overdue: Boolean(s.features.is_overdue),
|
||||||
|
task_age_days: (s.features.task_age_days as number) ?? 0,
|
||||||
|
project_id: (s.metadata as Record<string, unknown>).project_id ?? null,
|
||||||
|
}));
|
||||||
|
} catch {
|
||||||
|
// No integration or fetch error — agents that need tasks will report "no tasks"
|
||||||
|
}
|
||||||
|
|
||||||
|
let profile: Profile = {};
|
||||||
|
try {
|
||||||
|
profile = await getProfile(userId);
|
||||||
|
} catch {}
|
||||||
|
|
||||||
|
const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000).toISOString();
|
||||||
|
const feedbackRows = await db
|
||||||
|
.select({ action: tipFeedback.action, dwellMs: tipFeedback.dwellMs, createdAt: tipFeedback.createdAt })
|
||||||
|
.from(tipFeedback)
|
||||||
|
.where(and(eq(tipFeedback.userId, userId), gt(tipFeedback.createdAt, sevenDaysAgo)));
|
||||||
|
|
||||||
|
const feedbackHistory = feedbackRows.map((f) => ({
|
||||||
|
action: f.action,
|
||||||
|
dwell_ms: f.dwellMs,
|
||||||
|
created_at: f.createdAt,
|
||||||
|
}));
|
||||||
|
|
||||||
|
const mlResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/compute`, {
|
||||||
|
method: 'POST',
|
||||||
|
headers: { 'Content-Type': 'application/json' },
|
||||||
|
body: JSON.stringify({ user_id: userId, tasks, profile, feedback_history: feedbackHistory }),
|
||||||
|
signal: AbortSignal.timeout(15_000),
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!mlResp.ok) {
|
||||||
|
const detail = await mlResp.text().catch(() => '');
|
||||||
|
throw new Error(`ml/serving /agents/${agentId}/compute returned ${mlResp.status}: ${detail}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const output = await mlResp.json() as {
|
||||||
|
user_id: string; agent_id: string; prompt_text: string;
|
||||||
|
signals_snapshot: unknown; computed_at: string; expires_at: string; agent_version: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
await storeAgentOutput(output);
|
||||||
|
}
|
||||||
|
|
||||||
// ── POST /api/agents/:agentId/compute ─────────────────────────────────────────
|
// ── POST /api/agents/:agentId/compute ─────────────────────────────────────────
|
||||||
// Orchestrating endpoint for per-(user, agent) compute tasks.
|
// Orchestrating endpoint for per-(user, agent) compute tasks.
|
||||||
// Fetches all signals, calls ml/serving /agents/{agentId}/compute, stores result.
|
|
||||||
// Body: { user_id: string }
|
// Body: { user_id: string }
|
||||||
|
|
||||||
router.post('/:agentId/compute', async (req: Request, res: Response) => {
|
router.post('/:agentId/compute', async (req: Request, res: Response) => {
|
||||||
@@ -94,64 +148,11 @@ router.post('/:agentId/compute', async (req: Request, res: Response) => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Fetch tasks via Todoist integration (gracefully empty if not connected).
|
await computeAndStore(user_id, agentId);
|
||||||
let tasks: object[] = [];
|
res.json({ ok: true, agent_id: agentId, user_id });
|
||||||
try {
|
|
||||||
const signals = await _agentAggregator.fetchAll(user_id);
|
|
||||||
tasks = signals.map((s) => ({
|
|
||||||
id: s.id,
|
|
||||||
content: s.content,
|
|
||||||
priority: (s.features.priority as number) ?? 1,
|
|
||||||
is_overdue: Boolean(s.features.is_overdue),
|
|
||||||
task_age_days: (s.features.task_age_days as number) ?? 0,
|
|
||||||
project_id: (s.metadata as Record<string, unknown>).project_id ?? null,
|
|
||||||
}));
|
|
||||||
} catch {
|
|
||||||
// No integration or fetch error — agents that need tasks will report "no tasks"
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fetch profile features (lazy-refreshed from DB).
|
|
||||||
let profile: Record<string, number | null> = {};
|
|
||||||
try {
|
|
||||||
profile = await getProfile(user_id);
|
|
||||||
} catch {}
|
|
||||||
|
|
||||||
// Fetch last 7 days of feedback for RecentPatternsAgent.
|
|
||||||
const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000).toISOString();
|
|
||||||
const feedbackRows = await db
|
|
||||||
.select({ action: tipFeedback.action, dwellMs: tipFeedback.dwellMs, createdAt: tipFeedback.createdAt })
|
|
||||||
.from(tipFeedback)
|
|
||||||
.where(and(eq(tipFeedback.userId, user_id), gt(tipFeedback.createdAt, sevenDaysAgo)));
|
|
||||||
|
|
||||||
const feedbackHistory = feedbackRows.map((f) => ({
|
|
||||||
action: f.action,
|
|
||||||
dwell_ms: f.dwellMs,
|
|
||||||
created_at: f.createdAt,
|
|
||||||
}));
|
|
||||||
|
|
||||||
// Call ml/serving to run the agent.
|
|
||||||
const mlResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/compute`, {
|
|
||||||
method: 'POST',
|
|
||||||
headers: { 'Content-Type': 'application/json' },
|
|
||||||
body: JSON.stringify({ user_id, tasks, profile, feedback_history: feedbackHistory }),
|
|
||||||
signal: AbortSignal.timeout(15_000),
|
|
||||||
});
|
|
||||||
|
|
||||||
if (!mlResp.ok) {
|
|
||||||
const detail = await mlResp.text().catch(() => '');
|
|
||||||
res.status(502).json({ error: `ml/serving returned ${mlResp.status}`, detail });
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const output = await mlResp.json() as {
|
|
||||||
user_id: string; agent_id: string; prompt_text: string;
|
|
||||||
signals_snapshot: unknown; computed_at: string; expires_at: string; agent_version: string;
|
|
||||||
};
|
|
||||||
|
|
||||||
await storeAgentOutput(output);
|
|
||||||
res.json({ ok: true, agent_id: output.agent_id, user_id: output.user_id, expires_at: output.expires_at });
|
|
||||||
} catch (err: any) {
|
} catch (err: any) {
|
||||||
res.status(500).json({ error: err.message });
|
const status = err.message?.includes('returned 4') ? 422 : 500;
|
||||||
|
res.status(status).json({ error: err.message });
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
105
services/api/src/signals/agent-scheduler.ts
Normal file
105
services/api/src/signals/agent-scheduler.ts
Normal file
@@ -0,0 +1,105 @@
|
|||||||
|
/**
|
||||||
|
* Agent pre-compute scheduler (ADR-0013, Step 5).
|
||||||
|
*
|
||||||
|
* Every 15 minutes: for each user who viewed a tip in the last 48 hours,
|
||||||
|
* run all sub-agents and store their prompt snippets in agent_outputs.
|
||||||
|
* Also purges rows expired more than 24 hours ago.
|
||||||
|
*
|
||||||
|
* Agent IDs are fetched from ml/serving /health at start, falling back to
|
||||||
|
* a hardcoded list if ml/serving is not yet reachable.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { db } from '../db/index.js';
|
||||||
|
import { agentOutputs, tipViews } from '../db/schema.js';
|
||||||
|
import { gt, lt } from 'drizzle-orm';
|
||||||
|
import { logger } from '../logger.js';
|
||||||
|
import { config } from '../config.js';
|
||||||
|
import { computeAndStore } from '../routes/agent-outputs.js';
|
||||||
|
|
||||||
|
const FALLBACK_AGENT_IDS = [
|
||||||
|
'overdue-task',
|
||||||
|
'momentum',
|
||||||
|
'time-of-day',
|
||||||
|
'recent-patterns',
|
||||||
|
'focus-area',
|
||||||
|
];
|
||||||
|
|
||||||
|
const DEFAULT_INTERVAL_MS = 15 * 60 * 1000;
|
||||||
|
|
||||||
|
async function fetchAgentIds(): Promise<string[]> {
|
||||||
|
try {
|
||||||
|
const res = await fetch(`${config.ML_SERVING_URL}/health`, {
|
||||||
|
signal: AbortSignal.timeout(5_000),
|
||||||
|
});
|
||||||
|
if (!res.ok) return FALLBACK_AGENT_IDS;
|
||||||
|
const data = (await res.json()) as { agents?: string[] };
|
||||||
|
return data.agents?.length ? data.agents : FALLBACK_AGENT_IDS;
|
||||||
|
} catch {
|
||||||
|
return FALLBACK_AGENT_IDS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function getActiveUserIds(): Promise<string[]> {
|
||||||
|
const cutoff = new Date(Date.now() - 48 * 60 * 60 * 1000).toISOString();
|
||||||
|
const rows = await db
|
||||||
|
.selectDistinct({ userId: tipViews.userId })
|
||||||
|
.from(tipViews)
|
||||||
|
.where(gt(tipViews.servedAt, cutoff));
|
||||||
|
return rows.map((r) => r.userId);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function purgeExpired(): Promise<void> {
|
||||||
|
const cutoff = new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();
|
||||||
|
await db.delete(agentOutputs).where(lt(agentOutputs.expiresAt, cutoff));
|
||||||
|
}
|
||||||
|
|
||||||
|
async function runCycle(agentIds: string[]): Promise<void> {
|
||||||
|
let userIds: string[];
|
||||||
|
try {
|
||||||
|
userIds = await getActiveUserIds();
|
||||||
|
} catch (err: any) {
|
||||||
|
logger.error({ err }, 'agent-scheduler: failed to query active users');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!userIds.length) return;
|
||||||
|
|
||||||
|
let ok = 0;
|
||||||
|
let failed = 0;
|
||||||
|
|
||||||
|
for (const userId of userIds) {
|
||||||
|
const results = await Promise.allSettled(
|
||||||
|
agentIds.map((agentId) => computeAndStore(userId, agentId)),
|
||||||
|
);
|
||||||
|
for (const r of results) {
|
||||||
|
if (r.status === 'fulfilled') ok++;
|
||||||
|
else {
|
||||||
|
failed++;
|
||||||
|
logger.error({ err: r.reason, userId }, 'agent-scheduler: compute error');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await purgeExpired();
|
||||||
|
} catch (err: any) {
|
||||||
|
logger.error({ err }, 'agent-scheduler: purge failed');
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
{ ok, failed, users: userIds.length, agents: agentIds.length },
|
||||||
|
'agent-scheduler: cycle complete',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function startAgentPrecomputeScheduler(
|
||||||
|
intervalMs = DEFAULT_INTERVAL_MS,
|
||||||
|
): Promise<void> {
|
||||||
|
const agentIds = await fetchAgentIds();
|
||||||
|
logger.info({ agentIds }, 'agent-scheduler: starting');
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
void runCycle(agentIds);
|
||||||
|
setInterval(() => void runCycle(agentIds), intervalMs);
|
||||||
|
}, 15_000);
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user