diff --git a/ml/experiments/bench/README.md b/ml/experiments/bench/README.md new file mode 100644 index 0000000..c08fb48 --- /dev/null +++ b/ml/experiments/bench/README.md @@ -0,0 +1,89 @@ +# `bench/` — combined model + prompt evaluation harness + +Combines the work of issues **#93** (model benchmark) and **#95** (prompt +A/B) into one MLflow-tracked experiment. Each evaluation cell is one +``(model × prompt_version × scenario)`` triple; we vary models and prompt +versions on the same fixed scenario set so quality differences are +attributable rather than confounded. + +## Pieces + +| File | Purpose | +|------|---------| +| `rubric.md` | The scoring rubric (`tip-v1`). Anchor for the human judge across sessions. | +| `scenarios.py` | Deterministic ``(persona × time-slot × tasks)`` contexts; same input across all cells. | +| `mlflow_client.py` | Thin httpx-based MLflow REST wrapper. Handles the local ``--allowed-hosts`` quirk and the file-only artifact backend. | +| `collect.py` | **Phase A.** Generates candidates per cell, logs MLflow runs with `judge_pending=true`. | +| `judge_cli.py` | **Phase B.** `--export` pulls pending runs into one JSON file; the Claude Code session fills in scores; `--apply` writes them back. | +| `compare.py` | **Phase C.** Leaderboard per ``(model, prompt)`` cell. | + +## RAM safety (#93 hard requirement) + +* Models > 4B are **rejected up front** by `collect.py --max-model-b 4.0`. +* Calls to Ollama include ``keep_alive=0``, which unloads the model from + VRAM as soon as the response returns. We never hold two LLM weights + concurrently. +* No mock/embedded judges hold weights either: the human judge is the + Claude Code session, RAM cost zero. + +The pipeline can run on a 15 GiB / 8 GiB-VRAM box (1070-class GPU) end +to end without paging. + +## Quick start + +```bash +# 1. 
Generate candidates for the (model × prompt) grid +python ml/experiments/bench/collect.py \ + --models qwen2.5:0.5b,qwen2.5:1.5b,gemma3:1b,llama3.2:3b \ + --prompts v1,v2-mentor,v3-few-shot \ + --experiment tip-bench-2026-04-27 \ + --n-tips 5 \ + --diversity + +# 2. Export pending runs for Claude Code to score +python ml/experiments/bench/judge_cli.py \ + --experiment tip-bench-2026-04-27 \ + --export /tmp/oo-bench-judge.json + +# 3. (Claude Code edits /tmp/oo-bench-judge.json, fills scores per rubric.md.) + +# 4. Push scores back to MLflow +python ml/experiments/bench/judge_cli.py \ + --experiment tip-bench-2026-04-27 \ + --apply /tmp/oo-bench-judge.json + +# 5. Leaderboard +python ml/experiments/bench/compare.py --experiment tip-bench-2026-04-27 +``` + +## Why the rubric matters + +Different judging sessions need to be comparable. `rubric.md` pins down +what ``relevance=4`` means with calibrated examples, so a tip scored 4 +today is equivalent to a tip scored 4 next week. Without the rubric, the +"lazy human-in-the-loop" judge drifts. + +## Accessing results in MLflow + +Each run's quality scores (relevance, actionability, tone, composite) are +stored as **metrics** on the MLflow run — accessible via: + +1. **MLflow UI**: experiment `tip-bench-2026-04-27` → click any run → **Metrics** section +2. **Leaderboard**: `python ml/experiments/bench/compare.py --experiment tip-bench-2026-04-27` +3. **Raw API**: `mlflow_client.search_runs()` filters and pulls metrics in bulk + +Candidate tips, prompts, and raw responses are stored as **tags** with +keys `artifact:candidates.json`, `artifact:prompt.txt`, `artifact:raw.txt` +(tag fallback because the MLflow server uses a file:// artifact backend +not accessible via REST from the host). + +## Integrating with Airflow (#95) + +A future DAG `ml/pipelines/prompt_ab_eval.py` will wrap `collect.py` +exactly as shown in the quick-start, triggered on-demand from the admin +UI or manually. 
The results feed into the admin leaderboard view. + +For now, the pipeline is runnable standalone on any machine with: +- Ollama models ≤4B +- MLflow tracking server +- Python 3.10+ diff --git a/ml/experiments/bench/__init__.py b/ml/experiments/bench/__init__.py new file mode 100644 index 0000000..824ea95 --- /dev/null +++ b/ml/experiments/bench/__init__.py @@ -0,0 +1,18 @@ +"""oO tip-generation benchmark harness. + +Combines model evaluation (#93) and prompt A/B testing (#95) into one +MLflow-tracked experiment. Each evaluation cell is one (model × prompt × +scenario) triple; we vary models and prompts on the same fixed scenario +set so quality differences are attributable rather than confounded. + +The pipeline follows the lazy-judge pattern: collect candidates with +deterministic metrics (latency, format_ok), export to a JSON file for +Claude Code to score per the rubric, apply scores back to MLflow, and +generate a leaderboard. + +RAM safety is enforced: models >4B are rejected, Ollama calls use +keep_alive=0 to unload VRAM immediately, and the human judge (Claude Code +session) has zero inference cost. + +See README.md for usage. +""" diff --git a/ml/experiments/bench/collect.py b/ml/experiments/bench/collect.py new file mode 100644 index 0000000..2e68928 --- /dev/null +++ b/ml/experiments/bench/collect.py @@ -0,0 +1,338 @@ +"""Phase A — collect tip candidates per (model × prompt × scenario) cell. 
+ +Each cell produces one MLflow run with: + + params: model, prompt_version, scenario_id, persona, hour_of_day, + n_tips_requested, temperature + tags: judge_pending=true, judge_kind=claude-code, rubric=tip-v1 + metrics: latency_ms, prompt_tokens (best effort), completion_tokens, + n_parsed, format_ok, mean_diversity (cosine, optional) + artifacts (as tags via mlflow_client.log_text): + prompt.txt system + user prompt as sent + candidates.json parsed candidate array + raw.txt the model's raw response (for triage) + +Models are called **sequentially** with ``keep_alive=0`` so Ollama unloads +the previous model from VRAM before loading the next — keeps the box +within RAM/VRAM budget. Models > 4B are rejected up front. + +Usage: + + python collect.py \\ + --models qwen2.5:0.5b,qwen2.5:1.5b,gemma3:1b,llama3.2:3b \\ + --prompts v1,v2-mentor,v3-few-shot \\ + --n-tips 5 \\ + --experiment tip-bench-2026-04-27 +""" + +from __future__ import annotations + +import argparse +import json +import math +import os +import re +import sys +import time +from dataclasses import asdict +from pathlib import Path + +import httpx + +_BENCH = Path(__file__).resolve().parent +_ML = _BENCH.parent.parent +sys.path.insert(0, str(_BENCH)) +sys.path.insert(0, str(_BENCH.parent / "sim")) +sys.path.insert(0, str(_ML / "serving")) + +from mlflow_client import MLflowClient # type: ignore +from prompts import get_prompt, PROMPTS # type: ignore +from scenarios import build_scenarios # type: ignore + + +# Hard cap mirrors the issue #93 comment: "don't use models larger than 4b +# locally because of RAM limits". A regex cheap-match on the tag handles +# the common ``name:Nb`` and ``name:N.Mb`` forms; anything that doesn't +# match the pattern is allowed (cloud aliases, embeddings, etc.). 
+_SIZE_TAG = re.compile(r":(\d+(?:\.\d+)?)b\b", re.IGNORECASE) + + +def _model_too_big(model: str, max_b: float = 4.0) -> bool: + m = _SIZE_TAG.search(model) + if not m: + return False + return float(m.group(1)) > max_b + + +def _parse_json_array(raw: str) -> list[dict] | None: + """Best-effort parse — strip markdown fences, then ``json.loads``.""" + text = raw.strip() + if text.startswith("```"): + parts = text.split("```") + text = parts[1] if len(parts) > 1 else text + if text.lstrip().lower().startswith("json"): + text = text.lstrip()[4:] + # Sometimes models prefix with garbage — try to slice from the first ``[``. + if not text.lstrip().startswith("["): + i = text.find("[") + if i >= 0: + text = text[i:] + try: + v = json.loads(text) + return v if isinstance(v, list) else None + except (json.JSONDecodeError, ValueError): + return None + + +def _embed(text: str, ollama_url: str) -> list[float] | None: + """Use nomic-embed-text via Ollama for diversity scoring. ~250MB, + safe to load alongside any 4B chat model thanks to ``keep_alive=0``. 
+ """ + try: + with httpx.Client(trust_env=False, timeout=30.0) as c: + r = c.post( + f"{ollama_url}/api/embeddings", + json={"model": "nomic-embed-text", "prompt": text, "keep_alive": 0}, + ) + r.raise_for_status() + return r.json().get("embedding") + except Exception: + return None + + +def _mean_pairwise_cosine(vecs: list[list[float]]) -> float: + if len(vecs) < 2: + return 0.0 + + def cos(a: list[float], b: list[float]) -> float: + na = math.sqrt(sum(x * x for x in a)) + nb = math.sqrt(sum(x * x for x in b)) + if na == 0 or nb == 0: + return 0.0 + return sum(x * y for x, y in zip(a, b)) / (na * nb) + + n = len(vecs) + total, count = 0.0, 0 + for i in range(n): + for j in range(i + 1, n): + total += cos(vecs[i], vecs[j]) + count += 1 + return total / count if count else 0.0 + + +def _call_ollama( + *, + model: str, + system: str, + user: str, + ollama_url: str, + temperature: float = 0.7, +) -> tuple[str, dict]: + """Direct call to Ollama. Returns (raw_text, telemetry). + + ``keep_alive=0`` is the key RAM-safety lever: the model is unloaded + immediately after the response. The next model in the loop loads + fresh, so we never hold two models in VRAM at once. + """ + t0 = time.perf_counter() + body = { + "model": model, + "messages": [ + {"role": "system", "content": system}, + {"role": "user", "content": user}, + ], + "stream": False, + "keep_alive": 0, + "options": {"temperature": temperature}, + } + with httpx.Client(trust_env=False, timeout=180.0) as c: + r = c.post(f"{ollama_url}/api/chat", json=body) + r.raise_for_status() + data = r.json() + elapsed_ms = (time.perf_counter() - t0) * 1000.0 + raw = data.get("message", {}).get("content", "") + telemetry = { + "latency_ms": elapsed_ms, + # Ollama exposes token counts at top-level of the response when + # ``stream=false``; missing on some older versions, hence the + # ``.get`` defaults. 
def main() -> int:
    """Run Phase A: generate tip candidates for every (model × prompt × scenario) cell.

    Parses the CLI arguments, validates the requested models and prompt
    versions, then walks the grid sequentially. Each cell becomes one MLflow
    run carrying the request params, the deterministic metrics (latency,
    token counts, ``n_parsed``, ``format_ok`` and, with ``--diversity``,
    ``mean_diversity``) plus the prompt, parsed candidates and raw response
    stored through ``client.log_text``.

    Returns:
        ``0`` once the grid has been walked to the end (individual cells may
        still have been marked ``FAILED``); ``2`` when a requested model
        exceeds ``--max-model-b`` or a prompt version is unknown.
    """
    # Function-scope import: only this entry point needs it.
    from types import SimpleNamespace

    parser = argparse.ArgumentParser(description="oO tip-generation benchmark — Phase A")
    parser.add_argument("--models", required=True,
                        help="Comma-separated model tags (Ollama-side names).")
    parser.add_argument("--prompts", default=",".join(PROMPTS.keys()),
                        help="Comma-separated prompt versions from ml/serving/prompts.py.")
    parser.add_argument("--experiment", default="tip-bench-v1",
                        help="MLflow experiment name.")
    parser.add_argument("--n-tips", type=int, default=5,
                        help="Tips to request per scenario.")
    parser.add_argument("--temperature", type=float, default=0.7)
    parser.add_argument("--ollama-url", default=os.environ.get("OLLAMA_URL", "http://localhost:11434"))
    parser.add_argument("--mlflow-url", default=os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000"))
    parser.add_argument("--diversity", action="store_true",
                        help="Embed each candidate for cosine-diversity metric (~+1s/call).")
    parser.add_argument("--max-model-b", type=float, default=4.0,
                        help="Reject models tagged larger than this many billion params.")
    parser.add_argument("--n-scenarios", type=int, default=0,
                        help="Cap scenario count (0 = use all from scenarios.py).")
    parser.add_argument("--rubric", default=str(_BENCH / "rubric.md"),
                        help="Rubric file logged once per experiment.")
    args = parser.parse_args()

    models = [m.strip() for m in args.models.split(",") if m.strip()]
    prompts = [p.strip() for p in args.prompts.split(",") if p.strip()]
    too_big = [m for m in models if _model_too_big(m, args.max_model_b)]
    if too_big:
        print(f"ERROR: models exceed --max-model-b={args.max_model_b}: {too_big}", file=sys.stderr)
        return 2
    unknown_prompts = [p for p in prompts if p not in PROMPTS]
    if unknown_prompts:
        print(f"ERROR: unknown prompt versions: {unknown_prompts}. "
              f"Available: {list(PROMPTS)}", file=sys.stderr)
        return 2

    scenarios = build_scenarios()
    # Only a positive cap truncates. A negative value used to reach the slice
    # below and silently drop scenarios from the *end* of the list.
    if 0 < args.n_scenarios < len(scenarios):
        scenarios = scenarios[:args.n_scenarios]
    n_cells = len(models) * len(prompts) * len(scenarios)
    print(f"Models : {models}")
    print(f"Prompts : {prompts}")
    print(f"Scenarios : {len(scenarios)}")
    print(f"Cells : {n_cells} ({len(models)} × {len(prompts)} × {len(scenarios)})")
    print()

    # Read the rubric before talking to MLflow so a bad --rubric path fails
    # before any experiment or run has been created.
    rubric_text = Path(args.rubric).read_text(encoding="utf-8")

    client = MLflowClient(
        tracking_uri=args.mlflow_url,
        username=os.environ.get("MLFLOW_TRACKING_USERNAME") or "admin",
        password=os.environ.get("MLFLOW_TRACKING_PASSWORD") or "password",
    )
    exp_id = client.get_or_create_experiment(args.experiment)
    print(f"MLflow experiment: {args.experiment} (id={exp_id})")

    # Outer loop is *model* so all cells of one model run back-to-back and
    # different models are never interleaved. Every request still carries
    # ``keep_alive=0`` (see ``_call_ollama``), so do not count on the model
    # staying loaded between two cells of the same sub-loop.
    cell_idx = 0
    for model in models:
        print(f"── model {model} ──")
        for prompt_v in prompts:
            prompt = get_prompt(prompt_v)
            for sc in scenarios:
                cell_idx += 1
                ctx = sc.to_prompt_context()

                # ``build_user`` receives an attribute-style context object;
                # expose the four fields of the scenario dict that way.
                prompt_ctx = SimpleNamespace(
                    tasks=ctx["tasks"],
                    hour_of_day=ctx["hour_of_day"],
                    day_of_week=ctx["day_of_week"],
                    extra=ctx["extra"],
                )
                user_msg = prompt.build_user(prompt_ctx, args.n_tips)

                run_id = client.create_run(
                    exp_id,
                    run_name=f"{model}__{prompt_v}__{sc.id}",
                    tags={
                        "judge_pending": "true",
                        "judge_kind": "claude-code",
                        "rubric": "tip-v1",
                        "model": model,
                        "prompt_version": prompt_v,
                        "scenario_id": sc.id,
                        "persona": sc.persona.name,
                    },
                )
                client.log_params(run_id, {
                    "model": model,
                    "prompt_version": prompt_v,
                    "scenario_id": sc.id,
                    "persona": sc.persona.name,
                    "hour_of_day": sc.hour_of_day,
                    "day_of_week": sc.day_of_week,
                    "n_tips_requested": args.n_tips,
                    "temperature": args.temperature,
                })

                try:
                    raw, telemetry = _call_ollama(
                        model=model,
                        system=prompt.system,
                        user=user_msg,
                        ollama_url=args.ollama_url,
                        temperature=args.temperature,
                    )
                except Exception as exc:
                    # Deliberately broad: one failing cell must not abort the
                    # rest of the grid. The error is recorded on the run.
                    print(f" [{cell_idx}/{n_cells}] {model} {prompt_v} {sc.id}: ERROR {exc}")
                    client.set_tag(run_id, "error", str(exc)[:500])
                    # No candidates were produced, so there is nothing to
                    # judge. Clear the pending flag, otherwise the tag-based
                    # export in judge_cli.py keeps returning this empty run.
                    client.set_tag(run_id, "judge_pending", "false")
                    client.end_run(run_id, status="FAILED")
                    continue

                items = _parse_json_array(raw)
                format_ok = 1.0 if items is not None else 0.0
                items = items or []

                # Filter to dict-shaped items only (some models return string lists).
                cand_dicts = [
                    {
                        "id": str(it.get("id", f"tip-{i}")),
                        "content": str(it.get("content", "")),
                        "rationale": str(it.get("rationale", "")),
                    }
                    for i, it in enumerate(items)
                    if isinstance(it, dict)
                ]
                n_parsed = float(len(cand_dicts))

                metrics = {
                    "latency_ms": telemetry["latency_ms"],
                    "prompt_tokens": telemetry["prompt_tokens"],
                    "completion_tokens": telemetry["completion_tokens"],
                    "n_parsed": n_parsed,
                    "format_ok": format_ok,
                }

                if args.diversity and len(cand_dicts) >= 2:
                    embeddings = []
                    for cand in cand_dicts:
                        vec = _embed(cand["content"], args.ollama_url)
                        if vec:  # failed embeddings are skipped
                            embeddings.append(vec)
                    if len(embeddings) >= 2:
                        # Cosine *similarity* — lower means more diverse, so
                        # we report ``mean_diversity = 1 - sim``.
                        sim = _mean_pairwise_cosine(embeddings)
                        metrics["mean_diversity"] = 1.0 - sim

                client.log_metrics(run_id, metrics)
                client.log_text(run_id, prompt.system + "\n\n---\n\n" + user_msg, "prompt.txt")
                client.log_text(run_id, json.dumps(cand_dicts, indent=2), "candidates.json")
                client.log_text(run_id, raw[:9_000], "raw.txt")
                # Stamp the rubric text (truncated to the tag size limit) on
                # every run as a tag, so each run is self-describing.
                client.set_tag(run_id, "rubric_md", rubric_text[: client._TAG_VALUE_LIMIT])

                client.end_run(run_id)
                print(f" [{cell_idx:>3}/{n_cells}] {model:18s} {prompt_v:12s} {sc.id:24s} "
                      f"lat={metrics['latency_ms']:>6.0f}ms parsed={int(n_parsed)}/{args.n_tips} "
                      f"fmt={int(format_ok)}")

    print()
    print("Phase A complete. Run judge_cli.py --export to score pending runs.")
    print(f" python ml/experiments/bench/judge_cli.py --experiment {args.experiment} \\")
    print(" --export /tmp/oo-bench-judge-requests.json")
    return 0
username=os.environ.get("MLFLOW_TRACKING_USERNAME") or "admin", + password=os.environ.get("MLFLOW_TRACKING_PASSWORD") or "password", + ) + exp_id = client.get_or_create_experiment(args.experiment) + runs = client.search_runs(exp_id, max_results=2000) + + # Group key = (model, prompt_version) + cells: dict[tuple[str, str], list[dict]] = defaultdict(list) + for r in runs: + params = _params(r) + metrics = _metrics(r) + tags = _tags(r) + if r["info"].get("status") != "FINISHED": + continue + if not args.include_pending and "composite" not in metrics: + continue + cells[(params.get("model", "?"), params.get("prompt_version", "?"))].append({ + "metrics": metrics, + "scenario": params.get("scenario_id", "?"), + "judged": tags.get("judge_pending") == "false", + }) + + if not cells: + print("No judged runs found. Did you run judge_cli.py --apply?") + return 1 + + rows = [] + for (model, prompt), records in cells.items(): + n = len(records) + comp = [r["metrics"]["composite"] for r in records if "composite" in r["metrics"]] + rel = [r["metrics"]["relevance"] for r in records if "relevance" in r["metrics"]] + act = [r["metrics"]["actionability"] for r in records if "actionability" in r["metrics"]] + tone = [r["metrics"]["tone"] for r in records if "tone" in r["metrics"]] + lat = [r["metrics"]["latency_ms"] for r in records if "latency_ms" in r["metrics"]] + fmt = [r["metrics"]["format_ok"] for r in records if "format_ok" in r["metrics"]] + div = [r["metrics"]["mean_diversity"] for r in records if "mean_diversity" in r["metrics"]] + + rows.append({ + "model": model, + "prompt": prompt, + "n": n, + "composite": statistics.mean(comp) if comp else None, + "relevance": statistics.mean(rel) if rel else None, + "actionability": statistics.mean(act) if act else None, + "tone": statistics.mean(tone) if tone else None, + "format_ok": statistics.mean(fmt) if fmt else None, + "latency_p50": statistics.median(lat) if lat else None, + "latency_p95": _p95(lat) if lat else None, + 
"diversity": statistics.mean(div) if div else None, + }) + + rows.sort(key=lambda r: r["composite"] if r["composite"] is not None else -1, reverse=True) + + # Width-fitted printer — keeps output legible in a 100-col terminal. + print() + print(f"Experiment: {args.experiment} (id={exp_id})") + print(f"Cells : {len(rows)}") + print() + header = ( + f"{'#':>2} {'model':18s} {'prompt':12s} {'n':>3s} " + f"{'comp':>5s} {'rel':>4s} {'act':>4s} {'tone':>4s} " + f"{'fmt':>4s} {'p50':>6s} {'p95':>6s} {'div':>5s}" + ) + print(header) + print("─" * len(header)) + for i, r in enumerate(rows, 1): + comp = f"{r['composite']:.2f}" if r["composite"] is not None else " -- " + rel = f"{r['relevance']:.1f}" if r["relevance"] is not None else " -- " + act = f"{r['actionability']:.1f}" if r["actionability"] is not None else " -- " + tone = f"{r['tone']:.1f}" if r["tone"] is not None else " -- " + fmt = f"{r['format_ok']:.2f}" if r["format_ok"] is not None else " -- " + p50 = f"{r['latency_p50']:.0f}" if r["latency_p50"] is not None else " -- " + p95 = f"{r['latency_p95']:.0f}" if r["latency_p95"] is not None else " -- " + div = f"{r['diversity']:.2f}" if r["diversity"] is not None else " -- " + print( + f"{i:>2} {r['model']:18s} {r['prompt']:12s} {r['n']:>3d} " + f"{comp:>5s} {rel:>4s} {act:>4s} {tone:>4s} " + f"{fmt:>4s} {p50:>6s} {p95:>6s} {div:>5s}" + ) + + if rows[0]["composite"] is not None: + winner = rows[0] + print() + print(f"Winner: {winner['model']} × {winner['prompt']} " + f"(composite={winner['composite']:.2f}, n={winner['n']})") + return 0 + + +def _p95(xs: list[float]) -> float: + if not xs: + return 0.0 + s = sorted(xs) + idx = max(0, int(round(0.95 * (len(s) - 1)))) + return s[idx] + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/ml/experiments/bench/judge_cli.py b/ml/experiments/bench/judge_cli.py new file mode 100644 index 0000000..fd809fe --- /dev/null +++ b/ml/experiments/bench/judge_cli.py @@ -0,0 +1,191 @@ +"""Phase B — Claude Code as the lazy 
MLflow judge. + +Two sub-commands, both keyed to MLflow tags so the same run cycles +through ``judge_pending=true`` → judged → ``judge_pending=false`` exactly +once. + + --export PATH + Pull every run with ``judge_pending=true`` and ``judge_kind=claude-code`` + from the experiment, bundle the prompt + parsed candidates + the + rubric into a single JSON file the Claude Code session can read. + + --apply PATH + Read the responses (same shape as the request, with ``scores`` filled in) + and log ``relevance``, ``actionability``, ``tone``, ``overlong`` as + MLflow metrics on the corresponding runs. Sets ``judge_pending=false`` + and stamps ``judged_at`` / ``judged_by`` so the run won't be picked up + twice. + +The request file is intentionally one big JSON document, so the human +judge sees the full set in one place and can score consistently. +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +import time +from pathlib import Path + +_BENCH = Path(__file__).resolve().parent +sys.path.insert(0, str(_BENCH)) +from mlflow_client import MLflowClient # type: ignore + + +_DIMENSIONS = ("relevance", "actionability", "tone") +_BIN_FLAGS = ("overlong",) + + +def _tags_dict(run: dict) -> dict[str, str]: + return {t["key"]: t["value"] for t in run.get("data", {}).get("tags", [])} + + +def _params_dict(run: dict) -> dict[str, str]: + return {p["key"]: p["value"] for p in run.get("data", {}).get("params", [])} + + +def export(client: MLflowClient, experiment: str, out_path: str) -> int: + exp_id = client.get_or_create_experiment(experiment) + runs = client.search_runs( + exp_id, + filter_string="tags.judge_pending = 'true' and tags.judge_kind = 'claude-code'", + ) + if not runs: + print("No pending runs.") + Path(out_path).write_text(json.dumps({ + "experiment": experiment, + "exported_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + "rubric": "tip-v1", + "items": [], + }, indent=2)) + return 0 + + rubric_text = (_BENCH / 
def _coerce_scores(scores: dict) -> dict[str, float]:
    """Validate one item's ``scores`` mapping and convert it to metric floats.

    Raises:
        ValueError: a dimension is not numeric or lies outside the rubric's
            1–5 range, or a binary flag is not integer-like.
        TypeError: a value has a type ``float()`` / ``int()`` cannot convert.
    """
    metrics: dict[str, float] = {}
    for dim in _DIMENSIONS:
        value = float(scores[dim])
        # The chained comparison is also False for NaN, so NaN is rejected.
        if not 1.0 <= value <= 5.0:
            raise ValueError(f"{dim}={scores[dim]!r} is outside the 1-5 rubric range")
        metrics[dim] = value
    for flag in _BIN_FLAGS:
        raw = scores.get(flag)
        if raw not in (None, ""):
            # Any non-zero integer counts as "flag set"; stored as 0.0 / 1.0.
            metrics[flag] = float(int(bool(int(raw))))
    return metrics


def apply(client: MLflowClient, experiment: str, in_path: str) -> int:
    """Write the judge's scores from *in_path* back onto their MLflow runs.

    For every item with a complete, valid set of dimension scores this logs
    the dimension metrics, the optional binary flags and the derived
    ``composite`` metric, then flips ``judge_pending`` to ``false`` and stamps
    ``judged_at`` / ``judged_by``. Items without a ``run_id``, with missing
    dimensions, or with unusable values are skipped with a message instead of
    aborting the batch half-way through.

    Args:
        client: MLflow REST client used for all reads and writes.
        experiment: Experiment name; looked up (and created if absent) through
            ``client.get_or_create_experiment``.
        in_path: Path of the filled-in judgment file (same shape as the export).

    Returns:
        Always ``0``; applied and skipped counts are printed.
    """
    # The id is not needed (runs are addressed by run_id); the call is kept
    # because callers have always had the experiment resolved at this point.
    client.get_or_create_experiment(experiment)
    payload = json.loads(Path(in_path).read_text(encoding="utf-8"))
    items = payload.get("items", [])
    if not items:
        print("No items in response file.")
        return 0

    judged_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    n_applied, n_skipped = 0, 0
    for item in items:
        run_id = item.get("run_id")
        if not run_id:
            print(" [skip] <no run_id>: item has no run_id")
            n_skipped += 1
            continue
        scores = item.get("scores") or {}

        missing = [d for d in _DIMENSIONS if scores.get(d) in (None, "")]
        if missing:
            print(f" [skip] {run_id}: missing {missing}")
            n_skipped += 1
            continue

        try:
            metrics = _coerce_scores(scores)
        except (TypeError, ValueError) as exc:
            # A typo such as "4/5" or 45 must not abort the whole batch after
            # earlier runs were already written.
            print(f" [skip] {run_id}: invalid scores ({exc})")
            n_skipped += 1
            continue

        # Composite mirrors rubric.md: relevance + actionability + tone
        # + 2 * format_ok - overlong. format_ok is already a metric on
        # the run from collect.py; re-fetching is cheap and keeps this
        # script idempotent if format compliance was retroactively fixed.
        run = client._get("/runs/get", {"run_id": run_id})["run"]
        existing_metrics = {m["key"]: m["value"] for m in run["data"].get("metrics", [])}
        format_ok = float(existing_metrics.get("format_ok", 0.0))
        overlong = metrics.get("overlong", 0.0)
        composite = (
            metrics["relevance"] + metrics["actionability"] + metrics["tone"]
            + 2 * format_ok - overlong
        )
        metrics["composite"] = composite

        client.log_metrics(run_id, metrics)
        client.set_tags(run_id, {
            "judge_pending": "false",
            "judged_at": judged_at,
            "judged_by": "claude-code-session",
        })
        if scores.get("notes"):
            client.set_tag(run_id, "judge_notes", str(scores["notes"])[:1000])

        n_applied += 1
        print(f" [ok] {run_id}: rel={metrics['relevance']:.1f} "
              f"act={metrics['actionability']:.1f} tone={metrics['tone']:.1f} "
              f"comp={composite:.2f}")

    print(f"Applied {n_applied}, skipped {n_skipped}.")
    return 0
MLflow.") + args = parser.parse_args() + + client = MLflowClient( + tracking_uri=args.mlflow_url, + username=os.environ.get("MLFLOW_TRACKING_USERNAME") or "admin", + password=os.environ.get("MLFLOW_TRACKING_PASSWORD") or "password", + ) + if args.export: + return export(client, args.experiment, args.export) + return apply(client, args.experiment, args.apply) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/ml/experiments/bench/mlflow_client.py b/ml/experiments/bench/mlflow_client.py new file mode 100644 index 0000000..9eaa2ac --- /dev/null +++ b/ml/experiments/bench/mlflow_client.py @@ -0,0 +1,202 @@ +"""Thin MLflow REST wrapper. + +Why not the official ``mlflow`` SDK? Two reasons specific to the oO setup: + +1. The MLflow server (3.11) ships with ``--allowed-hosts localhost`` but + curl / requests / urllib3 send ``Host: localhost:5000`` — the port + suffix fails the DNS-rebinding check. We override the Host header per + request, which the SDK doesn't expose. +2. The collect/judge phases only need ~6 endpoints (create/search/log). + Pulling a 200MB SDK transitively for that is excess weight. + +All calls are synchronous httpx with explicit ``Host`` so the script can +run from the host shell, from inside docker, or from Airflow workers +without further config. +""" + +from __future__ import annotations + +import os +import time +from dataclasses import dataclass +from typing import Any + +import httpx + + +def _strip_path(uri: str) -> tuple[str, str]: + """Return (origin, path_prefix) — handles both /mlflow and / roots. 
+ + ``http://mlflow:5000/mlflow`` → ("http://mlflow:5000", "/mlflow") + ``http://localhost:5000`` → ("http://localhost:5000", "") + """ + uri = uri.rstrip("/") + if "/" not in uri.split("://", 1)[1]: + return uri, "" + scheme_host, _, rest = uri.partition("://") + host, _, path = rest.partition("/") + return f"{scheme_host}://{host}", "/" + path if path else "" + + +@dataclass +class MLflowClient: + tracking_uri: str + username: str | None = None + password: str | None = None + host_header: str | None = None # override for DNS-rebinding sidestep + timeout: float = 30.0 + + def __post_init__(self) -> None: + self._origin, self._ui_prefix = _strip_path(self.tracking_uri) + # MLflow 3.x exposes the REST API at the root, *not* under the + # ``/mlflow`` UI prefix. Empirically verified against the running + # ghcr.io/mlflow/mlflow:v3.11.1 container. + self._api = f"{self._origin}/api/2.0/mlflow" + self._auth = (self.username, self.password) if self.username else None + # If user did not pass a host header, derive from origin. Strip + # the port if present — the server's allowed-hosts check rejects + # ``localhost:5000`` even when ``localhost`` is allowed. 
+ if self.host_header is None: + host = self._origin.split("://", 1)[1] + self.host_header = host.split(":", 1)[0] + + @classmethod + def from_env(cls) -> "MLflowClient": + return cls( + tracking_uri=os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000"), + username=os.environ.get("MLFLOW_TRACKING_USERNAME") or "admin", + password=os.environ.get("MLFLOW_TRACKING_PASSWORD") or "password", + host_header=os.environ.get("MLFLOW_HOST_HEADER"), + ) + + def _headers(self) -> dict[str, str]: + return {"Host": self.host_header or "localhost"} + + def _post(self, path: str, body: dict) -> dict: + with httpx.Client(trust_env=False, timeout=self.timeout) as c: + r = c.post(f"{self._api}{path}", json=body, headers=self._headers(), auth=self._auth) + r.raise_for_status() + return r.json() + + def _get(self, path: str, params: dict | None = None) -> dict: + with httpx.Client(trust_env=False, timeout=self.timeout) as c: + r = c.get(f"{self._api}{path}", params=params or {}, headers=self._headers(), auth=self._auth) + r.raise_for_status() + return r.json() + + # ── Experiments ──────────────────────────────────────────────────── + + def get_or_create_experiment(self, name: str) -> str: + try: + r = self._get("/experiments/get-by-name", {"experiment_name": name}) + return r["experiment"]["experiment_id"] + except httpx.HTTPStatusError as e: + if e.response.status_code not in (404, 400): + raise + r = self._post("/experiments/create", {"name": name}) + return r["experiment_id"] + + # ── Runs ─────────────────────────────────────────────────────────── + + def create_run( + self, + experiment_id: str, + run_name: str, + tags: dict[str, str] | None = None, + ) -> str: + body: dict[str, Any] = { + "experiment_id": experiment_id, + "start_time": int(time.time() * 1000), + "run_name": run_name, + "tags": [ + {"key": k, "value": str(v)} + for k, v in (tags or {}).items() + ], + } + r = self._post("/runs/create", body) + return r["run"]["info"]["run_id"] + + def log_param(self, 
run_id: str, key: str, value: Any) -> None: + self._post("/runs/log-parameter", {"run_id": run_id, "key": key, "value": str(value)}) + + def log_params(self, run_id: str, params: dict[str, Any]) -> None: + for k, v in params.items(): + self.log_param(run_id, k, v) + + def log_metric(self, run_id: str, key: str, value: float, step: int = 0) -> None: + self._post("/runs/log-metric", { + "run_id": run_id, + "key": key, + "value": float(value), + "timestamp": int(time.time() * 1000), + "step": step, + }) + + def log_metrics(self, run_id: str, metrics: dict[str, float]) -> None: + for k, v in metrics.items(): + self.log_metric(run_id, k, v) + + def set_tag(self, run_id: str, key: str, value: str) -> None: + self._post("/runs/set-tag", {"run_id": run_id, "key": key, "value": str(value)}) + + def set_tags(self, run_id: str, tags: dict[str, str]) -> None: + for k, v in tags.items(): + self.set_tag(run_id, k, v) + + # MLflow tag values are capped at 5000 chars by the server (RESOURCE_DOES_NOT_EXIST + # below that, INVALID_PARAMETER_VALUE above). 4500 leaves headroom for + # internal metadata MLflow may append on its own. + _TAG_VALUE_LIMIT = 4500 + + def log_text(self, run_id: str, text: str, artifact_path: str) -> None: + """Persist short text alongside the run. + + The MLflow server in this deployment uses a ``file://`` artifact + backend, which is only reachable from inside the container — not + via the REST proxy. We instead stash short payloads as tags + keyed ``artifact:``. Anything longer than 4500 chars is + chunked into ``artifact::0``, ``:1`` …; ``get_artifact_text`` + re-stitches them in order. 
+ """ + key_base = f"artifact:{artifact_path}" + if len(text) <= self._TAG_VALUE_LIMIT: + self.set_tag(run_id, key_base, text) + return + # chunk + for i in range(0, len(text), self._TAG_VALUE_LIMIT): + self.set_tag(run_id, f"{key_base}:{i // self._TAG_VALUE_LIMIT}", + text[i:i + self._TAG_VALUE_LIMIT]) + + def get_artifact_text(self, run_id: str, artifact_path: str) -> str: + run = self._get("/runs/get", {"run_id": run_id})["run"] + tags = {t["key"]: t["value"] for t in run["data"].get("tags", [])} + key_base = f"artifact:{artifact_path}" + if key_base in tags: + return tags[key_base] + # chunked form + chunks = sorted( + (k for k in tags if k.startswith(f"{key_base}:")), + key=lambda k: int(k.rsplit(":", 1)[1]), + ) + return "".join(tags[k] for k in chunks) + + def end_run(self, run_id: str, status: str = "FINISHED") -> None: + self._post("/runs/update", { + "run_id": run_id, + "status": status, + "end_time": int(time.time() * 1000), + }) + + def search_runs( + self, + experiment_id: str, + filter_string: str = "", + max_results: int = 1000, + ) -> list[dict]: + body = { + "experiment_ids": [experiment_id], + "filter": filter_string, + "max_results": max_results, + } + r = self._post("/runs/search", body) + return r.get("runs", []) diff --git a/ml/experiments/bench/rubric.md b/ml/experiments/bench/rubric.md new file mode 100644 index 0000000..633c09b --- /dev/null +++ b/ml/experiments/bench/rubric.md @@ -0,0 +1,85 @@ +# Tip-quality rubric — `tip-v1` + +This file is the consistency anchor for the Claude Code judge. The same +rubric is used across every judging session so verdicts are comparable +across runs (per the lazy-judge pattern in #95). + +Each candidate tip is scored on three independent 1–5 dimensions, plus +two binary flags. Score the **content of the tip itself** for the given +persona/context — do not score the rationale. + +## Dimensions + +### relevance — 1 to 5 +How well does the tip respond to *this specific persona at this specific +time*? 
A generic productivity platitude is 1; a tip that hooks into the +persona's stated preferences and the actual hour-of-day is 5. + +| score | description | +|-------|-------------| +| 1 | Boilerplate. Could apply to any user, any time. | +| 2 | Vaguely fits the persona but ignores context. | +| 3 | Fits the persona OR the time, not both. | +| 4 | Fits both persona and time, with one specific anchor (a task, an hour, a habit). | +| 5 | Specific to the persona's preferences AND respects the hour, with a clear hook into a candidate task or routine. | + +### actionability — 1 to 5 +Could the user *do this in the next 10 minutes* without further planning? +"Try to focus more" is 1; "Spend 12 minutes on the Call dentist task and +stop when the timer ends" is 5. + +| score | description | +|-------|-------------| +| 1 | Pure encouragement, no action. | +| 2 | Action exists but vague ("review your tasks"). | +| 3 | Concrete verb + object, but missing the time/duration handle. | +| 4 | Concrete action with a duration or trigger ("for 10 minutes", "before lunch"). | +| 5 | Micro-action with explicit start, duration, and a stop condition. | + +### tone — 1 to 5 +Does the tip sound like a calm, specific mentor (the product voice) or +like a generic chatbot/coach? Penalize emoji-spam, exclamation marks, +hype words ("amazing!", "let's crush it!"), and corporate jargon. + +| score | description | +|-------|-------------| +| 1 | Hype, jargon, or motivational-poster tone. | +| 2 | Polite chatbot tone, no warmth. | +| 3 | Neutral, businesslike. | +| 4 | Quiet and specific, like a coach who knows you. | +| 5 | Earned. Reads like a mentor who has seen this exact stuck-pattern before. | + +## Binary flags + +### format_ok — 0 or 1 +1 if the *whole response* parsed as a JSON array of objects with the +required keys (`id`, `content`, `rationale`). 0 otherwise. **This is +computed automatically by `collect.py`** — judges should not override it. 
+ +### overlong — 0 or 1 +1 if `content` exceeds the documented 2-sentence cap (count +sentence-ending punctuation `. ! ?`). Judges may flag this as a tiebreaker. + +## Composite score + +`compare.py` ranks cells by: + +``` +composite = relevance + actionability + tone + 2*format_ok - overlong +``` + +i.e. format compliance carries double weight (a malformed JSON is a hard +production failure regardless of how good the prose is). + +## Calibration examples + +(Shared with judges so a 4 means the same thing across sessions.) + +**Persona**: deadline-driven (responds to overdue/high-priority, +morning-active). **Hour**: 09:00. **Tasks include**: an overdue +"Call dentist", priority 4. + +- "Stay focused and make today count!" — relevance 1, actionability 1, tone 1. +- "Review your tasks and pick one that matters." — relevance 2, actionability 2, tone 3. +- "Spend the next 12 minutes on Call dentist — set a timer and stop when it rings." — relevance 5, actionability 5, tone 4. +- "It's 09:00 — you respond to overdue items best now. Block 12 minutes for Call dentist before your first meeting." — relevance 5, actionability 5, tone 5. diff --git a/ml/experiments/bench/scenarios.py b/ml/experiments/bench/scenarios.py new file mode 100644 index 0000000..35947b5 --- /dev/null +++ b/ml/experiments/bench/scenarios.py @@ -0,0 +1,80 @@ +"""Fixed contexts for the tip-generation benchmark. + +Every cell of the (model × prompt) grid is evaluated on the *same* set of +scenarios so quality differences are attributable to the model/prompt, +not to context variance. + +A scenario is one (persona, hour-of-day, candidate-task-pool) tuple. The +hour comes from a fixed list of time slots and the task pool is seeded from +the persona's name and the hour so the bench is reproducible across machines. +""" + +from __future__ import annotations + +import sys +from dataclasses import dataclass +from pathlib import Path + +# Reuse personas from sim — same source of truth for user archetypes. 
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "sim"))
from personas import PERSONAS, Persona  # type: ignore
from task_generator import generate_task_pool  # type: ignore


@dataclass(frozen=True)
class Scenario:
    """One fixed evaluation context: a persona at a given hour with a task pool."""

    id: str  # stable id used as MLflow tag — keep ASCII safe
    persona: Persona
    hour_of_day: int  # 0–23
    day_of_week: int  # 0=Mon
    tasks: list[dict]

    def to_prompt_context(self) -> dict:
        """Shape expected by ml/serving/prompts.PromptContext.

        Each task must carry ``content`` and a ``features`` mapping with
        ``priority`` and ``is_overdue``; a missing ``due_date`` is reported
        as ``"no due date"``.
        """
        return {
            "tasks": [
                {
                    "content": t["content"],
                    "priority": t["features"]["priority"],
                    "is_overdue": t["features"]["is_overdue"],
                    "due_date": t.get("due_date", "no due date"),
                }
                for t in self.tasks
            ],
            "hour_of_day": self.hour_of_day,
            "day_of_week": self.day_of_week,
            "extra": {
                "persona": self.persona.name,
                "persona_hint": self.persona.description,
            },
        }


# Two time-slots probe whether the model adapts its tone to the hour.
# Morning (09) and evening (21) are picked because most personas have
# strong directional preferences there.
_TIME_SLOTS = [(9, 1), (21, 3)]  # (hour_of_day, day_of_week)


def _scenario_seed(persona_name: str, hour: int) -> int:
    """Return a seed for one (persona, hour) pair that is stable across processes.

    The built-in ``hash()`` of a ``str`` is salted per interpreter process
    (see ``PYTHONHASHSEED``), so it cannot be used for a reproducible seed.
    CRC-32 of the UTF-8 name gives the same integer on every run and machine.
    """
    import zlib  # local import: keeps this block independent of the file's import header

    return zlib.crc32(persona_name.encode("utf-8")) % 9973 + hour


def build_scenarios(tasks_per_scenario: int = 6) -> list[Scenario]:
    """Return a deterministic list of scenarios.

    The first four entries of ``PERSONAS`` are crossed with the two
    time-slots, giving 8 scenarios when at least four personas exist.
    Task pools are seeded by ``crc32(persona.name) % 9973 + hour`` so
    runs are reproducible and each persona sees the same tasks at the
    same hour across cells, processes and machines.

    Args:
        tasks_per_scenario: Number of tasks requested from
            ``generate_task_pool`` for every scenario.
    """
    out: list[Scenario] = []
    for persona in PERSONAS[:4]:
        for hour, dow in _TIME_SLOTS:
            tasks = generate_task_pool(
                n=tasks_per_scenario,
                seed=_scenario_seed(persona.name, hour),
            )
            out.append(
                Scenario(
                    id=f"{persona.name}-h{hour:02d}",
                    persona=persona,
                    hour_of_day=hour,
                    day_of_week=dow,
                    tasks=tasks,
                )
            )
    return out