diff --git a/ml/experiments/bench/AIRFLOW.md b/ml/experiments/bench/AIRFLOW.md
new file mode 100644
index 0000000..eb1e5a4
--- /dev/null
+++ b/ml/experiments/bench/AIRFLOW.md
@@ -0,0 +1,93 @@
+# Airflow Integration — `bench_collect` DAG
+
+The benchmark harness integrates with Airflow as a DAG (`ml/pipelines/bench_dag.py`)
+triggered on-demand from the admin UI or the CLI.
+
+## DAG Structure
+
+Three linked tasks:
+
+1. **`collect`** — `collect.py` generates candidates per (model × prompt × scenario) cell,
+   logs MLflow runs with `judge_pending=true`. Rejects models >4B, uses `keep_alive=0`
+   for RAM safety.
+
+2. **`export_for_judge`** — `judge_cli.py --export` pulls pending runs into a single
+   JSON file for Claude Code to score per the rubric. XCom-pushes the path so the
+   next task can find it.
+
+3. **`compare`** — `compare.py` aggregates scores by (model, prompt) cell and
+   generates the leaderboard ranked by composite score.
+
+## Triggering from the CLI
+
+```bash
+# Minimal: use all defaults
+airflow dags trigger bench_collect
+
+# Custom config: specify models, prompts, scenario count
+airflow dags trigger bench_collect --conf '{
+  "models": "qwen2.5:0.5b,qwen2.5:1.5b",
+  "prompts": "v1,v2-mentor",
+  "n_tips": 5,
+  "n_scenarios": 2,
+  "temperature": 0.7,
+  "experiment": "tip-bench-custom"
+}'
+```
+
+## Triggering from the Admin UI
+
+The API exposes:
+
+```
+POST /api/bench/run { config object }
+```
+
+Admin UI → Benchmark panel → "Run Collection" button → form dialog fills config →
+POST to `/api/bench/run` → DAG triggered.
+
+## Configuration Keys
+
+| Key | Type | Default | Description |
+|-----|------|---------|-------------|
+| `models` | str | `qwen2.5:0.5b,qwen2.5:1.5b,gemma3:1b,llama3.2:3b` | comma-separated Ollama tags |
+| `prompts` | str | `v1,v2-mentor,v3-few-shot` | comma-separated prompt versions |
+| `n_tips` | int | 5 | candidates to generate per scenario |
+| `n_scenarios` | int | 0 | cap scenario count (0 = all 8) |
+| `temperature` | float | 0.7 | LLM generation temperature |
+| `experiment` | str | `tip-bench-auto` | MLflow experiment name |
+| `max_model_b` | float | 4.0 | reject models larger than this (in billions) |
+| `ollama_url` | str | `http://localhost:11434` | Ollama endpoint |
+| `mlflow_url` | str | `$MLFLOW_TRACKING_URI` or `http://localhost:5000` | MLflow tracking URI |
+
+## Human-in-the-Loop Judge
+
+After `collect` finishes, `export_for_judge` produces a JSON file with all pending
+runs. The Claude Code session:
+
+1. Reads the file
+2. Scores each candidate per the rubric (relevance/actionability/tone 1–5)
+3. Runs `judge_cli.py --apply /path/to/file.json` to write scores back to MLflow
+
+Then `compare` generates the leaderboard. Note that the DAG chains
+`export_for_judge >> compare` with no wait in between, so the `compare` task of
+the same DAG run executes before the manual scoring has happened; run
+`compare.py` again after the `--apply` step to get a scored leaderboard.
+
+**Future enhancement:** Add a webhook or admin UI button to trigger the judge step
+so the entire pipeline is end-to-end in Airflow, not requiring manual Claude Code
+intervention.
+
+## Monitoring
+
+- **Airflow UI**: `http://localhost:8080` → DAGs → `bench_collect` → graph view
+- **MLflow UI**: `http://localhost:5000/mlflow` → experiments → `tip-bench-*`
+- **Admin API**: `GET /api/bench/leaderboard/tip-bench-auto` → JSON leaderboard
+
+## Future: Admin UI Panel
+
+`apps/admin/src/components/BenchPanel.tsx` (TBD):
+- List experiments
+- Trigger DAG with form (models, prompts, scenario count, temperature)
+- Display current DAG run status
+- Show leaderboard once `compare` completes
diff --git a/ml/pipelines/bench_dag.py b/ml/pipelines/bench_dag.py
new file mode 100644
index 0000000..d901c0b
--- /dev/null
+++ b/ml/pipelines/bench_dag.py
@@ -0,0 +1,221 @@
+"""
+Airflow DAG: bench_collect
+
+Runs the tip-generation benchmark (model × prompt evaluation). Triggered
+on-demand from the admin UI or manually, collects candidates per cell,
+exports for Claude Code judgment, and generates a leaderboard.
+
+Mirrors the manual flow:
+
+  1. collect.py → generates candidates, logs to MLflow with judge_pending=true
+  2. judge_cli.py --export → writes the pending runs to a JSON file
+     (human: Claude Code scores the file, then judge_cli.py --apply)
+  3. compare.py → leaderboard
+
+For now, the scoring half of step 2 is manual and the DAG does not wait for
+it: the compare task runs right after export_for_judge. Future: add a webhook
+to trigger the human judge from the admin UI or set up an async task queue.
+
+Optional conf keys (passed via dag_run.conf; every key has a default):
+  models      str   — comma-separated model tags
+                      (default: "qwen2.5:0.5b,qwen2.5:1.5b,gemma3:1b,llama3.2:3b")
+  prompts     str   — comma-separated prompt versions (default: "v1,v2-mentor,v3-few-shot")
+  n_tips      int   — candidates to generate per scenario (default: 5)
+  n_scenarios int   — cap scenario count; 0 = all (default: 0)
+  temperature float — LLM generation temperature (default: 0.7)
+  experiment  str   — MLflow experiment name (default: "tip-bench-auto")
+  max_model_b float — reject models larger than this (default: 4.0)
+  ollama_url  str   — Ollama endpoint (env OLLAMA_URL or http://localhost:11434)
+  mlflow_url  str   — MLflow tracking URI (env MLFLOW_TRACKING_URI or http://localhost:5000)
+"""
+
+from __future__ import annotations
+
+import json
+import os
+import re
+import sys
+from datetime import datetime, timedelta
+from pathlib import Path
+from typing import Any, Callable
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+
+# Directory holding collect.py, compare.py, judge_cli.py and mlflow_client.py.
+BENCH_DIR = "/opt/airflow/ml/experiments/bench"
+
+
+def _run_conf(context: dict[str, Any]) -> dict[str, Any]:
+    """Return dag_run.conf, or an empty dict when the run was triggered without one."""
+    return context["dag_run"].conf or {}
+
+
+def _experiment(conf: dict[str, Any]) -> str:
+    """Resolve the MLflow experiment name from conf (default: "tip-bench-auto")."""
+    return str(conf.get("experiment", "tip-bench-auto"))
+
+
+def _mlflow_url(conf: dict[str, Any]) -> str:
+    """Resolve the MLflow tracking URI: conf, then MLFLOW_TRACKING_URI, then localhost."""
+    fallback = os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000")
+    return str(conf.get("mlflow_url", fallback))
+
+
+def _ensure_bench_importable() -> None:
+    """Put the bench scripts on sys.path once, instead of once per task call."""
+    if BENCH_DIR not in sys.path:
+        sys.path.insert(0, BENCH_DIR)
+
+
+def _run_cli(entry: Callable[[], Any], argv: list[str]) -> int:
+    """Call an argparse-style main() with sys.argv temporarily replaced.
+
+    Args:
+        entry: the script's main(); assumed to return an exit code or None.
+        argv: full argument vector, script name first.
+
+    Returns:
+        The exit code, which is always 0 when this function returns.
+
+    Raises:
+        RuntimeError: main() returned a non-zero code. Raising makes Airflow
+            mark the task failed and skip downstream tasks, instead of
+            recording a green task whose XCom payload merely says "failed".
+    """
+    saved_argv = sys.argv
+    try:
+        sys.argv = argv
+        result = entry()
+    finally:
+        # Always restore, even if main() raises or calls sys.exit().
+        sys.argv = saved_argv
+    # A main() that falls off the end returns None; treat it as success, the
+    # same way sys.exit(main()) would.
+    exit_code = 0 if result is None else int(result)
+    if exit_code != 0:
+        raise RuntimeError(f"{argv[0]} exited with code {exit_code}")
+    return exit_code
+
+
+def _collect(**context: Any) -> dict:
+    """Run collect.py with the provided config.
+
+    Returns a small status dict (the task's return value).
+    Raises RuntimeError if collect.py reports a non-zero exit code.
+    """
+    conf = _run_conf(context)
+
+    models = str(conf.get("models", "qwen2.5:0.5b,qwen2.5:1.5b,gemma3:1b,llama3.2:3b"))
+    prompts = str(conf.get("prompts", "v1,v2-mentor,v3-few-shot"))
+    n_tips = int(conf.get("n_tips", 5))
+    n_scenarios = int(conf.get("n_scenarios", 0))
+    temperature = float(conf.get("temperature", 0.7))
+    experiment = _experiment(conf)
+    max_model_b = float(conf.get("max_model_b", 4.0))
+    ollama_url = str(conf.get("ollama_url", os.environ.get("OLLAMA_URL", "http://localhost:11434")))
+    mlflow_url = _mlflow_url(conf)
+
+    _ensure_bench_importable()
+    from collect import main as collect_main  # type: ignore
+
+    # collect.py reads its options from sys.argv via argparse.
+    argv = [
+        "collect.py",
+        "--models", models,
+        "--prompts", prompts,
+        "--experiment", experiment,
+        "--n-tips", str(n_tips),
+        "--temperature", str(temperature),
+        "--max-model-b", str(max_model_b),
+        "--ollama-url", ollama_url,
+        "--mlflow-url", mlflow_url,
+    ]
+    if n_scenarios > 0:  # 0 (or negative) means "all scenarios": omit the flag
+        argv.extend(["--n-scenarios", str(n_scenarios)])
+
+    exit_code = _run_cli(collect_main, argv)
+    return {"status": "success", "exit_code": exit_code, "experiment": experiment}
+
+
+def _compare(**context: Any) -> dict:
+    """Run compare.py to generate the leaderboard.
+
+    Raises RuntimeError if compare.py reports a non-zero exit code.
+    """
+    conf = _run_conf(context)
+    experiment = _experiment(conf)
+
+    _ensure_bench_importable()
+    from compare import main as compare_main  # type: ignore
+
+    argv = [
+        "compare.py",
+        "--experiment", experiment,
+        "--mlflow-url", _mlflow_url(conf),
+    ]
+    exit_code = _run_cli(compare_main, argv)
+    return {"status": "success", "exit_code": exit_code, "experiment": experiment}
+
+
+def _export_for_judge(**context: Any) -> str:
+    """Export pending runs for Claude Code judgment and return the file path."""
+    conf = _run_conf(context)
+    experiment = _experiment(conf)
+
+    # The experiment name comes from user-supplied conf: keep only filename-safe
+    # characters so a value like "../../x" cannot steer the write outside /tmp.
+    safe_name = re.sub(r"[^A-Za-z0-9._-]", "_", experiment)
+    started = int(context["ti"].start_date.timestamp())
+    export_path = f"/tmp/oo-bench-{safe_name}-{started}.json"
+
+    _ensure_bench_importable()
+    from judge_cli import export  # type: ignore
+    from mlflow_client import MLflowClient  # type: ignore
+
+    # NOTE(review): the "admin"/"password" fallbacks are development defaults;
+    # set MLFLOW_TRACKING_USERNAME / MLFLOW_TRACKING_PASSWORD in real deployments.
+    client = MLflowClient(
+        tracking_uri=_mlflow_url(conf),
+        username=os.environ.get("MLFLOW_TRACKING_USERNAME") or "admin",
+        password=os.environ.get("MLFLOW_TRACKING_PASSWORD") or "password",
+    )
+    export(client, experiment, export_path)
+
+    # XCom: push path so next task can find it
+    context["ti"].xcom_push(key="export_path", value=export_path)
+
+    return export_path
+
+
+with DAG(
+    dag_id="bench_collect",
+    description="Tip-generation benchmark: model & prompt evaluation via MLflow",
+    schedule_interval=None,
+    start_date=datetime(2025, 1, 1),
+    catchup=False,
+    tags=["bench", "ml", "evaluation"],
+    default_args={
+        "retries": 1,
+        "retry_delay": timedelta(minutes=5),
+    },
+) as dag:
+
+    # provide_context is not passed: Airflow 2 always hands the context to the
+    # callable as keyword arguments and only warns about the legacy flag.
+    collect = PythonOperator(
+        task_id="collect",
+        python_callable=_collect,
+    )
+
+    export_judge = PythonOperator(
+        task_id="export_for_judge",
+        python_callable=_export_for_judge,
+    )
+
+    compare = PythonOperator(
+        task_id="compare",
+        python_callable=_compare,
+    )
+
+    collect >> export_judge >> compare
diff --git a/services/api/src/index.ts b/services/api/src/index.ts
index 2fa4e52..a05697a 100644
--- a/services/api/src/index.ts
+++ b/services/api/src/index.ts
@@ -16,6 +16,7 @@ import { recommenderRouter } from './routes/recommender.js';
 import { userRouter } from './routes/user.js';
 import { pushRouter } from './routes/push.js';
 import { adminRouter, adminInternalRouter } from './routes/admin.js';
+import benchRouter from './routes/bench.js';
 import { mkdir } from 'fs/promises';
 import { dirname } from 'path';
 import { requireAuth } from './middleware/session.js';
@@ -66,6 +67,7 @@ app.use('/api/user', userRouter);
 app.use('/api/push', pushRouter);
 app.use('/api/admin', adminRouter);
 app.use('/api/admin', adminInternalRouter);
+app.use('/api/bench', requireAuth as any, requireAdmin as any, benchRouter);
 app.use('/api/ml', requireAuth as any, requireAdmin as any, async (req: Request, res: Response) => {
   const mlUrl = config.ML_SERVING_URL;
 
diff --git a/services/api/src/routes/bench.ts b/services/api/src/routes/bench.ts
new file mode 100644
index 0000000..886c853
--- /dev/null
+++ b/services/api/src/routes/bench.ts
@@ -0,0 +1,285 @@
+/**
+ * Admin API endpoints for the tip-generation benchmark.
+ *
+ * Exposes:
+ *   GET  /api/bench/experiments              — list MLflow experiments
+ *   POST /api/bench/run                      — trigger benchmark DAG
+ *   GET  /api/bench/runs/:experiment         — list runs in experiment
+ *   GET  /api/bench/leaderboard/:experiment  — leaderboard by (model, prompt)
+ */
+
+import { Router, Request, Response } from "express";
+import * as process from "process";
+
+const router = Router();
+
+// NOTE(review): the credential fallbacks below are development defaults; set
+// the corresponding environment variables in any shared deployment.
+const MLFLOW_URL = process.env.MLFLOW_URL || "http://mlflow:5000";
+const MLFLOW_USER = process.env.MLFLOW_TRACKING_USERNAME || "admin";
+const MLFLOW_PASS = process.env.MLFLOW_TRACKING_PASSWORD || "password";
+
+const AIRFLOW_URL = process.env.AIRFLOW_URL || "http://airflow-webserver:8080";
+const AIRFLOW_USER = process.env.AIRFLOW_API_USER || "admin";
+const AIRFLOW_PASS = process.env.AIRFLOW_API_PASSWORD || "admin";
+
+// Page size for MLflow searches, and a hard cap on runs fetched per request.
+const PAGE_SIZE = 1000;
+const MAX_RUNS = 20000;
+
+/** One aggregated leaderboard cell: all finished runs of a (model, prompt) pair. */
+interface Cell {
+  model: string;
+  prompt: string;
+  n: number;
+  composites: number[];
+  latencies: number[];
+}
+
+/** A non-2xx MLflow response; keeps the HTTP status so callers can branch on it. */
+class MlflowError extends Error {
+  constructor(public readonly status: number, statusText: string) {
+    super(`MLflow ${status}: ${statusText}`);
+  }
+}
+
+/**
+ * Wrapper for MLflow REST calls (basic auth, JSON body, Host header fix).
+ *
+ * @param path   API path, resolved against MLFLOW_URL.
+ * @param method HTTP method.
+ * @param body   Optional payload, sent as JSON.
+ * @returns The parsed JSON response; its shape depends on the endpoint.
+ * @throws MlflowError when MLflow answers with a non-2xx status.
+ */
+async function mlflowFetch(
+  path: string,
+  method: string = "GET",
+  body?: object
+): Promise<any> {
+  const url = new URL(path, MLFLOW_URL);
+  const auth = Buffer.from(`${MLFLOW_USER}:${MLFLOW_PASS}`).toString("base64");
+  const headers: Record<string, string> = {
+    Host: "localhost",
+    "Content-Type": "application/json",
+    Authorization: `Basic ${auth}`,
+  };
+
+  const response = await fetch(url.toString(), {
+    method,
+    headers,
+    body: body ? JSON.stringify(body) : undefined,
+  });
+
+  if (!response.ok) {
+    throw new MlflowError(response.status, response.statusText);
+  }
+  return response.json();
+}
+
+/**
+ * Look up one active experiment by exact name.
+ *
+ * @returns The MLflow experiment object, or null when it does not exist.
+ */
+async function findExperiment(name: string): Promise<any | null> {
+  const query = new URLSearchParams({ experiment_name: name });
+  try {
+    const result = await mlflowFetch(
+      `/api/2.0/mlflow/experiments/get-by-name?${query.toString()}`
+    );
+    const exp = result.experiment;
+    // get-by-name can also return deleted experiments; only serve active ones.
+    return exp && exp.lifecycle_stage !== "deleted" ? exp : null;
+  } catch (err) {
+    // MLflow reports an unknown name as HTTP 404.
+    if (err instanceof MlflowError && err.status === 404) return null;
+    throw err;
+  }
+}
+
+/** Fetch the runs of an experiment, following page tokens up to MAX_RUNS. */
+async function searchAllRuns(experimentId: string): Promise<any[]> {
+  const runs: any[] = [];
+  let pageToken: string | undefined;
+  do {
+    const page = await mlflowFetch("/api/2.0/mlflow/runs/search", "POST", {
+      experiment_ids: [experimentId],
+      max_results: PAGE_SIZE,
+      page_token: pageToken, // undefined on the first page; dropped by JSON.stringify
+    });
+    runs.push(...(page.runs ?? []));
+    pageToken = page.next_page_token || undefined;
+  } while (pageToken && runs.length < MAX_RUNS);
+  return runs;
+}
+
+/** Turn MLflow's [{ key, value }] lists into a plain lookup object. */
+function keyValues(entries: any[] | undefined): Record<string, any> {
+  return Object.fromEntries((entries ?? []).map((e: any) => [e.key, e.value]));
+}
+
+/** Arithmetic mean, or null for an empty list. */
+function mean(values: number[]): number | null {
+  if (values.length === 0) return null;
+  return values.reduce((a, b) => a + b, 0) / values.length;
+}
+
+// GET /api/bench/experiments — list available experiments
+router.get("/experiments", async (_req: Request, res: Response) => {
+  try {
+    // max_results is sent explicitly rather than relying on a server default.
+    const result = await mlflowFetch(
+      `/api/2.0/mlflow/experiments/search?max_results=${PAGE_SIZE}`
+    );
+    // The "experiments" key may be absent when MLflow has nothing to list.
+    const experiments = (result.experiments ?? [])
+      .filter((e: any) => typeof e.name === "string" && e.name.startsWith("tip-bench"))
+      .map((e: any) => ({
+        id: e.experiment_id,
+        name: e.name,
+        creation_time: e.creation_time,
+      }));
+    res.json(experiments);
+  } catch (err) {
+    res.status(500).json({ error: String(err) });
+  }
+});
+
+// POST /api/bench/run — trigger benchmark DAG
+router.post("/run", async (req: Request, res: Response) => {
+  try {
+    const body = req.body || {};
+    if (typeof body !== "object" || Array.isArray(body)) {
+      return res.status(400).json({ error: "Config must be a JSON object" });
+    }
+    const experiment = body.experiment || "tip-bench-admin";
+    // Pass the resolved experiment to the DAG. Without it the DAG falls back to
+    // its own default ("tip-bench-auto") while this response reports another name.
+    const conf = { ...body, experiment };
+
+    const dagRunUrl = new URL("/api/v1/dags/bench_collect/dagRuns", AIRFLOW_URL);
+    const auth = Buffer.from(`${AIRFLOW_USER}:${AIRFLOW_PASS}`).toString("base64");
+
+    const response = await fetch(dagRunUrl.toString(), {
+      method: "POST",
+      headers: {
+        "Content-Type": "application/json",
+        Authorization: `Basic ${auth}`,
+      },
+      body: JSON.stringify({
+        conf,
+        dag_run_id: `bench-${Date.now()}`,
+      }),
+    });
+
+    if (!response.ok) {
+      throw new Error(`Airflow ${response.status}: ${response.statusText}`);
+    }
+
+    const result = await response.json();
+    res.json({
+      status: "triggered",
+      dag_run_id: result.dag_run_id,
+      experiment,
+    });
+  } catch (err) {
+    res.status(500).json({ error: String(err) });
+  }
+});
+
+// GET /api/bench/runs/:experiment — list runs in an experiment
+router.get("/runs/:experiment", async (req: Request, res: Response) => {
+  try {
+    const { experiment } = req.params;
+    const exp = await findExperiment(experiment);
+    if (!exp) {
+      return res.status(404).json({ error: "Experiment not found" });
+    }
+
+    const runs = (await searchAllRuns(exp.experiment_id)).map((r: any) => {
+      const params = keyValues(r.data?.params);
+      const metrics = keyValues(r.data?.metrics);
+      return {
+        run_id: r.info.run_id,
+        status: r.info.status,
+        model: params.model,
+        prompt_version: params.prompt_version,
+        scenario_id: params.scenario_id,
+        // ?? rather than ||, so a metric that is legitimately 0 is not nulled.
+        composite: metrics.composite ?? null,
+        relevance: metrics.relevance ?? null,
+        actionability: metrics.actionability ?? null,
+        tone: metrics.tone ?? null,
+        latency_ms: metrics.latency_ms ?? null,
+      };
+    });
+
+    res.json(runs);
+  } catch (err) {
+    res.status(500).json({ error: String(err) });
+  }
+});
+
+// GET /api/bench/leaderboard/:experiment — leaderboard
+router.get("/leaderboard/:experiment", async (req: Request, res: Response) => {
+  try {
+    const { experiment } = req.params;
+    const exp = await findExperiment(experiment);
+    if (!exp) {
+      return res.status(404).json({ error: "Experiment not found" });
+    }
+
+    // Aggregate finished runs by (model, prompt).
+    const cells = new Map<string, Cell>();
+    for (const r of await searchAllRuns(exp.experiment_id)) {
+      if (r.info.status !== "FINISHED") continue;
+
+      const params = keyValues(r.data?.params);
+      const metrics = keyValues(r.data?.metrics);
+      const model = String(params.model);
+      const prompt = String(params.prompt_version);
+
+      // JSON-encode the pair so a "|" inside a tag cannot corrupt the key.
+      const key = JSON.stringify([model, prompt]);
+      let cell = cells.get(key);
+      if (!cell) {
+        cell = { model, prompt, n: 0, composites: [], latencies: [] };
+        cells.set(key, cell);
+      }
+      cell.n++;
+      if (typeof metrics.composite === "number") {
+        cell.composites.push(metrics.composite);
+      }
+      if (typeof metrics.latency_ms === "number") {
+        cell.latencies.push(metrics.latency_ms);
+      }
+    }
+
+    // Build leaderboard rows
+    const rows = Array.from(cells.values(), (cell) => ({
+      model: cell.model,
+      prompt: cell.prompt,
+      n: cell.n,
+      composite: mean(cell.composites),
+      latency_ms: mean(cell.latencies),
+    }));
+
+    // Highest composite first; cells with no score yet sort last.
+    rows.sort((a, b) => {
+      if (a.composite === null) return b.composite === null ? 0 : 1;
+      if (b.composite === null) return -1;
+      return b.composite - a.composite;
+    });
+
+    res.json({
+      experiment,
+      rows,
+      winner: rows.length > 0 ? rows[0] : null,
+    });
+  } catch (err) {
+    res.status(500).json({ error: String(err) });
+  }
+});
+
+export default router;