feat: SignalSource abstraction — generalize signal ingestion beyond Todoist (#78)

- Add Signal + SignalSource interfaces to packages/shared-types
- TipCandidate.features widened to Record<string,number|boolean> to match Signal
- TodoistSignalSource: encapsulates fetch, cache, 401 handling, bus events, and act()
- SignalAggregator: parallel fan-out across sources with per-source failure isolation
- Recommender refactored to consume Signal[] via aggregator; source action dispatch via aggregator.act()
- ADR-0009: signal normalization strategy

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-18 01:11:56 +00:00
parent 46dee7377e
commit e3ca3ba733
8 changed files with 289 additions and 122 deletions

View File

@@ -6,22 +6,33 @@ import { eq, and, desc } from 'drizzle-orm';
import { requireAuth, AuthenticatedRequest } from '../middleware/session.js';
import { config } from '../config.js';
import { bus } from '../events/bus.js';
import type { TipCandidate } from '@oo/shared-types';
import type { TipCandidate, Signal } from '@oo/shared-types';
import { todoistSource, dueAgeDays } from '../signals/todoist.js';
export { dueAgeDays };
import { SignalAggregator } from '../signals/aggregator.js';
const router: ExpressRouter = Router();
const CACHE_TTL_MS = 30_000;
const PROMPT_VERSION = 'v1';
const taskCache = new Map<string, { tasks: TipCandidate[]; fetchedAt: number }>();
export const _clearTaskCacheForTests = () => taskCache.clear();
// ---------------------------------------------------------------------------
// Signal aggregator — register sources here as new integrations are added
// ---------------------------------------------------------------------------
export const aggregator = new SignalAggregator().register(todoistSource);
// ---------------------------------------------------------------------------
// Candidate cache — stores the last assembled candidate set per user so the
// feedback handler can look up features for reward delivery.
// ---------------------------------------------------------------------------
const candidateCache = new Map<string, TipCandidate[]>();
export const _clearCandidateCacheForTests = () => {
candidateCache.clear();
todoistSource.clearCache();
};
// ---------------------------------------------------------------------------
// Shadow-policy registry
// ---------------------------------------------------------------------------
// A shadow policy runs alongside the active policy, logs its picks, but does
// NOT affect what the user sees. Promotion to A/B or live is a manual step.
// Structure: Map<policyName, { active: boolean }>
const shadowPolicies = new Map<string, { active: boolean }>([
// Example: enable random as a shadow baseline
// ('random-shadow', { active: true }),
@@ -38,81 +49,24 @@ export function setPolicyActive(name: string, active: boolean): boolean {
}
// ---------------------------------------------------------------------------
// Todoist helpers
// Signal → TipCandidate conversion
// ---------------------------------------------------------------------------
export function dueAgeDays(due: { date?: string; datetime?: string } | null | undefined): number {
if (!due) return 0;
const dateStr = due.datetime ?? due.date;
if (!dateStr) return 0;
const dueMs = new Date(dateStr).getTime();
return Math.max(0, (Date.now() - dueMs) / (1000 * 60 * 60 * 24));
}
async function fetchTodoistTasks(userId: string, accessToken: string): Promise<TipCandidate[]> {
const cached = taskCache.get(userId);
if (cached && Date.now() - cached.fetchedAt < CACHE_TTL_MS) return cached.tasks;
const res = await fetch('https://api.todoist.com/api/v1/tasks?filter=today%7Coverdue', {
headers: { Authorization: `Bearer ${accessToken}` },
});
if (!res.ok) {
if (res.status === 401) {
console.error(`[todoist] token expired for user ${userId}`);
bus.publish('signals.integration.token_expired', {
userId,
provider: 'todoist',
detectedAt: new Date().toISOString(),
});
await db
.update(integrationTokens)
.set({ tokenStatus: 'needs_reconnect' })
.where(and(eq(integrationTokens.userId, userId), eq(integrationTokens.provider, 'todoist')));
}
return cached?.tasks ?? [];
}
const body = (await res.json()) as {
results: Array<{
id: string;
content: string;
priority: number;
due: { date?: string; datetime?: string; is_recurring?: boolean } | null;
}>;
function signalToCandidate(signal: Signal): TipCandidate {
return {
id: signal.id,
content: signal.content,
source: signal.source as TipCandidate['source'],
kind: signal.kind as TipCandidate['kind'],
sourceId: (signal.metadata.todoistId as string | undefined) ?? undefined,
createdAt: signal.timestamp,
features: signal.features,
};
const now = new Date();
const tasks: TipCandidate[] = (body.results ?? []).map((t) => {
const ageDays = dueAgeDays(t.due);
const isOverdue = ageDays > 0;
return {
id: `todoist:${t.id}`,
content: t.content,
source: 'todoist' as const,
kind: 'task' as const,
sourceId: t.id,
createdAt: now.toISOString(),
features: {
is_overdue: isOverdue,
task_age_days: ageDays,
priority: t.priority ?? 1,
},
};
});
taskCache.set(userId, { tasks, fetchedAt: Date.now() });
bus.publish('signals.task.synced', { userId, count: tasks.length, syncedAt: now.toISOString() });
return tasks;
}
// ---------------------------------------------------------------------------
// Stage 2: score candidates via ml/serving bandit
// ---------------------------------------------------------------------------
/** Call ml/serving for scored selection; returns { tip_id, score } or null on failure */
async function remotePolicy(
userId: string,
tasks: TipCandidate[],
@@ -165,16 +119,16 @@ interface LlmCandidate {
async function fetchLlmCandidates(
userId: string,
todoistTasks: TipCandidate[],
signals: Signal[],
hour: number,
dayOfWeek: number,
): Promise<TipCandidate[]> {
try {
const tasks = todoistTasks.slice(0, 10).map((t) => ({
content: t.content,
priority: t.features.priority,
is_overdue: t.features.is_overdue,
task_age_days: t.features.task_age_days,
const tasks = signals.slice(0, 10).map((s) => ({
content: s.content,
priority: s.features.priority,
is_overdue: s.features.is_overdue,
task_age_days: s.features.task_age_days,
}));
const res = await fetch(`${config.ML_SERVING_URL}/generate`, {
method: 'POST',
@@ -208,32 +162,36 @@ async function fetchLlmCandidates(
// Pipeline: [Stage 1] assemble candidates → [Stage 2] score → [Stage 3] serve
// ---------------------------------------------------------------------------
router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Response) => {
const [token] = await db
.select()
const hour = new Date().getHours();
const dayOfWeek = new Date().getDay();
// Fail fast if no source tokens are connected
const anyToken = await db
.select({ id: integrationTokens.id })
.from(integrationTokens)
.where(and(eq(integrationTokens.userId, req.userId!), eq(integrationTokens.provider, 'todoist')))
.where(eq(integrationTokens.userId, req.userId!))
.limit(1);
if (!token) {
if (!anyToken.length) {
res.status(422).json({ error: 'No integrations connected' });
return;
}
const hour = new Date().getHours();
const dayOfWeek = new Date().getDay();
// Stage 1: assemble candidates — aggregated signals + LLM-generated advice (parallel)
const signals = await aggregator.fetchAll(req.userId!);
// Stage 1: assemble candidates — Todoist tasks + LLM-generated advice (parallel)
const [todoistTasks, llmCandidates] = await Promise.all([
fetchTodoistTasks(req.userId!, token.accessToken),
fetchLlmCandidates(req.userId!, taskCache.get(req.userId!)?.tasks ?? [], hour, dayOfWeek),
]);
const signalCandidates = signals.map(signalToCandidate);
const llmCandidates = await fetchLlmCandidates(req.userId!, signals, hour, dayOfWeek);
const allCandidates: TipCandidate[] = [...todoistTasks, ...llmCandidates];
const allCandidates: TipCandidate[] = [...signalCandidates, ...llmCandidates];
if (!allCandidates.length) {
res.status(204).end();
return;
}
// Cache candidates so the feedback handler can retrieve features
candidateCache.set(req.userId!, allCandidates);
const t0 = Date.now();
// Stage 2: score — egreedy bandit with random fallback
@@ -262,9 +220,7 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re
policy,
mlScore: scored ? Math.round(scored.score * 1000) : null,
featuresJson: JSON.stringify({
is_overdue: tip.features.is_overdue,
task_age_days: tip.features.task_age_days,
priority: tip.features.priority,
...tip.features,
hour_of_day: hour,
day_of_week: dayOfWeek,
}),
@@ -410,9 +366,10 @@ router.post('/tip/:id/feedback', requireAuth, async (req: AuthenticatedRequest,
createdAt: now.toISOString(),
});
const task: TipCandidate | undefined = taskCache.get(req.userId!)?.tasks.find((t) => t.id === tipId);
taskCache.delete(req.userId!);
// Look up cached candidate for reward features; invalidate after
const cached = candidateCache.get(req.userId!);
const candidate = cached?.find((t) => t.id === tipId);
candidateCache.delete(req.userId!);
bus.publish('signals.tip.feedback', {
userId: req.userId!,
@@ -423,26 +380,12 @@ router.post('/tip/:id/feedback', requireAuth, async (req: AuthenticatedRequest,
createdAt: now.toISOString(),
});
if (task) {
sendRewardWithRetry(req.userId!, tipId, reward, task.features);
if (candidate) {
sendRewardWithRetry(req.userId!, tipId, reward, candidate.features);
}
// Mark complete in Todoist if done
if (action === 'done' && tipId.startsWith('todoist:')) {
const todoistId = tipId.slice(8);
const [tok] = await db
.select()
.from(integrationTokens)
.where(and(eq(integrationTokens.userId, req.userId!), eq(integrationTokens.provider, 'todoist')))
.limit(1);
if (tok) {
await fetch(`https://api.todoist.com/api/v1/tasks/${todoistId}/close`, {
method: 'POST',
headers: { Authorization: `Bearer ${tok.accessToken}` },
}).catch(() => {});
}
}
// Delegate action to the owning signal source (e.g. mark done in Todoist)
await aggregator.act(req.userId!, tipId, action);
res.json({ ok: true });
});