From c7edd92e155fdf1b586aef80e2cad9576bbd3ae5 Mon Sep 17 00:00:00 2001 From: alvis Date: Wed, 15 Apr 2026 14:08:00 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20M1=20=E2=80=94=20LinUCB=20bandit,=20Rem?= =?UTF-8?q?otePolicy,=20Web=20Push,=20event=20bus?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ML serving: - LinUCB contextual bandit (disjoint, d=5 features: hour_sin/cos, is_overdue, task_age, priority) - /score endpoint replaces stub random; /reward endpoint for online learning - Per-user model state persisted to disk as JSON (survives restarts) - venv at ml/serving/.venv; start with pnpm dev from ml/serving Recommender: - Todoist fetch now extracts features (is_overdue, task_age_days, priority) - RemotePolicy calls ml/serving with 3s timeout; falls back to RandomPolicy - Reward sent to /reward on feedback (done=+1, snooze=0, dismiss=-1) Web Push: - VAPID keys in config; push_subscriptions table in DB - POST/DELETE /api/push/subscribe; GET /api/push/vapid-public-key - Service worker (public/sw.js): push → showNotification, notificationclick → focus/open - "notify me" button on tip page; registers SW + subscribes on permission grant Event bus: - services/api/src/events/bus.ts: typed EventEmitter wrapper - Subjects: signals.tip.served, signals.tip.feedback, signals.task.synced - Same publish/subscribe API NATS JetStream will implement — swap is mechanical Co-Authored-By: Claude Sonnet 4.6 --- .env.example | 5 + .gitignore | 1 + apps/web/public/sw.js | 25 ++++ apps/web/src/app/tip/page.tsx | 46 ++++++- apps/web/src/lib/api.ts | 19 +++ ml/serving/main.py | 151 +++++++++++++++++++-- ml/serving/package.json | 4 +- ml/serving/requirements.txt | 1 + pnpm-lock.yaml | 100 ++++++++++++++ services/api/package.json | 26 ++-- services/api/src/config.ts | 4 + services/api/src/db/schema.ts | 9 ++ services/api/src/events/bus.ts | 52 +++++++ services/api/src/index.ts | 2 + services/api/src/routes/push.ts | 98 ++++++++++++++ services/api/src/routes/recommender.ts | 180 ++++++++++++++++++------- 16 files changed, 648 insertions(+), 75 deletions(-) create mode 100644 apps/web/public/sw.js create mode 100644 services/api/src/events/bus.ts create mode 100644 services/api/src/routes/push.ts diff --git a/.env.example b/.env.example index 0b55ce3..715005f 100644 --- a/.env.example +++ b/.env.example @@ -14,6 +14,11 @@ ML_SERVING_URL=http://localhost:8000 GOOGLE_CLIENT_ID= GOOGLE_CLIENT_SECRET= +# VAPID (Web Push) — generate: node -e "const wp=require('web-push');console.log(JSON.stringify(wp.generateVAPIDKeys()))" +VAPID_PUBLIC_KEY= +VAPID_PRIVATE_KEY= +VAPID_SUBJECT=mailto:you@example.com + # Todoist OAuth — https://developer.todoist.com/appconsole.html TODOIST_CLIENT_ID= TODOIST_CLIENT_SECRET= diff --git a/.gitignore b/.gitignore index 1da3b2b..bb6ba65 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ build/ __pycache__/ *.pyc .venv/ +__pycache__/ .mypy_cache/ .pytest_cache/ .ruff_cache/ diff --git a/apps/web/public/sw.js b/apps/web/public/sw.js new file mode 100644 index 0000000..b0c52e7 --- /dev/null +++ b/apps/web/public/sw.js @@ -0,0 +1,25 @@ +self.addEventListener('push', (event) => { + const data = event.data?.json() ?? {}; + event.waitUntil( + self.registration.showNotification(data.title ?? 'oO', { + body: data.body ?? '', + icon: '/icon-192.png', + badge: '/icon-192.png', + data: { url: data.url ?? '/tip' }, + }) + ); +}); + +self.addEventListener('notificationclick', (event) => { + event.notification.close(); + event.waitUntil( + clients.matchAll({ type: 'window', includeUncontrolled: true }).then((list) => { + for (const client of list) { + if (client.url.includes(self.location.origin) && 'focus' in client) { + return client.focus(); + } + } + return clients.openWindow(event.notification.data?.url ?? '/tip'); + }) + ); +}); diff --git a/apps/web/src/app/tip/page.tsx b/apps/web/src/app/tip/page.tsx index 09f3b0c..55e5aab 100644 --- a/apps/web/src/app/tip/page.tsx +++ b/apps/web/src/app/tip/page.tsx @@ -1,7 +1,7 @@ 'use client'; import { useEffect, useState, useRef, useCallback } from 'react'; -import { getRecommendation, sendFeedback } from '@/lib/api'; +import { getRecommendation, sendFeedback, getVapidPublicKey, subscribePush } from '@/lib/api'; import type { Tip } from '@oo/shared-types'; type State = 'loading' | 'tip' | 'empty' | 'actions' | 'done'; @@ -30,6 +30,7 @@ export default function TipPage() { const [visible, setVisible] = useState(false); const holdTimer = useRef | null>(null); const [pressed, setPressed] = useState(false); + const [pushState, setPushState] = useState<'idle' | 'subscribed' | 'denied'>('idle'); // Fade in after state change settles useEffect(() => { @@ -60,6 +61,31 @@ export default function TipPage() { useEffect(() => { loadTip(); }, [loadTip]); + // Check existing push permission on mount + useEffect(() => { + if (typeof Notification !== 'undefined' && Notification.permission === 'granted') { + setPushState('subscribed'); + } else if (typeof Notification !== 'undefined' && Notification.permission === 'denied') { + setPushState('denied'); + } + }, []); + + const requestPush = useCallback(async () => { + if (!('serviceWorker' in navigator) || !('PushManager' in window)) return; + const permission = await Notification.requestPermission(); + if (permission !== 'granted') { setPushState('denied'); return; } + try { + const reg = await navigator.serviceWorker.register('/sw.js'); + const vapidKey = await getVapidPublicKey(); + const sub = await reg.pushManager.subscribe({ + userVisibleOnly: true, + applicationServerKey: vapidKey, + }); + await subscribePush(sub.toJSON()); + setPushState('subscribed'); + } catch { setPushState('denied'); } + }, []); + const react = async (action: 'done' | 'dismiss' | 'snooze') => { if (!tip) return; setVisible(false); @@ -161,6 +187,24 @@ export default function TipPage() { }}> hold to act

+ {pushState === 'idle' && ( + + )} )} diff --git a/apps/web/src/lib/api.ts b/apps/web/src/lib/api.ts index 26beb20..34264d5 100644 --- a/apps/web/src/lib/api.ts +++ b/apps/web/src/lib/api.ts @@ -62,3 +62,22 @@ export async function deleteAccount() { export async function logout() { return apiFetch<{ ok: boolean }>('/auth/logout', { method: 'POST' }); } + +export async function getVapidPublicKey(): Promise { + const { key } = await apiFetch<{ key: string }>('/push/vapid-public-key'); + return key; +} + +export async function subscribePush(subscription: PushSubscriptionJSON) { + return apiFetch<{ ok: boolean }>('/push/subscribe', { + method: 'POST', + body: JSON.stringify(subscription), + }); +} + +export async function unsubscribePush(endpoint: string) { + return apiFetch<{ ok: boolean }>('/push/subscribe', { + method: 'DELETE', + body: JSON.stringify({ endpoint }), + }); +} diff --git a/ml/serving/main.py b/ml/serving/main.py index 6147622..df0113f 100644 --- a/ml/serving/main.py +++ b/ml/serving/main.py @@ -1,32 +1,101 @@ """ -oO ML Serving — Phase 0 stub. - -Returns a placeholder response that matches the interface the real scorer will implement. -The recommender service calls this via RemotePolicy (not yet wired in Phase 0). +oO ML Serving — Phase 1: LinUCB contextual bandit. Contract: - POST /score - Body: { user_id: str, candidates: [{ id: str, content: str, source: str, source_id?: str }] } - Response: { tip_id: str, score: float } + POST /score { user_id, candidates, context } → { tip_id, score, policy } + POST /reward { user_id, tip_id, reward, features } → { ok } + GET /health → { ok } + +Features (d=5): + hour_sin, hour_cos — cyclical time-of-day encoding + is_overdue — 0 or 1 + task_age_days — days since due date (clipped 0–30, normalised 0–1) + priority_norm — Todoist priority 1–4, normalised to 0–1 """ +from __future__ import annotations + +import json +import math +import os +import random +from pathlib import Path +from typing import Optional + +import numpy as np from fastapi import FastAPI, HTTPException from pydantic import BaseModel -import random -app = FastAPI(title="oO ML Serving", version="0.0.0") +app = FastAPI(title="oO ML Serving", version="1.0.0") + +STATE_DIR = Path(os.getenv("STATE_DIR", "/tmp/oo-bandit-state")) +STATE_DIR.mkdir(parents=True, exist_ok=True) + +ALPHA = 1.0 # exploration coefficient +D = 5 # feature dimension + + +# ── Feature helpers ──────────────────────────────────────────────────────── + +def build_feature_vector(features: dict) -> np.ndarray: + hour = features.get("hour_of_day", 12) + hour_sin = math.sin(2 * math.pi * hour / 24) + hour_cos = math.cos(2 * math.pi * hour / 24) + is_overdue = float(bool(features.get("is_overdue", False))) + age = min(float(features.get("task_age_days", 0)), 30.0) / 30.0 + priority = (float(features.get("priority", 1)) - 1.0) / 3.0 + return np.array([hour_sin, hour_cos, is_overdue, age, priority], dtype=np.float64) + + +# ── Per-user bandit state (disjoint LinUCB, global arm) ─────────────────── + +def state_path(user_id: str) -> Path: + safe = "".join(c if c.isalnum() else "_" for c in user_id) + return STATE_DIR / f"{safe}.json" + + +def load_state(user_id: str) -> tuple[np.ndarray, np.ndarray]: + """Returns (A, b). A is DxD, b is D-vector.""" + p = state_path(user_id) + if p.exists(): + raw = json.loads(p.read_text()) + A = np.array(raw["A"], dtype=np.float64) + b = np.array(raw["b"], dtype=np.float64) + return A, b + return np.identity(D, dtype=np.float64), np.zeros(D, dtype=np.float64) + + +def save_state(user_id: str, A: np.ndarray, b: np.ndarray) -> None: + p = state_path(user_id) + p.write_text(json.dumps({"A": A.tolist(), "b": b.tolist()})) + + +# ── API models ───────────────────────────────────────────────────────────── + +class CandidateFeatures(BaseModel): + hour_of_day: int = 12 + is_overdue: bool = False + task_age_days: float = 0.0 + priority: int = 1 class Candidate(BaseModel): id: str content: str source: str - source_id: str | None = None + source_id: Optional[str] = None + features: CandidateFeatures = CandidateFeatures() + + +class Context(BaseModel): + hour_of_day: int = 12 + day_of_week: int = 0 class ScoreRequest(BaseModel): user_id: str candidates: list[Candidate] + context: Context = Context() class ScoreResponse(BaseModel): @@ -35,15 +104,69 @@ class ScoreResponse(BaseModel): policy: str +class RewardRequest(BaseModel): + user_id: str + tip_id: str + reward: float # +1 done, 0 snooze, -1 dismiss + features: CandidateFeatures + + +class RewardResponse(BaseModel): + ok: bool + + +# ── Endpoints ────────────────────────────────────────────────────────────── + @app.get("/health") def health(): return {"ok": True} @app.post("/score", response_model=ScoreResponse) -def score(req: ScoreRequest): +def score(req: ScoreRequest) -> ScoreResponse: if not req.candidates: raise HTTPException(status_code=422, detail="No candidates") - # Stub: random uniform scoring — real model slots in here - chosen = random.choice(req.candidates) - return ScoreResponse(tip_id=chosen.id, score=1.0, policy="stub-random") + + A, b = load_state(req.user_id) + try: + A_inv = np.linalg.inv(A) + except np.linalg.LinAlgError: + A_inv = np.identity(D, dtype=np.float64) + + theta = A_inv @ b + + best_id = None + best_score = -float("inf") + + for candidate in req.candidates: + feat_dict = { + "hour_of_day": req.context.hour_of_day, + "is_overdue": candidate.features.is_overdue, + "task_age_days": candidate.features.task_age_days, + "priority": candidate.features.priority, + } + x = build_feature_vector(feat_dict) + exploit = float(theta @ x) + explore = ALPHA * math.sqrt(float(x @ A_inv @ x)) + ucb = exploit + explore + if ucb > best_score: + best_score = ucb + best_id = candidate.id + + return ScoreResponse(tip_id=best_id, score=best_score, policy="linucb-v1") + + +@app.post("/reward", response_model=RewardResponse) +def reward(req: RewardRequest) -> RewardResponse: + A, b = load_state(req.user_id) + feat_dict = { + "hour_of_day": req.features.hour_of_day, + "is_overdue": req.features.is_overdue, + "task_age_days": req.features.task_age_days, + "priority": req.features.priority, + } + x = build_feature_vector(feat_dict) + A += np.outer(x, x) + b += req.reward * x + save_state(req.user_id, A, b) + return RewardResponse(ok=True) diff --git a/ml/serving/package.json b/ml/serving/package.json index 27a6960..8f6564a 100644 --- a/ml/serving/package.json +++ b/ml/serving/package.json @@ -3,7 +3,7 @@ "version": "0.0.0", "private": true, "scripts": { - "dev": "uvicorn main:app --reload --port 8000", - "start": "uvicorn main:app --port 8000" + "dev": ".venv/bin/uvicorn main:app --reload --port 8000", + "start": ".venv/bin/uvicorn main:app --port 8000" } } diff --git a/ml/serving/requirements.txt b/ml/serving/requirements.txt index cff403a..3bd7f2f 100644 --- a/ml/serving/requirements.txt +++ b/ml/serving/requirements.txt @@ -1,3 +1,4 @@ fastapi==0.115.6 uvicorn[standard]==0.32.1 pydantic==2.10.4 +numpy>=1.26.0 diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2ef21b8..c76a88b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -89,6 +89,9 @@ importers: openid-client: specifier: ^6.3.4 version: 6.8.3 + web-push: + specifier: ^3.6.7 + version: 3.6.7 zod: specifier: ^3.24.1 version: 3.25.76 @@ -108,6 +111,9 @@ importers: '@types/express-session': specifier: ^1.18.1 version: 1.18.2 + '@types/web-push': + specifier: ^3.6.4 + version: 3.6.4 drizzle-kit: specifier: ^0.30.4 version: 0.30.6 @@ -856,13 +862,23 @@ packages: '@types/serve-static@2.2.0': resolution: {integrity: sha512-8mam4H1NHLtu7nmtalF7eyBH14QyOASmcxHhSfEoRyr0nP/YdoesEtU+uSRvMe96TW/HPTtkoKqQLl53N7UXMQ==} + '@types/web-push@3.6.4': + resolution: {integrity: sha512-GnJmSr40H3RAnj0s34FNTcJi1hmWFV5KXugE0mYWnYhgTAHLJ/dJKAwDmvPJYMke0RplY2XE9LnM4hqSqKIjhQ==} + accepts@1.3.8: resolution: {integrity: sha512-PYAthTa2m2VKxuvSD3DPC/Gy+U+sOA1LAuT8mkmRuvw+NACSaeXEQ+NHcVF7rONl6qcaxV3Uuemwawk+7+SJLw==} engines: {node: '>= 0.6'} + agent-base@7.1.4: + resolution: {integrity: sha512-MnA+YT8fwfJPgBx3m60MNqakm30XOkyIoH1y6huTQvC0PwZG7ki8NacLBcrPbNoo8vEZy7Jpuk7+jMO+CUovTQ==} + engines: {node: '>= 14'} + array-flatten@1.1.1: resolution: {integrity: sha512-PCVAQswWemu6UdxsDFFX/+gVeYqKAod3D3UVm91jHwynguOwAvYPhx8nNlM++NqRcK6CxxpUafjmhIdKiHibqg==} + asn1.js@5.4.1: + resolution: {integrity: sha512-+I//4cYPccV8LdmBLiX8CYvf9Sp3vQsrqu2QNXRcrbiWvcx/UdlFiqUJJzxRQxgsZmvhXhn4cSKeSmoFjVdupA==} + base64-js@1.5.1: resolution: {integrity: sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==} @@ -875,10 +891,16 @@ packages: bl@4.1.0: resolution: {integrity: sha512-1W07cM9gS6DcLperZfFSj+bWLtaPGSOHWhPiGzXmvVJbRLdG82sH/Kn8EtW1VqWVA54AKf2h5k5BbnIbwF3h6w==} + bn.js@4.12.3: + resolution: {integrity: sha512-fGTi3gxV/23FTYdAoUtLYp6qySe2KE3teyZitipKNRuVYcBkoP/bB3guXN/XVKUe9mxCHXnc9C4ocyz8OmgN0g==} + body-parser@1.20.4: resolution: {integrity: sha512-ZTgYYLMOXY9qKU/57FAo8F+HA2dGX7bqGc71txDRC1rS4frdFI5R7NhluHxH6M0YItAP0sHB4uqAOcYKxO6uGA==} engines: {node: '>= 0.8', npm: 1.2.8000 || >= 1.4.16} + buffer-equal-constant-time@1.0.1: + resolution: {integrity: sha512-zRpUiDwd/xk6ADqPMATG8vc9VPrkck7T07OIx0gnjmJAnHnTVXNQG3vfvWNuiZIkwu9KrKdA1iJKfsfTVxE6NA==} + buffer-from@1.1.2: resolution: {integrity: sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==} @@ -1080,6 +1102,9 @@ packages: resolution: {integrity: sha512-KIN/nDJBQRcXw0MLVhZE9iQHmG68qAVIBg9CqmUYjmQIhgij9U5MFvrqkUL5FbtyyzZuOeOt0zdeRe4UY7ct+A==} engines: {node: '>= 0.4'} + ecdsa-sig-formatter@1.0.11: + resolution: {integrity: sha512-nagl3RYrbNv6kQkeJIpt6NJZy8twLB/2vtz6yN9Z4vRKHN4/QZJIEbqohALSgwKdnksuY3k5Addp5lg8sVoVcQ==} + ee-first@1.1.1: resolution: {integrity: sha512-WMwm9LhRUo+WUaRN+vRuETqG89IgZphVSNkdFgeb6sS/E4OrDIN7t48CAewSHXc6C8lefD8KKfr5vY61brQlow==} @@ -1214,6 +1239,14 @@ packages: resolution: {integrity: sha512-4FbRdAX+bSdmo4AUFuS0WNiPz8NgFt+r8ThgNWmlrjQjt1Q7ZR9+zTlce2859x4KSXrwIsaeTqDoKQmtP8pLmQ==} engines: {node: '>= 0.8'} + http_ece@1.2.0: + resolution: {integrity: sha512-JrF8SSLVmcvc5NducxgyOrKXe3EsyHMgBFgSaIUGmArKe+rwr0uphRkRXvwiom3I+fpIfoItveHrfudL8/rxuA==} + engines: {node: '>=16'} + + https-proxy-agent@7.0.6: + resolution: {integrity: sha512-vK9P5/iUfdl95AI+JVyUuIcVtd4ofvtrOr3HNtM2yxC9bnMbEdp3x01OhQNnjb8IJYi38VlTE3mBXwcfvywuSw==} + engines: {node: '>= 14'} + iconv-lite@0.4.24: resolution: {integrity: sha512-v3MXnZAcvnywkTUEZomIActle7RXXeedOR31wwl7VlyoXO4Qi9arvSenNQWne1TcRwhCL1HwLI21bEqdpj8/rA==} engines: {node: '>=0.10.0'} @@ -1238,6 +1271,12 @@ packages: jose@6.2.2: resolution: {integrity: sha512-d7kPDd34KO/YnzaDOlikGpOurfF0ByC2sEV4cANCtdqLlTfBlw2p14O/5d/zv40gJPbIQxfES3nSx1/oYNyuZQ==} + jwa@2.0.1: + resolution: {integrity: sha512-hRF04fqJIP8Abbkq5NKGN0Bbr3JxlQ+qhZufXVr0DvujKy93ZCbXZMHDL4EOtodSbCWxOqR8MS1tXA5hwqCXDg==} + + jws@4.0.1: + resolution: {integrity: sha512-EKI/M/yqPncGUUh44xz0PxSidXFr/+r0pA70+gIYhjv+et7yxM+s29Y+VGDkovRofQem0fs7Uvf4+YmAdyRduA==} + math-intrinsics@1.1.0: resolution: {integrity: sha512-/IXtbwEk5HTPyEwyKX6hGkYXxM9nbj64B+ilVJnC/R6B0pH5G4V3b0pVbL7DBj4tkhBAppbQUlf6F6Xl9LHu1g==} engines: {node: '>= 0.4'} @@ -1270,6 +1309,9 @@ packages: resolution: {integrity: sha512-z0yWI+4FDrrweS8Zmt4Ej5HdJmky15+L2e6Wgn3+iK5fWzb6T3fhNFq2+MeTRb064c6Wr4N/wv0DzQTjNzHNGQ==} engines: {node: '>=10'} + minimalistic-assert@1.0.1: + resolution: {integrity: sha512-UtJcAD4yEaGtjPezWuO9wC4nwUnVH/8/Im3yEHQP4b67cXlD/Qr9hdITCU1xDbSEXg2XKNaP8jsReV7vQd00/A==} + minimist@1.2.8: resolution: {integrity: sha512-2yyAR8qBkN3YuheJanUpWC5U3bb5osDywNB8RzDVlDwDHbocAJveqqj1u8+SVD7jkWT4yvsHCpWqqWqAxb0zCA==} @@ -1568,6 +1610,11 @@ packages: resolution: {integrity: sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==} engines: {node: '>= 0.8'} + web-push@3.6.7: + resolution: {integrity: sha512-OpiIUe8cuGjrj3mMBFWY+e4MMIkW3SVT+7vEIjvD9kejGUypv8GPDf84JdPWskK8zMRIJ6xYGm+Kxr8YkPyA0A==} + engines: {node: '>= 16'} + hasBin: true + web-streams-polyfill@3.3.3: resolution: {integrity: sha512-d2JWLCivmZYTSIoge9MsgFCZrt571BikcWGYkjC1khllbTeDlGqZ2D8vD8E/lJa8WGWbb7Plm8/XJYV7IJHZZw==} engines: {node: '>= 8'} @@ -2027,13 +2074,26 @@ snapshots: '@types/http-errors': 2.0.5 '@types/node': 22.19.17 + '@types/web-push@3.6.4': + dependencies: + '@types/node': 22.19.17 + accepts@1.3.8: dependencies: mime-types: 2.1.35 negotiator: 0.6.3 + agent-base@7.1.4: {} + array-flatten@1.1.1: {} + asn1.js@5.4.1: + dependencies: + bn.js: 4.12.3 + inherits: 2.0.4 + minimalistic-assert: 1.0.1 + safer-buffer: 2.1.2 + base64-js@1.5.1: {} better-sqlite3@11.10.0: @@ -2051,6 +2111,8 @@ snapshots: inherits: 2.0.4 readable-stream: 3.6.2 + bn.js@4.12.3: {} + body-parser@1.20.4: dependencies: bytes: 3.1.2 @@ -2068,6 +2130,8 @@ snapshots: transitivePeerDependencies: - supports-color + buffer-equal-constant-time@1.0.1: {} + buffer-from@1.1.2: {} buffer@5.7.1: @@ -2164,6 +2228,10 @@ snapshots: es-errors: 1.3.0 gopd: 1.2.0 + ecdsa-sig-formatter@1.0.11: + dependencies: + safe-buffer: 5.2.1 + ee-first@1.1.1: {} encodeurl@2.0.0: {} @@ -2409,6 +2477,15 @@ snapshots: statuses: 2.0.2 toidentifier: 1.0.1 + http_ece@1.2.0: {} + + https-proxy-agent@7.0.6: + dependencies: + agent-base: 7.1.4 + debug: 4.4.3 + transitivePeerDependencies: + - supports-color + iconv-lite@0.4.24: dependencies: safer-buffer: 2.1.2 @@ -2425,6 +2502,17 @@ snapshots: jose@6.2.2: {} + jwa@2.0.1: + dependencies: + buffer-equal-constant-time: 1.0.1 + ecdsa-sig-formatter: 1.0.11 + safe-buffer: 5.2.1 + + jws@4.0.1: + dependencies: + jwa: 2.0.1 + safe-buffer: 5.2.1 + math-intrinsics@1.1.0: {} media-typer@0.3.0: {} @@ -2443,6 +2531,8 @@ snapshots: mimic-response@3.1.0: {} + minimalistic-assert@1.0.1: {} + minimist@1.2.8: {} mkdirp-classic@0.5.3: {} @@ -2778,6 +2868,16 @@ snapshots: vary@1.1.2: {} + web-push@3.6.7: + dependencies: + asn1.js: 5.4.1 + http_ece: 1.2.0 + https-proxy-agent: 7.0.6 + jws: 4.0.1 + minimist: 1.2.8 + transitivePeerDependencies: + - supports-color + web-streams-polyfill@3.3.3: {} which@4.0.0: diff --git a/services/api/package.json b/services/api/package.json index b575138..608efae 100644 --- a/services/api/package.json +++ b/services/api/package.json @@ -13,26 +13,28 @@ }, "dependencies": { "@oo/shared-types": "workspace:*", + "better-sqlite3": "^11.8.1", + "cookie-parser": "^1.4.7", + "cors": "^2.8.5", + "dotenv": "^16.4.7", + "drizzle-orm": "^0.38.3", "express": "^4.21.2", "express-session": "^1.18.1", - "better-sqlite3": "^11.8.1", - "drizzle-orm": "^0.38.3", - "openid-client": "^6.3.4", - "node-fetch": "^3.3.2", - "dotenv": "^16.4.7", - "zod": "^3.24.1", "nanoid": "^5.1.0", - "cookie-parser": "^1.4.7", - "cors": "^2.8.5" + "node-fetch": "^3.3.2", + "openid-client": "^6.3.4", + "web-push": "^3.6.7", + "zod": "^3.24.1" }, "devDependencies": { - "@types/express": "^5.0.0", - "@types/express-session": "^1.18.1", "@types/better-sqlite3": "^7.6.12", "@types/cookie-parser": "^1.4.8", "@types/cors": "^2.8.17", + "@types/express": "^5.0.0", + "@types/express-session": "^1.18.1", + "@types/web-push": "^3.6.4", + "drizzle-kit": "^0.30.4", "tsx": "^4.19.2", - "typescript": "^5.7.3", - "drizzle-kit": "^0.30.4" + "typescript": "^5.7.3" } } diff --git a/services/api/src/config.ts b/services/api/src/config.ts index 4a12855..46f5795 100644 --- a/services/api/src/config.ts +++ b/services/api/src/config.ts @@ -32,4 +32,8 @@ export const config = { WEB_BASE_URL: optional('WEB_BASE_URL', 'http://localhost:3000'), ML_SERVING_URL: optional('ML_SERVING_URL', 'http://localhost:8000'), + + VAPID_PUBLIC_KEY: optional('VAPID_PUBLIC_KEY', ''), + VAPID_PRIVATE_KEY: optional('VAPID_PRIVATE_KEY', ''), + VAPID_SUBJECT: optional('VAPID_SUBJECT', 'mailto:admin@localhost'), }; diff --git a/services/api/src/db/schema.ts b/services/api/src/db/schema.ts index 2afcf22..1f60721 100644 --- a/services/api/src/db/schema.ts +++ b/services/api/src/db/schema.ts @@ -39,6 +39,15 @@ export const tipViews = sqliteTable('tip_views', { servedAt: text('served_at').notNull(), }); +export const pushSubscriptions = sqliteTable('push_subscriptions', { + id: text('id').primaryKey(), + userId: text('user_id').notNull().references(() => users.id), + endpoint: text('endpoint').notNull().unique(), + p256dh: text('p256dh').notNull(), + auth: text('auth').notNull(), + createdAt: text('created_at').notNull(), +}); + export const sessions = sqliteTable('sessions', { id: text('id').primaryKey(), userId: text('user_id').notNull().references(() => users.id), diff --git a/services/api/src/events/bus.ts b/services/api/src/events/bus.ts new file mode 100644 index 0000000..a943f81 --- /dev/null +++ b/services/api/src/events/bus.ts @@ -0,0 +1,52 @@ +/** + * EventBus — in-process today, NATS JetStream tomorrow. + * + * To swap to NATS: replace the EventEmitter body with a NATS JetStream + * publish call. Subjects and payload shapes are the durable contract. + * + * Subjects follow the pattern: .. + * signals.tip.served — a tip was returned to the client + * signals.tip.feedback — user reacted (done / dismiss / snooze) + * signals.task.synced — Todoist task list refreshed for a user + */ + +import { EventEmitter } from 'events'; + +export type TipServedEvent = { + userId: string; + tipId: string; + policy: string; + servedAt: string; +}; + +export type TipFeedbackEvent = { + userId: string; + tipId: string; + action: 'done' | 'dismiss' | 'snooze'; + reward: number; + createdAt: string; +}; + +export type TaskSyncedEvent = { + userId: string; + count: number; + syncedAt: string; +}; + +type EventMap = { + 'signals.tip.served': TipServedEvent; + 'signals.tip.feedback': TipFeedbackEvent; + 'signals.task.synced': TaskSyncedEvent; +}; + +class Bus extends EventEmitter { + publish(subject: K, payload: EventMap[K]): void { + this.emit(subject, payload); + } + + subscribe(subject: K, handler: (payload: EventMap[K]) => void): void { + this.on(subject, handler); + } +} + +export const bus = new Bus(); diff --git a/services/api/src/index.ts b/services/api/src/index.ts index f127021..3830cfd 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -9,6 +9,7 @@ import { authRouter } from './routes/auth.js'; import { integrationsRouter } from './routes/integrations.js'; import { recommenderRouter } from './routes/recommender.js'; import { userRouter } from './routes/user.js'; +import { pushRouter } from './routes/push.js'; import { mkdir } from 'fs/promises'; import { dirname } from 'path'; @@ -33,6 +34,7 @@ app.use('/api/auth', authRouter); app.use('/api/integrations', integrationsRouter); app.use('/api', recommenderRouter); app.use('/api/user', userRouter); +app.use('/api/push', pushRouter); app.listen(config.PORT, () => { console.log(`oO API listening on http://localhost:${config.PORT}`); diff --git a/services/api/src/routes/push.ts b/services/api/src/routes/push.ts new file mode 100644 index 0000000..1c4840d --- /dev/null +++ b/services/api/src/routes/push.ts @@ -0,0 +1,98 @@ +import { type Router as ExpressRouter, Router, Response } from 'express'; +import webpush from 'web-push'; +import { nanoid } from 'nanoid'; +import { db } from '../db/index.js'; +import { pushSubscriptions } from '../db/schema.js'; +import { eq, and } from 'drizzle-orm'; +import { requireAuth, AuthenticatedRequest } from '../middleware/session.js'; +import { config } from '../config.js'; + +const router: ExpressRouter = Router(); + +if (config.VAPID_PUBLIC_KEY && config.VAPID_PRIVATE_KEY) { + webpush.setVapidDetails( + config.VAPID_SUBJECT, + config.VAPID_PUBLIC_KEY, + config.VAPID_PRIVATE_KEY, + ); +} + +/** GET /api/push/vapid-public-key — client needs this to subscribe */ +router.get('/vapid-public-key', (_req, res: Response) => { + res.json({ key: config.VAPID_PUBLIC_KEY }); +}); + +/** POST /api/push/subscribe — save or refresh a push subscription */ +router.post('/subscribe', requireAuth, async (req: AuthenticatedRequest, res: Response) => { + const { endpoint, keys } = req.body as { + endpoint: string; + keys: { p256dh: string; auth: string }; + }; + + if (!endpoint || !keys?.p256dh || !keys?.auth) { + res.status(400).json({ error: 'Invalid subscription' }); + return; + } + + // Upsert by endpoint + const existing = await db + .select() + .from(pushSubscriptions) + .where(eq(pushSubscriptions.endpoint, endpoint)) + .limit(1); + + if (existing.length) { + await db + .update(pushSubscriptions) + .set({ p256dh: keys.p256dh, auth: keys.auth }) + .where(eq(pushSubscriptions.endpoint, endpoint)); + } else { + await db.insert(pushSubscriptions).values({ + id: nanoid(), + userId: req.userId!, + endpoint, + p256dh: keys.p256dh, + auth: keys.auth, + createdAt: new Date().toISOString(), + }); + } + + res.json({ ok: true }); +}); + +/** DELETE /api/push/subscribe — unsubscribe */ +router.delete('/subscribe', requireAuth, async (req: AuthenticatedRequest, res: Response) => { + const { endpoint } = req.body as { endpoint: string }; + if (endpoint) { + await db + .delete(pushSubscriptions) + .where( + and( + eq(pushSubscriptions.userId, req.userId!), + eq(pushSubscriptions.endpoint, endpoint), + ), + ); + } + res.json({ ok: true }); +}); + +/** Send a push notification to a user — called internally */ +export async function sendPushToUser(userId: string, payload: { title: string; body: string; url?: string }) { + if (!config.VAPID_PUBLIC_KEY) return; + + const subs = await db + .select() + .from(pushSubscriptions) + .where(eq(pushSubscriptions.userId, userId)); + + await Promise.allSettled( + subs.map((sub) => + webpush.sendNotification( + { endpoint: sub.endpoint, keys: { p256dh: sub.p256dh, auth: sub.auth } }, + JSON.stringify(payload), + ).catch(() => {}), + ), + ); +} + +export { router as pushRouter }; diff --git a/services/api/src/routes/recommender.ts b/services/api/src/routes/recommender.ts index 43ec2bf..d14340f 100644 --- a/services/api/src/routes/recommender.ts +++ b/services/api/src/routes/recommender.ts @@ -4,44 +4,109 @@ import { db } from '../db/index.js'; import { integrationTokens, tipFeedback, tipViews } from '../db/schema.js'; import { eq, and } from 'drizzle-orm'; import { requireAuth, AuthenticatedRequest } from '../middleware/session.js'; +import { config } from '../config.js'; +import { bus } from '../events/bus.js'; import type { Tip } from '@oo/shared-types'; const router: ExpressRouter = Router(); -const CACHE_TTL_MS = 30_000; // 30 seconds -const taskCache = new Map(); +const CACHE_TTL_MS = 30_000; -/** Fetch active Todoist tasks, with a 30s in-memory cache per user */ -async function fetchTodoistTasks(userId: string, accessToken: string): Promise { +interface TaskFeatures { + is_overdue: boolean; + task_age_days: number; + priority: number; +} + +interface CachedTask extends Tip { + features: TaskFeatures; +} + +const taskCache = new Map(); + +/** Parse a Todoist due date string into age in days (relative to now) */ +function dueAgeDays(due: { date?: string; datetime?: string } | null | undefined): number { + if (!due) return 0; + const dateStr = due.datetime ?? due.date; + if (!dateStr) return 0; + const dueMs = new Date(dateStr).getTime(); + return Math.max(0, (Date.now() - dueMs) / (1000 * 60 * 60 * 24)); +} + +async function fetchTodoistTasks(userId: string, accessToken: string): Promise { const cached = taskCache.get(userId); - if (cached && Date.now() - cached.fetchedAt < CACHE_TTL_MS) { - return cached.tips; - } + if (cached && Date.now() - cached.fetchedAt < CACHE_TTL_MS) return cached.tasks; const res = await fetch('https://api.todoist.com/api/v1/tasks?filter=today%7Coverdue', { headers: { Authorization: `Bearer ${accessToken}` }, }); - if (!res.ok) return cached?.tips ?? []; + if (!res.ok) return cached?.tasks ?? []; - const body = (await res.json()) as { results: Array<{ id: string; content: string }> }; - const tips: Tip[] = (body.results ?? []).map((t) => ({ - id: `todoist:${t.id}`, - content: t.content, - source: 'todoist' as const, - sourceId: t.id, - createdAt: new Date().toISOString(), - })); + const body = (await res.json()) as { + results: Array<{ + id: string; + content: string; + priority: number; + due: { date?: string; datetime?: string; is_recurring?: boolean } | null; + }>; + }; - taskCache.set(userId, { tips, fetchedAt: Date.now() }); - return tips; + const now = new Date(); + const tasks: CachedTask[] = (body.results ?? []).map((t) => { + const ageDays = dueAgeDays(t.due); + const isOverdue = ageDays > 0; + return { + id: `todoist:${t.id}`, + content: t.content, + source: 'todoist' as const, + sourceId: t.id, + createdAt: now.toISOString(), + features: { + is_overdue: isOverdue, + task_age_days: ageDays, + priority: t.priority ?? 1, + }, + }; + }); + + taskCache.set(userId, { tasks, fetchedAt: Date.now() }); + return tasks; } -/** - * RandomPolicy — picks one task at random from the candidate set. - * Contract: same interface the ML scorer will implement. - */ -function randomPolicy(candidates: Tip[]): Tip | null { +/** Call ml/serving for scored selection; returns tip_id or null on failure */ +async function remotePolicy(userId: string, tasks: CachedTask[]): Promise { + const hour = new Date().getHours(); + const dayOfWeek = new Date().getDay(); + + const body = { + user_id: userId, + candidates: tasks.map((t) => ({ + id: t.id, + content: t.content, + source: t.source, + source_id: t.sourceId ?? null, + features: t.features, + })), + context: { hour_of_day: hour, day_of_week: dayOfWeek }, + }; + + try { + const res = await fetch(`${config.ML_SERVING_URL}/score`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(body), + signal: AbortSignal.timeout(3000), + }); + if (!res.ok) return null; + const { tip_id } = (await res.json()) as { tip_id: string }; + return tip_id; + } catch { + return null; + } +} + +function randomPolicy(candidates: CachedTask[]): CachedTask | null { if (!candidates.length) return null; return candidates[Math.floor(Math.random() * candidates.length)]; } @@ -51,12 +116,7 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re const [token] = await db .select() .from(integrationTokens) - .where( - and( - eq(integrationTokens.userId, req.userId!), - eq(integrationTokens.provider, 'todoist'), - ), - ) + .where(and(eq(integrationTokens.userId, req.userId!), eq(integrationTokens.provider, 'todoist'))) .limit(1); if (!token) { @@ -64,20 +124,31 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re return; } - const candidates = await fetchTodoistTasks(req.userId!, token.accessToken); - const tip = randomPolicy(candidates); + const tasks = await fetchTodoistTasks(req.userId!, token.accessToken); + if (!tasks.length) { + res.status(204).end(); + return; + } + + // RemotePolicy with RandomPolicy fallback + const scoredId = await remotePolicy(req.userId!, tasks); + const tip = scoredId + ? (tasks.find((t) => t.id === scoredId) ?? randomPolicy(tasks)) + : randomPolicy(tasks); if (!tip) { res.status(204).end(); return; } - // Record metric: tip served - await db.insert(tipViews).values({ - id: nanoid(), + const servedAt = new Date().toISOString(); + await db.insert(tipViews).values({ id: nanoid(), userId: req.userId!, tipId: tip.id, servedAt }); + + bus.publish('signals.tip.served', { userId: req.userId!, tipId: tip.id, - servedAt: new Date().toISOString(), + policy: scoredId ? 'linucb-v1' : 'random', + servedAt, }); res.json({ tip }); @@ -102,27 +173,44 @@ router.post('/tip/:id/feedback', requireAuth, async (req: AuthenticatedRequest, createdAt: new Date().toISOString(), }); - // Invalidate cache so next recommend fetches fresh tasks + // Capture task features before clearing cache + const reward = action === 'done' ? 1.0 : action === 'dismiss' ? -1.0 : 0.0; + const task = taskCache.get(req.userId!)?.tasks.find((t) => t.id === tipId); taskCache.delete(req.userId!); - // If done, mark complete in Todoist + bus.publish('signals.tip.feedback', { + userId: req.userId!, + tipId, + action: action as 'done' | 'dismiss' | 'snooze', + reward, + createdAt: new Date().toISOString(), + }); + if (task) { + fetch(`${config.ML_SERVING_URL}/reward`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + user_id: req.userId!, + tip_id: tipId, + reward, + features: task.features, + }), + }).catch(() => {}); + } + + // Mark complete in Todoist if done if (action === 'done' && tipId.startsWith('todoist:')) { const todoistId = tipId.slice(8); - const [token] = await db + const [tok] = await db .select() .from(integrationTokens) - .where( - and( - eq(integrationTokens.userId, req.userId!), - eq(integrationTokens.provider, 'todoist'), - ), - ) + .where(and(eq(integrationTokens.userId, req.userId!), eq(integrationTokens.provider, 'todoist'))) .limit(1); - if (token) { + if (tok) { await fetch(`https://api.todoist.com/api/v1/tasks/${todoistId}/close`, { method: 'POST', - headers: { Authorization: `Bearer ${token.accessToken}` }, + headers: { Authorization: `Bearer ${tok.accessToken}` }, }).catch(() => {}); } }