Issue 21 — event infrastructure: - NormalizedEvent<T> + payload types in packages/shared-types/src/events/ - Bus.onPublish() hook for side-effect bridges - NATS JetStream adapter (services/api/src/events/nats.ts): connects when NATS_URL is set, creates signals.> and feedback.> streams, bridges all in-process bus publishes to JetStream — no-ops gracefully when NATS is absent - NATS service added to docker-compose (profile: events|full, port 4222/8222) Issue 22 — Todoist background sync: - services/api/src/signals/scheduler.ts: queries all active-token users every 15 min (TODOIST_SYNC_INTERVAL_MS), fan-out via todoistSource.fetchSignals() which emits signals.task.synced; on-demand fetch remains as freshness fallback - NATS_URL + TODOIST_SYNC_INTERVAL_MS added to config Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -37,4 +37,10 @@ export const config = {
|
||||
VAPID_PUBLIC_KEY: optional('VAPID_PUBLIC_KEY', ''),
|
||||
VAPID_PRIVATE_KEY: optional('VAPID_PRIVATE_KEY', ''),
|
||||
VAPID_SUBJECT: optional('VAPID_SUBJECT', 'mailto:admin@localhost'),
|
||||
|
||||
/** NATS server URL — omit to run without JetStream (in-process bus only) */
|
||||
NATS_URL: process.env['NATS_URL'] ?? '',
|
||||
|
||||
/** How often to proactively sync Todoist tasks in the background (ms) */
|
||||
TODOIST_SYNC_INTERVAL_MS: parseInt(optional('TODOIST_SYNC_INTERVAL_MS', String(15 * 60 * 1000)), 10),
|
||||
};
|
||||
|
||||
@@ -69,6 +69,12 @@ const RING_SIZE = 500;
|
||||
class Bus extends EventEmitter {
|
||||
private ring: StoredEvent[] = [];
|
||||
private seq = 0;
|
||||
private publishHooks: Array<(subject: string, payload: unknown) => void> = [];
|
||||
|
||||
/** Register a side-effect hook called on every publish (e.g. NATS bridge) */
|
||||
onPublish(hook: (subject: string, payload: unknown) => void): void {
|
||||
this.publishHooks.push(hook);
|
||||
}
|
||||
|
||||
publish<K extends keyof EventMap>(subject: K, payload: EventMap[K]): void {
|
||||
const entry: StoredEvent = {
|
||||
@@ -80,6 +86,7 @@ class Bus extends EventEmitter {
|
||||
if (this.ring.length >= RING_SIZE) this.ring.shift();
|
||||
this.ring.push(entry);
|
||||
this.emit(subject, payload);
|
||||
for (const hook of this.publishHooks) hook(subject, payload);
|
||||
}
|
||||
|
||||
subscribe<K extends keyof EventMap>(subject: K, handler: (payload: EventMap[K]) => void): void {
|
||||
|
||||
84
services/api/src/events/nats.ts
Normal file
84
services/api/src/events/nats.ts
Normal file
@@ -0,0 +1,84 @@
|
||||
/**
|
||||
* Optional NATS JetStream adapter.
|
||||
*
|
||||
* When NATS_URL is set: connects to NATS, creates the required streams on
|
||||
* startup, and wraps the in-process Bus so every publish also goes to
|
||||
* JetStream. Subscribers across processes (ml/serving, future services)
|
||||
* consume from JetStream.
|
||||
*
|
||||
* When NATS_URL is not set: this module is a no-op and the in-process Bus
|
||||
* works as before.
|
||||
*/
|
||||
|
||||
import type { NatsConnection, JetStreamClient, StreamConfig } from 'nats';
|
||||
import { bus } from './bus.js';
|
||||
|
||||
let nc: NatsConnection | null = null;
|
||||
let js: JetStreamClient | null = null;
|
||||
|
||||
// nats enums are string enums — import the values at runtime, not just types
|
||||
async function getStreamConfigs(): Promise<Partial<StreamConfig>[]> {
|
||||
const { StorageType: S, RetentionPolicy: R, DiscardPolicy: D } = await import('nats');
|
||||
return [
|
||||
{
|
||||
name: 'signals',
|
||||
subjects: ['signals.>'],
|
||||
max_msgs: 500_000,
|
||||
max_age: 7 * 24 * 60 * 60 * 1e9, // 7 days in nanoseconds
|
||||
storage: S.File,
|
||||
retention: R.Limits,
|
||||
discard: D.Old,
|
||||
num_replicas: 1,
|
||||
duplicate_window: 120e9,
|
||||
},
|
||||
{
|
||||
name: 'feedback',
|
||||
subjects: ['feedback.>'],
|
||||
max_msgs: 200_000,
|
||||
max_age: 30 * 24 * 60 * 60 * 1e9,
|
||||
storage: S.File,
|
||||
retention: R.Limits,
|
||||
discard: D.Old,
|
||||
num_replicas: 1,
|
||||
duplicate_window: 120e9,
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
export async function connectNats(natsUrl: string): Promise<void> {
|
||||
try {
|
||||
const { connect } = await import('nats');
|
||||
nc = await connect({ servers: natsUrl, reconnect: true, maxReconnectAttempts: -1 });
|
||||
js = nc.jetstream();
|
||||
|
||||
// Ensure streams exist (idempotent)
|
||||
const STREAMS = await getStreamConfigs();
|
||||
const jsm = await nc.jetstreamManager();
|
||||
for (const cfg of STREAMS) {
|
||||
try {
|
||||
await jsm.streams.info(cfg.name!);
|
||||
} catch {
|
||||
await jsm.streams.add(cfg as StreamConfig);
|
||||
}
|
||||
}
|
||||
|
||||
// Bridge: every in-process bus publish also goes to JetStream
|
||||
bus.onPublish((subject, payload) => {
|
||||
if (!js) return;
|
||||
const data = new TextEncoder().encode(JSON.stringify(payload));
|
||||
js.publish(subject, data).catch((err: Error) =>
|
||||
console.error(`[nats] publish failed for ${subject}: ${err.message}`),
|
||||
);
|
||||
});
|
||||
|
||||
console.log(`[nats] connected to ${natsUrl}, streams: ${STREAMS.map((s) => s.name).join(', ')}`);
|
||||
} catch (err: any) {
|
||||
console.warn(`[nats] connection failed — running without JetStream: ${err.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
export async function closeNats(): Promise<void> {
|
||||
await nc?.drain();
|
||||
nc = null;
|
||||
js = null;
|
||||
}
|
||||
@@ -18,6 +18,8 @@ import { dirname } from 'path';
|
||||
import { requireAuth } from './middleware/session.js';
|
||||
import { requireAdmin } from './middleware/admin.js';
|
||||
import type { Request, Response } from 'express';
|
||||
import { connectNats } from './events/nats.js';
|
||||
import { startTodoistSyncScheduler } from './signals/scheduler.js';
|
||||
|
||||
await mkdir(dirname(config.DATABASE_PATH), { recursive: true });
|
||||
runMigrations();
|
||||
@@ -88,3 +90,9 @@ setInterval(purgeExpiredData, 24 * 60 * 60 * 1000);
|
||||
app.listen(config.PORT, () => {
|
||||
console.log(`oO API listening on http://localhost:${config.PORT}`);
|
||||
});
|
||||
|
||||
if (config.NATS_URL) {
|
||||
connectNats(config.NATS_URL);
|
||||
}
|
||||
|
||||
startTodoistSyncScheduler(config.TODOIST_SYNC_INTERVAL_MS);
|
||||
|
||||
55
services/api/src/signals/scheduler.ts
Normal file
55
services/api/src/signals/scheduler.ts
Normal file
@@ -0,0 +1,55 @@
|
||||
/**
|
||||
* Todoist background sync scheduler (Issue #22).
|
||||
*
|
||||
* Periodically fetches Todoist tasks for every user with an active token so
|
||||
* that signals are fresh before the next /recommend call. The on-demand fetch
|
||||
* in TodoistSignalSource remains as the freshness-critical fallback — if a user
|
||||
* hits /recommend and the cache is stale, it re-fetches inline.
|
||||
*
|
||||
* Interval: TODOIST_SYNC_INTERVAL_MS (default 15 min).
|
||||
*/
|
||||
|
||||
import { db } from '../db/index.js';
|
||||
import { integrationTokens } from '../db/schema.js';
|
||||
import { eq } from 'drizzle-orm';
|
||||
import { todoistSource } from './todoist.js';
|
||||
|
||||
const DEFAULT_INTERVAL_MS = 15 * 60 * 1000;
|
||||
|
||||
export function startTodoistSyncScheduler(intervalMs = DEFAULT_INTERVAL_MS): NodeJS.Timeout {
|
||||
async function syncAll(): Promise<void> {
|
||||
let users: { userId: string }[] = [];
|
||||
try {
|
||||
users = await db
|
||||
.select({ userId: integrationTokens.userId })
|
||||
.from(integrationTokens)
|
||||
.where(eq(integrationTokens.tokenStatus, 'active'));
|
||||
} catch (err: any) {
|
||||
console.error(`[scheduler] failed to query users: ${err.message}`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!users.length) return;
|
||||
|
||||
const results = await Promise.allSettled(
|
||||
users.map((u) => todoistSource.fetchSignals(u.userId)),
|
||||
);
|
||||
|
||||
let ok = 0;
|
||||
let failed = 0;
|
||||
for (const r of results) {
|
||||
if (r.status === 'fulfilled') ok++;
|
||||
else { failed++; console.error(`[scheduler] sync error:`, r.reason); }
|
||||
}
|
||||
|
||||
console.log(`[scheduler] todoist sync: ${ok} ok, ${failed} failed (${users.length} users)`);
|
||||
}
|
||||
|
||||
// Run once shortly after startup, then on interval
|
||||
const delay = setTimeout(() => {
|
||||
syncAll();
|
||||
setInterval(syncAll, intervalMs);
|
||||
}, 10_000);
|
||||
|
||||
return delay;
|
||||
}
|
||||
Reference in New Issue
Block a user