/**
 * Optional NATS JetStream adapter.
 *
 * When NATS_URL is set: connects to NATS, creates the required streams on
 * startup, and bridges the in-process Bus so every publish also goes to
 * JetStream. Subscribers across processes (ml/serving, future services)
 * consume from JetStream.
 *
 * When NATS_URL is not set: this module is a no-op and the in-process Bus
 * works as before.
 */
import type { NatsConnection, JetStreamClient, StreamConfig } from 'nats';
import { bus } from './bus.js';
import { logger } from '../logger.js';

/** Stream definition with a mandatory name, so no non-null assertion or cast is needed. */
type StreamSpec = Partial<StreamConfig> & Pick<StreamConfig, 'name'>;

/** JetStream expresses durations in nanoseconds. */
const NANOS_PER_SECOND = 1e9;
const NANOS_PER_DAY = 24 * 60 * 60 * NANOS_PER_SECOND;

/** Shared encoder; hoisted so the bridge does not allocate one per message. */
const encoder = new TextEncoder();

/** Active connection and JetStream client; both are null while disconnected. */
let nc: NatsConnection | null = null;
let js: JetStreamClient | null = null;

/** True once the bus -> JetStream bridge has been registered (it is registered at most once). */
let bridgeInstalled = false;

/**
 * Builds the stream definitions this service expects to exist.
 *
 * nats enums are string enums — import the values at runtime, not just types.
 *
 * @returns Definitions for the `signals` and `feedback` streams.
 */
async function getStreamConfigs(): Promise<StreamSpec[]> {
  const { StorageType: S, RetentionPolicy: R, DiscardPolicy: D } = await import('nats');
  return [
    {
      name: 'signals',
      subjects: ['signals.>'],
      max_msgs: 500_000,
      max_age: 7 * NANOS_PER_DAY, // 7 days in nanoseconds
      storage: S.File,
      retention: R.Limits,
      discard: D.Old,
      num_replicas: 1,
      duplicate_window: 120 * NANOS_PER_SECOND, // 2 minutes in nanoseconds
    },
    {
      name: 'feedback',
      subjects: ['feedback.>'],
      max_msgs: 200_000,
      max_age: 30 * NANOS_PER_DAY, // 30 days in nanoseconds
      storage: S.File,
      retention: R.Limits,
      discard: D.Old,
      num_replicas: 1,
      duplicate_window: 120 * NANOS_PER_SECOND, // 2 minutes in nanoseconds
    },
  ];
}

/**
 * Makes sure every stream in `streams` exists on the server.
 *
 * A failed `info` lookup is treated as "stream missing" and followed by `add`;
 * if the lookup failed for another reason, `add` is expected to surface it.
 * Existing streams are left untouched (their config is not updated).
 */
async function ensureStreams(conn: NatsConnection, streams: StreamSpec[]): Promise<void> {
  const jsm = await conn.jetstreamManager();
  for (const cfg of streams) {
    try {
      await jsm.streams.info(cfg.name);
    } catch {
      await jsm.streams.add(cfg);
    }
  }
}

/**
 * Registers the bus -> JetStream bridge exactly once.
 *
 * The handler reads the module-level `js` on every publish, so it becomes a
 * no-op after `closeNats()` and resumes after a later successful
 * `connectNats()` without being registered a second time.
 */
function installBridge(): void {
  if (bridgeInstalled) return;
  bridgeInstalled = true;

  bus.onPublish((subject, payload) => {
    const client = js;
    if (!client) return;

    let data: Uint8Array;
    try {
      data = encoder.encode(JSON.stringify(payload));
    } catch (err: unknown) {
      // JSON.stringify throws on circular structures / BigInt; never let that
      // escape into the in-process publisher.
      logger.error({ err, subject }, 'nats payload serialization failed');
      return;
    }

    // Fire-and-forget: the in-process bus must not wait on JetStream acks.
    void client
      .publish(subject, data)
      .catch((err: unknown) => logger.error({ err, subject }, 'nats publish failed'));
  });
}

/**
 * Connects to NATS, ensures the required streams exist and starts mirroring
 * in-process bus publishes to JetStream.
 *
 * Never rejects: on any failure a warning is logged, a half-open connection is
 * closed, and the module state is left as it was before the call.
 *
 * @param natsUrl Server URL(s) passed to the nats `connect` call.
 */
export async function connectNats(natsUrl: string): Promise<void> {
  let conn: NatsConnection | null = null;
  try {
    const { connect } = await import('nats');
    conn = await connect({ servers: natsUrl, reconnect: true, maxReconnectAttempts: -1 });

    // Ensure streams exist (idempotent)
    const streams = await getStreamConfigs();
    await ensureStreams(conn, streams);

    // Publish module state only once setup fully succeeded.
    nc = conn;
    js = conn.jetstream();

    // Bridge: every in-process bus publish also goes to JetStream
    installBridge();

    logger.info({ url: natsUrl, streams: streams.map((s) => s.name) }, 'nats connected');
  } catch (err: unknown) {
    logger.warn({ err }, 'nats connection failed — running without JetStream');
    if (conn) {
      // Connected but setup failed: do not leave a reconnecting socket behind.
      await conn
        .close()
        .catch((closeErr: unknown) =>
          logger.warn({ err: closeErr }, 'nats close after failed setup failed'),
        );
    }
  }
}

/**
 * Drains and closes the NATS connection, if any.
 *
 * Module state is cleared before draining so the bridge stops publishing
 * immediately and the state is reset even when `drain()` rejects; a drain
 * failure still propagates to the caller.
 */
export async function closeNats(): Promise<void> {
  const conn = nc;
  nc = null;
  js = null;
  await conn?.drain();
}