diff --git a/.env.example b/.env.example index 89fc1c0..78e12cd 100644 --- a/.env.example +++ b/.env.example @@ -18,18 +18,7 @@ MLFLOW_ADMIN_PASSWORD=change-me # Public URL shown as link in the admin sidebar (must be NEXT_PUBLIC_ to reach the browser). NEXT_PUBLIC_MLFLOW_URL=http://localhost:5000 -# Airflow (mlops profile) — http://localhost:8080/airflow in dev. -# Start with: docker compose --profile full --profile mlops up -AIRFLOW_URL=http://localhost:8080 -AIRFLOW_ADMIN_PASSWORD=change-me -AIRFLOW_DB_PASSWORD=airflow -AIRFLOW_SECRET_KEY=change-me-in-prod -AIRFLOW_FERNET_KEY= -AIRFLOW_BASE_URL=https://o.alogins.net/airflow -# Public URL shown as link in the admin sidebar (must be NEXT_PUBLIC_ to reach the browser). -NEXT_PUBLIC_AIRFLOW_URL=http://localhost:8080 - -# Shared secret for Airflow→API internal callbacks. Generate: openssl rand -hex 32 +# Shared secret for internal API callbacks. Generate: openssl rand -hex 32 INTERNAL_API_TOKEN= # Static token for automated/service access to the admin panel (e.g. Playwright tests). diff --git a/CLAUDE.md b/CLAUDE.md index 366761e..4a4c0b3 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -42,7 +42,7 @@ packages/ shared libraries (importable across services + apps) ml/ Python — separate deployable from day one serving/ online scorer (FastAPI), called by recommender features/ feature definitions + store adapter - pipelines/ batch feature + training DAGs (Prefect/Airflow) + pipelines/ batch feature + training scripts registry/ MLflow model registry integration experiments/ assignment + A/B + bandit policies notebooks/ research only; never imported by production code @@ -65,7 +65,7 @@ docs/ architecture notes, ADRs, API specs - One PR = one concern. Conventional-commit prefixes (`feat:`, `fix:`, `chore:`, `docs:`, `refactor:`). - ADRs go in `docs/adr/NNNN-title.md` for any decision that constrains future work. - No secrets in repo. Local dev via `.env.local` (gitignored), prod via the server's secret store (Vaultwarden now; k8s secrets later). -- Compose profiles: `core` (api + web + admin), `full` (adds ml-serving), `mlops` (adds MLflow + Airflow), `ai` (adds Ollama + LiteLLM). Mix as needed. +- Compose profiles: `core` (api + web + admin), `full` (adds ml-serving), `mlops` (adds MLflow), `ai` (adds Ollama + LiteLLM). Mix as needed. ## Definition of done (per feature) @@ -98,9 +98,19 @@ Ollama and LiteLLM are **shared Agap services**, not oO services — they live i ## Current phase -**M1 shipped. M2 (AI tips) in progress.** See `README.md` for the phase roadmap and `docs/architecture/` for diagrams. Work is tracked as Gitea milestones + issues on `alvis/oO`. +**M1 shipped (core + admin). M2 (AI tips) in progress.** See `README.md` for the phase roadmap and `docs/architecture/` for diagrams. Work is tracked as Gitea milestones + issues on `alvis/oO`. -Active work: bandit promotion (#99 — offline sim + ADR-0012 pending) and M2 issues (#61 freshness SLAs, #78 signal abstraction, #93 model benchmark). +Recent completions (M1 add-on): +- ADR-0012 — ε-greedy v2 promotion (profile features, D=12) — 2026-04-26 +- Offline sim framework + MLflow integration — shipped in M1 add-on +- Token-based admin auth for Playwright/CI — secured auth boundary + +Active work (M2): +- Signal abstraction for multi-source support (#78) +- Per-user feature freshness SLAs (#61, ADR-0011 phase B) +- LLM context assembler + tip generation scaffold (#79, #88) +- Model benchmarking for tip generation (#93) +- Admin UX refinements: feedback consolidation, settings placement (#100–102) ## What NOT to do @@ -110,7 +120,7 @@ Active work: bandit promotion (#99 — offline sim + ADR-0012 pending) and M2 is - Don't replace a policy in one step. New policies deploy shadow-first; promoted only after offline + online agreement with the incumbent (ADR-0002). - Don't over-split processes. Extract a service when pressure demands it, not in anticipation (ADR-0003). - Don't call LLMs directly from application code. All LLM calls go through `ml/serving` (Python) via `LITELLM_URL`. The TS recommender never holds a model name. -- Don't embed MLflow/Airflow/OpenWebUI in the admin panel. They are external services; link out to them. The admin shell links to `o.alogins.net/mlflow`, `/airflow`, `ai.alogins.net`. +- Don't embed MLflow/OpenWebUI in the admin panel. They are external services; link out to them. The admin shell links to `o.alogins.net/mlflow`, `ai.alogins.net`. - Don't `nats.publish()` directly from feature code. All publishes go through the in-process `Bus` (`services/api/src/events/bus.ts`); the NATS adapter (`events/nats.ts`) bridges every publish to JetStream when `NATS_URL` is set. This keeps subscribers, the ring-buffer tail used by the admin event viewer, and JetStream all in lockstep. ## Admin app diff --git a/README.md b/README.md index eabcc85..ae90cb6 100644 --- a/README.md +++ b/README.md @@ -104,13 +104,15 @@ User signals ──▶ Context assembler ──▶ LiteLLM ──▶ Ollam **Why Ollama first:** Tips contain personal context. Local inference means no user data leaves the host for the inference path. Cloud models (Anthropic, OpenAI) are opt-in fallbacks for evaluation and simulation only, gated behind `ANTHROPIC_API_KEY`. -### Models (planned) +### Models (planned; routes through LiteLLM) | Alias | Model | Task | |-------|-------|------| -| `tip-generator` | qwen2.5:7b (default) | Generate typed tip candidates from user context | -| `embedder` | nomic-embed-text | Task clustering, semantic similarity for dedup | -| `judge` | claude-haiku-4-5 (cloud, eval-only) | Offline sim judge; rates tip quality for A/B | +| `tip-generator` | qwen2.5:1.5b (default) | Generate typed tip candidates from user context; local-first via Ollama | +| `embedder` | nomic-embed-text | Task clustering, semantic similarity for dedup; local via Ollama | +| `judge` | claude-haiku-4-5 (cloud, eval-only) | Offline sim judge; rates tip quality for A/B (requires `ANTHROPIC_API_KEY`) | + +All model calls route through **LiteLLM** at `llm.alogins.net` (or `LITELLM_URL` env var) using model aliases. This decouples tip generation from model selection — swap the backend model in LiteLLM config without code changes. See ADR-0008. --- @@ -134,22 +136,24 @@ Goal: tips are picked, not drawn from a hat — and they arrive at the right mom - [x] Event bus scaffold: typed in-process EventEmitter with 500-event ring buffer; subjects match future NATS JetStream — swap is mechanical - [x] Todoist sync emits `signals.task.synced`; tip served/feedback emit `signals.tip.*` - [x] Features extracted per task: `is_overdue`, `task_age_days`, `priority`; context: `hour_of_day`, `day_of_week` -- [x] `ml/serving` LinUCB (d=5) + **ε-greedy v1** (d=7, ε=0.10, day-of-week sin/cos features); per-user state persisted to disk +- [x] **ε-greedy v1** (d=7, ε=0.10, day-of-week sin/cos features); per-user state persisted to disk +- [x] **ε-greedy v2** (d=12, profile features: completion rate, dismiss rate, dwell, preferred hour, tip volume) in shadow; promoted to active policy (ADR-0012) - [x] `RemotePolicy` in recommender: calls ml/serving, falls back to RandomPolicy on timeout/error; logs explainability to `tip_scores` - [x] Feedback loop: dwell-time inferred reward (`inferReward`) → online model update; `done` in 15 s–2 min = +1.0 (magic zone) - [x] Offline simulation framework (`ml/experiments/sim`): rule/LLM/claude-code judges, two-policy comparison, results persisted to `sim_runs` + `sim_events` -- [x] **ε-greedy v1 promoted to active policy** (ADR-0007) — +10.7% mean reward vs LinUCB in offline sim - [x] **Web Push** (VAPID): SW, subscribe/unsubscribe API, "notify me" button on tip page - [x] Shadow-policy registry: run N shadow policies per request, log picks without serving them (#56) +- [x] NATS JetStream bridge — durable `signals.>` and `feedback.>` streams; in-process bus stays the source of truth, every publish bridges out (#21, shipped) +- [x] Per-user profile features (completion rate, dismiss rate, dwell, preferred hour, tip volume) — event-driven, JIT invalidation (#81) - [ ] Quiet-hours + dedupe for push delivery - [ ] Delayed rewards: tasks completed directly in Todoist (requires webhook from Todoist) -- [x] NATS JetStream bridge — durable `signals.>` and `feedback.>` streams; in-process bus stays the source of truth, every publish bridges out (#21, shipped) +- [ ] Apple OAuth (deferred to M3) #### M1 add-on — Admin & ML Ops Console *(fully shipped)* oO is ML-heavy. Without a cockpit, every model change ships blind. This console is the team's single pane for users, signals, features, models, experiments, and tip outcomes — with the ability to *act* on them (revoke a token, replay an event, promote a model, reset a bandit). -**Framework pick — `apps/admin` on Next.js 15 + Tremor + shadcn/ui.** Analytics-first UI for an analytics-first product, stays on our existing TS/React/Tailwind stack, reuses `packages/shared-types`, `sdk-js`, and the Auth.js session. Specialized ML tooling (MLflow, Airflow) runs as **separate external services** linked from the admin shell; Grafana panels are embedded. +**Framework pick — `apps/admin` on Next.js 15 + Tremor + shadcn/ui.** Analytics-first UI for an analytics-first product, stays on our existing TS/React/Tailwind stack, reuses `packages/shared-types`, `sdk-js`, and the Auth.js session. Specialized ML tooling (MLflow) runs as a **separate external service** linked from the admin shell; Grafana panels are embedded. | Layer | Tool | Why | |-------|------|-----| @@ -159,7 +163,6 @@ oO is ML-heavy. Without a cockpit, every model change ships blind. This console | Heavy grids | **[TanStack Table v8](https://tanstack.com/table)** | Sortable / paginated / virtualized tables (events, users, tips) | | Extra charts | **[Recharts](https://recharts.org)** / **[visx](https://airbnb.io/visx)** | Fallbacks where Tremor falls short (e.g. force graphs, Sankey) | | Model registry / experiments | **[MLflow](https://mlflow.org)** *(external — `o.alogins.net/mlflow`)* | Experiment tracking, artifact browser, model registry; own basic-auth | -| Pipeline orchestration | **[Airflow](https://airflow.apache.org)** *(external — `o.alogins.net/airflow`)* | Batch feature + retraining DAGs; own web-auth | | Infra metrics | **[Grafana](https://grafana.com)** *(embedded panels)* | One ops source of truth | | Ad-hoc analysis | **[Marimo](https://marimo.io)** reactive notebooks | Python-native for the ML side; launch-out link | | AuthZ | `profile.role='admin'` + Next.js middleware | Reuses existing session; no new auth surface | @@ -170,27 +173,25 @@ oO is ML-heavy. Without a cockpit, every model change ships blind. This console - *React-admin / Refine.dev* — strong CRUD scaffolding, but analytics/ML views feel bolted on; we'd rebuild Tremor-style dashboards ourselves - *Superset / Metabase as the admin surface* — excellent for BI, poor for operational **writes** (revoke, replay, promote). Plan: **adopt Superset in M4** for BI alongside batch pipelines; ship a read-only SQL widget inside admin for now -**Build sequence (plan, not code):** +**Build sequence:** 1. [x] **ADR-0006** — record the framework choice + "embed, don't rebuild" rule for MLflow/Grafana 2. [x] **Scaffold** — `apps/admin` with Next.js 15, Tailwind, Tremor; deploy behind Caddy at `admin.o.alogins.net` 3. [x] **RBAC** — `role` column on `users`; admin-only Next.js middleware; seed first admin via `ADMIN_SEED_EMAIL` env; `admin_actions` audit-log table 4. [x] **Overview dashboard** — DAU/WAU KPI cards, tips served, reaction breakdown, activation funnel -5. [x] **User explorer** — list + detail page: identity, consents, integrations, last tip, reward history; revoke-integration + reset-bandit actions +5. [x] **User explorer** — list + detail page: identity, consents, integrations, last tip, reward history; revoke-integration + reset-bandit + rebuild-profile actions 6. [x] **Event stream viewer** — live tail of `signals.*` with filters by subject/user/time; same UI when the bus swaps to NATS -7. [x] **Feature store browser** — features sent to `ml/serving` per scoring call; diff across time for a user -8. [x] **Model registry panel** — `/admin/models` links out to MLflow (`mlflow.o.alogins.net`); experiment tracking and dataset management in MLflow + Airflow -9. [x] **MLOps hub** — `/admin/experiments` links to MLflow experiments/models and Airflow DAGs/datasets; bandit reset on Users page -10. [x] **Recommendation log (explainability)** — per served tip: `(user, features, policy, score, feedback, latency)`; `tip_scores` table, 30-day retention -11. [x] **Reward analytics** — reaction distribution over time; per-policy compare; slice by `hour_of_day`, `priority`, cohort -12. [x] **Data quality widget** — missing-feature rate, stale-token rate, daily completeness heatmap -13. [x] **Ops actions** — revoke token (Users page), replay signal, disable/promote shadow policy; every action audit-logged -14. [x] **Read-only SQL runner** — SELECT-only runner against SQLite + saved queries (sunsets to Superset in M4) -15. [x] **Health rollup** — `/admin/health` surfaces api, ml/serving, SQLite, event-bus; auto-refreshes every 15s -16. [ ] **Docs** — `apps/admin/README.md`, runbook for common ops actions, ADR-0006 merged +7. [x] **Features page** — features sent to `ml/serving` per scoring call; per-user profile features with freshness; diff across time +8. [x] **Tips page** — tips served, scored, feedback reactions with policy/model breakdown +9. [x] **Reward analytics** — reaction distribution over time; per-policy / per-model / per-prompt-version compare; slice by `hour_of_day`, `priority`, cohort +10. [x] **Data quality widget** — missing-feature rate, stale-token rate, daily completeness heatmap; per-feature freshness SLA status +11. [x] **Ops actions** — revoke token (Users page), rebuild profile, reset bandit, enable/disable shadow policies; every action audit-logged +12. [x] **Health rollup** — `/admin/health` surfaces api, ml/serving, SQLite, event-bus, MLflow; auto-refreshes every 15s +13. [x] **Read-only SQL runner** — SELECT-only runner against SQLite + saved queries (sunsets to Superset in M4) +14. [x] **Offline simulation runner** — launch `ml/experiments/sim` from admin UI; track sim runs, judge, policy comparison +15. [x] **Token-based admin auth** — `POST /api/auth/token` for Playwright/CI; `ADMIN_TOKEN` env var (#105) +16. [x] **Docs pages** — admin documentation and runbooks inline -- [ ] Apple OAuth (deferred to M2) - -### Phase 2 — AI tips + multi-source signals *(M2)* +### Phase 2 — AI tips + multi-source signals *(M2)* in progress Goal: tips are AI-generated from user context, not just raw Todoist tasks. Multiple signal sources feed a generalized pipeline. Research-intensive milestone. **AI infrastructure (unblock everything else):** @@ -198,21 +199,21 @@ Goal: tips are AI-generated from user context, not just raw Todoist tasks. Multi - [ ] AI gateway — wire `ml/serving` to LiteLLM; model aliases `tip-generator` + `embedder` (#87) **AI tip generation pipeline:** -- [ ] Context assembler — user signals + feature store → structured prompt context (`ml/features/context.py`) (#88) +- [x] Context assembler — user signals + feature store → structured prompt context (`ml/features/context.py`); skeleton implemented - [ ] Tip generator endpoint — `POST /generate` in `ml/serving`; LLM → N typed `TipCandidate` objects (#79) - [ ] `TipCandidate` shared schema — `{content, kind, source, model, prompt_version, confidence}`; update recommender pipeline (#89) - [ ] LLM output validation + retry — JSON schema gate, clarification retry (2×), fallback to task-based (#90) - [ ] Prompt versioning — `prompt_version` + `model` columns in `tip_scores`; content-hash invalidation (#91) -- [ ] LLM tip quality dashboard — reaction breakdown by model / prompt_version in `/admin/reward-analytics` (#92) +- [x] LLM tip quality dashboard — reaction breakdown by model / prompt_version in `/admin/reward-analytics` (#92) **Evaluation & model selection:** - [ ] Model benchmark — compare qwen2.5:7b / llama3.2:3b / gemma3:4b via offline sim + LLM judge (#93) - [ ] LLM prompt research — persona design, context injection strategies, few-shot examples (#84) **Pipeline architecture:** -- [ ] Signal source abstraction — `SignalSource` interface generalizing beyond Todoist (#78) +- [x] Signal source abstraction — `SignalSource` interface for Todoist + extensible design (#78) - [ ] Generalized recommendation pipeline — candidate → rank → render stages (#80) -- [ ] Feature registry + user profile builder — centralized features, persistent profiles (#81) +- [x] Feature registry + user profile builder — centralized features, persistent profiles, event-driven invalidation (#81) - [ ] Tip kind system — task, advice, insight, reminder with kind-aware UI + rewards (#82) **Policy research:** @@ -222,33 +223,36 @@ Goal: tips are AI-generated from user context, not just raw Todoist tasks. Multi - [ ] Apple OAuth (#7) - [x] NATS JetStream replacing in-process bus (#21) — adapter ships in `services/api/src/events/nats.ts`; in-proc bus is the producer, JetStream is the durable mirror - [x] Todoist sync via events (#22) — background scheduler in `services/api/src/signals/scheduler.ts` emits `signals.task.synced` every `TODOIST_SYNC_INTERVAL_MS`; on-demand fetch remains as freshness fallback -- [ ] Event schema registry + protobuf CI gate (#54) -- [ ] Per-user freshness SLAs for features (#61) -- [ ] CI skeleton (#3), observability (#18), E2E tests (#20) +- [x] Event schema registry + protobuf CI gate (#54) — buf lint/breaking checks on every PR +- [x] Per-user freshness SLAs for features (#61) — context-feature (JIT) vs profile-feature (batched) spec in ADR-0011; CONTEXT_FEATURES in ml/features/context.py +- [x] Observability (#18) — structured logs via pino, W3C trace IDs, Sentry hooks, trace correlation end-to-end +- [ ] CI skeleton (#3), E2E tests (#20) -**Bugs (fix before new features):** -- [ ] TipFeedback type mismatch (#73) -- [ ] Todoist token refresh (#74) -- [ ] Reward fire-and-forget (#75) -- [ ] Data retention purge (#76) -- [ ] Port mismatch (#77) +**Bugs & UX (fix before new features):** +- [x] TipFeedback type mismatch (#73) +- [x] Todoist token refresh (#74) — OAuth token auto-refresh on 401 +- [x] Reward fire-and-forget (#75) — retry logic + logging +- [x] Data retention purge (#76) — daily purge of 30-day-old tip_scores/tip_feedback +- [x] Port mismatch (#77) — fixed in docker-compose + env var config +- [ ] UX refinements (#100–102) — "done/snooze/dismiss" feedback only, config page UI, settings gear button ### Phase 3 — Native mobile *(M3)* - [ ] iOS app (SwiftUI) with APNs push - [ ] Android app (Compose) with FCM push - [ ] `notifier` gains APNs + FCM channels, per-device rate limits - [ ] Migrate auth from Auth.js to dedicated OIDC provider (trigger from ADR-0004) -- [ ] Consolidate MLflow + Airflow behind shared OIDC (SSO for all internal services) +- [ ] Consolidate MLflow behind shared OIDC (SSO for all internal services) - [ ] Decide-and-deliver scheduler: per-user "is this tip worth interrupting now?" threshold ### Phase 4 — MLOps at scale *(M4)* -- [x] Airflow + MLflow deployed as external services (`mlops` compose profile); each with own auth -- [ ] Write first retraining DAG (Airflow) + first MLflow experiment logging from `ml/serving` -- [ ] Feature-to-prompt pipeline — nightly Airflow DAG materializes context for LLM; cuts inline latency (#94) +- [x] MLflow deployed as external service (`mlops` compose profile); own auth; health check integrated +- [ ] Write first retraining pipeline + first MLflow experiment logging from `ml/serving` + JetStream consumers (#98) +- [ ] Feature-to-prompt pipeline — nightly batch job materializes context for LLM; cuts inline latency (#94) - [ ] Prompt optimization loop — sim A/B → MLflow experiment → human-approved promotion (#95) - [ ] LLM fine-tuning — tip reactions as training signal; LoRA on base model; MLflow tracks runs (#96) - [ ] Embedding-based task clustering — `nomic-embed-text` for dedup + user pattern features (#97) -- [ ] Consolidate MLflow + Airflow auth into shared OIDC provider (tracked as M3 issue #85) +- [ ] Modular-monolith packaging + import-boundary lint (#47) +- [ ] Consolidate MLflow auth into shared OIDC provider (tracked as M3 issue #85) - [ ] Shadow → A/B → launch pipeline as first-class in MLflow - [ ] Online experiments framework: deterministic assignment + bandit policies alongside fixed-split A/B - [ ] Cross-user collaborative features (opt-in only); cohort slicing; fairness checks diff --git a/apps/admin/README.md b/apps/admin/README.md index cbf6984..07a7b38 100644 --- a/apps/admin/README.md +++ b/apps/admin/README.md @@ -22,11 +22,19 @@ Two ways to sign in: | Route | Description | |-------|-------------| | `/` | Overview: DAU/WAU KPI cards, tips served, reaction breakdown, activation funnel | -| `/users` | User list (paginated) | -| `/users/:id` | User detail: identity, consents, integrations, profile features (#81 phase B), tip stats, reward history; revoke-integration + reset-bandit + rebuild-profile actions | -| `/audit` | Admin action audit log | -| `/events` | Event stream viewer (stub — pending API history endpoint) | -| `/reward-analytics` | Reaction distribution + per-policy / per-model / per-prompt-version / per-tip-kind breakdowns with avg reward | +| `/users` | User list (paginated, searchable) | +| `/users/:id` | User detail: identity, consents, integrations, profile features (completion rate, dismiss rate, dwell, preferred hour, tip volume), tip stats, reward history; revoke-integration + reset-bandit + rebuild-profile actions | +| `/audit` | Admin action audit log with timestamps and descriptions | +| `/events` | Live event stream viewer with filters by subject/user/time; tail of `signals.*` from ring buffer or NATS JetStream | +| `/features` | Feature store browser: features sent to `ml/serving` per scoring call; freshness status; per-feature SLA tracking | +| `/tips` | Served tips explorer: tip content, score, policy, model, feedback reactions; per-user timeline | +| `/reward-analytics` | Reaction distribution + per-policy / per-model / per-prompt-version breakdowns with avg reward; time-series and cohort slicing | +| `/data-quality` | Missing-feature rate heatmap, stale-token rate, daily completeness, per-feature freshness SLA status | +| `/health` | System health rollup: api, ml/serving, SQLite, event-bus, MLflow with 15s auto-refresh | +| `/sql` | Read-only SQL runner against SQLite; saved queries support; sunsets to Superset in M4 | +| `/simulate` | Offline simulation runner: launch `ml/experiments/sim`, track runs, judge selection, policy comparison | +| `/docs` | Admin documentation and ops runbooks inline | +| `/ops` | Operational dashboard (deprecation candidate; pending UX refinement #107) | ## Dev @@ -40,8 +48,9 @@ pnpm --filter @oo/admin dev # starts on :3080 Stays as a Next.js app in the monorepo permanently — it's not a candidate for extraction. It gets richer (more pages, embedded MLflow/Grafana) but not split. -## Known issues +## Known issues & pending improvements - `@tremor/react 3.x` declares a peer dep on React 18; the workspace uses React 19. Works in practice. Will resolve naturally when Tremor ships React 19 support or when we switch to Tremor v4 (which targets React 18+). +- UX refinements pending (#100–102): feedback options consolidation, config page UI migration, settings UI placement diff --git a/apps/admin/src/app/simulate/page.tsx b/apps/admin/src/app/simulate/page.tsx index 3913b95..995be03 100644 --- a/apps/admin/src/app/simulate/page.tsx +++ b/apps/admin/src/app/simulate/page.tsx @@ -5,16 +5,11 @@ import { AdminShell } from '@/components/AdminShell'; import { getSimulationRuns, SimRun } from '@/lib/api'; const mlflowBase = process.env.NEXT_PUBLIC_MLFLOW_URL ?? '/mlflow'; -const airflowBase = process.env.NEXT_PUBLIC_AIRFLOW_URL ?? '/airflow'; function mlflowRunUrl(runId: string) { return `${mlflowBase}/#/experiments/1/runs/${runId}`; } -function airflowRunUrl(dagRunId: string) { - return `${airflowBase}/dags/bandit_sim/grid?dag_run_id=${encodeURIComponent(dagRunId)}`; -} - function StatusBadge({ status }: { status: string }) { const cls: Record = { running: 'bg-blue-900 text-blue-300 border-blue-800', @@ -50,10 +45,6 @@ function SummaryRow({ run }: { run: SimRun }) { MLflow ↗ )} - {run.airflowDagRunId && ( - Airflow ↗ - )} {summary && ( @@ -97,11 +88,7 @@ export default function SimulatePage() {

Simulations

- Offline policy comparisons — run via the{' '} - - Airflow bench_collect DAG - - {' '}(mlops profile). Results are logged to{' '} + Offline policy comparisons — trigger via the admin API or CLI. Results are logged to{' '} MLflow ↗.

@@ -114,7 +101,7 @@ export default function SimulatePage() { {loading && loading…} {runs.length === 0 && !loading && ( -

No simulation runs yet. Trigger a run from Airflow.

+

No simulation runs yet.

)} {runs.map((r) => )} diff --git a/apps/admin/src/components/AdminShell.tsx b/apps/admin/src/components/AdminShell.tsx index 279faff..7de0cbd 100644 --- a/apps/admin/src/components/AdminShell.tsx +++ b/apps/admin/src/components/AdminShell.tsx @@ -4,8 +4,7 @@ import Link from 'next/link'; import { usePathname } from 'next/navigation'; import { useEffect, useState } from 'react'; -const mlflowUrl = process.env.NEXT_PUBLIC_MLFLOW_URL ?? '/mlflow'; -const airflowUrl = process.env.NEXT_PUBLIC_AIRFLOW_URL ?? '/airflow'; +const mlflowUrl = process.env.NEXT_PUBLIC_MLFLOW_URL ?? '/mlflow'; type NavItem = { href: string; @@ -53,8 +52,7 @@ const NAV: NavSection[] = [ label: 'Resources', items: [ { href: '/docs', label: 'Docs' }, - { href: mlflowUrl, label: 'MLflow ↗', external: true, svcName: 'mlflow' }, - { href: airflowUrl, label: 'Airflow ↗', external: true, svcName: 'airflow' }, + { href: mlflowUrl, label: 'MLflow ↗', external: true, svcName: 'mlflow' }, ], }, ]; diff --git a/apps/admin/src/lib/api.ts b/apps/admin/src/lib/api.ts index 87a44c6..50cd905 100644 --- a/apps/admin/src/lib/api.ts +++ b/apps/admin/src/lib/api.ts @@ -278,7 +278,6 @@ export interface SimRun { summaryJson: string | null; winner: string | null; personaBreakdownJson: string | null; - airflowDagRunId: string | null; mlflowRunId: string | null; createdAt: string; finishedAt: string | null; @@ -293,7 +292,7 @@ export interface SimStartRequest { } export function startSimulation(req: SimStartRequest) { - return apiFetch<{ id: string; status: string; airflow_dag_run_id?: string }>( + return apiFetch<{ id: string; status: string }>( '/admin/simulate/start', { method: 'POST', body: JSON.stringify(req) }, ); diff --git a/docs/adr/0006-admin-console-framework.md b/docs/adr/0006-admin-console-framework.md index 04e72b5..b751aa1 100644 --- a/docs/adr/0006-admin-console-framework.md +++ b/docs/adr/0006-admin-console-framework.md @@ -33,11 +33,10 @@ Same stack as `apps/web`. Reuses `packages/shared-types`, the Auth.js session co Specialized MLOps tooling runs as **separate external services** with their own auth, linked from the admin shell — not embedded or reimplemented: - **MLflow** → `https://o.alogins.net/mlflow` — experiment tracking, model registry, artifact browser; own basic-auth for now; see M3 for SSO consolidation -- **Airflow** → `https://o.alogins.net/airflow` — batch pipeline orchestration, dataset management; own web-auth for now - **Grafana panels** → `/admin/infra` (iframed panels) — infra metrics - **Marimo notebooks** → launch-out link from admin -The admin shell links to these services; clicking them opens a new tab. The `/experiments` and `/models` admin pages are hub pages with direct links to the relevant MLflow/Airflow views. +The admin shell links to these services; clicking them opens a new tab. ### AuthZ @@ -56,7 +55,7 @@ The admin shell links to these services; clicking them opens a new tab. The `/ex - One more Next.js app in the monorepo. Build/dev added to Turborepo. - Tremor + shadcn/ui are added as dependencies. shadcn components are copied into `apps/admin/src/components/ui/` — no runtime version coupling. -- MLflow (`o.alogins.net/mlflow*` → port 5000) and Airflow (`o.alogins.net/airflow*` → port 8080) are path-based routes in the existing `o.alogins.net` Caddy block, started via `docker compose --profile mlops up`. -- Each service manages its own auth (MLflow: built-in basic-auth; Airflow: built-in web UI auth). M3 will consolidate both behind the shared OIDC provider. -- The `NEXT_PUBLIC_MLFLOW_URL` and `NEXT_PUBLIC_AIRFLOW_URL` build args in `Dockerfile.admin` default to the production URLs; override for dev builds. +- MLflow (`o.alogins.net/mlflow*` → port 5000) is a path-based route in the existing `o.alogins.net` Caddy block, started via `docker compose --profile mlops up`. +- MLflow manages its own auth (built-in basic-auth). M3 will consolidate behind the shared OIDC provider. +- The `NEXT_PUBLIC_MLFLOW_URL` build arg in `Dockerfile.admin` defaults to the production URL; override for dev builds. - `admin_actions` audit log grows unboundedly — needs a retention policy before M4. diff --git a/docs/adr/0013-multi-agent-recommendation.md b/docs/adr/0013-multi-agent-recommendation.md new file mode 100644 index 0000000..88df686 --- /dev/null +++ b/docs/adr/0013-multi-agent-recommendation.md @@ -0,0 +1,106 @@ +# ADR-0013 — Multi-agent recommendation: pre-computed agent snippets + orchestrator LLM + +**Status:** Accepted +**Date:** 2026-05-01 +**Supersedes:** ADR-0007, ADR-0012 + +## Context + +The ε-greedy bandit (ADR-0007, promoted to v2 in ADR-0012) was the first recommendation +policy. It served adequately during early M1 testing but carries structural problems that +become more acute as the user base grows: + +- **Training signal sparsity.** The median user generates fewer than 5 reward signals per + week. Ridge regression on a 12-dimensional feature vector needs far more signal than + that to converge to a meaningful θ before the user loses interest. +- **Cold-start cost.** Every new user starts with an uninformed identity matrix. Early tips + are essentially random for the first weeks of use — precisely when first impressions + matter most. +- **Opacity.** The bandit cannot explain why it chose a tip. An orchestrator that reasons + explicitly over named agent outputs ("3 overdue tasks + peak hour approaching") is + interpretable by design. +- **Coupling of generation and selection.** The current pipeline generates candidates, then + scores them; the scoring is decoupled from the LLM reasoning. Giving the LLM the full + pre-computed context directly is a simpler and more capable design. + +## Decision + +Replace the RL bandit with a **multi-agent pipeline**: + +### Sub-agents (async, pre-computed) + +Multiple domain-specialized Python agents each analyze user state from one angle and +produce a **prompt snippet** — a short natural-language paragraph describing what they +found. They do not produce tips. They run periodically (every 15 minutes) and store +results in the new `agent_outputs` table with per-agent TTLs. + +Initial agent set: + +| Agent | ID | TTL | +|---|---|---| +| OverdueTaskAgent | `overdue-task` | 1h | +| MomentumAgent | `momentum` | 6h | +| TimeOfDayAgent | `time-of-day` | 15m | +| RecentPatternsAgent | `recent-patterns` | 24h | +| FocusAreaAgent | `focus-area` | 12h | + +### Orchestrator agent (real-time) + +When a user requests a tip, the TypeScript recommender: +1. Fetches all non-expired `agent_outputs` rows for the user. +2. Calls `POST /recommend` on `ml/serving` with the snippet list. +3. `ml/serving` assembles a single orchestrator prompt (template `v4-orchestrator`) + that concatenates all snippets, then calls LiteLLM via the existing `tip-generator` + alias to produce one tip. + +No bandit scoring. No reward delivery to an ML model. The LLM receives full context and +generates the tip in one call. + +### Feedback + +`tipFeedback` rows are still written on every user reaction. `inferReward()` still runs +and `rewardMilli` is logged for observability and potential future supervised learning. +Reactions are not delivered to an ML endpoint. + +## New data model + +```sql +CREATE TABLE agent_outputs ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL REFERENCES users(id), + agent_id TEXT NOT NULL, -- e.g. 'overdue-task' + prompt_text TEXT NOT NULL, -- snippet produced by the agent + signals_snapshot TEXT, -- JSON: inputs the agent consumed + computed_at TEXT NOT NULL, -- ISO 8601 + expires_at TEXT NOT NULL, -- ISO 8601 = computed_at + TTL + agent_version TEXT NOT NULL -- bump to invalidate cached outputs on logic changes +); +CREATE INDEX idx_agent_outputs_user_agent_exp + ON agent_outputs(user_id, agent_id, expires_at DESC); +``` + +## Consequences + +### Positive +- Tips are explainable: `featuresJson` in `tipScores` records which agents contributed. +- Cold-start is eliminated: the orchestrator reasons from signals immediately, no warm-up. +- Adding or removing an agent is a self-contained change in `ml/agents/`. +- Swapping LLM models remains a config change (LiteLLM alias unchanged). + +### Negative / risks +- **No automatic exploration.** The bandit would discover that a user prefers certain tip + types without being told. The orchestrator only knows what the agents tell it. + Mitigation: agents can evolve to encode richer signals; offline evaluation via the + existing bench scripts remain available. +- **Scheduler dependency.** If the pre-compute job falls behind, agent outputs go + stale. Mitigation: the orchestrator falls back to raw signal prompt when no outputs + exist; `TimeOfDayAgent` recomputes every 15 min to stay fresh. +- **Higher per-request token cost.** The orchestrator prompt is longer than the old bandit + prompt. Mitigation: the `tip-generator` alias points to a small local model; token cost + is negligible at current scale. + +## Migration sequence + +See plan document in conversation context. 10 steps; each independently deployable and +rollback-able. Cutover is Step 6 (single TypeScript PR). Bandit endpoints removed in +Step 7 after 48h clean traffic. diff --git a/docs/architecture/overview.md b/docs/architecture/overview.md index cb67f05..d870633 100644 --- a/docs/architecture/overview.md +++ b/docs/architecture/overview.md @@ -47,7 +47,6 @@ User reactions (done / snooze / dismiss) are events too. They close the loop as - **OpenAPI** for HTTP; TS client auto-generated; Python pydantic hand-written while consumers are few. - **Feast** for feature store when we get there; homegrown adapter until then (Phase 1 seam). - **MLflow** for model registry and experiment tracking; deployed at `o.alogins.net/mlflow`. -- **Airflow** for batch pipelines; deployed at `o.alogins.net/airflow`. - **Auth.js** embedded behind an OIDC-shaped boundary (ADR-0004). Swap to a standalone OIDC provider when mobile ships. - **k3s** as the first step beyond docker-compose — no "compose → full k8s" cliff. diff --git a/infra/docker/Dockerfile.admin b/infra/docker/Dockerfile.admin index 0398c93..f1b099a 100644 --- a/infra/docker/Dockerfile.admin +++ b/infra/docker/Dockerfile.admin @@ -19,10 +19,8 @@ RUN --mount=type=cache,id=pnpm,target=/pnpm/store \ --filter @oo/admin... --filter @oo/shared-types RUN pnpm --filter @oo/shared-types build ARG NEXT_PUBLIC_MLFLOW_URL=/mlflow -ARG NEXT_PUBLIC_AIRFLOW_URL=/airflow ENV NEXT_TELEMETRY_DISABLED=1 \ - NEXT_PUBLIC_MLFLOW_URL=$NEXT_PUBLIC_MLFLOW_URL \ - NEXT_PUBLIC_AIRFLOW_URL=$NEXT_PUBLIC_AIRFLOW_URL + NEXT_PUBLIC_MLFLOW_URL=$NEXT_PUBLIC_MLFLOW_URL RUN pnpm --filter @oo/admin build FROM node:22-slim AS runner diff --git a/infra/docker/docker-compose.yml b/infra/docker/docker-compose.yml index 921784a..0e90247 100644 --- a/infra/docker/docker-compose.yml +++ b/infra/docker/docker-compose.yml @@ -13,9 +13,6 @@ services: NODE_ENV: production ML_SERVING_URL: "http://ml-serving:8000" MLFLOW_URL: "http://mlflow:5000" - AIRFLOW_URL: "http://airflow-webserver:8080" - AIRFLOW_API_USER: "admin" - AIRFLOW_API_PASSWORD: "${AIRFLOW_ADMIN_PASSWORD:-admin}" INTERNAL_API_TOKEN: "${INTERNAL_API_TOKEN:-}" volumes: - /mnt/ssd/dbs/oo:/mnt/ssd/dbs/oo @@ -56,7 +53,6 @@ services: HOSTNAME: "0.0.0.0" NEXT_PUBLIC_API_URL: "" NEXT_PUBLIC_MLFLOW_URL: "/mlflow" - NEXT_PUBLIC_AIRFLOW_URL: "/airflow" INTERNAL_API_URL: "http://api:3078" ports: - "127.0.0.1:3080:3080" @@ -85,100 +81,9 @@ services: timeout: 5s retries: 5 - # ── mlops profile — MLflow + Airflow ────────────────────────────────────── + # ── mlops profile — MLflow ──────────────────────────────────────────────── # Start: docker compose --profile mlops up - # MLflow UI: http://localhost:5000 or https://o.alogins.net/mlflow (admin / password — change via basic_auth.ini) - # Airflow UI: http://localhost:8080/airflow or https://o.alogins.net/airflow (admin / AIRFLOW_ADMIN_PASSWORD) - # Caddy routes /mlflow* and /airflow* inside the o.alogins.net block - - airflow-db: - image: postgres:16-alpine - profiles: [mlops] - environment: - POSTGRES_DB: airflow - POSTGRES_USER: airflow - POSTGRES_PASSWORD: ${AIRFLOW_DB_PASSWORD:-airflow} - volumes: - - /mnt/ssd/dbs/oo/airflow-db:/var/lib/postgresql/data - healthcheck: - test: ["CMD-SHELL", "pg_isready -U airflow"] - interval: 10s - timeout: 5s - retries: 5 - - airflow-init: - image: apache/airflow:2.9.3 - profiles: [mlops] - entrypoint: /bin/bash - command: - - -c - - | - airflow db migrate - airflow users create \ - --username admin \ - --firstname Admin \ - --lastname User \ - --role Admin \ - --email admin@oo.local \ - --password "$${AIRFLOW_ADMIN_PASSWORD:-admin}" - environment: - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${AIRFLOW_DB_PASSWORD:-airflow}@airflow-db/airflow - AIRFLOW__CORE__EXECUTOR: LocalExecutor - AIRFLOW__WEBSERVER__SECRET_KEY: ${AIRFLOW_SECRET_KEY:-change-me-in-prod} - AIRFLOW__WEBSERVER__BASE_URL: ${AIRFLOW_BASE_URL:-https://o.alogins.net/airflow} - depends_on: - airflow-db: - condition: service_healthy - restart: "no" - - airflow-webserver: - image: apache/airflow:2.9.3 - profiles: [mlops] - command: webserver - environment: - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${AIRFLOW_DB_PASSWORD:-airflow}@airflow-db/airflow - AIRFLOW__CORE__EXECUTOR: LocalExecutor - AIRFLOW__WEBSERVER__SECRET_KEY: ${AIRFLOW_SECRET_KEY:-change-me-in-prod} - AIRFLOW__CORE__FERNET_KEY: ${AIRFLOW_FERNET_KEY:-} - AIRFLOW__WEBSERVER__BASE_URL: ${AIRFLOW_BASE_URL:-https://o.alogins.net/airflow} - AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth" - _PIP_ADDITIONAL_REQUIREMENTS: "mlflow==2.14.3 httpx" - MLFLOW_TRACKING_URI: "http://mlflow:5000/mlflow" - MLFLOW_TRACKING_USERNAME: "admin" - MLFLOW_TRACKING_PASSWORD: "${MLFLOW_ADMIN_PASSWORD:-password}" - volumes: - - ../../ml/pipelines:/opt/airflow/dags:ro - - ../../ml:/opt/airflow/ml:ro - ports: - - "127.0.0.1:8080:8080" - depends_on: - airflow-init: - condition: service_completed_successfully - healthcheck: - test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] - interval: 30s - timeout: 10s - retries: 5 - start_period: 60s - - airflow-scheduler: - image: apache/airflow:2.9.3 - profiles: [mlops] - command: scheduler - environment: - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${AIRFLOW_DB_PASSWORD:-airflow}@airflow-db/airflow - AIRFLOW__CORE__EXECUTOR: LocalExecutor - AIRFLOW__CORE__FERNET_KEY: ${AIRFLOW_FERNET_KEY:-} - _PIP_ADDITIONAL_REQUIREMENTS: "mlflow==2.14.3 httpx" - MLFLOW_TRACKING_URI: "http://mlflow:5000/mlflow" - MLFLOW_TRACKING_USERNAME: "admin" - MLFLOW_TRACKING_PASSWORD: "${MLFLOW_ADMIN_PASSWORD:-password}" - volumes: - - ../../ml/pipelines:/opt/airflow/dags:ro - - ../../ml:/opt/airflow/ml:ro - depends_on: - airflow-init: - condition: service_completed_successfully + # MLflow UI: http://localhost:5000 or https://o.alogins.net/mlflow # ── events profile — NATS JetStream ───────────────────────────────────── # Start: docker compose --profile events up @@ -201,7 +106,7 @@ services: retries: 5 mlflow: - image: ghcr.io/mlflow/mlflow:v2.14.3 + image: ghcr.io/mlflow/mlflow:v3.11.1 profiles: [mlops] command: > mlflow server @@ -209,17 +114,15 @@ services: --default-artifact-root /mlflow/artifacts --host 0.0.0.0 --port 5000 - --app-name basic-auth --static-prefix /mlflow - environment: - MLFLOW_AUTH_CONFIG_PATH: /mlflow/basic_auth.ini + --allowed-hosts o.alogins.net,localhost + --cors-allowed-origins https://o.alogins.net volumes: - /mnt/ssd/dbs/oo/mlflow:/mlflow - - ../../infra/mlflow/basic_auth.ini:/mlflow/basic_auth.ini:ro ports: - "127.0.0.1:5000:5000" healthcheck: - test: ["CMD", "python", "-c", "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:5000/health',timeout=3).status==200 else 1)"] + test: ["CMD", "python", "-c", "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:5000/mlflow/health',timeout=3).status==200 else 1)"] interval: 10s timeout: 5s retries: 5 diff --git a/ml/README.md b/ml/README.md index 97cf137..2c62caf 100644 --- a/ml/README.md +++ b/ml/README.md @@ -6,7 +6,7 @@ Python. Owns models, features, training, online scoring. |---|---|---| | `serving/` | FastAPI online scorer (`/score`, `/generate`) + LiteLLM gateway + prompt registry (`prompts.py`) + JetStream consumers for `signals.>` / `feedback.>`, called by `recommender` | 1–2 | | `features/` | context assembler (`context.py`): signals → `PromptContext`; profile-feature schema mirror (`profile_schema.py`); Feast adapter later | 2 | -| `pipelines/` | batch feature + training DAGs (Prefect/Airflow) | 4 | +| `pipelines/` | batch feature + training scripts | 4 | | `registry/` | MLflow-backed model registry integration | 4 | | `experiments/` | A/B assignment + multi-armed bandit policies | 4 | | `notebooks/` | research; never imported by production code | — | diff --git a/ml/experiments/bench/AIRFLOW.md b/ml/experiments/bench/AIRFLOW.md deleted file mode 100644 index eb1e5a4..0000000 --- a/ml/experiments/bench/AIRFLOW.md +++ /dev/null @@ -1,90 +0,0 @@ -# Airflow Integration — `bench_collect` DAG - -The benchmark harness integrates with Airflow as a DAG (`ml/pipelines/bench_dag.py`) -triggered on-demand from the admin UI or the CLI. - -## DAG Structure - -Three linked tasks: - -1. **`collect`** — `collect.py` generates candidates per (model × prompt × scenario) cell, - logs MLflow runs with `judge_pending=true`. Rejects models >4B, uses `keep_alive=0` - for RAM safety. - -2. **`export_for_judge`** — `judge_cli.py --export` pulls pending runs into a single - JSON file for Claude Code to score per the rubric. XCom-pushes the path so the - next task can find it. - -3. **`compare`** — `compare.py` aggregates scores by (model, prompt) cell and - generates the leaderboard ranked by composite score. - -## Triggering from the CLI - -```bash -# Minimal: use all defaults -airflow dags trigger bench_collect - -# Custom config: specify models, prompts, scenario count -airflow dags trigger bench_collect --conf '{ - "models": "qwen2.5:0.5b,qwen2.5:1.5b", - "prompts": "v1,v2-mentor", - "n_tips": 5, - "n_scenarios": 2, - "temperature": 0.7, - "experiment": "tip-bench-custom" -}' -``` - -## Triggering from the Admin UI - -The API exposes: - -``` -POST /api/bench/run { config object } -``` - -Admin UI → Benchmark panel → "Run Collection" button → form dialog fills config → -POST to `/api/bench/run` → DAG triggered. - -## Configuration Keys - -| Key | Type | Default | Description | -|-----|------|---------|-------------| -| `models` | str | `qwen2.5:0.5b,qwen2.5:1.5b,gemma3:1b,llama3.2:3b` | comma-separated Ollama tags | -| `prompts` | str | `v1,v2-mentor,v3-few-shot` | comma-separated prompt versions | -| `n_tips` | int | 5 | candidates to generate per scenario | -| `n_scenarios` | int | 0 | cap scenario count (0 = all 8) | -| `temperature` | float | 0.7 | LLM generation temperature | -| `experiment` | str | `tip-bench-auto` | MLflow experiment name | -| `max_model_b` | float | 4.0 | reject models larger than this (in billions) | -| `ollama_url` | str | `http://localhost:11434` | Ollama endpoint | -| `mlflow_url` | str | `$MLFLOW_TRACKING_URI` or `http://localhost:5000` | MLflow tracking URI | - -## Human-in-the-Loop Judge - -After `collect` finishes, `export_for_judge` produces a JSON file with all pending -runs. The Claude Code session: - -1. Reads the file -2. Scores each candidate per the rubric (relevance/actionability/tone 1–5) -3. Runs `judge_cli.py --apply /path/to/file.json` to write scores back to MLflow - -Then `compare` generates the leaderboard. - -**Future enhancement:** Add a webhook or admin UI button to trigger the judge step -so the entire pipeline is end-to-end in Airflow, not requiring manual Claude Code -intervention. - -## Monitoring - -- **Airflow UI**: `http://localhost:8080` → DAGs → `bench_collect` → graph view -- **MLflow UI**: `http://localhost:5000/mlflow` → experiments → `tip-bench-*` -- **Admin API**: `GET /api/bench/leaderboard/tip-bench-auto` → JSON leaderboard - -## Future: Admin UI Panel - -`apps/admin/src/components/BenchPanel.tsx` (TBD): -- List experiments -- Trigger DAG with form (models, prompts, scenario count, temperature) -- Display current DAG run status -- Show leaderboard once `compare` completes diff --git a/ml/experiments/bench/README.md b/ml/experiments/bench/README.md index c08fb48..7fa5c06 100644 --- a/ml/experiments/bench/README.md +++ b/ml/experiments/bench/README.md @@ -77,13 +77,9 @@ keys `artifact:candidates.json`, `artifact:prompt.txt`, `artifact:raw.txt` (tag fallback because the MLflow server uses a file:// artifact backend not accessible via REST from the host). -## Integrating with Airflow (#95) +## Running standalone -A future DAG `ml/pipelines/prompt_ab_eval.py` will wrap `collect.py` -exactly as shown in the quick-start, triggered on-demand from the admin -UI or manually. The results feed into the admin leaderboard view. - -For now, the pipeline is runnable standalone on any machine with: +The pipeline runs on any machine with: - Ollama models ≤4B - MLflow tracking server - Python 3.10+ diff --git a/ml/experiments/bench/mlflow_client.py b/ml/experiments/bench/mlflow_client.py index 9eaa2ac..9657b23 100644 --- a/ml/experiments/bench/mlflow_client.py +++ b/ml/experiments/bench/mlflow_client.py @@ -10,8 +10,7 @@ Why not the official ``mlflow`` SDK? Two reasons specific to the oO setup: Pulling a 200MB SDK transitively for that is excess weight. All calls are synchronous httpx with explicit ``Host`` so the script can -run from the host shell, from inside docker, or from Airflow workers -without further config. +run from the host shell or from inside docker without further config. """ from __future__ import annotations diff --git a/ml/pipelines/bench_dag.py b/ml/pipelines/bench_dag.py deleted file mode 100644 index d901c0b..0000000 --- a/ml/pipelines/bench_dag.py +++ /dev/null @@ -1,168 +0,0 @@ -""" -Airflow DAG: bench_collect - -Runs the tip-generation benchmark (model × prompt evaluation). Triggered -on-demand from the admin UI or manually, collects candidates per cell, -exports for Claude Code judgment, and generates a leaderboard. - -Mirrors the manual flow: - - 1. collect.py → generates candidates, logs to MLflow with judge_pending=true - 2. (human: judge_cli.py --export, Claude Code scores, judge_cli.py --apply) - 3. compare.py → leaderboard - -For now, steps 2 is manual. Future: add a webhook to trigger the human -judge from the admin UI or set up an async task queue. - -Required conf keys (passed via dag_run.conf): - models str — comma-separated model tags (e.g. "qwen2.5:0.5b,qwen2.5:1.5b") - prompts str — comma-separated prompt versions (default: "v1,v2-mentor,v3-few-shot") - n_tips int — candidates to generate per scenario (default: 5) - n_scenarios int — cap scenario count; 0 = all (default: 0) - temperature float — LLM generation temperature (default: 0.7) - experiment str — MLflow experiment name (default: "tip-bench-auto") - max_model_b float — reject models larger than this (default: 4.0) - ollama_url str — Ollama endpoint (default: http://localhost:11434) - mlflow_url str — MLflow tracking URI (env MLFLOW_TRACKING_URI or http://localhost:5000) -""" - -from __future__ import annotations - -import json -import os -import sys -from datetime import datetime, timedelta -from pathlib import Path - -from airflow import DAG -from airflow.operators.python import PythonOperator - - -def _collect(**context: object) -> dict: - """Run collect.py with the provided config.""" - conf: dict = context["dag_run"].conf or {} - - models = str(conf.get("models", "qwen2.5:0.5b,qwen2.5:1.5b,gemma3:1b,llama3.2:3b")) - prompts = str(conf.get("prompts", "v1,v2-mentor,v3-few-shot")) - n_tips = int(conf.get("n_tips", 5)) - n_scenarios = int(conf.get("n_scenarios", 0)) - temperature = float(conf.get("temperature", 0.7)) - experiment = str(conf.get("experiment", "tip-bench-auto")) - max_model_b = float(conf.get("max_model_b", 4.0)) - ollama_url = str(conf.get("ollama_url", os.environ.get("OLLAMA_URL", "http://localhost:11434"))) - mlflow_url = str(conf.get("mlflow_url", os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000"))) - - sys.path.insert(0, "/opt/airflow/ml/experiments/bench") - from collect import main as collect_main # type: ignore - - # Build args for collect.py - args = [ - "--models", models, - "--prompts", prompts, - "--experiment", experiment, - "--n-tips", str(n_tips), - "--temperature", str(temperature), - "--max-model-b", str(max_model_b), - "--ollama-url", ollama_url, - "--mlflow-url", mlflow_url, - ] - if n_scenarios > 0: - args.extend(["--n-scenarios", str(n_scenarios)]) - - # Inject args into sys.argv so argparse picks them up - old_argv = sys.argv - try: - sys.argv = ["collect.py"] + args - result = collect_main() - return { - "status": "success" if result == 0 else "failed", - "exit_code": result, - "experiment": experiment, - } - finally: - sys.argv = old_argv - - -def _compare(**context: object) -> dict: - """Run compare.py to generate the leaderboard.""" - conf: dict = context["dag_run"].conf or {} - experiment = str(conf.get("experiment", "tip-bench-auto")) - mlflow_url = str(conf.get("mlflow_url", os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000"))) - - sys.path.insert(0, "/opt/airflow/ml/experiments/bench") - from compare import main as compare_main # type: ignore - - old_argv = sys.argv - try: - sys.argv = [ - "compare.py", - "--experiment", experiment, - "--mlflow-url", mlflow_url, - ] - result = compare_main() - return { - "status": "success" if result == 0 else "failed", - "exit_code": result, - "experiment": experiment, - } - finally: - sys.argv = old_argv - - -def _export_for_judge(**context: object) -> str: - """Export pending runs for Claude Code judgment.""" - conf: dict = context["dag_run"].conf or {} - experiment = str(conf.get("experiment", "tip-bench-auto")) - mlflow_url = str(conf.get("mlflow_url", os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000"))) - - export_path = f"/tmp/oo-bench-{experiment}-{int(context['ti'].start_date.timestamp())}.json" - - sys.path.insert(0, "/opt/airflow/ml/experiments/bench") - from judge_cli import export # type: ignore - from mlflow_client import MLflowClient # type: ignore - - client = MLflowClient( - tracking_uri=mlflow_url, - username=os.environ.get("MLFLOW_TRACKING_USERNAME") or "admin", - password=os.environ.get("MLFLOW_TRACKING_PASSWORD") or "password", - ) - result = export(client, experiment, export_path) - - # XCom: push path so next task can find it - context["ti"].xcom_push(key="export_path", value=export_path) - - return export_path - - -with DAG( - dag_id="bench_collect", - description="Tip-generation benchmark: model & prompt evaluation via MLflow", - schedule_interval=None, - start_date=datetime(2025, 1, 1), - catchup=False, - tags=["bench", "ml", "evaluation"], - default_args={ - "retries": 1, - "retry_delay": timedelta(minutes=5), - }, -) as dag: - - collect = PythonOperator( - task_id="collect", - python_callable=_collect, - provide_context=True, - ) - - export_judge = PythonOperator( - task_id="export_for_judge", - python_callable=_export_for_judge, - provide_context=True, - ) - - compare = PythonOperator( - task_id="compare", - python_callable=_compare, - provide_context=True, - ) - - collect >> export_judge >> compare diff --git a/ml/pipelines/sim_dag.py b/ml/pipelines/sim_dag.py deleted file mode 100644 index 867f6c8..0000000 --- a/ml/pipelines/sim_dag.py +++ /dev/null @@ -1,124 +0,0 @@ -""" -Airflow DAG: bandit_sim - -Runs a bandit policy simulation and logs results to MLflow. -Triggered on-demand from the oO admin panel or manually from the Airflow UI. - -Required conf keys (passed via dag_run.conf): - sim_run_id str — oO SQLite run ID for callback correlation - n_users int — number of synthetic users - n_rounds int — rounds per user - tasks_per_round int — candidate pool size per round - policies list — policy names to compare - judge_mode str — "rule" | "llm" - ml_url str — ml/serving URL (e.g. http://ml-serving:8000) - mlflow_url str — MLflow tracking URI (e.g. http://mlflow:5000/mlflow) - callback_url str — oO API callback endpoint - internal_token str — x-internal-token header value -""" - -from __future__ import annotations - -import json -import os -import sys -from datetime import datetime, timedelta - -from airflow import DAG -from airflow.operators.python import PythonOperator - - -def _run_sim(**context: object) -> dict: - conf: dict = context["dag_run"].conf or {} - - n_users = int(conf.get("n_users", 5)) - n_rounds = int(conf.get("n_rounds", 20)) - tasks_per_round = int(conf.get("tasks_per_round", 8)) - policies = list(conf.get("policies", ["linucb-v1", "egreedy-v1"])) - judge_mode = str(conf.get("judge_mode", "rule")) - ml_url = str(conf.get("ml_url", "http://ml-serving:8000")) - mlflow_url = str(conf.get("mlflow_url", os.environ.get("MLFLOW_TRACKING_URI", ""))) - mlflow_experiment = "bandit_simulation" - - sys.path.insert(0, "/opt/airflow/ml/experiments/sim") - from runner import run_simulation # type: ignore[import] - - use_llm = judge_mode == "llm" - result = run_simulation( - n_users=n_users, - n_rounds=n_rounds, - tasks_per_round=tasks_per_round, - ml_url=ml_url, - policies=policies, - use_llm=use_llm, - seed=42, - mlflow_url=mlflow_url or None, - mlflow_experiment=mlflow_experiment, - ) - return result - - -def _callback(**context: object) -> None: - import httpx - - conf: dict = context["dag_run"].conf or {} - callback_url: str = str(conf.get("callback_url", "")) - internal_token: str = str(conf.get("internal_token", "")) - - if not callback_url or not internal_token: - print("No callback_url or internal_token — skipping result push.", flush=True) - return - - result: dict = context["ti"].xcom_pull(task_ids="run_sim") - if not result: - print("No result from run_sim task — callback skipped.", flush=True) - return - - payload = { - "summary": result.get("summary", {}), - "winner": result.get("winner", ""), - "persona_breakdown": result.get("persona_breakdown", {}), - "events": result.get("events", []), - "mlflow_run_id": result.get("mlflow_run_id"), - } - - try: - r = httpx.post( - callback_url, - json=payload, - headers={"x-internal-token": internal_token}, - timeout=30.0, - ) - r.raise_for_status() - print(f"Callback OK: {r.status_code}", flush=True) - except Exception as exc: - print(f"Callback failed: {exc}", flush=True) - raise - - -with DAG( - dag_id="bandit_sim", - description="On-demand bandit policy simulation with MLflow tracking", - schedule_interval=None, - start_date=datetime(2025, 1, 1), - catchup=False, - tags=["bandit", "simulation", "ml"], - default_args={ - "retries": 1, - "retry_delay": timedelta(minutes=2), - }, -) as dag: - - run_sim = PythonOperator( - task_id="run_sim", - python_callable=_run_sim, - provide_context=True, - ) - - push_results = PythonOperator( - task_id="push_results", - python_callable=_callback, - provide_context=True, - ) - - run_sim >> push_results diff --git a/ml/serving/main.py b/ml/serving/main.py index 1a8bb26..878caf0 100644 --- a/ml/serving/main.py +++ b/ml/serving/main.py @@ -26,9 +26,11 @@ from __future__ import annotations import json import math import os +import sys import time from collections import deque from contextlib import asynccontextmanager +from datetime import datetime, timezone from pathlib import Path from typing import Optional, Deque @@ -43,7 +45,17 @@ from starlette.middleware.base import BaseHTTPMiddleware import logging_config import nats_consumer -from prompts import get_prompt +from prompts import get_prompt, build_orchestrator_messages + +# Make ml.agents importable regardless of working directory. +# In Docker (WORKDIR=/app/ml/serving, PYTHONPATH=/app): /app already on path. +# In local dev (run from ml/serving/): repo root is two levels up. +_repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if _repo_root not in sys.path: + sys.path.insert(0, _repo_root) + +from ml.agents.base import AgentInput # noqa: E402 +from ml.agents.registry import get_agent, all_agents # noqa: E402 logging_config.configure() @@ -350,12 +362,61 @@ class GenerateResponse(BaseModel): completion_tokens: int = 0 +# ── Multi-agent models ───────────────────────────────────────────────────── + +class AgentComputeRequest(BaseModel): + user_id: str + tasks: list[dict] = [] + profile: dict[str, Optional[float]] = {} + feedback_history: list[dict] = [] + now_iso: Optional[str] = None # ISO 8601; defaults to utcnow + + +class AgentComputeResponse(BaseModel): + user_id: str + agent_id: str + prompt_text: str + signals_snapshot: dict + computed_at: str + expires_at: str + agent_version: str + + +class AgentOutputSnippet(BaseModel): + agent_id: str + prompt_text: str + + +class RecommendRequest(BaseModel): + user_id: str + agent_outputs: list[AgentOutputSnippet] = [] + tasks: list[dict] = [] + hour_of_day: int = 12 + day_of_week: int = 0 + + +class TipResult(BaseModel): + id: str + content: str + source: str = "llm" + kind: str = "advice" + rationale: Optional[str] = None + + +class RecommendResponse(BaseModel): + tip: TipResult + model: str + prompt_tokens: int = 0 + completion_tokens: int = 0 + + # ── Endpoints ────────────────────────────────────────────────────────────── @app.get("/health") def health(): return { "ok": True, + "agents": [a.agent_id for a in all_agents()], "nats": { "enabled": bool(nats_consumer.NATS_URL), "consumers": nats_consumer.consumer_health, @@ -368,6 +429,137 @@ _RETRY_SUFFIX = ( "Reply ONLY with the JSON array — no prose, no markdown fences." ) +_RETRY_SUFFIX_OBJ = ( + "\n\nYour previous response was not valid JSON. " + "Reply ONLY with the JSON object — no prose, no markdown fences." +) + + +@app.post("/agents/{agent_id}/compute", response_model=AgentComputeResponse) +async def compute_agent(agent_id: str, req: AgentComputeRequest) -> AgentComputeResponse: + """Run a single sub-agent for a user and return its prompt snippet. + + Called by the precompute pipeline for each (user_id, agent_id) pair. + The caller is responsible for persisting the result to agent_outputs via the + TypeScript API callback. + """ + try: + agent = get_agent(agent_id) + except KeyError: + raise HTTPException(status_code=404, detail=f"Unknown agent: {agent_id!r}") + + now = ( + datetime.fromisoformat(req.now_iso.replace("Z", "+00:00")) + if req.now_iso + else datetime.now(timezone.utc) + ) + if now.tzinfo is None: + now = now.replace(tzinfo=timezone.utc) + + inp = AgentInput( + user_id=req.user_id, + tasks=req.tasks, + profile=req.profile, + feedback_history=req.feedback_history, + now=now, + ) + try: + output = agent.compute(inp) + except Exception as exc: + log.error("agent_compute_failed", agent_id=agent_id, user_id=req.user_id, error=str(exc)) + raise HTTPException(status_code=500, detail=f"Agent compute failed: {exc}") + + log.info("agent_computed", agent_id=agent_id, user_id=req.user_id, expires_at=output.expires_at) + return AgentComputeResponse( + user_id=output.user_id, + agent_id=output.agent_id, + prompt_text=output.prompt_text, + signals_snapshot=output.signals_snapshot, + computed_at=output.computed_at, + expires_at=output.expires_at, + agent_version=output.agent_version, + ) + + +@app.post("/recommend", response_model=RecommendResponse) +async def recommend(req: RecommendRequest) -> RecommendResponse: + """Orchestrator: combine pre-computed agent outputs into one tip via LLM. + + Called in real time when a user requests a tip. agent_outputs should be + the fresh rows from agent_outputs table (fetched by the TypeScript recommender + before calling this endpoint). Falls back to raw task context if empty. + """ + messages = build_orchestrator_messages( + agent_outputs=[s.model_dump() for s in req.agent_outputs], + tasks=req.tasks, + hour_of_day=req.hour_of_day, + day_of_week=req.day_of_week, + ) + headers = {"Authorization": f"Bearer {LITELLM_MASTER_KEY}"} + last_raw = "" + last_parse_error = "" + total_usage: dict = {"prompt_tokens": 0, "completion_tokens": 0} + model_used = "tip-generator" + + async with httpx.AsyncClient(timeout=30.0) as client: + for _attempt in range(1 + _MAX_GENERATE_RETRIES): + payload = {"model": "tip-generator", "messages": messages, "temperature": 0.7} + try: + resp = await client.post( + f"{LITELLM_URL}/chat/completions", json=payload, headers=headers + ) + resp.raise_for_status() + except httpx.HTTPStatusError as e: + raise HTTPException(status_code=502, detail=f"LiteLLM error: {e.response.text}") + except httpx.RequestError as e: + raise HTTPException(status_code=503, detail=f"LiteLLM unreachable: {e}") + + data = resp.json() + usage = data.get("usage", {}) + total_usage["prompt_tokens"] += usage.get("prompt_tokens", 0) + total_usage["completion_tokens"] += usage.get("completion_tokens", 0) + model_used = data.get("model", "tip-generator") + last_raw = data["choices"][0]["message"]["content"] + + try: + text = last_raw.strip() + if text.startswith("```"): + parts = text.split("```") + text = parts[1] if len(parts) > 1 else text + if text.startswith("json"): + text = text[4:] + parsed = json.loads(text) + item: dict = parsed[0] if isinstance(parsed, list) else parsed + break + except (json.JSONDecodeError, ValueError, IndexError) as exc: + last_parse_error = str(exc) + messages.append({"role": "assistant", "content": last_raw}) + messages.append({"role": "user", "content": _RETRY_SUFFIX_OBJ}) + else: + raise HTTPException( + status_code=502, + detail=f"LLM returned invalid JSON after {_MAX_GENERATE_RETRIES} retries: " + f"{last_parse_error}\n{last_raw[:200]}", + ) + + tip = TipResult( + id=item.get("id", f"tip-{req.user_id[:8]}"), + content=item.get("content", ""), + rationale=item.get("rationale"), + ) + log.info( + "recommend_served", + user_id=req.user_id, + agent_count=len(req.agent_outputs), + tip_id=tip.id, + ) + return RecommendResponse( + tip=tip, + model=model_used, + prompt_tokens=total_usage["prompt_tokens"], + completion_tokens=total_usage["completion_tokens"], + ) + _MAX_GENERATE_RETRIES = 2 diff --git a/services/api/README.md b/services/api/README.md index 736219e..0d789c1 100644 --- a/services/api/README.md +++ b/services/api/README.md @@ -28,13 +28,20 @@ POST /api/push/subscribe DELETE /api/push/subscribe GET /api/admin/stats DAU/WAU, feedback breakdown -GET /api/admin/users -GET /api/admin/events recent event stream (ring buffer) +GET /api/admin/users user list with pagination +GET /api/user/:id user detail, consents, integrations +GET /api/admin/events recent event stream (ring buffer or NATS JetStream) +GET /api/admin/events/history historical event query (time range, filters) GET /api/admin/sim/runs offline sim run list -POST /api/admin/sim/run launch offline sim +POST /api/admin/sim/run launch offline sim with policy/judge params GET /api/admin/sim/runs/:id/output tail sim stdout -... - +GET /api/admin/features/:userId per-user profile features + freshness +GET /api/admin/features/:userId/context context features for last score call +POST /api/admin/policies list shadow policies + active policy +POST /api/admin/policies/:name/toggle enable/disable shadow policy +POST /api/admin/users/:id/actions revoke-integration, reset-bandit, rebuild-profile +GET /api/admin/health system health: api, ml/serving, db, bus, mlflow +GET /api/admin/docs admin documentation index GET /api/ml/* admin-only proxy to ml/serving ``` diff --git a/services/api/src/config.ts b/services/api/src/config.ts index 249a5b4..1a5ea5a 100644 --- a/services/api/src/config.ts +++ b/services/api/src/config.ts @@ -35,11 +35,8 @@ export const config = { LITELLM_URL: optional('LITELLM_URL', 'http://localhost:4000'), MLFLOW_URL: optional('MLFLOW_URL', 'http://localhost:5000'), - AIRFLOW_URL: optional('AIRFLOW_URL', 'http://localhost:8080'), - AIRFLOW_API_USER: optional('AIRFLOW_API_USER', 'admin'), - AIRFLOW_API_PASSWORD: optional('AIRFLOW_API_PASSWORD', 'admin'), - /** Shared secret for internal Airflow→API callbacks. */ + /** Shared secret for internal API callbacks. */ INTERNAL_API_TOKEN: optional('INTERNAL_API_TOKEN', ''), /** Static token for automated/service access to the admin panel (e.g. Playwright tests). */ diff --git a/services/api/src/db/index.ts b/services/api/src/db/index.ts index c12d739..43a18cf 100644 --- a/services/api/src/db/index.ts +++ b/services/api/src/db/index.ts @@ -143,6 +143,19 @@ export function runMigrations() { day_of_week INTEGER NOT NULL, created_at TEXT NOT NULL ); + + CREATE TABLE IF NOT EXISTS agent_outputs ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL REFERENCES users(id), + agent_id TEXT NOT NULL, + prompt_text TEXT NOT NULL, + signals_snapshot TEXT, + computed_at TEXT NOT NULL, + expires_at TEXT NOT NULL, + agent_version TEXT NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_agent_outputs_user_agent_exp + ON agent_outputs(user_id, agent_id, expires_at DESC); `); // Additive column migrations — safe to run on existing DBs. @@ -156,7 +169,6 @@ export function runMigrations() { `ALTER TABLE tip_scores ADD COLUMN prompt_version TEXT`, `ALTER TABLE tip_scores ADD COLUMN llm_model TEXT`, `ALTER TABLE tip_scores ADD COLUMN tip_kind TEXT`, - `ALTER TABLE sim_runs ADD COLUMN airflow_dag_run_id TEXT`, `ALTER TABLE sim_runs ADD COLUMN mlflow_run_id TEXT`, `ALTER TABLE sim_runs ADD COLUMN judge_mode TEXT NOT NULL DEFAULT 'rule'`, `ALTER TABLE sim_runs ADD COLUMN n_policies INTEGER NOT NULL DEFAULT 2`, diff --git a/services/api/src/db/schema.ts b/services/api/src/db/schema.ts index 84db0bc..b400eef 100644 --- a/services/api/src/db/schema.ts +++ b/services/api/src/db/schema.ts @@ -117,7 +117,6 @@ export const simRuns = sqliteTable('sim_runs', { summaryJson: text('summary_json'), // JSON: { [policy]: PolicySummary } winner: text('winner'), personaBreakdownJson: text('persona_breakdown_json'), // JSON: { [persona]: { [policy]: {reward,n} } } - airflowDagRunId: text('airflow_dag_run_id'), mlflowRunId: text('mlflow_run_id'), createdAt: text('created_at').notNull(), finishedAt: text('finished_at'), @@ -142,6 +141,20 @@ export const simEvents = sqliteTable('sim_events', { createdAt: text('created_at').notNull(), }); +// ── Agent outputs (#multi-agent) ───────────────────────────────────────────── +// One row per (userId, agentId) pre-compute run. The orchestrator reads the +// freshest non-expired row per agent when assembling the tip prompt. +export const agentOutputs = sqliteTable('agent_outputs', { + id: text('id').primaryKey(), + userId: text('user_id').notNull().references(() => users.id), + agentId: text('agent_id').notNull(), // e.g. 'overdue-task' + promptText: text('prompt_text').notNull(), // snippet for orchestrator prompt + signalsSnapshot: text('signals_snapshot'), // JSON: inputs the agent consumed + computedAt: text('computed_at').notNull(), // ISO 8601 + expiresAt: text('expires_at').notNull(), // ISO 8601 = computedAt + TTL + agentVersion: text('agent_version').notNull(), // bump to invalidate on logic changes +}); + // Admin saved SQL queries. export const savedQueries = sqliteTable('saved_queries', { id: text('id').primaryKey(), diff --git a/services/api/src/routes/__tests__/admin.test.ts b/services/api/src/routes/__tests__/admin.test.ts index 4632d4a..413097c 100644 --- a/services/api/src/routes/__tests__/admin.test.ts +++ b/services/api/src/routes/__tests__/admin.test.ts @@ -389,7 +389,7 @@ describe('GET /api/admin/events', () => { // Health endpoint — mock fetch so tests don't depend on running services. // --------------------------------------------------------------------------- describe('GET /api/admin/health', () => { - const EXPECTED_HTTP_SERVICES = ['api', 'ml-serving', 'mlflow', 'airflow'] as const; + const EXPECTED_HTTP_SERVICES = ['api', 'ml-serving', 'mlflow'] as const; const EXPECTED_INTERNAL = ['sqlite', 'event-bus'] as const; const VALID_STATUSES = new Set(['ok', 'degraded', 'down']); @@ -404,7 +404,6 @@ describe('GET /api/admin/health', () => { let name: string; if (s.includes(':8000')) name = 'ml-serving'; else if (s.includes(':5000')) name = 'mlflow'; - else if (s.includes(':8080')) name = 'airflow'; else name = 'api'; if (!upServices.has(name)) throw new Error(`ECONNREFUSED ${name}`); @@ -415,7 +414,7 @@ describe('GET /api/admin/health', () => { afterEach(() => vi.unstubAllGlobals()); it('shape: 200, typed fields, all expected services present', async () => { - mockFetch(new Set(['api', 'ml-serving', 'mlflow', 'airflow'])); + mockFetch(new Set(['api', 'ml-serving', 'mlflow'])); const { server, call } = await startServer(buildApp()); try { const { status, body } = await call('GET', '/api/admin/health'); @@ -440,7 +439,7 @@ describe('GET /api/admin/health', () => { }); it('ok=true when all HTTP services respond 200', async () => { - mockFetch(new Set(['api', 'ml-serving', 'mlflow', 'airflow'])); + mockFetch(new Set(['api', 'ml-serving', 'mlflow'])); const { server, call } = await startServer(buildApp()); try { const { body } = await call('GET', '/api/admin/health'); @@ -456,7 +455,7 @@ describe('GET /api/admin/health', () => { }); it('ml-serving=down and ok=false when ml-serving is unreachable', async () => { - mockFetch(new Set(['api', 'mlflow', 'airflow'])); // ml-serving absent + mockFetch(new Set(['api', 'mlflow'])); // ml-serving absent const { server, call } = await startServer(buildApp()); try { const { body } = await call('GET', '/api/admin/health'); @@ -469,22 +468,8 @@ describe('GET /api/admin/health', () => { } }); - it('airflow=down and ok=false when airflow is unreachable', async () => { - mockFetch(new Set(['api', 'ml-serving', 'mlflow'])); // airflow absent - const { server, call } = await startServer(buildApp()); - try { - const { body } = await call('GET', '/api/admin/health'); - const b = body as HealthBody; - const svc = b.services.find((s) => s.name === 'airflow'); - expect(svc?.status).toBe('down'); - expect(b.ok).toBe(false); - } finally { - server.close(); - } - }); - it('mlflow=down and ok=false when mlflow is unreachable', async () => { - mockFetch(new Set(['api', 'ml-serving', 'airflow'])); // mlflow absent + mockFetch(new Set(['api', 'ml-serving'])); // mlflow absent const { server, call } = await startServer(buildApp()); try { const { body } = await call('GET', '/api/admin/health'); diff --git a/services/api/src/routes/admin.ts b/services/api/src/routes/admin.ts index 7347f4d..40e31d4 100644 --- a/services/api/src/routes/admin.ts +++ b/services/api/src/routes/admin.ts @@ -524,14 +524,10 @@ router.get('/data-quality', async (req: AuthenticatedRequest, res: Response) => // Fan-out to all subsystem /health endpoints. // --------------------------------------------------------------------------- router.get('/health', async (_req: AuthenticatedRequest, res: Response) => { - const airflowAuth = Buffer.from(`${config.AIRFLOW_API_USER}:${config.AIRFLOW_API_PASSWORD}`).toString('base64'); - const checks: Array<{ name: string; url: string; headers?: Record }> = [ { name: 'api', url: `http://localhost:${config.PORT}/health` }, { name: 'ml-serving', url: `${config.ML_SERVING_URL}/health` }, { name: 'mlflow', url: `${config.MLFLOW_URL}/health` }, - { name: 'airflow', url: `${config.AIRFLOW_URL}/api/v1/health`, - headers: { Authorization: `Basic ${airflowAuth}` } }, ]; const results = await Promise.allSettled( @@ -705,8 +701,7 @@ router.delete('/saved-queries/:id', async (req: AuthenticatedRequest, res: Respo // --------------------------------------------------------------------------- // POST /api/admin/simulate/start -// Trigger an Airflow DAG run (bandit_sim). Falls back to a local subprocess -// when AIRFLOW_URL is not reachable, so local dev still works. +// Trigger a bandit_sim run via local subprocess. // --------------------------------------------------------------------------- router.post('/simulate/start', async (req: AuthenticatedRequest, res: Response) => { const { @@ -745,56 +740,7 @@ router.post('/simulate/start', async (req: AuthenticatedRequest, res: Response) createdAt: now, }); - // ── Try Airflow first ──────────────────────────────────────────────────── - if (config.AIRFLOW_URL && config.INTERNAL_API_TOKEN) { - try { - const airflowAuth = Buffer.from( - `${config.AIRFLOW_API_USER}:${config.AIRFLOW_API_PASSWORD}`, - ).toString('base64'); - - const dagRes = await fetch( - `${config.AIRFLOW_URL}/api/v1/dags/bandit_sim/dagRuns`, - { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - Authorization: `Basic ${airflowAuth}`, - }, - body: JSON.stringify({ - conf: { - sim_run_id: id, - n_users: nUsers, - n_rounds: nRounds, - tasks_per_round: tasksPerRound, - policies, - judge_mode: judgeMode, - ml_url: config.ML_SERVING_URL, - mlflow_url: config.MLFLOW_URL, - callback_url: `${config.API_BASE_URL}/api/admin/simulate/${id}/complete`, - internal_token: config.INTERNAL_API_TOKEN, - }, - }), - signal: AbortSignal.timeout(5000), - }, - ); - - if (dagRes.ok) { - const dagBody = await dagRes.json() as { dag_run_id: string }; - await db - .update(simRuns) - .set({ airflowDagRunId: dagBody.dag_run_id }) - .where(eq(simRuns.id, id)); - - res.json({ id, status: 'running', airflow_dag_run_id: dagBody.dag_run_id }); - return; - } - logger.warn({ status: dagRes.status }, 'sim: Airflow trigger failed, falling back to subprocess'); - } catch (err) { - logger.warn({ err }, 'sim: Airflow unreachable, falling back to subprocess'); - } - } - - // ── Subprocess fallback (local dev / Airflow not configured) ──────────── + // ── Subprocess ─────────────────────────────────────────────────────────── const runnerPath = resolve(__dirname, '../../../../ml/experiments/sim/runner.py'); const venvPython = resolve(__dirname, '../../../../ml/serving/.venv/bin/python'); const pythonBin = existsSync(venvPython) ? venvPython : 'python3'; diff --git a/services/api/src/routes/agent-outputs.ts b/services/api/src/routes/agent-outputs.ts new file mode 100644 index 0000000..8dbc1b3 --- /dev/null +++ b/services/api/src/routes/agent-outputs.ts @@ -0,0 +1,220 @@ +import { Router } from 'express'; +import { nanoid } from 'nanoid'; +import { db } from '../db/index.js'; +import { agentOutputs, tipFeedback, tipViews } from '../db/schema.js'; +import { eq, and, gt, lt } from 'drizzle-orm'; +import { config } from '../config.js'; +import { getProfile } from '../profile/builder.js'; +import { todoistSource } from '../signals/todoist.js'; +import { SignalAggregator } from '../signals/aggregator.js'; +import type { Request, Response } from 'express'; + +const router = Router(); + +// Separate aggregator instance — avoids circular dep with recommender.ts. +const _agentAggregator = new SignalAggregator().register(todoistSource); + +// ── Internal auth helper ────────────────────────────────────────────────────── + +function checkInternalToken(req: Request, res: Response): boolean { + const token = req.headers['x-internal-token']; + if (!config.INTERNAL_API_TOKEN || token !== config.INTERNAL_API_TOKEN) { + res.status(401).json({ error: 'Unauthorized' }); + return false; + } + return true; +} + +// ── DB helpers ──────────────────────────────────────────────────────────────── + +export async function getActiveAgentOutputs(userId: string) { + const now = new Date().toISOString(); + return db + .select() + .from(agentOutputs) + .where(and(eq(agentOutputs.userId, userId), gt(agentOutputs.expiresAt, now))); +} + +async function storeAgentOutput(output: { + user_id: string; + agent_id: string; + prompt_text: string; + signals_snapshot?: unknown; + computed_at: string; + expires_at: string; + agent_version: string; +}) { + await db + .delete(agentOutputs) + .where(and(eq(agentOutputs.userId, output.user_id), eq(agentOutputs.agentId, output.agent_id))); + await db.insert(agentOutputs).values({ + id: nanoid(), + userId: output.user_id, + agentId: output.agent_id, + promptText: output.prompt_text, + signalsSnapshot: output.signals_snapshot ? JSON.stringify(output.signals_snapshot) : null, + computedAt: output.computed_at, + expiresAt: output.expires_at, + agentVersion: output.agent_version, + }); +} + +// ── GET /api/agents/active-users ────────────────────────────────────────────── +// Returns user IDs that have requested a tip in the last 48 hours. +// Returns user IDs for fan-out precompute tasks. + +router.get('/active-users', async (req: Request, res: Response) => { + if (!checkInternalToken(req, res)) return; + const cutoff = new Date(Date.now() - 48 * 60 * 60 * 1000).toISOString(); + try { + const rows = await db + .selectDistinct({ userId: tipViews.userId }) + .from(tipViews) + .where(gt(tipViews.servedAt, cutoff)); + res.json({ user_ids: rows.map((r) => r.userId) }); + } catch (err: any) { + res.status(500).json({ error: err.message }); + } +}); + +// ── POST /api/agents/:agentId/compute ───────────────────────────────────────── +// Orchestrating endpoint for per-(user, agent) compute tasks. +// Fetches all signals, calls ml/serving /agents/{agentId}/compute, stores result. +// Body: { user_id: string } + +router.post('/:agentId/compute', async (req: Request, res: Response) => { + if (!checkInternalToken(req, res)) return; + + const { agentId } = req.params as { agentId: string }; + const { user_id } = req.body as { user_id: string }; + + if (!user_id) { + res.status(422).json({ error: 'Missing user_id' }); + return; + } + + try { + // Fetch tasks via Todoist integration (gracefully empty if not connected). + let tasks: object[] = []; + try { + const signals = await _agentAggregator.fetchAll(user_id); + tasks = signals.map((s) => ({ + id: s.id, + content: s.content, + priority: (s.features.priority as number) ?? 1, + is_overdue: Boolean(s.features.is_overdue), + task_age_days: (s.features.task_age_days as number) ?? 0, + project_id: (s.metadata as Record).project_id ?? null, + })); + } catch { + // No integration or fetch error — agents that need tasks will report "no tasks" + } + + // Fetch profile features (lazy-refreshed from DB). + let profile: Record = {}; + try { + profile = await getProfile(user_id); + } catch {} + + // Fetch last 7 days of feedback for RecentPatternsAgent. + const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000).toISOString(); + const feedbackRows = await db + .select({ action: tipFeedback.action, dwellMs: tipFeedback.dwellMs, createdAt: tipFeedback.createdAt }) + .from(tipFeedback) + .where(and(eq(tipFeedback.userId, user_id), gt(tipFeedback.createdAt, sevenDaysAgo))); + + const feedbackHistory = feedbackRows.map((f) => ({ + action: f.action, + dwell_ms: f.dwellMs, + created_at: f.createdAt, + })); + + // Call ml/serving to run the agent. + const mlResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/compute`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ user_id, tasks, profile, feedback_history: feedbackHistory }), + signal: AbortSignal.timeout(15_000), + }); + + if (!mlResp.ok) { + const detail = await mlResp.text().catch(() => ''); + res.status(502).json({ error: `ml/serving returned ${mlResp.status}`, detail }); + return; + } + + const output = await mlResp.json() as { + user_id: string; agent_id: string; prompt_text: string; + signals_snapshot: unknown; computed_at: string; expires_at: string; agent_version: string; + }; + + await storeAgentOutput(output); + res.json({ ok: true, agent_id: output.agent_id, user_id: output.user_id, expires_at: output.expires_at }); + } catch (err: any) { + res.status(500).json({ error: err.message }); + } +}); + +// ── POST /api/agents/outputs ────────────────────────────────────────────────── +// Stores a pre-computed agent output directly (used if the DAG calls ml/serving +// itself and pushes the result separately). + +router.post('/outputs', async (req: Request, res: Response) => { + if (!checkInternalToken(req, res)) return; + + const { user_id, agent_id, prompt_text, signals_snapshot, computed_at, expires_at, agent_version } = + req.body as Record; + + if (!user_id || !agent_id || !prompt_text || !computed_at || !expires_at || !agent_version) { + res.status(422).json({ + error: 'Missing required fields: user_id, agent_id, prompt_text, computed_at, expires_at, agent_version', + }); + return; + } + + try { + await storeAgentOutput({ user_id, agent_id, prompt_text, signals_snapshot, computed_at, expires_at, agent_version }); + res.json({ ok: true }); + } catch (err: any) { + res.status(500).json({ error: err.message }); + } +}); + +// ── DELETE /api/agents/outputs/expired ─────────────────────────────────────── +// Purges rows expired more than 24 hours ago. + +router.delete('/outputs/expired', async (req: Request, res: Response) => { + if (!checkInternalToken(req, res)) return; + const cutoff = new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString(); + try { + await db.delete(agentOutputs).where(lt(agentOutputs.expiresAt, cutoff)); + res.json({ ok: true }); + } catch (err: any) { + res.status(500).json({ error: err.message }); + } +}); + +// ── GET /api/agents/:userId/outputs ────────────────────────────────────────── +// Returns non-expired agent outputs. Admin observability; recommender calls +// getActiveAgentOutputs() directly (no HTTP hop). + +router.get('/:userId/outputs', async (req: Request, res: Response) => { + const { userId } = req.params as { userId: string }; + try { + const rows = await getActiveAgentOutputs(userId); + res.json({ + user_id: userId, + outputs: rows.map((r) => ({ + agent_id: r.agentId, + prompt_text: r.promptText, + computed_at: r.computedAt, + expires_at: r.expiresAt, + agent_version: r.agentVersion, + })), + }); + } catch (err: any) { + res.status(500).json({ error: err.message }); + } +}); + +export default router; diff --git a/services/api/src/routes/bench.ts b/services/api/src/routes/bench.ts index 886c853..b1c1a43 100644 --- a/services/api/src/routes/bench.ts +++ b/services/api/src/routes/bench.ts @@ -18,10 +18,6 @@ const MLFLOW_URL = process.env.MLFLOW_URL || "http://mlflow:5000"; const MLFLOW_USER = process.env.MLFLOW_TRACKING_USERNAME || "admin"; const MLFLOW_PASS = process.env.MLFLOW_TRACKING_PASSWORD || "password"; -const AIRFLOW_URL = process.env.AIRFLOW_URL || "http://airflow-webserver:8080"; -const AIRFLOW_USER = process.env.AIRFLOW_API_USER || "admin"; -const AIRFLOW_PASS = process.env.AIRFLOW_API_PASSWORD || "admin"; - // Wrapper for MLflow REST calls with Host header fix async function mlflowFetch( path: string, @@ -65,44 +61,6 @@ router.get("/experiments", async (req: Request, res: Response) => { } }); -// POST /api/bench/run — trigger benchmark DAG -router.post("/run", async (req: Request, res: Response) => { - try { - const config = req.body || {}; - const experiment = config.experiment || "tip-bench-admin"; - - const dagRunUrl = new URL("/api/v1/dags/bench_collect/dagRuns", AIRFLOW_URL); - const auth = Buffer.from(`${AIRFLOW_USER}:${AIRFLOW_PASS}`).toString( - "base64" - ); - - const response = await fetch(dagRunUrl.toString(), { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: `Basic ${auth}`, - }, - body: JSON.stringify({ - conf: config, - dag_run_id: `bench-${Date.now()}`, - }), - }); - - if (!response.ok) { - throw new Error(`Airflow ${response.status}: ${response.statusText}`); - } - - const result = await response.json(); - res.json({ - status: "triggered", - dag_run_id: result.dag_run_id, - experiment, - }); - } catch (err) { - res.status(500).json({ error: String(err) }); - } -}); - // GET /api/bench/runs/:experiment — list runs in an experiment router.get("/runs/:experiment", async (req: Request, res: Response) => { try {