Files
oO/services/api/src/events/__tests__/bus.test.ts
alvis 352469162d fix(signals): add missing source field to TaskSyncedEvent (#78)
TaskSyncedPayload in shared-types and ml/serving schemas both require
source, but TaskSyncedEvent in bus.ts and the todoist publish call both
omitted it — causing the JetStream consumer to nak every task.synced
message on validation failure.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-25 17:15:32 +00:00

222 lines
7.6 KiB
TypeScript

import { describe, it, expect, vi } from 'vitest';
import { Bus, bus } from '../bus.js';
/**
 * Creates a new `Bus` for a single test, so most tests run against their own
 * instance rather than the exported `bus` singleton.
 */
function makeBus(): Bus {
  const freshBus = new Bus();
  return freshBus;
}
// Delivery: which handlers are invoked by publish(), and which are not.
describe('EventBus — delivery', () => {
  /** Builds a throwaway `signals.tip.served` payload for tests that ignore field values. */
  const servedStub = () => ({ userId: 'u', tipId: 't', policy: 'p', servedAt: '' });

  it('delivers a published event to subscribers', () => {
    const eventBus = makeBus();
    const onServed = vi.fn();
    eventBus.subscribe('signals.tip.served', onServed);

    const servedAt = new Date().toISOString();
    const event = { userId: 'u1', tipId: 'tip:1', policy: 'random', servedAt };
    eventBus.publish('signals.tip.served', event);

    // Exactly one call, carrying the payload that was published.
    expect(onServed).toHaveBeenCalledOnce();
    expect(onServed).toHaveBeenCalledWith(event);
  });

  it('delivers to multiple subscribers on the same subject', () => {
    const eventBus = makeBus();
    const first = vi.fn();
    const second = vi.fn();
    eventBus.subscribe('signals.tip.served', first);
    eventBus.subscribe('signals.tip.served', second);

    eventBus.publish('signals.tip.served', servedStub());

    expect(first).toHaveBeenCalledOnce();
    expect(second).toHaveBeenCalledOnce();
  });

  it('does not deliver to handlers on a different subject', () => {
    const eventBus = makeBus();
    const onFeedback = vi.fn();
    eventBus.subscribe('signals.tip.feedback', onFeedback);

    // Publish on a sibling subject; the feedback handler must stay untouched.
    eventBus.publish('signals.tip.served', servedStub());

    expect(onFeedback).not.toHaveBeenCalled();
  });

  it('does not call a handler after bus.off()', () => {
    const eventBus = makeBus();
    const onServed = vi.fn();
    eventBus.subscribe('signals.tip.served', onServed);
    eventBus.off('signals.tip.served', onServed);

    eventBus.publish('signals.tip.served', servedStub());

    expect(onServed).not.toHaveBeenCalled();
  });

  it('does not throw when publishing with no subscribers', () => {
    const eventBus = makeBus();
    const publishUnobserved = () =>
      eventBus.publish('signals.task.synced', { userId: 'u', source: 'todoist', count: 3, syncedAt: '' });

    expect(publishUnobserved).not.toThrow();
  });

  it('reward maps correctly: done=1, dismiss=-1, snooze=0', () => {
    type FeedbackAction = 'done' | 'dismiss' | 'snooze';

    // Reward value the test attaches to the payload for each action.
    const rewardFor = (action: FeedbackAction): number => {
      if (action === 'done') return 1.0;
      if (action === 'dismiss') return -1.0;
      return 0.0;
    };

    const eventBus = makeBus();
    const expectations: Array<[FeedbackAction, number]> = [
      ['done', 1.0],
      ['dismiss', -1.0],
      ['snooze', 0.0],
    ];

    expectations.forEach(([action, expected]) => {
      // A new mock per action, removed afterwards, so calls never accumulate.
      const onFeedback = vi.fn();
      eventBus.subscribe('signals.tip.feedback', onFeedback);

      const payload = {
        userId: 'u1',
        tipId: 'todoist:42',
        action,
        reward: rewardFor(action),
        dwellMs: null,
        createdAt: new Date().toISOString(),
      };
      eventBus.publish('signals.tip.feedback', payload);

      expect(onFeedback).toHaveBeenCalledWith(expect.objectContaining({ reward: expected }));
      eventBus.off('signals.tip.feedback', onFeedback);
    });
  });
});
/**
 * Tests for the event history exposed through `tail()`: the subject-prefix,
 * userId and `since` filters, the `limit` option, id ordering, and the
 * 500-entry cap with oldest-first eviction.
 */
describe('EventBus — ring buffer / tail()', () => {
  it('tail() returns published events', () => {
    const b = makeBus();
    b.publish('signals.tip.served', { userId: 'u1', tipId: 't1', policy: 'p', servedAt: '' });
    b.publish('signals.tip.served', { userId: 'u2', tipId: 't2', policy: 'p', servedAt: '' });

    const events = b.tail();
    expect(events.length).toBeGreaterThanOrEqual(2);
  });

  it('tail() filters by subject prefix', () => {
    const b = makeBus();
    b.publish('signals.tip.served', { userId: 'u', tipId: 't', policy: 'p', servedAt: '' });
    b.publish('signals.task.synced', { userId: 'u', source: 'todoist', count: 1, syncedAt: '' });

    const tipEvents = b.tail({ subject: 'signals.tip' });
    // `every` is vacuously true for an empty array, so first make sure the
    // filter returned the event published above.
    expect(tipEvents.length).toBeGreaterThan(0);
    expect(tipEvents.every((e) => e.subject.startsWith('signals.tip'))).toBe(true);

    const taskEvents = b.tail({ subject: 'signals.task' });
    expect(taskEvents.length).toBeGreaterThan(0);
    expect(taskEvents.every((e) => e.subject.startsWith('signals.task'))).toBe(true);
  });

  it('tail() filters by userId', () => {
    const b = makeBus();
    b.publish('signals.tip.served', { userId: 'alice', tipId: 't1', policy: 'p', servedAt: '' });
    b.publish('signals.tip.served', { userId: 'bob', tipId: 't2', policy: 'p', servedAt: '' });

    const aliceEvents = b.tail({ userId: 'alice' });
    // Guard against a vacuous pass: an empty result would satisfy `every`.
    expect(aliceEvents.length).toBeGreaterThan(0);
    expect(aliceEvents.every((e) => (e.payload as any).userId === 'alice')).toBe(true);
  });

  it('tail() respects limit', () => {
    const b = makeBus();
    for (let i = 0; i < 10; i++) {
      b.publish('signals.tip.served', { userId: 'u', tipId: `t${i}`, policy: 'p', servedAt: '' });
    }

    const events = b.tail({ limit: 3 });
    expect(events).toHaveLength(3);
  });

  it('tail() returns only events after `since` id', () => {
    const b = makeBus();
    b.publish('signals.tip.served', { userId: 'u', tipId: 't1', policy: 'p', servedAt: '' });
    // Capture the id of the latest event so far; it becomes the `since` cursor.
    const snap = b.tail();
    const lastId = snap[snap.length - 1].id;

    b.publish('signals.tip.served', { userId: 'u', tipId: 't2', policy: 'p', servedAt: '' });

    const after = b.tail({ since: lastId });
    expect(after).toHaveLength(1);
    expect((after[0].payload as any).tipId).toBe('t2');
  });

  it('assigns monotonically increasing ids', () => {
    const b = makeBus();
    b.publish('signals.tip.served', { userId: 'u', tipId: 't1', policy: 'p', servedAt: '' });
    b.publish('signals.tip.served', { userId: 'u', tipId: 't2', policy: 'p', servedAt: '' });

    const events = b.tail();
    const ids = events.map((e) => e.id);
    // Each id must be strictly greater than the one before it.
    for (let i = 1; i < ids.length; i++) {
      expect(ids[i]).toBeGreaterThan(ids[i - 1]);
    }
  });

  it('ring buffer caps at 500 entries and evicts oldest', () => {
    const b = makeBus();
    // Publish 502 events — the first two should be evicted
    for (let i = 0; i < 502; i++) {
      b.publish('signals.tip.served', { userId: 'u', tipId: `t${i}`, policy: 'p', servedAt: '' });
    }

    // Ask for more than the cap so the limit itself does not truncate the result.
    const all = b.tail({ limit: 1000 });
    expect(all).toHaveLength(500);
    // Oldest surviving entry should be the 3rd published (index 2)
    expect((all[0].payload as any).tipId).toBe('t2');
  });
});
// Sanity check on the module-level export used by callers outside the tests.
describe('EventBus — singleton bus export', () => {
  it('singleton bus is a Bus instance', () => {
    const exportedBus = bus;

    expect(exportedBus).toBeInstanceOf(Bus);
  });
});
// onPublish hooks: callbacks that see every publish as (subject, payload).
describe('EventBus — onPublish hook (NATS bridge contract)', () => {
  const SYNCED = 'signals.task.synced';

  /** Builds a `signals.task.synced` payload with the given count and timestamp. */
  const syncedPayload = (count: number, syncedAt = '') => ({
    userId: 'u',
    source: 'todoist',
    count,
    syncedAt,
  });

  it('invokes registered hook with subject + payload on every publish', () => {
    const eventBus = makeBus();
    const onAnyPublish = vi.fn();
    eventBus.onPublish(onAnyPublish);

    const event = syncedPayload(2, 'now');
    eventBus.publish(SYNCED, event);

    // The hook receives the subject as well as the payload.
    expect(onAnyPublish).toHaveBeenCalledOnce();
    expect(onAnyPublish).toHaveBeenCalledWith('signals.task.synced', event);
  });

  it('fires multiple hooks in registration order', () => {
    const eventBus = makeBus();
    const order: string[] = [];
    eventBus.onPublish(() => order.push('a'));
    eventBus.onPublish(() => order.push('b'));

    eventBus.publish(SYNCED, syncedPayload(0));

    expect(order).toEqual(['a', 'b']);
  });

  it('hook fires alongside subscribers — both receive the publish', () => {
    const eventBus = makeBus();
    const onAnyPublish = vi.fn();
    const onSynced = vi.fn();
    eventBus.onPublish(onAnyPublish);
    eventBus.subscribe(SYNCED, onSynced);

    eventBus.publish(SYNCED, syncedPayload(1));

    expect(onAnyPublish).toHaveBeenCalledOnce();
    expect(onSynced).toHaveBeenCalledOnce();
  });

  it('a throwing hook still bubbles up — bus does not silently swallow', () => {
    // Pins the current behaviour: hooks execute inside publish(), so an error
    // thrown by a hook propagates to the publisher. This is why the NATS
    // adapter does its own catching inside the hook.
    const eventBus = makeBus();
    eventBus.onPublish(() => {
      throw new Error('boom');
    });

    const publishWithFailingHook = () => eventBus.publish(SYNCED, syncedPayload(0));

    expect(publishWithFailingHook).toThrow('boom');
  });
});