diff --git a/services/api/src/events/__tests__/bus.test.ts b/services/api/src/events/__tests__/bus.test.ts index 571355e..d9e6660 100644 --- a/services/api/src/events/__tests__/bus.test.ts +++ b/services/api/src/events/__tests__/bus.test.ts @@ -56,7 +56,7 @@ describe('EventBus — delivery', () => { it('does not throw when publishing with no subscribers', () => { const b = makeBus(); expect(() => - b.publish('signals.task.synced', { userId: 'u', count: 3, syncedAt: '' }), + b.publish('signals.task.synced', { userId: 'u', source: 'todoist', count: 3, syncedAt: '' }), ).not.toThrow(); }); @@ -101,7 +101,7 @@ describe('EventBus — ring buffer / tail()', () => { it('tail() filters by subject prefix', () => { const b = makeBus(); b.publish('signals.tip.served', { userId: 'u', tipId: 't', policy: 'p', servedAt: '' }); - b.publish('signals.task.synced', { userId: 'u', count: 1, syncedAt: '' }); + b.publish('signals.task.synced', { userId: 'u', source: 'todoist', count: 1, syncedAt: '' }); const tipEvents = b.tail({ subject: 'signals.tip' }); expect(tipEvents.every((e) => e.subject.startsWith('signals.tip'))).toBe(true); @@ -178,7 +178,7 @@ describe('EventBus — onPublish hook (NATS bridge contract)', () => { const hook = vi.fn(); b.onPublish(hook); - const payload = { userId: 'u', count: 2, syncedAt: 'now' }; + const payload = { userId: 'u', source: 'todoist', count: 2, syncedAt: 'now' }; b.publish('signals.task.synced', payload); expect(hook).toHaveBeenCalledOnce(); @@ -191,7 +191,7 @@ describe('EventBus — onPublish hook (NATS bridge contract)', () => { b.onPublish(() => calls.push('a')); b.onPublish(() => calls.push('b')); - b.publish('signals.task.synced', { userId: 'u', count: 0, syncedAt: '' }); + b.publish('signals.task.synced', { userId: 'u', source: 'todoist', count: 0, syncedAt: '' }); expect(calls).toEqual(['a', 'b']); }); @@ -202,7 +202,7 @@ describe('EventBus — onPublish hook (NATS bridge contract)', () => { b.onPublish(hook); b.subscribe('signals.task.synced', sub); - b.publish('signals.task.synced', { userId: 'u', count: 1, syncedAt: '' }); + b.publish('signals.task.synced', { userId: 'u', source: 'todoist', count: 1, syncedAt: '' }); expect(hook).toHaveBeenCalledOnce(); expect(sub).toHaveBeenCalledOnce(); }); @@ -215,7 +215,7 @@ describe('EventBus — onPublish hook (NATS bridge contract)', () => { throw new Error('boom'); }); expect(() => - b.publish('signals.task.synced', { userId: 'u', count: 0, syncedAt: '' }), + b.publish('signals.task.synced', { userId: 'u', source: 'todoist', count: 0, syncedAt: '' }), ).toThrow('boom'); }); }); diff --git a/services/api/src/events/__tests__/nats.test.ts b/services/api/src/events/__tests__/nats.test.ts index 6f411fb..0b287b2 100644 --- a/services/api/src/events/__tests__/nats.test.ts +++ b/services/api/src/events/__tests__/nats.test.ts @@ -106,7 +106,7 @@ describe('connectNats — bridge bus → JetStream', () => { await connectNats('nats://test:4222'); - const payload = { userId: 'u1', count: 7, syncedAt: '2026-01-01T00:00:00Z' }; + const payload = { userId: 'u1', source: 'todoist', count: 7, syncedAt: '2026-01-01T00:00:00Z' }; bus.publish('signals.task.synced', payload); // Allow the queued microtask in the hook to flush. @@ -130,7 +130,7 @@ describe('connectNats — bridge bus → JetStream', () => { const errSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); expect(() => - bus.publish('signals.task.synced', { userId: 'u', count: 0, syncedAt: '' }), + bus.publish('signals.task.synced', { userId: 'u', source: 'todoist', count: 0, syncedAt: '' }), ).not.toThrow(); // Wait a tick for the rejected promise's catch to run. @@ -156,7 +156,7 @@ describe('Bus.onPublish contract — used by NATS bridge', () => { const b = new Bus(); const hook = vi.fn(); b.onPublish(hook); - b.publish('signals.task.synced', { userId: 'u', count: 0, syncedAt: '' }); + b.publish('signals.task.synced', { userId: 'u', source: 'todoist', count: 0, syncedAt: '' }); expect(hook).toHaveBeenCalledOnce(); }); }); diff --git a/services/api/src/events/bus.ts b/services/api/src/events/bus.ts index 3693212..798fd26 100644 --- a/services/api/src/events/bus.ts +++ b/services/api/src/events/bus.ts @@ -45,6 +45,7 @@ export type RewardDeliveryFailedEvent = { export type TaskSyncedEvent = { userId: string; + source: string; // e.g. 'todoist' count: number; syncedAt: string; }; diff --git a/services/api/src/signals/todoist.ts b/services/api/src/signals/todoist.ts index 16d7f6c..1b76437 100644 --- a/services/api/src/signals/todoist.ts +++ b/services/api/src/signals/todoist.ts @@ -88,7 +88,7 @@ export class TodoistSignalSource implements SignalSource { }); this.cache.set(userId, { signals, fetchedAt: Date.now() }); - bus.publish('signals.task.synced', { userId, count: signals.length, syncedAt: now }); + bus.publish('signals.task.synced', { userId, source: 'todoist', count: signals.length, syncedAt: now }); return signals; }