diff --git a/.env.example b/.env.example index 136dc72..0047b53 100644 --- a/.env.example +++ b/.env.example @@ -27,3 +27,11 @@ VAPID_SUBJECT=mailto:you@example.com # Todoist OAuth — https://developer.todoist.com/appconsole.html TODOIST_CLIENT_ID= TODOIST_CLIENT_SECRET= + +# Event bus — leave NATS_URL empty for in-process bus only (no JetStream bridge). +# Set to nats://nats:4222 (compose service name) or nats://localhost:4222 (host) +# to mirror every publish to durable JetStream streams (signals.>, feedback.>). +# Start the broker with: docker compose --profile events up nats +NATS_URL= +# How often the background scheduler refreshes Todoist tasks per active user (ms). +TODOIST_SYNC_INTERVAL_MS=900000 diff --git a/CLAUDE.md b/CLAUDE.md index 2672621..e03c06d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -56,7 +56,7 @@ docs/ architecture notes, ADRs, API specs ## Contracts between modules - **HTTP** (OpenAPI, in `packages/shared-types/http/`) — synchronous request/response. In-process today; over the network once extracted. Signatures are identical. -- **Events** (Protocol Buffers, in `packages/shared-types/events/`) — durable signals + feedback. Today: in-process event emitter. Tomorrow: NATS JetStream. Schema registry enforced in CI (ADR-0005). +- **Events** (Protocol Buffers, in `packages/shared-types/events/`) — durable signals + feedback. Today: in-process `Bus` with a `onPublish` bridge to NATS JetStream when `NATS_URL` is set (ADR-0010). The in-proc bus stays the source of truth — JetStream is the durable mirror that cross-process consumers (`ml/serving`, future feature pipelines) tail. Schema registry enforced in CI when #54 lands; until then payloads are JSON envelopes (ADR-0005). - Do not redefine types per module. Regenerate from `shared-types`. ## Conventions @@ -111,3 +111,4 @@ Active work: AI tip generation pipeline — issues #86–#93 in M2 milestone. - Don't over-split processes. Extract a service when pressure demands it, not in anticipation (ADR-0003). - Don't call LLMs directly from application code. All LLM calls go through `ml/serving` (Python) via `LITELLM_URL`. The TS recommender never holds a model name. - Don't embed MLflow/Airflow/OpenWebUI in the admin panel. They are external services; link out to them. The admin shell links to `o.alogins.net/mlflow`, `/airflow`, `ai.alogins.net`. +- Don't `nats.publish()` directly from feature code. All publishes go through the in-process `Bus` (`services/api/src/events/bus.ts`); the NATS adapter (`events/nats.ts`) bridges every publish to JetStream when `NATS_URL` is set. This keeps subscribers, the ring-buffer tail used by the admin event viewer, and JetStream all in lockstep. diff --git a/README.md b/README.md index 40f70f7..eabcc85 100644 --- a/README.md +++ b/README.md @@ -143,7 +143,7 @@ Goal: tips are picked, not drawn from a hat — and they arrive at the right mom - [x] Shadow-policy registry: run N shadow policies per request, log picks without serving them (#56) - [ ] Quiet-hours + dedupe for push delivery - [ ] Delayed rewards: tasks completed directly in Todoist (requires webhook from Todoist) -- [ ] NATS JetStream replacing in-process bus (when multi-process pressure arrives) +- [x] NATS JetStream bridge — durable `signals.>` and `feedback.>` streams; in-process bus stays the source of truth, every publish bridges out (#21, shipped) #### M1 add-on — Admin & ML Ops Console *(fully shipped)* @@ -220,8 +220,8 @@ Goal: tips are AI-generated from user context, not just raw Todoist tasks. Multi **Integrations & infra (carried from M1):** - [ ] Apple OAuth (#7) -- [ ] NATS JetStream replacing in-process bus (#21) -- [ ] Todoist sync via events (#22) +- [x] NATS JetStream replacing in-process bus (#21) — adapter ships in `services/api/src/events/nats.ts`; in-proc bus is the producer, JetStream is the durable mirror +- [x] Todoist sync via events (#22) — background scheduler in `services/api/src/signals/scheduler.ts` emits `signals.task.synced` every `TODOIST_SYNC_INTERVAL_MS`; on-demand fetch remains as freshness fallback - [ ] Event schema registry + protobuf CI gate (#54) - [ ] Per-user freshness SLAs for features (#61) - [ ] CI skeleton (#3), observability (#18), E2E tests (#20) diff --git a/docs/adr/0010-nats-bridge-and-background-sync.md b/docs/adr/0010-nats-bridge-and-background-sync.md new file mode 100644 index 0000000..d772301 --- /dev/null +++ b/docs/adr/0010-nats-bridge-and-background-sync.md @@ -0,0 +1,59 @@ +# ADR-0010: NATS bridge over the in-process bus, and Todoist background sync + +## Status +Accepted — 2026-04-18 + +## Context +ADR-0005 set protobuf + JetStream as the long-term event substrate. M1 shipped +an in-process `EventEmitter`-based bus with the right subjects (`signals.*`, +`feedback.*`) so the swap would be mechanical. + +Two pressures pulled forward: +1. **ml/serving** and future feature pipelines need to consume signals across + process boundaries — the in-proc emitter cannot do that. +2. **Todoist** signals were only fetched on the recommend path. Cold-cache hits + added latency and a single 401/429 stalled the request that triggered it. + +## Decision + +### 1. Bridge, do not replace +The `Bus` stays the producer. A new `Bus.onPublish(hook)` hook fires on every +`publish`. When `NATS_URL` is set, `connectNats()` registers a hook that +JSON-encodes the payload and `js.publish(subject, data)`s it to JetStream. + +- Streams are created on startup and are idempotent: `signals` (`signals.>`, + 7-day file storage, 500k msgs) and `feedback` (`feedback.>`, 30-day, 200k). +- JetStream publish errors are caught inside the hook so an unhealthy broker + cannot crash the in-process publisher or its subscribers. +- When `NATS_URL` is unset, `connectNats` is a no-op — local dev keeps working. + +This preserves the existing `bus.subscribe()` contract for in-process consumers +(reward inference, ring-buffer tail for the admin event viewer) while making +events durably consumable across processes. + +### 2. Schedule Todoist, keep on-demand as the SLA fallback +A 15-minute background scheduler (`TODOIST_SYNC_INTERVAL_MS`) walks every +user with `tokenStatus = 'active'` and calls `todoistSource.fetchSignals(uid)`, +which in turn emits `signals.task.synced`. The per-request fetch in +`recommender` stays — when the cache is colder than 30 s it still goes to +Todoist inline, so freshness on the user's first hit of the day is unchanged. + +Per-user failures are isolated with `Promise.allSettled`; one expired token +cannot stop the rest of the cohort. The whole tick is wrapped so a transient +SQLite error logs and skips, never crashes the API. + +## Consequences +- ml/serving (and any future Python consumer) can durably tail + `signals.task.synced`, `signals.tip.served`, `signals.tip.feedback` from + JetStream without coupling to the API process. +- Local dev still runs without NATS; the bridge is opt-in via env. +- Wire format is JSON today (envelope per ADR-0005 not enforced yet) — see + Open follow-ups. + +## Open follow-ups +- A ml/serving JetStream consumer for the feature pipeline (today nothing + reads from JetStream — the API only writes). +- Move the wire payload to the protobuf envelope from ADR-0005 once the + schema-registry CI gate (#54) lands. +- Graceful shutdown of the scheduler timer on `SIGTERM`. +- Per-publish failure metrics exported to the admin health view. \ No newline at end of file diff --git a/docs/architecture/overview.md b/docs/architecture/overview.md index f27da8c..cb67f05 100644 --- a/docs/architecture/overview.md +++ b/docs/architecture/overview.md @@ -15,7 +15,7 @@ | `auth` | TS | OAuth (Google; Apple in M1), sessions, JWT | identities, sessions | Node monolith | | `profile` | TS | user profile, preferences, consents | profiles | Node monolith | | `integrations` | TS | third-party connectors, token vault, signal fetch | credentials, cursors | Node monolith | -| `events` | TS | event-bus abstraction + durable log (M1) | signal store | Node monolith (in-proc emitter) | +| `events` | TS | event-bus abstraction + durable log | signal store | Node monolith (in-proc emitter, bridges to NATS JetStream when `NATS_URL` set) | | `recommender` | TS | orchestration: candidates → policy → tip; feedback sink | tip history | Node monolith | | `notifier` | TS | push/email delivery, quiet hours, dedupe | delivery log | Node monolith (web push in M1) | | `ml/serving` | Python | online scoring for policies/models | — (stateless) | **separate process** | diff --git a/services/README.md b/services/README.md index 8e468fc..d168762 100644 --- a/services/README.md +++ b/services/README.md @@ -9,7 +9,7 @@ Backend modules. Each owns a contract and ships its own `README.md`. In **Phase | `profile/` | user profile, preferences, consents | in-proc module | team ownership diverges | | `integrations/` | connectors + encrypted token vault | in-proc module | credential blast-radius isolation | | `recommender/` | `POST /recommend` — policy-driven tip selection | in-proc; calls `ml/serving` from M1 | scaling hotspot | -| `events/` | event bus + signal log | in-proc emitter (Phase 0); NATS (M1) | always a library + broker, not a service | +| `events/` | event bus + signal log | in-proc emitter; bridges to NATS JetStream when `NATS_URL` set (ADR-0010) | always a library + broker, not a service | | `notifier/` | push/email delivery + quiet hours | in-proc; **web push in M1** | SLA divergence or mobile push scale | Contracts that cross module lines (HTTP or events) come from `packages/shared-types/`. In-module imports across modules are forbidden by import lint. diff --git a/services/api/src/events/__tests__/bus.test.ts b/services/api/src/events/__tests__/bus.test.ts index 82528d6..571355e 100644 --- a/services/api/src/events/__tests__/bus.test.ts +++ b/services/api/src/events/__tests__/bus.test.ts @@ -171,3 +171,51 @@ describe('EventBus — singleton bus export', () => { expect(bus).toBeInstanceOf(Bus); }); }); + +describe('EventBus — onPublish hook (NATS bridge contract)', () => { + it('invokes registered hook with subject + payload on every publish', () => { + const b = makeBus(); + const hook = vi.fn(); + b.onPublish(hook); + + const payload = { userId: 'u', count: 2, syncedAt: 'now' }; + b.publish('signals.task.synced', payload); + + expect(hook).toHaveBeenCalledOnce(); + expect(hook).toHaveBeenCalledWith('signals.task.synced', payload); + }); + + it('fires multiple hooks in registration order', () => { + const b = makeBus(); + const calls: string[] = []; + b.onPublish(() => calls.push('a')); + b.onPublish(() => calls.push('b')); + + b.publish('signals.task.synced', { userId: 'u', count: 0, syncedAt: '' }); + expect(calls).toEqual(['a', 'b']); + }); + + it('hook fires alongside subscribers — both receive the publish', () => { + const b = makeBus(); + const hook = vi.fn(); + const sub = vi.fn(); + b.onPublish(hook); + b.subscribe('signals.task.synced', sub); + + b.publish('signals.task.synced', { userId: 'u', count: 1, syncedAt: '' }); + expect(hook).toHaveBeenCalledOnce(); + expect(sub).toHaveBeenCalledOnce(); + }); + + it('a throwing hook still bubbles up — bus does not silently swallow', () => { + // Documents current behaviour: hooks run inside publish(), so a thrown + // error escapes. The NATS adapter therefore catches inside the hook. + const b = makeBus(); + b.onPublish(() => { + throw new Error('boom'); + }); + expect(() => + b.publish('signals.task.synced', { userId: 'u', count: 0, syncedAt: '' }), + ).toThrow('boom'); + }); +}); diff --git a/services/api/src/events/__tests__/nats.test.ts b/services/api/src/events/__tests__/nats.test.ts new file mode 100644 index 0000000..6f411fb --- /dev/null +++ b/services/api/src/events/__tests__/nats.test.ts @@ -0,0 +1,162 @@ +/** + * Tests for the NATS JetStream adapter (events/nats.ts). + * + * The real `nats` module is mocked so we can verify: + * - streams are created when missing, skipped when present + * - bus.onPublish() bridge encodes payload as JSON and calls js.publish + * - bridge errors are caught (do not crash the in-process publisher) + * - a connection failure logs a warning and leaves the bus working + */ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; + +// ── nats mock ──────────────────────────────────────────────────────────────── +// Capture the most recent fakes so individual tests can inspect them. +let lastConnect: ReturnType; +let lastJsPublish: ReturnType; +let lastStreamsAdd: ReturnType; +let lastStreamsInfo: ReturnType; +let lastDrain: ReturnType; + +vi.mock('nats', () => { + const StorageType = { File: 'file', Memory: 'memory' }; + const RetentionPolicy = { Limits: 'limits', Workqueue: 'workqueue', Interest: 'interest' }; + const DiscardPolicy = { Old: 'old', New: 'new' }; + + const connect = vi.fn(async (_opts: unknown) => { + lastJsPublish = vi.fn(async () => ({ seq: 1 })); + lastStreamsAdd = vi.fn(async () => ({})); + lastStreamsInfo = vi.fn(async (_name: string) => { + throw new Error('stream not found'); + }); + lastDrain = vi.fn(async () => {}); + + const jsm = { streams: { info: lastStreamsInfo, add: lastStreamsAdd } }; + return { + jetstream: () => ({ publish: lastJsPublish }), + jetstreamManager: async () => jsm, + drain: lastDrain, + }; + }); + + lastConnect = connect; + return { connect, StorageType, RetentionPolicy, DiscardPolicy }; +}); + +import { Bus } from '../bus.js'; + +beforeEach(() => { + vi.resetModules(); + vi.clearAllMocks(); +}); + +afterEach(() => { + vi.restoreAllMocks(); +}); + +describe('connectNats — happy path', () => { + it('creates both streams when neither exists', async () => { + // Re-import to get a fresh module state; nats is hoisted-mocked above. + const { connectNats } = await import('../nats.js'); + await connectNats('nats://test:4222'); + + expect(lastConnect).toHaveBeenCalledWith( + expect.objectContaining({ servers: 'nats://test:4222', reconnect: true }), + ); + expect(lastStreamsInfo).toHaveBeenCalledTimes(2); + expect(lastStreamsAdd).toHaveBeenCalledTimes(2); + + const created = lastStreamsAdd.mock.calls.map((c) => (c[0] as any).name).sort(); + expect(created).toEqual(['feedback', 'signals']); + + const signalsCfg = lastStreamsAdd.mock.calls + .map((c) => c[0] as any) + .find((c) => c.name === 'signals'); + expect(signalsCfg.subjects).toEqual(['signals.>']); + expect(signalsCfg.storage).toBe('file'); + expect(signalsCfg.retention).toBe('limits'); + }); + + it('skips stream creation when info() succeeds (idempotent)', async () => { + const { connectNats } = await import('../nats.js'); + // Override info to succeed. + lastConnect.mockImplementationOnce(async () => { + lastJsPublish = vi.fn(async () => ({ seq: 1 })); + lastStreamsAdd = vi.fn(async () => ({})); + lastStreamsInfo = vi.fn(async (_name: string) => ({ config: { name: _name } })); + lastDrain = vi.fn(async () => {}); + return { + jetstream: () => ({ publish: lastJsPublish }), + jetstreamManager: async () => ({ + streams: { info: lastStreamsInfo, add: lastStreamsAdd }, + }), + drain: lastDrain, + }; + }); + + await connectNats('nats://test:4222'); + expect(lastStreamsInfo).toHaveBeenCalledTimes(2); + expect(lastStreamsAdd).not.toHaveBeenCalled(); + }); +}); + +describe('connectNats — bridge bus → JetStream', () => { + it('publishes JSON-encoded payload on the same subject', async () => { + const { connectNats } = await import('../nats.js'); + const { bus } = await import('../bus.js'); + + await connectNats('nats://test:4222'); + + const payload = { userId: 'u1', count: 7, syncedAt: '2026-01-01T00:00:00Z' }; + bus.publish('signals.task.synced', payload); + + // Allow the queued microtask in the hook to flush. + await Promise.resolve(); + + expect(lastJsPublish).toHaveBeenCalledTimes(1); + const [subject, data] = lastJsPublish.mock.calls[0]; + expect(subject).toBe('signals.task.synced'); + const decoded = JSON.parse(new TextDecoder().decode(data as Uint8Array)); + expect(decoded).toEqual(payload); + }); + + it('swallows JetStream publish errors so the in-process bus keeps working', async () => { + const { connectNats } = await import('../nats.js'); + const { bus } = await import('../bus.js'); + + await connectNats('nats://test:4222'); + + // Force the next js.publish to reject. + lastJsPublish.mockRejectedValueOnce(new Error('jetstream down')); + const errSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); + + expect(() => + bus.publish('signals.task.synced', { userId: 'u', count: 0, syncedAt: '' }), + ).not.toThrow(); + + // Wait a tick for the rejected promise's catch to run. + await new Promise((r) => setTimeout(r, 0)); + expect(errSpy).toHaveBeenCalled(); + }); +}); + +describe('connectNats — failure mode', () => { + it('logs a warning and stays silent when connect rejects', async () => { + const { connectNats } = await import('../nats.js'); + + lastConnect.mockRejectedValueOnce(new Error('ECONNREFUSED')); + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + + await expect(connectNats('nats://nope:4222')).resolves.toBeUndefined(); + expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining('connection failed')); + }); +}); + +describe('Bus.onPublish contract — used by NATS bridge', () => { + it('a fresh Bus accepts and fires onPublish hooks (smoke check)', () => { + const b = new Bus(); + const hook = vi.fn(); + b.onPublish(hook); + b.publish('signals.task.synced', { userId: 'u', count: 0, syncedAt: '' }); + expect(hook).toHaveBeenCalledOnce(); + }); +}); diff --git a/services/api/src/signals/__tests__/scheduler.test.ts b/services/api/src/signals/__tests__/scheduler.test.ts new file mode 100644 index 0000000..627a280 --- /dev/null +++ b/services/api/src/signals/__tests__/scheduler.test.ts @@ -0,0 +1,130 @@ +/** + * Tests for the Todoist background sync scheduler (signals/scheduler.ts). + * + * The scheduler queries every user with an active integration token and calls + * todoistSource.fetchSignals(userId) for each. We mock the db and todoistSource + * so no network or SQLite is touched; vi.useFakeTimers() drives the scheduler + * deterministically. + */ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; + +// ── mock the drizzle query chain: db.select(...).from(...).where(...) ──────── +let users: { userId: string }[] = []; +const whereMock = vi.fn(async () => users); +const fromMock = vi.fn(() => ({ where: whereMock })); +const selectMock = vi.fn(() => ({ from: fromMock })); +vi.mock('../../db/index.js', () => ({ db: { select: selectMock } })); + +// integrationTokens stub — the scheduler only references column identities +vi.mock('../../db/schema.js', () => ({ + integrationTokens: { userId: { name: 'user_id' }, tokenStatus: { name: 'token_status' } }, +})); + +// drizzle-orm.eq stub — the scheduler only uses it as a predicate marker +vi.mock('drizzle-orm', () => ({ eq: (a: unknown, b: unknown) => ({ a, b }) })); + +// todoistSource.fetchSignals — replaceable per test +const fetchSignalsMock = vi.fn(async (_userId: string) => []); +vi.mock('../todoist.js', () => ({ + todoistSource: { fetchSignals: fetchSignalsMock }, +})); + +beforeEach(() => { + users = []; + fetchSignalsMock.mockReset().mockResolvedValue([]); + whereMock.mockClear(); + fromMock.mockClear(); + selectMock.mockClear(); + vi.useFakeTimers(); +}); + +afterEach(() => { + vi.useRealTimers(); +}); + +describe('startTodoistSyncScheduler', () => { + it('returns a Timeout handle (so callers can clearTimeout it on shutdown)', async () => { + const { startTodoistSyncScheduler } = await import('../scheduler.js'); + const handle = startTodoistSyncScheduler(60_000); + expect(handle).toBeDefined(); + clearTimeout(handle); + }); + + it('does not call fetchSignals before the 10s startup delay elapses', async () => { + const { startTodoistSyncScheduler } = await import('../scheduler.js'); + users = [{ userId: 'u1' }]; + startTodoistSyncScheduler(60_000); + + await vi.advanceTimersByTimeAsync(5_000); + expect(fetchSignalsMock).not.toHaveBeenCalled(); + }); + + it('after the 10s delay, calls fetchSignals once per active-token user', async () => { + const { startTodoistSyncScheduler } = await import('../scheduler.js'); + users = [{ userId: 'alice' }, { userId: 'bob' }, { userId: 'carol' }]; + startTodoistSyncScheduler(60_000); + + await vi.advanceTimersByTimeAsync(10_001); + + expect(fetchSignalsMock).toHaveBeenCalledTimes(3); + const ids = fetchSignalsMock.mock.calls.map((c) => c[0]).sort(); + expect(ids).toEqual(['alice', 'bob', 'carol']); + }); + + it('runs on the given interval after the first tick', async () => { + const { startTodoistSyncScheduler } = await import('../scheduler.js'); + users = [{ userId: 'u1' }]; + startTodoistSyncScheduler(30_000); + + await vi.advanceTimersByTimeAsync(10_001); // first run + expect(fetchSignalsMock).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(30_000); // second run + expect(fetchSignalsMock).toHaveBeenCalledTimes(2); + + await vi.advanceTimersByTimeAsync(30_000); // third run + expect(fetchSignalsMock).toHaveBeenCalledTimes(3); + }); + + it('no-ops when there are no active-token users', async () => { + const { startTodoistSyncScheduler } = await import('../scheduler.js'); + users = []; + startTodoistSyncScheduler(60_000); + + await vi.advanceTimersByTimeAsync(10_001); + expect(fetchSignalsMock).not.toHaveBeenCalled(); + }); + + it('isolates per-user failures — one rejection does not stop the others', async () => { + const { startTodoistSyncScheduler } = await import('../scheduler.js'); + users = [{ userId: 'good' }, { userId: 'bad' }, { userId: 'also-good' }]; + fetchSignalsMock.mockImplementation(async (id: string) => { + if (id === 'bad') throw new Error('todoist 401'); + return []; + }); + const errSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); + const logSpy = vi.spyOn(console, 'log').mockImplementation(() => {}); + + startTodoistSyncScheduler(60_000); + await vi.advanceTimersByTimeAsync(10_001); + // allow the awaited Promise.allSettled to settle + await Promise.resolve(); + await Promise.resolve(); + + expect(fetchSignalsMock).toHaveBeenCalledTimes(3); + expect(errSpy).toHaveBeenCalledWith(expect.stringContaining('sync error'), expect.anything()); + expect(logSpy).toHaveBeenCalledWith(expect.stringContaining('2 ok, 1 failed')); + }); + + it('survives a db query failure — logs and skips the tick', async () => { + const { startTodoistSyncScheduler } = await import('../scheduler.js'); + whereMock.mockRejectedValueOnce(new Error('sqlite locked')); + const errSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); + + startTodoistSyncScheduler(60_000); + await vi.advanceTimersByTimeAsync(10_001); + + expect(fetchSignalsMock).not.toHaveBeenCalled(); + expect(errSpy).toHaveBeenCalledWith(expect.stringContaining('failed to query users')); + }); +});