feat(simulate): MLflow tracking, Airflow DAG integration, health checks for mlflow/airflow
- sim_runs schema: add judge_mode, n_policies, airflow_dag_run_id, mlflow_run_id columns
- admin health endpoint: add mlflow + airflow checks (Basic auth for Airflow API)
- admin nav: add Simulations page link; rename section label
- runner.py: optional MLflow experiment tracking; multi-policy support
- sim_dag.py: Airflow DAG for offline sim pipeline
- admin simulate page + API client methods for sim runs
- shared-types tsconfig: exclude test files from build

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -26,6 +26,7 @@ from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
import sys
|
||||
import time
|
||||
@@ -40,6 +41,12 @@ from llm_judge import ACTIONS, infer_reward, judge
|
||||
from personas import PERSONAS, Persona
|
||||
from task_generator import generate_task_pool
|
||||
|
||||
try:
|
||||
import mlflow
|
||||
_MLFLOW_AVAILABLE = True
|
||||
except ImportError:
|
||||
_MLFLOW_AVAILABLE = False
|
||||
|
||||
POLICY_SCORE_ENDPOINTS: dict[str, str] = {
|
||||
"linucb-v1": "/score",
|
||||
"egreedy-v1": "/score/egreedy",
|
||||
@@ -107,14 +114,30 @@ def _call_reward(
|
||||
|
||||
# ── Standard single-pass runner (rule / llm modes) ─────────────────────────
|
||||
|
||||
def _init_mlflow(mlflow_url: str | None, experiment: str) -> str | None:
    """Point MLflow at a tracking server and select the experiment.

    No run is started here, so there is no run_id to hand back; the return
    value is only a status marker.

    Args:
        mlflow_url: MLflow tracking URI. ``None`` or an empty string means
            tracking is disabled.
        experiment: Experiment name passed to ``mlflow.set_experiment``.

    Returns:
        The literal string ``"ready"`` when the tracking URI and experiment
        were set, or ``None`` when no URL was given, the optional ``mlflow``
        import failed, or either MLflow call raised.
    """
    # Check the URL first: "no tracking requested" needs no look at MLflow.
    if not mlflow_url or not _MLFLOW_AVAILABLE:
        return None
    try:
        mlflow.set_tracking_uri(mlflow_url)
        mlflow.set_experiment(experiment)
    except Exception as e:
        # Failure is reported on stderr and the caller carries on untracked.
        print(f" [warn] MLflow init failed: {e}", file=sys.stderr)
        return None
    return "ready"
|
||||
|
||||
|
||||
def run_simulation(
|
||||
n_users: int, n_rounds: int, tasks_per_round: int,
|
||||
ml_url: str, policies: list[str], use_llm: bool, seed: int,
|
||||
mlflow_url: str | None = None, mlflow_experiment: str = "bandit_simulation",
|
||||
) -> dict:
|
||||
rng = random.Random(seed)
|
||||
run_id = str(uuid.uuid4())[:8]
|
||||
started_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
|
||||
|
||||
_init_mlflow(mlflow_url, mlflow_experiment)
|
||||
|
||||
user_personas = [
|
||||
(f"sim-{run_id}-u{i}", PERSONAS[i % len(PERSONAS)])
|
||||
for i in range(n_users)
|
||||
@@ -130,62 +153,101 @@ def run_simulation(
|
||||
}
|
||||
events: list[dict] = []
|
||||
|
||||
with httpx.Client(trust_env=False) as client:
|
||||
for rnd in range(n_rounds):
|
||||
hour = rng.randint(6, 22)
|
||||
dow = rng.randint(0, 6)
|
||||
round_rewards = {p: 0.0 for p in policies}
|
||||
mlflow_run_id: str | None = None
|
||||
mlflow_ctx = (
|
||||
mlflow.start_run(run_name=run_id)
|
||||
if (_MLFLOW_AVAILABLE and mlflow_url)
|
||||
else None
|
||||
)
|
||||
|
||||
for user_id, persona in user_personas:
|
||||
seed_tasks = rnd * 997 + abs(hash(user_id)) % 997
|
||||
tasks = generate_task_pool(n=tasks_per_round, seed=seed_tasks)
|
||||
try:
|
||||
if mlflow_ctx:
|
||||
active = mlflow_ctx.__enter__()
|
||||
mlflow_run_id = active.info.run_id
|
||||
mlflow.log_params({
|
||||
"n_users": n_users,
|
||||
"n_rounds": n_rounds,
|
||||
"tasks_per_round": tasks_per_round,
|
||||
"policies": ",".join(policies),
|
||||
"judge": "llm" if use_llm else "rule",
|
||||
"seed": seed,
|
||||
})
|
||||
|
||||
# Per-persona profile features for v2 (synthetic for sim — see ADR-0012)
|
||||
profile = persona.profile_features(hour) if hasattr(persona, "profile_features") else None
|
||||
with httpx.Client(trust_env=False) as client:
|
||||
for rnd in range(n_rounds):
|
||||
hour = rng.randint(6, 22)
|
||||
dow = rng.randint(0, 6)
|
||||
round_rewards = {p: 0.0 for p in policies}
|
||||
|
||||
for policy in policies:
|
||||
p_user = f"{user_id}-{policy}"
|
||||
scored = _call_score(client, ml_url, policy, p_user, tasks, hour, dow,
|
||||
profile_features=profile)
|
||||
if not scored:
|
||||
continue
|
||||
tip_id = scored.get("tip_id")
|
||||
tip = next((t for t in tasks if t["id"] == tip_id), None)
|
||||
if not tip:
|
||||
continue
|
||||
for user_id, persona in user_personas:
|
||||
seed_tasks = rnd * 997 + abs(hash(user_id)) % 997
|
||||
tasks = generate_task_pool(n=tasks_per_round, seed=seed_tasks)
|
||||
profile = persona.profile_features(hour) if hasattr(persona, "profile_features") else None
|
||||
|
||||
action, dwell_ms, reward = judge(persona, tip, hour, dow, rng, use_llm=use_llm)
|
||||
_call_reward(client, ml_url, policy, p_user, tip_id, reward, {
|
||||
"hour_of_day": hour,
|
||||
"is_overdue": tip["features"]["is_overdue"],
|
||||
"task_age_days": tip["features"]["task_age_days"],
|
||||
"priority": tip["features"]["priority"],
|
||||
}, day_of_week=dow, profile_features=profile)
|
||||
for policy in policies:
|
||||
p_user = f"{user_id}-{policy}"
|
||||
scored = _call_score(client, ml_url, policy, p_user, tasks, hour, dow,
|
||||
profile_features=profile)
|
||||
if not scored:
|
||||
continue
|
||||
tip_id = scored.get("tip_id")
|
||||
tip = next((t for t in tasks if t["id"] == tip_id), None)
|
||||
if not tip:
|
||||
continue
|
||||
|
||||
acc[policy]["total_reward"] += reward
|
||||
acc[policy]["n_pulls"] += 1
|
||||
acc[policy]["action_counts"][action] += 1
|
||||
round_rewards[policy] += reward
|
||||
events.append({
|
||||
"round": rnd, "user_id": user_id, "persona": persona.name,
|
||||
"policy": policy, "tip_content": tip["content"],
|
||||
"priority": tip["features"]["priority"],
|
||||
"is_overdue": tip["features"]["is_overdue"],
|
||||
"action": action, "dwell_ms": dwell_ms, "reward": reward,
|
||||
"hour": hour, "day_of_week": dow,
|
||||
})
|
||||
action, dwell_ms, reward = judge(persona, tip, hour, dow, rng, use_llm=use_llm)
|
||||
_call_reward(client, ml_url, policy, p_user, tip_id, reward, {
|
||||
"hour_of_day": hour,
|
||||
"is_overdue": tip["features"]["is_overdue"],
|
||||
"task_age_days": tip["features"]["task_age_days"],
|
||||
"priority": tip["features"]["priority"],
|
||||
}, day_of_week=dow, profile_features=profile)
|
||||
|
||||
for p in policies:
|
||||
prev = acc[p]["cumulative_rewards"][-1] if acc[p]["cumulative_rewards"] else 0.0
|
||||
acc[p]["cumulative_rewards"].append(prev + round_rewards[p])
|
||||
acc[policy]["total_reward"] += reward
|
||||
acc[policy]["n_pulls"] += 1
|
||||
acc[policy]["action_counts"][action] += 1
|
||||
round_rewards[policy] += reward
|
||||
events.append({
|
||||
"round": rnd, "user_id": user_id, "persona": persona.name,
|
||||
"policy": policy, "tip_content": tip["content"],
|
||||
"priority": tip["features"]["priority"],
|
||||
"is_overdue": tip["features"]["is_overdue"],
|
||||
"action": action, "dwell_ms": dwell_ms, "reward": reward,
|
||||
"hour": hour, "day_of_week": dow,
|
||||
})
|
||||
|
||||
mode = "llm" if use_llm else "rule"
|
||||
print(f" Round {rnd+1:>3}/{n_rounds} [{mode}] " + " ".join(
|
||||
f"{p}={acc[p]['cumulative_rewards'][-1]:+.2f}" for p in policies
|
||||
))
|
||||
for p in policies:
|
||||
prev = acc[p]["cumulative_rewards"][-1] if acc[p]["cumulative_rewards"] else 0.0
|
||||
acc[p]["cumulative_rewards"].append(prev + round_rewards[p])
|
||||
|
||||
return _build_result(run_id, started_at, policies, acc, events,
|
||||
n_users, n_rounds, tasks_per_round, use_llm, seed)
|
||||
if mlflow_ctx:
|
||||
for p in policies:
|
||||
mlflow.log_metric(f"{p}_cumulative_reward",
|
||||
acc[p]["cumulative_rewards"][-1], step=rnd)
|
||||
|
||||
mode = "llm" if use_llm else "rule"
|
||||
print(f" Round {rnd+1:>3}/{n_rounds} [{mode}] " + " ".join(
|
||||
f"{p}={acc[p]['cumulative_rewards'][-1]:+.2f}" for p in policies
|
||||
))
|
||||
|
||||
result = _build_result(run_id, started_at, policies, acc, events,
|
||||
n_users, n_rounds, tasks_per_round, use_llm, seed)
|
||||
result["mlflow_run_id"] = mlflow_run_id
|
||||
|
||||
if mlflow_ctx:
|
||||
for p, s in result["summary"].items():
|
||||
mlflow.log_metrics({
|
||||
f"{p}_total_reward": s["total_reward"],
|
||||
f"{p}_mean_reward": s["mean_reward"],
|
||||
f"{p}_n_pulls": s["n_pulls"],
|
||||
})
|
||||
mlflow.set_tag("winner", result["winner"])
|
||||
|
||||
return result
|
||||
|
||||
finally:
|
||||
if mlflow_ctx:
|
||||
mlflow_ctx.__exit__(None, None, None)
|
||||
|
||||
|
||||
# ── Claude Code judge — phase 1: score ─────────────────────────────────────
|
||||
@@ -494,6 +556,9 @@ if __name__ == "__main__":
|
||||
help="Alias for --judge rule (backwards compat)")
|
||||
parser.add_argument("--seed", type=int, default=42)
|
||||
parser.add_argument("--out", default=None)
|
||||
parser.add_argument("--mlflow-url", default=os.environ.get("MLFLOW_TRACKING_URI"),
|
||||
help="MLflow tracking URI (e.g. http://mlflow:5000/mlflow)")
|
||||
parser.add_argument("--mlflow-experiment", default="bandit_simulation")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.no_llm:
|
||||
@@ -534,6 +599,7 @@ if __name__ == "__main__":
|
||||
n_users=args.n_users, n_rounds=args.n_rounds,
|
||||
tasks_per_round=args.tasks_per_round, ml_url=args.ml_url,
|
||||
policies=args.policies, use_llm=use_llm, seed=args.seed,
|
||||
mlflow_url=args.mlflow_url, mlflow_experiment=args.mlflow_experiment,
|
||||
)
|
||||
Path(out_path).write_text(json.dumps(result, indent=2))
|
||||
print()
|
||||
|
||||
124
ml/pipelines/sim_dag.py
Normal file
124
ml/pipelines/sim_dag.py
Normal file
@@ -0,0 +1,124 @@
|
||||
"""
|
||||
Airflow DAG: bandit_sim
|
||||
|
||||
Runs a bandit policy simulation and logs results to MLflow.
|
||||
Triggered on-demand from the oO admin panel or manually from the Airflow UI.
|
||||
|
||||
Conf keys (passed via dag_run.conf; every key is optional — the tasks fall back to a default, or skip the callback, when one is absent):
|
||||
sim_run_id str — oO SQLite run ID for callback correlation
|
||||
n_users int — number of synthetic users
|
||||
n_rounds int — rounds per user
|
||||
tasks_per_round int — candidate pool size per round
|
||||
policies list — policy names to compare
|
||||
judge_mode str — "rule" | "llm"
|
||||
ml_url str — ml/serving URL (e.g. http://ml-serving:8000)
|
||||
mlflow_url str — MLflow tracking URI (e.g. http://mlflow:5000/mlflow)
|
||||
callback_url str — oO API callback endpoint
|
||||
internal_token str — x-internal-token header value
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from airflow import DAG
|
||||
from airflow.operators.python import PythonOperator
|
||||
|
||||
|
||||
def _run_sim(**context: object) -> dict:
    """Run one bandit simulation configured by ``dag_run.conf``.

    Every conf key is optional. A key that is missing, or present with an
    explicit ``None``, falls back to its default.

    Conf keys read:
        n_users: int, default 5.
        n_rounds: int, default 20.
        tasks_per_round: int, default 8.
        policies: list of policy names, or one comma-separated string;
            default ``["linucb-v1", "egreedy-v1"]``.
        judge_mode: ``"llm"`` sets ``use_llm=True``; any other value leaves
            it ``False``. Default ``"rule"``.
        ml_url: default ``"http://ml-serving:8000"``.
        mlflow_url: default is the ``MLFLOW_TRACKING_URI`` environment
            variable; an empty value is passed on as ``None``.
        seed: int, default 42.

    Returns:
        The dict returned by ``runner.run_simulation``. The downstream
        ``_callback`` task reads it back with ``xcom_pull(task_ids="run_sim")``.
    """
    conf: dict = context["dag_run"].conf or {}

    def conf_value(key: str, default: object) -> object:
        """Return conf[key], treating a missing key and an explicit None alike."""
        value = conf.get(key)
        return default if value is None else value

    n_users = int(conf_value("n_users", 5))
    n_rounds = int(conf_value("n_rounds", 20))
    tasks_per_round = int(conf_value("tasks_per_round", 8))
    seed = int(conf_value("seed", 42))

    raw_policies = conf_value("policies", ["linucb-v1", "egreedy-v1"])
    if isinstance(raw_policies, str):
        # list("a,b") would explode the string into single characters.
        policies = [name.strip() for name in raw_policies.split(",") if name.strip()]
    else:
        policies = list(raw_policies)

    judge_mode = str(conf_value("judge_mode", "rule"))
    # Without the None check, str(None) would yield the truthy string "None"
    # and be used as a real URL.
    ml_url = str(conf_value("ml_url", "http://ml-serving:8000"))
    mlflow_url = str(conf_value("mlflow_url", os.environ.get("MLFLOW_TRACKING_URI", "")))
    mlflow_experiment = "bandit_simulation"

    # runner.py is imported from this directory rather than as a package,
    # so the directory is put on sys.path before the import.
    sys.path.insert(0, "/opt/airflow/ml/experiments/sim")
    from runner import run_simulation  # type: ignore[import]

    return run_simulation(
        n_users=n_users,
        n_rounds=n_rounds,
        tasks_per_round=tasks_per_round,
        ml_url=ml_url,
        policies=policies,
        use_llm=judge_mode == "llm",
        seed=seed,
        mlflow_url=mlflow_url or None,
        mlflow_experiment=mlflow_experiment,
    )
|
||||
|
||||
|
||||
def _callback(**context: object) -> None:
    """POST the ``run_sim`` result to the callback URL given in ``dag_run.conf``.

    Reads ``callback_url`` and ``internal_token`` from the conf. If either is
    missing, ``None`` or empty, or if ``run_sim`` left no result in XCom, a
    message is printed and nothing is sent.

    The request body carries the result's ``summary``, ``winner``,
    ``persona_breakdown``, ``events`` and ``mlflow_run_id``; the token is sent
    in the ``x-internal-token`` header.

    Raises:
        Exception: whatever ``httpx.post`` or ``raise_for_status`` raises is
            printed and re-raised, so the task fails.
    """
    conf: dict = context["dag_run"].conf or {}
    # `or ""` instead of a .get() default: an explicit None must not become
    # the truthy string "None" and slip past the guard below.
    callback_url: str = str(conf.get("callback_url") or "")
    internal_token: str = str(conf.get("internal_token") or "")

    if not callback_url or not internal_token:
        print("No callback_url or internal_token — skipping result push.", flush=True)
        return

    result: dict = context["ti"].xcom_pull(task_ids="run_sim")
    if not result:
        print("No result from run_sim task — callback skipped.", flush=True)
        return

    # Imported only once a request will actually be made.
    import httpx

    payload = {
        "summary": result.get("summary", {}),
        "winner": result.get("winner", ""),
        "persona_breakdown": result.get("persona_breakdown", {}),
        "events": result.get("events", []),
        "mlflow_run_id": result.get("mlflow_run_id"),
    }

    try:
        r = httpx.post(
            callback_url,
            json=payload,
            headers={"x-internal-token": internal_token},
            timeout=30.0,
        )
        r.raise_for_status()
        print(f"Callback OK: {r.status_code}", flush=True)
    except Exception as exc:
        print(f"Callback failed: {exc}", flush=True)
        raise
|
||||
|
||||
|
||||
# On-demand DAG (no schedule, no catchup): run the simulation, then push its
# result to the callback endpoint.
with DAG(
    dag_id="bandit_sim",
    description="On-demand bandit policy simulation with MLflow tracking",
    schedule_interval=None,
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["bandit", "simulation", "ml"],
    default_args={
        "retries": 1,
        "retry_delay": timedelta(minutes=2),
    },
) as dag:

    # Airflow 2 passes the task context to the callable as keyword arguments
    # on its own; the deprecated `provide_context=True` flag is not needed.
    run_sim = PythonOperator(
        task_id="run_sim",
        python_callable=_run_sim,
    )

    push_results = PythonOperator(
        task_id="push_results",
        python_callable=_callback,
    )

    # push_results reads run_sim's return value from XCom, so it runs second.
    run_sim >> push_results
|
||||
Reference in New Issue
Block a user