Issue 21 — event infrastructure: - NormalizedEvent<T> + payload types in packages/shared-types/src/events/ - Bus.onPublish() hook for side-effect bridges - NATS JetStream adapter (services/api/src/events/nats.ts): connects when NATS_URL is set, creates signals.> and feedback.> streams, bridges all in-process bus publishes to JetStream — no-ops gracefully when NATS is absent - NATS service added to docker-compose (profile: events|full, port 4222/8222) Issue 22 — Todoist background sync: - services/api/src/signals/scheduler.ts: queries all active-token users every 15 min (TODOIST_SYNC_INTERVAL_MS), fan-out via todoistSource.fetchSignals() which emits signals.task.synced; on-demand fetch remains as freshness fallback - NATS_URL + TODOIST_SYNC_INTERVAL_MS added to config Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -178,6 +178,26 @@ services:
|
|||||||
timeout: 5s
|
timeout: 5s
|
||||||
retries: 5
|
retries: 5
|
||||||
|
|
||||||
|
# ── events profile — NATS JetStream ─────────────────────────────────────
|
||||||
|
# Start: docker compose --profile events up
|
||||||
|
# NATS monitoring: http://localhost:8222
|
||||||
|
# Enable in the API by setting NATS_URL=nats://nats:4222 in .env.local
|
||||||
|
|
||||||
|
nats:
|
||||||
|
image: nats:2.10-alpine
|
||||||
|
profiles: [events, full]
|
||||||
|
command: ["-js", "-sd", "/data", "-m", "8222"]
|
||||||
|
volumes:
|
||||||
|
- /mnt/ssd/dbs/oo/nats:/data
|
||||||
|
ports:
|
||||||
|
- "127.0.0.1:4222:4222" # client connections
|
||||||
|
- "127.0.0.1:8222:8222" # HTTP monitoring
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "wget", "--spider", "-q", "http://localhost:8222/healthz"]
|
||||||
|
interval: 10s
|
||||||
|
timeout: 5s
|
||||||
|
retries: 5
|
||||||
|
|
||||||
litellm:
|
litellm:
|
||||||
image: ghcr.io/berriai/litellm:main-latest
|
image: ghcr.io/berriai/litellm:main-latest
|
||||||
profiles: [ai]
|
profiles: [ai]
|
||||||
|
|||||||
60
packages/shared-types/src/events/index.ts
Normal file
60
packages/shared-types/src/events/index.ts
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
/**
|
||||||
|
* NormalizedEvent — the durable envelope for all events flowing through
|
||||||
|
* the system. Today: in-process EventEmitter. Tomorrow: NATS JetStream.
|
||||||
|
*
|
||||||
|
* Subject taxonomy:
|
||||||
|
* signals.task.synced — Todoist (or other source) task list refreshed
|
||||||
|
* signals.tip.served — tip returned to client
|
||||||
|
* signals.tip.feedback — user reacted (done / dismiss / snooze / helpful / not_helpful)
|
||||||
|
* signals.tip.reward_failed — reward delivery to ml/serving failed after retries
|
||||||
|
* signals.integration.token_expired — OAuth token needs reconnect
|
||||||
|
*/
|
||||||
|
export interface NormalizedEvent<T = unknown> {
|
||||||
|
/** NATS-style subject: domain.entity.verb */
|
||||||
|
subject: string;
|
||||||
|
/** ISO 8601 timestamp */
|
||||||
|
ts: string;
|
||||||
|
/** Monotonically increasing sequence number (in-process ring; JetStream seq in prod) */
|
||||||
|
seq: number;
|
||||||
|
payload: T;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Payload types ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
export interface TaskSyncedPayload {
|
||||||
|
userId: string;
|
||||||
|
source: string; // e.g. 'todoist'
|
||||||
|
count: number;
|
||||||
|
syncedAt: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface TipServedPayload {
|
||||||
|
userId: string;
|
||||||
|
tipId: string;
|
||||||
|
policy: string;
|
||||||
|
servedAt: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface TipFeedbackPayload {
|
||||||
|
userId: string;
|
||||||
|
tipId: string;
|
||||||
|
action: 'done' | 'dismiss' | 'snooze' | 'helpful' | 'not_helpful';
|
||||||
|
reward: number;
|
||||||
|
dwellMs: number | null;
|
||||||
|
createdAt: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface TipRewardFailedPayload {
|
||||||
|
userId: string;
|
||||||
|
tipId: string;
|
||||||
|
reward: number;
|
||||||
|
attempts: number;
|
||||||
|
error: string;
|
||||||
|
failedAt: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface IntegrationTokenExpiredPayload {
|
||||||
|
userId: string;
|
||||||
|
provider: string;
|
||||||
|
detectedAt: string;
|
||||||
|
}
|
||||||
@@ -3,3 +3,4 @@ export * from './http/auth.js';
|
|||||||
export * from './http/integrations.js';
|
export * from './http/integrations.js';
|
||||||
export * from './http/user.js';
|
export * from './http/user.js';
|
||||||
export * from './http/signal.js';
|
export * from './http/signal.js';
|
||||||
|
export * from './events/index.js';
|
||||||
|
|||||||
26
pnpm-lock.yaml
generated
26
pnpm-lock.yaml
generated
@@ -162,6 +162,9 @@ importers:
|
|||||||
nanoid:
|
nanoid:
|
||||||
specifier: ^5.1.0
|
specifier: ^5.1.0
|
||||||
version: 5.1.7
|
version: 5.1.7
|
||||||
|
nats:
|
||||||
|
specifier: ^2.29.3
|
||||||
|
version: 2.29.3
|
||||||
node-fetch:
|
node-fetch:
|
||||||
specifier: ^3.3.2
|
specifier: ^3.3.2
|
||||||
version: 3.3.2
|
version: 3.3.2
|
||||||
@@ -2322,6 +2325,10 @@ packages:
|
|||||||
napi-build-utils@2.0.0:
|
napi-build-utils@2.0.0:
|
||||||
resolution: {integrity: sha512-GEbrYkbfF7MoNaoh2iGG84Mnf/WZfB0GdGEsM8wz7Expx/LlWf5U8t9nvJKXSp3qr5IsEbK04cBGhol/KwOsWA==}
|
resolution: {integrity: sha512-GEbrYkbfF7MoNaoh2iGG84Mnf/WZfB0GdGEsM8wz7Expx/LlWf5U8t9nvJKXSp3qr5IsEbK04cBGhol/KwOsWA==}
|
||||||
|
|
||||||
|
nats@2.29.3:
|
||||||
|
resolution: {integrity: sha512-tOQCRCwC74DgBTk4pWZ9V45sk4d7peoE2njVprMRCBXrhJ5q5cYM7i6W+Uvw2qUrcfOSnuisrX7bEx3b3Wx4QA==}
|
||||||
|
engines: {node: '>= 14.0.0'}
|
||||||
|
|
||||||
negotiator@0.6.3:
|
negotiator@0.6.3:
|
||||||
resolution: {integrity: sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==}
|
resolution: {integrity: sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==}
|
||||||
engines: {node: '>= 0.6'}
|
engines: {node: '>= 0.6'}
|
||||||
@@ -2347,6 +2354,10 @@ packages:
|
|||||||
sass:
|
sass:
|
||||||
optional: true
|
optional: true
|
||||||
|
|
||||||
|
nkeys.js@1.1.0:
|
||||||
|
resolution: {integrity: sha512-tB/a0shZL5UZWSwsoeyqfTszONTt4k2YS0tuQioMOD180+MbombYVgzDUYHlx+gejYK6rgf08n/2Df99WY0Sxg==}
|
||||||
|
engines: {node: '>=10.0.0'}
|
||||||
|
|
||||||
node-abi@3.89.0:
|
node-abi@3.89.0:
|
||||||
resolution: {integrity: sha512-6u9UwL0HlAl21+agMN3YAMXcKByMqwGx+pq+P76vii5f7hTPtKDp08/H9py6DY+cfDw7kQNTGEj/rly3IgbNQA==}
|
resolution: {integrity: sha512-6u9UwL0HlAl21+agMN3YAMXcKByMqwGx+pq+P76vii5f7hTPtKDp08/H9py6DY+cfDw7kQNTGEj/rly3IgbNQA==}
|
||||||
engines: {node: '>=10'}
|
engines: {node: '>=10'}
|
||||||
@@ -2851,6 +2862,9 @@ packages:
|
|||||||
resolution: {integrity: sha512-+v2QJey7ZUeUiuigkU+uFfklvNUyPI2VO2vBpMYJA+a1hKFLFiKtUYlRHdb3P9CrAvMzi0upbjI4WT+zKtqkBg==}
|
resolution: {integrity: sha512-+v2QJey7ZUeUiuigkU+uFfklvNUyPI2VO2vBpMYJA+a1hKFLFiKtUYlRHdb3P9CrAvMzi0upbjI4WT+zKtqkBg==}
|
||||||
hasBin: true
|
hasBin: true
|
||||||
|
|
||||||
|
tweetnacl@1.0.3:
|
||||||
|
resolution: {integrity: sha512-6rt+RN7aOi1nGMyC4Xa5DdYiukl2UWCbcJft7YhxReBGQD7OAM8Pbxw6YMo4r2diNEA8FEmu32YOn9rhaiE5yw==}
|
||||||
|
|
||||||
type-is@1.6.18:
|
type-is@1.6.18:
|
||||||
resolution: {integrity: sha512-TkRKr9sUTxEH8MdfuCSP7VizJyzRNMjj2J2do2Jr3Kym598JVdEksuzPQCnlFPW4ky9Q+iA+ma9BGm06XQBy8g==}
|
resolution: {integrity: sha512-TkRKr9sUTxEH8MdfuCSP7VizJyzRNMjj2J2do2Jr3Kym598JVdEksuzPQCnlFPW4ky9Q+iA+ma9BGm06XQBy8g==}
|
||||||
engines: {node: '>= 0.6'}
|
engines: {node: '>= 0.6'}
|
||||||
@@ -3856,7 +3870,7 @@ snapshots:
|
|||||||
obug: 2.1.1
|
obug: 2.1.1
|
||||||
std-env: 4.1.0
|
std-env: 4.1.0
|
||||||
tinyrainbow: 3.1.0
|
tinyrainbow: 3.1.0
|
||||||
vitest: 4.1.4(@types/node@22.19.17)(@vitest/coverage-v8@4.1.4)(jsdom@29.0.2)(vite@8.0.8(@types/node@22.19.17)(esbuild@0.27.7)(jiti@1.21.7)(tsx@4.21.0))
|
vitest: 4.1.4(@types/node@22.19.17)(@vitest/coverage-v8@4.1.4)(jsdom@29.0.2)(vite@8.0.8(@types/node@22.19.17)(esbuild@0.19.12)(jiti@1.21.7)(tsx@4.21.0))
|
||||||
|
|
||||||
'@vitest/expect@4.1.4':
|
'@vitest/expect@4.1.4':
|
||||||
dependencies:
|
dependencies:
|
||||||
@@ -4757,6 +4771,10 @@ snapshots:
|
|||||||
|
|
||||||
napi-build-utils@2.0.0: {}
|
napi-build-utils@2.0.0: {}
|
||||||
|
|
||||||
|
nats@2.29.3:
|
||||||
|
dependencies:
|
||||||
|
nkeys.js: 1.1.0
|
||||||
|
|
||||||
negotiator@0.6.3: {}
|
negotiator@0.6.3: {}
|
||||||
|
|
||||||
next@15.5.15(@playwright/test@1.59.1)(react-dom@19.2.5(react@19.2.5))(react@19.2.5):
|
next@15.5.15(@playwright/test@1.59.1)(react-dom@19.2.5(react@19.2.5))(react@19.2.5):
|
||||||
@@ -4783,6 +4801,10 @@ snapshots:
|
|||||||
- '@babel/core'
|
- '@babel/core'
|
||||||
- babel-plugin-macros
|
- babel-plugin-macros
|
||||||
|
|
||||||
|
nkeys.js@1.1.0:
|
||||||
|
dependencies:
|
||||||
|
tweetnacl: 1.0.3
|
||||||
|
|
||||||
node-abi@3.89.0:
|
node-abi@3.89.0:
|
||||||
dependencies:
|
dependencies:
|
||||||
semver: 7.7.4
|
semver: 7.7.4
|
||||||
@@ -5372,6 +5394,8 @@ snapshots:
|
|||||||
'@turbo/windows-64': 2.9.6
|
'@turbo/windows-64': 2.9.6
|
||||||
'@turbo/windows-arm64': 2.9.6
|
'@turbo/windows-arm64': 2.9.6
|
||||||
|
|
||||||
|
tweetnacl@1.0.3: {}
|
||||||
|
|
||||||
type-is@1.6.18:
|
type-is@1.6.18:
|
||||||
dependencies:
|
dependencies:
|
||||||
media-typer: 0.3.0
|
media-typer: 0.3.0
|
||||||
|
|||||||
@@ -24,6 +24,7 @@
|
|||||||
"express": "^4.21.2",
|
"express": "^4.21.2",
|
||||||
"express-session": "^1.18.1",
|
"express-session": "^1.18.1",
|
||||||
"nanoid": "^5.1.0",
|
"nanoid": "^5.1.0",
|
||||||
|
"nats": "^2.29.3",
|
||||||
"node-fetch": "^3.3.2",
|
"node-fetch": "^3.3.2",
|
||||||
"openid-client": "^6.3.4",
|
"openid-client": "^6.3.4",
|
||||||
"web-push": "^3.6.7",
|
"web-push": "^3.6.7",
|
||||||
|
|||||||
@@ -37,4 +37,10 @@ export const config = {
|
|||||||
VAPID_PUBLIC_KEY: optional('VAPID_PUBLIC_KEY', ''),
|
VAPID_PUBLIC_KEY: optional('VAPID_PUBLIC_KEY', ''),
|
||||||
VAPID_PRIVATE_KEY: optional('VAPID_PRIVATE_KEY', ''),
|
VAPID_PRIVATE_KEY: optional('VAPID_PRIVATE_KEY', ''),
|
||||||
VAPID_SUBJECT: optional('VAPID_SUBJECT', 'mailto:admin@localhost'),
|
VAPID_SUBJECT: optional('VAPID_SUBJECT', 'mailto:admin@localhost'),
|
||||||
|
|
||||||
|
/** NATS server URL — omit to run without JetStream (in-process bus only) */
|
||||||
|
NATS_URL: process.env['NATS_URL'] ?? '',
|
||||||
|
|
||||||
|
/** How often to proactively sync Todoist tasks in the background (ms) */
|
||||||
|
TODOIST_SYNC_INTERVAL_MS: parseInt(optional('TODOIST_SYNC_INTERVAL_MS', String(15 * 60 * 1000)), 10),
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -69,6 +69,12 @@ const RING_SIZE = 500;
|
|||||||
class Bus extends EventEmitter {
|
class Bus extends EventEmitter {
|
||||||
private ring: StoredEvent[] = [];
|
private ring: StoredEvent[] = [];
|
||||||
private seq = 0;
|
private seq = 0;
|
||||||
|
private publishHooks: Array<(subject: string, payload: unknown) => void> = [];
|
||||||
|
|
||||||
|
/** Register a side-effect hook called on every publish (e.g. NATS bridge) */
|
||||||
|
onPublish(hook: (subject: string, payload: unknown) => void): void {
|
||||||
|
this.publishHooks.push(hook);
|
||||||
|
}
|
||||||
|
|
||||||
publish<K extends keyof EventMap>(subject: K, payload: EventMap[K]): void {
|
publish<K extends keyof EventMap>(subject: K, payload: EventMap[K]): void {
|
||||||
const entry: StoredEvent = {
|
const entry: StoredEvent = {
|
||||||
@@ -80,6 +86,7 @@ class Bus extends EventEmitter {
|
|||||||
if (this.ring.length >= RING_SIZE) this.ring.shift();
|
if (this.ring.length >= RING_SIZE) this.ring.shift();
|
||||||
this.ring.push(entry);
|
this.ring.push(entry);
|
||||||
this.emit(subject, payload);
|
this.emit(subject, payload);
|
||||||
|
for (const hook of this.publishHooks) hook(subject, payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
subscribe<K extends keyof EventMap>(subject: K, handler: (payload: EventMap[K]) => void): void {
|
subscribe<K extends keyof EventMap>(subject: K, handler: (payload: EventMap[K]) => void): void {
|
||||||
|
|||||||
84
services/api/src/events/nats.ts
Normal file
84
services/api/src/events/nats.ts
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
/**
|
||||||
|
* Optional NATS JetStream adapter.
|
||||||
|
*
|
||||||
|
* When NATS_URL is set: connects to NATS, creates the required streams on
|
||||||
|
* startup, and wraps the in-process Bus so every publish also goes to
|
||||||
|
* JetStream. Subscribers across processes (ml/serving, future services)
|
||||||
|
* consume from JetStream.
|
||||||
|
*
|
||||||
|
* When NATS_URL is not set: this module is a no-op and the in-process Bus
|
||||||
|
* works as before.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import type { NatsConnection, JetStreamClient, StreamConfig } from 'nats';
|
||||||
|
import { bus } from './bus.js';
|
||||||
|
|
||||||
|
let nc: NatsConnection | null = null;
|
||||||
|
let js: JetStreamClient | null = null;
|
||||||
|
|
||||||
|
// nats enums are string enums — import the values at runtime, not just types
|
||||||
|
async function getStreamConfigs(): Promise<Partial<StreamConfig>[]> {
|
||||||
|
const { StorageType: S, RetentionPolicy: R, DiscardPolicy: D } = await import('nats');
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
name: 'signals',
|
||||||
|
subjects: ['signals.>'],
|
||||||
|
max_msgs: 500_000,
|
||||||
|
max_age: 7 * 24 * 60 * 60 * 1e9, // 7 days in nanoseconds
|
||||||
|
storage: S.File,
|
||||||
|
retention: R.Limits,
|
||||||
|
discard: D.Old,
|
||||||
|
num_replicas: 1,
|
||||||
|
duplicate_window: 120e9,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: 'feedback',
|
||||||
|
subjects: ['feedback.>'],
|
||||||
|
max_msgs: 200_000,
|
||||||
|
max_age: 30 * 24 * 60 * 60 * 1e9,
|
||||||
|
storage: S.File,
|
||||||
|
retention: R.Limits,
|
||||||
|
discard: D.Old,
|
||||||
|
num_replicas: 1,
|
||||||
|
duplicate_window: 120e9,
|
||||||
|
},
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function connectNats(natsUrl: string): Promise<void> {
|
||||||
|
try {
|
||||||
|
const { connect } = await import('nats');
|
||||||
|
nc = await connect({ servers: natsUrl, reconnect: true, maxReconnectAttempts: -1 });
|
||||||
|
js = nc.jetstream();
|
||||||
|
|
||||||
|
// Ensure streams exist (idempotent)
|
||||||
|
const STREAMS = await getStreamConfigs();
|
||||||
|
const jsm = await nc.jetstreamManager();
|
||||||
|
for (const cfg of STREAMS) {
|
||||||
|
try {
|
||||||
|
await jsm.streams.info(cfg.name!);
|
||||||
|
} catch {
|
||||||
|
await jsm.streams.add(cfg as StreamConfig);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bridge: every in-process bus publish also goes to JetStream
|
||||||
|
bus.onPublish((subject, payload) => {
|
||||||
|
if (!js) return;
|
||||||
|
const data = new TextEncoder().encode(JSON.stringify(payload));
|
||||||
|
js.publish(subject, data).catch((err: Error) =>
|
||||||
|
console.error(`[nats] publish failed for ${subject}: ${err.message}`),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
console.log(`[nats] connected to ${natsUrl}, streams: ${STREAMS.map((s) => s.name).join(', ')}`);
|
||||||
|
} catch (err: any) {
|
||||||
|
console.warn(`[nats] connection failed — running without JetStream: ${err.message}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function closeNats(): Promise<void> {
|
||||||
|
await nc?.drain();
|
||||||
|
nc = null;
|
||||||
|
js = null;
|
||||||
|
}
|
||||||
@@ -18,6 +18,8 @@ import { dirname } from 'path';
|
|||||||
import { requireAuth } from './middleware/session.js';
|
import { requireAuth } from './middleware/session.js';
|
||||||
import { requireAdmin } from './middleware/admin.js';
|
import { requireAdmin } from './middleware/admin.js';
|
||||||
import type { Request, Response } from 'express';
|
import type { Request, Response } from 'express';
|
||||||
|
import { connectNats } from './events/nats.js';
|
||||||
|
import { startTodoistSyncScheduler } from './signals/scheduler.js';
|
||||||
|
|
||||||
await mkdir(dirname(config.DATABASE_PATH), { recursive: true });
|
await mkdir(dirname(config.DATABASE_PATH), { recursive: true });
|
||||||
runMigrations();
|
runMigrations();
|
||||||
@@ -88,3 +90,9 @@ setInterval(purgeExpiredData, 24 * 60 * 60 * 1000);
|
|||||||
app.listen(config.PORT, () => {
|
app.listen(config.PORT, () => {
|
||||||
console.log(`oO API listening on http://localhost:${config.PORT}`);
|
console.log(`oO API listening on http://localhost:${config.PORT}`);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (config.NATS_URL) {
|
||||||
|
connectNats(config.NATS_URL);
|
||||||
|
}
|
||||||
|
|
||||||
|
startTodoistSyncScheduler(config.TODOIST_SYNC_INTERVAL_MS);
|
||||||
|
|||||||
55
services/api/src/signals/scheduler.ts
Normal file
55
services/api/src/signals/scheduler.ts
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
/**
|
||||||
|
* Todoist background sync scheduler (Issue #22).
|
||||||
|
*
|
||||||
|
* Periodically fetches Todoist tasks for every user with an active token so
|
||||||
|
* that signals are fresh before the next /recommend call. The on-demand fetch
|
||||||
|
* in TodoistSignalSource remains as the freshness-critical fallback — if a user
|
||||||
|
* hits /recommend and the cache is stale, it re-fetches inline.
|
||||||
|
*
|
||||||
|
* Interval: TODOIST_SYNC_INTERVAL_MS (default 15 min).
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { db } from '../db/index.js';
|
||||||
|
import { integrationTokens } from '../db/schema.js';
|
||||||
|
import { eq } from 'drizzle-orm';
|
||||||
|
import { todoistSource } from './todoist.js';
|
||||||
|
|
||||||
|
const DEFAULT_INTERVAL_MS = 15 * 60 * 1000;
|
||||||
|
|
||||||
|
export function startTodoistSyncScheduler(intervalMs = DEFAULT_INTERVAL_MS): NodeJS.Timeout {
|
||||||
|
async function syncAll(): Promise<void> {
|
||||||
|
let users: { userId: string }[] = [];
|
||||||
|
try {
|
||||||
|
users = await db
|
||||||
|
.select({ userId: integrationTokens.userId })
|
||||||
|
.from(integrationTokens)
|
||||||
|
.where(eq(integrationTokens.tokenStatus, 'active'));
|
||||||
|
} catch (err: any) {
|
||||||
|
console.error(`[scheduler] failed to query users: ${err.message}`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!users.length) return;
|
||||||
|
|
||||||
|
const results = await Promise.allSettled(
|
||||||
|
users.map((u) => todoistSource.fetchSignals(u.userId)),
|
||||||
|
);
|
||||||
|
|
||||||
|
let ok = 0;
|
||||||
|
let failed = 0;
|
||||||
|
for (const r of results) {
|
||||||
|
if (r.status === 'fulfilled') ok++;
|
||||||
|
else { failed++; console.error(`[scheduler] sync error:`, r.reason); }
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(`[scheduler] todoist sync: ${ok} ok, ${failed} failed (${users.length} users)`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run once shortly after startup, then on interval
|
||||||
|
const delay = setTimeout(() => {
|
||||||
|
syncAll();
|
||||||
|
setInterval(syncAll, intervalMs);
|
||||||
|
}, 10_000);
|
||||||
|
|
||||||
|
return delay;
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user