Compare commits

7 Commits

Author SHA1 Message Date
f8d66aa01f chore: remove Airflow completely from the stack
Drop all four Airflow containers (db, init, webserver, scheduler) from the
mlops compose profile, leaving MLflow as the sole mlops service. Remove
AIRFLOW_* env vars, config fields, health-check entries, DAG trigger code
in admin/bench routes, the airflow_dag_run_id schema column, Airflow nav
links and DAG-run links in the admin UI, the two Airflow DAG files
(bench_dag.py, sim_dag.py), and all related docs/ADR references.
Simulations now run exclusively via the subprocess path.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-03 16:38:46 +00:00
ce1c8bde57 fix(admin): simulations view-only + docs path in Docker (#109 #110)
- simulate/page.tsx: remove launch form — simulations are triggered via
  Airflow DAG, not the admin UI. Page now shows run history + links to
  Airflow and MLflow only (#109)
- docs.ts: use DOCS_ROOT env var (fallback: ../../docs for local dev) so
  the path works in Docker standalone where CWD is /app (#110)
- Dockerfile.admin: copy docs/ into the runner image at /app/docs and set
  DOCS_ROOT=/app/docs so listAllDocs() finds the files at runtime (#110)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 13:55:50 +00:00
c1f5fcb561 fix(admin): ops page — add section description, remove redundant footer (#107)
Adds a one-line purpose description under the Ops heading so it is clear
what the section is for (shadow policy toggles, signal replay, per-user
actions). Removes the duplicate "User-level actions" subsection whose
content is now covered by the header description.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 13:53:35 +00:00
9bd60a9835 feat(web): action sheet cleanup + settings page (#100 #101 #102)
- Remove "Helpful"/"Not helpful" from action sheet — reward is inferred
  from done/snooze/dismiss + dwell time; explicit sentiment buttons were
  redundant and cluttered the UI (#100)
- Move "notify me" push subscription button to new /config page (#101)
- Add settings gear icon (bottom-right, fixed) on tip page linking to /config (#102)
- New /config page: push notification toggle + link to /connect integrations

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 13:52:45 +00:00
4267e6ac68 feat(ml/serving): inject profile features + sort tasks in tip prompt (#79)
- prompts.py: sort tasks overdue-first → priority desc → age desc before
  rendering into the LLM prompt (same ordering as ml/features/context.py)
- prompts.py: render User profile summary line (completion_rate, dismiss_rate,
  preferred_hour) when profile_features are present
- main.py: add profile_features field to PromptContext; plumb from
  GenerateRequest into the prompt builder via model_copy
- logging_config.py: drop add_logger_name processor (incompatible with
  PrintLoggerFactory — caused test ordering failures)
- test_generate.py: 6 new tests covering sort order, profile rendering,
  partial fields, empty profile, and end-to-end plumbing through /generate
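
The ordering described above (overdue-first, then priority descending, then age descending) can be sketched as a single sort key. This is a minimal illustration, not the code in `prompts.py`: the `Task` dataclass and `sort_tasks_for_prompt` are hypothetical names, and it assumes that a larger `priority` value means more urgent. The field names mirror the task features listed elsewhere in this changeset (`is_overdue`, `priority`, `task_age_days`).

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Task:
    # Hypothetical container; the real task type in ml/serving may differ.
    title: str
    is_overdue: bool
    priority: int          # assumption: larger value = more urgent
    task_age_days: float


def sort_tasks_for_prompt(tasks: List[Task]) -> List[Task]:
    """Return tasks ordered overdue-first, then priority desc, then age desc."""
    return sorted(
        tasks,
        # False sorts before True, so `not is_overdue` puts overdue tasks first.
        key=lambda t: (not t.is_overdue, -t.priority, -t.task_age_days),
    )


if __name__ == "__main__":
    demo = [
        Task("read paper", False, 4, 1),
        Task("pay invoice", True, 1, 2),
        Task("renew passport", True, 3, 10),
        Task("call dentist", True, 3, 1),
    ]
    for task in sort_tasks_for_prompt(demo):
        print(task.title)
```

Because `sorted` is stable, tasks that tie on all three fields keep their incoming order, which keeps the rendered prompt deterministic for a fixed input.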

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 13:46:16 +00:00
0474ad4deb feat(airflow): integrate bench harness into bench_collect DAG
New DAG (`ml/pipelines/bench_dag.py`) with three linked tasks:
1. collect.py — generates candidates, logs to MLflow
2. export_for_judge — exports pending runs for Claude Code scoring
3. compare — generates leaderboard by (model, prompt) cell

Config via dag_run.conf supports all collect.py options (models, prompts,
n_tips, n_scenarios, temperature, experiment name, max_model_b).

New admin API endpoints (`services/api/src/routes/bench.ts`):
- GET /api/bench/experiments — list tip-bench-* experiments
- POST /api/bench/run — trigger DAG with custom config
- GET /api/bench/runs/:experiment — list runs in experiment
- GET /api/bench/leaderboard/:experiment — leaderboard by (model, prompt)

All endpoints require admin auth. Human judge (Claude Code) scores are
applied manually post-export; future enhancement: add webhook to DAG.

Admin UI can now trigger and monitor benchmarks from a dashboard panel.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-27 11:54:30 +00:00
556019b060 feat(bench): MLflow-based tip-generation benchmark harness (#93, #95)
Combines model evaluation (#93) and prompt A/B testing (#95) into one
experiment. Evaluates all (model × prompt × scenario) cells on the same
fixed contexts so quality differences are attributable.

Architecture:
- Phase A (collect.py): generates candidates per cell, logs to MLflow
  with judge_pending=true. Rejects models >4B, uses keep_alive=0 for
  RAM safety (no concurrent model weights in VRAM).
- Phase B (judge_cli.py): exports pending runs as JSON for Claude Code
  to score per the rubric, then applies scores back to MLflow.
- Phase C (compare.py): leaderboard by (model, prompt) cell.
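
The ">4B" rejection in Phase A could look roughly like the sketch below. It is an assumption-laden illustration rather than the logic in `collect.py`: `model_size_b` and `filter_models` are hypothetical helpers, the size is parsed from the Ollama-style tag suffix (for example `qwen2.5:1.5b`), and tags without a parseable size are rejected to stay on the safe side.

```python
import re
from typing import List, Optional

MAX_MODEL_B = 4.0  # size cap in billions of parameters, per the commit message


def model_size_b(tag: str) -> Optional[float]:
    """Parse the parameter count (in billions) from a tag like 'qwen2.5:1.5b'.

    Returns None when the tag carries no '<number>b' suffix after the colon.
    """
    match = re.search(r":(\d+(?:\.\d+)?)b\b", tag.lower())
    return float(match.group(1)) if match else None


def filter_models(tags: List[str], max_b: float = MAX_MODEL_B) -> List[str]:
    """Keep only models whose parsed size is known and at most max_b."""
    kept = []
    for tag in tags:
        size = model_size_b(tag)
        # Assumption: an unknown size is treated as "too large" and skipped.
        if size is not None and size <= max_b:
            kept.append(tag)
    return kept


if __name__ == "__main__":
    print(filter_models(["qwen2.5:0.5b", "qwen2.5:7b", "gemma3:1b", "llama3.2:3b"]))
```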

Rubric (tip-v1) defines 1–5 scales for relevance, actionability, tone,
plus format_ok and overlong flags. Composite = rel + act + tone +
2×format_ok − overlong. Rubric is self-describing and persisted in every
run so judges use consistent criteria across sessions.
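
As a worked instance of the composite formula above, here is a small sketch. The function name and the range check are illustrative assumptions; only the formula itself (rel + act + tone + 2×format_ok − overlong) comes from the rubric description.

```python
def composite_score(
    relevance: int,
    actionability: int,
    tone: int,
    format_ok: bool,
    overlong: bool,
) -> int:
    """Composite = relevance + actionability + tone + 2*format_ok - overlong."""
    for name, value in (
        ("relevance", relevance),
        ("actionability", actionability),
        ("tone", tone),
    ):
        # The rubric defines 1-5 scales; reject anything outside that range.
        if not 1 <= value <= 5:
            raise ValueError(f"{name} must be in 1..5, got {value}")
    return relevance + actionability + tone + 2 * int(format_ok) - int(overlong)


if __name__ == "__main__":
    # Best possible single judgement: 5 + 5 + 5 + 2 - 0
    print(composite_score(5, 5, 5, format_ok=True, overlong=False))
```

Under this formula a single judgement ranges from 2 (all ones, bad format, overlong) to 17 (all fives, good format, not overlong).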

Artifacts (prompts, candidates, raw responses) stored as MLflow tags
because the server uses a file:// backend not accessible via REST. Full
artifacts accessible in MLflow UI → run → Tags section.

Tested end-to-end on local machine:
- 4 models (qwen2.5:0.5b/1.5b, gemma3:1b, llama3.2:3b) ≤4B
- 3 prompts (v1, v2-mentor, v3-few-shot)
- 4 scenarios (4 personas × 2 time-slots)
- 48 cells total, all judged and ranked

Winner: qwen2.5:1.5b × v3-few-shot (composite=12.75).

Ready for integration into Airflow prompt_ab_eval DAG and admin UI.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-27 11:48:59 +00:00
40 changed files with 2282 additions and 608 deletions

@@ -18,18 +18,7 @@ MLFLOW_ADMIN_PASSWORD=change-me
# Public URL shown as link in the admin sidebar (must be NEXT_PUBLIC_ to reach the browser).
NEXT_PUBLIC_MLFLOW_URL=http://localhost:5000
# Airflow (mlops profile) — http://localhost:8080/airflow in dev.
# Start with: docker compose --profile full --profile mlops up
AIRFLOW_URL=http://localhost:8080
AIRFLOW_ADMIN_PASSWORD=change-me
AIRFLOW_DB_PASSWORD=airflow
AIRFLOW_SECRET_KEY=change-me-in-prod
AIRFLOW_FERNET_KEY=
AIRFLOW_BASE_URL=https://o.alogins.net/airflow
# Public URL shown as link in the admin sidebar (must be NEXT_PUBLIC_ to reach the browser).
NEXT_PUBLIC_AIRFLOW_URL=http://localhost:8080
# Shared secret for Airflow→API internal callbacks. Generate: openssl rand -hex 32
# Shared secret for internal API callbacks. Generate: openssl rand -hex 32
INTERNAL_API_TOKEN=
# Static token for automated/service access to the admin panel (e.g. Playwright tests).

@@ -42,7 +42,7 @@ packages/ shared libraries (importable across services + apps)
ml/ Python — separate deployable from day one
serving/ online scorer (FastAPI), called by recommender
features/ feature definitions + store adapter
pipelines/ batch feature + training DAGs (Prefect/Airflow)
pipelines/ batch feature + training scripts
registry/ MLflow model registry integration
experiments/ assignment + A/B + bandit policies
notebooks/ research only; never imported by production code
@@ -65,7 +65,7 @@ docs/ architecture notes, ADRs, API specs
- One PR = one concern. Conventional-commit prefixes (`feat:`, `fix:`, `chore:`, `docs:`, `refactor:`).
- ADRs go in `docs/adr/NNNN-title.md` for any decision that constrains future work.
- No secrets in repo. Local dev via `.env.local` (gitignored), prod via the server's secret store (Vaultwarden now; k8s secrets later).
- Compose profiles: `core` (api + web + admin), `full` (adds ml-serving), `mlops` (adds MLflow + Airflow), `ai` (adds Ollama + LiteLLM). Mix as needed.
- Compose profiles: `core` (api + web + admin), `full` (adds ml-serving), `mlops` (adds MLflow), `ai` (adds Ollama + LiteLLM). Mix as needed.
## Definition of done (per feature)
@@ -98,9 +98,19 @@ Ollama and LiteLLM are **shared Agap services**, not oO services — they live i
## Current phase
**M1 shipped. M2 (AI tips) in progress.** See `README.md` for the phase roadmap and `docs/architecture/` for diagrams. Work is tracked as Gitea milestones + issues on `alvis/oO`.
**M1 shipped (core + admin). M2 (AI tips) in progress.** See `README.md` for the phase roadmap and `docs/architecture/` for diagrams. Work is tracked as Gitea milestones + issues on `alvis/oO`.
Active work: bandit promotion (#99 — offline sim + ADR-0012 pending) and M2 issues (#61 freshness SLAs, #78 signal abstraction, #93 model benchmark).
Recent completions (M1 add-on):
- ADR-0012 — ε-greedy v2 promotion (profile features, D=12) — 2026-04-26
- Offline sim framework + MLflow integration — shipped in M1 add-on
- Token-based admin auth for Playwright/CI — secured auth boundary
Active work (M2):
- Signal abstraction for multi-source support (#78)
- Per-user feature freshness SLAs (#61, ADR-0011 phase B)
- LLM context assembler + tip generation scaffold (#79, #88)
- Model benchmarking for tip generation (#93)
- Admin UX refinements: feedback consolidation, settings placement (#100–102)
## What NOT to do
@@ -110,7 +120,7 @@ Active work: bandit promotion (#99 — offline sim + ADR-0012 pending) and M2 is
- Don't replace a policy in one step. New policies deploy shadow-first; promoted only after offline + online agreement with the incumbent (ADR-0002).
- Don't over-split processes. Extract a service when pressure demands it, not in anticipation (ADR-0003).
- Don't call LLMs directly from application code. All LLM calls go through `ml/serving` (Python) via `LITELLM_URL`. The TS recommender never holds a model name.
- Don't embed MLflow/Airflow/OpenWebUI in the admin panel. They are external services; link out to them. The admin shell links to `o.alogins.net/mlflow`, `/airflow`, `ai.alogins.net`.
- Don't embed MLflow/OpenWebUI in the admin panel. They are external services; link out to them. The admin shell links to `o.alogins.net/mlflow`, `ai.alogins.net`.
- Don't `nats.publish()` directly from feature code. All publishes go through the in-process `Bus` (`services/api/src/events/bus.ts`); the NATS adapter (`events/nats.ts`) bridges every publish to JetStream when `NATS_URL` is set. This keeps subscribers, the ring-buffer tail used by the admin event viewer, and JetStream all in lockstep.
## Admin app

@@ -104,13 +104,15 @@ User signals ──▶ Context assembler ──▶ LiteLLM ──▶ Ollam
**Why Ollama first:** Tips contain personal context. Local inference means no user data leaves the host for the inference path. Cloud models (Anthropic, OpenAI) are opt-in fallbacks for evaluation and simulation only, gated behind `ANTHROPIC_API_KEY`.
### Models (planned)
### Models (planned; routes through LiteLLM)
| Alias | Model | Task |
|-------|-------|------|
| `tip-generator` | qwen2.5:7b (default) | Generate typed tip candidates from user context |
| `embedder` | nomic-embed-text | Task clustering, semantic similarity for dedup |
| `judge` | claude-haiku-4-5 (cloud, eval-only) | Offline sim judge; rates tip quality for A/B |
| `tip-generator` | qwen2.5:1.5b (default) | Generate typed tip candidates from user context; local-first via Ollama |
| `embedder` | nomic-embed-text | Task clustering, semantic similarity for dedup; local via Ollama |
| `judge` | claude-haiku-4-5 (cloud, eval-only) | Offline sim judge; rates tip quality for A/B (requires `ANTHROPIC_API_KEY`) |
All model calls route through **LiteLLM** at `llm.alogins.net` (or `LITELLM_URL` env var) using model aliases. This decouples tip generation from model selection — swap the backend model in LiteLLM config without code changes. See ADR-0008.
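
To illustrate what "using model aliases" means in practice, the sketch below builds (but does not send) a chat request addressed to an alias rather than a concrete model. Several details are assumptions: `build_chat_request` is a hypothetical helper, the proxy is assumed to expose the OpenAI-compatible `/chat/completions` route, the `http://localhost:4000` fallback is only a placeholder, and authentication headers are left out.

```python
import json
import os
import urllib.request
from typing import Optional


def build_chat_request(
    alias: str,
    user_prompt: str,
    base_url: Optional[str] = None,
) -> urllib.request.Request:
    """Build a POST request that targets a LiteLLM model alias.

    The caller only knows the alias (e.g. 'tip-generator'); which backend
    model serves it is decided in the LiteLLM config, not here.
    """
    # Assumption: LITELLM_URL holds the proxy base URL; the default is a placeholder.
    base = (base_url or os.environ.get("LITELLM_URL", "http://localhost:4000")).rstrip("/")
    body = {
        "model": alias,
        "messages": [{"role": "user", "content": user_prompt}],
    }
    return urllib.request.Request(
        url=f"{base}/chat/completions",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    req = build_chat_request("tip-generator", "Suggest one tip for today.")
    print(req.get_method(), req.full_url)
```

Swapping `qwen2.5:1.5b` for another backend would then be a LiteLLM config change only; the request above stays the same.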
---
@@ -134,22 +136,24 @@ Goal: tips are picked, not drawn from a hat — and they arrive at the right mom
- [x] Event bus scaffold: typed in-process EventEmitter with 500-event ring buffer; subjects match future NATS JetStream — swap is mechanical
- [x] Todoist sync emits `signals.task.synced`; tip served/feedback emit `signals.tip.*`
- [x] Features extracted per task: `is_overdue`, `task_age_days`, `priority`; context: `hour_of_day`, `day_of_week`
- [x] `ml/serving` LinUCB (d=5) + **ε-greedy v1** (d=7, ε=0.10, day-of-week sin/cos features); per-user state persisted to disk
- [x] **ε-greedy v1** (d=7, ε=0.10, day-of-week sin/cos features); per-user state persisted to disk
- [x] **ε-greedy v2** (d=12, profile features: completion rate, dismiss rate, dwell, preferred hour, tip volume) in shadow; promoted to active policy (ADR-0012)
- [x] `RemotePolicy` in recommender: calls ml/serving, falls back to RandomPolicy on timeout/error; logs explainability to `tip_scores`
- [x] Feedback loop: dwell-time inferred reward (`inferReward`) → online model update; `done` in 15 s–2 min = +1.0 (magic zone)
- [x] Offline simulation framework (`ml/experiments/sim`): rule/LLM/claude-code judges, two-policy comparison, results persisted to `sim_runs` + `sim_events`
- [x] **ε-greedy v1 promoted to active policy** (ADR-0007) — +10.7% mean reward vs LinUCB in offline sim
- [x] **Web Push** (VAPID): SW, subscribe/unsubscribe API, "notify me" button on tip page
- [x] Shadow-policy registry: run N shadow policies per request, log picks without serving them (#56)
- [x] NATS JetStream bridge — durable `signals.>` and `feedback.>` streams; in-process bus stays the source of truth, every publish bridges out (#21, shipped)
- [x] Per-user profile features (completion rate, dismiss rate, dwell, preferred hour, tip volume) — event-driven, JIT invalidation (#81)
- [ ] Quiet-hours + dedupe for push delivery
- [ ] Delayed rewards: tasks completed directly in Todoist (requires webhook from Todoist)
- [x] NATS JetStream bridge — durable `signals.>` and `feedback.>` streams; in-process bus stays the source of truth, every publish bridges out (#21, shipped)
- [ ] Apple OAuth (deferred to M3)
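
For readers unfamiliar with the ε-greedy policies named in the checklist above, the selection step can be sketched as follows. This is a generic textbook version, not the `ml/serving` implementation: `epsilon_greedy_pick` is a hypothetical name, `scores` stands in for whatever per-task estimates the model produces from its feature vector, and only the ε=0.10 default is taken from the text.

```python
import random
from typing import Sequence


def epsilon_greedy_pick(
    scores: Sequence[float],
    epsilon: float = 0.10,
    rng: random.Random = random.Random(),
) -> int:
    """Return the index of the candidate to serve.

    With probability epsilon, explore by picking a uniformly random candidate;
    otherwise exploit by picking the highest-scoring one.
    """
    if not scores:
        raise ValueError("scores must not be empty")
    if rng.random() < epsilon:
        return rng.randrange(len(scores))  # explore
    return max(range(len(scores)), key=scores.__getitem__)  # exploit


if __name__ == "__main__":
    picks = [epsilon_greedy_pick([0.1, 0.9, 0.3]) for _ in range(1000)]
    print("share of greedy picks:", picks.count(1) / len(picks))
```

With ε=0.10 and three candidates, the best-scoring candidate is expected to be chosen a little over 93% of the time (90% from exploitation plus a third of the 10% exploration share).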
#### M1 add-on — Admin & ML Ops Console *(fully shipped)*
oO is ML-heavy. Without a cockpit, every model change ships blind. This console is the team's single pane for users, signals, features, models, experiments, and tip outcomes — with the ability to *act* on them (revoke a token, replay an event, promote a model, reset a bandit).
**Framework pick — `apps/admin` on Next.js 15 + Tremor + shadcn/ui.** Analytics-first UI for an analytics-first product, stays on our existing TS/React/Tailwind stack, reuses `packages/shared-types`, `sdk-js`, and the Auth.js session. Specialized ML tooling (MLflow, Airflow) runs as **separate external services** linked from the admin shell; Grafana panels are embedded.
**Framework pick — `apps/admin` on Next.js 15 + Tremor + shadcn/ui.** Analytics-first UI for an analytics-first product, stays on our existing TS/React/Tailwind stack, reuses `packages/shared-types`, `sdk-js`, and the Auth.js session. Specialized ML tooling (MLflow) runs as a **separate external service** linked from the admin shell; Grafana panels are embedded.
| Layer | Tool | Why |
|-------|------|-----|
@@ -159,7 +163,6 @@ oO is ML-heavy. Without a cockpit, every model change ships blind. This console
| Heavy grids | **[TanStack Table v8](https://tanstack.com/table)** | Sortable / paginated / virtualized tables (events, users, tips) |
| Extra charts | **[Recharts](https://recharts.org)** / **[visx](https://airbnb.io/visx)** | Fallbacks where Tremor falls short (e.g. force graphs, Sankey) |
| Model registry / experiments | **[MLflow](https://mlflow.org)** *(external — `o.alogins.net/mlflow`)* | Experiment tracking, artifact browser, model registry; own basic-auth |
| Pipeline orchestration | **[Airflow](https://airflow.apache.org)** *(external — `o.alogins.net/airflow`)* | Batch feature + retraining DAGs; own web-auth |
| Infra metrics | **[Grafana](https://grafana.com)** *(embedded panels)* | One ops source of truth |
| Ad-hoc analysis | **[Marimo](https://marimo.io)** reactive notebooks | Python-native for the ML side; launch-out link |
| AuthZ | `profile.role='admin'` + Next.js middleware | Reuses existing session; no new auth surface |
@@ -170,27 +173,25 @@ oO is ML-heavy. Without a cockpit, every model change ships blind. This console
- *React-admin / Refine.dev* — strong CRUD scaffolding, but analytics/ML views feel bolted on; we'd rebuild Tremor-style dashboards ourselves
- *Superset / Metabase as the admin surface* — excellent for BI, poor for operational **writes** (revoke, replay, promote). Plan: **adopt Superset in M4** for BI alongside batch pipelines; ship a read-only SQL widget inside admin for now
**Build sequence (plan, not code):**
**Build sequence:**
1. [x] **ADR-0006** — record the framework choice + "embed, don't rebuild" rule for MLflow/Grafana
2. [x] **Scaffold** — `apps/admin` with Next.js 15, Tailwind, Tremor; deploy behind Caddy at `admin.o.alogins.net`
3. [x] **RBAC** — `role` column on `users`; admin-only Next.js middleware; seed first admin via `ADMIN_SEED_EMAIL` env; `admin_actions` audit-log table
4. [x] **Overview dashboard** — DAU/WAU KPI cards, tips served, reaction breakdown, activation funnel
5. [x] **User explorer** — list + detail page: identity, consents, integrations, last tip, reward history; revoke-integration + reset-bandit actions
5. [x] **User explorer** — list + detail page: identity, consents, integrations, last tip, reward history; revoke-integration + reset-bandit + rebuild-profile actions
6. [x] **Event stream viewer** — live tail of `signals.*` with filters by subject/user/time; same UI when the bus swaps to NATS
7. [x] **Feature store browser** — features sent to `ml/serving` per scoring call; diff across time for a user
8. [x] **Model registry panel** — `/admin/models` links out to MLflow (`mlflow.o.alogins.net`); experiment tracking and dataset management in MLflow + Airflow
9. [x] **MLOps hub** — `/admin/experiments` links to MLflow experiments/models and Airflow DAGs/datasets; bandit reset on Users page
10. [x] **Recommendation log (explainability)** — per served tip: `(user, features, policy, score, feedback, latency)`; `tip_scores` table, 30-day retention
11. [x] **Reward analytics** — reaction distribution over time; per-policy compare; slice by `hour_of_day`, `priority`, cohort
12. [x] **Data quality widget** — missing-feature rate, stale-token rate, daily completeness heatmap
13. [x] **Ops actions** — revoke token (Users page), replay signal, disable/promote shadow policy; every action audit-logged
14. [x] **Read-only SQL runner** — SELECT-only runner against SQLite + saved queries (sunsets to Superset in M4)
15. [x] **Health rollup** — `/admin/health` surfaces api, ml/serving, SQLite, event-bus; auto-refreshes every 15s
16. [ ] **Docs** — `apps/admin/README.md`, runbook for common ops actions, ADR-0006 merged
7. [x] **Features page** — features sent to `ml/serving` per scoring call; per-user profile features with freshness; diff across time
8. [x] **Tips page** — tips served, scored, feedback reactions with policy/model breakdown
9. [x] **Reward analytics** — reaction distribution over time; per-policy / per-model / per-prompt-version compare; slice by `hour_of_day`, `priority`, cohort
10. [x] **Data quality widget** — missing-feature rate, stale-token rate, daily completeness heatmap; per-feature freshness SLA status
11. [x] **Ops actions** — revoke token (Users page), rebuild profile, reset bandit, enable/disable shadow policies; every action audit-logged
12. [x] **Health rollup** — `/admin/health` surfaces api, ml/serving, SQLite, event-bus, MLflow; auto-refreshes every 15s
13. [x] **Read-only SQL runner** — SELECT-only runner against SQLite + saved queries (sunsets to Superset in M4)
14. [x] **Offline simulation runner** — launch `ml/experiments/sim` from admin UI; track sim runs, judge, policy comparison
15. [x] **Token-based admin auth** — `POST /api/auth/token` for Playwright/CI; `ADMIN_TOKEN` env var (#105)
16. [x] **Docs pages** — admin documentation and runbooks inline
- [ ] Apple OAuth (deferred to M2)
### Phase 2 — AI tips + multi-source signals *(M2)*
### Phase 2 — AI tips + multi-source signals *(M2)* in progress
Goal: tips are AI-generated from user context, not just raw Todoist tasks. Multiple signal sources feed a generalized pipeline. Research-intensive milestone.
**AI infrastructure (unblock everything else):**
@@ -198,21 +199,21 @@ Goal: tips are AI-generated from user context, not just raw Todoist tasks. Multi
- [ ] AI gateway — wire `ml/serving` to LiteLLM; model aliases `tip-generator` + `embedder` (#87)
**AI tip generation pipeline:**
- [ ] Context assembler — user signals + feature store → structured prompt context (`ml/features/context.py`) (#88)
- [x] Context assembler — user signals + feature store → structured prompt context (`ml/features/context.py`); skeleton implemented
- [ ] Tip generator endpoint — `POST /generate` in `ml/serving`; LLM → N typed `TipCandidate` objects (#79)
- [ ] `TipCandidate` shared schema — `{content, kind, source, model, prompt_version, confidence}`; update recommender pipeline (#89)
- [ ] LLM output validation + retry — JSON schema gate, clarification retry (2×), fallback to task-based (#90)
- [ ] Prompt versioning — `prompt_version` + `model` columns in `tip_scores`; content-hash invalidation (#91)
- [ ] LLM tip quality dashboard — reaction breakdown by model / prompt_version in `/admin/reward-analytics` (#92)
- [x] LLM tip quality dashboard — reaction breakdown by model / prompt_version in `/admin/reward-analytics` (#92)
**Evaluation & model selection:**
- [ ] Model benchmark — compare qwen2.5:7b / llama3.2:3b / gemma3:4b via offline sim + LLM judge (#93)
- [ ] LLM prompt research — persona design, context injection strategies, few-shot examples (#84)
**Pipeline architecture:**
- [ ] Signal source abstraction — `SignalSource` interface generalizing beyond Todoist (#78)
- [x] Signal source abstraction — `SignalSource` interface for Todoist + extensible design (#78)
- [ ] Generalized recommendation pipeline — candidate → rank → render stages (#80)
- [ ] Feature registry + user profile builder — centralized features, persistent profiles (#81)
- [x] Feature registry + user profile builder — centralized features, persistent profiles, event-driven invalidation (#81)
- [ ] Tip kind system — task, advice, insight, reminder with kind-aware UI + rewards (#82)
**Policy research:**
@@ -222,33 +223,36 @@ Goal: tips are AI-generated from user context, not just raw Todoist tasks. Multi
- [ ] Apple OAuth (#7)
- [x] NATS JetStream replacing in-process bus (#21) — adapter ships in `services/api/src/events/nats.ts`; in-proc bus is the producer, JetStream is the durable mirror
- [x] Todoist sync via events (#22) — background scheduler in `services/api/src/signals/scheduler.ts` emits `signals.task.synced` every `TODOIST_SYNC_INTERVAL_MS`; on-demand fetch remains as freshness fallback
- [ ] Event schema registry + protobuf CI gate (#54)
- [ ] Per-user freshness SLAs for features (#61)
- [ ] CI skeleton (#3), observability (#18), E2E tests (#20)
- [x] Event schema registry + protobuf CI gate (#54) — buf lint/breaking checks on every PR
- [x] Per-user freshness SLAs for features (#61) — context-feature (JIT) vs profile-feature (batched) spec in ADR-0011; CONTEXT_FEATURES in ml/features/context.py
- [x] Observability (#18) — structured logs via pino, W3C trace IDs, Sentry hooks, trace correlation end-to-end
- [ ] CI skeleton (#3), E2E tests (#20)
**Bugs (fix before new features):**
- [ ] TipFeedback type mismatch (#73)
- [ ] Todoist token refresh (#74)
- [ ] Reward fire-and-forget (#75)
- [ ] Data retention purge (#76)
- [ ] Port mismatch (#77)
**Bugs & UX (fix before new features):**
- [x] TipFeedback type mismatch (#73)
- [x] Todoist token refresh (#74) — OAuth token auto-refresh on 401
- [x] Reward fire-and-forget (#75) — retry logic + logging
- [x] Data retention purge (#76) — daily purge of 30-day-old tip_scores/tip_feedback
- [x] Port mismatch (#77) — fixed in docker-compose + env var config
- [ ] UX refinements (#100–102) — "done/snooze/dismiss" feedback only, config page UI, settings gear button
### Phase 3 — Native mobile *(M3)*
- [ ] iOS app (SwiftUI) with APNs push
- [ ] Android app (Compose) with FCM push
- [ ] `notifier` gains APNs + FCM channels, per-device rate limits
- [ ] Migrate auth from Auth.js to dedicated OIDC provider (trigger from ADR-0004)
- [ ] Consolidate MLflow + Airflow behind shared OIDC (SSO for all internal services)
- [ ] Consolidate MLflow behind shared OIDC (SSO for all internal services)
- [ ] Decide-and-deliver scheduler: per-user "is this tip worth interrupting now?" threshold
### Phase 4 — MLOps at scale *(M4)*
- [x] Airflow + MLflow deployed as external services (`mlops` compose profile); each with own auth
- [ ] Write first retraining DAG (Airflow) + first MLflow experiment logging from `ml/serving`
- [ ] Feature-to-prompt pipeline — nightly Airflow DAG materializes context for LLM; cuts inline latency (#94)
- [x] MLflow deployed as external service (`mlops` compose profile); own auth; health check integrated
- [ ] Write first retraining pipeline + first MLflow experiment logging from `ml/serving` + JetStream consumers (#98)
- [ ] Feature-to-prompt pipeline — nightly batch job materializes context for LLM; cuts inline latency (#94)
- [ ] Prompt optimization loop — sim A/B → MLflow experiment → human-approved promotion (#95)
- [ ] LLM fine-tuning — tip reactions as training signal; LoRA on base model; MLflow tracks runs (#96)
- [ ] Embedding-based task clustering — `nomic-embed-text` for dedup + user pattern features (#97)
- [ ] Consolidate MLflow + Airflow auth into shared OIDC provider (tracked as M3 issue #85)
- [ ] Modular-monolith packaging + import-boundary lint (#47)
- [ ] Consolidate MLflow auth into shared OIDC provider (tracked as M3 issue #85)
- [ ] Shadow → A/B → launch pipeline as first-class in MLflow
- [ ] Online experiments framework: deterministic assignment + bandit policies alongside fixed-split A/B
- [ ] Cross-user collaborative features (opt-in only); cohort slicing; fairness checks

@@ -22,11 +22,19 @@ Two ways to sign in:
| Route | Description |
|-------|-------------|
| `/` | Overview: DAU/WAU KPI cards, tips served, reaction breakdown, activation funnel |
| `/users` | User list (paginated) |
| `/users/:id` | User detail: identity, consents, integrations, profile features (#81 phase B), tip stats, reward history; revoke-integration + reset-bandit + rebuild-profile actions |
| `/audit` | Admin action audit log |
| `/events` | Event stream viewer (stub — pending API history endpoint) |
| `/reward-analytics` | Reaction distribution + per-policy / per-model / per-prompt-version / per-tip-kind breakdowns with avg reward |
| `/users` | User list (paginated, searchable) |
| `/users/:id` | User detail: identity, consents, integrations, profile features (completion rate, dismiss rate, dwell, preferred hour, tip volume), tip stats, reward history; revoke-integration + reset-bandit + rebuild-profile actions |
| `/audit` | Admin action audit log with timestamps and descriptions |
| `/events` | Live event stream viewer with filters by subject/user/time; tail of `signals.*` from ring buffer or NATS JetStream |
| `/features` | Feature store browser: features sent to `ml/serving` per scoring call; freshness status; per-feature SLA tracking |
| `/tips` | Served tips explorer: tip content, score, policy, model, feedback reactions; per-user timeline |
| `/reward-analytics` | Reaction distribution + per-policy / per-model / per-prompt-version breakdowns with avg reward; time-series and cohort slicing |
| `/data-quality` | Missing-feature rate heatmap, stale-token rate, daily completeness, per-feature freshness SLA status |
| `/health` | System health rollup: api, ml/serving, SQLite, event-bus, MLflow with 15s auto-refresh |
| `/sql` | Read-only SQL runner against SQLite; saved queries support; sunsets to Superset in M4 |
| `/simulate` | Offline simulation runner: launch `ml/experiments/sim`, track runs, judge selection, policy comparison |
| `/docs` | Admin documentation and ops runbooks inline |
| `/ops` | Operational dashboard (deprecation candidate; pending UX refinement #107) |
## Dev
@@ -40,8 +48,9 @@ pnpm --filter @oo/admin dev # starts on :3080
Stays as a Next.js app in the monorepo permanently — it's not a candidate for extraction.
It gets richer (more pages, embedded MLflow/Grafana) but not split.
## Known issues
## Known issues & pending improvements
- `@tremor/react 3.x` declares a peer dep on React 18; the workspace uses React 19.
Works in practice. Will resolve naturally when Tremor ships React 19 support or when
we switch to Tremor v4 (which targets React 18+).
- UX refinements pending (#100–102): feedback options consolidation, config page UI migration, settings UI placement

@@ -47,7 +47,14 @@ export default function OpsPage() {
return (
<AdminShell>
<div className="space-y-8">
<h1 className="text-xl font-semibold">Ops actions</h1>
<div>
<h1 className="text-xl font-semibold">Ops</h1>
<p className="text-sm text-gray-500 mt-1">
Live system controls toggle shadow recommendation policies, replay past signals
for backfill or debugging, and find per-user actions (token revoke, bandit reset)
on the <a href="/users" className="text-indigo-400 hover:underline">Users page</a>.
</p>
</div>
{msg && <p className="text-green-400 text-sm">{msg}</p>}
{error && <p className="text-red-400 text-sm">{error}</p>}
@@ -100,14 +107,6 @@ export default function OpsPage() {
</div>
</section>
{/* User-level ops */}
<section className="space-y-3">
<h2 className="text-base font-medium text-gray-300">User-level actions</h2>
<p className="text-sm text-gray-500">
Revoke integration tokens and reset bandit state are available on the{' '}
<a href="/users" className="text-indigo-400 hover:underline">Users page</a> navigate to a user detail view.
</p>
</section>
</div>
</AdminShell>
);

@@ -2,25 +2,14 @@
import { useEffect, useState } from 'react';
import { AdminShell } from '@/components/AdminShell';
import {
startSimulation,
getSimulationRuns,
getSimulationRun,
SimRun,
} from '@/lib/api';
import { getSimulationRuns, SimRun } from '@/lib/api';
const POLICIES = ['linucb-v1', 'egreedy-v1', 'egreedy-v2'];
const mlflowBase = process.env.NEXT_PUBLIC_MLFLOW_URL ?? '/mlflow';
const airflowBase = process.env.NEXT_PUBLIC_AIRFLOW_URL ?? '/airflow';
function mlflowRunUrl(runId: string) {
return `${mlflowBase}/#/experiments/1/runs/${runId}`;
}
function airflowRunUrl(dagRunId: string) {
return `${airflowBase}/dags/bandit_sim/grid?dag_run_id=${encodeURIComponent(dagRunId)}`;
}
function StatusBadge({ status }: { status: string }) {
const cls: Record<string, string> = {
running: 'bg-blue-900 text-blue-300 border-blue-800',
@@ -56,10 +45,6 @@ function SummaryRow({ run }: { run: SimRun }) {
<a href={mlflowRunUrl(run.mlflowRunId)} target="_blank" rel="noreferrer"
className="text-xs text-indigo-400 hover:underline">MLflow </a>
)}
{run.airflowDagRunId && (
<a href={airflowRunUrl(run.airflowDagRunId)} target="_blank" rel="noreferrer"
className="text-xs text-indigo-400 hover:underline">Airflow </a>
)}
</div>
</div>
{summary && (
@@ -83,15 +68,7 @@ function SummaryRow({ run }: { run: SimRun }) {
export default function SimulatePage() {
const [runs, setRuns] = useState<SimRun[]>([]);
const [loading, setLoading] = useState(true);
const [launching, setLaunching] = useState(false);
const [error, setError] = useState('');
const [msg, setMsg] = useState('');
const [nUsers, setNUsers] = useState(5);
const [nRounds, setNRounds] = useState(20);
const [tasksPerRound, setTasksPerRound] = useState(8);
const [judgeMode, setJudgeMode] = useState<'rule' | 'llm'>('rule');
const [selectedPolicies, setSelectedPolicies] = useState<string[]>(['linucb-v1', 'egreedy-v1']);
const refresh = () =>
getSimulationRuns()
@@ -105,112 +82,26 @@ export default function SimulatePage() {
return () => clearInterval(t);
}, []);
const togglePolicy = (p: string) =>
setSelectedPolicies((prev) =>
prev.includes(p) ? prev.filter((x) => x !== p) : [...prev, p],
);
const handleLaunch = async () => {
if (selectedPolicies.length < 2) { setError('Select at least 2 policies.'); return; }
setLaunching(true); setError(''); setMsg('');
try {
const r = await startSimulation({ nUsers, nRounds, tasksPerRound, judgeMode, policies: selectedPolicies });
setMsg(r.airflow_dag_run_id
? `Launched via Airflow — dag_run_id: ${r.airflow_dag_run_id}`
: `Launched locally — run id: ${r.id}`);
await refresh();
} catch (e: unknown) {
setError((e as Error).message);
} finally {
setLaunching(false);
}
};
return (
<AdminShell>
<div className="space-y-8 max-w-4xl">
<div className="space-y-6 max-w-4xl">
<div>
<h1 className="text-xl font-semibold">Simulations</h1>
{error && <p className="text-red-400 text-sm">{error}</p>}
{msg && <p className="text-green-400 text-sm">{msg}</p>}
{/* Launch form */}
<section className="bg-gray-900 border border-gray-800 rounded p-5 space-y-4">
<h2 className="text-base font-medium text-gray-300">New simulation</h2>
<div className="grid grid-cols-3 gap-4 text-sm">
<label className="space-y-1">
<span className="text-gray-500">Users</span>
<input type="number" min={1} max={50} value={nUsers}
onChange={(e) => setNUsers(Number(e.target.value))}
className="w-full bg-gray-950 border border-gray-700 rounded px-2 py-1 text-gray-300" />
</label>
<label className="space-y-1">
<span className="text-gray-500">Rounds</span>
<input type="number" min={1} max={200} value={nRounds}
onChange={(e) => setNRounds(Number(e.target.value))}
className="w-full bg-gray-950 border border-gray-700 rounded px-2 py-1 text-gray-300" />
</label>
<label className="space-y-1">
<span className="text-gray-500">Tasks/round</span>
<input type="number" min={1} max={20} value={tasksPerRound}
onChange={(e) => setTasksPerRound(Number(e.target.value))}
className="w-full bg-gray-950 border border-gray-700 rounded px-2 py-1 text-gray-300" />
</label>
</div>
<div className="space-y-1 text-sm">
<span className="text-gray-500">Policies (select 2)</span>
<div className="flex gap-2 flex-wrap pt-1">
{POLICIES.map((p) => (
<button key={p} onClick={() => togglePolicy(p)}
className={`px-3 py-1 rounded border text-xs font-mono ${
selectedPolicies.includes(p)
? 'bg-indigo-900 border-indigo-700 text-indigo-200'
: 'border-gray-700 text-gray-500 hover:border-gray-500'
}`}>
{p}
</button>
))}
</div>
</div>
<div className="space-y-1 text-sm">
<span className="text-gray-500">Judge</span>
<div className="flex gap-2 pt-1">
{(['rule', 'llm'] as const).map((m) => (
<button key={m} onClick={() => setJudgeMode(m)}
className={`px-3 py-1 rounded border text-xs ${
judgeMode === m
? 'bg-gray-700 border-gray-500 text-white'
: 'border-gray-700 text-gray-500 hover:border-gray-500'
}`}>
{m}
</button>
))}
</div>
{judgeMode === 'llm' && (
<p className="text-xs text-yellow-600 mt-1">LLM judge requires ANTHROPIC_API_KEY in ml/serving env.</p>
)}
</div>
<button onClick={handleLaunch} disabled={launching}
className="bg-indigo-600 hover:bg-indigo-500 disabled:opacity-50 text-white rounded px-4 py-2 text-sm">
{launching ? 'Launching…' : 'Launch simulation'}
</button>
<p className="text-xs text-gray-600">
Runs via <a href={airflowBase} target="_blank" rel="noreferrer" className="text-indigo-500 hover:underline">Airflow</a> (mlops profile) when available; falls back to local subprocess.
Results logged to <a href={mlflowBase} target="_blank" rel="noreferrer" className="text-indigo-500 hover:underline">MLflow</a>.
<p className="text-sm text-gray-500 mt-1">
Offline policy comparisons trigger via the admin API or CLI. Results are logged to{' '}
<a href={mlflowBase} target="_blank" rel="noreferrer" className="text-indigo-400 hover:underline">MLflow </a>.
</p>
</section>
</div>
{error && <p className="text-red-400 text-sm">{error}</p>}
{/* Run history */}
<section className="space-y-3">
<h2 className="text-base font-medium text-gray-300">
<h2 className="text-xs text-gray-500 uppercase tracking-widest font-medium">
Run history
{loading && <span className="text-xs text-gray-600 ml-2">loading</span>}
{loading && <span className="text-gray-600 ml-2 normal-case">loading</span>}
</h2>
{runs.length === 0 && !loading && (
<p className="text-gray-600 text-sm">No simulations yet.</p>
<p className="text-gray-600 text-sm">No simulation runs yet.</p>
)}
{runs.map((r) => <SummaryRow key={r.id} run={r} />)}
</section>


@@ -5,7 +5,6 @@ import { usePathname } from 'next/navigation';
import { useEffect, useState } from 'react';
const mlflowUrl = process.env.NEXT_PUBLIC_MLFLOW_URL ?? '/mlflow';
const airflowUrl = process.env.NEXT_PUBLIC_AIRFLOW_URL ?? '/airflow';
type NavItem = {
href: string;
@@ -54,7 +53,6 @@ const NAV: NavSection[] = [
items: [
{ href: '/docs', label: 'Docs' },
{ href: mlflowUrl, label: 'MLflow ↗', external: true, svcName: 'mlflow' },
{ href: airflowUrl, label: 'Airflow ↗', external: true, svcName: 'airflow' },
],
},
];


@@ -278,7 +278,6 @@ export interface SimRun {
summaryJson: string | null;
winner: string | null;
personaBreakdownJson: string | null;
airflowDagRunId: string | null;
mlflowRunId: string | null;
createdAt: string;
finishedAt: string | null;
@@ -293,7 +292,7 @@ export interface SimStartRequest {
}
export function startSimulation(req: SimStartRequest) {
return apiFetch<{ id: string; status: string; airflow_dag_run_id?: string }>(
return apiFetch<{ id: string; status: string }>(
'/admin/simulate/start',
{ method: 'POST', body: JSON.stringify(req) },
);


@@ -13,8 +13,11 @@ import { readdir, readFile } from 'fs/promises';
import path from 'path';
import { marked } from 'marked';
// apps/admin sits two levels below the monorepo root.
const DOCS_ROOT = path.resolve(process.cwd(), '../../docs');
// In development: process.cwd() = apps/admin/, so ../../docs = monorepo root docs/.
// In Docker standalone: CWD = /app, so ../../docs is wrong. Set DOCS_ROOT in the
// container to the absolute path where docs/ is copied (e.g. /app/docs).
const DOCS_ROOT =
process.env.DOCS_ROOT ?? path.resolve(process.cwd(), '../../docs');
export type DocCategory = 'adr' | 'architecture';


@@ -0,0 +1,119 @@
'use client';
import { useEffect, useState, useCallback } from 'react';
import { getVapidPublicKey, subscribePush } from '@/lib/api';
type PushState = 'idle' | 'subscribed' | 'denied';
export default function ConfigPage() {
const [pushState, setPushState] = useState<PushState>('idle');
useEffect(() => {
if (typeof Notification !== 'undefined') {
if (Notification.permission === 'granted') setPushState('subscribed');
else if (Notification.permission === 'denied') setPushState('denied');
}
}, []);
const requestPush = useCallback(async () => {
if (!('serviceWorker' in navigator) || !('PushManager' in window)) return;
const permission = await Notification.requestPermission();
if (permission !== 'granted') { setPushState('denied'); return; }
try {
const reg = await navigator.serviceWorker.register('/sw.js');
const vapidKey = await getVapidPublicKey();
const sub = await reg.pushManager.subscribe({
userVisibleOnly: true,
applicationServerKey: vapidKey,
});
await subscribePush(sub.toJSON());
setPushState('subscribed');
} catch { setPushState('denied'); }
}, []);
return (
<main style={{ minHeight: '100vh', padding: '4rem 2rem', maxWidth: '480px', margin: '0 auto' }}>
<div style={{ display: 'flex', alignItems: 'center', gap: '1rem', marginBottom: '3rem' }}>
<a
href="/tip"
style={{ color: 'rgba(255,255,255,0.35)', fontSize: '0.85rem', textDecoration: 'none' }}
>
back
</a>
<h2 style={{ fontSize: '1.5rem', fontWeight: 300, margin: 0, letterSpacing: '-0.02em' }}>
Settings
</h2>
</div>
{/* Notifications */}
<section style={{ marginBottom: '2.5rem' }}>
<h3 style={{ fontSize: '0.75rem', letterSpacing: '0.12em', textTransform: 'uppercase', color: 'rgba(255,255,255,0.35)', marginBottom: '1rem', fontWeight: 400 }}>
Notifications
</h3>
<div style={{
border: '1px solid rgba(255,255,255,0.1)',
borderRadius: '0.75rem',
padding: '1.25rem 1.5rem',
display: 'flex',
alignItems: 'center',
justifyContent: 'space-between',
}}>
<div>
<div style={{ fontWeight: 400, fontSize: '0.9rem' }}>Push notifications</div>
<div style={{ color: 'rgba(255,255,255,0.35)', fontSize: '0.75rem', marginTop: '0.2rem' }}>
{pushState === 'subscribed' ? 'Enabled' : pushState === 'denied' ? 'Blocked by browser' : 'Get notified when a tip is ready'}
</div>
</div>
{pushState === 'idle' && (
<button
onClick={requestPush}
style={{
background: 'var(--white)',
color: 'var(--black)',
border: 'none',
borderRadius: '0.375rem',
padding: '0.375rem 0.875rem',
fontSize: '0.8rem',
fontWeight: 500,
cursor: 'pointer',
}}
>
Enable
</button>
)}
{pushState === 'subscribed' && (
<span style={{ color: 'rgba(255,255,255,0.35)', fontSize: '0.8rem' }}></span>
)}
</div>
</section>
{/* Integrations */}
<section>
<h3 style={{ fontSize: '0.75rem', letterSpacing: '0.12em', textTransform: 'uppercase', color: 'rgba(255,255,255,0.35)', marginBottom: '1rem', fontWeight: 400 }}>
Integrations
</h3>
<a
href="/connect"
style={{
display: 'flex',
alignItems: 'center',
justifyContent: 'space-between',
border: '1px solid rgba(255,255,255,0.1)',
borderRadius: '0.75rem',
padding: '1.25rem 1.5rem',
textDecoration: 'none',
color: 'var(--white)',
}}
>
<div>
<div style={{ fontWeight: 400, fontSize: '0.9rem' }}>Connected apps</div>
<div style={{ color: 'rgba(255,255,255,0.35)', fontSize: '0.75rem', marginTop: '0.2rem' }}>
Manage Todoist and other sources
</div>
</div>
<span style={{ color: 'rgba(255,255,255,0.35)', fontSize: '0.85rem' }}></span>
</a>
</section>
</main>
);
}


@@ -1,12 +1,11 @@
'use client';
import { useEffect, useState, useRef, useCallback } from 'react';
import { getRecommendation, sendFeedback, getVapidPublicKey, subscribePush } from '@/lib/api';
import { getRecommendation, sendFeedback } from '@/lib/api';
import type { Tip } from '@oo/shared-types';
type State = 'loading' | 'tip' | 'empty' | 'actions' | 'done';
// Fade wrapper — children fade in when `visible`, fade out when not
function Fade({ visible, children, style }: {
visible: boolean;
children: React.ReactNode;
@@ -30,9 +29,7 @@ export default function TipPage() {
const [visible, setVisible] = useState(false);
const holdTimer = useRef<ReturnType<typeof setTimeout> | null>(null);
const [pressed, setPressed] = useState(false);
const [pushState, setPushState] = useState<'idle' | 'subscribed' | 'denied'>('idle');
// Fade in after state change settles
useEffect(() => {
if (state === 'loading' || state === 'done') {
setVisible(false);
@@ -61,42 +58,12 @@ export default function TipPage() {
useEffect(() => { loadTip(); }, [loadTip]);
// Check existing push permission on mount
useEffect(() => {
if (typeof Notification !== 'undefined' && Notification.permission === 'granted') {
setPushState('subscribed');
} else if (typeof Notification !== 'undefined' && Notification.permission === 'denied') {
setPushState('denied');
}
}, []);
const requestPush = useCallback(async () => {
if (!('serviceWorker' in navigator) || !('PushManager' in window)) return;
const permission = await Notification.requestPermission();
if (permission !== 'granted') { setPushState('denied'); return; }
try {
const reg = await navigator.serviceWorker.register('/sw.js');
const vapidKey = await getVapidPublicKey();
const sub = await reg.pushManager.subscribe({
userVisibleOnly: true,
applicationServerKey: vapidKey,
});
await subscribePush(sub.toJSON());
setPushState('subscribed');
} catch { setPushState('denied'); }
}, []);
const react = async (action: 'done' | 'dismiss' | 'snooze' | 'helpful' | 'not_helpful') => {
const react = async (action: 'done' | 'dismiss' | 'snooze') => {
if (!tip) return;
const isNavigating = ['done', 'dismiss', 'snooze'].includes(action);
if (isNavigating) {
setVisible(false);
setState('done');
} else {
setState('tip');
}
await sendFeedback(tip.id, { action });
if (isNavigating) setTimeout(() => loadTip(), 700);
setTimeout(() => loadTip(), 700);
};
const onPointerDown = () => {
@@ -119,7 +86,6 @@ export default function TipPage() {
return (
<>
<style>{`
@keyframes breathe {
0%, 100% { opacity: 0.3; }
@@ -144,7 +110,7 @@ export default function TipPage() {
overflow: 'hidden',
}}
>
{/* Ambient glow — breathes while loading */}
{/* Ambient glow */}
<div style={{
position: 'absolute',
inset: 0,
@@ -192,24 +158,6 @@ export default function TipPage() {
}}>
hold to act
</p>
{pushState === 'idle' && (
<button
onClick={(e) => { e.stopPropagation(); requestPush(); }}
style={{
marginTop: '2.5rem',
background: 'transparent',
border: 'none',
color: 'rgba(255,255,255,0.18)',
fontSize: '0.65rem',
letterSpacing: '0.12em',
textTransform: 'uppercase',
cursor: 'pointer',
padding: 0,
}}
>
notify me
</button>
)}
</Fade>
)}
@@ -242,12 +190,7 @@ export default function TipPage() {
<>
<div
onClick={() => { setState('tip'); }}
style={{
position: 'fixed',
inset: 0,
background: 'rgba(0,0,0,0.5)',
animation: 'none',
}}
style={{ position: 'fixed', inset: 0, background: 'rgba(0,0,0,0.5)' }}
/>
<div style={{
position: 'fixed',
@@ -260,8 +203,6 @@ export default function TipPage() {
display: 'flex',
flexDirection: 'column',
gap: '0.75rem',
transform: 'translateY(0)',
transition: 'transform 0.3s ease',
}}>
{tip && (
<p style={{
@@ -274,8 +215,6 @@ export default function TipPage() {
</p>
)}
<ActionButton label="Done ✓" onClick={() => react('done')} primary />
<ActionButton label="Helpful" onClick={() => react('helpful')} />
<ActionButton label="Not helpful" onClick={() => react('not_helpful')} />
<ActionButton label="Snooze" onClick={() => react('snooze')} />
<ActionButton label="Dismiss" onClick={() => react('dismiss')} />
<button
@@ -295,6 +234,27 @@ export default function TipPage() {
</div>
</>
)}
{/* Settings gear — bottom right */}
<a
href="/config"
onClick={(e) => e.stopPropagation()}
aria-label="Settings"
style={{
position: 'fixed',
bottom: '1.5rem',
right: '1.5rem',
color: 'rgba(255,255,255,0.15)',
fontSize: '1.1rem',
lineHeight: 1,
textDecoration: 'none',
padding: '0.5rem',
pointerEvents: 'auto',
zIndex: 10,
}}
>
</a>
</main>
</>
);


@@ -13,6 +13,8 @@ vi.mock('@/lib/api', () => ({
import { getRecommendation, sendFeedback } from '@/lib/api';
import TipPage from '@/app/tip/page';
// jsdom doesn't support full anchor navigation — just verify the link exists
const mockGetRec = getRecommendation as ReturnType<typeof vi.fn>;
const mockSendFeedback = sendFeedback as ReturnType<typeof vi.fn>;
@@ -123,9 +125,20 @@ describe('TipPage — action sheet', () => {
expect(mockSendFeedback).toHaveBeenCalledWith('tip:dis', { action: 'dismiss' });
});
it('clicking "Helpful" calls sendFeedback with action=helpful (non-navigating)', async () => {
await renderTipAndHold('tip:help', 'Helpful tip');
await act(async () => { fireEvent.click(screen.getByText('Helpful')); });
expect(mockSendFeedback).toHaveBeenCalledWith('tip:help', { action: 'helpful' });
it('action sheet has exactly Done, Snooze, Dismiss — no Helpful/Not helpful', async () => {
await renderTipAndHold('tip:actions', 'Check actions');
expect(screen.getByText('Done ✓')).toBeInTheDocument();
expect(screen.getByText('Snooze')).toBeInTheDocument();
expect(screen.getByText('Dismiss')).toBeInTheDocument();
expect(screen.queryByText('Helpful')).not.toBeInTheDocument();
expect(screen.queryByText('Not helpful')).not.toBeInTheDocument();
});
it('settings gear link is present on tip page', async () => {
mockGetRec.mockResolvedValue({ tip: { id: 'tip:g', content: 'Gear test', source: 'todoist', createdAt: '' } });
render(<TipPage />);
await screen.findByText('Gear test');
const link = screen.getByRole('link', { name: /settings/i });
expect(link).toHaveAttribute('href', '/config');
});
});


@@ -33,11 +33,10 @@ Same stack as `apps/web`. Reuses `packages/shared-types`, the Auth.js session co
Specialized MLOps tooling runs as **separate external services** with their own auth, linked from the admin shell — not embedded or reimplemented:
- **MLflow** → `https://o.alogins.net/mlflow` — experiment tracking, model registry, artifact browser; own basic-auth for now; see M3 for SSO consolidation
- **Airflow** → `https://o.alogins.net/airflow` — batch pipeline orchestration, dataset management; own web-auth for now
- **Grafana panels** → `/admin/infra` (iframed panels) — infra metrics
- **Marimo notebooks** → launch-out link from admin
The admin shell links to these services; clicking them opens a new tab. The `/experiments` and `/models` admin pages are hub pages with direct links to the relevant MLflow/Airflow views.
The admin shell links to these services; clicking them opens a new tab.
### AuthZ
@@ -56,7 +55,7 @@ The admin shell links to these services; clicking them opens a new tab. The `/ex
- One more Next.js app in the monorepo. Build/dev added to Turborepo.
- Tremor + shadcn/ui are added as dependencies. shadcn components are copied into `apps/admin/src/components/ui/` — no runtime version coupling.
- MLflow (`o.alogins.net/mlflow*` → port 5000) and Airflow (`o.alogins.net/airflow*` → port 8080) are path-based routes in the existing `o.alogins.net` Caddy block, started via `docker compose --profile mlops up`.
- Each service manages its own auth (MLflow: built-in basic-auth; Airflow: built-in web UI auth). M3 will consolidate both behind the shared OIDC provider.
- The `NEXT_PUBLIC_MLFLOW_URL` and `NEXT_PUBLIC_AIRFLOW_URL` build args in `Dockerfile.admin` default to the production URLs; override for dev builds.
- MLflow (`o.alogins.net/mlflow*` → port 5000) is a path-based route in the existing `o.alogins.net` Caddy block, started via `docker compose --profile mlops up`.
- MLflow manages its own auth (built-in basic-auth). M3 will consolidate behind the shared OIDC provider.
- The `NEXT_PUBLIC_MLFLOW_URL` build arg in `Dockerfile.admin` defaults to the production URL; override for dev builds.
- `admin_actions` audit log grows unboundedly — needs a retention policy before M4.


@@ -0,0 +1,106 @@
# ADR-0013 — Multi-agent recommendation: pre-computed agent snippets + orchestrator LLM
**Status:** Accepted
**Date:** 2026-05-01
**Supersedes:** ADR-0007, ADR-0012
## Context
The ε-greedy bandit (ADR-0007, promoted to v2 in ADR-0012) was the first recommendation
policy. It served adequately during early M1 testing but carries structural problems that
become more acute as the user base grows:
- **Training signal sparsity.** The median user generates fewer than 5 reward signals per
week. Ridge regression on a 12-dimensional feature vector needs far more signal than
that to converge to a meaningful θ before the user loses interest.
- **Cold-start cost.** Every new user starts with an uninformed identity matrix. Early tips
are essentially random for the first weeks of use — precisely when first impressions
matter most.
- **Opacity.** The bandit cannot explain why it chose a tip. An orchestrator that reasons
explicitly over named agent outputs ("3 overdue tasks + peak hour approaching") is
interpretable by design.
- **Coupling of generation and selection.** The current pipeline generates candidates, then
scores them; the scoring is decoupled from the LLM reasoning. Giving the LLM the full
pre-computed context directly is a simpler and more capable design.
## Decision
Replace the RL bandit with a **multi-agent pipeline**:
### Sub-agents (async, pre-computed)
Multiple domain-specialized Python agents each analyze user state from one angle and
produce a **prompt snippet** — a short natural-language paragraph describing what they
found. They do not produce tips. They run periodically (every 15 minutes) and store
results in the new `agent_outputs` table with per-agent TTLs.
Initial agent set:
| Agent | ID | TTL |
|---|---|---|
| OverdueTaskAgent | `overdue-task` | 1h |
| MomentumAgent | `momentum` | 6h |
| TimeOfDayAgent | `time-of-day` | 15m |
| RecentPatternsAgent | `recent-patterns` | 24h |
| FocusAreaAgent | `focus-area` | 12h |
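
As a rough illustration of the per-agent TTLs in the table above, the expiry timestamp for a stored snippet could be derived as follows. This is a minimal sketch, not the project's code: `AGENT_TTLS` and `expires_at_for` are hypothetical names, and only the agent IDs and TTL values are taken from the table.

```python
from datetime import datetime, timedelta, timezone

# TTLs copied from the agent table; the dict and helper names are hypothetical.
AGENT_TTLS = {
    "overdue-task": timedelta(hours=1),
    "momentum": timedelta(hours=6),
    "time-of-day": timedelta(minutes=15),
    "recent-patterns": timedelta(hours=24),
    "focus-area": timedelta(hours=12),
}


def expires_at_for(agent_id: str, computed_at: datetime) -> str:
    """Return the ISO 8601 expiry for an agent output: computed_at + TTL."""
    return (computed_at + AGENT_TTLS[agent_id]).isoformat()


computed = datetime(2026, 5, 1, 12, 0, tzinfo=timezone.utc)
print(expires_at_for("time-of-day", computed))  # 2026-05-01T12:15:00+00:00
```

Keeping every timestamp in UTC with the same ISO 8601 layout means stored values can later be compared as plain strings.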
### Orchestrator agent (real-time)
When a user requests a tip, the TypeScript recommender:
1. Fetches all non-expired `agent_outputs` rows for the user.
2. Calls `POST /recommend` on `ml/serving` with the snippet list.
3. `ml/serving` assembles a single orchestrator prompt (template `v4-orchestrator`)
that concatenates all snippets, then calls LiteLLM via the existing `tip-generator`
alias to produce one tip.
No bandit scoring. No reward delivery to an ML model. The LLM receives full context and
generates the tip in one call.
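
The prompt-assembly step described above might look roughly like the sketch below. It assumes each row is a dict with `agent_id`, `prompt_text`, and `expires_at` keys mirroring the `agent_outputs` columns; the function name and the framing text are invented for illustration, since the real `v4-orchestrator` template is not shown here.

```python
from datetime import datetime, timezone


def assemble_prompt(rows: list[dict], now: datetime) -> str:
    """Concatenate non-expired agent snippets into one orchestrator prompt."""
    # Keep only snippets whose expiry is still in the future.
    fresh = [r for r in rows if datetime.fromisoformat(r["expires_at"]) > now]
    sections = [f"[{r['agent_id']}] {r['prompt_text']}" for r in fresh]
    # Hypothetical framing; the actual template wording is an assumption.
    return "Context from agents:\n" + "\n".join(sections) + "\n\nWrite one tip."


rows = [
    {"agent_id": "overdue-task", "prompt_text": "3 overdue tasks.",
     "expires_at": "2026-05-01T10:00:00+00:00"},
    {"agent_id": "time-of-day", "prompt_text": "Peak hour is approaching.",
     "expires_at": "2026-05-01T12:05:00+00:00"},
]
now = datetime(2026, 5, 1, 11, 55, tzinfo=timezone.utc)
print(assemble_prompt(rows, now))
```

Labeling each snippet with its agent ID keeps the prompt traceable back to the agents that contributed.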
### Feedback
`tipFeedback` rows are still written on every user reaction. `inferReward()` still runs
and `rewardMilli` is logged for observability and potential future supervised learning.
Reactions are not delivered to an ML endpoint.
## New data model
```sql
CREATE TABLE agent_outputs (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL REFERENCES users(id),
agent_id TEXT NOT NULL, -- e.g. 'overdue-task'
prompt_text TEXT NOT NULL, -- snippet produced by the agent
signals_snapshot TEXT, -- JSON: inputs the agent consumed
computed_at TEXT NOT NULL, -- ISO 8601
expires_at TEXT NOT NULL, -- ISO 8601 = computed_at + TTL
agent_version TEXT NOT NULL -- bump to invalidate cached outputs on logic changes
);
CREATE INDEX idx_agent_outputs_user_agent_exp
ON agent_outputs(user_id, agent_id, expires_at DESC);
```
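
To make the expiry semantics concrete, here is a small sketch that loads the table into an in-memory SQLite database and selects the snippets still valid at a given instant. The sample rows and the `fresh_snippets` helper are hypothetical, and the foreign key to `users` is left out so the example stands alone. The string comparison on `expires_at` only works if every timestamp uses the same ISO 8601 layout and UTC offset.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE agent_outputs (
        id TEXT PRIMARY KEY,
        user_id TEXT NOT NULL,       -- REFERENCES users(id) omitted in this sketch
        agent_id TEXT NOT NULL,
        prompt_text TEXT NOT NULL,
        signals_snapshot TEXT,
        computed_at TEXT NOT NULL,
        expires_at TEXT NOT NULL,
        agent_version TEXT NOT NULL
    )
    """
)
# Invented sample data: two fresh snippets and one that expired at 10:00.
rows = [
    ("o1", "u1", "momentum", "Two tasks closed today.", None,
     "2026-05-01T06:00:00+00:00", "2026-05-01T12:00:00+00:00", "1"),
    ("o2", "u1", "time-of-day", "Peak hour is approaching.", None,
     "2026-05-01T11:50:00+00:00", "2026-05-01T12:05:00+00:00", "1"),
    ("o3", "u1", "overdue-task", "3 overdue tasks.", None,
     "2026-05-01T09:00:00+00:00", "2026-05-01T10:00:00+00:00", "1"),
]
conn.executemany("INSERT INTO agent_outputs VALUES (?, ?, ?, ?, ?, ?, ?, ?)", rows)


def fresh_snippets(conn, user_id, now_iso):
    """Return (agent_id, prompt_text) pairs for outputs that have not expired."""
    cur = conn.execute(
        "SELECT agent_id, prompt_text FROM agent_outputs "
        "WHERE user_id = ? AND expires_at > ? ORDER BY agent_id",
        (user_id, now_iso),
    )
    return cur.fetchall()


print(fresh_snippets(conn, "u1", "2026-05-01T11:55:00+00:00"))
```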
## Consequences
### Positive
- Tips are explainable: `featuresJson` in `tipScores` records which agents contributed.
- Cold-start is eliminated: the orchestrator reasons from signals immediately, no warm-up.
- Adding or removing an agent is a self-contained change in `ml/agents/`.
- Swapping LLM models remains a config change (LiteLLM alias unchanged).
### Negative / risks
- **No automatic exploration.** The bandit would discover that a user prefers certain tip
types without being told. The orchestrator only knows what the agents tell it.
Mitigation: agents can evolve to encode richer signals; offline evaluation via the
existing bench scripts remains available.
- **Scheduler dependency.** If the pre-compute job falls behind, agent outputs go
stale. Mitigation: the orchestrator falls back to a raw-signal prompt when no outputs
exist; `TimeOfDayAgent` recomputes every 15 min to stay fresh.
- **Higher per-request token cost.** The orchestrator prompt is longer than the old bandit
prompt. Mitigation: the `tip-generator` alias points to a small local model; token cost
is negligible at current scale.
## Migration sequence
See plan document in conversation context. 10 steps; each independently deployable and
rollback-able. Cutover is Step 6 (single TypeScript PR). Bandit endpoints removed in
Step 7 after 48h clean traffic.


@@ -47,7 +47,6 @@ User reactions (done / snooze / dismiss) are events too. They close the loop as
- **OpenAPI** for HTTP; TS client auto-generated; Python pydantic hand-written while consumers are few.
- **Feast** for feature store when we get there; homegrown adapter until then (Phase 1 seam).
- **MLflow** for model registry and experiment tracking; deployed at `o.alogins.net/mlflow`.
- **Airflow** for batch pipelines; deployed at `o.alogins.net/airflow`.
- **Auth.js** embedded behind an OIDC-shaped boundary (ADR-0004). Swap to a standalone OIDC provider when mobile ships.
- **k3s** as the first step beyond docker-compose — no "compose → full k8s" cliff.


@@ -19,15 +19,14 @@ RUN --mount=type=cache,id=pnpm,target=/pnpm/store \
--filter @oo/admin... --filter @oo/shared-types
RUN pnpm --filter @oo/shared-types build
ARG NEXT_PUBLIC_MLFLOW_URL=/mlflow
ARG NEXT_PUBLIC_AIRFLOW_URL=/airflow
ENV NEXT_TELEMETRY_DISABLED=1 \
NEXT_PUBLIC_MLFLOW_URL=$NEXT_PUBLIC_MLFLOW_URL \
NEXT_PUBLIC_AIRFLOW_URL=$NEXT_PUBLIC_AIRFLOW_URL
NEXT_PUBLIC_MLFLOW_URL=$NEXT_PUBLIC_MLFLOW_URL
RUN pnpm --filter @oo/admin build
FROM node:22-slim AS runner
ENV NODE_ENV=production NEXT_TELEMETRY_DISABLED=1 PORT=3080
ENV NODE_ENV=production NEXT_TELEMETRY_DISABLED=1 PORT=3080 DOCS_ROOT=/app/docs
WORKDIR /app
COPY --from=builder /app/apps/admin/.next/standalone ./
COPY --from=builder /app/apps/admin/.next/static ./apps/admin/.next/static
COPY --from=builder /app/docs ./docs
CMD ["node", "apps/admin/server.js"]


@@ -13,9 +13,6 @@ services:
NODE_ENV: production
ML_SERVING_URL: "http://ml-serving:8000"
MLFLOW_URL: "http://mlflow:5000"
AIRFLOW_URL: "http://airflow-webserver:8080"
AIRFLOW_API_USER: "admin"
AIRFLOW_API_PASSWORD: "${AIRFLOW_ADMIN_PASSWORD:-admin}"
INTERNAL_API_TOKEN: "${INTERNAL_API_TOKEN:-}"
volumes:
- /mnt/ssd/dbs/oo:/mnt/ssd/dbs/oo
@@ -56,7 +53,6 @@ services:
HOSTNAME: "0.0.0.0"
NEXT_PUBLIC_API_URL: ""
NEXT_PUBLIC_MLFLOW_URL: "/mlflow"
NEXT_PUBLIC_AIRFLOW_URL: "/airflow"
INTERNAL_API_URL: "http://api:3078"
ports:
- "127.0.0.1:3080:3080"
@@ -85,100 +81,9 @@ services:
timeout: 5s
retries: 5
# ── mlops profile — MLflow + Airflow ──────────────────────────────────────
# ── mlops profile — MLflow ────────────────────────────────────────────────
# Start: docker compose --profile mlops up
# MLflow UI: http://localhost:5000 or https://o.alogins.net/mlflow (admin / password — change via basic_auth.ini)
# Airflow UI: http://localhost:8080/airflow or https://o.alogins.net/airflow (admin / AIRFLOW_ADMIN_PASSWORD)
# Caddy routes /mlflow* and /airflow* inside the o.alogins.net block
airflow-db:
image: postgres:16-alpine
profiles: [mlops]
environment:
POSTGRES_DB: airflow
POSTGRES_USER: airflow
POSTGRES_PASSWORD: ${AIRFLOW_DB_PASSWORD:-airflow}
volumes:
- /mnt/ssd/dbs/oo/airflow-db:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U airflow"]
interval: 10s
timeout: 5s
retries: 5
airflow-init:
image: apache/airflow:2.9.3
profiles: [mlops]
entrypoint: /bin/bash
command:
- -c
- |
airflow db migrate
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@oo.local \
--password "$${AIRFLOW_ADMIN_PASSWORD:-admin}"
environment:
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${AIRFLOW_DB_PASSWORD:-airflow}@airflow-db/airflow
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__WEBSERVER__SECRET_KEY: ${AIRFLOW_SECRET_KEY:-change-me-in-prod}
AIRFLOW__WEBSERVER__BASE_URL: ${AIRFLOW_BASE_URL:-https://o.alogins.net/airflow}
depends_on:
airflow-db:
condition: service_healthy
restart: "no"
airflow-webserver:
image: apache/airflow:2.9.3
profiles: [mlops]
command: webserver
environment:
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${AIRFLOW_DB_PASSWORD:-airflow}@airflow-db/airflow
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__WEBSERVER__SECRET_KEY: ${AIRFLOW_SECRET_KEY:-change-me-in-prod}
AIRFLOW__CORE__FERNET_KEY: ${AIRFLOW_FERNET_KEY:-}
AIRFLOW__WEBSERVER__BASE_URL: ${AIRFLOW_BASE_URL:-https://o.alogins.net/airflow}
AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth"
_PIP_ADDITIONAL_REQUIREMENTS: "mlflow==2.14.3 httpx"
MLFLOW_TRACKING_URI: "http://mlflow:5000/mlflow"
MLFLOW_TRACKING_USERNAME: "admin"
MLFLOW_TRACKING_PASSWORD: "${MLFLOW_ADMIN_PASSWORD:-password}"
volumes:
- ../../ml/pipelines:/opt/airflow/dags:ro
- ../../ml:/opt/airflow/ml:ro
ports:
- "127.0.0.1:8080:8080"
depends_on:
airflow-init:
condition: service_completed_successfully
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 60s
airflow-scheduler:
image: apache/airflow:2.9.3
profiles: [mlops]
command: scheduler
environment:
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${AIRFLOW_DB_PASSWORD:-airflow}@airflow-db/airflow
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__CORE__FERNET_KEY: ${AIRFLOW_FERNET_KEY:-}
_PIP_ADDITIONAL_REQUIREMENTS: "mlflow==2.14.3 httpx"
MLFLOW_TRACKING_URI: "http://mlflow:5000/mlflow"
MLFLOW_TRACKING_USERNAME: "admin"
MLFLOW_TRACKING_PASSWORD: "${MLFLOW_ADMIN_PASSWORD:-password}"
volumes:
- ../../ml/pipelines:/opt/airflow/dags:ro
- ../../ml:/opt/airflow/ml:ro
depends_on:
airflow-init:
condition: service_completed_successfully
# MLflow UI: http://localhost:5000 or https://o.alogins.net/mlflow
# ── events profile — NATS JetStream ─────────────────────────────────────
# Start: docker compose --profile events up
@@ -201,7 +106,7 @@ services:
retries: 5
mlflow:
image: ghcr.io/mlflow/mlflow:v2.14.3
image: ghcr.io/mlflow/mlflow:v3.11.1
profiles: [mlops]
command: >
mlflow server
@@ -209,17 +114,15 @@ services:
--default-artifact-root /mlflow/artifacts
--host 0.0.0.0
--port 5000
--app-name basic-auth
--static-prefix /mlflow
environment:
MLFLOW_AUTH_CONFIG_PATH: /mlflow/basic_auth.ini
--allowed-hosts o.alogins.net,localhost
--cors-allowed-origins https://o.alogins.net
volumes:
- /mnt/ssd/dbs/oo/mlflow:/mlflow
- ../../infra/mlflow/basic_auth.ini:/mlflow/basic_auth.ini:ro
ports:
- "127.0.0.1:5000:5000"
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:5000/health',timeout=3).status==200 else 1)"]
test: ["CMD", "python", "-c", "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:5000/mlflow/health',timeout=3).status==200 else 1)"]
interval: 10s
timeout: 5s
retries: 5


@@ -6,7 +6,7 @@ Python. Owns models, features, training, online scoring.
|---|---|---|
| `serving/` | FastAPI online scorer (`/score`, `/generate`) + LiteLLM gateway + prompt registry (`prompts.py`) + JetStream consumers for `signals.>` / `feedback.>`, called by `recommender` | 12 |
| `features/` | context assembler (`context.py`): signals → `PromptContext`; profile-feature schema mirror (`profile_schema.py`); Feast adapter later | 2 |
| `pipelines/` | batch feature + training DAGs (Prefect/Airflow) | 4 |
| `pipelines/` | batch feature + training scripts | 4 |
| `registry/` | MLflow-backed model registry integration | 4 |
| `experiments/` | A/B assignment + multi-armed bandit policies | 4 |
| `notebooks/` | research; never imported by production code | — |


@@ -0,0 +1,85 @@
# `bench/` — combined model + prompt evaluation harness
Combines the work of issues **#93** (model benchmark) and **#95** (prompt
A/B) into one MLflow-tracked experiment. Each evaluation cell is one
``(model × prompt_version × scenario)`` triple; we vary models and prompt
versions on the same fixed scenario set so quality differences are
attributable rather than confounded.
## Pieces
| File | Purpose |
|------|---------|
| `rubric.md` | The scoring rubric (`tip-v1`). Anchor for the human judge across sessions. |
| `scenarios.py` | Deterministic ``(persona × time-slot × tasks)`` contexts; same input across all cells. |
| `mlflow_client.py` | Thin httpx-based MLflow REST wrapper. Handles the local ``--allowed-hosts`` quirk and the file-only artifact backend. |
| `collect.py` | **Phase A.** Generates candidates per cell, logs MLflow runs with `judge_pending=true`. |
| `judge_cli.py` | **Phase B.** `--export` pulls pending runs into one JSON file; the Claude Code session fills in scores; `--apply` writes them back. |
| `compare.py` | **Phase C.** Leaderboard per ``(model, prompt)`` cell. |
## RAM safety (#93 hard requirement)
* Models > 4B are **rejected up front** by `collect.py --max-model-b 4.0`.
* Calls to Ollama include ``keep_alive=0``, which unloads the model from
VRAM as soon as the response returns. We never hold two LLM weights
concurrently.
* No mock or embedded judge holds weights either: the human judge is the
Claude Code session, so its RAM cost is zero.
The pipeline can run end to end on a box with 15 GiB of RAM and 8 GiB of
VRAM (1070-class GPU) without paging.
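
The two safeguards above can be sketched as follows. This is an illustration under assumptions, not the actual `collect.py` logic: it infers the parameter count from the Ollama tag suffix (for example `:1.5b`), and the helper names are hypothetical. The request body follows Ollama's `/api/generate` endpoint, where `keep_alive: 0` asks the server to unload the model once the response is returned.

```python
import re


def model_size_b(tag: str) -> float:
    """Parse the parameter count (in billions) from a tag such as 'qwen2.5:1.5b'."""
    match = re.search(r":(\d+(?:\.\d+)?)b$", tag.lower())
    if match is None:
        raise ValueError(f"cannot infer model size from tag {tag!r}")
    return float(match.group(1))


def check_models(tags: list[str], max_model_b: float = 4.0) -> list[str]:
    """Reject the whole grid up front if any model exceeds the size limit."""
    too_big = [t for t in tags if model_size_b(t) > max_model_b]
    if too_big:
        raise ValueError(f"models over {max_model_b}B rejected: {too_big}")
    return tags


def generate_payload(model: str, prompt: str) -> dict:
    """Build an Ollama /api/generate body; keep_alive=0 unloads the model after the reply."""
    return {"model": model, "prompt": prompt, "stream": False, "keep_alive": 0}


print(check_models(["qwen2.5:0.5b", "gemma3:1b", "llama3.2:3b"]))
```

Failing before any generation starts means an oversized model never gets loaded, and with `keep_alive` set to 0 only one set of weights should be resident at a time.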
## Quick start
```bash
# 1. Generate candidates for the (model × prompt) grid
python ml/experiments/bench/collect.py \
--models qwen2.5:0.5b,qwen2.5:1.5b,gemma3:1b,llama3.2:3b \
--prompts v1,v2-mentor,v3-few-shot \
--experiment tip-bench-2026-04-27 \
--n-tips 5 \
--diversity
# 2. Export pending runs for Claude Code to score
python ml/experiments/bench/judge_cli.py \
--experiment tip-bench-2026-04-27 \
--export /tmp/oo-bench-judge.json
# 3. (Claude Code edits /tmp/oo-bench-judge.json, fills scores per rubric.md.)
# 4. Push scores back to MLflow
python ml/experiments/bench/judge_cli.py \
--experiment tip-bench-2026-04-27 \
--apply /tmp/oo-bench-judge.json
# 5. Leaderboard
python ml/experiments/bench/compare.py --experiment tip-bench-2026-04-27
```
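
Before step 4 it can help to sanity-check the filled-in file. The sketch below assumes the export shape produced by `judge_cli.py` (an `items` list whose entries carry `run_id` and a `scores` object); `find_problems` is a hypothetical helper that is not part of the repo, and its checks are stricter than what `--apply` itself enforces.

```python
DIMENSIONS = ("relevance", "actionability", "tone")


def find_problems(payload: dict) -> list[str]:
    """Return human-readable problems found in a filled-in judge file."""
    problems = []
    for item in payload.get("items", []):
        run_id = item.get("run_id", "?")
        scores = item.get("scores") or {}
        for dim in DIMENSIONS:
            v = scores.get(dim)
            # Reject missing values, booleans, non-integers and out-of-range scores.
            if isinstance(v, bool) or not isinstance(v, int) or not 1 <= v <= 5:
                problems.append(f"{run_id}: {dim}={v!r} is not an integer in 1..5")
        if scores.get("overlong") not in (None, 0, 1):
            problems.append(f"{run_id}: overlong must be 0, 1 or unset")
    return problems


# Hand-written example payload (run ids are made up).
payload = {
    "items": [
        {"run_id": "run-a", "scores": {"relevance": 5, "actionability": 4, "tone": 4, "overlong": 0}},
        {"run_id": "run-b", "scores": {"relevance": 6, "actionability": None, "tone": 3}},
    ]
}
for line in find_problems(payload):
    print(line)
```

In practice the payload would be loaded with `json.loads(Path("/tmp/oo-bench-judge.json").read_text())` instead of being written inline.
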
## Why the rubric matters
Different judging sessions need to be comparable. `rubric.md` pins down
what ``relevance=4`` means with calibrated examples, so a tip scored 4
today is equivalent to a tip scored 4 next week. Without the rubric, the
"lazy human-in-the-loop" judge drifts.
## Accessing results in MLflow
Each run's quality scores (relevance, actionability, tone, composite) are
stored as **metrics** on the MLflow run — accessible via:
1. **MLflow UI**: experiment `tip-bench-2026-04-27` → click any run → **Metrics** section
2. **Leaderboard**: `python ml/experiments/bench/compare.py --experiment tip-bench-2026-04-27`
3. **Raw API**: `mlflow_client.search_runs()` filters and pulls metrics in bulk
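
For the raw API route, a rough sketch of turning search results into per-cell averages follows. It assumes each run is a dict with `data.params` and `data.metrics` stored as lists of `{"key", "value"}` entries, as `compare.py` reads them; the two runs below are hand-written stand-ins for what `search_runs()` would return, and `mean_composite_by_cell` is a hypothetical helper.

```python
from collections import defaultdict
from statistics import mean


def kv(entries: list[dict]) -> dict:
    """Flatten a list of {"key": ..., "value": ...} entries into a dict."""
    return {e["key"]: e["value"] for e in entries}


def mean_composite_by_cell(runs: list[dict]) -> dict:
    """Average the composite metric per (model, prompt_version) cell."""
    cells = defaultdict(list)
    for run in runs:
        params = kv(run["data"].get("params", []))
        metrics = kv(run["data"].get("metrics", []))
        if "composite" in metrics:  # skip runs that have not been judged yet
            cell = (params.get("model", "?"), params.get("prompt_version", "?"))
            cells[cell].append(metrics["composite"])
    return {cell: mean(vals) for cell, vals in cells.items()}


fake_runs = [
    {"data": {"params": [{"key": "model", "value": "gemma3:1b"},
                         {"key": "prompt_version", "value": "v1"}],
              "metrics": [{"key": "composite", "value": 14.0}]}},
    {"data": {"params": [{"key": "model", "value": "gemma3:1b"},
                         {"key": "prompt_version", "value": "v1"}],
              "metrics": [{"key": "composite", "value": 12.0}]}},
]
print(mean_composite_by_cell(fake_runs))
```
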
Candidate tips, prompts, and raw responses are stored as **tags** with
keys `artifact:candidates.json`, `artifact:prompt.txt`, `artifact:raw.txt`
(tag fallback because the MLflow server uses a file:// artifact backend
not accessible via REST from the host).
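
The tag fallback can be illustrated without a server. The sketch below follows the scheme described in `mlflow_client.py` (a single `artifact:<path>` tag for short text, numbered `artifact:<path>:N` chunks for longer text); `to_tags` and `from_tags` are illustrative names operating on a plain dict rather than on MLflow.

```python
TAG_VALUE_LIMIT = 4500  # same headroom value that mlflow_client.py uses


def to_tags(text: str, artifact_path: str, limit: int = TAG_VALUE_LIMIT) -> dict[str, str]:
    """Split text into one tag, or into numbered chunk tags when too long."""
    key_base = f"artifact:{artifact_path}"
    if len(text) <= limit:
        return {key_base: text}
    return {
        f"{key_base}:{i // limit}": text[i:i + limit]
        for i in range(0, len(text), limit)
    }


def from_tags(tags: dict[str, str], artifact_path: str) -> str:
    """Re-stitch the text, ordering chunks by their numeric suffix."""
    key_base = f"artifact:{artifact_path}"
    if key_base in tags:
        return tags[key_base]
    chunk_keys = sorted(
        (k for k in tags if k.startswith(f"{key_base}:")),
        key=lambda k: int(k.rsplit(":", 1)[1]),  # numeric, so :10 sorts after :9
    )
    return "".join(tags[k] for k in chunk_keys)


text = "x" * 10_000
tags = to_tags(text, "raw.txt")
print(sorted(tags))
print(from_tags(tags, "raw.txt") == text)
```

Sorting on the integer suffix matters once there are more than ten chunks, since a plain string sort would place `:10` before `:2`.
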
## Running standalone
The pipeline runs on any machine with:
- Ollama, with the ≤4B models under test already pulled
- A reachable MLflow tracking server
- Python 3.10+

@@ -0,0 +1,18 @@
"""oO tip-generation benchmark harness.
Combines model evaluation (#93) and prompt A/B testing (#95) into one
MLflow-tracked experiment. Each evaluation cell is one (model × prompt ×
scenario) triple; we vary models and prompts on the same fixed scenario
set so quality differences are attributable rather than confounded.
The pipeline follows the lazy-judge pattern: collect candidates with
deterministic metrics (latency, format_ok), export to a JSON file for
Claude Code to score per the rubric, apply scores back to MLflow, and
generate a leaderboard.
RAM safety is enforced: models >4B are rejected, Ollama calls use
keep_alive=0 to unload VRAM immediately, and the human judge (Claude Code
session) has zero inference cost.
See README.md for usage.
"""

@@ -0,0 +1,338 @@
"""Phase A — collect tip candidates per (model × prompt × scenario) cell.
Each cell produces one MLflow run with:
params: model, prompt_version, scenario_id, persona, hour_of_day,
day_of_week, n_tips_requested, temperature
tags: judge_pending=true, judge_kind=claude-code, rubric=tip-v1,
model, prompt_version, scenario_id, persona, rubric_md
metrics: latency_ms, prompt_tokens (best effort), completion_tokens,
n_parsed, format_ok, mean_diversity (cosine, optional)
artifacts (as tags via mlflow_client.log_text):
prompt.txt system + user prompt as sent
candidates.json parsed candidate array
raw.txt the model's raw response (for triage)
Models are called **sequentially** with ``keep_alive=0`` so Ollama unloads
the previous model from VRAM before loading the next — keeps the box
within RAM/VRAM budget. Models > 4B are rejected up front.
Usage:
python collect.py \\
--models qwen2.5:0.5b,qwen2.5:1.5b,gemma3:1b,llama3.2:3b \\
--prompts v1,v2-mentor,v3-few-shot \\
--n-tips 5 \\
--experiment tip-bench-2026-04-27
"""
from __future__ import annotations
import argparse
import json
import math
import os
import re
import sys
import time
from dataclasses import asdict
from pathlib import Path
import httpx
_BENCH = Path(__file__).resolve().parent
_ML = _BENCH.parent.parent
sys.path.insert(0, str(_BENCH))
sys.path.insert(0, str(_BENCH.parent / "sim"))
sys.path.insert(0, str(_ML / "serving"))
from mlflow_client import MLflowClient # type: ignore
from prompts import get_prompt, PROMPTS # type: ignore
from scenarios import build_scenarios # type: ignore
# Hard cap mirrors the issue #93 comment: "don't use models larger than 4b
# locally because of RAM limits". A regex cheap-match on the tag handles
# the common ``name:Nb`` and ``name:N.Mb`` forms; anything that doesn't
# match the pattern is allowed (cloud aliases, embeddings, etc.).
_SIZE_TAG = re.compile(r":(\d+(?:\.\d+)?)b\b", re.IGNORECASE)


def _model_too_big(model: str, max_b: float = 4.0) -> bool:
    m = _SIZE_TAG.search(model)
    if not m:
        return False
    return float(m.group(1)) > max_b


def _parse_json_array(raw: str) -> list[dict] | None:
    """Best-effort parse: strip markdown fences, then ``json.loads``."""
    text = raw.strip()
    if text.startswith("```"):
        parts = text.split("```")
        text = parts[1] if len(parts) > 1 else text
        if text.lstrip().lower().startswith("json"):
            text = text.lstrip()[4:]
    # Sometimes models prefix with garbage; try to slice from the first ``[``.
    if not text.lstrip().startswith("["):
        i = text.find("[")
        if i >= 0:
            text = text[i:]
    try:
        v = json.loads(text)
        return v if isinstance(v, list) else None
    except (json.JSONDecodeError, ValueError):
        return None


def _embed(text: str, ollama_url: str) -> list[float] | None:
    """Use nomic-embed-text via Ollama for diversity scoring. ~250MB,
    safe to load alongside any 4B chat model thanks to ``keep_alive=0``.
    """
    try:
        with httpx.Client(trust_env=False, timeout=30.0) as c:
            r = c.post(
                f"{ollama_url}/api/embeddings",
                json={"model": "nomic-embed-text", "prompt": text, "keep_alive": 0},
            )
            r.raise_for_status()
            return r.json().get("embedding")
    except Exception:
        return None


def _mean_pairwise_cosine(vecs: list[list[float]]) -> float:
    if len(vecs) < 2:
        return 0.0

    def cos(a: list[float], b: list[float]) -> float:
        na = math.sqrt(sum(x * x for x in a))
        nb = math.sqrt(sum(x * x for x in b))
        if na == 0 or nb == 0:
            return 0.0
        return sum(x * y for x, y in zip(a, b)) / (na * nb)

    n = len(vecs)
    total, count = 0.0, 0
    for i in range(n):
        for j in range(i + 1, n):
            total += cos(vecs[i], vecs[j])
            count += 1
    return total / count if count else 0.0


def _call_ollama(
    *,
    model: str,
    system: str,
    user: str,
    ollama_url: str,
    temperature: float = 0.7,
) -> tuple[str, dict]:
    """Direct call to Ollama. Returns (raw_text, telemetry).

    ``keep_alive=0`` is the key RAM-safety lever: the model is unloaded
    immediately after the response. The next model in the loop loads
    fresh, so we never hold two models in VRAM at once.
    """
    t0 = time.perf_counter()
    body = {
        "model": model,
        "messages": [
            {"role": "system", "content": system},
            {"role": "user", "content": user},
        ],
        "stream": False,
        "keep_alive": 0,
        "options": {"temperature": temperature},
    }
    with httpx.Client(trust_env=False, timeout=180.0) as c:
        r = c.post(f"{ollama_url}/api/chat", json=body)
        r.raise_for_status()
        data = r.json()
    elapsed_ms = (time.perf_counter() - t0) * 1000.0
    raw = data.get("message", {}).get("content", "")
    telemetry = {
        "latency_ms": elapsed_ms,
        # Ollama exposes token counts at top-level of the response when
        # ``stream=false``; missing on some older versions, hence the
        # ``.get`` defaults.
        "prompt_tokens": float(data.get("prompt_eval_count", 0) or 0),
        "completion_tokens": float(data.get("eval_count", 0) or 0),
    }
    return raw, telemetry


def main() -> int:
parser = argparse.ArgumentParser(description="oO tip-generation benchmark — Phase A")
parser.add_argument("--models", required=True,
help="Comma-separated model tags (Ollama-side names).")
parser.add_argument("--prompts", default=",".join(PROMPTS.keys()),
help="Comma-separated prompt versions from ml/serving/prompts.py.")
parser.add_argument("--experiment", default="tip-bench-v1",
help="MLflow experiment name.")
parser.add_argument("--n-tips", type=int, default=5,
help="Tips to request per scenario.")
parser.add_argument("--temperature", type=float, default=0.7)
parser.add_argument("--ollama-url", default=os.environ.get("OLLAMA_URL", "http://localhost:11434"))
parser.add_argument("--mlflow-url", default=os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000"))
parser.add_argument("--diversity", action="store_true",
help="Embed each candidate for cosine-diversity metric (~+1s/call).")
parser.add_argument("--max-model-b", type=float, default=4.0,
help="Reject models tagged larger than this many billion params.")
parser.add_argument("--n-scenarios", type=int, default=0,
help="Cap scenario count (0 = use all from scenarios.py).")
parser.add_argument("--rubric", default=str(_BENCH / "rubric.md"),
help="Rubric file logged once per experiment.")
args = parser.parse_args()
models = [m.strip() for m in args.models.split(",") if m.strip()]
prompts = [p.strip() for p in args.prompts.split(",") if p.strip()]
too_big = [m for m in models if _model_too_big(m, args.max_model_b)]
if too_big:
print(f"ERROR: models exceed --max-model-b={args.max_model_b}: {too_big}", file=sys.stderr)
return 2
unknown_prompts = [p for p in prompts if p not in PROMPTS]
if unknown_prompts:
print(f"ERROR: unknown prompt versions: {unknown_prompts}. "
f"Available: {list(PROMPTS)}", file=sys.stderr)
return 2
scenarios = build_scenarios()
if args.n_scenarios and args.n_scenarios < len(scenarios):
scenarios = scenarios[:args.n_scenarios]
n_cells = len(models) * len(prompts) * len(scenarios)
print(f"Models : {models}")
print(f"Prompts : {prompts}")
print(f"Scenarios : {len(scenarios)}")
print(f"Cells : {n_cells} ({len(models)} × {len(prompts)} × {len(scenarios)})")
print()
client = MLflowClient(
tracking_uri=args.mlflow_url,
username=os.environ.get("MLFLOW_TRACKING_USERNAME") or "admin",
password=os.environ.get("MLFLOW_TRACKING_PASSWORD") or "password",
)
exp_id = client.get_or_create_experiment(args.experiment)
print(f"MLflow experiment: {args.experiment} (id={exp_id})")
rubric_text = Path(args.rubric).read_text(encoding="utf-8")
# Outer loop is *model* so all cells for one model run back to back and
# two different models are never interleaved. Note that ``keep_alive=0``
# asks Ollama to unload the model after every response, so each
# (prompt × scenario) cell may still pay a fresh load; the grouping
# bounds peak memory, it does not guarantee a warm model.
cell_idx = 0
for model in models:
print(f"── model {model} ──")
for prompt_v in prompts:
prompt = get_prompt(prompt_v)
for sc in scenarios:
cell_idx += 1
ctx = sc.to_prompt_context()
class _Ctx:
pass
_ctx = _Ctx()
_ctx.tasks = ctx["tasks"]
_ctx.hour_of_day = ctx["hour_of_day"]
_ctx.day_of_week = ctx["day_of_week"]
_ctx.extra = ctx["extra"]
user_msg = prompt.build_user(_ctx, args.n_tips)
run_id = client.create_run(
exp_id,
run_name=f"{model}__{prompt_v}__{sc.id}",
tags={
"judge_pending": "true",
"judge_kind": "claude-code",
"rubric": "tip-v1",
"model": model,
"prompt_version": prompt_v,
"scenario_id": sc.id,
"persona": sc.persona.name,
},
)
client.log_params(run_id, {
"model": model,
"prompt_version": prompt_v,
"scenario_id": sc.id,
"persona": sc.persona.name,
"hour_of_day": sc.hour_of_day,
"day_of_week": sc.day_of_week,
"n_tips_requested": args.n_tips,
"temperature": args.temperature,
})
try:
raw, telemetry = _call_ollama(
model=model,
system=prompt.system,
user=user_msg,
ollama_url=args.ollama_url,
temperature=args.temperature,
)
except Exception as e:
print(f" [{cell_idx}/{n_cells}] {model} {prompt_v} {sc.id}: ERROR {e}")
client.set_tag(run_id, "error", str(e)[:500])
client.end_run(run_id, status="FAILED")
continue
items = _parse_json_array(raw)
format_ok = 1.0 if items is not None else 0.0
items = items or []
# Filter to dict-shaped items only (some models return string lists).
cand_dicts = [
{
"id": str(it.get("id", f"tip-{i}")),
"content": str(it.get("content", "")),
"rationale": str(it.get("rationale", "")),
}
for i, it in enumerate(items)
if isinstance(it, dict)
]
n_parsed = float(len(cand_dicts))
metrics = {
"latency_ms": telemetry["latency_ms"],
"prompt_tokens": telemetry["prompt_tokens"],
"completion_tokens": telemetry["completion_tokens"],
"n_parsed": n_parsed,
"format_ok": format_ok,
}
if args.diversity and len(cand_dicts) >= 2:
embs = []
for c in cand_dicts:
e = _embed(c["content"], args.ollama_url)
if e:
embs.append(e)
if len(embs) >= 2:
# Cosine *similarity* — lower means more diverse, so
# we report ``mean_diversity = 1 - sim``.
sim = _mean_pairwise_cosine(embs)
metrics["mean_diversity"] = 1.0 - sim
client.log_metrics(run_id, metrics)
client.log_text(run_id, prompt.system + "\n\n---\n\n" + user_msg, "prompt.txt")
client.log_text(run_id, json.dumps(cand_dicts, indent=2), "candidates.json")
client.log_text(run_id, raw[:9_000], "raw.txt")
# Stamp the rubric text onto every run as a tag (truncated to the tag
# value limit). Cheap, and it makes each run self-describing.
client.set_tag(run_id, "rubric_md", rubric_text[: client._TAG_VALUE_LIMIT])
client.end_run(run_id)
print(f" [{cell_idx:>3}/{n_cells}] {model:18s} {prompt_v:12s} {sc.id:24s} "
f"lat={metrics['latency_ms']:>6.0f}ms parsed={int(n_parsed)}/{args.n_tips} "
f"fmt={int(format_ok)}")
print()
print(f"Phase A complete. Run judge_cli.py --export to score pending runs.")
print(f" python ml/experiments/bench/judge_cli.py --experiment {args.experiment} \\")
print(f" --export /tmp/oo-bench-judge-requests.json")
return 0
if __name__ == "__main__":
sys.exit(main())

@@ -0,0 +1,144 @@
"""Phase C — leaderboard from judged MLflow runs.
Pulls every judged run (``judge_pending=false`` or any run with the
composite metric set) from the experiment, groups by (model, prompt)
cell, and prints a leaderboard sorted by mean composite score.
Also reports the deterministic-only metrics (latency, format_ok) so
cells with great prose but broken JSON are visible.
"""
from __future__ import annotations
import argparse
import os
import statistics
import sys
from collections import defaultdict
from pathlib import Path
_BENCH = Path(__file__).resolve().parent
sys.path.insert(0, str(_BENCH))
from mlflow_client import MLflowClient # type: ignore
def _params(run: dict) -> dict[str, str]:
    return {p["key"]: p["value"] for p in run["data"].get("params", [])}


def _metrics(run: dict) -> dict[str, float]:
    return {m["key"]: m["value"] for m in run["data"].get("metrics", [])}


def _tags(run: dict) -> dict[str, str]:
    return {t["key"]: t["value"] for t in run["data"].get("tags", [])}


def main() -> int:
parser = argparse.ArgumentParser(description="oO bench — Phase C (leaderboard)")
parser.add_argument("--experiment", required=True)
parser.add_argument("--mlflow-url", default=os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000"))
parser.add_argument("--include-pending", action="store_true",
help="Also include rows with no quality scores (latency/format only).")
args = parser.parse_args()
client = MLflowClient(
tracking_uri=args.mlflow_url,
username=os.environ.get("MLFLOW_TRACKING_USERNAME") or "admin",
password=os.environ.get("MLFLOW_TRACKING_PASSWORD") or "password",
)
exp_id = client.get_or_create_experiment(args.experiment)
runs = client.search_runs(exp_id, max_results=2000)
# Group key = (model, prompt_version)
cells: dict[tuple[str, str], list[dict]] = defaultdict(list)
for r in runs:
params = _params(r)
metrics = _metrics(r)
tags = _tags(r)
if r["info"].get("status") != "FINISHED":
continue
if not args.include_pending and "composite" not in metrics:
continue
cells[(params.get("model", "?"), params.get("prompt_version", "?"))].append({
"metrics": metrics,
"scenario": params.get("scenario_id", "?"),
"judged": tags.get("judge_pending") == "false",
})
if not cells:
print("No judged runs found. Did you run judge_cli.py --apply?")
return 1
rows = []
for (model, prompt), records in cells.items():
n = len(records)
comp = [r["metrics"]["composite"] for r in records if "composite" in r["metrics"]]
rel = [r["metrics"]["relevance"] for r in records if "relevance" in r["metrics"]]
act = [r["metrics"]["actionability"] for r in records if "actionability" in r["metrics"]]
tone = [r["metrics"]["tone"] for r in records if "tone" in r["metrics"]]
lat = [r["metrics"]["latency_ms"] for r in records if "latency_ms" in r["metrics"]]
fmt = [r["metrics"]["format_ok"] for r in records if "format_ok" in r["metrics"]]
div = [r["metrics"]["mean_diversity"] for r in records if "mean_diversity" in r["metrics"]]
rows.append({
"model": model,
"prompt": prompt,
"n": n,
"composite": statistics.mean(comp) if comp else None,
"relevance": statistics.mean(rel) if rel else None,
"actionability": statistics.mean(act) if act else None,
"tone": statistics.mean(tone) if tone else None,
"format_ok": statistics.mean(fmt) if fmt else None,
"latency_p50": statistics.median(lat) if lat else None,
"latency_p95": _p95(lat) if lat else None,
"diversity": statistics.mean(div) if div else None,
})
rows.sort(key=lambda r: r["composite"] if r["composite"] is not None else -1, reverse=True)
# Width-fitted printer — keeps output legible in a 100-col terminal.
print()
print(f"Experiment: {args.experiment} (id={exp_id})")
print(f"Cells : {len(rows)}")
print()
header = (
f"{'#':>2} {'model':18s} {'prompt':12s} {'n':>3s} "
f"{'comp':>5s} {'rel':>4s} {'act':>4s} {'tone':>4s} "
f"{'fmt':>4s} {'p50':>6s} {'p95':>6s} {'div':>5s}"
)
print(header)
print("-" * len(header))
for i, r in enumerate(rows, 1):
comp = f"{r['composite']:.2f}" if r["composite"] is not None else " -- "
rel = f"{r['relevance']:.1f}" if r["relevance"] is not None else " -- "
act = f"{r['actionability']:.1f}" if r["actionability"] is not None else " -- "
tone = f"{r['tone']:.1f}" if r["tone"] is not None else " -- "
fmt = f"{r['format_ok']:.2f}" if r["format_ok"] is not None else " -- "
p50 = f"{r['latency_p50']:.0f}" if r["latency_p50"] is not None else " -- "
p95 = f"{r['latency_p95']:.0f}" if r["latency_p95"] is not None else " -- "
div = f"{r['diversity']:.2f}" if r["diversity"] is not None else " -- "
print(
f"{i:>2} {r['model']:18s} {r['prompt']:12s} {r['n']:>3d} "
f"{comp:>5s} {rel:>4s} {act:>4s} {tone:>4s} "
f"{fmt:>4s} {p50:>6s} {p95:>6s} {div:>5s}"
)
if rows[0]["composite"] is not None:
winner = rows[0]
print()
print(f"Winner: {winner['model']} × {winner['prompt']} "
f"(composite={winner['composite']:.2f}, n={winner['n']})")
return 0
def _p95(xs: list[float]) -> float:
    if not xs:
        return 0.0
    s = sorted(xs)
    idx = max(0, int(round(0.95 * (len(s) - 1))))
    return s[idx]


if __name__ == "__main__":
sys.exit(main())

@@ -0,0 +1,191 @@
"""Phase B — Claude Code as the lazy MLflow judge.
Two modes, selected by mutually exclusive flags and both keyed to MLflow
tags, so the same run cycles through ``judge_pending=true`` → judged →
``judge_pending=false`` exactly once.
--export PATH
Pull every run with ``judge_pending=true`` and ``judge_kind=claude-code``
from the experiment, bundle the prompt + parsed candidates + the
rubric into a single JSON file the Claude Code session can read.
--apply PATH
Read the responses (same shape as the request, with ``scores`` filled in)
and log ``relevance``, ``actionability``, ``tone``, ``overlong``, and the
derived ``composite`` as MLflow metrics on the corresponding runs. Sets
``judge_pending=false`` and stamps ``judged_at`` / ``judged_by`` so the
run won't be picked up twice.
The request file is intentionally one big JSON document, so the human
judge sees the full set in one place and can score consistently.
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from pathlib import Path
_BENCH = Path(__file__).resolve().parent
sys.path.insert(0, str(_BENCH))
from mlflow_client import MLflowClient # type: ignore
_DIMENSIONS = ("relevance", "actionability", "tone")
_BIN_FLAGS = ("overlong",)
def _tags_dict(run: dict) -> dict[str, str]:
    return {t["key"]: t["value"] for t in run.get("data", {}).get("tags", [])}


def _params_dict(run: dict) -> dict[str, str]:
    return {p["key"]: p["value"] for p in run.get("data", {}).get("params", [])}


def export(client: MLflowClient, experiment: str, out_path: str) -> int:
exp_id = client.get_or_create_experiment(experiment)
runs = client.search_runs(
exp_id,
filter_string="tags.judge_pending = 'true' and tags.judge_kind = 'claude-code'",
)
if not runs:
print("No pending runs.")
Path(out_path).write_text(json.dumps({
"experiment": experiment,
"exported_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"rubric": "tip-v1",
"items": [],
}, indent=2))
return 0
rubric_text = (_BENCH / "rubric.md").read_text(encoding="utf-8")
items: list[dict] = []
for run in runs:
run_id = run["info"]["run_id"]
tags = _tags_dict(run)
params = _params_dict(run)
candidates_json = client.get_artifact_text(run_id, "candidates.json")
prompt_text = client.get_artifact_text(run_id, "prompt.txt")
try:
candidates = json.loads(candidates_json) if candidates_json else []
except json.JSONDecodeError:
candidates = []
items.append({
"run_id": run_id,
"model": params.get("model") or tags.get("model"),
"prompt_version": params.get("prompt_version") or tags.get("prompt_version"),
"scenario_id": params.get("scenario_id") or tags.get("scenario_id"),
"persona": params.get("persona") or tags.get("persona"),
"hour_of_day": int(params.get("hour_of_day", "12")),
"day_of_week": int(params.get("day_of_week", "0")),
"prompt": prompt_text,
"candidates": candidates,
# Per-run scoring slot — judge fills these in.
"scores": {
"relevance": None, # 1-5, integer
"actionability": None, # 1-5, integer
"tone": None, # 1-5, integer
"overlong": None, # 0/1
"notes": "", # short comment, optional
},
})
out = {
"experiment": experiment,
"exported_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"rubric": "tip-v1",
"rubric_md": rubric_text,
"items": items,
}
Path(out_path).write_text(json.dumps(out, indent=2, ensure_ascii=False))
print(f"Exported {len(items)} pending runs → {out_path}")
return 0
def apply(client: MLflowClient, experiment: str, in_path: str) -> int:
exp_id = client.get_or_create_experiment(experiment)
payload = json.loads(Path(in_path).read_text(encoding="utf-8"))
items = payload.get("items", [])
if not items:
print("No items in response file.")
return 0
judged_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
n_applied, n_skipped = 0, 0
for item in items:
run_id = item["run_id"]
scores = item.get("scores") or {}
missing = [d for d in _DIMENSIONS if scores.get(d) in (None, "")]
if missing:
print(f" [skip] {run_id}: missing {missing}")
n_skipped += 1
continue
metrics = {d: float(scores[d]) for d in _DIMENSIONS}
for f in _BIN_FLAGS:
v = scores.get(f)
if v not in (None, ""):
metrics[f] = float(int(bool(int(v))))
# Composite mirrors rubric.md: relevance + actionability + tone
# + 2 * format_ok - overlong. format_ok is already a metric on
# the run from collect.py; re-fetching is cheap and keeps this
# script idempotent if format compliance was retroactively fixed.
run = client._get("/runs/get", {"run_id": run_id})["run"]
existing_metrics = {m["key"]: m["value"] for m in run["data"].get("metrics", [])}
format_ok = float(existing_metrics.get("format_ok", 0.0))
overlong = metrics.get("overlong", 0.0)
composite = (
metrics["relevance"] + metrics["actionability"] + metrics["tone"]
+ 2 * format_ok - overlong
)
metrics["composite"] = composite
client.log_metrics(run_id, metrics)
client.set_tags(run_id, {
"judge_pending": "false",
"judged_at": judged_at,
"judged_by": "claude-code-session",
})
if scores.get("notes"):
client.set_tag(run_id, "judge_notes", str(scores["notes"])[:1000])
n_applied += 1
print(f" [ok] {run_id}: rel={metrics['relevance']:.1f} "
f"act={metrics['actionability']:.1f} tone={metrics['tone']:.1f} "
f"comp={composite:.2f}")
print(f"Applied {n_applied}, skipped {n_skipped}.")
return 0
def main() -> int:
parser = argparse.ArgumentParser(description="oO bench — Phase B (Claude Code judge)")
parser.add_argument("--experiment", required=True)
parser.add_argument("--mlflow-url", default=os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000"))
grp = parser.add_mutually_exclusive_group(required=True)
grp.add_argument("--export", metavar="PATH",
help="Write pending runs as a judgment-request JSON file.")
grp.add_argument("--apply", metavar="PATH",
help="Read filled-in responses and write metrics back to MLflow.")
args = parser.parse_args()
client = MLflowClient(
tracking_uri=args.mlflow_url,
username=os.environ.get("MLFLOW_TRACKING_USERNAME") or "admin",
password=os.environ.get("MLFLOW_TRACKING_PASSWORD") or "password",
)
if args.export:
return export(client, args.experiment, args.export)
return apply(client, args.experiment, args.apply)
if __name__ == "__main__":
sys.exit(main())

@@ -0,0 +1,201 @@
"""Thin MLflow REST wrapper.
Why not the official ``mlflow`` SDK? Two reasons specific to the oO setup:
1. The MLflow server (3.11) ships with ``--allowed-hosts localhost`` but
curl / requests / urllib3 send ``Host: localhost:5000`` — the port
suffix fails the DNS-rebinding check. We override the Host header per
request, which the SDK doesn't expose.
2. The collect/judge phases only need ~6 endpoints (create/search/log).
Pulling a 200MB SDK transitively for that is excess weight.
All calls are synchronous httpx with explicit ``Host`` so the script can
run from the host shell or from inside docker without further config.
"""
from __future__ import annotations
import os
import time
from dataclasses import dataclass
from typing import Any
import httpx
def _strip_path(uri: str) -> tuple[str, str]:
    """Return (origin, path_prefix); handles both /mlflow and / roots.

    ``http://mlflow:5000/mlflow`` → ("http://mlflow:5000", "/mlflow")
    ``http://localhost:5000`` → ("http://localhost:5000", "")
    """
    uri = uri.rstrip("/")
    if "/" not in uri.split("://", 1)[1]:
        return uri, ""
    scheme_host, _, rest = uri.partition("://")
    host, _, path = rest.partition("/")
    return f"{scheme_host}://{host}", ("/" + path if path else "")


@dataclass
class MLflowClient:
tracking_uri: str
username: str | None = None
password: str | None = None
host_header: str | None = None # override for DNS-rebinding sidestep
timeout: float = 30.0
def __post_init__(self) -> None:
self._origin, self._ui_prefix = _strip_path(self.tracking_uri)
# MLflow 3.x exposes the REST API at the root, *not* under the
# ``/mlflow`` UI prefix. Empirically verified against the running
# ghcr.io/mlflow/mlflow:v3.11.1 container.
self._api = f"{self._origin}/api/2.0/mlflow"
self._auth = (self.username, self.password) if self.username else None
# If user did not pass a host header, derive from origin. Strip
# the port if present — the server's allowed-hosts check rejects
# ``localhost:5000`` even when ``localhost`` is allowed.
if self.host_header is None:
host = self._origin.split("://", 1)[1]
self.host_header = host.split(":", 1)[0]
@classmethod
def from_env(cls) -> "MLflowClient":
return cls(
tracking_uri=os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000"),
username=os.environ.get("MLFLOW_TRACKING_USERNAME") or "admin",
password=os.environ.get("MLFLOW_TRACKING_PASSWORD") or "password",
host_header=os.environ.get("MLFLOW_HOST_HEADER"),
)
def _headers(self) -> dict[str, str]:
return {"Host": self.host_header or "localhost"}
def _post(self, path: str, body: dict) -> dict:
with httpx.Client(trust_env=False, timeout=self.timeout) as c:
r = c.post(f"{self._api}{path}", json=body, headers=self._headers(), auth=self._auth)
r.raise_for_status()
return r.json()
def _get(self, path: str, params: dict | None = None) -> dict:
with httpx.Client(trust_env=False, timeout=self.timeout) as c:
r = c.get(f"{self._api}{path}", params=params or {}, headers=self._headers(), auth=self._auth)
r.raise_for_status()
return r.json()
# ── Experiments ────────────────────────────────────────────────────
def get_or_create_experiment(self, name: str) -> str:
try:
r = self._get("/experiments/get-by-name", {"experiment_name": name})
return r["experiment"]["experiment_id"]
except httpx.HTTPStatusError as e:
if e.response.status_code not in (404, 400):
raise
r = self._post("/experiments/create", {"name": name})
return r["experiment_id"]
# ── Runs ───────────────────────────────────────────────────────────
def create_run(
self,
experiment_id: str,
run_name: str,
tags: dict[str, str] | None = None,
) -> str:
body: dict[str, Any] = {
"experiment_id": experiment_id,
"start_time": int(time.time() * 1000),
"run_name": run_name,
"tags": [
{"key": k, "value": str(v)}
for k, v in (tags or {}).items()
],
}
r = self._post("/runs/create", body)
return r["run"]["info"]["run_id"]
def log_param(self, run_id: str, key: str, value: Any) -> None:
self._post("/runs/log-parameter", {"run_id": run_id, "key": key, "value": str(value)})
def log_params(self, run_id: str, params: dict[str, Any]) -> None:
for k, v in params.items():
self.log_param(run_id, k, v)
def log_metric(self, run_id: str, key: str, value: float, step: int = 0) -> None:
self._post("/runs/log-metric", {
"run_id": run_id,
"key": key,
"value": float(value),
"timestamp": int(time.time() * 1000),
"step": step,
})
def log_metrics(self, run_id: str, metrics: dict[str, float]) -> None:
for k, v in metrics.items():
self.log_metric(run_id, k, v)
def set_tag(self, run_id: str, key: str, value: str) -> None:
self._post("/runs/set-tag", {"run_id": run_id, "key": key, "value": str(value)})
def set_tags(self, run_id: str, tags: dict[str, str]) -> None:
for k, v in tags.items():
self.set_tag(run_id, k, v)
# MLflow caps tag values at 5000 chars on this server; longer values are
# rejected with INVALID_PARAMETER_VALUE. 4500 leaves headroom for
# internal metadata MLflow may append on its own.
_TAG_VALUE_LIMIT = 4500
def log_text(self, run_id: str, text: str, artifact_path: str) -> None:
"""Persist short text alongside the run.
The MLflow server in this deployment uses a ``file://`` artifact
backend, which is only reachable from inside the container — not
via the REST proxy. We instead stash short payloads as tags
keyed ``artifact:<path>``. Anything longer than 4500 chars is
chunked into ``artifact:<path>:0``, ``:1`` …; ``get_artifact_text``
re-stitches them in order.
"""
key_base = f"artifact:{artifact_path}"
if len(text) <= self._TAG_VALUE_LIMIT:
self.set_tag(run_id, key_base, text)
return
# chunk
for i in range(0, len(text), self._TAG_VALUE_LIMIT):
self.set_tag(run_id, f"{key_base}:{i // self._TAG_VALUE_LIMIT}",
text[i:i + self._TAG_VALUE_LIMIT])
def get_artifact_text(self, run_id: str, artifact_path: str) -> str:
run = self._get("/runs/get", {"run_id": run_id})["run"]
tags = {t["key"]: t["value"] for t in run["data"].get("tags", [])}
key_base = f"artifact:{artifact_path}"
if key_base in tags:
return tags[key_base]
# chunked form
chunks = sorted(
(k for k in tags if k.startswith(f"{key_base}:")),
key=lambda k: int(k.rsplit(":", 1)[1]),
)
return "".join(tags[k] for k in chunks)
def end_run(self, run_id: str, status: str = "FINISHED") -> None:
self._post("/runs/update", {
"run_id": run_id,
"status": status,
"end_time": int(time.time() * 1000),
})
def search_runs(
self,
experiment_id: str,
filter_string: str = "",
max_results: int = 1000,
) -> list[dict]:
body = {
"experiment_ids": [experiment_id],
"filter": filter_string,
"max_results": max_results,
}
r = self._post("/runs/search", body)
return r.get("runs", [])

@@ -0,0 +1,85 @@
# Tip-quality rubric — `tip-v1`
This file is the consistency anchor for the Claude Code judge. The same
rubric is used across every judging session so verdicts are comparable
across runs (per the lazy-judge pattern in #95).
Each candidate tip is scored on three independent dimensions, each on a
1 to 5 scale, plus two binary flags. Score the **content of the tip itself**
for the given persona/context; do not score the rationale.
## Dimensions
### relevance — 1 to 5
How well does the tip respond to *this specific persona at this specific
time*? A generic productivity platitude is 1; a tip that hooks into the
persona's stated preferences and the actual hour-of-day is 5.
| score | description |
|-------|-------------|
| 1 | Boilerplate. Could apply to any user, any time. |
| 2 | Vaguely fits the persona but ignores context. |
| 3 | Fits the persona OR the time, not both. |
| 4 | Fits both persona and time, with one specific anchor (a task, an hour, a habit). |
| 5 | Specific to the persona's preferences AND respects the hour, with a clear hook into a candidate task or routine. |
### actionability — 1 to 5
Could the user *do this in the next 10 minutes* without further planning?
"Try to focus more" is 1; "Spend 12 minutes on the Call dentist task and
stop when the timer ends" is 5.
| score | description |
|-------|-------------|
| 1 | Pure encouragement, no action. |
| 2 | Action exists but vague ("review your tasks"). |
| 3 | Concrete verb + object, but missing the time/duration handle. |
| 4 | Concrete action with a duration or trigger ("for 10 minutes", "before lunch"). |
| 5 | Micro-action with explicit start, duration, and a stop condition. |
### tone — 1 to 5
Does the tip sound like a calm, specific mentor (the product voice) or
like a generic chatbot/coach? Penalize emoji-spam, exclamation marks,
hype words ("amazing!", "let's crush it!"), and corporate jargon.
| score | description |
|-------|-------------|
| 1 | Hype, jargon, or motivational-poster tone. |
| 2 | Polite chatbot tone, no warmth. |
| 3 | Neutral, businesslike. |
| 4 | Quiet and specific, like a coach who knows you. |
| 5 | Earned. Reads like a mentor who has seen this exact stuck-pattern before. |
## Binary flags
### format_ok — 0 or 1
1 if the *whole response* parsed as a JSON array of objects with the
required keys (`id`, `content`, `rationale`). 0 otherwise. **This is
computed automatically by `collect.py`** — judges should not override it.
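
A minimal sketch of the `format_ok` check described above, assuming the raw model response is available as a string. The function name and the `REQUIRED_KEYS` constant are illustrative; the actual logic in `collect.py` is not shown in this diff and may differ (for example in how it treats an empty array).

```python
import json

# Keys the rubric requires on every tip object.
REQUIRED_KEYS = {"id", "content", "rationale"}


def format_ok(raw: str) -> int:
    """Return 1 if `raw` parses as a JSON array of objects with the required keys, else 0."""
    try:
        parsed = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return 0
    if not isinstance(parsed, list):
        return 0
    for item in parsed:
        # Every element must be an object carrying all required keys.
        if not isinstance(item, dict) or not REQUIRED_KEYS.issubset(item):
            return 0
    # Assumption of this sketch: an empty array passes vacuously.
    return 1
```

The check looks at the whole response, so a single malformed element makes the flag 0 for the entire cell.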
### overlong — 0 or 1
1 if `content` exceeds the documented 2-sentence cap (count sentence-
ending punctuation `. ! ?`). Judges may flag this as a tiebreaker.
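
One possible reading of the `overlong` rule, sketched under stated assumptions: a run of terminators such as `...` or `?!` is counted as a single sentence end, and the cap defaults to 2. The function name and the `cap` parameter are hypothetical. The sketch does not try to handle decimals or abbreviations, which would be counted as sentence ends.

```python
import re


def overlong(content: str, cap: int = 2) -> int:
    """Return 1 if `content` has more sentence ends than `cap`, else 0."""
    # Each run of ., ! or ? is treated as one sentence end (assumption of this sketch).
    sentence_ends = re.findall(r"[.!?]+", content)
    return 1 if len(sentence_ends) > cap else 0
```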
## Composite score
`compare.py` ranks cells by:
```
composite = relevance + actionability + tone + 2*format_ok - overlong
```
i.e. format compliance carries double weight (malformed JSON is a hard
production failure regardless of how good the prose is).
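
The formula can be sketched as a small function. The name `composite` and the argument order are illustrative; how `compare.py` actually structures this is not shown here. With three 1-to-5 dimensions and two binary flags, the score ranges from 2 to 17.

```python
def composite(relevance: int, actionability: int, tone: int,
              format_ok: int, overlong: int) -> int:
    """Composite ranking score: dimension sum, +2 for valid format, -1 if overlong."""
    return relevance + actionability + tone + 2 * format_ok - overlong


# Example: a tip scored 5 / 5 / 4 that parsed correctly and is not overlong.
score = composite(relevance=5, actionability=5, tone=4, format_ok=1, overlong=0)
```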
## Calibration examples
(Shared with judges so a 4 means the same thing across sessions.)
**Persona**: deadline-driven (responds to overdue/high-priority,
morning-active). **Hour**: 09:00. **Tasks include**: an overdue
"Call dentist", priority 4.
- "Stay focused and make today count!" — relevance 1, actionability 1, tone 1.
- "Review your tasks and pick one that matters." — relevance 2, actionability 2, tone 3.
- "Spend the next 12 minutes on Call dentist — set a timer and stop when it rings." — relevance 5, actionability 5, tone 4.
- "It's 09:00 — you respond to overdue items best now. Block 12 minutes for Call dentist before your first meeting." — relevance 5, actionability 5, tone 5.

View File

@@ -0,0 +1,80 @@
"""Fixed contexts for the tip-generation benchmark.
Every cell of the (model × prompt) grid is evaluated on the *same* set of
scenarios so quality differences are attributable to the model/prompt,
not to context variance.
A scenario is one (persona, hour-of-day, candidate-task-pool) tuple. The
hour and the task pool are seeded deterministically from the persona's
name so the bench is reproducible across machines.
"""
from __future__ import annotations
import sys
from dataclasses import dataclass
from pathlib import Path
# Reuse personas from sim — same source of truth for user archetypes.
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "sim"))
from personas import PERSONAS, Persona # type: ignore
from task_generator import generate_task_pool # type: ignore
@dataclass(frozen=True)
class Scenario:
id: str # stable id used as MLflow tag — keep ASCII safe
persona: Persona
hour_of_day: int # 0-23
day_of_week: int # 0=Mon
tasks: list[dict]
def to_prompt_context(self) -> dict:
"""Shape expected by ml/serving/prompts.PromptContext."""
return {
"tasks": [
{
"content": t["content"],
"priority": t["features"]["priority"],
"is_overdue": t["features"]["is_overdue"],
"due_date": t.get("due_date", "no due date"),
}
for t in self.tasks
],
"hour_of_day": self.hour_of_day,
"day_of_week": self.day_of_week,
"extra": {
"persona": self.persona.name,
"persona_hint": self.persona.description,
},
}
# Two time-slots probe whether the model adapts its tone to the hour.
# Morning (09) and evening (21) are picked because most personas have
# strong directional preferences there.
_TIME_SLOTS = [(9, 1), (21, 3)] # (hour_of_day, day_of_week)
def build_scenarios(tasks_per_scenario: int = 6) -> list[Scenario]:
"""Return a deterministic list of scenarios.
With 4 personas × 2 time-slots = 8 scenarios. Task pools are seeded
from the UTF-8 bytes of ``persona.name`` plus the hour, so runs are
reproducible and each persona sees the same tasks at the same hour
across cells.
"""
out: list[Scenario] = []
for persona in PERSONAS[:4]:
for hour, dow in _TIME_SLOTS:
# Built-in hash() on str is salted per process, so derive the seed from the name's bytes instead.
seed = (int.from_bytes(persona.name.encode("utf-8"), "big") % 9973) + hour
tasks = generate_task_pool(n=tasks_per_scenario, seed=seed)
out.append(
Scenario(
id=f"{persona.name}-h{hour:02d}",
persona=persona,
hour_of_day=hour,
day_of_week=dow,
tasks=tasks,
)
)
return out
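
The scenario list is only reproducible if the per-persona seed comes out the same in every process. Python's built-in `hash()` on strings is salted per process unless `PYTHONHASHSEED` is fixed, so it does not give that guarantee on its own. A minimal sketch of one way to derive a stable seed, using `zlib.crc32` as a stand-in stable hash; the helper name `stable_seed` and the `modulus` parameter are hypothetical.

```python
import zlib


def stable_seed(name: str, hour: int, modulus: int = 9973) -> int:
    """Derive a seed that is identical across processes and machines."""
    # crc32 depends only on the bytes of the name, never on interpreter state.
    return zlib.crc32(name.encode("utf-8")) % modulus + hour


# Same persona and hour always map to the same seed.
seed_morning = stable_seed("deadline-driven", 9)
seed_evening = stable_seed("deadline-driven", 21)
```

Because the hour is added after the modulo, the morning and evening seeds of one persona always differ by the gap between the two hours.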

View File

@@ -1,124 +0,0 @@
"""
Airflow DAG: bandit_sim
Runs a bandit policy simulation and logs results to MLflow.
Triggered on-demand from the oO admin panel or manually from the Airflow UI.
Required conf keys (passed via dag_run.conf):
sim_run_id str — oO SQLite run ID for callback correlation
n_users int — number of synthetic users
n_rounds int — rounds per user
tasks_per_round int — candidate pool size per round
policies list — policy names to compare
judge_mode str — "rule" | "llm"
ml_url str — ml/serving URL (e.g. http://ml-serving:8000)
mlflow_url str — MLflow tracking URI (e.g. http://mlflow:5000/mlflow)
callback_url str — oO API callback endpoint
internal_token str — x-internal-token header value
"""
from __future__ import annotations
import json
import os
import sys
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
def _run_sim(**context: object) -> dict:
conf: dict = context["dag_run"].conf or {}
n_users = int(conf.get("n_users", 5))
n_rounds = int(conf.get("n_rounds", 20))
tasks_per_round = int(conf.get("tasks_per_round", 8))
policies = list(conf.get("policies", ["linucb-v1", "egreedy-v1"]))
judge_mode = str(conf.get("judge_mode", "rule"))
ml_url = str(conf.get("ml_url", "http://ml-serving:8000"))
mlflow_url = str(conf.get("mlflow_url", os.environ.get("MLFLOW_TRACKING_URI", "")))
mlflow_experiment = "bandit_simulation"
sys.path.insert(0, "/opt/airflow/ml/experiments/sim")
from runner import run_simulation # type: ignore[import]
use_llm = judge_mode == "llm"
result = run_simulation(
n_users=n_users,
n_rounds=n_rounds,
tasks_per_round=tasks_per_round,
ml_url=ml_url,
policies=policies,
use_llm=use_llm,
seed=42,
mlflow_url=mlflow_url or None,
mlflow_experiment=mlflow_experiment,
)
return result
def _callback(**context: object) -> None:
import httpx
conf: dict = context["dag_run"].conf or {}
callback_url: str = str(conf.get("callback_url", ""))
internal_token: str = str(conf.get("internal_token", ""))
if not callback_url or not internal_token:
print("No callback_url or internal_token — skipping result push.", flush=True)
return
result: dict = context["ti"].xcom_pull(task_ids="run_sim")
if not result:
print("No result from run_sim task — callback skipped.", flush=True)
return
payload = {
"summary": result.get("summary", {}),
"winner": result.get("winner", ""),
"persona_breakdown": result.get("persona_breakdown", {}),
"events": result.get("events", []),
"mlflow_run_id": result.get("mlflow_run_id"),
}
try:
r = httpx.post(
callback_url,
json=payload,
headers={"x-internal-token": internal_token},
timeout=30.0,
)
r.raise_for_status()
print(f"Callback OK: {r.status_code}", flush=True)
except Exception as exc:
print(f"Callback failed: {exc}", flush=True)
raise
with DAG(
dag_id="bandit_sim",
description="On-demand bandit policy simulation with MLflow tracking",
schedule_interval=None,
start_date=datetime(2025, 1, 1),
catchup=False,
tags=["bandit", "simulation", "ml"],
default_args={
"retries": 1,
"retry_delay": timedelta(minutes=2),
},
) as dag:
run_sim = PythonOperator(
task_id="run_sim",
python_callable=_run_sim,
provide_context=True,
)
push_results = PythonOperator(
task_id="push_results",
python_callable=_callback,
provide_context=True,
)
run_sim >> push_results

View File

@@ -8,7 +8,6 @@ def configure() -> None:
processors=[
structlog.contextvars.merge_contextvars,
structlog.stdlib.add_log_level,
structlog.stdlib.add_logger_name,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.JSONRenderer(),

View File

@@ -26,9 +26,11 @@ from __future__ import annotations
import json
import math
import os
import sys
import time
from collections import deque
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Deque
@@ -43,7 +45,17 @@ from starlette.middleware.base import BaseHTTPMiddleware
import logging_config
import nats_consumer
from prompts import get_prompt
from prompts import get_prompt, build_orchestrator_messages
# Make ml.agents importable regardless of working directory.
# In Docker (WORKDIR=/app/ml/serving, PYTHONPATH=/app): /app already on path.
# In local dev (run from ml/serving/): repo root is two levels up.
_repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
if _repo_root not in sys.path:
sys.path.insert(0, _repo_root)
from ml.agents.base import AgentInput # noqa: E402
from ml.agents.registry import get_agent, all_agents # noqa: E402
logging_config.configure()
@@ -322,6 +334,7 @@ class PromptContext(BaseModel):
hour_of_day: int = 12
day_of_week: int = 0
extra: dict = {}
profile_features: Optional[dict] = None
class GenerateRequest(BaseModel):
@@ -349,12 +362,61 @@ class GenerateResponse(BaseModel):
completion_tokens: int = 0
# ── Multi-agent models ─────────────────────────────────────────────────────
class AgentComputeRequest(BaseModel):
user_id: str
tasks: list[dict] = []
profile: dict[str, Optional[float]] = {}
feedback_history: list[dict] = []
now_iso: Optional[str] = None # ISO 8601; defaults to utcnow
class AgentComputeResponse(BaseModel):
user_id: str
agent_id: str
prompt_text: str
signals_snapshot: dict
computed_at: str
expires_at: str
agent_version: str
class AgentOutputSnippet(BaseModel):
agent_id: str
prompt_text: str
class RecommendRequest(BaseModel):
user_id: str
agent_outputs: list[AgentOutputSnippet] = []
tasks: list[dict] = []
hour_of_day: int = 12
day_of_week: int = 0
class TipResult(BaseModel):
id: str
content: str
source: str = "llm"
kind: str = "advice"
rationale: Optional[str] = None
class RecommendResponse(BaseModel):
tip: TipResult
model: str
prompt_tokens: int = 0
completion_tokens: int = 0
# ── Endpoints ──────────────────────────────────────────────────────────────
@app.get("/health")
def health():
return {
"ok": True,
"agents": [a.agent_id for a in all_agents()],
"nats": {
"enabled": bool(nats_consumer.NATS_URL),
"consumers": nats_consumer.consumer_health,
@@ -367,6 +429,137 @@ _RETRY_SUFFIX = (
"Reply ONLY with the JSON array — no prose, no markdown fences."
)
_RETRY_SUFFIX_OBJ = (
"\n\nYour previous response was not valid JSON. "
"Reply ONLY with the JSON object — no prose, no markdown fences."
)
@app.post("/agents/{agent_id}/compute", response_model=AgentComputeResponse)
async def compute_agent(agent_id: str, req: AgentComputeRequest) -> AgentComputeResponse:
"""Run a single sub-agent for a user and return its prompt snippet.
Called by the precompute pipeline for each (user_id, agent_id) pair.
The caller is responsible for persisting the result to agent_outputs via the
TypeScript API callback.
"""
try:
agent = get_agent(agent_id)
except KeyError:
raise HTTPException(status_code=404, detail=f"Unknown agent: {agent_id!r}")
now = (
datetime.fromisoformat(req.now_iso.replace("Z", "+00:00"))
if req.now_iso
else datetime.now(timezone.utc)
)
if now.tzinfo is None:
now = now.replace(tzinfo=timezone.utc)
inp = AgentInput(
user_id=req.user_id,
tasks=req.tasks,
profile=req.profile,
feedback_history=req.feedback_history,
now=now,
)
try:
output = agent.compute(inp)
except Exception as exc:
log.error("agent_compute_failed", agent_id=agent_id, user_id=req.user_id, error=str(exc))
raise HTTPException(status_code=500, detail=f"Agent compute failed: {exc}")
log.info("agent_computed", agent_id=agent_id, user_id=req.user_id, expires_at=output.expires_at)
return AgentComputeResponse(
user_id=output.user_id,
agent_id=output.agent_id,
prompt_text=output.prompt_text,
signals_snapshot=output.signals_snapshot,
computed_at=output.computed_at,
expires_at=output.expires_at,
agent_version=output.agent_version,
)
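
The timestamp handling in `compute_agent` can be isolated into a small helper, sketched here. The name `parse_now` is hypothetical. The `Z` replacement matters because `datetime.fromisoformat` only accepts a trailing `Z` from Python 3.11 onward.

```python
from datetime import datetime, timezone
from typing import Optional


def parse_now(now_iso: Optional[str]) -> datetime:
    """Parse an optional ISO 8601 string into a timezone-aware UTC-defaulting datetime."""
    if now_iso:
        # Normalise a trailing "Z" so older Python versions can parse it.
        now = datetime.fromisoformat(now_iso.replace("Z", "+00:00"))
    else:
        now = datetime.now(timezone.utc)
    if now.tzinfo is None:
        # Naive input is assumed to be UTC, matching the endpoint's behaviour.
        now = now.replace(tzinfo=timezone.utc)
    return now
```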
@app.post("/recommend", response_model=RecommendResponse)
async def recommend(req: RecommendRequest) -> RecommendResponse:
"""Orchestrator: combine pre-computed agent outputs into one tip via LLM.
Called in real time when a user requests a tip. agent_outputs should be
the fresh rows from the agent_outputs table (fetched by the TypeScript
recommender before calling this endpoint). Falls back to raw task context
if empty.
"""
messages = build_orchestrator_messages(
agent_outputs=[s.model_dump() for s in req.agent_outputs],
tasks=req.tasks,
hour_of_day=req.hour_of_day,
day_of_week=req.day_of_week,
)
headers = {"Authorization": f"Bearer {LITELLM_MASTER_KEY}"}
last_raw = ""
last_parse_error = ""
total_usage: dict = {"prompt_tokens": 0, "completion_tokens": 0}
model_used = "tip-generator"
async with httpx.AsyncClient(timeout=30.0) as client:
for _attempt in range(1 + _MAX_GENERATE_RETRIES):
payload = {"model": "tip-generator", "messages": messages, "temperature": 0.7}
try:
resp = await client.post(
f"{LITELLM_URL}/chat/completions", json=payload, headers=headers
)
resp.raise_for_status()
except httpx.HTTPStatusError as e:
raise HTTPException(status_code=502, detail=f"LiteLLM error: {e.response.text}")
except httpx.RequestError as e:
raise HTTPException(status_code=503, detail=f"LiteLLM unreachable: {e}")
data = resp.json()
usage = data.get("usage", {})
total_usage["prompt_tokens"] += usage.get("prompt_tokens", 0)
total_usage["completion_tokens"] += usage.get("completion_tokens", 0)
model_used = data.get("model", "tip-generator")
last_raw = data["choices"][0]["message"]["content"]
try:
text = last_raw.strip()
if text.startswith("```"):
parts = text.split("```")
text = parts[1] if len(parts) > 1 else text
if text.startswith("json"):
text = text[4:]
parsed = json.loads(text)
item: dict = parsed[0] if isinstance(parsed, list) else parsed
break
except (json.JSONDecodeError, ValueError, IndexError) as exc:
last_parse_error = str(exc)
messages.append({"role": "assistant", "content": last_raw})
messages.append({"role": "user", "content": _RETRY_SUFFIX_OBJ})
else:
raise HTTPException(
status_code=502,
detail=f"LLM returned invalid JSON after {_MAX_GENERATE_RETRIES} retries: "
f"{last_parse_error}\n{last_raw[:200]}",
)
tip = TipResult(
id=item.get("id", f"tip-{req.user_id[:8]}"),
content=item.get("content", ""),
rationale=item.get("rationale"),
)
log.info(
"recommend_served",
user_id=req.user_id,
agent_count=len(req.agent_outputs),
tip_id=tip.id,
)
return RecommendResponse(
tip=tip,
model=model_used,
prompt_tokens=total_usage["prompt_tokens"],
completion_tokens=total_usage["completion_tokens"],
)
_MAX_GENERATE_RETRIES = 2
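
The parsing step inside the retry loop of `recommend` can be read in isolation. The sketch below mirrors that logic in a standalone function; the name `extract_first_item` is hypothetical. It strips an optional Markdown fence, parses the JSON, and returns the first element when the model replied with an array.

```python
import json


def extract_first_item(raw: str) -> dict:
    """Strip an optional ``` fence, parse JSON, and return a single tip object."""
    text = raw.strip()
    if text.startswith("```"):
        # Keep only the content between the first pair of fences.
        parts = text.split("```")
        text = parts[1] if len(parts) > 1 else text
        if text.startswith("json"):
            text = text[4:]  # drop the language tag after the opening fence
    parsed = json.loads(text)
    # The model may answer with an array; take its first element.
    return parsed[0] if isinstance(parsed, list) else parsed
```

A `json.JSONDecodeError` or `IndexError` raised here corresponds to the cases where the endpoint appends the retry suffix and asks the model again.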
@@ -392,7 +585,8 @@ async def generate(req: GenerateRequest) -> GenerateResponse:
prompt_template = get_prompt(req.prompt_version)
except KeyError as e:
raise HTTPException(status_code=422, detail=f"Unknown prompt_version: {e.args[0]}")
user_msg = prompt_template.build_user(req.context, req.n)
ctx = req.context.model_copy(update={"profile_features": req.profile_features})
user_msg = prompt_template.build_user(ctx, req.n)
messages: list[dict] = [
{"role": "system", "content": prompt_template.system},
{"role": "user", "content": user_msg},

View File

@@ -23,6 +23,7 @@ class _Ctx(Protocol):
hour_of_day: int
day_of_week: int
extra: dict
profile_features: "dict | None"
@dataclass(frozen=True)
@@ -33,13 +34,29 @@ class Prompt:
def _base_user_lines(ctx: "_Ctx") -> list[str]:
# Overdue tasks first, then high-priority, then oldest — most actionable context at top
tasks = sorted(
ctx.tasks,
key=lambda t: (not t.get("is_overdue", False), -t.get("priority", 1), -t.get("task_age_days", 0.0)),
)
lines = [f"Time: {ctx.hour_of_day:02d}:00, day_of_week={ctx.day_of_week}"]
if ctx.tasks:
overdue = [t for t in ctx.tasks if t.get("is_overdue")]
lines.append(f"Tasks: {len(ctx.tasks)} total, {len(overdue)} overdue")
for t in ctx.tasks[:5]:
if tasks:
overdue = [t for t in tasks if t.get("is_overdue")]
lines.append(f"Tasks: {len(tasks)} total, {len(overdue)} overdue")
for t in tasks[:5]:
due = t.get("due_date", "no due date")
lines.append(f" - [{t.get('priority','?')}] {t.get('content','?')} (due: {due})")
p = getattr(ctx, "profile_features", None) or {}
if p:
parts: list[str] = []
if (v := p.get("completion_rate_30d")) is not None:
parts.append(f"completion_rate={float(v):.0%}")
if (v := p.get("dismiss_rate_30d")) is not None:
parts.append(f"dismiss_rate={float(v):.0%}")
if (v := p.get("preferred_hour")) is not None:
parts.append(f"preferred_hour={int(v):02d}:00")
if parts:
lines.append(f"User profile: {', '.join(parts)}")
for k, v in ctx.extra.items():
lines.append(f"{k}: {v}")
return lines
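
The ordering used by `_base_user_lines` can be checked on its own with a short sketch. The helper name `order_tasks` and the sample tasks are illustrative; the sort key is the one from the hunk above: overdue first, then higher priority, then older tasks.

```python
def order_tasks(tasks: list[dict]) -> list[dict]:
    """Sort tasks: overdue first, then by descending priority, then by descending age."""
    return sorted(
        tasks,
        key=lambda t: (
            not t.get("is_overdue", False),  # False sorts before True, so overdue leads
            -t.get("priority", 1),
            -t.get("task_age_days", 0.0),
        ),
    )


tasks = [
    {"content": "A", "priority": 2, "is_overdue": True, "task_age_days": 3},
    {"content": "B", "priority": 4, "is_overdue": False},
    {"content": "C", "priority": 4, "is_overdue": True, "task_age_days": 1},
]
ordered = [t["content"] for t in order_tasks(tasks)]
```

Since only the first five tasks reach the prompt, this ordering decides which tasks the model sees at all when the pool is larger.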

View File

@@ -127,6 +127,46 @@ def test_build_prompt_empty_tasks_no_task_line():
assert "Generate 2 tips" in prompt
def test_build_prompt_tasks_sorted_overdue_first():
tasks = [
{"content": "Low priority", "priority": 1, "is_overdue": False, "task_age_days": 0},
{"content": "Overdue task", "priority": 2, "is_overdue": True, "task_age_days": 3},
]
ctx = PromptContext(tasks=tasks, hour_of_day=9)
prompt = _build_user_v1(ctx, n=2)
assert prompt.index("Overdue task") < prompt.index("Low priority")
def test_build_prompt_includes_profile_features():
ctx = PromptContext(
tasks=[],
hour_of_day=14,
profile_features={"completion_rate_30d": 0.75, "dismiss_rate_30d": 0.1, "preferred_hour": 9},
)
prompt = _build_user_v1(ctx, n=1)
assert "User profile:" in prompt
assert "completion_rate=75%" in prompt
assert "dismiss_rate=10%" in prompt
assert "preferred_hour=09:00" in prompt
def test_build_prompt_no_profile_line_when_empty():
ctx = PromptContext(tasks=[], hour_of_day=10, profile_features={})
prompt = _build_user_v1(ctx, n=1)
assert "User profile:" not in prompt
def test_build_prompt_profile_partial_fields():
ctx = PromptContext(
tasks=[],
hour_of_day=10,
profile_features={"completion_rate_30d": 0.5},
)
prompt = _build_user_v1(ctx, n=1)
assert "completion_rate=50%" in prompt
assert "dismiss_rate" not in prompt
@pytest.mark.anyio
async def test_generate_retry_succeeds_on_second_attempt():
"""First response is invalid JSON; second is valid. Should return 200."""
@@ -271,6 +311,38 @@ async def test_generate_echoes_selected_prompt_version():
assert resp.json()["prompt_version"] == "v2-mentor"
@pytest.mark.anyio
async def test_generate_passes_profile_features_to_prompt():
"""profile_features from GenerateRequest should appear in the user message sent to LiteLLM."""
fake_items = [{"id": "tip-1", "content": "x", "rationale": "y"}]
mock_resp = _litellm_response(fake_items)
captured_payload: list[dict] = []
async def _capture(url, *, json, headers):
captured_payload.append(json)
return mock_resp
with patch("main.httpx.AsyncClient") as MockClient:
instance = AsyncMock()
instance.post = AsyncMock(side_effect=_capture)
instance.__aenter__ = AsyncMock(return_value=instance)
instance.__aexit__ = AsyncMock(return_value=False)
MockClient.return_value = instance
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
resp = await client.post("/generate", json={
"user_id": "u1",
"n": 1,
"profile_features": {"completion_rate_30d": 0.8, "preferred_hour": 10},
})
assert resp.status_code == 200
user_msg = captured_payload[0]["messages"][1]["content"]
assert "User profile:" in user_msg
assert "completion_rate=80%" in user_msg
assert "preferred_hour=10:00" in user_msg
@pytest.mark.anyio
async def test_generate_422_on_unknown_prompt_version():
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:

View File

@@ -28,13 +28,20 @@ POST /api/push/subscribe
DELETE /api/push/subscribe
GET /api/admin/stats DAU/WAU, feedback breakdown
GET /api/admin/users
GET /api/admin/events recent event stream (ring buffer)
GET /api/admin/users user list with pagination
GET /api/user/:id user detail, consents, integrations
GET /api/admin/events recent event stream (ring buffer or NATS JetStream)
GET /api/admin/events/history historical event query (time range, filters)
GET /api/admin/sim/runs offline sim run list
POST /api/admin/sim/run launch offline sim
POST /api/admin/sim/run launch offline sim with policy/judge params
GET /api/admin/sim/runs/:id/output tail sim stdout
...
GET /api/admin/features/:userId per-user profile features + freshness
GET /api/admin/features/:userId/context context features for last score call
POST /api/admin/policies list shadow policies + active policy
POST /api/admin/policies/:name/toggle enable/disable shadow policy
POST /api/admin/users/:id/actions revoke-integration, reset-bandit, rebuild-profile
GET /api/admin/health system health: api, ml/serving, db, bus, mlflow
GET /api/admin/docs admin documentation index
GET /api/ml/* admin-only proxy to ml/serving
```

View File

@@ -35,11 +35,8 @@ export const config = {
LITELLM_URL: optional('LITELLM_URL', 'http://localhost:4000'),
MLFLOW_URL: optional('MLFLOW_URL', 'http://localhost:5000'),
AIRFLOW_URL: optional('AIRFLOW_URL', 'http://localhost:8080'),
AIRFLOW_API_USER: optional('AIRFLOW_API_USER', 'admin'),
AIRFLOW_API_PASSWORD: optional('AIRFLOW_API_PASSWORD', 'admin'),
/** Shared secret for internal Airflow→API callbacks. */
/** Shared secret for internal API callbacks. */
INTERNAL_API_TOKEN: optional('INTERNAL_API_TOKEN', ''),
/** Static token for automated/service access to the admin panel (e.g. Playwright tests). */

View File

@@ -143,6 +143,19 @@ export function runMigrations() {
day_of_week INTEGER NOT NULL,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS agent_outputs (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL REFERENCES users(id),
agent_id TEXT NOT NULL,
prompt_text TEXT NOT NULL,
signals_snapshot TEXT,
computed_at TEXT NOT NULL,
expires_at TEXT NOT NULL,
agent_version TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_agent_outputs_user_agent_exp
ON agent_outputs(user_id, agent_id, expires_at DESC);
`);
// Additive column migrations — safe to run on existing DBs.
@@ -156,7 +169,6 @@ export function runMigrations() {
`ALTER TABLE tip_scores ADD COLUMN prompt_version TEXT`,
`ALTER TABLE tip_scores ADD COLUMN llm_model TEXT`,
`ALTER TABLE tip_scores ADD COLUMN tip_kind TEXT`,
`ALTER TABLE sim_runs ADD COLUMN airflow_dag_run_id TEXT`,
`ALTER TABLE sim_runs ADD COLUMN mlflow_run_id TEXT`,
`ALTER TABLE sim_runs ADD COLUMN judge_mode TEXT NOT NULL DEFAULT 'rule'`,
`ALTER TABLE sim_runs ADD COLUMN n_policies INTEGER NOT NULL DEFAULT 2`,

View File

@@ -117,7 +117,6 @@ export const simRuns = sqliteTable('sim_runs', {
summaryJson: text('summary_json'), // JSON: { [policy]: PolicySummary }
winner: text('winner'),
personaBreakdownJson: text('persona_breakdown_json'), // JSON: { [persona]: { [policy]: {reward,n} } }
airflowDagRunId: text('airflow_dag_run_id'),
mlflowRunId: text('mlflow_run_id'),
createdAt: text('created_at').notNull(),
finishedAt: text('finished_at'),
@@ -142,6 +141,20 @@ export const simEvents = sqliteTable('sim_events', {
createdAt: text('created_at').notNull(),
});
// ── Agent outputs (#multi-agent) ─────────────────────────────────────────────
// One row per (userId, agentId) pre-compute run. The orchestrator reads the
// freshest non-expired row per agent when assembling the tip prompt.
export const agentOutputs = sqliteTable('agent_outputs', {
id: text('id').primaryKey(),
userId: text('user_id').notNull().references(() => users.id),
agentId: text('agent_id').notNull(), // e.g. 'overdue-task'
promptText: text('prompt_text').notNull(), // snippet for orchestrator prompt
signalsSnapshot: text('signals_snapshot'), // JSON: inputs the agent consumed
computedAt: text('computed_at').notNull(), // ISO 8601
expiresAt: text('expires_at').notNull(), // ISO 8601 = computedAt + TTL
agentVersion: text('agent_version').notNull(), // bump to invalidate on logic changes
});
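
The "freshest non-expired row per agent" lookup can be sketched against an in-memory SQLite database. This is an illustration in Python under stated assumptions, not the project's Drizzle query: the foreign key to `users` is omitted, the `energy` agent id and all row values are made up, and timestamps are assumed to share one ISO 8601 format so that string comparison matches chronological order.

```python
import sqlite3

SCHEMA = """
CREATE TABLE agent_outputs (
    id TEXT PRIMARY KEY,
    user_id TEXT NOT NULL,
    agent_id TEXT NOT NULL,
    prompt_text TEXT NOT NULL,
    signals_snapshot TEXT,
    computed_at TEXT NOT NULL,
    expires_at TEXT NOT NULL,
    agent_version TEXT NOT NULL
);
"""

# For each agent, keep the non-expired row with the latest computed_at.
FRESHEST_PER_AGENT = """
SELECT o.agent_id, o.prompt_text
FROM agent_outputs AS o
WHERE o.user_id = :user_id
  AND o.expires_at > :now
  AND o.computed_at = (
      SELECT MAX(i.computed_at)
      FROM agent_outputs AS i
      WHERE i.user_id = o.user_id
        AND i.agent_id = o.agent_id
        AND i.expires_at > :now
  )
ORDER BY o.agent_id
"""


def freshest_outputs(conn: sqlite3.Connection, user_id: str, now_iso: str) -> list:
    """Return (agent_id, prompt_text) for the freshest live row of each agent."""
    return conn.execute(FRESHEST_PER_AGENT, {"user_id": user_id, "now": now_iso}).fetchall()


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executemany(
    "INSERT INTO agent_outputs VALUES (?, ?, ?, ?, NULL, ?, ?, ?)",
    [
        ("r1", "u1", "overdue-task", "old snippet", "2026-05-01T08:00:00.000Z", "2026-05-04T08:00:00.000Z", "1"),
        ("r2", "u1", "overdue-task", "new snippet", "2026-05-02T08:00:00.000Z", "2026-05-05T08:00:00.000Z", "1"),
        ("r3", "u1", "energy", "expired snippet", "2026-04-01T08:00:00.000Z", "2026-04-02T08:00:00.000Z", "1"),
    ],
)
rows = freshest_outputs(conn, "u1", "2026-05-03T00:00:00.000Z")
```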
// Admin saved SQL queries.
export const savedQueries = sqliteTable('saved_queries', {
id: text('id').primaryKey(),

View File

@@ -16,6 +16,7 @@ import { recommenderRouter } from './routes/recommender.js';
import { userRouter } from './routes/user.js';
import { pushRouter } from './routes/push.js';
import { adminRouter, adminInternalRouter } from './routes/admin.js';
import benchRouter from './routes/bench.js';
import { mkdir } from 'fs/promises';
import { dirname } from 'path';
import { requireAuth } from './middleware/session.js';
@@ -66,6 +67,7 @@ app.use('/api/user', userRouter);
app.use('/api/push', pushRouter);
app.use('/api/admin', adminRouter);
app.use('/api/admin', adminInternalRouter);
app.use('/api/bench', requireAuth as any, requireAdmin as any, benchRouter);
app.use('/api/ml', requireAuth as any, requireAdmin as any, async (req: Request, res: Response) => {
const mlUrl = config.ML_SERVING_URL;

View File

@@ -389,7 +389,7 @@ describe('GET /api/admin/events', () => {
// Health endpoint — mock fetch so tests don't depend on running services.
// ---------------------------------------------------------------------------
describe('GET /api/admin/health', () => {
const EXPECTED_HTTP_SERVICES = ['api', 'ml-serving', 'mlflow', 'airflow'] as const;
const EXPECTED_HTTP_SERVICES = ['api', 'ml-serving', 'mlflow'] as const;
const EXPECTED_INTERNAL = ['sqlite', 'event-bus'] as const;
const VALID_STATUSES = new Set(['ok', 'degraded', 'down']);
@@ -404,7 +404,6 @@ describe('GET /api/admin/health', () => {
let name: string;
if (s.includes(':8000')) name = 'ml-serving';
else if (s.includes(':5000')) name = 'mlflow';
else if (s.includes(':8080')) name = 'airflow';
else name = 'api';
if (!upServices.has(name)) throw new Error(`ECONNREFUSED ${name}`);
@@ -415,7 +414,7 @@ describe('GET /api/admin/health', () => {
afterEach(() => vi.unstubAllGlobals());
it('shape: 200, typed fields, all expected services present', async () => {
mockFetch(new Set(['api', 'ml-serving', 'mlflow', 'airflow']));
mockFetch(new Set(['api', 'ml-serving', 'mlflow']));
const { server, call } = await startServer(buildApp());
try {
const { status, body } = await call('GET', '/api/admin/health');
@@ -440,7 +439,7 @@ describe('GET /api/admin/health', () => {
});
it('ok=true when all HTTP services respond 200', async () => {
mockFetch(new Set(['api', 'ml-serving', 'mlflow', 'airflow']));
mockFetch(new Set(['api', 'ml-serving', 'mlflow']));
const { server, call } = await startServer(buildApp());
try {
const { body } = await call('GET', '/api/admin/health');
@@ -456,7 +455,7 @@ describe('GET /api/admin/health', () => {
});
it('ml-serving=down and ok=false when ml-serving is unreachable', async () => {
mockFetch(new Set(['api', 'mlflow', 'airflow'])); // ml-serving absent
mockFetch(new Set(['api', 'mlflow'])); // ml-serving absent
const { server, call } = await startServer(buildApp());
try {
const { body } = await call('GET', '/api/admin/health');
@@ -469,22 +468,8 @@ describe('GET /api/admin/health', () => {
}
});
it('airflow=down and ok=false when airflow is unreachable', async () => {
mockFetch(new Set(['api', 'ml-serving', 'mlflow'])); // airflow absent
const { server, call } = await startServer(buildApp());
try {
const { body } = await call('GET', '/api/admin/health');
const b = body as HealthBody;
const svc = b.services.find((s) => s.name === 'airflow');
expect(svc?.status).toBe('down');
expect(b.ok).toBe(false);
} finally {
server.close();
}
});
it('mlflow=down and ok=false when mlflow is unreachable', async () => {
mockFetch(new Set(['api', 'ml-serving', 'airflow'])); // mlflow absent
mockFetch(new Set(['api', 'ml-serving'])); // mlflow absent
const { server, call } = await startServer(buildApp());
try {
const { body } = await call('GET', '/api/admin/health');

View File

@@ -524,14 +524,10 @@ router.get('/data-quality', async (req: AuthenticatedRequest, res: Response) =>
// Fan-out to all subsystem /health endpoints.
// ---------------------------------------------------------------------------
router.get('/health', async (_req: AuthenticatedRequest, res: Response) => {
const airflowAuth = Buffer.from(`${config.AIRFLOW_API_USER}:${config.AIRFLOW_API_PASSWORD}`).toString('base64');
const checks: Array<{ name: string; url: string; headers?: Record<string, string> }> = [
{ name: 'api', url: `http://localhost:${config.PORT}/health` },
{ name: 'ml-serving', url: `${config.ML_SERVING_URL}/health` },
{ name: 'mlflow', url: `${config.MLFLOW_URL}/health` },
{ name: 'airflow', url: `${config.AIRFLOW_URL}/api/v1/health`,
headers: { Authorization: `Basic ${airflowAuth}` } },
];
const results = await Promise.allSettled(
@@ -705,8 +701,7 @@ router.delete('/saved-queries/:id', async (req: AuthenticatedRequest, res: Respo
// ---------------------------------------------------------------------------
// POST /api/admin/simulate/start
// Trigger an Airflow DAG run (bandit_sim). Falls back to a local subprocess
// when AIRFLOW_URL is not reachable, so local dev still works.
// Trigger a bandit_sim run via local subprocess.
// ---------------------------------------------------------------------------
router.post('/simulate/start', async (req: AuthenticatedRequest, res: Response) => {
const {
@@ -745,56 +740,7 @@ router.post('/simulate/start', async (req: AuthenticatedRequest, res: Response)
createdAt: now,
});
// ── Try Airflow first ────────────────────────────────────────────────────
if (config.AIRFLOW_URL && config.INTERNAL_API_TOKEN) {
try {
const airflowAuth = Buffer.from(
`${config.AIRFLOW_API_USER}:${config.AIRFLOW_API_PASSWORD}`,
).toString('base64');
const dagRes = await fetch(
`${config.AIRFLOW_URL}/api/v1/dags/bandit_sim/dagRuns`,
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Basic ${airflowAuth}`,
},
body: JSON.stringify({
conf: {
sim_run_id: id,
n_users: nUsers,
n_rounds: nRounds,
tasks_per_round: tasksPerRound,
policies,
judge_mode: judgeMode,
ml_url: config.ML_SERVING_URL,
mlflow_url: config.MLFLOW_URL,
callback_url: `${config.API_BASE_URL}/api/admin/simulate/${id}/complete`,
internal_token: config.INTERNAL_API_TOKEN,
},
}),
signal: AbortSignal.timeout(5000),
},
);
if (dagRes.ok) {
const dagBody = await dagRes.json() as { dag_run_id: string };
await db
.update(simRuns)
.set({ airflowDagRunId: dagBody.dag_run_id })
.where(eq(simRuns.id, id));
res.json({ id, status: 'running', airflow_dag_run_id: dagBody.dag_run_id });
return;
}
logger.warn({ status: dagRes.status }, 'sim: Airflow trigger failed, falling back to subprocess');
} catch (err) {
logger.warn({ err }, 'sim: Airflow unreachable, falling back to subprocess');
}
}
// ── Subprocess fallback (local dev / Airflow not configured) ────────────
// ── Subprocess ───────────────────────────────────────────────────────────
const runnerPath = resolve(__dirname, '../../../../ml/experiments/sim/runner.py');
const venvPython = resolve(__dirname, '../../../../ml/serving/.venv/bin/python');
const pythonBin = existsSync(venvPython) ? venvPython : 'python3';

View File

@@ -0,0 +1,220 @@
import { Router } from 'express';
import { nanoid } from 'nanoid';
import { db } from '../db/index.js';
import { agentOutputs, tipFeedback, tipViews } from '../db/schema.js';
import { eq, and, gt, lt } from 'drizzle-orm';
import { config } from '../config.js';
import { getProfile } from '../profile/builder.js';
import { todoistSource } from '../signals/todoist.js';
import { SignalAggregator } from '../signals/aggregator.js';
import type { Request, Response } from 'express';
const router = Router();
// Separate aggregator instance — avoids circular dep with recommender.ts.
const _agentAggregator = new SignalAggregator().register(todoistSource);
// ── Internal auth helper ──────────────────────────────────────────────────────
function checkInternalToken(req: Request, res: Response): boolean {
const token = req.headers['x-internal-token'];
if (!config.INTERNAL_API_TOKEN || token !== config.INTERNAL_API_TOKEN) {
res.status(401).json({ error: 'Unauthorized' });
return false;
}
return true;
}
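
The same guard can be sketched in Python. Note one deliberate difference: this version compares tokens with `hmac.compare_digest`, a constant-time comparison that is a hardening assumption of the sketch and not something the TypeScript helper above does. The function name is hypothetical. As in the original, an empty configured token rejects every request.

```python
import hmac
from typing import Optional


def check_internal_token(header_value: Optional[str], configured_token: str) -> bool:
    """Return True only when a token is configured and the header matches it."""
    if not configured_token or header_value is None:
        # An unset secret must never authorise a request.
        return False
    return hmac.compare_digest(
        header_value.encode("utf-8"), configured_token.encode("utf-8")
    )
```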
// ── DB helpers ────────────────────────────────────────────────────────────────
export async function getActiveAgentOutputs(userId: string) {
const now = new Date().toISOString();
return db
.select()
.from(agentOutputs)
.where(and(eq(agentOutputs.userId, userId), gt(agentOutputs.expiresAt, now)));
}
async function storeAgentOutput(output: {
user_id: string;
agent_id: string;
prompt_text: string;
signals_snapshot?: unknown;
computed_at: string;
expires_at: string;
agent_version: string;
}) {
await db
.delete(agentOutputs)
.where(and(eq(agentOutputs.userId, output.user_id), eq(agentOutputs.agentId, output.agent_id)));
await db.insert(agentOutputs).values({
id: nanoid(),
userId: output.user_id,
agentId: output.agent_id,
promptText: output.prompt_text,
signalsSnapshot: output.signals_snapshot ? JSON.stringify(output.signals_snapshot) : null,
computedAt: output.computed_at,
expiresAt: output.expires_at,
agentVersion: output.agent_version,
});
}
// ── GET /api/agents/active-users ──────────────────────────────────────────────
// Returns the IDs of users who were served a tip in the last 48 hours.
// Callers use this list to fan out per-user precompute tasks.
router.get('/active-users', async (req: Request, res: Response) => {
if (!checkInternalToken(req, res)) return;
const cutoff = new Date(Date.now() - 48 * 60 * 60 * 1000).toISOString();
try {
const rows = await db
.selectDistinct({ userId: tipViews.userId })
.from(tipViews)
.where(gt(tipViews.servedAt, cutoff));
res.json({ user_ids: rows.map((r) => r.userId) });
} catch (err: any) {
res.status(500).json({ error: err.message });
}
});
// ── POST /api/agents/:agentId/compute ─────────────────────────────────────────
// Orchestrating endpoint for per-(user, agent) compute tasks.
// Fetches all signals, calls ml/serving /agents/{agentId}/compute, stores result.
// Body: { user_id: string }
router.post('/:agentId/compute', async (req: Request, res: Response) => {
if (!checkInternalToken(req, res)) return;
const { agentId } = req.params as { agentId: string };
const { user_id } = req.body as { user_id: string };
if (!user_id) {
res.status(422).json({ error: 'Missing user_id' });
return;
}
try {
// Fetch tasks via Todoist integration (gracefully empty if not connected).
let tasks: object[] = [];
try {
const signals = await _agentAggregator.fetchAll(user_id);
tasks = signals.map((s) => ({
id: s.id,
content: s.content,
priority: (s.features.priority as number) ?? 1,
is_overdue: Boolean(s.features.is_overdue),
task_age_days: (s.features.task_age_days as number) ?? 0,
project_id: (s.metadata as Record<string, unknown>).project_id ?? null,
}));
} catch {
// No integration or fetch error — agents that need tasks will report "no tasks"
}
// Fetch profile features (lazy-refreshed from DB).
let profile: Record<string, number | null> = {};
try {
profile = await getProfile(user_id);
} catch {
// Profile unavailable: continue with an empty profile.
}
// Fetch last 7 days of feedback for RecentPatternsAgent.
const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000).toISOString();
const feedbackRows = await db
.select({ action: tipFeedback.action, dwellMs: tipFeedback.dwellMs, createdAt: tipFeedback.createdAt })
.from(tipFeedback)
.where(and(eq(tipFeedback.userId, user_id), gt(tipFeedback.createdAt, sevenDaysAgo)));
const feedbackHistory = feedbackRows.map((f) => ({
action: f.action,
dwell_ms: f.dwellMs,
created_at: f.createdAt,
}));
// Call ml/serving to run the agent.
const mlResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/compute`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ user_id, tasks, profile, feedback_history: feedbackHistory }),
signal: AbortSignal.timeout(15_000),
});
if (!mlResp.ok) {
const detail = await mlResp.text().catch(() => '');
res.status(502).json({ error: `ml/serving returned ${mlResp.status}`, detail });
return;
}
const output = await mlResp.json() as {
user_id: string; agent_id: string; prompt_text: string;
signals_snapshot: unknown; computed_at: string; expires_at: string; agent_version: string;
};
await storeAgentOutput(output);
res.json({ ok: true, agent_id: output.agent_id, user_id: output.user_id, expires_at: output.expires_at });
} catch (err: any) {
res.status(500).json({ error: err.message });
}
});
// ── POST /api/agents/outputs ──────────────────────────────────────────────────
// Stores a pre-computed agent output directly (used if the DAG calls ml/serving
// itself and pushes the result separately).
router.post('/outputs', async (req: Request, res: Response) => {
if (!checkInternalToken(req, res)) return;
const { user_id, agent_id, prompt_text, signals_snapshot, computed_at, expires_at, agent_version } =
req.body as Record<string, string>;
if (!user_id || !agent_id || !prompt_text || !computed_at || !expires_at || !agent_version) {
res.status(422).json({
error: 'Missing required fields: user_id, agent_id, prompt_text, computed_at, expires_at, agent_version',
});
return;
}
try {
await storeAgentOutput({ user_id, agent_id, prompt_text, signals_snapshot, computed_at, expires_at, agent_version });
res.json({ ok: true });
} catch (err: any) {
res.status(500).json({ error: err.message });
}
});
// ── DELETE /api/agents/outputs/expired ───────────────────────────────────────
// Purges rows expired more than 24 hours ago.
router.delete('/outputs/expired', async (req: Request, res: Response) => {
if (!checkInternalToken(req, res)) return;
const cutoff = new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();
try {
await db.delete(agentOutputs).where(lt(agentOutputs.expiresAt, cutoff));
res.json({ ok: true });
} catch (err: any) {
res.status(500).json({ error: err.message });
}
});
// ── GET /api/agents/:userId/outputs ──────────────────────────────────────────
// Returns non-expired agent outputs. Admin observability; recommender calls
// getActiveAgentOutputs() directly (no HTTP hop).
router.get('/:userId/outputs', async (req: Request, res: Response) => {
const { userId } = req.params as { userId: string };
try {
const rows = await getActiveAgentOutputs(userId);
res.json({
user_id: userId,
outputs: rows.map((r) => ({
agent_id: r.agentId,
prompt_text: r.promptText,
computed_at: r.computedAt,
expires_at: r.expiresAt,
agent_version: r.agentVersion,
})),
});
} catch (err: any) {
res.status(500).json({ error: err.message });
}
});
export default router;
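
The route comments above describe a fan-out: list active users, then run one compute task per (user, agent) pair. A minimal sketch of a caller, assuming the router is mounted at `/api/agents` as the route comments suggest. `fanOutCompute`, `FetchLike`, and the injected `fetchImpl` parameter are hypothetical names introduced for illustration and are not part of this change.

```typescript
// Hypothetical caller for the internal agent endpoints; not part of the PR.
// The fetch implementation is injected so the sketch can be exercised offline.
type FetchLike = (
  url: string,
  init?: { method?: string; headers?: Record<string, string>; body?: string },
) => Promise<{ ok: boolean; status: number; json(): Promise<unknown> }>;

interface FanOutResult {
  userId: string;
  agentId: string;
  ok: boolean;
  status: number;
}

async function fanOutCompute(
  baseUrl: string,
  internalToken: string,
  agentIds: string[],
  fetchImpl: FetchLike,
): Promise<FanOutResult[]> {
  // Both endpoints check the x-internal-token header.
  const headers: Record<string, string> = {
    'Content-Type': 'application/json',
    'x-internal-token': internalToken,
  };

  // Step 1: GET /active-users returns { user_ids: string[] }.
  const usersRes = await fetchImpl(`${baseUrl}/api/agents/active-users`, { headers });
  if (!usersRes.ok) {
    throw new Error(`active-users returned ${usersRes.status}`);
  }
  const { user_ids } = (await usersRes.json()) as { user_ids: string[] };

  // Step 2: POST /:agentId/compute once per (user, agent) pair, sequentially.
  const results: FanOutResult[] = [];
  for (const userId of user_ids) {
    for (const agentId of agentIds) {
      const res = await fetchImpl(
        `${baseUrl}/api/agents/${encodeURIComponent(agentId)}/compute`,
        { method: 'POST', headers, body: JSON.stringify({ user_id: userId }) },
      );
      results.push({ userId, agentId, ok: res.ok, status: res.status });
    }
  }
  return results;
}
```

In a real caller the global `fetch` could be passed as `fetchImpl`. The loop is sequential on purpose, so a failing pair is recorded in the results without aborting the remaining pairs.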


@@ -0,0 +1,192 @@
/**
* Admin API endpoints for the tip-generation benchmark.
*
* Exposes:
* GET /api/bench/experiments — list MLflow experiments
* GET /api/bench/runs/:experiment — list runs in experiment
* GET /api/bench/leaderboard/:experiment — leaderboard by (model, prompt)
*/
import { Router } from "express";
import type { Request, Response } from "express";
import * as process from "process";
const router = Router();
const MLFLOW_URL = process.env.MLFLOW_URL || "http://mlflow:5000";
const MLFLOW_USER = process.env.MLFLOW_TRACKING_USERNAME || "admin";
const MLFLOW_PASS = process.env.MLFLOW_TRACKING_PASSWORD || "password";
// Wrapper for MLflow REST calls with Host header fix
async function mlflowFetch(
path: string,
method: string = "GET",
body?: object
): Promise<any> {
const url = new URL(path, MLFLOW_URL);
const headers: Record<string, string> = {
"Host": "localhost",
"Content-Type": "application/json",
};
const auth = Buffer.from(`${MLFLOW_USER}:${MLFLOW_PASS}`).toString("base64");
headers["Authorization"] = `Basic ${auth}`;
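const response = await fetch(url.toString(), {
method,
headers,
body: body ? JSON.stringify(body) : undefined,
// Bound the call so a hung MLflow server cannot stall the admin request.
// 15 s mirrors the ml/serving timeout used in the agents routes.
signal: AbortSignal.timeout(15_000),
});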
if (!response.ok) {
throw new Error(`MLflow ${response.status}: ${response.statusText}`);
}
return response.json();
}
// GET /api/bench/experiments — list available experiments
router.get("/experiments", async (req: Request, res: Response) => {
try {
const result = await mlflowFetch("/api/2.0/mlflow/experiments/search", "GET");
const experiments = result.experiments
.filter((e: any) => e.name.startsWith("tip-bench"))
.map((e: any) => ({
id: e.experiment_id,
name: e.name,
creation_time: e.creation_time,
}));
res.json(experiments);
} catch (err) {
res.status(500).json({ error: String(err) });
}
});
// GET /api/bench/runs/:experiment — list runs in an experiment
router.get("/runs/:experiment", async (req: Request, res: Response) => {
try {
const { experiment } = req.params;
// First, get experiment ID
const exps = await mlflowFetch("/api/2.0/mlflow/experiments/search", "GET");
const exp = exps.experiments.find((e: any) => e.name === experiment);
if (!exp) {
return res.status(404).json({ error: "Experiment not found" });
}
// Then, search runs
const result = await mlflowFetch("/api/2.0/mlflow/runs/search", "POST", {
experiment_ids: [exp.experiment_id],
max_results: 1000,
});
const runs = (result.runs || []).map((r: any) => {
const params = Object.fromEntries(
(r.data?.params || []).map((p: any) => [p.key, p.value])
);
const metrics = Object.fromEntries(
(r.data?.metrics || []).map((m: any) => [m.key, m.value])
);
return {
run_id: r.info.run_id,
status: r.info.status,
model: params.model,
prompt_version: params.prompt_version,
scenario_id: params.scenario_id,
// Use ?? rather than || so a legitimate score of 0 is not reported as null.
composite: metrics.composite ?? null,
relevance: metrics.relevance ?? null,
actionability: metrics.actionability ?? null,
tone: metrics.tone ?? null,
latency_ms: metrics.latency_ms ?? null,
};
});
res.json(runs);
} catch (err) {
res.status(500).json({ error: String(err) });
}
});
// GET /api/bench/leaderboard/:experiment — leaderboard
router.get("/leaderboard/:experiment", async (req: Request, res: Response) => {
try {
const { experiment } = req.params;
// Get experiment ID
const exps = await mlflowFetch("/api/2.0/mlflow/experiments/search", "GET");
const exp = exps.experiments.find((e: any) => e.name === experiment);
if (!exp) {
return res.status(404).json({ error: "Experiment not found" });
}
// Search runs
const result = await mlflowFetch("/api/2.0/mlflow/runs/search", "POST", {
experiment_ids: [exp.experiment_id],
max_results: 1000,
});
// Aggregate by (model, prompt)
const cells: Record<
string,
{ n: number; composites: number[]; latencies: number[] }
> = {};
for (const r of result.runs || []) {
const params = Object.fromEntries(
(r.data?.params || []).map((p: any) => [p.key, p.value])
);
const metrics = Object.fromEntries(
(r.data?.metrics || []).map((m: any) => [m.key, m.value])
);
if (r.info.status !== "FINISHED") continue;
const key = `${params.model}|${params.prompt_version}`;
if (!cells[key]) {
cells[key] = { n: 0, composites: [], latencies: [] };
}
cells[key].n++;
if (metrics.composite !== undefined) {
cells[key].composites.push(metrics.composite);
}
if (metrics.latency_ms !== undefined) {
cells[key].latencies.push(metrics.latency_ms);
}
}
// Build leaderboard rows
const rows = Object.entries(cells).map(([key, stats]) => {
const [model, prompt] = key.split("|");
const meanComp =
stats.composites.length > 0
? stats.composites.reduce((a, b) => a + b, 0) / stats.composites.length
: null;
const meanLat =
stats.latencies.length > 0
? stats.latencies.reduce((a, b) => a + b, 0) / stats.latencies.length
: null;
return {
model,
prompt,
n: stats.n,
composite: meanComp,
latency_ms: meanLat,
};
});
rows.sort((a, b) => {
const aComp = a.composite !== null ? a.composite : -Infinity;
const bComp = b.composite !== null ? b.composite : -Infinity;
return bComp - aComp;
});
res.json({
experiment,
rows,
winner: rows.length > 0 ? rows[0] : null,
});
} catch (err) {
res.status(500).json({ error: String(err) });
}
});
export default router;
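
The leaderboard handler above groups FINISHED runs by (model, prompt), averages the composite score and latency per group, and sorts by mean composite in descending order. The same aggregation can be sketched as a pure function that is easy to unit-test without MLflow. `BenchRun`, `LeaderboardRow`, and `buildLeaderboard` are hypothetical names, and the sketch keys groups with `JSON.stringify` instead of the handler's `model|prompt` string so that a `|` inside a model name cannot split incorrectly.

```typescript
// Hypothetical pure-function version of the leaderboard aggregation; not part of the PR.
interface BenchRun {
  status: string;
  model: string;
  promptVersion: string;
  composite?: number;
  latencyMs?: number;
}

interface LeaderboardRow {
  model: string;
  prompt: string;
  n: number;
  composite: number | null;
  latency_ms: number | null;
}

interface Cell {
  model: string;
  prompt: string;
  n: number;
  composites: number[];
  latencies: number[];
}

// Arithmetic mean, or null when there are no samples.
function mean(xs: number[]): number | null {
  return xs.length > 0 ? xs.reduce((a, b) => a + b, 0) / xs.length : null;
}

function buildLeaderboard(runs: BenchRun[]): LeaderboardRow[] {
  const cells: Record<string, Cell> = {};
  for (const run of runs) {
    // Only completed runs count toward the leaderboard.
    if (run.status !== 'FINISHED') continue;
    const key = JSON.stringify([run.model, run.promptVersion]);
    if (!cells[key]) {
      cells[key] = { model: run.model, prompt: run.promptVersion, n: 0, composites: [], latencies: [] };
    }
    const cell = cells[key];
    cell.n++;
    if (run.composite !== undefined) cell.composites.push(run.composite);
    if (run.latencyMs !== undefined) cell.latencies.push(run.latencyMs);
  }

  const rows: LeaderboardRow[] = Object.keys(cells).map((key) => {
    const cell = cells[key];
    return {
      model: cell.model,
      prompt: cell.prompt,
      n: cell.n,
      composite: mean(cell.composites),
      latency_ms: mean(cell.latencies),
    };
  });

  // Highest mean composite first; groups without a composite sort last.
  rows.sort((a, b) => {
    const sa = a.composite ?? -Infinity;
    const sb = b.composite ?? -Infinity;
    return sa === sb ? 0 : sb > sa ? 1 : -1;
  });
  return rows;
}
```

With this split, the route handler would only need to map MLflow's `params` and `metrics` arrays into `BenchRun` objects and pass them in; the first row of the result corresponds to the `winner` field in the response.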