test: cover NATS bridge + Todoist scheduler; ADR-0010

- bus.test.ts: 4 cases for the new onPublish hook contract
- nats.test.ts: stream creation idempotency + JSON publish bridge
- scheduler.test.ts: startup delay, fan-out, per-user failure isolation
- ADR-0010 documents the bridge-don't-replace decision and the
  Todoist scheduler isolation, plus open follow-ups (#98 ml/serving
  consumer, #54 protobuf migration, graceful shutdown, metrics)
- README/overview/services README reflect the bridged event substrate
- CLAUDE.md gains a "don't nats.publish() directly" rule
- .env.example documents NATS_URL + TODOIST_SYNC_INTERVAL_MS

Verified in deployment 2026-04-18: api -> nats bridge connects on
boot, signals + feedback streams created, scheduler tick logs
"todoist sync: 1 ok, 0 failed (1 users)" within 10s. Closes #21, #22.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-04-18 07:55:25 +00:00
parent 2a7380933c
commit 5b52c6bf40
9 changed files with 414 additions and 6 deletions

View File

@@ -9,7 +9,7 @@ Backend modules. Each owns a contract and ships its own `README.md`. In **Phase
| `profile/` | user profile, preferences, consents | in-proc module | team ownership diverges |
| `integrations/` | connectors + encrypted token vault | in-proc module | credential blast-radius isolation |
| `recommender/` | `POST /recommend` — policy-driven tip selection | in-proc; calls `ml/serving` from M1 | scaling hotspot |
| `events/` | event bus + signal log | in-proc emitter (Phase 0); NATS (M1) | always a library + broker, not a service |
| `events/` | event bus + signal log | in-proc emitter; bridges to NATS JetStream when `NATS_URL` set (ADR-0010) | always a library + broker, not a service |
| `notifier/` | push/email delivery + quiet hours | in-proc; **web push in M1** | SLA divergence or mobile push scale |
Contracts that cross module lines (HTTP or events) come from `packages/shared-types/`. In-module imports across modules are forbidden by import lint.

View File

@@ -171,3 +171,51 @@ describe('EventBus — singleton bus export', () => {
expect(bus).toBeInstanceOf(Bus);
});
});
describe('EventBus — onPublish hook (NATS bridge contract)', () => {
it('invokes registered hook with subject + payload on every publish', () => {
const b = makeBus();
const hook = vi.fn();
b.onPublish(hook);
const payload = { userId: 'u', count: 2, syncedAt: 'now' };
b.publish('signals.task.synced', payload);
expect(hook).toHaveBeenCalledOnce();
expect(hook).toHaveBeenCalledWith('signals.task.synced', payload);
});
it('fires multiple hooks in registration order', () => {
const b = makeBus();
const calls: string[] = [];
b.onPublish(() => calls.push('a'));
b.onPublish(() => calls.push('b'));
b.publish('signals.task.synced', { userId: 'u', count: 0, syncedAt: '' });
expect(calls).toEqual(['a', 'b']);
});
it('hook fires alongside subscribers — both receive the publish', () => {
const b = makeBus();
const hook = vi.fn();
const sub = vi.fn();
b.onPublish(hook);
b.subscribe('signals.task.synced', sub);
b.publish('signals.task.synced', { userId: 'u', count: 1, syncedAt: '' });
expect(hook).toHaveBeenCalledOnce();
expect(sub).toHaveBeenCalledOnce();
});
it('a throwing hook still bubbles up — bus does not silently swallow', () => {
// Documents current behaviour: hooks run inside publish(), so a thrown
// error escapes. The NATS adapter therefore catches inside the hook.
const b = makeBus();
b.onPublish(() => {
throw new Error('boom');
});
expect(() =>
b.publish('signals.task.synced', { userId: 'u', count: 0, syncedAt: '' }),
).toThrow('boom');
});
});

View File

@@ -0,0 +1,162 @@
/**
* Tests for the NATS JetStream adapter (events/nats.ts).
*
* The real `nats` module is mocked so we can verify:
* - streams are created when missing, skipped when present
* - bus.onPublish() bridge encodes payload as JSON and calls js.publish
* - bridge errors are caught (do not crash the in-process publisher)
* - a connection failure logs a warning and leaves the bus working
*/
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
// ── nats mock ────────────────────────────────────────────────────────────────
// Capture the most recent fakes so individual tests can inspect them.
let lastConnect: ReturnType<typeof vi.fn>;
let lastJsPublish: ReturnType<typeof vi.fn>;
let lastStreamsAdd: ReturnType<typeof vi.fn>;
let lastStreamsInfo: ReturnType<typeof vi.fn>;
let lastDrain: ReturnType<typeof vi.fn>;
vi.mock('nats', () => {
const StorageType = { File: 'file', Memory: 'memory' };
const RetentionPolicy = { Limits: 'limits', Workqueue: 'workqueue', Interest: 'interest' };
const DiscardPolicy = { Old: 'old', New: 'new' };
const connect = vi.fn(async (_opts: unknown) => {
lastJsPublish = vi.fn(async () => ({ seq: 1 }));
lastStreamsAdd = vi.fn(async () => ({}));
lastStreamsInfo = vi.fn(async (_name: string) => {
throw new Error('stream not found');
});
lastDrain = vi.fn(async () => {});
const jsm = { streams: { info: lastStreamsInfo, add: lastStreamsAdd } };
return {
jetstream: () => ({ publish: lastJsPublish }),
jetstreamManager: async () => jsm,
drain: lastDrain,
};
});
lastConnect = connect;
return { connect, StorageType, RetentionPolicy, DiscardPolicy };
});
import { Bus } from '../bus.js';
beforeEach(() => {
vi.resetModules();
vi.clearAllMocks();
});
afterEach(() => {
vi.restoreAllMocks();
});
describe('connectNats — happy path', () => {
it('creates both streams when neither exists', async () => {
// Re-import to get a fresh module state; nats is hoisted-mocked above.
const { connectNats } = await import('../nats.js');
await connectNats('nats://test:4222');
expect(lastConnect).toHaveBeenCalledWith(
expect.objectContaining({ servers: 'nats://test:4222', reconnect: true }),
);
expect(lastStreamsInfo).toHaveBeenCalledTimes(2);
expect(lastStreamsAdd).toHaveBeenCalledTimes(2);
const created = lastStreamsAdd.mock.calls.map((c) => (c[0] as any).name).sort();
expect(created).toEqual(['feedback', 'signals']);
const signalsCfg = lastStreamsAdd.mock.calls
.map((c) => c[0] as any)
.find((c) => c.name === 'signals');
expect(signalsCfg.subjects).toEqual(['signals.>']);
expect(signalsCfg.storage).toBe('file');
expect(signalsCfg.retention).toBe('limits');
});
it('skips stream creation when info() succeeds (idempotent)', async () => {
const { connectNats } = await import('../nats.js');
// Override info to succeed.
lastConnect.mockImplementationOnce(async () => {
lastJsPublish = vi.fn(async () => ({ seq: 1 }));
lastStreamsAdd = vi.fn(async () => ({}));
lastStreamsInfo = vi.fn(async (_name: string) => ({ config: { name: _name } }));
lastDrain = vi.fn(async () => {});
return {
jetstream: () => ({ publish: lastJsPublish }),
jetstreamManager: async () => ({
streams: { info: lastStreamsInfo, add: lastStreamsAdd },
}),
drain: lastDrain,
};
});
await connectNats('nats://test:4222');
expect(lastStreamsInfo).toHaveBeenCalledTimes(2);
expect(lastStreamsAdd).not.toHaveBeenCalled();
});
});
describe('connectNats — bridge bus → JetStream', () => {
it('publishes JSON-encoded payload on the same subject', async () => {
const { connectNats } = await import('../nats.js');
const { bus } = await import('../bus.js');
await connectNats('nats://test:4222');
const payload = { userId: 'u1', count: 7, syncedAt: '2026-01-01T00:00:00Z' };
bus.publish('signals.task.synced', payload);
// Allow the queued microtask in the hook to flush.
await Promise.resolve();
expect(lastJsPublish).toHaveBeenCalledTimes(1);
const [subject, data] = lastJsPublish.mock.calls[0];
expect(subject).toBe('signals.task.synced');
const decoded = JSON.parse(new TextDecoder().decode(data as Uint8Array));
expect(decoded).toEqual(payload);
});
it('swallows JetStream publish errors so the in-process bus keeps working', async () => {
const { connectNats } = await import('../nats.js');
const { bus } = await import('../bus.js');
await connectNats('nats://test:4222');
// Force the next js.publish to reject.
lastJsPublish.mockRejectedValueOnce(new Error('jetstream down'));
const errSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
expect(() =>
bus.publish('signals.task.synced', { userId: 'u', count: 0, syncedAt: '' }),
).not.toThrow();
// Wait a tick for the rejected promise's catch to run.
await new Promise((r) => setTimeout(r, 0));
expect(errSpy).toHaveBeenCalled();
});
});
describe('connectNats — failure mode', () => {
it('logs a warning and stays silent when connect rejects', async () => {
const { connectNats } = await import('../nats.js');
lastConnect.mockRejectedValueOnce(new Error('ECONNREFUSED'));
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});
await expect(connectNats('nats://nope:4222')).resolves.toBeUndefined();
expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining('connection failed'));
});
});
describe('Bus.onPublish contract — used by NATS bridge', () => {
it('a fresh Bus accepts and fires onPublish hooks (smoke check)', () => {
const b = new Bus();
const hook = vi.fn();
b.onPublish(hook);
b.publish('signals.task.synced', { userId: 'u', count: 0, syncedAt: '' });
expect(hook).toHaveBeenCalledOnce();
});
});

View File

@@ -0,0 +1,130 @@
/**
* Tests for the Todoist background sync scheduler (signals/scheduler.ts).
*
* The scheduler queries every user with an active integration token and calls
* todoistSource.fetchSignals(userId) for each. We mock the db and todoistSource
* so no network or SQLite is touched; vi.useFakeTimers() drives the scheduler
* deterministically.
*/
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
// ── mock the drizzle query chain: db.select(...).from(...).where(...) ────────
let users: { userId: string }[] = [];
const whereMock = vi.fn(async () => users);
const fromMock = vi.fn(() => ({ where: whereMock }));
const selectMock = vi.fn(() => ({ from: fromMock }));
vi.mock('../../db/index.js', () => ({ db: { select: selectMock } }));
// integrationTokens stub — the scheduler only references column identities
vi.mock('../../db/schema.js', () => ({
integrationTokens: { userId: { name: 'user_id' }, tokenStatus: { name: 'token_status' } },
}));
// drizzle-orm.eq stub — the scheduler only uses it as a predicate marker
vi.mock('drizzle-orm', () => ({ eq: (a: unknown, b: unknown) => ({ a, b }) }));
// todoistSource.fetchSignals — replaceable per test
const fetchSignalsMock = vi.fn(async (_userId: string) => []);
vi.mock('../todoist.js', () => ({
todoistSource: { fetchSignals: fetchSignalsMock },
}));
beforeEach(() => {
users = [];
fetchSignalsMock.mockReset().mockResolvedValue([]);
whereMock.mockClear();
fromMock.mockClear();
selectMock.mockClear();
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
describe('startTodoistSyncScheduler', () => {
it('returns a Timeout handle (so callers can clearTimeout it on shutdown)', async () => {
const { startTodoistSyncScheduler } = await import('../scheduler.js');
const handle = startTodoistSyncScheduler(60_000);
expect(handle).toBeDefined();
clearTimeout(handle);
});
it('does not call fetchSignals before the 10s startup delay elapses', async () => {
const { startTodoistSyncScheduler } = await import('../scheduler.js');
users = [{ userId: 'u1' }];
startTodoistSyncScheduler(60_000);
await vi.advanceTimersByTimeAsync(5_000);
expect(fetchSignalsMock).not.toHaveBeenCalled();
});
it('after the 10s delay, calls fetchSignals once per active-token user', async () => {
const { startTodoistSyncScheduler } = await import('../scheduler.js');
users = [{ userId: 'alice' }, { userId: 'bob' }, { userId: 'carol' }];
startTodoistSyncScheduler(60_000);
await vi.advanceTimersByTimeAsync(10_001);
expect(fetchSignalsMock).toHaveBeenCalledTimes(3);
const ids = fetchSignalsMock.mock.calls.map((c) => c[0]).sort();
expect(ids).toEqual(['alice', 'bob', 'carol']);
});
it('runs on the given interval after the first tick', async () => {
const { startTodoistSyncScheduler } = await import('../scheduler.js');
users = [{ userId: 'u1' }];
startTodoistSyncScheduler(30_000);
await vi.advanceTimersByTimeAsync(10_001); // first run
expect(fetchSignalsMock).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(30_000); // second run
expect(fetchSignalsMock).toHaveBeenCalledTimes(2);
await vi.advanceTimersByTimeAsync(30_000); // third run
expect(fetchSignalsMock).toHaveBeenCalledTimes(3);
});
it('no-ops when there are no active-token users', async () => {
const { startTodoistSyncScheduler } = await import('../scheduler.js');
users = [];
startTodoistSyncScheduler(60_000);
await vi.advanceTimersByTimeAsync(10_001);
expect(fetchSignalsMock).not.toHaveBeenCalled();
});
it('isolates per-user failures — one rejection does not stop the others', async () => {
const { startTodoistSyncScheduler } = await import('../scheduler.js');
users = [{ userId: 'good' }, { userId: 'bad' }, { userId: 'also-good' }];
fetchSignalsMock.mockImplementation(async (id: string) => {
if (id === 'bad') throw new Error('todoist 401');
return [];
});
const errSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
const logSpy = vi.spyOn(console, 'log').mockImplementation(() => {});
startTodoistSyncScheduler(60_000);
await vi.advanceTimersByTimeAsync(10_001);
// allow the awaited Promise.allSettled to settle
await Promise.resolve();
await Promise.resolve();
expect(fetchSignalsMock).toHaveBeenCalledTimes(3);
expect(errSpy).toHaveBeenCalledWith(expect.stringContaining('sync error'), expect.anything());
expect(logSpy).toHaveBeenCalledWith(expect.stringContaining('2 ok, 1 failed'));
});
it('survives a db query failure — logs and skips the tick', async () => {
const { startTodoistSyncScheduler } = await import('../scheduler.js');
whereMock.mockRejectedValueOnce(new Error('sqlite locked'));
const errSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
startTodoistSyncScheduler(60_000);
await vi.advanceTimersByTimeAsync(10_001);
expect(fetchSignalsMock).not.toHaveBeenCalled();
expect(errSpy).toHaveBeenCalledWith(expect.stringContaining('failed to query users'));
});
});