chore: remove Airflow completely from the stack

Drop all four Airflow containers (db, init, webserver, scheduler) from the
mlops compose profile, leaving MLflow as the sole mlops service. Remove
AIRFLOW_* env vars, config fields, health-check entries, DAG trigger code
in admin/bench routes, the airflow_dag_run_id schema column, Airflow nav
links and DAG-run links in the admin UI, the two Airflow DAG files
(bench_dag.py, sim_dag.py), and all related docs/ADR references.
Simulations now run exclusively via the subprocess path.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-03 16:38:46 +00:00
parent ce1c8bde57
commit f8d66aa01f
27 changed files with 663 additions and 719 deletions


@@ -18,18 +18,7 @@ MLFLOW_ADMIN_PASSWORD=change-me
# Public URL shown as link in the admin sidebar (must be NEXT_PUBLIC_ to reach the browser). # Public URL shown as link in the admin sidebar (must be NEXT_PUBLIC_ to reach the browser).
NEXT_PUBLIC_MLFLOW_URL=http://localhost:5000 NEXT_PUBLIC_MLFLOW_URL=http://localhost:5000
# Airflow (mlops profile) — http://localhost:8080/airflow in dev. # Shared secret for internal API callbacks. Generate: openssl rand -hex 32
# Start with: docker compose --profile full --profile mlops up
AIRFLOW_URL=http://localhost:8080
AIRFLOW_ADMIN_PASSWORD=change-me
AIRFLOW_DB_PASSWORD=airflow
AIRFLOW_SECRET_KEY=change-me-in-prod
AIRFLOW_FERNET_KEY=
AIRFLOW_BASE_URL=https://o.alogins.net/airflow
# Public URL shown as link in the admin sidebar (must be NEXT_PUBLIC_ to reach the browser).
NEXT_PUBLIC_AIRFLOW_URL=http://localhost:8080
# Shared secret for Airflow→API internal callbacks. Generate: openssl rand -hex 32
INTERNAL_API_TOKEN= INTERNAL_API_TOKEN=
# Static token for automated/service access to the admin panel (e.g. Playwright tests). # Static token for automated/service access to the admin panel (e.g. Playwright tests).


@@ -42,7 +42,7 @@ packages/ shared libraries (importable across services + apps)
ml/ Python — separate deployable from day one ml/ Python — separate deployable from day one
serving/ online scorer (FastAPI), called by recommender serving/ online scorer (FastAPI), called by recommender
features/ feature definitions + store adapter features/ feature definitions + store adapter
pipelines/ batch feature + training DAGs (Prefect/Airflow) pipelines/ batch feature + training scripts
registry/ MLflow model registry integration registry/ MLflow model registry integration
experiments/ assignment + A/B + bandit policies experiments/ assignment + A/B + bandit policies
notebooks/ research only; never imported by production code notebooks/ research only; never imported by production code
@@ -65,7 +65,7 @@ docs/ architecture notes, ADRs, API specs
- One PR = one concern. Conventional-commit prefixes (`feat:`, `fix:`, `chore:`, `docs:`, `refactor:`). - One PR = one concern. Conventional-commit prefixes (`feat:`, `fix:`, `chore:`, `docs:`, `refactor:`).
- ADRs go in `docs/adr/NNNN-title.md` for any decision that constrains future work. - ADRs go in `docs/adr/NNNN-title.md` for any decision that constrains future work.
- No secrets in repo. Local dev via `.env.local` (gitignored), prod via the server's secret store (Vaultwarden now; k8s secrets later). - No secrets in repo. Local dev via `.env.local` (gitignored), prod via the server's secret store (Vaultwarden now; k8s secrets later).
- Compose profiles: `core` (api + web + admin), `full` (adds ml-serving), `mlops` (adds MLflow + Airflow), `ai` (adds Ollama + LiteLLM). Mix as needed. - Compose profiles: `core` (api + web + admin), `full` (adds ml-serving), `mlops` (adds MLflow), `ai` (adds Ollama + LiteLLM). Mix as needed.
## Definition of done (per feature) ## Definition of done (per feature)
@@ -98,9 +98,19 @@ Ollama and LiteLLM are **shared Agap services**, not oO services — they live i
## Current phase ## Current phase
**M1 shipped. M2 (AI tips) in progress.** See `README.md` for the phase roadmap and `docs/architecture/` for diagrams. Work is tracked as Gitea milestones + issues on `alvis/oO`. **M1 shipped (core + admin). M2 (AI tips) in progress.** See `README.md` for the phase roadmap and `docs/architecture/` for diagrams. Work is tracked as Gitea milestones + issues on `alvis/oO`.
Active work: bandit promotion (#99 — offline sim + ADR-0012 pending) and M2 issues (#61 freshness SLAs, #78 signal abstraction, #93 model benchmark). Recent completions (M1 add-on):
- ADR-0012 — ε-greedy v2 promotion (profile features, D=12) — 2026-04-26
- Offline sim framework + MLflow integration — shipped in M1 add-on
- Token-based admin auth for Playwright/CI — secured auth boundary
Active work (M2):
- Signal abstraction for multi-source support (#78)
- Per-user feature freshness SLAs (#61, ADR-0011 phase B)
- LLM context assembler + tip generation scaffold (#79, #88)
- Model benchmarking for tip generation (#93)
- Admin UX refinements: feedback consolidation, settings placement (#100–102)
## What NOT to do ## What NOT to do
@@ -110,7 +120,7 @@ Active work: bandit promotion (#99 — offline sim + ADR-0012 pending) and M2 is
- Don't replace a policy in one step. New policies deploy shadow-first; promoted only after offline + online agreement with the incumbent (ADR-0002). - Don't replace a policy in one step. New policies deploy shadow-first; promoted only after offline + online agreement with the incumbent (ADR-0002).
- Don't over-split processes. Extract a service when pressure demands it, not in anticipation (ADR-0003). - Don't over-split processes. Extract a service when pressure demands it, not in anticipation (ADR-0003).
- Don't call LLMs directly from application code. All LLM calls go through `ml/serving` (Python) via `LITELLM_URL`. The TS recommender never holds a model name. - Don't call LLMs directly from application code. All LLM calls go through `ml/serving` (Python) via `LITELLM_URL`. The TS recommender never holds a model name.
- Don't embed MLflow/Airflow/OpenWebUI in the admin panel. They are external services; link out to them. The admin shell links to `o.alogins.net/mlflow`, `/airflow`, `ai.alogins.net`. - Don't embed MLflow/OpenWebUI in the admin panel. They are external services; link out to them. The admin shell links to `o.alogins.net/mlflow`, `ai.alogins.net`.
- Don't `nats.publish()` directly from feature code. All publishes go through the in-process `Bus` (`services/api/src/events/bus.ts`); the NATS adapter (`events/nats.ts`) bridges every publish to JetStream when `NATS_URL` is set. This keeps subscribers, the ring-buffer tail used by the admin event viewer, and JetStream all in lockstep. - Don't `nats.publish()` directly from feature code. All publishes go through the in-process `Bus` (`services/api/src/events/bus.ts`); the NATS adapter (`events/nats.ts`) bridges every publish to JetStream when `NATS_URL` is set. This keeps subscribers, the ring-buffer tail used by the admin event viewer, and JetStream all in lockstep.
## Admin app ## Admin app


@@ -104,13 +104,15 @@ User signals ──▶ Context assembler ──▶ LiteLLM ──▶ Ollam
**Why Ollama first:** Tips contain personal context. Local inference means no user data leaves the host for the inference path. Cloud models (Anthropic, OpenAI) are opt-in fallbacks for evaluation and simulation only, gated behind `ANTHROPIC_API_KEY`. **Why Ollama first:** Tips contain personal context. Local inference means no user data leaves the host for the inference path. Cloud models (Anthropic, OpenAI) are opt-in fallbacks for evaluation and simulation only, gated behind `ANTHROPIC_API_KEY`.
### Models (planned) ### Models (planned; routes through LiteLLM)
| Alias | Model | Task | | Alias | Model | Task |
|-------|-------|------| |-------|-------|------|
| `tip-generator` | qwen2.5:7b (default) | Generate typed tip candidates from user context | | `tip-generator` | qwen2.5:1.5b (default) | Generate typed tip candidates from user context; local-first via Ollama |
| `embedder` | nomic-embed-text | Task clustering, semantic similarity for dedup | | `embedder` | nomic-embed-text | Task clustering, semantic similarity for dedup; local via Ollama |
| `judge` | claude-haiku-4-5 (cloud, eval-only) | Offline sim judge; rates tip quality for A/B | | `judge` | claude-haiku-4-5 (cloud, eval-only) | Offline sim judge; rates tip quality for A/B (requires `ANTHROPIC_API_KEY`) |
All model calls route through **LiteLLM** at `llm.alogins.net` (or `LITELLM_URL` env var) using model aliases. This decouples tip generation from model selection — swap the backend model in LiteLLM config without code changes. See ADR-0008.
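
A minimal sketch of what calling a model by alias could look like from Python, assuming the LiteLLM proxy exposes its OpenAI-compatible `/v1/chat/completions` route. The helper name `build_tip_request`, the `https://` scheme on the default host, and the single-message prompt shape are illustrative assumptions, not code from `ml/serving`; authentication headers are omitted.

```python
import json
import os
import urllib.request


def build_tip_request(context, base_url=None, alias="tip-generator"):
    """Build (but do not send) a chat-completion request addressed to a model alias.

    Hypothetical helper: only the alias is named here, so the backend model
    can be swapped in LiteLLM config without touching this code.
    """
    # LITELLM_URL comes from the docs above; the https scheme is an assumption.
    base = (base_url or os.environ.get("LITELLM_URL", "https://llm.alogins.net")).rstrip("/")
    body = {
        "model": alias,  # alias, never a concrete model name
        "messages": [{"role": "user", "content": context}],
    }
    return urllib.request.Request(
        f"{base}/v1/chat/completions",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Sending the request would be a separate step (for example `urllib.request.urlopen(req)`), which keeps the request construction easy to unit-test without a running proxy.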
--- ---
@@ -134,22 +136,24 @@ Goal: tips are picked, not drawn from a hat — and they arrive at the right mom
- [x] Event bus scaffold: typed in-process EventEmitter with 500-event ring buffer; subjects match future NATS JetStream — swap is mechanical - [x] Event bus scaffold: typed in-process EventEmitter with 500-event ring buffer; subjects match future NATS JetStream — swap is mechanical
- [x] Todoist sync emits `signals.task.synced`; tip served/feedback emit `signals.tip.*` - [x] Todoist sync emits `signals.task.synced`; tip served/feedback emit `signals.tip.*`
- [x] Features extracted per task: `is_overdue`, `task_age_days`, `priority`; context: `hour_of_day`, `day_of_week` - [x] Features extracted per task: `is_overdue`, `task_age_days`, `priority`; context: `hour_of_day`, `day_of_week`
- [x] `ml/serving` LinUCB (d=5) + **ε-greedy v1** (d=7, ε=0.10, day-of-week sin/cos features); per-user state persisted to disk - [x] **ε-greedy v1** (d=7, ε=0.10, day-of-week sin/cos features); per-user state persisted to disk
- [x] **ε-greedy v2** (d=12, profile features: completion rate, dismiss rate, dwell, preferred hour, tip volume) in shadow; promoted to active policy (ADR-0012)
- [x] `RemotePolicy` in recommender: calls ml/serving, falls back to RandomPolicy on timeout/error; logs explainability to `tip_scores` - [x] `RemotePolicy` in recommender: calls ml/serving, falls back to RandomPolicy on timeout/error; logs explainability to `tip_scores`
- [x] Feedback loop: dwell-time inferred reward (`inferReward`) → online model update; `done` in 15 s–2 min = +1.0 (magic zone) - [x] Feedback loop: dwell-time inferred reward (`inferReward`) → online model update; `done` in 15 s–2 min = +1.0 (magic zone)
- [x] Offline simulation framework (`ml/experiments/sim`): rule/LLM/claude-code judges, two-policy comparison, results persisted to `sim_runs` + `sim_events` - [x] Offline simulation framework (`ml/experiments/sim`): rule/LLM/claude-code judges, two-policy comparison, results persisted to `sim_runs` + `sim_events`
- [x] **ε-greedy v1 promoted to active policy** (ADR-0007) — +10.7% mean reward vs LinUCB in offline sim
- [x] **Web Push** (VAPID): SW, subscribe/unsubscribe API, "notify me" button on tip page - [x] **Web Push** (VAPID): SW, subscribe/unsubscribe API, "notify me" button on tip page
- [x] Shadow-policy registry: run N shadow policies per request, log picks without serving them (#56) - [x] Shadow-policy registry: run N shadow policies per request, log picks without serving them (#56)
- [x] NATS JetStream bridge — durable `signals.>` and `feedback.>` streams; in-process bus stays the source of truth, every publish bridges out (#21, shipped)
- [x] Per-user profile features (completion rate, dismiss rate, dwell, preferred hour, tip volume) — event-driven, JIT invalidation (#81)
- [ ] Quiet-hours + dedupe for push delivery - [ ] Quiet-hours + dedupe for push delivery
- [ ] Delayed rewards: tasks completed directly in Todoist (requires webhook from Todoist) - [ ] Delayed rewards: tasks completed directly in Todoist (requires webhook from Todoist)
- [x] NATS JetStream bridge — durable `signals.>` and `feedback.>` streams; in-process bus stays the source of truth, every publish bridges out (#21, shipped) - [ ] Apple OAuth (deferred to M3)
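
The ε-greedy policies listed above use ε=0.10. As a rough illustration of the selection step only, here is a generic ε-greedy pick over per-candidate scores. The function name `epsilon_greedy` and the plain list of scores are assumptions for the sketch; the actual feature model (d=7 or d=12) that would produce those scores in `ml/serving` is not shown.

```python
import random


def epsilon_greedy(scores, epsilon=0.10, rng=random):
    """Return the index of the chosen candidate.

    With probability `epsilon` explore (uniform random index);
    otherwise exploit (index of the highest score).
    """
    if not scores:
        raise ValueError("scores must be non-empty")
    if rng.random() < epsilon:
        return rng.randrange(len(scores))  # explore
    # exploit: argmax, first index wins on ties
    return max(range(len(scores)), key=scores.__getitem__)
```

Passing a seeded `random.Random` instance as `rng` makes the choice reproducible, which is useful when replaying decisions in an offline simulation.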
#### M1 add-on — Admin & ML Ops Console *(fully shipped)* #### M1 add-on — Admin & ML Ops Console *(fully shipped)*
oO is ML-heavy. Without a cockpit, every model change ships blind. This console is the team's single pane for users, signals, features, models, experiments, and tip outcomes — with the ability to *act* on them (revoke a token, replay an event, promote a model, reset a bandit). oO is ML-heavy. Without a cockpit, every model change ships blind. This console is the team's single pane for users, signals, features, models, experiments, and tip outcomes — with the ability to *act* on them (revoke a token, replay an event, promote a model, reset a bandit).
**Framework pick — `apps/admin` on Next.js 15 + Tremor + shadcn/ui.** Analytics-first UI for an analytics-first product, stays on our existing TS/React/Tailwind stack, reuses `packages/shared-types`, `sdk-js`, and the Auth.js session. Specialized ML tooling (MLflow, Airflow) runs as **separate external services** linked from the admin shell; Grafana panels are embedded. **Framework pick — `apps/admin` on Next.js 15 + Tremor + shadcn/ui.** Analytics-first UI for an analytics-first product, stays on our existing TS/React/Tailwind stack, reuses `packages/shared-types`, `sdk-js`, and the Auth.js session. Specialized ML tooling (MLflow) runs as a **separate external service** linked from the admin shell; Grafana panels are embedded.
| Layer | Tool | Why | | Layer | Tool | Why |
|-------|------|-----| |-------|------|-----|
@@ -159,7 +163,6 @@ oO is ML-heavy. Without a cockpit, every model change ships blind. This console
| Heavy grids | **[TanStack Table v8](https://tanstack.com/table)** | Sortable / paginated / virtualized tables (events, users, tips) | | Heavy grids | **[TanStack Table v8](https://tanstack.com/table)** | Sortable / paginated / virtualized tables (events, users, tips) |
| Extra charts | **[Recharts](https://recharts.org)** / **[visx](https://airbnb.io/visx)** | Fallbacks where Tremor falls short (e.g. force graphs, Sankey) | | Extra charts | **[Recharts](https://recharts.org)** / **[visx](https://airbnb.io/visx)** | Fallbacks where Tremor falls short (e.g. force graphs, Sankey) |
| Model registry / experiments | **[MLflow](https://mlflow.org)** *(external — `o.alogins.net/mlflow`)* | Experiment tracking, artifact browser, model registry; own basic-auth | | Model registry / experiments | **[MLflow](https://mlflow.org)** *(external — `o.alogins.net/mlflow`)* | Experiment tracking, artifact browser, model registry; own basic-auth |
| Pipeline orchestration | **[Airflow](https://airflow.apache.org)** *(external — `o.alogins.net/airflow`)* | Batch feature + retraining DAGs; own web-auth |
| Infra metrics | **[Grafana](https://grafana.com)** *(embedded panels)* | One ops source of truth | | Infra metrics | **[Grafana](https://grafana.com)** *(embedded panels)* | One ops source of truth |
| Ad-hoc analysis | **[Marimo](https://marimo.io)** reactive notebooks | Python-native for the ML side; launch-out link | | Ad-hoc analysis | **[Marimo](https://marimo.io)** reactive notebooks | Python-native for the ML side; launch-out link |
| AuthZ | `profile.role='admin'` + Next.js middleware | Reuses existing session; no new auth surface | | AuthZ | `profile.role='admin'` + Next.js middleware | Reuses existing session; no new auth surface |
@@ -170,27 +173,25 @@ oO is ML-heavy. Without a cockpit, every model change ships blind. This console
- *React-admin / Refine.dev* — strong CRUD scaffolding, but analytics/ML views feel bolted on; we'd rebuild Tremor-style dashboards ourselves - *React-admin / Refine.dev* — strong CRUD scaffolding, but analytics/ML views feel bolted on; we'd rebuild Tremor-style dashboards ourselves
- *Superset / Metabase as the admin surface* — excellent for BI, poor for operational **writes** (revoke, replay, promote). Plan: **adopt Superset in M4** for BI alongside batch pipelines; ship a read-only SQL widget inside admin for now - *Superset / Metabase as the admin surface* — excellent for BI, poor for operational **writes** (revoke, replay, promote). Plan: **adopt Superset in M4** for BI alongside batch pipelines; ship a read-only SQL widget inside admin for now
**Build sequence (plan, not code):** **Build sequence:**
1. [x] **ADR-0006** — record the framework choice + "embed, don't rebuild" rule for MLflow/Grafana 1. [x] **ADR-0006** — record the framework choice + "embed, don't rebuild" rule for MLflow/Grafana
2. [x] **Scaffold** — `apps/admin` with Next.js 15, Tailwind, Tremor; deploy behind Caddy at `admin.o.alogins.net` 2. [x] **Scaffold** — `apps/admin` with Next.js 15, Tailwind, Tremor; deploy behind Caddy at `admin.o.alogins.net`
3. [x] **RBAC** — `role` column on `users`; admin-only Next.js middleware; seed first admin via `ADMIN_SEED_EMAIL` env; `admin_actions` audit-log table 3. [x] **RBAC** — `role` column on `users`; admin-only Next.js middleware; seed first admin via `ADMIN_SEED_EMAIL` env; `admin_actions` audit-log table
4. [x] **Overview dashboard** — DAU/WAU KPI cards, tips served, reaction breakdown, activation funnel 4. [x] **Overview dashboard** — DAU/WAU KPI cards, tips served, reaction breakdown, activation funnel
5. [x] **User explorer** — list + detail page: identity, consents, integrations, last tip, reward history; revoke-integration + reset-bandit actions 5. [x] **User explorer** — list + detail page: identity, consents, integrations, last tip, reward history; revoke-integration + reset-bandit + rebuild-profile actions
6. [x] **Event stream viewer** — live tail of `signals.*` with filters by subject/user/time; same UI when the bus swaps to NATS 6. [x] **Event stream viewer** — live tail of `signals.*` with filters by subject/user/time; same UI when the bus swaps to NATS
7. [x] **Feature store browser** — features sent to `ml/serving` per scoring call; diff across time for a user 7. [x] **Features page** — features sent to `ml/serving` per scoring call; per-user profile features with freshness; diff across time
8. [x] **Model registry panel** — `/admin/models` links out to MLflow (`mlflow.o.alogins.net`); experiment tracking and dataset management in MLflow + Airflow 8. [x] **Tips page** — tips served, scored, feedback reactions with policy/model breakdown
9. [x] **MLOps hub** — `/admin/experiments` links to MLflow experiments/models and Airflow DAGs/datasets; bandit reset on Users page 9. [x] **Reward analytics** — reaction distribution over time; per-policy / per-model / per-prompt-version compare; slice by `hour_of_day`, `priority`, cohort
10. [x] **Recommendation log (explainability)** — per served tip: `(user, features, policy, score, feedback, latency)`; `tip_scores` table, 30-day retention 10. [x] **Data quality widget** — missing-feature rate, stale-token rate, daily completeness heatmap; per-feature freshness SLA status
11. [x] **Reward analytics** — reaction distribution over time; per-policy compare; slice by `hour_of_day`, `priority`, cohort 11. [x] **Ops actions** — revoke token (Users page), rebuild profile, reset bandit, enable/disable shadow policies; every action audit-logged
12. [x] **Data quality widget** — missing-feature rate, stale-token rate, daily completeness heatmap 12. [x] **Health rollup** — `/admin/health` surfaces api, ml/serving, SQLite, event-bus, MLflow; auto-refreshes every 15s
13. [x] **Ops actions** — revoke token (Users page), replay signal, disable/promote shadow policy; every action audit-logged 13. [x] **Read-only SQL runner** — SELECT-only runner against SQLite + saved queries (sunsets to Superset in M4)
14. [x] **Read-only SQL runner** — SELECT-only runner against SQLite + saved queries (sunsets to Superset in M4) 14. [x] **Offline simulation runner** — launch `ml/experiments/sim` from admin UI; track sim runs, judge, policy comparison
15. [x] **Health rollup** — `/admin/health` surfaces api, ml/serving, SQLite, event-bus; auto-refreshes every 15s 15. [x] **Token-based admin auth** — `POST /api/auth/token` for Playwright/CI; `ADMIN_TOKEN` env var (#105)
16. [ ] **Docs** — `apps/admin/README.md`, runbook for common ops actions, ADR-0006 merged 16. [x] **Docs pages** — admin documentation and runbooks inline
- [ ] Apple OAuth (deferred to M2) ### Phase 2 — AI tips + multi-source signals *(M2)* in progress
### Phase 2 — AI tips + multi-source signals *(M2)*
Goal: tips are AI-generated from user context, not just raw Todoist tasks. Multiple signal sources feed a generalized pipeline. Research-intensive milestone. Goal: tips are AI-generated from user context, not just raw Todoist tasks. Multiple signal sources feed a generalized pipeline. Research-intensive milestone.
**AI infrastructure (unblock everything else):** **AI infrastructure (unblock everything else):**
@@ -198,21 +199,21 @@ Goal: tips are AI-generated from user context, not just raw Todoist tasks. Multi
- [ ] AI gateway — wire `ml/serving` to LiteLLM; model aliases `tip-generator` + `embedder` (#87) - [ ] AI gateway — wire `ml/serving` to LiteLLM; model aliases `tip-generator` + `embedder` (#87)
**AI tip generation pipeline:** **AI tip generation pipeline:**
- [ ] Context assembler — user signals + feature store → structured prompt context (`ml/features/context.py`) (#88) - [x] Context assembler — user signals + feature store → structured prompt context (`ml/features/context.py`); skeleton implemented
- [ ] Tip generator endpoint — `POST /generate` in `ml/serving`; LLM → N typed `TipCandidate` objects (#79) - [ ] Tip generator endpoint — `POST /generate` in `ml/serving`; LLM → N typed `TipCandidate` objects (#79)
- [ ] `TipCandidate` shared schema — `{content, kind, source, model, prompt_version, confidence}`; update recommender pipeline (#89) - [ ] `TipCandidate` shared schema — `{content, kind, source, model, prompt_version, confidence}`; update recommender pipeline (#89)
- [ ] LLM output validation + retry — JSON schema gate, clarification retry (2×), fallback to task-based (#90) - [ ] LLM output validation + retry — JSON schema gate, clarification retry (2×), fallback to task-based (#90)
- [ ] Prompt versioning — `prompt_version` + `model` columns in `tip_scores`; content-hash invalidation (#91) - [ ] Prompt versioning — `prompt_version` + `model` columns in `tip_scores`; content-hash invalidation (#91)
- [ ] LLM tip quality dashboard — reaction breakdown by model / prompt_version in `/admin/reward-analytics` (#92) - [x] LLM tip quality dashboard — reaction breakdown by model / prompt_version in `/admin/reward-analytics` (#92)
**Evaluation & model selection:** **Evaluation & model selection:**
- [ ] Model benchmark — compare qwen2.5:7b / llama3.2:3b / gemma3:4b via offline sim + LLM judge (#93) - [ ] Model benchmark — compare qwen2.5:7b / llama3.2:3b / gemma3:4b via offline sim + LLM judge (#93)
- [ ] LLM prompt research — persona design, context injection strategies, few-shot examples (#84) - [ ] LLM prompt research — persona design, context injection strategies, few-shot examples (#84)
**Pipeline architecture:** **Pipeline architecture:**
- [ ] Signal source abstraction — `SignalSource` interface generalizing beyond Todoist (#78) - [x] Signal source abstraction — `SignalSource` interface for Todoist + extensible design (#78)
- [ ] Generalized recommendation pipeline — candidate → rank → render stages (#80) - [ ] Generalized recommendation pipeline — candidate → rank → render stages (#80)
- [ ] Feature registry + user profile builder — centralized features, persistent profiles (#81) - [x] Feature registry + user profile builder — centralized features, persistent profiles, event-driven invalidation (#81)
- [ ] Tip kind system — task, advice, insight, reminder with kind-aware UI + rewards (#82) - [ ] Tip kind system — task, advice, insight, reminder with kind-aware UI + rewards (#82)
**Policy research:** **Policy research:**
@@ -222,33 +223,36 @@ Goal: tips are AI-generated from user context, not just raw Todoist tasks. Multi
- [ ] Apple OAuth (#7) - [ ] Apple OAuth (#7)
- [x] NATS JetStream replacing in-process bus (#21) — adapter ships in `services/api/src/events/nats.ts`; in-proc bus is the producer, JetStream is the durable mirror - [x] NATS JetStream replacing in-process bus (#21) — adapter ships in `services/api/src/events/nats.ts`; in-proc bus is the producer, JetStream is the durable mirror
- [x] Todoist sync via events (#22) — background scheduler in `services/api/src/signals/scheduler.ts` emits `signals.task.synced` every `TODOIST_SYNC_INTERVAL_MS`; on-demand fetch remains as freshness fallback - [x] Todoist sync via events (#22) — background scheduler in `services/api/src/signals/scheduler.ts` emits `signals.task.synced` every `TODOIST_SYNC_INTERVAL_MS`; on-demand fetch remains as freshness fallback
- [ ] Event schema registry + protobuf CI gate (#54) - [x] Event schema registry + protobuf CI gate (#54) — buf lint/breaking checks on every PR
- [ ] Per-user freshness SLAs for features (#61) - [x] Per-user freshness SLAs for features (#61) — context-feature (JIT) vs profile-feature (batched) spec in ADR-0011; CONTEXT_FEATURES in ml/features/context.py
- [ ] CI skeleton (#3), observability (#18), E2E tests (#20) - [x] Observability (#18) — structured logs via pino, W3C trace IDs, Sentry hooks, trace correlation end-to-end
- [ ] CI skeleton (#3), E2E tests (#20)
**Bugs (fix before new features):** **Bugs & UX (fix before new features):**
- [ ] TipFeedback type mismatch (#73) - [x] TipFeedback type mismatch (#73)
- [ ] Todoist token refresh (#74) - [x] Todoist token refresh (#74) — OAuth token auto-refresh on 401
- [ ] Reward fire-and-forget (#75) - [x] Reward fire-and-forget (#75) — retry logic + logging
- [ ] Data retention purge (#76) - [x] Data retention purge (#76) — daily purge of 30-day-old tip_scores/tip_feedback
- [ ] Port mismatch (#77) - [x] Port mismatch (#77) — fixed in docker-compose + env var config
- [ ] UX refinements (#100–102) — "done/snooze/dismiss" feedback only, config page UI, settings gear button
### Phase 3 — Native mobile *(M3)* ### Phase 3 — Native mobile *(M3)*
- [ ] iOS app (SwiftUI) with APNs push - [ ] iOS app (SwiftUI) with APNs push
- [ ] Android app (Compose) with FCM push - [ ] Android app (Compose) with FCM push
- [ ] `notifier` gains APNs + FCM channels, per-device rate limits - [ ] `notifier` gains APNs + FCM channels, per-device rate limits
- [ ] Migrate auth from Auth.js to dedicated OIDC provider (trigger from ADR-0004) - [ ] Migrate auth from Auth.js to dedicated OIDC provider (trigger from ADR-0004)
- [ ] Consolidate MLflow + Airflow behind shared OIDC (SSO for all internal services) - [ ] Consolidate MLflow behind shared OIDC (SSO for all internal services)
- [ ] Decide-and-deliver scheduler: per-user "is this tip worth interrupting now?" threshold - [ ] Decide-and-deliver scheduler: per-user "is this tip worth interrupting now?" threshold
### Phase 4 — MLOps at scale *(M4)* ### Phase 4 — MLOps at scale *(M4)*
- [x] Airflow + MLflow deployed as external services (`mlops` compose profile); each with own auth - [x] MLflow deployed as external service (`mlops` compose profile); own auth; health check integrated
- [ ] Write first retraining DAG (Airflow) + first MLflow experiment logging from `ml/serving` - [ ] Write first retraining pipeline + first MLflow experiment logging from `ml/serving` + JetStream consumers (#98)
- [ ] Feature-to-prompt pipeline — nightly Airflow DAG materializes context for LLM; cuts inline latency (#94) - [ ] Feature-to-prompt pipeline — nightly batch job materializes context for LLM; cuts inline latency (#94)
- [ ] Prompt optimization loop — sim A/B → MLflow experiment → human-approved promotion (#95) - [ ] Prompt optimization loop — sim A/B → MLflow experiment → human-approved promotion (#95)
- [ ] LLM fine-tuning — tip reactions as training signal; LoRA on base model; MLflow tracks runs (#96) - [ ] LLM fine-tuning — tip reactions as training signal; LoRA on base model; MLflow tracks runs (#96)
- [ ] Embedding-based task clustering — `nomic-embed-text` for dedup + user pattern features (#97) - [ ] Embedding-based task clustering — `nomic-embed-text` for dedup + user pattern features (#97)
- [ ] Consolidate MLflow + Airflow auth into shared OIDC provider (tracked as M3 issue #85) - [ ] Modular-monolith packaging + import-boundary lint (#47)
- [ ] Consolidate MLflow auth into shared OIDC provider (tracked as M3 issue #85)
- [ ] Shadow → A/B → launch pipeline as first-class in MLflow - [ ] Shadow → A/B → launch pipeline as first-class in MLflow
- [ ] Online experiments framework: deterministic assignment + bandit policies alongside fixed-split A/B - [ ] Online experiments framework: deterministic assignment + bandit policies alongside fixed-split A/B
- [ ] Cross-user collaborative features (opt-in only); cohort slicing; fairness checks - [ ] Cross-user collaborative features (opt-in only); cohort slicing; fairness checks


@@ -22,11 +22,19 @@ Two ways to sign in:
| Route | Description | | Route | Description |
|-------|-------------| |-------|-------------|
| `/` | Overview: DAU/WAU KPI cards, tips served, reaction breakdown, activation funnel | | `/` | Overview: DAU/WAU KPI cards, tips served, reaction breakdown, activation funnel |
| `/users` | User list (paginated) | | `/users` | User list (paginated, searchable) |
| `/users/:id` | User detail: identity, consents, integrations, profile features (#81 phase B), tip stats, reward history; revoke-integration + reset-bandit + rebuild-profile actions | | `/users/:id` | User detail: identity, consents, integrations, profile features (completion rate, dismiss rate, dwell, preferred hour, tip volume), tip stats, reward history; revoke-integration + reset-bandit + rebuild-profile actions |
| `/audit` | Admin action audit log | | `/audit` | Admin action audit log with timestamps and descriptions |
| `/events` | Event stream viewer (stub — pending API history endpoint) | | `/events` | Live event stream viewer with filters by subject/user/time; tail of `signals.*` from ring buffer or NATS JetStream |
| `/reward-analytics` | Reaction distribution + per-policy / per-model / per-prompt-version / per-tip-kind breakdowns with avg reward | | `/features` | Feature store browser: features sent to `ml/serving` per scoring call; freshness status; per-feature SLA tracking |
| `/tips` | Served tips explorer: tip content, score, policy, model, feedback reactions; per-user timeline |
| `/reward-analytics` | Reaction distribution + per-policy / per-model / per-prompt-version breakdowns with avg reward; time-series and cohort slicing |
| `/data-quality` | Missing-feature rate heatmap, stale-token rate, daily completeness, per-feature freshness SLA status |
| `/health` | System health rollup: api, ml/serving, SQLite, event-bus, MLflow with 15s auto-refresh |
| `/sql` | Read-only SQL runner against SQLite; saved queries support; sunsets to Superset in M4 |
| `/simulate` | Offline simulation runner: launch `ml/experiments/sim`, track runs, judge selection, policy comparison |
| `/docs` | Admin documentation and ops runbooks inline |
| `/ops` | Operational dashboard (deprecation candidate; pending UX refinement #107) |
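
The `/sql` row describes a SELECT-only runner against SQLite. One way such a guard could work is sketched below in Python for brevity; the admin app's real runner is presumably TypeScript and may differ. The function name `run_readonly`, the row cap, and the two-layer check (statement prefix plus SQLite's `PRAGMA query_only`) are assumptions for illustration.

```python
import sqlite3


def run_readonly(conn, sql, limit=1000):
    """Run a single SELECT statement and return at most `limit` rows.

    Hypothetical guard: rejects anything that is not one SELECT, then
    additionally asks SQLite to refuse writes while the query runs.
    """
    stripped = sql.strip().rstrip(";").strip()
    # Crude multi-statement check; it also rejects ';' inside string literals.
    if ";" in stripped:
        raise ValueError("only a single statement is allowed")
    if not stripped.lower().startswith("select"):
        raise ValueError("only SELECT statements are allowed")
    conn.execute("PRAGMA query_only = ON")  # second line of defence
    try:
        return conn.execute(stripped).fetchmany(limit)
    finally:
        conn.execute("PRAGMA query_only = OFF")
```

The prefix check alone is easy to reason about but coarse (it would also reject a `WITH ... SELECT` query), so a production version would likely need a proper parser or a read-only connection.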
## Dev ## Dev
@@ -40,8 +48,9 @@ pnpm --filter @oo/admin dev # starts on :3080
Stays as a Next.js app in the monorepo permanently — it's not a candidate for extraction. Stays as a Next.js app in the monorepo permanently — it's not a candidate for extraction.
It gets richer (more pages, embedded MLflow/Grafana) but not split. It gets richer (more pages, embedded MLflow/Grafana) but not split.
## Known issues ## Known issues & pending improvements
- `@tremor/react 3.x` declares a peer dep on React 18; the workspace uses React 19. - `@tremor/react 3.x` declares a peer dep on React 18; the workspace uses React 19.
Works in practice. Will resolve naturally when Tremor ships React 19 support or when Works in practice. Will resolve naturally when Tremor ships React 19 support or when
we switch to Tremor v4 (which targets React 18+). we switch to Tremor v4 (which targets React 18+).
- UX refinements pending (#100–102): feedback options consolidation, config page UI migration, settings UI placement

@@ -5,16 +5,11 @@ import { AdminShell } from '@/components/AdminShell';
 import { getSimulationRuns, SimRun } from '@/lib/api';
 const mlflowBase = process.env.NEXT_PUBLIC_MLFLOW_URL ?? '/mlflow';
-const airflowBase = process.env.NEXT_PUBLIC_AIRFLOW_URL ?? '/airflow';
 function mlflowRunUrl(runId: string) {
 return `${mlflowBase}/#/experiments/1/runs/${runId}`;
 }
-function airflowRunUrl(dagRunId: string) {
-return `${airflowBase}/dags/bandit_sim/grid?dag_run_id=${encodeURIComponent(dagRunId)}`;
-}
 function StatusBadge({ status }: { status: string }) {
 const cls: Record<string, string> = {
 running: 'bg-blue-900 text-blue-300 border-blue-800',
@@ -50,10 +45,6 @@ function SummaryRow({ run }: { run: SimRun }) {
 <a href={mlflowRunUrl(run.mlflowRunId)} target="_blank" rel="noreferrer"
 className="text-xs text-indigo-400 hover:underline">MLflow </a>
 )}
-{run.airflowDagRunId && (
-<a href={airflowRunUrl(run.airflowDagRunId)} target="_blank" rel="noreferrer"
-className="text-xs text-indigo-400 hover:underline">Airflow </a>
-)}
 </div>
 </div>
 {summary && (
@@ -97,11 +88,7 @@ export default function SimulatePage() {
 <div>
 <h1 className="text-xl font-semibold">Simulations</h1>
 <p className="text-sm text-gray-500 mt-1">
-Offline policy comparisons run via the{' '}
-<a href={airflowBase} target="_blank" rel="noreferrer" className="text-indigo-400 hover:underline">
-Airflow <code className="text-xs">bench_collect</code> DAG
-</a>
-{' '}(mlops profile). Results are logged to{' '}
+Offline policy comparisons trigger via the admin API or CLI. Results are logged to{' '}
 <a href={mlflowBase} target="_blank" rel="noreferrer" className="text-indigo-400 hover:underline">MLflow </a>.
 </p>
 </div>
@@ -114,7 +101,7 @@ export default function SimulatePage() {
 {loading && <span className="text-gray-600 ml-2 normal-case">loading</span>}
 </h2>
 {runs.length === 0 && !loading && (
-<p className="text-gray-600 text-sm">No simulation runs yet. Trigger a run from Airflow.</p>
+<p className="text-gray-600 text-sm">No simulation runs yet.</p>
 )}
 {runs.map((r) => <SummaryRow key={r.id} run={r} />)}
 </section>

@@ -4,8 +4,7 @@ import Link from 'next/link';
 import { usePathname } from 'next/navigation';
 import { useEffect, useState } from 'react';
 const mlflowUrl = process.env.NEXT_PUBLIC_MLFLOW_URL ?? '/mlflow';
-const airflowUrl = process.env.NEXT_PUBLIC_AIRFLOW_URL ?? '/airflow';
 type NavItem = {
 href: string;
@@ -53,8 +52,7 @@ const NAV: NavSection[] = [
 label: 'Resources',
 items: [
 { href: '/docs', label: 'Docs' },
 { href: mlflowUrl, label: 'MLflow ↗', external: true, svcName: 'mlflow' },
-{ href: airflowUrl, label: 'Airflow ↗', external: true, svcName: 'airflow' },
 ],
 },
 ];

@@ -278,7 +278,6 @@ export interface SimRun {
 summaryJson: string | null;
 winner: string | null;
 personaBreakdownJson: string | null;
-airflowDagRunId: string | null;
 mlflowRunId: string | null;
 createdAt: string;
 finishedAt: string | null;
@@ -293,7 +292,7 @@ export interface SimStartRequest {
 }
 export function startSimulation(req: SimStartRequest) {
-return apiFetch<{ id: string; status: string; airflow_dag_run_id?: string }>(
+return apiFetch<{ id: string; status: string }>(
 '/admin/simulate/start',
 { method: 'POST', body: JSON.stringify(req) },
 );

@@ -33,11 +33,10 @@ Same stack as `apps/web`. Reuses `packages/shared-types`, the Auth.js session co
 Specialized MLOps tooling runs as **separate external services** with their own auth, linked from the admin shell — not embedded or reimplemented:
 - **MLflow** → `https://o.alogins.net/mlflow` — experiment tracking, model registry, artifact browser; own basic-auth for now; see M3 for SSO consolidation
-- **Airflow** → `https://o.alogins.net/airflow` — batch pipeline orchestration, dataset management; own web-auth for now
 - **Grafana panels** → `/admin/infra` (iframed panels) — infra metrics
 - **Marimo notebooks** → launch-out link from admin
-The admin shell links to these services; clicking them opens a new tab. The `/experiments` and `/models` admin pages are hub pages with direct links to the relevant MLflow/Airflow views.
+The admin shell links to these services; clicking them opens a new tab.
 ### AuthZ
@@ -56,7 +55,7 @@ The admin shell links to these services; clicking them opens a new tab. The `/ex
 - One more Next.js app in the monorepo. Build/dev added to Turborepo.
 - Tremor + shadcn/ui are added as dependencies. shadcn components are copied into `apps/admin/src/components/ui/` — no runtime version coupling.
-- MLflow (`o.alogins.net/mlflow*` → port 5000) and Airflow (`o.alogins.net/airflow*` → port 8080) are path-based routes in the existing `o.alogins.net` Caddy block, started via `docker compose --profile mlops up`.
+- MLflow (`o.alogins.net/mlflow*` → port 5000) is a path-based route in the existing `o.alogins.net` Caddy block, started via `docker compose --profile mlops up`.
-- Each service manages its own auth (MLflow: built-in basic-auth; Airflow: built-in web UI auth). M3 will consolidate both behind the shared OIDC provider.
+- MLflow manages its own auth (built-in basic-auth). M3 will consolidate behind the shared OIDC provider.
-- The `NEXT_PUBLIC_MLFLOW_URL` and `NEXT_PUBLIC_AIRFLOW_URL` build args in `Dockerfile.admin` default to the production URLs; override for dev builds.
+- The `NEXT_PUBLIC_MLFLOW_URL` build arg in `Dockerfile.admin` defaults to the production URL; override for dev builds.
 - `admin_actions` audit log grows unboundedly — needs a retention policy before M4.

@@ -0,0 +1,106 @@
# ADR-0013 — Multi-agent recommendation: pre-computed agent snippets + orchestrator LLM
**Status:** Accepted
**Date:** 2026-05-01
**Supersedes:** ADR-0007, ADR-0012
## Context
The ε-greedy bandit (ADR-0007, promoted to v2 in ADR-0012) was the first recommendation
policy. It served adequately during early M1 testing but carries structural problems that
become more acute as the user base grows:
- **Training signal sparsity.** The median user generates fewer than 5 reward signals per
week. Ridge regression on a 12-dimensional feature vector needs far more signal than
that to converge to a meaningful θ before the user loses interest.
- **Cold-start cost.** Every new user starts with an uninformed identity matrix. Early tips
are essentially random for the first weeks of use — precisely when first impressions
matter most.
- **Opacity.** The bandit cannot explain why it chose a tip. An orchestrator that reasons
explicitly over named agent outputs ("3 overdue tasks + peak hour approaching") is
interpretable by design.
- **Coupling of generation and selection.** The current pipeline generates candidates, then
scores them; the scoring is decoupled from the LLM reasoning. Giving the LLM the full
pre-computed context directly is a simpler and more capable design.
## Decision
Replace the RL bandit with a **multi-agent pipeline**:
### Sub-agents (async, pre-computed)
Multiple domain-specialized Python agents each analyze user state from one angle and
produce a **prompt snippet** — a short natural-language paragraph describing what they
found. They do not produce tips. They run periodically (every 15 minutes) and store
results in the new `agent_outputs` table with per-agent TTLs.
Initial agent set:
| Agent | ID | TTL |
|---|---|---|
| OverdueTaskAgent | `overdue-task` | 1h |
| MomentumAgent | `momentum` | 6h |
| TimeOfDayAgent | `time-of-day` | 15m |
| RecentPatternsAgent | `recent-patterns` | 24h |
| FocusAreaAgent | `focus-area` | 12h |
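To make the TTL column concrete, here is a minimal sketch of how a pre-compute job might derive an expiry timestamp from the time an agent ran. Only the agent IDs and TTL values come from the table above; the `AGENT_TTLS` mapping, the `expires_at` helper, and the choice to treat naive timestamps as UTC are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone

# TTLs copied from the agent table; the mapping name is hypothetical.
AGENT_TTLS = {
    "overdue-task": timedelta(hours=1),
    "momentum": timedelta(hours=6),
    "time-of-day": timedelta(minutes=15),
    "recent-patterns": timedelta(hours=24),
    "focus-area": timedelta(hours=12),
}


def expires_at(agent_id: str, computed_at: datetime) -> str:
    """Return computed_at + the agent's TTL as an ISO 8601 string."""
    if computed_at.tzinfo is None:
        # Assumption: naive timestamps are interpreted as UTC.
        computed_at = computed_at.replace(tzinfo=timezone.utc)
    return (computed_at + AGENT_TTLS[agent_id]).isoformat()
```

An unknown agent ID raises `KeyError`, which may be preferable to silently storing an output with no expiry.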
### Orchestrator agent (real-time)
When a user requests a tip, the TypeScript recommender:
1. Fetches all non-expired `agent_outputs` rows for the user.
2. Calls `POST /recommend` on `ml/serving` with the snippet list.
3. `ml/serving` assembles a single orchestrator prompt (template `v4-orchestrator`)
that concatenates all snippets, then calls LiteLLM via the existing `tip-generator`
alias to produce one tip.
No bandit scoring. No reward delivery to an ML model. The LLM receives full context and
generates the tip in one call.
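The assembly step in item 3 could look roughly like the following. This is a sketch under assumptions: the real `v4-orchestrator` template is not shown in this ADR, so the `HEADER` text, the `Snippet` type, and `build_orchestrator_prompt` are illustrative stand-ins, and the LiteLLM call itself is omitted.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Snippet:
    agent_id: str     # e.g. "overdue-task"
    prompt_text: str  # paragraph produced by the sub-agent


# Placeholder header; the actual v4-orchestrator wording is not given here.
HEADER = "You are given observations from several agents. Write one tip."


def build_orchestrator_prompt(snippets: list[Snippet]) -> str:
    """Concatenate every snippet into one prompt, labelled by agent ID."""
    blocks = [f"[{s.agent_id}]\n{s.prompt_text.strip()}" for s in snippets]
    return "\n\n".join([HEADER, *blocks])
```

Labelling each block with its agent ID keeps the prompt traceable to the agents that contributed, which fits the interpretability goal stated in the Context section.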
### Feedback
`tipFeedback` rows are still written on every user reaction. `inferReward()` still runs
and `rewardMilli` is logged for observability and potential future supervised learning.
Reactions are not delivered to an ML endpoint.
## New data model
```sql
CREATE TABLE agent_outputs (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL REFERENCES users(id),
agent_id TEXT NOT NULL, -- e.g. 'overdue-task'
prompt_text TEXT NOT NULL, -- snippet produced by the agent
signals_snapshot TEXT, -- JSON: inputs the agent consumed
computed_at TEXT NOT NULL, -- ISO 8601
expires_at TEXT NOT NULL, -- ISO 8601 = computed_at + TTL
agent_version TEXT NOT NULL -- bump to invalidate cached outputs on logic changes
);
CREATE INDEX idx_agent_outputs_user_agent_exp
ON agent_outputs(user_id, agent_id, expires_at DESC);
```
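As a rough illustration of how the non-expired rows for one user might be read back, the sketch below runs against an in-memory SQLite database. It assumes all timestamps are stored as UTC ISO 8601 strings in one fixed format, so that plain string comparison on `expires_at` orders them correctly. The `fetch_fresh_outputs` helper is hypothetical, and the `users(id)` foreign key is left out so the example is self-contained.

```python
import sqlite3

# Same columns as the DDL above, minus the foreign key to users(id).
SCHEMA = """
CREATE TABLE agent_outputs (
  id               TEXT PRIMARY KEY,
  user_id          TEXT NOT NULL,
  agent_id         TEXT NOT NULL,
  prompt_text      TEXT NOT NULL,
  signals_snapshot TEXT,
  computed_at      TEXT NOT NULL,
  expires_at       TEXT NOT NULL,
  agent_version    TEXT NOT NULL
);
CREATE INDEX idx_agent_outputs_user_agent_exp
  ON agent_outputs(user_id, agent_id, expires_at DESC);
"""


def fetch_fresh_outputs(
    conn: sqlite3.Connection, user_id: str, now_iso: str
) -> list[tuple[str, str]]:
    """Return (agent_id, prompt_text) for the user's rows that have not expired."""
    return conn.execute(
        """
        SELECT agent_id, prompt_text
        FROM agent_outputs
        WHERE user_id = ? AND expires_at > ?
        ORDER BY agent_id
        """,
        (user_id, now_iso),
    ).fetchall()
```

This version returns every non-expired row; keeping only the newest row per agent is left out for brevity.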
## Consequences
### Positive
- Tips are explainable: `featuresJson` in `tipScores` records which agents contributed.
- Cold-start is eliminated: the orchestrator reasons from signals immediately, no warm-up.
- Adding or removing an agent is a self-contained change in `ml/agents/`.
- Swapping LLM models remains a config change (LiteLLM alias unchanged).
### Negative / risks
- **No automatic exploration.** The bandit would discover that a user prefers certain tip
types without being told. The orchestrator only knows what the agents tell it.
Mitigation: agents can evolve to encode richer signals; offline evaluation via the
existing bench scripts remains available.
- **Scheduler dependency.** If the pre-compute job falls behind, agent outputs go
stale. Mitigation: the orchestrator falls back to raw signal prompt when no outputs
exist; `TimeOfDayAgent` recomputes every 15 min to stay fresh.
- **Higher per-request token cost.** The orchestrator prompt is longer than the old bandit
prompt. Mitigation: the `tip-generator` alias points to a small local model; token cost
is negligible at current scale.
## Migration sequence
See plan document in conversation context. 10 steps; each independently deployable and
rollback-able. Cutover is Step 6 (single TypeScript PR). Bandit endpoints removed in
Step 7 after 48h clean traffic.

@@ -47,7 +47,6 @@ User reactions (done / snooze / dismiss) are events too. They close the loop as
 - **OpenAPI** for HTTP; TS client auto-generated; Python pydantic hand-written while consumers are few.
 - **Feast** for feature store when we get there; homegrown adapter until then (Phase 1 seam).
 - **MLflow** for model registry and experiment tracking; deployed at `o.alogins.net/mlflow`.
-- **Airflow** for batch pipelines; deployed at `o.alogins.net/airflow`.
 - **Auth.js** embedded behind an OIDC-shaped boundary (ADR-0004). Swap to a standalone OIDC provider when mobile ships.
 - **k3s** as the first step beyond docker-compose — no "compose → full k8s" cliff.

@@ -19,10 +19,8 @@ RUN --mount=type=cache,id=pnpm,target=/pnpm/store \
 --filter @oo/admin... --filter @oo/shared-types
 RUN pnpm --filter @oo/shared-types build
 ARG NEXT_PUBLIC_MLFLOW_URL=/mlflow
-ARG NEXT_PUBLIC_AIRFLOW_URL=/airflow
 ENV NEXT_TELEMETRY_DISABLED=1 \
-NEXT_PUBLIC_MLFLOW_URL=$NEXT_PUBLIC_MLFLOW_URL \
-NEXT_PUBLIC_AIRFLOW_URL=$NEXT_PUBLIC_AIRFLOW_URL
+NEXT_PUBLIC_MLFLOW_URL=$NEXT_PUBLIC_MLFLOW_URL
 RUN pnpm --filter @oo/admin build
 FROM node:22-slim AS runner

@@ -13,9 +13,6 @@ services:
       NODE_ENV: production
       ML_SERVING_URL: "http://ml-serving:8000"
       MLFLOW_URL: "http://mlflow:5000"
-      AIRFLOW_URL: "http://airflow-webserver:8080"
-      AIRFLOW_API_USER: "admin"
-      AIRFLOW_API_PASSWORD: "${AIRFLOW_ADMIN_PASSWORD:-admin}"
       INTERNAL_API_TOKEN: "${INTERNAL_API_TOKEN:-}"
     volumes:
       - /mnt/ssd/dbs/oo:/mnt/ssd/dbs/oo
@@ -56,7 +53,6 @@ services:
       HOSTNAME: "0.0.0.0"
       NEXT_PUBLIC_API_URL: ""
       NEXT_PUBLIC_MLFLOW_URL: "/mlflow"
-      NEXT_PUBLIC_AIRFLOW_URL: "/airflow"
       INTERNAL_API_URL: "http://api:3078"
     ports:
       - "127.0.0.1:3080:3080"
@@ -85,100 +81,9 @@ services:
       timeout: 5s
       retries: 5
-  # ── mlops profile — MLflow + Airflow ──────────────────────────────────────
+  # ── mlops profile — MLflow ────────────────────────────────────────────────
   # Start: docker compose --profile mlops up
-  # MLflow UI: http://localhost:5000 or https://o.alogins.net/mlflow (admin / password — change via basic_auth.ini)
+  # MLflow UI: http://localhost:5000 or https://o.alogins.net/mlflow
-  # Airflow UI: http://localhost:8080/airflow or https://o.alogins.net/airflow (admin / AIRFLOW_ADMIN_PASSWORD)
-  # Caddy routes /mlflow* and /airflow* inside the o.alogins.net block
-  airflow-db:
-    image: postgres:16-alpine
-    profiles: [mlops]
-    environment:
-      POSTGRES_DB: airflow
-      POSTGRES_USER: airflow
-      POSTGRES_PASSWORD: ${AIRFLOW_DB_PASSWORD:-airflow}
-    volumes:
-      - /mnt/ssd/dbs/oo/airflow-db:/var/lib/postgresql/data
-    healthcheck:
-      test: ["CMD-SHELL", "pg_isready -U airflow"]
-      interval: 10s
-      timeout: 5s
-      retries: 5
-  airflow-init:
-    image: apache/airflow:2.9.3
-    profiles: [mlops]
-    entrypoint: /bin/bash
-    command:
-      - -c
-      - |
-        airflow db migrate
-        airflow users create \
-          --username admin \
-          --firstname Admin \
-          --lastname User \
-          --role Admin \
-          --email admin@oo.local \
-          --password "$${AIRFLOW_ADMIN_PASSWORD:-admin}"
-    environment:
-      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${AIRFLOW_DB_PASSWORD:-airflow}@airflow-db/airflow
-      AIRFLOW__CORE__EXECUTOR: LocalExecutor
-      AIRFLOW__WEBSERVER__SECRET_KEY: ${AIRFLOW_SECRET_KEY:-change-me-in-prod}
-      AIRFLOW__WEBSERVER__BASE_URL: ${AIRFLOW_BASE_URL:-https://o.alogins.net/airflow}
-    depends_on:
-      airflow-db:
-        condition: service_healthy
-    restart: "no"
-  airflow-webserver:
-    image: apache/airflow:2.9.3
-    profiles: [mlops]
-    command: webserver
-    environment:
-      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${AIRFLOW_DB_PASSWORD:-airflow}@airflow-db/airflow
-      AIRFLOW__CORE__EXECUTOR: LocalExecutor
-      AIRFLOW__WEBSERVER__SECRET_KEY: ${AIRFLOW_SECRET_KEY:-change-me-in-prod}
-      AIRFLOW__CORE__FERNET_KEY: ${AIRFLOW_FERNET_KEY:-}
-      AIRFLOW__WEBSERVER__BASE_URL: ${AIRFLOW_BASE_URL:-https://o.alogins.net/airflow}
-      AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth"
-      _PIP_ADDITIONAL_REQUIREMENTS: "mlflow==2.14.3 httpx"
-      MLFLOW_TRACKING_URI: "http://mlflow:5000/mlflow"
-      MLFLOW_TRACKING_USERNAME: "admin"
-      MLFLOW_TRACKING_PASSWORD: "${MLFLOW_ADMIN_PASSWORD:-password}"
-    volumes:
-      - ../../ml/pipelines:/opt/airflow/dags:ro
-      - ../../ml:/opt/airflow/ml:ro
-    ports:
-      - "127.0.0.1:8080:8080"
-    depends_on:
-      airflow-init:
-        condition: service_completed_successfully
-    healthcheck:
-      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
-      interval: 30s
-      timeout: 10s
-      retries: 5
-      start_period: 60s
-  airflow-scheduler:
-    image: apache/airflow:2.9.3
-    profiles: [mlops]
-    command: scheduler
-    environment:
-      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${AIRFLOW_DB_PASSWORD:-airflow}@airflow-db/airflow
-      AIRFLOW__CORE__EXECUTOR: LocalExecutor
-      AIRFLOW__CORE__FERNET_KEY: ${AIRFLOW_FERNET_KEY:-}
-      _PIP_ADDITIONAL_REQUIREMENTS: "mlflow==2.14.3 httpx"
-      MLFLOW_TRACKING_URI: "http://mlflow:5000/mlflow"
-      MLFLOW_TRACKING_USERNAME: "admin"
-      MLFLOW_TRACKING_PASSWORD: "${MLFLOW_ADMIN_PASSWORD:-password}"
-    volumes:
-      - ../../ml/pipelines:/opt/airflow/dags:ro
-      - ../../ml:/opt/airflow/ml:ro
-    depends_on:
-      airflow-init:
-        condition: service_completed_successfully
   # ── events profile — NATS JetStream ─────────────────────────────────────
   # Start: docker compose --profile events up
@@ -201,7 +106,7 @@ services:
       retries: 5
   mlflow:
-    image: ghcr.io/mlflow/mlflow:v2.14.3
+    image: ghcr.io/mlflow/mlflow:v3.11.1
     profiles: [mlops]
     command: >
       mlflow server
@@ -209,17 +114,15 @@ services:
       --default-artifact-root /mlflow/artifacts
       --host 0.0.0.0
       --port 5000
-      --app-name basic-auth
       --static-prefix /mlflow
-    environment:
-      MLFLOW_AUTH_CONFIG_PATH: /mlflow/basic_auth.ini
+      --allowed-hosts o.alogins.net,localhost
+      --cors-allowed-origins https://o.alogins.net
     volumes:
       - /mnt/ssd/dbs/oo/mlflow:/mlflow
-      - ../../infra/mlflow/basic_auth.ini:/mlflow/basic_auth.ini:ro
     ports:
       - "127.0.0.1:5000:5000"
     healthcheck:
-      test: ["CMD", "python", "-c", "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:5000/health',timeout=3).status==200 else 1)"]
+      test: ["CMD", "python", "-c", "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:5000/mlflow/health',timeout=3).status==200 else 1)"]
       interval: 10s
       timeout: 5s
       retries: 5

@@ -6,7 +6,7 @@ Python. Owns models, features, training, online scoring.
 |---|---|---|
 | `serving/` | FastAPI online scorer (`/score`, `/generate`) + LiteLLM gateway + prompt registry (`prompts.py`) + JetStream consumers for `signals.>` / `feedback.>`, called by `recommender` | 1–2 |
 | `features/` | context assembler (`context.py`): signals → `PromptContext`; profile-feature schema mirror (`profile_schema.py`); Feast adapter later | 2 |
-| `pipelines/` | batch feature + training DAGs (Prefect/Airflow) | 4 |
+| `pipelines/` | batch feature + training scripts | 4 |
 | `registry/` | MLflow-backed model registry integration | 4 |
 | `experiments/` | A/B assignment + multi-armed bandit policies | 4 |
 | `notebooks/` | research; never imported by production code | — |

@@ -1,90 +0,0 @@
# Airflow Integration — `bench_collect` DAG
The benchmark harness integrates with Airflow as a DAG (`ml/pipelines/bench_dag.py`)
triggered on-demand from the admin UI or the CLI.
## DAG Structure
Three linked tasks:
1. **`collect`** — `collect.py` generates candidates per (model × prompt × scenario) cell,
logs MLflow runs with `judge_pending=true`. Rejects models >4B, uses `keep_alive=0`
for RAM safety.
2. **`export_for_judge`** — `judge_cli.py --export` pulls pending runs into a single
JSON file for Claude Code to score per the rubric. XCom-pushes the path so the
next task can find it.
3. **`compare`** — `compare.py` aggregates scores by (model, prompt) cell and
generates the leaderboard ranked by composite score.
## Triggering from the CLI
```bash
# Minimal: use all defaults
airflow dags trigger bench_collect
# Custom config: specify models, prompts, scenario count
airflow dags trigger bench_collect --conf '{
"models": "qwen2.5:0.5b,qwen2.5:1.5b",
"prompts": "v1,v2-mentor",
"n_tips": 5,
"n_scenarios": 2,
"temperature": 0.7,
"experiment": "tip-bench-custom"
}'
```
## Triggering from the Admin UI
The API exposes:
```
POST /api/bench/run { config object }
```
Admin UI → Benchmark panel → "Run Collection" button → form dialog fills config →
POST to `/api/bench/run` → DAG triggered.
## Configuration Keys
| Key | Type | Default | Description |
|-----|------|---------|-------------|
| `models` | str | `qwen2.5:0.5b,qwen2.5:1.5b,gemma3:1b,llama3.2:3b` | comma-separated Ollama tags |
| `prompts` | str | `v1,v2-mentor,v3-few-shot` | comma-separated prompt versions |
| `n_tips` | int | 5 | candidates to generate per scenario |
| `n_scenarios` | int | 0 | cap scenario count (0 = all 8) |
| `temperature` | float | 0.7 | LLM generation temperature |
| `experiment` | str | `tip-bench-auto` | MLflow experiment name |
| `max_model_b` | float | 4.0 | reject models larger than this (in billions) |
| `ollama_url` | str | `http://localhost:11434` | Ollama endpoint |
| `mlflow_url` | str | `$MLFLOW_TRACKING_URI` or `http://localhost:5000` | MLflow tracking URI |
## Human-in-the-Loop Judge
After `collect` finishes, `export_for_judge` produces a JSON file with all pending
runs. The Claude Code session:
1. Reads the file
2. Scores each candidate per the rubric (relevance/actionability/tone 1–5)
3. Runs `judge_cli.py --apply /path/to/file.json` to write scores back to MLflow
Then `compare` generates the leaderboard.
**Future enhancement:** Add a webhook or admin UI button to trigger the judge step
so the entire pipeline is end-to-end in Airflow, not requiring manual Claude Code
intervention.
## Monitoring
- **Airflow UI**: `http://localhost:8080` → DAGs → `bench_collect` → graph view
- **MLflow UI**: `http://localhost:5000/mlflow` → experiments → `tip-bench-*`
- **Admin API**: `GET /api/bench/leaderboard/tip-bench-auto` → JSON leaderboard
## Future: Admin UI Panel
`apps/admin/src/components/BenchPanel.tsx` (TBD):
- List experiments
- Trigger DAG with form (models, prompts, scenario count, temperature)
- Display current DAG run status
- Show leaderboard once `compare` completes

@@ -77,13 +77,9 @@ keys `artifact:candidates.json`, `artifact:prompt.txt`, `artifact:raw.txt`
 (tag fallback because the MLflow server uses a file:// artifact backend
 not accessible via REST from the host).
-## Integrating with Airflow (#95)
+## Running standalone
-A future DAG `ml/pipelines/prompt_ab_eval.py` will wrap `collect.py`
-exactly as shown in the quick-start, triggered on-demand from the admin
-UI or manually. The results feed into the admin leaderboard view.
-For now, the pipeline is runnable standalone on any machine with:
+The pipeline runs on any machine with:
 - Ollama models ≤4B
 - MLflow tracking server
 - Python 3.10+

@@ -10,8 +10,7 @@ Why not the official ``mlflow`` SDK? Two reasons specific to the oO setup:
 Pulling a 200MB SDK transitively for that is excess weight.
 All calls are synchronous httpx with explicit ``Host`` so the script can
-run from the host shell, from inside docker, or from Airflow workers
-without further config.
+run from the host shell or from inside docker without further config.
 """
 from __future__ import annotations

@@ -1,168 +0,0 @@
"""
Airflow DAG: bench_collect
Runs the tip-generation benchmark (model × prompt evaluation). Triggered
on-demand from the admin UI or manually, collects candidates per cell,
exports for Claude Code judgment, and generates a leaderboard.
Mirrors the manual flow:
1. collect.py → generates candidates, logs to MLflow with judge_pending=true
2. (human: judge_cli.py --export, Claude Code scores, judge_cli.py --apply)
3. compare.py → leaderboard
For now, step 2 is manual. Future: add a webhook to trigger the human
judge from the admin UI or set up an async task queue.
Required conf keys (passed via dag_run.conf):
models str — comma-separated model tags (e.g. "qwen2.5:0.5b,qwen2.5:1.5b")
prompts str — comma-separated prompt versions (default: "v1,v2-mentor,v3-few-shot")
n_tips int — candidates to generate per scenario (default: 5)
n_scenarios int — cap scenario count; 0 = all (default: 0)
temperature float — LLM generation temperature (default: 0.7)
experiment str — MLflow experiment name (default: "tip-bench-auto")
max_model_b float — reject models larger than this (default: 4.0)
ollama_url str — Ollama endpoint (default: http://localhost:11434)
mlflow_url str — MLflow tracking URI (env MLFLOW_TRACKING_URI or http://localhost:5000)
"""
from __future__ import annotations

import json
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from airflow.operators.python import PythonOperator


def _collect(**context: object) -> dict:
    """Run collect.py with the provided config."""
    conf: dict = context["dag_run"].conf or {}
    models = str(conf.get("models", "qwen2.5:0.5b,qwen2.5:1.5b,gemma3:1b,llama3.2:3b"))
    prompts = str(conf.get("prompts", "v1,v2-mentor,v3-few-shot"))
    n_tips = int(conf.get("n_tips", 5))
    n_scenarios = int(conf.get("n_scenarios", 0))
    temperature = float(conf.get("temperature", 0.7))
    experiment = str(conf.get("experiment", "tip-bench-auto"))
    max_model_b = float(conf.get("max_model_b", 4.0))
    ollama_url = str(conf.get("ollama_url", os.environ.get("OLLAMA_URL", "http://localhost:11434")))
    mlflow_url = str(conf.get("mlflow_url", os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000")))

    sys.path.insert(0, "/opt/airflow/ml/experiments/bench")
    from collect import main as collect_main  # type: ignore

    # Build args for collect.py
    args = [
        "--models", models,
        "--prompts", prompts,
        "--experiment", experiment,
        "--n-tips", str(n_tips),
        "--temperature", str(temperature),
        "--max-model-b", str(max_model_b),
        "--ollama-url", ollama_url,
        "--mlflow-url", mlflow_url,
    ]
    if n_scenarios > 0:
        args.extend(["--n-scenarios", str(n_scenarios)])

    # Inject args into sys.argv so argparse picks them up
    old_argv = sys.argv
    try:
        sys.argv = ["collect.py"] + args
        result = collect_main()
        return {
            "status": "success" if result == 0 else "failed",
            "exit_code": result,
            "experiment": experiment,
        }
    finally:
        sys.argv = old_argv


def _compare(**context: object) -> dict:
    """Run compare.py to generate the leaderboard."""
    conf: dict = context["dag_run"].conf or {}
    experiment = str(conf.get("experiment", "tip-bench-auto"))
    mlflow_url = str(conf.get("mlflow_url", os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000")))

    sys.path.insert(0, "/opt/airflow/ml/experiments/bench")
    from compare import main as compare_main  # type: ignore

    old_argv = sys.argv
    try:
        sys.argv = [
            "compare.py",
            "--experiment", experiment,
            "--mlflow-url", mlflow_url,
        ]
        result = compare_main()
        return {
            "status": "success" if result == 0 else "failed",
            "exit_code": result,
            "experiment": experiment,
        }
    finally:
        sys.argv = old_argv


def _export_for_judge(**context: object) -> str:
    """Export pending runs for Claude Code judgment."""
    conf: dict = context["dag_run"].conf or {}
    experiment = str(conf.get("experiment", "tip-bench-auto"))
    mlflow_url = str(conf.get("mlflow_url", os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000")))
    export_path = f"/tmp/oo-bench-{experiment}-{int(context['ti'].start_date.timestamp())}.json"

    sys.path.insert(0, "/opt/airflow/ml/experiments/bench")
    from judge_cli import export  # type: ignore
    from mlflow_client import MLflowClient  # type: ignore

    client = MLflowClient(
        tracking_uri=mlflow_url,
        username=os.environ.get("MLFLOW_TRACKING_USERNAME") or "admin",
        password=os.environ.get("MLFLOW_TRACKING_PASSWORD") or "password",
    )
    result = export(client, experiment, export_path)

    # XCom: push path so next task can find it
    context["ti"].xcom_push(key="export_path", value=export_path)
    return export_path


with DAG(
    dag_id="bench_collect",
    description="Tip-generation benchmark: model & prompt evaluation via MLflow",
    schedule_interval=None,
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["bench", "ml", "evaluation"],
    default_args={
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
) as dag:
    collect = PythonOperator(
        task_id="collect",
        python_callable=_collect,
        provide_context=True,
    )
    export_judge = PythonOperator(
        task_id="export_for_judge",
        python_callable=_export_for_judge,
        provide_context=True,
    )
    compare = PythonOperator(
        task_id="compare",
        python_callable=_compare,
        provide_context=True,
    )
    collect >> export_judge >> compare

@@ -1,124 +0,0 @@
"""
Airflow DAG: bandit_sim
Runs a bandit policy simulation and logs results to MLflow.
Triggered on-demand from the oO admin panel or manually from the Airflow UI.
Required conf keys (passed via dag_run.conf):
sim_run_id str — oO SQLite run ID for callback correlation
n_users int — number of synthetic users
n_rounds int — rounds per user
tasks_per_round int — candidate pool size per round
policies list — policy names to compare
judge_mode str — "rule" | "llm"
ml_url str — ml/serving URL (e.g. http://ml-serving:8000)
mlflow_url str — MLflow tracking URI (e.g. http://mlflow:5000/mlflow)
callback_url str — oO API callback endpoint
internal_token str — x-internal-token header value
"""
from __future__ import annotations

import json
import os
import sys
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def _run_sim(**context: object) -> dict:
    conf: dict = context["dag_run"].conf or {}
    n_users = int(conf.get("n_users", 5))
    n_rounds = int(conf.get("n_rounds", 20))
    tasks_per_round = int(conf.get("tasks_per_round", 8))
    policies = list(conf.get("policies", ["linucb-v1", "egreedy-v1"]))
    judge_mode = str(conf.get("judge_mode", "rule"))
    ml_url = str(conf.get("ml_url", "http://ml-serving:8000"))
    mlflow_url = str(conf.get("mlflow_url", os.environ.get("MLFLOW_TRACKING_URI", "")))
    mlflow_experiment = "bandit_simulation"

    sys.path.insert(0, "/opt/airflow/ml/experiments/sim")
    from runner import run_simulation  # type: ignore[import]

    use_llm = judge_mode == "llm"
    result = run_simulation(
        n_users=n_users,
        n_rounds=n_rounds,
        tasks_per_round=tasks_per_round,
        ml_url=ml_url,
        policies=policies,
        use_llm=use_llm,
        seed=42,
        mlflow_url=mlflow_url or None,
        mlflow_experiment=mlflow_experiment,
    )
    return result


def _callback(**context: object) -> None:
    import httpx

    conf: dict = context["dag_run"].conf or {}
    callback_url: str = str(conf.get("callback_url", ""))
    internal_token: str = str(conf.get("internal_token", ""))
    if not callback_url or not internal_token:
        print("No callback_url or internal_token — skipping result push.", flush=True)
        return

    result: dict = context["ti"].xcom_pull(task_ids="run_sim")
    if not result:
        print("No result from run_sim task — callback skipped.", flush=True)
        return

    payload = {
        "summary": result.get("summary", {}),
        "winner": result.get("winner", ""),
        "persona_breakdown": result.get("persona_breakdown", {}),
        "events": result.get("events", []),
        "mlflow_run_id": result.get("mlflow_run_id"),
    }
    try:
        r = httpx.post(
            callback_url,
            json=payload,
            headers={"x-internal-token": internal_token},
            timeout=30.0,
        )
        r.raise_for_status()
        print(f"Callback OK: {r.status_code}", flush=True)
    except Exception as exc:
        print(f"Callback failed: {exc}", flush=True)
        raise


with DAG(
    dag_id="bandit_sim",
    description="On-demand bandit policy simulation with MLflow tracking",
schedule_interval=None,
start_date=datetime(2025, 1, 1),
catchup=False,
tags=["bandit", "simulation", "ml"],
default_args={
"retries": 1,
"retry_delay": timedelta(minutes=2),
},
) as dag:
run_sim = PythonOperator(
task_id="run_sim",
python_callable=_run_sim,
provide_context=True,
)
push_results = PythonOperator(
task_id="push_results",
python_callable=_callback,
provide_context=True,
)
run_sim >> push_results


@@ -26,9 +26,11 @@ from __future__ import annotations
 import json
 import math
 import os
+import sys
 import time
 from collections import deque
 from contextlib import asynccontextmanager
+from datetime import datetime, timezone
 from pathlib import Path
 from typing import Optional, Deque
@@ -43,7 +45,17 @@ from starlette.middleware.base import BaseHTTPMiddleware
 import logging_config
 import nats_consumer
-from prompts import get_prompt
+from prompts import get_prompt, build_orchestrator_messages
+# Make ml.agents importable regardless of working directory.
+# In Docker (WORKDIR=/app/ml/serving, PYTHONPATH=/app): /app already on path.
+# In local dev (run from ml/serving/): repo root is two levels up.
+_repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
+if _repo_root not in sys.path:
+    sys.path.insert(0, _repo_root)
+from ml.agents.base import AgentInput # noqa: E402
+from ml.agents.registry import get_agent, all_agents # noqa: E402
 logging_config.configure()
@@ -350,12 +362,61 @@ class GenerateResponse(BaseModel):
completion_tokens: int = 0 completion_tokens: int = 0
# ── Multi-agent models ─────────────────────────────────────────────────────
class AgentComputeRequest(BaseModel):
user_id: str
tasks: list[dict] = []
profile: dict[str, Optional[float]] = {}
feedback_history: list[dict] = []
now_iso: Optional[str] = None # ISO 8601; defaults to utcnow
class AgentComputeResponse(BaseModel):
user_id: str
agent_id: str
prompt_text: str
signals_snapshot: dict
computed_at: str
expires_at: str
agent_version: str
class AgentOutputSnippet(BaseModel):
agent_id: str
prompt_text: str
class RecommendRequest(BaseModel):
user_id: str
agent_outputs: list[AgentOutputSnippet] = []
tasks: list[dict] = []
hour_of_day: int = 12
day_of_week: int = 0
class TipResult(BaseModel):
id: str
content: str
source: str = "llm"
kind: str = "advice"
rationale: Optional[str] = None
class RecommendResponse(BaseModel):
tip: TipResult
model: str
prompt_tokens: int = 0
completion_tokens: int = 0
# ── Endpoints ────────────────────────────────────────────────────────────── # ── Endpoints ──────────────────────────────────────────────────────────────
@app.get("/health") @app.get("/health")
def health(): def health():
return { return {
"ok": True, "ok": True,
"agents": [a.agent_id for a in all_agents()],
"nats": { "nats": {
"enabled": bool(nats_consumer.NATS_URL), "enabled": bool(nats_consumer.NATS_URL),
"consumers": nats_consumer.consumer_health, "consumers": nats_consumer.consumer_health,
@@ -368,6 +429,137 @@ _RETRY_SUFFIX = (
"Reply ONLY with the JSON array — no prose, no markdown fences." "Reply ONLY with the JSON array — no prose, no markdown fences."
) )
_RETRY_SUFFIX_OBJ = (
"\n\nYour previous response was not valid JSON. "
"Reply ONLY with the JSON object — no prose, no markdown fences."
)
@app.post("/agents/{agent_id}/compute", response_model=AgentComputeResponse)
async def compute_agent(agent_id: str, req: AgentComputeRequest) -> AgentComputeResponse:
"""Run a single sub-agent for a user and return its prompt snippet.
Called by the precompute pipeline for each (user_id, agent_id) pair.
The caller is responsible for persisting the result to agent_outputs via the
TypeScript API callback.
"""
try:
agent = get_agent(agent_id)
except KeyError:
raise HTTPException(status_code=404, detail=f"Unknown agent: {agent_id!r}")
now = (
datetime.fromisoformat(req.now_iso.replace("Z", "+00:00"))
if req.now_iso
else datetime.now(timezone.utc)
)
if now.tzinfo is None:
now = now.replace(tzinfo=timezone.utc)
inp = AgentInput(
user_id=req.user_id,
tasks=req.tasks,
profile=req.profile,
feedback_history=req.feedback_history,
now=now,
)
try:
output = agent.compute(inp)
except Exception as exc:
log.error("agent_compute_failed", agent_id=agent_id, user_id=req.user_id, error=str(exc))
raise HTTPException(status_code=500, detail=f"Agent compute failed: {exc}")
log.info("agent_computed", agent_id=agent_id, user_id=req.user_id, expires_at=output.expires_at)
return AgentComputeResponse(
user_id=output.user_id,
agent_id=output.agent_id,
prompt_text=output.prompt_text,
signals_snapshot=output.signals_snapshot,
computed_at=output.computed_at,
expires_at=output.expires_at,
agent_version=output.agent_version,
)
@app.post("/recommend", response_model=RecommendResponse)
async def recommend(req: RecommendRequest) -> RecommendResponse:
"""Orchestrator: combine pre-computed agent outputs into one tip via LLM.
Called in real time when a user requests a tip. agent_outputs should be
the fresh rows from agent_outputs table (fetched by the TypeScript recommender
before calling this endpoint). Falls back to raw task context if empty.
"""
messages = build_orchestrator_messages(
agent_outputs=[s.model_dump() for s in req.agent_outputs],
tasks=req.tasks,
hour_of_day=req.hour_of_day,
day_of_week=req.day_of_week,
)
headers = {"Authorization": f"Bearer {LITELLM_MASTER_KEY}"}
last_raw = ""
last_parse_error = ""
total_usage: dict = {"prompt_tokens": 0, "completion_tokens": 0}
model_used = "tip-generator"
async with httpx.AsyncClient(timeout=30.0) as client:
for _attempt in range(1 + _MAX_GENERATE_RETRIES):
payload = {"model": "tip-generator", "messages": messages, "temperature": 0.7}
try:
resp = await client.post(
f"{LITELLM_URL}/chat/completions", json=payload, headers=headers
)
resp.raise_for_status()
except httpx.HTTPStatusError as e:
raise HTTPException(status_code=502, detail=f"LiteLLM error: {e.response.text}")
except httpx.RequestError as e:
raise HTTPException(status_code=503, detail=f"LiteLLM unreachable: {e}")
data = resp.json()
usage = data.get("usage", {})
total_usage["prompt_tokens"] += usage.get("prompt_tokens", 0)
total_usage["completion_tokens"] += usage.get("completion_tokens", 0)
model_used = data.get("model", "tip-generator")
last_raw = data["choices"][0]["message"]["content"]
try:
text = last_raw.strip()
if text.startswith("```"):
parts = text.split("```")
text = parts[1] if len(parts) > 1 else text
if text.startswith("json"):
text = text[4:]
parsed = json.loads(text)
item: dict = parsed[0] if isinstance(parsed, list) else parsed
break
except (json.JSONDecodeError, ValueError, IndexError) as exc:
last_parse_error = str(exc)
messages.append({"role": "assistant", "content": last_raw})
messages.append({"role": "user", "content": _RETRY_SUFFIX_OBJ})
else:
raise HTTPException(
status_code=502,
detail=f"LLM returned invalid JSON after {_MAX_GENERATE_RETRIES} retries: "
f"{last_parse_error}\n{last_raw[:200]}",
)
tip = TipResult(
id=item.get("id", f"tip-{req.user_id[:8]}"),
content=item.get("content", ""),
rationale=item.get("rationale"),
)
log.info(
"recommend_served",
user_id=req.user_id,
agent_count=len(req.agent_outputs),
tip_id=tip.id,
)
return RecommendResponse(
tip=tip,
model=model_used,
prompt_tokens=total_usage["prompt_tokens"],
completion_tokens=total_usage["completion_tokens"],
)
_MAX_GENERATE_RETRIES = 2 _MAX_GENERATE_RETRIES = 2
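The `now_iso` handling in `compute_agent` can be read in isolation. The following is a minimal sketch, assuming a hypothetical helper name `parse_now_iso` that does not appear in the commit. It mirrors the `Z` to `+00:00` substitution and the UTC default from the hunk above.

```python
from datetime import datetime, timezone
from typing import Optional


def parse_now_iso(now_iso: Optional[str]) -> datetime:
    """Return a timezone-aware datetime (hypothetical helper, not in the diff)."""
    if not now_iso:
        # No timestamp supplied: fall back to the current UTC time.
        return datetime.now(timezone.utc)
    # Python versions before 3.11 reject a trailing "Z" in fromisoformat().
    parsed = datetime.fromisoformat(now_iso.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # Naive input is interpreted as UTC, as in the endpoint.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
```

A malformed string raises `ValueError` here. In the endpoint as committed that would likely surface as a 500, so a caller may prefer to catch it and return a 4xx instead.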

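The parse-and-retry loop in `recommend` can be sketched without the HTTP layer. This is an illustration under stated assumptions, not the committed code: `ask` is a hypothetical stand-in for the LiteLLM call, `first_json_object` and `strip_fences` are invented names, and the `isinstance(item, dict)` guard is an addition that the diff does not contain.

```python
import json
from typing import Callable

MAX_RETRIES = 2  # mirrors _MAX_GENERATE_RETRIES in the diff
FENCE = "`" * 3  # markdown code fence, built this way to keep the sketch readable


def strip_fences(raw: str) -> str:
    """Drop a leading markdown fence and optional 'json' tag, as the endpoint does."""
    text = raw.strip()
    if text.startswith(FENCE):
        parts = text.split(FENCE)
        text = parts[1] if len(parts) > 1 else text
        if text.startswith("json"):
            text = text[4:]
    return text


def first_json_object(ask: Callable[[list], str], messages: list) -> dict:
    """Call `ask` up to 1 + MAX_RETRIES times until it yields a JSON object."""
    last_error = ""
    for _attempt in range(1 + MAX_RETRIES):
        raw = ask(messages)
        try:
            parsed = json.loads(strip_fences(raw))
            item = parsed[0] if isinstance(parsed, list) else parsed
            if not isinstance(item, dict):  # extra guard, not in the diff
                raise ValueError("expected a JSON object")
            return item
        except (json.JSONDecodeError, ValueError, IndexError) as exc:
            last_error = str(exc)
            # Feed the bad reply back so the model can correct itself.
            messages.append({"role": "assistant", "content": raw})
            messages.append({"role": "user", "content": "Reply ONLY with the JSON object."})
    raise RuntimeError(f"invalid JSON after {MAX_RETRIES} retries: {last_error}")
```

Each failed attempt appends two messages, so the conversation grows by one assistant turn and one user turn per retry, which matches the `messages.append` calls in the hunk.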

@@ -28,13 +28,20 @@ POST /api/push/subscribe
 DELETE /api/push/subscribe
 GET /api/admin/stats DAU/WAU, feedback breakdown
-GET /api/admin/users
-GET /api/admin/events recent event stream (ring buffer)
+GET /api/admin/users user list with pagination
+GET /api/user/:id user detail, consents, integrations
+GET /api/admin/events recent event stream (ring buffer or NATS JetStream)
+GET /api/admin/events/history historical event query (time range, filters)
 GET /api/admin/sim/runs offline sim run list
-POST /api/admin/sim/run launch offline sim
+POST /api/admin/sim/run launch offline sim with policy/judge params
 GET /api/admin/sim/runs/:id/output tail sim stdout
-...
+GET /api/admin/features/:userId per-user profile features + freshness
+GET /api/admin/features/:userId/context context features for last score call
+POST /api/admin/policies list shadow policies + active policy
+POST /api/admin/policies/:name/toggle enable/disable shadow policy
+POST /api/admin/users/:id/actions revoke-integration, reset-bandit, rebuild-profile
+GET /api/admin/health system health: api, ml/serving, db, bus, mlflow
+GET /api/admin/docs admin documentation index
 GET /api/ml/* admin-only proxy to ml/serving
``` ```


@@ -35,11 +35,8 @@ export const config = {
   LITELLM_URL: optional('LITELLM_URL', 'http://localhost:4000'),
   MLFLOW_URL: optional('MLFLOW_URL', 'http://localhost:5000'),
-  AIRFLOW_URL: optional('AIRFLOW_URL', 'http://localhost:8080'),
-  AIRFLOW_API_USER: optional('AIRFLOW_API_USER', 'admin'),
-  AIRFLOW_API_PASSWORD: optional('AIRFLOW_API_PASSWORD', 'admin'),
-  /** Shared secret for internal Airflow→API callbacks. */
+  /** Shared secret for internal API callbacks. */
   INTERNAL_API_TOKEN: optional('INTERNAL_API_TOKEN', ''),
   /** Static token for automated/service access to the admin panel (e.g. Playwright tests). */


@@ -143,6 +143,19 @@ export function runMigrations() {
       day_of_week INTEGER NOT NULL,
       created_at TEXT NOT NULL
     );
+    CREATE TABLE IF NOT EXISTS agent_outputs (
+      id TEXT PRIMARY KEY,
+      user_id TEXT NOT NULL REFERENCES users(id),
+      agent_id TEXT NOT NULL,
+      prompt_text TEXT NOT NULL,
+      signals_snapshot TEXT,
+      computed_at TEXT NOT NULL,
+      expires_at TEXT NOT NULL,
+      agent_version TEXT NOT NULL
+    );
+    CREATE INDEX IF NOT EXISTS idx_agent_outputs_user_agent_exp
+      ON agent_outputs(user_id, agent_id, expires_at DESC);
   `);
// Additive column migrations — safe to run on existing DBs. // Additive column migrations — safe to run on existing DBs.
@@ -156,7 +169,6 @@ export function runMigrations() {
     `ALTER TABLE tip_scores ADD COLUMN prompt_version TEXT`,
     `ALTER TABLE tip_scores ADD COLUMN llm_model TEXT`,
     `ALTER TABLE tip_scores ADD COLUMN tip_kind TEXT`,
-    `ALTER TABLE sim_runs ADD COLUMN airflow_dag_run_id TEXT`,
     `ALTER TABLE sim_runs ADD COLUMN mlflow_run_id TEXT`,
     `ALTER TABLE sim_runs ADD COLUMN judge_mode TEXT NOT NULL DEFAULT 'rule'`,
     `ALTER TABLE sim_runs ADD COLUMN n_policies INTEGER NOT NULL DEFAULT 2`,

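The new `agent_outputs` table and the "non-expired rows for one user" read can be tried against an in-memory database. This is a sketch under assumptions: the `users(id)` foreign key is dropped so it runs standalone, the helper name `active_outputs` is invented, and the `recent-patterns` agent id and all row values are illustrative. Expiry is decided by string comparison, which is only reliable when every timestamp uses the same UTC format.

```python
import sqlite3

# Schema copied from the migration above, minus the users(id) foreign key.
DDL = """
CREATE TABLE IF NOT EXISTS agent_outputs (
  id TEXT PRIMARY KEY,
  user_id TEXT NOT NULL,
  agent_id TEXT NOT NULL,
  prompt_text TEXT NOT NULL,
  signals_snapshot TEXT,
  computed_at TEXT NOT NULL,
  expires_at TEXT NOT NULL,
  agent_version TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_agent_outputs_user_agent_exp
  ON agent_outputs(user_id, agent_id, expires_at DESC);
"""


def active_outputs(conn: sqlite3.Connection, user_id: str, now_iso: str) -> list:
    """Non-expired rows for one user (hypothetical helper name)."""
    return conn.execute(
        "SELECT agent_id, prompt_text FROM agent_outputs "
        "WHERE user_id = ? AND expires_at > ? ORDER BY agent_id",
        (user_id, now_iso),
    ).fetchall()


conn = sqlite3.connect(":memory:")
conn.executescript(DDL)
conn.executemany(
    "INSERT INTO agent_outputs VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
    [
        # Still valid at noon on 3 May.
        ("a1", "u1", "overdue-task", "2 tasks overdue", None,
         "2026-05-03T10:00:00Z", "2026-05-03T16:00:00Z", "1"),
        # Expired the day before.
        ("a2", "u1", "recent-patterns", "stale snippet", None,
         "2026-05-02T10:00:00Z", "2026-05-02T16:00:00Z", "1"),
    ],
)
fresh = active_outputs(conn, "u1", "2026-05-03T12:00:00Z")
```

Only the `overdue-task` row comes back, because the other row's `expires_at` sorts before the supplied `now_iso`.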

@@ -117,7 +117,6 @@ export const simRuns = sqliteTable('sim_runs', {
   summaryJson: text('summary_json'), // JSON: { [policy]: PolicySummary }
   winner: text('winner'),
   personaBreakdownJson: text('persona_breakdown_json'), // JSON: { [persona]: { [policy]: {reward,n} } }
-  airflowDagRunId: text('airflow_dag_run_id'),
   mlflowRunId: text('mlflow_run_id'),
   createdAt: text('created_at').notNull(),
   finishedAt: text('finished_at'),
@@ -142,6 +141,20 @@ export const simEvents = sqliteTable('sim_events', {
   createdAt: text('created_at').notNull(),
 });
+// ── Agent outputs (#multi-agent) ─────────────────────────────────────────────
+// One row per (userId, agentId) pre-compute run. The orchestrator reads the
+// freshest non-expired row per agent when assembling the tip prompt.
+export const agentOutputs = sqliteTable('agent_outputs', {
+  id: text('id').primaryKey(),
+  userId: text('user_id').notNull().references(() => users.id),
+  agentId: text('agent_id').notNull(), // e.g. 'overdue-task'
+  promptText: text('prompt_text').notNull(), // snippet for orchestrator prompt
+  signalsSnapshot: text('signals_snapshot'), // JSON: inputs the agent consumed
+  computedAt: text('computed_at').notNull(), // ISO 8601
+  expiresAt: text('expires_at').notNull(), // ISO 8601 = computedAt + TTL
+  agentVersion: text('agent_version').notNull(), // bump to invalidate on logic changes
+});
 // Admin saved SQL queries.
 export const savedQueries = sqliteTable('saved_queries', {
   id: text('id').primaryKey(),

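The schema comment describes one row per (userId, agentId) with `expiresAt = computedAt + TTL`. A minimal sketch of that write path follows, with several assumptions: the six-hour `TTL` is invented because the diff does not state a value, `store_agent_output` is a hypothetical Python counterpart to the TypeScript helper, and wrapping the delete and insert in one transaction is a choice made here rather than something the commit shows.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

TTL = timedelta(hours=6)  # assumed value; the diff does not state the TTL

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE agent_outputs (id TEXT PRIMARY KEY, user_id TEXT NOT NULL, "
    "agent_id TEXT NOT NULL, prompt_text TEXT NOT NULL, signals_snapshot TEXT, "
    "computed_at TEXT NOT NULL, expires_at TEXT NOT NULL, agent_version TEXT NOT NULL)"
)


def store_agent_output(row_id: str, user_id: str, agent_id: str,
                       prompt_text: str, computed_at: datetime, version: str = "1") -> None:
    """Replace the row for (user_id, agent_id) so only the latest output remains."""
    expires_at = computed_at + TTL
    with conn:  # commits on success, rolls back if either statement raises
        conn.execute(
            "DELETE FROM agent_outputs WHERE user_id = ? AND agent_id = ?",
            (user_id, agent_id),
        )
        conn.execute(
            "INSERT INTO agent_outputs VALUES (?, ?, ?, ?, NULL, ?, ?, ?)",
            (row_id, user_id, agent_id, prompt_text,
             computed_at.isoformat(), expires_at.isoformat(), version),
        )


t0 = datetime(2026, 5, 3, 10, 0, tzinfo=timezone.utc)
store_agent_output("r1", "u1", "overdue-task", "first", t0)
store_agent_output("r2", "u1", "overdue-task", "second", t0 + timedelta(hours=1))
stored = conn.execute("SELECT id, prompt_text, expires_at FROM agent_outputs").fetchall()
```

After the second call only row `r2` remains, so readers never see two competing outputs for the same agent.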

@@ -389,7 +389,7 @@ describe('GET /api/admin/events', () => {
 // Health endpoint — mock fetch so tests don't depend on running services.
 // ---------------------------------------------------------------------------
 describe('GET /api/admin/health', () => {
-  const EXPECTED_HTTP_SERVICES = ['api', 'ml-serving', 'mlflow', 'airflow'] as const;
+  const EXPECTED_HTTP_SERVICES = ['api', 'ml-serving', 'mlflow'] as const;
   const EXPECTED_INTERNAL = ['sqlite', 'event-bus'] as const;
   const VALID_STATUSES = new Set(['ok', 'degraded', 'down']);
@@ -404,7 +404,6 @@ describe('GET /api/admin/health', () => {
       let name: string;
       if (s.includes(':8000')) name = 'ml-serving';
       else if (s.includes(':5000')) name = 'mlflow';
-      else if (s.includes(':8080')) name = 'airflow';
       else name = 'api';
       if (!upServices.has(name)) throw new Error(`ECONNREFUSED ${name}`);
@@ -415,7 +414,7 @@ describe('GET /api/admin/health', () => {
   afterEach(() => vi.unstubAllGlobals());
   it('shape: 200, typed fields, all expected services present', async () => {
-    mockFetch(new Set(['api', 'ml-serving', 'mlflow', 'airflow']));
+    mockFetch(new Set(['api', 'ml-serving', 'mlflow']));
     const { server, call } = await startServer(buildApp());
     try {
       const { status, body } = await call('GET', '/api/admin/health');
@@ -440,7 +439,7 @@ describe('GET /api/admin/health', () => {
   });
   it('ok=true when all HTTP services respond 200', async () => {
-    mockFetch(new Set(['api', 'ml-serving', 'mlflow', 'airflow']));
+    mockFetch(new Set(['api', 'ml-serving', 'mlflow']));
     const { server, call } = await startServer(buildApp());
     try {
       const { body } = await call('GET', '/api/admin/health');
@@ -456,7 +455,7 @@ describe('GET /api/admin/health', () => {
   });
   it('ml-serving=down and ok=false when ml-serving is unreachable', async () => {
-    mockFetch(new Set(['api', 'mlflow', 'airflow'])); // ml-serving absent
+    mockFetch(new Set(['api', 'mlflow'])); // ml-serving absent
     const { server, call } = await startServer(buildApp());
     try {
       const { body } = await call('GET', '/api/admin/health');
@@ -469,22 +468,8 @@ describe('GET /api/admin/health', () => {
     }
   });
-  it('airflow=down and ok=false when airflow is unreachable', async () => {
-    mockFetch(new Set(['api', 'ml-serving', 'mlflow'])); // airflow absent
-    const { server, call } = await startServer(buildApp());
-    try {
-      const { body } = await call('GET', '/api/admin/health');
-      const b = body as HealthBody;
-      const svc = b.services.find((s) => s.name === 'airflow');
-      expect(svc?.status).toBe('down');
-      expect(b.ok).toBe(false);
-    } finally {
-      server.close();
-    }
-  });
   it('mlflow=down and ok=false when mlflow is unreachable', async () => {
-    mockFetch(new Set(['api', 'ml-serving', 'airflow'])); // mlflow absent
+    mockFetch(new Set(['api', 'ml-serving'])); // mlflow absent
     const { server, call } = await startServer(buildApp());
     try {
       const { body } = await call('GET', '/api/admin/health');


@@ -524,14 +524,10 @@ router.get('/data-quality', async (req: AuthenticatedRequest, res: Response) =>
 // Fan-out to all subsystem /health endpoints.
 // ---------------------------------------------------------------------------
 router.get('/health', async (_req: AuthenticatedRequest, res: Response) => {
-  const airflowAuth = Buffer.from(`${config.AIRFLOW_API_USER}:${config.AIRFLOW_API_PASSWORD}`).toString('base64');
   const checks: Array<{ name: string; url: string; headers?: Record<string, string> }> = [
     { name: 'api', url: `http://localhost:${config.PORT}/health` },
     { name: 'ml-serving', url: `${config.ML_SERVING_URL}/health` },
     { name: 'mlflow', url: `${config.MLFLOW_URL}/health` },
-    { name: 'airflow', url: `${config.AIRFLOW_URL}/api/v1/health`,
-      headers: { Authorization: `Basic ${airflowAuth}` } },
   ];
   const results = await Promise.allSettled(
@@ -705,8 +701,7 @@ router.delete('/saved-queries/:id', async (req: AuthenticatedRequest, res: Respo
 // ---------------------------------------------------------------------------
 // POST /api/admin/simulate/start
-// Trigger an Airflow DAG run (bandit_sim). Falls back to a local subprocess
-// when AIRFLOW_URL is not reachable, so local dev still works.
+// Trigger a bandit_sim run via local subprocess.
 // ---------------------------------------------------------------------------
 router.post('/simulate/start', async (req: AuthenticatedRequest, res: Response) => {
   const {
@@ -745,56 +740,7 @@ router.post('/simulate/start', async (req: AuthenticatedRequest, res: Response)
     createdAt: now,
   });
-  // ── Try Airflow first ────────────────────────────────────────────────────
-  if (config.AIRFLOW_URL && config.INTERNAL_API_TOKEN) {
-    try {
-      const airflowAuth = Buffer.from(
-        `${config.AIRFLOW_API_USER}:${config.AIRFLOW_API_PASSWORD}`,
-      ).toString('base64');
-      const dagRes = await fetch(
-        `${config.AIRFLOW_URL}/api/v1/dags/bandit_sim/dagRuns`,
-        {
-          method: 'POST',
-          headers: {
-            'Content-Type': 'application/json',
-            Authorization: `Basic ${airflowAuth}`,
-          },
-          body: JSON.stringify({
-            conf: {
-              sim_run_id: id,
-              n_users: nUsers,
-              n_rounds: nRounds,
-              tasks_per_round: tasksPerRound,
-              policies,
-              judge_mode: judgeMode,
-              ml_url: config.ML_SERVING_URL,
-              mlflow_url: config.MLFLOW_URL,
-              callback_url: `${config.API_BASE_URL}/api/admin/simulate/${id}/complete`,
-              internal_token: config.INTERNAL_API_TOKEN,
-            },
-          }),
-          signal: AbortSignal.timeout(5000),
-        },
-      );
-      if (dagRes.ok) {
-        const dagBody = await dagRes.json() as { dag_run_id: string };
-        await db
-          .update(simRuns)
-          .set({ airflowDagRunId: dagBody.dag_run_id })
-          .where(eq(simRuns.id, id));
-        res.json({ id, status: 'running', airflow_dag_run_id: dagBody.dag_run_id });
-        return;
-      }
-      logger.warn({ status: dagRes.status }, 'sim: Airflow trigger failed, falling back to subprocess');
-    } catch (err) {
-      logger.warn({ err }, 'sim: Airflow unreachable, falling back to subprocess');
-    }
-  }
-  // ── Subprocess fallback (local dev / Airflow not configured) ────────────
+  // ── Subprocess ───────────────────────────────────────────────────────────
   const runnerPath = resolve(__dirname, '../../../../ml/experiments/sim/runner.py');
   const venvPython = resolve(__dirname, '../../../../ml/serving/.venv/bin/python');
   const pythonBin = existsSync(venvPython) ? venvPython : 'python3';

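The `/health` route fans out to each subsystem and settles every probe before answering. A rough Python analogue of that `Promise.allSettled` pattern is sketched below with `asyncio.gather(..., return_exceptions=True)`. The probes are hypothetical stubs standing in for real HTTP calls, and the rule mapping a non-200 reply to `degraded` is an assumption, since the diff only shows that `ok`, `degraded` and `down` are the valid statuses.

```python
import asyncio
from typing import Awaitable, Callable, Dict

# A probe returns an HTTP status code or raises on connection failure.
Probe = Callable[[], Awaitable[int]]


async def check_all(probes: Dict[str, Probe]) -> dict:
    """Run every probe concurrently; one failing probe never aborts the rest."""
    names = list(probes)
    results = await asyncio.gather(
        *(probes[name]() for name in names), return_exceptions=True
    )
    services = []
    for name, result in zip(names, results):
        if isinstance(result, BaseException):
            status = "down"      # probe raised: service unreachable
        elif result == 200:
            status = "ok"
        else:
            status = "degraded"  # assumed mapping for non-200 replies
        services.append({"name": name, "status": status})
    return {"ok": all(s["status"] == "ok" for s in services), "services": services}


async def up() -> int:
    return 200


async def refused() -> int:
    raise ConnectionError("ECONNREFUSED")


body = asyncio.run(check_all({"api": up, "ml-serving": refused, "mlflow": up}))
```

With `ml-serving` refusing connections, the aggregate `ok` flag is false while the other two services still report `ok`, which is the behaviour the updated tests assert for the three remaining services.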

@@ -0,0 +1,220 @@
import { Router } from 'express';
import { nanoid } from 'nanoid';
import { db } from '../db/index.js';
import { agentOutputs, tipFeedback, tipViews } from '../db/schema.js';
import { eq, and, gt, lt } from 'drizzle-orm';
import { config } from '../config.js';
import { getProfile } from '../profile/builder.js';
import { todoistSource } from '../signals/todoist.js';
import { SignalAggregator } from '../signals/aggregator.js';
import type { Request, Response } from 'express';
const router = Router();
// Separate aggregator instance — avoids circular dep with recommender.ts.
const _agentAggregator = new SignalAggregator().register(todoistSource);
// ── Internal auth helper ──────────────────────────────────────────────────────
function checkInternalToken(req: Request, res: Response): boolean {
const token = req.headers['x-internal-token'];
if (!config.INTERNAL_API_TOKEN || token !== config.INTERNAL_API_TOKEN) {
res.status(401).json({ error: 'Unauthorized' });
return false;
}
return true;
}
// ── DB helpers ────────────────────────────────────────────────────────────────
export async function getActiveAgentOutputs(userId: string) {
const now = new Date().toISOString();
return db
.select()
.from(agentOutputs)
.where(and(eq(agentOutputs.userId, userId), gt(agentOutputs.expiresAt, now)));
}
async function storeAgentOutput(output: {
user_id: string;
agent_id: string;
prompt_text: string;
signals_snapshot?: unknown;
computed_at: string;
expires_at: string;
agent_version: string;
}) {
await db
.delete(agentOutputs)
.where(and(eq(agentOutputs.userId, output.user_id), eq(agentOutputs.agentId, output.agent_id)));
await db.insert(agentOutputs).values({
id: nanoid(),
userId: output.user_id,
agentId: output.agent_id,
promptText: output.prompt_text,
signalsSnapshot: output.signals_snapshot ? JSON.stringify(output.signals_snapshot) : null,
computedAt: output.computed_at,
expiresAt: output.expires_at,
agentVersion: output.agent_version,
});
}
// ── GET /api/agents/active-users ──────────────────────────────────────────────
// Returns user IDs that have requested a tip in the last 48 hours.
// Used to fan out per-user precompute tasks.
router.get('/active-users', async (req: Request, res: Response) => {
if (!checkInternalToken(req, res)) return;
const cutoff = new Date(Date.now() - 48 * 60 * 60 * 1000).toISOString();
try {
const rows = await db
.selectDistinct({ userId: tipViews.userId })
.from(tipViews)
.where(gt(tipViews.servedAt, cutoff));
res.json({ user_ids: rows.map((r) => r.userId) });
} catch (err: any) {
res.status(500).json({ error: err.message });
}
});
// ── POST /api/agents/:agentId/compute ─────────────────────────────────────────
// Orchestrating endpoint for per-(user, agent) compute tasks.
// Fetches all signals, calls ml/serving /agents/{agentId}/compute, stores result.
// Body: { user_id: string }
router.post('/:agentId/compute', async (req: Request, res: Response) => {
if (!checkInternalToken(req, res)) return;
const { agentId } = req.params as { agentId: string };
const { user_id } = req.body as { user_id: string };
if (!user_id) {
res.status(422).json({ error: 'Missing user_id' });
return;
}
try {
// Fetch tasks via Todoist integration (gracefully empty if not connected).
let tasks: object[] = [];
try {
const signals = await _agentAggregator.fetchAll(user_id);
tasks = signals.map((s) => ({
id: s.id,
content: s.content,
priority: (s.features.priority as number) ?? 1,
is_overdue: Boolean(s.features.is_overdue),
task_age_days: (s.features.task_age_days as number) ?? 0,
project_id: (s.metadata as Record<string, unknown>).project_id ?? null,
}));
} catch {
// No integration or fetch error — agents that need tasks will report "no tasks"
}
// Fetch profile features (lazy-refreshed from DB).
let profile: Record<string, number | null> = {};
try {
profile = await getProfile(user_id);
} catch {
// Profile unavailable: continue with an empty profile.
}
// Fetch last 7 days of feedback for RecentPatternsAgent.
const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000).toISOString();
const feedbackRows = await db
.select({ action: tipFeedback.action, dwellMs: tipFeedback.dwellMs, createdAt: tipFeedback.createdAt })
.from(tipFeedback)
.where(and(eq(tipFeedback.userId, user_id), gt(tipFeedback.createdAt, sevenDaysAgo)));
const feedbackHistory = feedbackRows.map((f) => ({
action: f.action,
dwell_ms: f.dwellMs,
created_at: f.createdAt,
}));
// Call ml/serving to run the agent.
const mlResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/compute`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ user_id, tasks, profile, feedback_history: feedbackHistory }),
signal: AbortSignal.timeout(15_000),
});
if (!mlResp.ok) {
const detail = await mlResp.text().catch(() => '');
res.status(502).json({ error: `ml/serving returned ${mlResp.status}`, detail });
return;
}
const output = await mlResp.json() as {
user_id: string; agent_id: string; prompt_text: string;
signals_snapshot: unknown; computed_at: string; expires_at: string; agent_version: string;
};
await storeAgentOutput(output);
res.json({ ok: true, agent_id: output.agent_id, user_id: output.user_id, expires_at: output.expires_at });
} catch (err: any) {
res.status(500).json({ error: err.message });
}
});
// ── POST /api/agents/outputs ──────────────────────────────────────────────────
// Stores a pre-computed agent output directly (used if the precompute pipeline
// calls ml/serving itself and pushes the result separately).
router.post('/outputs', async (req: Request, res: Response) => {
if (!checkInternalToken(req, res)) return;
const { user_id, agent_id, prompt_text, signals_snapshot, computed_at, expires_at, agent_version } =
req.body as Record<string, string>;
if (!user_id || !agent_id || !prompt_text || !computed_at || !expires_at || !agent_version) {
res.status(422).json({
error: 'Missing required fields: user_id, agent_id, prompt_text, computed_at, expires_at, agent_version',
});
return;
}
try {
await storeAgentOutput({ user_id, agent_id, prompt_text, signals_snapshot, computed_at, expires_at, agent_version });
res.json({ ok: true });
} catch (err: any) {
res.status(500).json({ error: err.message });
}
});
// ── DELETE /api/agents/outputs/expired ───────────────────────────────────────
// Purges rows expired more than 24 hours ago.
router.delete('/outputs/expired', async (req: Request, res: Response) => {
if (!checkInternalToken(req, res)) return;
const cutoff = new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();
try {
await db.delete(agentOutputs).where(lt(agentOutputs.expiresAt, cutoff));
res.json({ ok: true });
} catch (err: any) {
res.status(500).json({ error: err.message });
}
});
// ── GET /api/agents/:userId/outputs ──────────────────────────────────────────
// Returns non-expired agent outputs. Admin observability; recommender calls
// getActiveAgentOutputs() directly (no HTTP hop).
router.get('/:userId/outputs', async (req: Request, res: Response) => {
const { userId } = req.params as { userId: string };
try {
const rows = await getActiveAgentOutputs(userId);
res.json({
user_id: userId,
outputs: rows.map((r) => ({
agent_id: r.agentId,
prompt_text: r.promptText,
computed_at: r.computedAt,
expires_at: r.expiresAt,
agent_version: r.agentVersion,
})),
});
} catch (err: any) {
res.status(500).json({ error: err.message });
}
});
export default router;

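`checkInternalToken` rejects a request when no token is configured or when the `x-internal-token` header does not match. The sketch below restates that rule in Python. Note the swapped-in technique: the diff compares with `!==`, while this version uses `hmac.compare_digest` for a constant-time comparison. The function name `is_authorized` is hypothetical.

```python
import hmac
from typing import Optional


def is_authorized(header_token: Optional[str], configured_token: str) -> bool:
    """True only when a token is configured and the header matches it exactly."""
    if not configured_token or not header_token:
        # An empty INTERNAL_API_TOKEN must never authorize anyone.
        return False
    # Constant-time comparison (a substitution; the diff uses plain !==).
    return hmac.compare_digest(header_token.encode(), configured_token.encode())
```

The early return matters because `INTERNAL_API_TOKEN` defaults to an empty string in the config shown earlier, so an unset secret has to close the endpoints rather than open them.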

@@ -18,10 +18,6 @@ const MLFLOW_URL = process.env.MLFLOW_URL || "http://mlflow:5000";
 const MLFLOW_USER = process.env.MLFLOW_TRACKING_USERNAME || "admin";
 const MLFLOW_PASS = process.env.MLFLOW_TRACKING_PASSWORD || "password";
-const AIRFLOW_URL = process.env.AIRFLOW_URL || "http://airflow-webserver:8080";
-const AIRFLOW_USER = process.env.AIRFLOW_API_USER || "admin";
-const AIRFLOW_PASS = process.env.AIRFLOW_API_PASSWORD || "admin";
 // Wrapper for MLflow REST calls with Host header fix
 async function mlflowFetch(
   path: string,
@@ -65,44 +61,6 @@ router.get("/experiments", async (req: Request, res: Response) => {
   }
 });
-// POST /api/bench/run — trigger benchmark DAG
-router.post("/run", async (req: Request, res: Response) => {
-  try {
-    const config = req.body || {};
-    const experiment = config.experiment || "tip-bench-admin";
-    const dagRunUrl = new URL("/api/v1/dags/bench_collect/dagRuns", AIRFLOW_URL);
-    const auth = Buffer.from(`${AIRFLOW_USER}:${AIRFLOW_PASS}`).toString(
-      "base64"
-    );
-    const response = await fetch(dagRunUrl.toString(), {
-      method: "POST",
-      headers: {
-        "Content-Type": "application/json",
-        Authorization: `Basic ${auth}`,
-      },
-      body: JSON.stringify({
-        conf: config,
-        dag_run_id: `bench-${Date.now()}`,
-      }),
-    });
-    if (!response.ok) {
-      throw new Error(`Airflow ${response.status}: ${response.statusText}`);
-    }
-    const result = await response.json();
-    res.json({
-      status: "triggered",
-      dag_run_id: result.dag_run_id,
-      experiment,
-    });
-  } catch (err) {
-    res.status(500).json({ error: String(err) });
-  }
-});
 // GET /api/bench/runs/:experiment — list runs in an experiment
 router.get("/runs/:experiment", async (req: Request, res: Response) => {
   try {