From 56fda0d7378b4aa9e2a428f411055391b0585c64 Mon Sep 17 00:00:00 2001 From: alvis Date: Tue, 12 May 2026 15:45:08 +0000 Subject: [PATCH] chore(scheduler): skip agents whose data sources aren't granted (#128) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Check getEligibleAgentIds per user in runCycle before calling computeAndStore — agents without consented data sources, silenced by active context, or disabled via preference are skipped rather than computed unconditionally. Eligibility check failure skips the whole user (fail-closed). Skipped count added to cycle-complete log line. Co-Authored-By: Claude Sonnet 4.6 --- .../signals/__tests__/agent-scheduler.test.ts | 166 ++++++++++++++++++ services/api/src/signals/agent-scheduler.ts | 17 +- 2 files changed, 182 insertions(+), 1 deletion(-) create mode 100644 services/api/src/signals/__tests__/agent-scheduler.test.ts diff --git a/services/api/src/signals/__tests__/agent-scheduler.test.ts b/services/api/src/signals/__tests__/agent-scheduler.test.ts new file mode 100644 index 0000000..1e18c88 --- /dev/null +++ b/services/api/src/signals/__tests__/agent-scheduler.test.ts @@ -0,0 +1,166 @@ +/** + * Tests for the agent pre-compute scheduler (signals/agent-scheduler.ts). + * + * Key behaviour under test: runCycle calls getEligibleAgentIds per user and + * skips computeAndStore for agents the user hasn't consented to. + */ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; + +vi.mock('../../logger.js', () => ({ + logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), fatal: vi.fn() }, +})); +import { logger } from '../../logger.js'; + +// ── active-user query: db.selectDistinct(...).from(...).where(...) ────────── +let activeUsers: { userId: string }[] = []; +const userWhereMock = vi.fn(async () => activeUsers); +const userFromMock = vi.fn(() => ({ where: userWhereMock })); +const selectDistinctMock = vi.fn(() => ({ from: userFromMock })); + +// ── purge: db.delete(...).where(...) ──────────────────────────────────────── +const deleteWhereMock = vi.fn(async () => ({})); +const deleteMock = vi.fn(() => ({ where: deleteWhereMock })); + +vi.mock('../../db/index.js', () => ({ + db: { selectDistinct: selectDistinctMock, delete: deleteMock }, +})); + +vi.mock('../../db/schema.js', () => ({ + agentOutputs: { expiresAt: 'expires_at' }, + tipViews: { userId: 'user_id', servedAt: 'served_at' }, +})); + +vi.mock('drizzle-orm', () => ({ + gt: vi.fn(), + lt: vi.fn(), + and: vi.fn(), + eq: vi.fn(), + isNull: vi.fn(), +})); + +vi.mock('../../config.js', () => ({ config: { ML_SERVING_URL: 'http://ml' } })); + +// ── computeAndStore — tracks which (user, agent) pairs were computed ──────── +const computeAndStoreMock = vi.fn(async () => {}); +vi.mock('../../routes/agent-outputs.js', () => ({ + computeAndStore: computeAndStoreMock, +})); + +// ── eligibility — replaceable per test ───────────────────────────────────── +let eligibleIds: Set = new Set(); +const getEligibleAgentIdsMock = vi.fn(async (_userId: string) => eligibleIds); +vi.mock('../../profile/eligibility.js', () => ({ + getEligibleAgentIds: getEligibleAgentIdsMock, +})); + +// ml-serving /health — return a fixed agent list +global.fetch = vi.fn(async () => ({ + ok: true, + json: async () => ({ agents: ['overdue-task', 'momentum', 'time-of-day'] }), +})) as unknown as typeof fetch; + +beforeEach(() => { + activeUsers = []; + eligibleIds = new Set(); + computeAndStoreMock.mockClear(); + getEligibleAgentIdsMock.mockClear(); + userWhereMock.mockClear(); + deleteWhereMock.mockClear(); + vi.clearAllMocks(); + vi.useFakeTimers(); + // restore default mocks after clearAllMocks + userWhereMock.mockImplementation(async () => activeUsers); + getEligibleAgentIdsMock.mockImplementation(async () => eligibleIds); + computeAndStoreMock.mockResolvedValue(undefined); + deleteWhereMock.mockResolvedValue({}); + global.fetch = vi.fn(async () => ({ + ok: true, + json: async () => ({ agents: ['overdue-task', 'momentum', 'time-of-day'] }), + })) as unknown as typeof fetch; +}); + +afterEach(() => { + vi.useRealTimers(); +}); + +describe('startAgentPrecomputeScheduler', () => { + it('skips computeAndStore for agents not in the eligibility set', async () => { + activeUsers = [{ userId: 'alice' }]; + eligibleIds = new Set(['momentum']); // only momentum consented + + const { startAgentPrecomputeScheduler } = await import('../agent-scheduler.js'); + startAgentPrecomputeScheduler(60_000); + await vi.advanceTimersByTimeAsync(16_000); + await Promise.resolve(); + + const computed = computeAndStoreMock.mock.calls.map((c) => c[1]); + expect(computed).toEqual(['momentum']); + expect(computed).not.toContain('overdue-task'); + expect(computed).not.toContain('time-of-day'); + }); + + it('skips all agents when eligibility set is empty', async () => { + activeUsers = [{ userId: 'bob' }]; + eligibleIds = new Set(); // no consents + + const { startAgentPrecomputeScheduler } = await import('../agent-scheduler.js'); + startAgentPrecomputeScheduler(60_000); + await vi.advanceTimersByTimeAsync(16_000); + await Promise.resolve(); + + expect(computeAndStoreMock).not.toHaveBeenCalled(); + expect(logger.info).toHaveBeenCalledWith( + expect.objectContaining({ skipped: 3, ok: 0 }), + 'agent-scheduler: cycle complete', + ); + }); + + it('computes all agents when all are eligible', async () => { + activeUsers = [{ userId: 'carol' }]; + eligibleIds = new Set(['overdue-task', 'momentum', 'time-of-day']); + + const { startAgentPrecomputeScheduler } = await import('../agent-scheduler.js'); + startAgentPrecomputeScheduler(60_000); + await vi.advanceTimersByTimeAsync(16_000); + await Promise.resolve(); + + expect(computeAndStoreMock).toHaveBeenCalledTimes(3); + expect(logger.info).toHaveBeenCalledWith( + expect.objectContaining({ ok: 3, skipped: 0 }), + 'agent-scheduler: cycle complete', + ); + }); + + it('skips entire user when eligibility check throws', async () => { + activeUsers = [{ userId: 'dave' }]; + getEligibleAgentIdsMock.mockRejectedValueOnce(new Error('db timeout')); + + const { startAgentPrecomputeScheduler } = await import('../agent-scheduler.js'); + startAgentPrecomputeScheduler(60_000); + await vi.advanceTimersByTimeAsync(16_000); + await Promise.resolve(); + + expect(computeAndStoreMock).not.toHaveBeenCalled(); + expect(logger.error).toHaveBeenCalledWith( + expect.objectContaining({ err: expect.anything(), userId: 'dave' }), + 'agent-scheduler: eligibility check failed, skipping user', + ); + }); + + it('checks eligibility independently per user', async () => { + activeUsers = [{ userId: 'u1' }, { userId: 'u2' }]; + getEligibleAgentIdsMock.mockImplementation(async (userId: string) => + userId === 'u1' ? new Set(['momentum']) : new Set(['overdue-task', 'time-of-day']), + ); + + const { startAgentPrecomputeScheduler } = await import('../agent-scheduler.js'); + startAgentPrecomputeScheduler(60_000); + await vi.advanceTimersByTimeAsync(16_000); + await Promise.resolve(); + + const u1Calls = computeAndStoreMock.mock.calls.filter((c) => c[0] === 'u1').map((c) => c[1]); + const u2Calls = computeAndStoreMock.mock.calls.filter((c) => c[0] === 'u2').map((c) => c[1]); + expect(u1Calls).toEqual(['momentum']); + expect(u2Calls.sort()).toEqual(['overdue-task', 'time-of-day']); + }); +}); diff --git a/services/api/src/signals/agent-scheduler.ts b/services/api/src/signals/agent-scheduler.ts index 42c6db3..45267d3 100644 --- a/services/api/src/signals/agent-scheduler.ts +++ b/services/api/src/signals/agent-scheduler.ts @@ -15,6 +15,7 @@ import { gt, lt } from 'drizzle-orm'; import { logger } from '../logger.js'; import { config } from '../config.js'; import { computeAndStore } from '../routes/agent-outputs.js'; +import { getEligibleAgentIds } from '../profile/eligibility.js'; const FALLBACK_AGENT_IDS = [ 'overdue-task', @@ -67,8 +68,22 @@ async function runCycle(agentIds: string[]): Promise { let ok = 0; let failed = 0; + let skipped = 0; for (const userId of userIds) { + let eligible: Set; + try { + eligible = await getEligibleAgentIds(userId); + } catch (err: any) { + logger.error({ err, userId }, 'agent-scheduler: eligibility check failed, skipping user'); + skipped += agentIds.length; + continue; + } + for (const agentId of agentIds) { + if (!eligible.has(agentId)) { + skipped++; + continue; + } try { await computeAndStore(userId, agentId); ok++; @@ -86,7 +101,7 @@ async function runCycle(agentIds: string[]): Promise { } logger.info( - { ok, failed, users: userIds.length, agents: agentIds.length }, + { ok, failed, skipped, users: userIds.length, agents: agentIds.length }, 'agent-scheduler: cycle complete', ); }