From 2a7380933caf5b4e8559ec3804a9fafc8c903287 Mon Sep 17 00:00:00 2001 From: alvis Date: Sat, 18 Apr 2026 01:18:51 +0000 Subject: [PATCH] feat: NATS JetStream + Todoist background sync (#21, #22) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Issue 21 — event infrastructure: - NormalizedEvent + payload types in packages/shared-types/src/events/ - Bus.onPublish() hook for side-effect bridges - NATS JetStream adapter (services/api/src/events/nats.ts): connects when NATS_URL is set, creates signals.> and feedback.> streams, bridges all in-process bus publishes to JetStream — no-ops gracefully when NATS is absent - NATS service added to docker-compose (profile: events|full, port 4222/8222) Issue 22 — Todoist background sync: - services/api/src/signals/scheduler.ts: queries all active-token users every 15 min (TODOIST_SYNC_INTERVAL_MS), fan-out via todoistSource.fetchSignals() which emits signals.task.synced; on-demand fetch remains as freshness fallback - NATS_URL + TODOIST_SYNC_INTERVAL_MS added to config Co-Authored-By: Claude Sonnet 4.6 --- infra/docker/docker-compose.yml | 20 ++++++ packages/shared-types/src/events/index.ts | 60 ++++++++++++++++ packages/shared-types/src/index.ts | 1 + pnpm-lock.yaml | 26 ++++++- services/api/package.json | 1 + services/api/src/config.ts | 6 ++ services/api/src/events/bus.ts | 7 ++ services/api/src/events/nats.ts | 84 +++++++++++++++++++++++ services/api/src/index.ts | 8 +++ services/api/src/signals/scheduler.ts | 55 +++++++++++++++ 10 files changed, 267 insertions(+), 1 deletion(-) create mode 100644 packages/shared-types/src/events/index.ts create mode 100644 services/api/src/events/nats.ts create mode 100644 services/api/src/signals/scheduler.ts diff --git a/infra/docker/docker-compose.yml b/infra/docker/docker-compose.yml index 5056d8b..1700f10 100644 --- a/infra/docker/docker-compose.yml +++ b/infra/docker/docker-compose.yml @@ -178,6 +178,26 @@ services: timeout: 5s retries: 5 + # ── events profile — NATS JetStream ───────────────────────────────────── + # Start: docker compose --profile events up + # NATS monitoring: http://localhost:8222 + # Enable in the API by setting NATS_URL=nats://nats:4222 in .env.local + + nats: + image: nats:2.10-alpine + profiles: [events, full] + command: ["-js", "-sd", "/data", "-m", "8222"] + volumes: + - /mnt/ssd/dbs/oo/nats:/data + ports: + - "127.0.0.1:4222:4222" # client connections + - "127.0.0.1:8222:8222" # HTTP monitoring + healthcheck: + test: ["CMD", "wget", "--spider", "-q", "http://localhost:8222/healthz"] + interval: 10s + timeout: 5s + retries: 5 + litellm: image: ghcr.io/berriai/litellm:main-latest profiles: [ai] diff --git a/packages/shared-types/src/events/index.ts b/packages/shared-types/src/events/index.ts new file mode 100644 index 0000000..5802e99 --- /dev/null +++ b/packages/shared-types/src/events/index.ts @@ -0,0 +1,60 @@ +/** + * NormalizedEvent — the durable envelope for all events flowing through + * the system. Today: in-process EventEmitter. Tomorrow: NATS JetStream. + * + * Subject taxonomy: + * signals.task.synced — Todoist (or other source) task list refreshed + * signals.tip.served — tip returned to client + * signals.tip.feedback — user reacted (done / dismiss / snooze / helpful / not_helpful) + * signals.tip.reward_failed — reward delivery to ml/serving failed after retries + * signals.integration.token_expired — OAuth token needs reconnect + */ +export interface NormalizedEvent { + /** NATS-style subject: domain.entity.verb */ + subject: string; + /** ISO 8601 timestamp */ + ts: string; + /** Monotonically increasing sequence number (in-process ring; JetStream seq in prod) */ + seq: number; + payload: T; +} + +// ── Payload types ──────────────────────────────────────────────────────────── + +export interface TaskSyncedPayload { + userId: string; + source: string; // e.g. 'todoist' + count: number; + syncedAt: string; +} + +export interface TipServedPayload { + userId: string; + tipId: string; + policy: string; + servedAt: string; +} + +export interface TipFeedbackPayload { + userId: string; + tipId: string; + action: 'done' | 'dismiss' | 'snooze' | 'helpful' | 'not_helpful'; + reward: number; + dwellMs: number | null; + createdAt: string; +} + +export interface TipRewardFailedPayload { + userId: string; + tipId: string; + reward: number; + attempts: number; + error: string; + failedAt: string; +} + +export interface IntegrationTokenExpiredPayload { + userId: string; + provider: string; + detectedAt: string; +} diff --git a/packages/shared-types/src/index.ts b/packages/shared-types/src/index.ts index 5165daa..b2e376b 100644 --- a/packages/shared-types/src/index.ts +++ b/packages/shared-types/src/index.ts @@ -3,3 +3,4 @@ export * from './http/auth.js'; export * from './http/integrations.js'; export * from './http/user.js'; export * from './http/signal.js'; +export * from './events/index.js'; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 7492584..c97b708 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -162,6 +162,9 @@ importers: nanoid: specifier: ^5.1.0 version: 5.1.7 + nats: + specifier: ^2.29.3 + version: 2.29.3 node-fetch: specifier: ^3.3.2 version: 3.3.2 @@ -2322,6 +2325,10 @@ packages: napi-build-utils@2.0.0: resolution: {integrity: sha512-GEbrYkbfF7MoNaoh2iGG84Mnf/WZfB0GdGEsM8wz7Expx/LlWf5U8t9nvJKXSp3qr5IsEbK04cBGhol/KwOsWA==} + nats@2.29.3: + resolution: {integrity: sha512-tOQCRCwC74DgBTk4pWZ9V45sk4d7peoE2njVprMRCBXrhJ5q5cYM7i6W+Uvw2qUrcfOSnuisrX7bEx3b3Wx4QA==} + engines: {node: '>= 14.0.0'} + negotiator@0.6.3: resolution: {integrity: sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==} engines: {node: '>= 0.6'} @@ -2347,6 +2354,10 @@ packages: sass: optional: true + nkeys.js@1.1.0: + resolution: {integrity: sha512-tB/a0shZL5UZWSwsoeyqfTszONTt4k2YS0tuQioMOD180+MbombYVgzDUYHlx+gejYK6rgf08n/2Df99WY0Sxg==} + engines: {node: '>=10.0.0'} + node-abi@3.89.0: resolution: {integrity: sha512-6u9UwL0HlAl21+agMN3YAMXcKByMqwGx+pq+P76vii5f7hTPtKDp08/H9py6DY+cfDw7kQNTGEj/rly3IgbNQA==} engines: {node: '>=10'} @@ -2851,6 +2862,9 @@ packages: resolution: {integrity: sha512-+v2QJey7ZUeUiuigkU+uFfklvNUyPI2VO2vBpMYJA+a1hKFLFiKtUYlRHdb3P9CrAvMzi0upbjI4WT+zKtqkBg==} hasBin: true + tweetnacl@1.0.3: + resolution: {integrity: sha512-6rt+RN7aOi1nGMyC4Xa5DdYiukl2UWCbcJft7YhxReBGQD7OAM8Pbxw6YMo4r2diNEA8FEmu32YOn9rhaiE5yw==} + type-is@1.6.18: resolution: {integrity: sha512-TkRKr9sUTxEH8MdfuCSP7VizJyzRNMjj2J2do2Jr3Kym598JVdEksuzPQCnlFPW4ky9Q+iA+ma9BGm06XQBy8g==} engines: {node: '>= 0.6'} @@ -3856,7 +3870,7 @@ snapshots: obug: 2.1.1 std-env: 4.1.0 tinyrainbow: 3.1.0 - vitest: 4.1.4(@types/node@22.19.17)(@vitest/coverage-v8@4.1.4)(jsdom@29.0.2)(vite@8.0.8(@types/node@22.19.17)(esbuild@0.27.7)(jiti@1.21.7)(tsx@4.21.0)) + vitest: 4.1.4(@types/node@22.19.17)(@vitest/coverage-v8@4.1.4)(jsdom@29.0.2)(vite@8.0.8(@types/node@22.19.17)(esbuild@0.19.12)(jiti@1.21.7)(tsx@4.21.0)) '@vitest/expect@4.1.4': dependencies: @@ -4757,6 +4771,10 @@ snapshots: napi-build-utils@2.0.0: {} + nats@2.29.3: + dependencies: + nkeys.js: 1.1.0 + negotiator@0.6.3: {} next@15.5.15(@playwright/test@1.59.1)(react-dom@19.2.5(react@19.2.5))(react@19.2.5): @@ -4783,6 +4801,10 @@ snapshots: - '@babel/core' - babel-plugin-macros + nkeys.js@1.1.0: + dependencies: + tweetnacl: 1.0.3 + node-abi@3.89.0: dependencies: semver: 7.7.4 @@ -5372,6 +5394,8 @@ snapshots: '@turbo/windows-64': 2.9.6 '@turbo/windows-arm64': 2.9.6 + tweetnacl@1.0.3: {} + type-is@1.6.18: dependencies: media-typer: 0.3.0 diff --git a/services/api/package.json b/services/api/package.json index 2a1a1f0..64f0415 100644 --- a/services/api/package.json +++ b/services/api/package.json @@ -24,6 +24,7 @@ "express": "^4.21.2", "express-session": "^1.18.1", "nanoid": "^5.1.0", + "nats": "^2.29.3", "node-fetch": "^3.3.2", "openid-client": "^6.3.4", "web-push": "^3.6.7", diff --git a/services/api/src/config.ts b/services/api/src/config.ts index 5a46b01..c8561f9 100644 --- a/services/api/src/config.ts +++ b/services/api/src/config.ts @@ -37,4 +37,10 @@ export const config = { VAPID_PUBLIC_KEY: optional('VAPID_PUBLIC_KEY', ''), VAPID_PRIVATE_KEY: optional('VAPID_PRIVATE_KEY', ''), VAPID_SUBJECT: optional('VAPID_SUBJECT', 'mailto:admin@localhost'), + + /** NATS server URL — omit to run without JetStream (in-process bus only) */ + NATS_URL: process.env['NATS_URL'] ?? '', + + /** How often to proactively sync Todoist tasks in the background (ms) */ + TODOIST_SYNC_INTERVAL_MS: parseInt(optional('TODOIST_SYNC_INTERVAL_MS', String(15 * 60 * 1000)), 10), }; diff --git a/services/api/src/events/bus.ts b/services/api/src/events/bus.ts index 34fc755..3693212 100644 --- a/services/api/src/events/bus.ts +++ b/services/api/src/events/bus.ts @@ -69,6 +69,12 @@ const RING_SIZE = 500; class Bus extends EventEmitter { private ring: StoredEvent[] = []; private seq = 0; + private publishHooks: Array<(subject: string, payload: unknown) => void> = []; + + /** Register a side-effect hook called on every publish (e.g. NATS bridge) */ + onPublish(hook: (subject: string, payload: unknown) => void): void { + this.publishHooks.push(hook); + } publish(subject: K, payload: EventMap[K]): void { const entry: StoredEvent = { @@ -80,6 +86,7 @@ class Bus extends EventEmitter { if (this.ring.length >= RING_SIZE) this.ring.shift(); this.ring.push(entry); this.emit(subject, payload); + for (const hook of this.publishHooks) hook(subject, payload); } subscribe(subject: K, handler: (payload: EventMap[K]) => void): void { diff --git a/services/api/src/events/nats.ts b/services/api/src/events/nats.ts new file mode 100644 index 0000000..2d4e61d --- /dev/null +++ b/services/api/src/events/nats.ts @@ -0,0 +1,84 @@ +/** + * Optional NATS JetStream adapter. + * + * When NATS_URL is set: connects to NATS, creates the required streams on + * startup, and wraps the in-process Bus so every publish also goes to + * JetStream. Subscribers across processes (ml/serving, future services) + * consume from JetStream. + * + * When NATS_URL is not set: this module is a no-op and the in-process Bus + * works as before. + */ + +import type { NatsConnection, JetStreamClient, StreamConfig } from 'nats'; +import { bus } from './bus.js'; + +let nc: NatsConnection | null = null; +let js: JetStreamClient | null = null; + +// nats enums are string enums — import the values at runtime, not just types +async function getStreamConfigs(): Promise[]> { + const { StorageType: S, RetentionPolicy: R, DiscardPolicy: D } = await import('nats'); + return [ + { + name: 'signals', + subjects: ['signals.>'], + max_msgs: 500_000, + max_age: 7 * 24 * 60 * 60 * 1e9, // 7 days in nanoseconds + storage: S.File, + retention: R.Limits, + discard: D.Old, + num_replicas: 1, + duplicate_window: 120e9, + }, + { + name: 'feedback', + subjects: ['feedback.>'], + max_msgs: 200_000, + max_age: 30 * 24 * 60 * 60 * 1e9, + storage: S.File, + retention: R.Limits, + discard: D.Old, + num_replicas: 1, + duplicate_window: 120e9, + }, + ]; +} + +export async function connectNats(natsUrl: string): Promise { + try { + const { connect } = await import('nats'); + nc = await connect({ servers: natsUrl, reconnect: true, maxReconnectAttempts: -1 }); + js = nc.jetstream(); + + // Ensure streams exist (idempotent) + const STREAMS = await getStreamConfigs(); + const jsm = await nc.jetstreamManager(); + for (const cfg of STREAMS) { + try { + await jsm.streams.info(cfg.name!); + } catch { + await jsm.streams.add(cfg as StreamConfig); + } + } + + // Bridge: every in-process bus publish also goes to JetStream + bus.onPublish((subject, payload) => { + if (!js) return; + const data = new TextEncoder().encode(JSON.stringify(payload)); + js.publish(subject, data).catch((err: Error) => + console.error(`[nats] publish failed for ${subject}: ${err.message}`), + ); + }); + + console.log(`[nats] connected to ${natsUrl}, streams: ${STREAMS.map((s) => s.name).join(', ')}`); + } catch (err: any) { + console.warn(`[nats] connection failed — running without JetStream: ${err.message}`); + } +} + +export async function closeNats(): Promise { + await nc?.drain(); + nc = null; + js = null; +} diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 527dd91..e67c280 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -18,6 +18,8 @@ import { dirname } from 'path'; import { requireAuth } from './middleware/session.js'; import { requireAdmin } from './middleware/admin.js'; import type { Request, Response } from 'express'; +import { connectNats } from './events/nats.js'; +import { startTodoistSyncScheduler } from './signals/scheduler.js'; await mkdir(dirname(config.DATABASE_PATH), { recursive: true }); runMigrations(); @@ -88,3 +90,9 @@ setInterval(purgeExpiredData, 24 * 60 * 60 * 1000); app.listen(config.PORT, () => { console.log(`oO API listening on http://localhost:${config.PORT}`); }); + +if (config.NATS_URL) { + connectNats(config.NATS_URL); +} + +startTodoistSyncScheduler(config.TODOIST_SYNC_INTERVAL_MS); diff --git a/services/api/src/signals/scheduler.ts b/services/api/src/signals/scheduler.ts new file mode 100644 index 0000000..950865f --- /dev/null +++ b/services/api/src/signals/scheduler.ts @@ -0,0 +1,55 @@ +/** + * Todoist background sync scheduler (Issue #22). + * + * Periodically fetches Todoist tasks for every user with an active token so + * that signals are fresh before the next /recommend call. The on-demand fetch + * in TodoistSignalSource remains as the freshness-critical fallback — if a user + * hits /recommend and the cache is stale, it re-fetches inline. + * + * Interval: TODOIST_SYNC_INTERVAL_MS (default 15 min). + */ + +import { db } from '../db/index.js'; +import { integrationTokens } from '../db/schema.js'; +import { eq } from 'drizzle-orm'; +import { todoistSource } from './todoist.js'; + +const DEFAULT_INTERVAL_MS = 15 * 60 * 1000; + +export function startTodoistSyncScheduler(intervalMs = DEFAULT_INTERVAL_MS): NodeJS.Timeout { + async function syncAll(): Promise { + let users: { userId: string }[] = []; + try { + users = await db + .select({ userId: integrationTokens.userId }) + .from(integrationTokens) + .where(eq(integrationTokens.tokenStatus, 'active')); + } catch (err: any) { + console.error(`[scheduler] failed to query users: ${err.message}`); + return; + } + + if (!users.length) return; + + const results = await Promise.allSettled( + users.map((u) => todoistSource.fetchSignals(u.userId)), + ); + + let ok = 0; + let failed = 0; + for (const r of results) { + if (r.status === 'fulfilled') ok++; + else { failed++; console.error(`[scheduler] sync error:`, r.reason); } + } + + console.log(`[scheduler] todoist sync: ${ok} ok, ${failed} failed (${users.length} users)`); + } + + // Run once shortly after startup, then on interval + const delay = setTimeout(() => { + syncAll(); + setInterval(syncAll, intervalMs); + }, 10_000); + + return delay; +}