Check getEligibleAgentIds per user in runCycle before calling computeAndStore — agents without consented data sources, silenced by active context, or disabled via preference are skipped rather than computed unconditionally. Eligibility check failure skips the whole user (fail-closed). Skipped count added to cycle-complete log line. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
120 lines
3.3 KiB
TypeScript
120 lines
3.3 KiB
TypeScript
/**
|
|
* Agent pre-compute scheduler (ADR-0013, Step 5).
|
|
*
|
|
* Every 15 minutes: for each user who viewed a tip in the last 48 hours,
|
|
* run each sub-agent the user is eligible for and store its prompt snippet in agent_outputs.
|
|
* Also purges rows expired more than 24 hours ago.
|
|
*
|
|
* Agent IDs are fetched from ml/serving /health at start, falling back to
|
|
* a hardcoded list if ml/serving is not yet reachable.
|
|
*/
|
|
|
|
import { db } from '../db/index.js';
|
|
import { agentOutputs, tipViews } from '../db/schema.js';
|
|
import { gt, lt } from 'drizzle-orm';
|
|
import { logger } from '../logger.js';
|
|
import { config } from '../config.js';
|
|
import { computeAndStore } from '../routes/agent-outputs.js';
|
|
import { getEligibleAgentIds } from '../profile/eligibility.js';
|
|
|
|
/**
 * Agent IDs used when the ml/serving /health endpoint cannot supply a list.
 * Order is preserved because cycles iterate agents in array order.
 */
const FALLBACK_AGENT_IDS: string[] = [
  'overdue-task',
  'momentum',
  'time-of-day',
  'recent-patterns',
  'focus-area',
];

/** Default delay between pre-compute cycles: 15 minutes, expressed in milliseconds. */
const DEFAULT_INTERVAL_MS = 15 * 60 * 1_000;
|
|
|
|
/**
 * Resolves the list of agent IDs to pre-compute.
 *
 * Asks `${config.ML_SERVING_URL}/health` (5 second timeout) for its `agents`
 * array. FALLBACK_AGENT_IDS is returned instead when:
 * - the request fails, times out, or the body is not valid JSON;
 * - the response status is not ok;
 * - `agents` is missing, empty, not an array, or contains a non-string entry.
 *
 * The payload is validated rather than trusted, because a malformed value
 * (for example a plain string, which also has a `length`) would otherwise be
 * iterated as if it were a list of agent IDs.
 *
 * @returns A non-empty array of agent IDs; never rejects.
 */
async function fetchAgentIds(): Promise<string[]> {
  try {
    const res = await fetch(`${config.ML_SERVING_URL}/health`, {
      signal: AbortSignal.timeout(5_000),
    });
    if (!res.ok) return FALLBACK_AGENT_IDS;

    const data: unknown = await res.json();
    const agents =
      typeof data === 'object' && data !== null
        ? (data as { agents?: unknown }).agents
        : undefined;

    if (!Array.isArray(agents) || agents.length === 0) return FALLBACK_AGENT_IDS;

    // A partially valid list is treated as untrustworthy as a whole.
    const allStrings = agents.every((id: unknown): id is string => typeof id === 'string');
    return allStrings ? (agents as string[]) : FALLBACK_AGENT_IDS;
  } catch (err: unknown) {
    // Best-effort lookup: record why the fallback was used, but never fail startup.
    logger.info({ err }, 'agent-scheduler: could not fetch agent ids, using fallback list');
    return FALLBACK_AGENT_IDS;
  }
}
|
|
|
|
/**
 * Lists the distinct user IDs that have a tipViews row with servedAt later
 * than 48 hours before now (compared as an ISO-8601 string).
 *
 * @returns One entry per distinct user; empty when no rows match.
 */
async function getActiveUserIds(): Promise<string[]> {
  const lookbackMs = 48 * 60 * 60 * 1000;
  const since = new Date(Date.now() - lookbackMs).toISOString();

  const recentViewers = await db
    .selectDistinct({ userId: tipViews.userId })
    .from(tipViews)
    .where(gt(tipViews.servedAt, since));

  return Array.from(recentViewers, (viewer) => viewer.userId);
}
|
|
|
|
/**
 * Deletes agentOutputs rows whose expiresAt is earlier than 24 hours before
 * now (compared as an ISO-8601 string).
 */
async function purgeExpired(): Promise<void> {
  const graceMs = 24 * 60 * 60 * 1000;
  const threshold = new Date(Date.now() - graceMs);
  const isPastGrace = lt(agentOutputs.expiresAt, threshold.toISOString());

  await db.delete(agentOutputs).where(isPastGrace);
}
|
|
|
|
/**
 * Runs one pre-compute cycle.
 *
 * For every user returned by getActiveUserIds, calls computeAndStore for each
 * agent in `agentIds` that getEligibleAgentIds reports as eligible for that
 * user. Afterwards it purges expired rows and logs a summary.
 *
 * Failure handling:
 * - Active-user query fails: logged, and the cycle is abandoned.
 * - Eligibility lookup fails for a user: logged, and that user is skipped
 *   entirely (fail-closed); all of their agents are counted as skipped.
 * - A single computeAndStore call fails: logged and counted as failed; the
 *   cycle continues with the next agent.
 * - Purge fails: logged; the summary is still emitted.
 *
 * @param agentIds Agent IDs to consider for each user, in processing order.
 */
async function runCycle(agentIds: string[]): Promise<void> {
  let userIds: string[];
  try {
    userIds = await getActiveUserIds();
  } catch (err: unknown) {
    logger.error({ err }, 'agent-scheduler: failed to query active users');
    return;
  }

  let ok = 0;
  let failed = 0;
  let skipped = 0;

  for (const userId of userIds) {
    let eligible: Set<string>;
    try {
      eligible = await getEligibleAgentIds(userId);
    } catch (err: unknown) {
      logger.error({ err, userId }, 'agent-scheduler: eligibility check failed, skipping user');
      skipped += agentIds.length;
      continue;
    }

    for (const agentId of agentIds) {
      if (!eligible.has(agentId)) {
        skipped++;
        continue;
      }
      try {
        await computeAndStore(userId, agentId);
        ok++;
      } catch (err: unknown) {
        failed++;
        logger.error({ err, userId, agentId }, 'agent-scheduler: compute error');
      }
    }
  }

  // Purge on every cycle, including ones with no active users. Returning
  // before this point on an empty user list would leave expired rows in place
  // for as long as nobody views a tip.
  try {
    await purgeExpired();
  } catch (err: unknown) {
    logger.error({ err }, 'agent-scheduler: purge failed');
  }

  // Idle cycles stay quiet: the summary is only logged when there was work.
  if (userIds.length === 0) return;

  logger.info(
    { ok, failed, skipped, users: userIds.length, agents: agentIds.length },
    'agent-scheduler: cycle complete',
  );
}
|
|
|
|
/**
 * Starts the background pre-compute loop.
 *
 * Resolves the agent ID list once via fetchAgentIds, then arms a timer that
 * runs the first cycle after 15 seconds and further cycles every
 * `intervalMs`. The returned promise resolves once the first timer is armed,
 * not when a cycle has run.
 *
 * Cycles never overlap: a tick that fires while the previous cycle is still
 * running is skipped and logged, so a slow cycle cannot stack up duplicate
 * work behind it.
 *
 * @param intervalMs Delay between ticks in milliseconds; defaults to
 *   DEFAULT_INTERVAL_MS.
 */
export async function startAgentPrecomputeScheduler(
  intervalMs = DEFAULT_INTERVAL_MS,
): Promise<void> {
  const agentIds = await fetchAgentIds();
  logger.info({ agentIds }, 'agent-scheduler: starting');

  let cycleInFlight = false;

  const tick = (): void => {
    if (cycleInFlight) {
      logger.info({ intervalMs }, 'agent-scheduler: previous cycle still running, skipping tick');
      return;
    }
    cycleInFlight = true;
    void runCycle(agentIds)
      .catch((err: unknown) => {
        // runCycle handles its own failures; this only guards against an
        // unexpected rejection becoming an unhandled promise rejection.
        logger.error({ err }, 'agent-scheduler: cycle failed unexpectedly');
      })
      .finally(() => {
        cycleInFlight = false;
      });
  };

  setTimeout(() => {
    tick();
    setInterval(tick, intervalMs);
  }, 15_000);
}
|