feat: ε-greedy v1 as active policy; dwell-time reward inference; offline sim framework

- Promote egreedy-v1 to active serving policy (ADR-0007): /score/egreedy + /reward/egreedy
  replaces linucb-v1 endpoints after offline sim shows +10.7% mean reward (−0.548 vs −0.606)
- Replace explicit helpful/not_helpful feedback with dwell-time inferred reward (inferReward):
  dismiss=−1.0, snooze=+0.1, done<15s=−0.3, done 15s–2min=+1.0, done 2–10min=+0.6, done>10min=+0.3
- Add ml/serving ε-greedy endpoints: /score/egreedy, /reward/egreedy, /stats/egreedy/{user_id}
  with d=7 feature vector (base 5 + sin/cos day-of-week encoding)
- Add offline simulation framework (ml/experiments/sim): rule/LLM/claude-code judges,
  two-phase score+reward, synthetic personas, task generator; results stored in sim_runs/sim_events
- Add /admin/simulations page: start runs, live-poll status, reward curve SVG, action/persona tables
- Fix egreedy day_of_week training skew: reward endpoint now uses actual dow instead of hardcoded 0
- Fix runner.py proxy bypass: httpx.Client(trust_env=False) for localhost ML calls
- Add dwellMs to TipFeedbackEvent contract and bus.test.ts fixture
- Schema: sim_runs, sim_events tables; tip_feedback gains dwell_ms, reward_milli columns
- ADR-0006: admin console framework; ADR-0007: egreedy-v1 policy selection rationale

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-16 07:44:37 +00:00
parent c5ea18ec6e
commit faf44c18fc
48 changed files with 6151 additions and 40 deletions

View File

@@ -0,0 +1,204 @@
"""
LLM-based user reaction judge.
Uses Claude Haiku when ANTHROPIC_API_KEY is set; falls back to a
deterministic persona-based rule when it is not.
"""
from __future__ import annotations
import os
import random
from personas import Persona
ACTIONS = ["done", "snooze", "dismiss"]
# Reward is NOT a fixed map anymore — it depends on action + simulated dwell time.
# Use infer_reward() to compute the final reward after simulating dwell.
_BASE_REWARDS: dict[str, float] = {
"done": 1.0, # placeholder; real reward computed from dwell
"snooze": 0.1,
"dismiss": -1.0,
}
def infer_reward(action: str, dwell_ms: int) -> float:
"""Mirror of production inferReward() in recommender.ts."""
if action == "dismiss":
return -1.0
if action == "snooze":
return 0.1
# done — dwell-based
if dwell_ms < 15_000:
return -0.3 # stale / reflex done
if dwell_ms < 120_000:
return 1.0 # magic zone
if dwell_ms < 600_000:
return 0.6 # good
return 0.3 # eventually done
_HOUR_PERIODS = {
(5, 10): "morning",
(10, 14): "midday",
(14, 18): "afternoon",
(18, 22): "evening",
}
def _period(hour: int) -> str:
for (lo, hi), name in _HOUR_PERIODS.items():
if lo <= hour < hi:
return name
return "night"
# ── Deterministic judge ────────────────────────────────────────────────────
def _engagement_score(persona: Persona, tip: dict, hour: int) -> float:
"""01 score of how well this tip fits this persona right now."""
features = tip.get("features", {})
priority = features.get("priority", 1)
is_overdue = features.get("is_overdue", False)
p = 0.35
priority_norm = (priority - 1) / 3.0
p += (priority_norm - 0.5) * persona.prefers_high_priority * 0.4
if is_overdue:
p += (persona.prefers_overdue - 0.5) * 0.3
is_morning = 5 <= hour < 10
is_evening = 18 <= hour < 22
if persona.morning_active and is_morning:
p += 0.15
elif persona.evening_active and is_evening:
p += 0.15
elif persona.morning_active and not is_morning and not is_evening:
p -= 0.10
elif persona.evening_active and not is_evening and not is_morning:
p -= 0.10
return max(0.05, min(0.90, p))
def _simulate_dwell_ms(engagement: float, rng: random.Random) -> int:
"""
Simulate how many milliseconds the user takes to act on a tip.
High engagement → quick action (magic zone, 15s2min).
Medium engagement → slower (210min).
Low engagement → very slow (>10min) — tip helped eventually but not 'magic'.
For snooze/dismiss the dwell doesn't affect reward; return a short value.
"""
if engagement >= 0.70:
# Strong match — magic zone: 15s90s
return rng.randint(15_000, 90_000)
elif engagement >= 0.50:
# Moderate match — good zone: 28min
return rng.randint(120_000, 480_000)
else:
# Weak match but still done — eventually: 1030min
return rng.randint(600_000, 1_800_000)
def _rule_judge(persona: Persona, tip: dict, hour: int, rng: random.Random) -> tuple[str, int]:
"""Return (action, dwell_ms) based on persona preferences and task features."""
engagement = _engagement_score(persona, tip, hour)
r = rng.random()
if r < engagement * 0.55:
# done — dwell depends on engagement
dwell = _simulate_dwell_ms(engagement, rng)
return "done", dwell
elif r < engagement:
return "snooze", rng.randint(3_000, 20_000)
else:
return "dismiss", rng.randint(1_000, 5_000)
# ── LLM judge ─────────────────────────────────────────────────────────────
_anthropic_client = None
def _get_client():
global _anthropic_client
if _anthropic_client is None:
try:
import anthropic # type: ignore
key = os.environ.get("ANTHROPIC_API_KEY", "")
if key:
_anthropic_client = anthropic.Anthropic(api_key=key)
except ImportError:
pass
return _anthropic_client
def _llm_judge(
persona: Persona, tip: dict, hour: int, day_of_week: int, rng: random.Random,
) -> tuple[str, int]:
client = _get_client()
if client is None:
return _rule_judge(persona, tip, hour, rng)
features = tip.get("features", {})
priority = features.get("priority", 1)
is_overdue = features.get("is_overdue", False)
age_days = features.get("task_age_days", 0)
priority_label = {1: "low", 2: "normal", 3: "high", 4: "urgent"}.get(priority, "normal")
overdue_str = f", overdue by {age_days:.0f} day(s)" if is_overdue else ""
days = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
day_str = days[day_of_week % 7]
prompt = (
f"You are simulating how a specific user reacts to a task recommendation app.\n\n"
f"User persona: {persona.name}\n"
f"Persona: {persona.description}\n\n"
f'Recommended task: "{tip.get("content", "Unknown task")}"\n'
f"Task: priority={priority_label}{overdue_str}\n"
f"Current time: {_period(hour)} ({hour}:00, {day_str})\n\n"
f"How does this user react? Reply with exactly one word: done | snooze | dismiss\n\n"
f"- done: acts on this tip (marks task complete)\n"
f"- snooze: acknowledges but not now\n"
f"- dismiss: ignores or rejects it"
)
try:
message = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=10,
messages=[{"role": "user", "content": prompt}],
)
raw = message.content[0].text.strip().lower().split()[0]
action = raw if raw in ACTIONS else _rule_judge(persona, tip, hour, rng)[0]
except Exception:
action, _ = _rule_judge(persona, tip, hour, rng)
# Simulate dwell based on engagement level
engagement = _engagement_score(persona, tip, hour)
dwell = _simulate_dwell_ms(engagement, rng) if action == "done" else rng.randint(2_000, 15_000)
return action, dwell
# ── Public API ─────────────────────────────────────────────────────────────
def judge(
persona: Persona,
tip: dict,
hour: int,
day_of_week: int,
rng: random.Random,
use_llm: bool = True,
) -> tuple[str, int, float]:
"""Return (action, dwell_ms, reward).
action — 'done' | 'snooze' | 'dismiss'
dwell_ms — simulated milliseconds between tip appearance and user action
reward — inferred from action + dwell_ms via infer_reward()
"""
if use_llm and os.environ.get("ANTHROPIC_API_KEY"):
action, dwell_ms = _llm_judge(persona, tip, hour, day_of_week, rng)
else:
action, dwell_ms = _rule_judge(persona, tip, hour, rng)
return action, dwell_ms, infer_reward(action, dwell_ms)

View File

@@ -0,0 +1,79 @@
"""Synthetic user personas for simulation."""
from dataclasses import dataclass
@dataclass
class Persona:
name: str
description: str
# Feature preference weights — used by deterministic judge
prefers_high_priority: float # 01: scales response to priority
prefers_overdue: float # 01: scales response to overdue tasks
morning_active: bool # higher engagement hours 610
evening_active: bool # higher engagement hours 1822
recency_bias: float # 01: prefers recently-due tasks
PERSONAS: list[Persona] = [
Persona(
name="deadline-driven",
description=(
"Responds urgently to overdue and high-priority tasks. "
"Most active in the morning. Dismisses low-priority tips."
),
prefers_high_priority=0.9,
prefers_overdue=0.85,
morning_active=True,
evening_active=False,
recency_bias=0.3,
),
Persona(
name="evening-relaxed",
description=(
"Reviews tasks in the evenings. Neutral on priority. "
"Snoozes morning recommendations."
),
prefers_high_priority=0.5,
prefers_overdue=0.4,
morning_active=False,
evening_active=True,
recency_bias=0.5,
),
Persona(
name="low-priority-first",
description=(
"Clears small tasks first. Snoozes urgent items until deadline. "
"Morning person."
),
prefers_high_priority=0.2,
prefers_overdue=0.6,
morning_active=True,
evening_active=False,
recency_bias=0.7,
),
Persona(
name="consistent-responder",
description=(
"Engages consistently across hours and days. "
"Acts on helpful tips regardless of priority."
),
prefers_high_priority=0.6,
prefers_overdue=0.6,
morning_active=True,
evening_active=True,
recency_bias=0.5,
),
Persona(
name="overdue-ignorer",
description=(
"Avoids overdue tasks (stress avoidance). "
"Focuses on future-due, high-priority items. Evening person."
),
prefers_high_priority=0.8,
prefers_overdue=0.1,
morning_active=False,
evening_active=True,
recency_bias=0.2,
),
]

View File

@@ -0,0 +1,527 @@
"""
oO simulation runner — compares two recommendation policies.
Judge modes:
rule Deterministic persona-based rules (default, no external deps)
llm Claude Haiku via Anthropic API (requires ANTHROPIC_API_KEY)
claude-code Two-phase: Claude Code acts as the judge (you are the judge)
Usage — rule/llm (single pass):
python runner.py --n-users 5 --n-rounds 10 --no-llm
python runner.py --n-users 5 --n-rounds 10
Usage — claude-code judge (two phases):
# Phase 1: score candidates, write judgment requests
python runner.py --judge claude-code --phase score \\
--n-users 5 --n-rounds 10 --out /tmp/oo-cc-sim.json
# (Claude Code reads /tmp/oo-cc-sim-requests.json and writes /tmp/oo-cc-sim-responses.json)
# Phase 2: apply responses, run rewards, produce results
python runner.py --judge claude-code --phase reward --plan /tmp/oo-cc-sim-plan.json \\
--out /tmp/oo-cc-sim.json
"""
from __future__ import annotations
import argparse
import json
import random
import sys
import time
import uuid
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
import httpx
from llm_judge import ACTIONS, infer_reward, judge
from personas import PERSONAS, Persona
from task_generator import generate_task_pool
POLICY_SCORE_ENDPOINTS: dict[str, str] = {
"linucb-v1": "/score",
"egreedy-v1": "/score/egreedy",
}
POLICY_REWARD_ENDPOINTS: dict[str, str] = {
"linucb-v1": "/reward",
"egreedy-v1": "/reward/egreedy",
}
def _call_score(
client: httpx.Client, ml_url: str, policy: str,
user_id: str, tasks: list[dict], hour: int, dow: int,
) -> dict | None:
endpoint = POLICY_SCORE_ENDPOINTS.get(policy, "/score")
body = {
"user_id": user_id,
"candidates": [
{
"id": t["id"], "content": t["content"], "source": t["source"],
"source_id": None,
"features": {
"hour_of_day": hour,
"is_overdue": t["features"]["is_overdue"],
"task_age_days": t["features"]["task_age_days"],
"priority": t["features"]["priority"],
},
}
for t in tasks
],
"context": {"hour_of_day": hour, "day_of_week": dow},
}
try:
r = client.post(f"{ml_url}{endpoint}", json=body, timeout=5.0)
r.raise_for_status()
return r.json()
except Exception as e:
print(f" [warn] score {policy}: {e}", file=sys.stderr)
return None
def _call_reward(
client: httpx.Client, ml_url: str, policy: str,
user_id: str, tip_id: str, reward: float, features: dict,
day_of_week: int = 0,
) -> None:
endpoint = POLICY_REWARD_ENDPOINTS.get(policy, "/reward")
try:
client.post(
f"{ml_url}{endpoint}",
json={"user_id": user_id, "tip_id": tip_id, "reward": reward,
"features": features, "day_of_week": day_of_week},
timeout=5.0,
)
except Exception as e:
print(f" [warn] reward {policy}: {e}", file=sys.stderr)
# ── Standard single-pass runner (rule / llm modes) ─────────────────────────
def run_simulation(
n_users: int, n_rounds: int, tasks_per_round: int,
ml_url: str, policies: list[str], use_llm: bool, seed: int,
) -> dict:
rng = random.Random(seed)
run_id = str(uuid.uuid4())[:8]
started_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
user_personas = [
(f"sim-{run_id}-u{i}", PERSONAS[i % len(PERSONAS)])
for i in range(n_users)
]
acc: dict[str, dict] = {
p: {
"total_reward": 0.0, "n_pulls": 0,
"cumulative_rewards": [],
"action_counts": {a: 0 for a in ACTIONS},
}
for p in policies
}
events: list[dict] = []
with httpx.Client(trust_env=False) as client:
for rnd in range(n_rounds):
hour = rng.randint(6, 22)
dow = rng.randint(0, 6)
round_rewards = {p: 0.0 for p in policies}
for user_id, persona in user_personas:
seed_tasks = rnd * 997 + abs(hash(user_id)) % 997
tasks = generate_task_pool(n=tasks_per_round, seed=seed_tasks)
for policy in policies:
p_user = f"{user_id}-{policy}"
scored = _call_score(client, ml_url, policy, p_user, tasks, hour, dow)
if not scored:
continue
tip_id = scored.get("tip_id")
tip = next((t for t in tasks if t["id"] == tip_id), None)
if not tip:
continue
action, dwell_ms, reward = judge(persona, tip, hour, dow, rng, use_llm=use_llm)
_call_reward(client, ml_url, policy, p_user, tip_id, reward, {
"hour_of_day": hour,
"is_overdue": tip["features"]["is_overdue"],
"task_age_days": tip["features"]["task_age_days"],
"priority": tip["features"]["priority"],
}, day_of_week=dow)
acc[policy]["total_reward"] += reward
acc[policy]["n_pulls"] += 1
acc[policy]["action_counts"][action] += 1
round_rewards[policy] += reward
events.append({
"round": rnd, "user_id": user_id, "persona": persona.name,
"policy": policy, "tip_content": tip["content"],
"priority": tip["features"]["priority"],
"is_overdue": tip["features"]["is_overdue"],
"action": action, "dwell_ms": dwell_ms, "reward": reward,
"hour": hour, "day_of_week": dow,
})
for p in policies:
prev = acc[p]["cumulative_rewards"][-1] if acc[p]["cumulative_rewards"] else 0.0
acc[p]["cumulative_rewards"].append(prev + round_rewards[p])
mode = "llm" if use_llm else "rule"
print(f" Round {rnd+1:>3}/{n_rounds} [{mode}] " + " ".join(
f"{p}={acc[p]['cumulative_rewards'][-1]:+.2f}" for p in policies
))
return _build_result(run_id, started_at, policies, acc, events,
n_users, n_rounds, tasks_per_round, use_llm, seed)
# ── Claude Code judge — phase 1: score ─────────────────────────────────────
def run_score_phase(
n_users: int, n_rounds: int, tasks_per_round: int,
ml_url: str, policies: list[str], seed: int, out_path: str,
) -> None:
"""Score all candidates and write judgment requests for Claude Code."""
rng = random.Random(seed)
run_id = str(uuid.uuid4())[:8]
started_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
user_personas = [
(f"sim-{run_id}-u{i}", PERSONAS[i % len(PERSONAS)])
for i in range(n_users)
]
plan_rounds: list[dict] = []
judgment_requests: list[dict] = []
print(f"[Phase 1] Scoring {n_rounds} rounds × {n_users} users × {len(policies)} policies…")
with httpx.Client(trust_env=False) as client:
for rnd in range(n_rounds):
hour = rng.randint(6, 22)
dow = rng.randint(0, 6)
round_sessions: list[dict] = []
for user_id, persona in user_personas:
seed_tasks = rnd * 997 + abs(hash(user_id)) % 997
tasks = generate_task_pool(n=tasks_per_round, seed=seed_tasks)
for policy in policies:
p_user = f"{user_id}-{policy}"
scored = _call_score(client, ml_url, policy, p_user, tasks, hour, dow)
if not scored:
continue
tip_id = scored.get("tip_id")
tip = next((t for t in tasks if t["id"] == tip_id), None)
if not tip:
continue
req_id = f"r{rnd}_{user_id.split('-')[-1]}_{policy}"
round_sessions.append({
"req_id": req_id,
"p_user": p_user,
"policy": policy,
"user_id": user_id,
"persona_name": persona.name,
"tip_id": tip_id,
"tip_features": tip["features"],
"tip_content": tip["content"],
"ml_score": scored.get("score"),
})
judgment_requests.append({
"id": req_id,
"round": rnd,
"hour": hour,
"day_of_week": dow,
"policy": policy,
"persona_name": persona.name,
"persona_description": persona.description,
"tip_content": tip["content"],
"priority": tip["features"]["priority"],
"is_overdue": tip["features"]["is_overdue"],
"age_days": tip["features"]["task_age_days"],
"ml_score": scored.get("score"),
})
plan_rounds.append({
"round": rnd, "hour": hour, "dow": dow,
"sessions": round_sessions,
})
print(f" Round {rnd+1:>3}/{n_rounds}: {len(round_sessions)} sessions scored")
plan = {
"run_id": run_id,
"started_at": started_at,
"config": {
"n_users": n_users, "n_rounds": n_rounds,
"tasks_per_round": tasks_per_round, "policies": policies,
"use_llm": False, "seed": seed,
},
"user_personas": [
{"user_id": uid, "persona_name": p.name, "persona_description": p.description}
for uid, p in user_personas
],
"rounds": plan_rounds,
}
base = out_path.replace(".json", "")
plan_path = f"{base}-plan.json"
requests_path = f"{base}-requests.json"
responses_path = f"{base}-responses.json"
Path(plan_path).write_text(json.dumps(plan, indent=2))
Path(requests_path).write_text(json.dumps(judgment_requests, indent=2))
print()
print("=" * 60)
print(f"Phase 1 complete — {len(judgment_requests)} judgment requests.")
print()
print(f" Requests : {requests_path}")
print(f" Plan : {plan_path}")
print()
print('Claude Code: read the requests file, judge each tip for the persona,')
print(f'then write your responses to: {responses_path}')
print()
print('Response format: { "<id>": "<action>" | { "action": "<action>", "dwell_ms": <int> } }')
print('Valid actions: done | snooze | dismiss')
print()
print('For "done", optionally specify dwell_ms (ms between tip appearing and user acting):')
print(' { "r0_u0_linucb-v1": { "action": "done", "dwell_ms": 45000 } } # magic zone')
print(' { "r0_u0_linucb-v1": "snooze" } # plain string also ok (uses default 60s dwell for done)')
print()
print('Reward is inferred from action + dwell_ms:')
print(' dismiss → -1.0')
print(' snooze → 0.1')
print(' done < 15s → -0.3 (stale task)')
print(' done 15s2min → 1.0 (magic!)')
print(' done 210min → 0.6 (good)')
print(' done > 10min → 0.3 (eventually)')
print()
print('Then run Phase 2:')
print(f' python runner.py --judge claude-code --phase reward \\')
print(f' --plan {plan_path} --out {out_path}')
# ── Claude Code judge — phase 2: reward ────────────────────────────────────
def run_reward_phase(plan_path: str, out_path: str, ml_url: str) -> dict:
"""Apply Claude Code judgments, send reward signals, compute metrics."""
plan = json.loads(Path(plan_path).read_text())
base = plan_path.replace("-plan.json", "")
responses_path = f"{base}-responses.json"
if not Path(responses_path).exists():
print(f"ERROR: responses file not found: {responses_path}", file=sys.stderr)
sys.exit(1)
raw_responses = json.loads(Path(responses_path).read_text())
# Responses can be either { id: "action" } or { id: { action, dwell_ms } }
def _parse_response(v) -> tuple[str, int]:
if isinstance(v, dict):
return v["action"], int(v.get("dwell_ms", 60_000))
return str(v), 60_000 # plain string → assume 60s dwell for "done"
responses: dict[str, tuple[str, int]] = {k: _parse_response(v) for k, v in raw_responses.items()}
invalid = {k: v[0] for k, v in responses.items() if v[0] not in ACTIONS}
if invalid:
print(f"ERROR: invalid actions in responses: {invalid}", file=sys.stderr)
sys.exit(1)
policies: list[str] = plan["config"]["policies"]
acc: dict[str, dict] = {
p: {
"total_reward": 0.0, "n_pulls": 0,
"cumulative_rewards": [],
"action_counts": {a: 0 for a in ACTIONS},
}
for p in policies
}
events: list[dict] = []
persona_map = {u["user_id"]: u["persona_name"] for u in plan["user_personas"]}
missing_responses = 0
print(f"[Phase 2] Applying {len(responses)} judgments → reward calls…")
with httpx.Client(trust_env=False) as client:
for rnd_data in plan["rounds"]:
rnd = rnd_data["round"]
round_rewards = {p: 0.0 for p in policies}
for session in rnd_data["sessions"]:
req_id = session["req_id"]
resp = responses.get(req_id)
if not resp:
print(f" [warn] no response for {req_id}, defaulting to snooze")
action, dwell_ms = "snooze", 10_000
missing_responses += 1
else:
action, dwell_ms = resp
reward = infer_reward(action, dwell_ms)
_call_reward(
client, ml_url, session["policy"], session["p_user"],
session["tip_id"], reward,
{"hour_of_day": rnd_data["hour"], **session["tip_features"]},
day_of_week=rnd_data["dow"],
)
p = session["policy"]
acc[p]["total_reward"] += reward
acc[p]["n_pulls"] += 1
acc[p]["action_counts"][action] += 1
round_rewards[p] += reward
events.append({
"round": rnd,
"user_id": session["user_id"],
"persona": persona_map.get(session["user_id"], "?"),
"policy": p,
"tip_content": session["tip_content"],
"priority": session["tip_features"]["priority"],
"is_overdue": session["tip_features"]["is_overdue"],
"action": action,
"dwell_ms": dwell_ms,
"reward": reward,
"hour": rnd_data["hour"],
"day_of_week": rnd_data["dow"],
})
for p in policies:
prev = acc[p]["cumulative_rewards"][-1] if acc[p]["cumulative_rewards"] else 0.0
acc[p]["cumulative_rewards"].append(prev + round_rewards[p])
print(f" Round {rnd+1:>3}/{plan['config']['n_rounds']} [cc] " + " ".join(
f"{p}={acc[p]['cumulative_rewards'][-1]:+.2f}" for p in policies
))
if missing_responses:
print(f" [warn] {missing_responses} requests had no response (defaulted to snooze)")
cfg = plan["config"]
result = _build_result(
plan["run_id"], plan["started_at"], policies, acc, events,
cfg["n_users"], cfg["n_rounds"], cfg["tasks_per_round"],
use_llm=False, seed=cfg["seed"],
)
result["judge_mode"] = "claude-code"
Path(out_path).write_text(json.dumps(result, indent=2))
return result
# ── Shared result builder ───────────────────────────────────────────────────
def _build_result(
run_id: str, started_at: str, policies: list[str],
acc: dict, events: list[dict],
n_users: int, n_rounds: int, tasks_per_round: int,
use_llm: bool, seed: int,
) -> dict:
summary = {
p: {
"total_reward": acc[p]["total_reward"],
"mean_reward": (
acc[p]["total_reward"] / acc[p]["n_pulls"]
if acc[p]["n_pulls"] > 0 else 0.0
),
"n_pulls": acc[p]["n_pulls"],
"cumulative_rewards": acc[p]["cumulative_rewards"],
"action_counts": acc[p]["action_counts"],
}
for p in policies
}
winner = max(policies, key=lambda p: summary[p]["total_reward"])
persona_breakdown: dict[str, dict] = {}
for ev in events:
pname = ev["persona"]
pol = ev["policy"]
persona_breakdown.setdefault(pname, {}).setdefault(pol, {"reward": 0.0, "n": 0})
persona_breakdown[pname][pol]["reward"] += ev["reward"]
persona_breakdown[pname][pol]["n"] += 1
return {
"run_id": run_id,
"started_at": started_at,
"finished_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"config": {
"n_users": n_users, "n_rounds": n_rounds,
"tasks_per_round": tasks_per_round, "policies": policies,
"use_llm": use_llm, "seed": seed,
},
"summary": summary,
"winner": winner,
"persona_breakdown": persona_breakdown,
"events": events,
}
# ── CLI ─────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="oO simulation runner")
parser.add_argument("--judge", choices=["rule", "llm", "claude-code"], default="rule")
parser.add_argument("--phase", choices=["score", "reward"], default=None,
help="For --judge claude-code only")
parser.add_argument("--plan", default=None,
help="Plan file path (for --judge claude-code --phase reward)")
parser.add_argument("--n-users", type=int, default=5)
parser.add_argument("--n-rounds", type=int, default=20)
parser.add_argument("--tasks-per-round", type=int, default=8)
parser.add_argument("--ml-url", default="http://localhost:5001")
parser.add_argument("--policies", nargs="+", default=["linucb-v1", "egreedy-v1"])
parser.add_argument("--no-llm", action="store_true",
help="Alias for --judge rule (backwards compat)")
parser.add_argument("--seed", type=int, default=42)
parser.add_argument("--out", default=None)
args = parser.parse_args()
if args.no_llm:
args.judge = "rule"
out_path = args.out or f"/tmp/oo-sim-{int(time.time())}.json"
if args.judge == "claude-code":
if args.phase == "score":
run_score_phase(
n_users=args.n_users, n_rounds=args.n_rounds,
tasks_per_round=args.tasks_per_round, ml_url=args.ml_url,
policies=args.policies, seed=args.seed, out_path=out_path,
)
elif args.phase == "reward":
if not args.plan:
print("ERROR: --plan is required for --phase reward", file=sys.stderr)
sys.exit(1)
result = run_reward_phase(args.plan, out_path, args.ml_url)
print()
print(f"Winner : {result['winner']}")
for p, s in result["summary"].items():
print(f" {p:20s} total={s['total_reward']:+.2f} mean={s['mean_reward']:+.4f} pulls={s['n_pulls']}")
print(f"Results: {out_path}")
else:
print("ERROR: --judge claude-code requires --phase score or --phase reward",
file=sys.stderr)
sys.exit(1)
else:
use_llm = (args.judge == "llm")
print(f"oO simulation: {args.n_users} users × {args.n_rounds} rounds")
print(f"Policies : {args.policies}")
print(f"ML URL : {args.ml_url}")
print(f"Judge : {args.judge}")
print()
result = run_simulation(
n_users=args.n_users, n_rounds=args.n_rounds,
tasks_per_round=args.tasks_per_round, ml_url=args.ml_url,
policies=args.policies, use_llm=use_llm, seed=args.seed,
)
Path(out_path).write_text(json.dumps(result, indent=2))
print()
print(f"Winner : {result['winner']}")
for p, s in result["summary"].items():
print(f" {p:20s} total={s['total_reward']:+.2f} mean={s['mean_reward']:+.4f} pulls={s['n_pulls']}")
print(f"Results: {out_path}")

View File

@@ -0,0 +1,62 @@
"""Generate synthetic task pools for simulation."""
from __future__ import annotations
import random
_TEMPLATES = [
"Send weekly report to team",
"Review pull request #{n}",
"Schedule meeting with {name}",
"Update project documentation",
"Fix bug in authentication module",
"Prepare presentation for stakeholders",
"Call back {name}",
"Submit expense report",
"Review quarterly goals",
"Clean up inbox",
"Follow up on proposal to {name}",
"Complete onboarding checklist",
"Write tests for feature #{n}",
"Deploy hotfix to production",
"Respond to support ticket #{n}",
"Draft release notes",
"Update dependencies",
"Review design mockups",
"Archive old tickets",
"Check in with {name}",
]
_NAMES = ["Alice", "Bob", "Carol", "David", "Eve", "Frank", "Grace"]
def generate_task_pool(n: int = 10, seed: int | None = None) -> list[dict]:
"""Return n synthetic tasks with randomly sampled features."""
rng = random.Random(seed)
tasks = []
for i in range(n):
priority = rng.choices([1, 2, 3, 4], weights=[0.3, 0.3, 0.25, 0.15])[0]
# age_days: most tasks fresh, a few stale
age_days = rng.choices(
[0.0, 0.5, 1.0, 3.0, 7.0, 14.0],
weights=[0.35, 0.20, 0.20, 0.12, 0.08, 0.05],
)[0] + rng.random() * 0.5
# is_overdue only meaningful when age > 0
is_overdue = age_days > 0.5 and rng.random() < 0.65
template = rng.choice(_TEMPLATES)
content = template.format(n=rng.randint(100, 999), name=rng.choice(_NAMES))
tasks.append({
"id": f"sim:{i}",
"content": content,
"source": "sim",
"features": {
"is_overdue": is_overdue,
"task_age_days": age_days if is_overdue else 0.0,
"priority": priority,
},
})
return tasks

View File

@@ -35,8 +35,10 @@ app = FastAPI(title="oO ML Serving", version="1.0.0")
STATE_DIR = Path(os.getenv("STATE_DIR", "/tmp/oo-bandit-state"))
STATE_DIR.mkdir(parents=True, exist_ok=True)
ALPHA = 1.0 # exploration coefficient
D = 5 # feature dimension
ALPHA = 1.0 # LinUCB exploration coefficient
D = 5 # LinUCB feature dimension
D7 = 7 # ε-greedy feature dimension (adds day-of-week cyclical encoding)
EPSILON = 0.1 # ε-greedy exploration rate
FEATURE_HISTORY_SIZE = 100 # per-user ring buffer
@@ -63,6 +65,8 @@ def build_feature_vector(features: dict) -> np.ndarray:
# ── Per-user bandit state (disjoint LinUCB, global arm) ───────────────────
# ── LinUCB state helpers ───────────────────────────────────────────────────
def state_path(user_id: str) -> Path:
safe = "".join(c if c.isalnum() else "_" for c in user_id)
return STATE_DIR / f"{safe}.json"
@@ -85,6 +89,37 @@ def save_state(user_id: str, A: np.ndarray, b: np.ndarray, meta: dict) -> None:
p.write_text(json.dumps({"A": A.tolist(), "b": b.tolist(), "meta": meta}))
# ── ε-greedy state helpers (d=7, extended features) ───────────────────────
def build_feature_vector_7(features: dict, day_of_week: int = 0) -> np.ndarray:
"""d=7: base 5 features + day-of-week cyclical encoding."""
base = build_feature_vector(features)
dow_sin = math.sin(2 * math.pi * day_of_week / 7)
dow_cos = math.cos(2 * math.pi * day_of_week / 7)
return np.append(base, [dow_sin, dow_cos])
def state7_path(user_id: str) -> Path:
safe = "".join(c if c.isalnum() else "_" for c in user_id)
return STATE_DIR / f"{safe}_egreedy.json"
def load_state7(user_id: str) -> tuple[np.ndarray, np.ndarray, dict]:
"""Returns (A, b, meta) for ε-greedy d=7 policy."""
p = state7_path(user_id)
if p.exists():
raw = json.loads(p.read_text())
A = np.array(raw["A"], dtype=np.float64)
b = np.array(raw["b"], dtype=np.float64)
return A, b, raw.get("meta", {})
return np.identity(D7, dtype=np.float64), np.zeros(D7, dtype=np.float64), {}
def save_state7(user_id: str, A: np.ndarray, b: np.ndarray, meta: dict) -> None:
p = state7_path(user_id)
p.write_text(json.dumps({"A": A.tolist(), "b": b.tolist(), "meta": meta}))
# ── API models ─────────────────────────────────────────────────────────────
class CandidateFeatures(BaseModel):
@@ -124,6 +159,7 @@ class RewardRequest(BaseModel):
tip_id: str
reward: float # +1 done, +0.5 helpful, 0 snooze, -0.5 not_helpful, -1 dismiss
features: CandidateFeatures
day_of_week: int = 0 # included so egreedy can train dow features correctly
class RewardResponse(BaseModel):
@@ -209,12 +245,131 @@ def reward(req: RewardRequest) -> RewardResponse:
return RewardResponse(ok=True)
@app.post("/score/egreedy", response_model=ScoreResponse)
def score_egreedy(req: ScoreRequest) -> ScoreResponse:
"""ε-greedy policy with d=7 features (adds day-of-week encoding).
Exploration: pick uniformly at random with probability ε.
Exploitation: pick argmax of linear payoff estimate θ·x.
Differs from LinUCB in: no UCB bonus, richer feature space.
"""
if not req.candidates:
raise HTTPException(status_code=422, detail="No candidates")
A, b, meta = load_state7(req.user_id)
try:
A_inv = np.linalg.inv(A)
except np.linalg.LinAlgError:
A_inv = np.identity(D7, dtype=np.float64)
theta = A_inv @ b
dow = req.context.day_of_week
exploring = np.random.random() < EPSILON
if exploring:
chosen = req.candidates[np.random.randint(len(req.candidates))]
feat_dict = {
"hour_of_day": req.context.hour_of_day,
"is_overdue": chosen.features.is_overdue,
"task_age_days": chosen.features.task_age_days,
"priority": chosen.features.priority,
}
x = build_feature_vector_7(feat_dict, dow)
best_score = float(theta @ x)
best_id = chosen.id
else:
best_id = None
best_score = -float("inf")
feat_dict = {}
for candidate in req.candidates:
fd = {
"hour_of_day": req.context.hour_of_day,
"is_overdue": candidate.features.is_overdue,
"task_age_days": candidate.features.task_age_days,
"priority": candidate.features.priority,
}
x = build_feature_vector_7(fd, dow)
s = float(theta @ x)
if s > best_score:
best_score = s
best_id = candidate.id
feat_dict = fd
history = get_feature_history(req.user_id)
history.append({
"ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"features": {**feat_dict, "day_of_week": dow, "exploring": exploring},
"score": best_score,
"tip_id": best_id,
"policy": "egreedy-v1",
})
meta["pulls"] = meta.get("pulls", 0) + 1
meta["explore_count"] = meta.get("explore_count", 0) + int(exploring)
meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
save_state7(req.user_id, A, b, meta)
return ScoreResponse(tip_id=best_id, score=best_score, policy="egreedy-v1")
@app.post("/reward/egreedy", response_model=RewardResponse)
def reward_egreedy(req: RewardRequest) -> RewardResponse:
"""Update ε-greedy ridge estimator with observed reward."""
A, b, meta = load_state7(req.user_id)
feat_dict = {
"hour_of_day": req.features.hour_of_day,
"is_overdue": req.features.is_overdue,
"task_age_days": req.features.task_age_days,
"priority": req.features.priority,
}
x = build_feature_vector_7(feat_dict, day_of_week=req.day_of_week)
A += np.outer(x, x)
b += req.reward * x
meta["cumulative_reward"] = meta.get("cumulative_reward", 0.0) + req.reward
meta["reward_count"] = meta.get("reward_count", 0) + 1
meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
save_state7(req.user_id, A, b, meta)
return RewardResponse(ok=True)
@app.get("/stats/egreedy/{user_id}")
def stats_egreedy(user_id: str):
"""ε-greedy policy stats — pulls, cumulative reward, θ vector."""
A, b, meta = load_state7(user_id)
try:
theta = (np.linalg.inv(A) @ b).tolist()
except np.linalg.LinAlgError:
theta = [0.0] * D7
pulls = meta.get("pulls", 0)
cumulative_reward = meta.get("cumulative_reward", 0.0)
reward_count = meta.get("reward_count", 0)
explore_count = meta.get("explore_count", 0)
return {
"user_id": user_id,
"policy": "egreedy-v1",
"pulls": pulls,
"reward_count": reward_count,
"cumulative_reward": cumulative_reward,
"estimated_mean_reward": cumulative_reward / reward_count if reward_count > 0 else 0.0,
"exploration_rate": explore_count / pulls if pulls > 0 else 0.0,
"theta": theta,
"feature_labels": ["hour_sin", "hour_cos", "is_overdue", "task_age", "priority", "dow_sin", "dow_cos"],
"last_updated": meta.get("last_updated"),
}
@app.post("/reset/{user_id}", response_model=RewardResponse)
def reset(user_id: str) -> RewardResponse:
"""Reset per-user bandit state (admin action)."""
p = state_path(user_id)
if p.exists():
p.unlink()
p7 = state7_path(user_id)
if p7.exists():
p7.unlink()
if user_id in _feature_history:
_feature_history[user_id].clear()
return RewardResponse(ok=True)

View File

@@ -4,6 +4,7 @@
"private": true,
"scripts": {
"dev": ".venv/bin/uvicorn main:app --reload --port 8000",
"start": ".venv/bin/uvicorn main:app --port 8000"
"start": ".venv/bin/uvicorn main:app --port 8000",
"test": ".venv/bin/python -m pytest tests/ -v"
}
}

View File

@@ -0,0 +1,4 @@
-r requirements.txt
pytest==8.3.5
pytest-asyncio==0.24.0
httpx==0.28.1

View File

@@ -2,3 +2,5 @@ fastapi==0.115.6
uvicorn[standard]==0.32.1
pydantic==2.10.4
numpy>=1.26.0
httpx>=0.27.0
anthropic>=0.40.0

View File

View File

@@ -0,0 +1,261 @@
"""
Unit tests for ml/serving — feature building and scoring contract.
Run with: pytest ml/serving/tests/
"""
import math
import pytest
from httpx import AsyncClient, ASGITransport
from main import app, build_feature_vector
class TestFeatureVector:
def test_shape(self):
v = build_feature_vector({"hour_of_day": 8, "is_overdue": True, "task_age_days": 3, "priority": 3})
assert v.shape == (5,)
def test_hour_encoding_noon(self):
v = build_feature_vector({"hour_of_day": 12})
# sin(2π * 12/24) = sin(π) ≈ 0
assert abs(v[0]) < 1e-10
# cos(2π * 12/24) = cos(π) = -1
assert abs(v[1] - (-1.0)) < 1e-10
def test_hour_encoding_midnight(self):
v = build_feature_vector({"hour_of_day": 0})
# sin(0) = 0
assert abs(v[0]) < 1e-10
# cos(0) = 1
assert abs(v[1] - 1.0) < 1e-10
def test_hour_encoding_6am(self):
v = build_feature_vector({"hour_of_day": 6})
# sin(2π * 6/24) = sin(π/2) = 1
assert abs(v[0] - 1.0) < 1e-10
# cos(π/2) = 0
assert abs(v[1]) < 1e-10
def test_age_clipped_at_30(self):
v_long = build_feature_vector({"task_age_days": 100})
v_cap = build_feature_vector({"task_age_days": 30})
assert v_long[3] == v_cap[3] == 1.0
def test_age_zero(self):
v = build_feature_vector({"task_age_days": 0})
assert v[3] == pytest.approx(0.0)
def test_age_15_days_normalised(self):
v = build_feature_vector({"task_age_days": 15})
assert v[3] == pytest.approx(0.5)
def test_priority_normalised(self):
v1 = build_feature_vector({"priority": 1})
v4 = build_feature_vector({"priority": 4})
assert v1[4] == pytest.approx(0.0)
assert v4[4] == pytest.approx(1.0)
def test_priority_2_and_3(self):
v2 = build_feature_vector({"priority": 2})
v3 = build_feature_vector({"priority": 3})
assert v2[4] == pytest.approx(1 / 3)
assert v3[4] == pytest.approx(2 / 3)
def test_is_overdue_true(self):
v = build_feature_vector({"is_overdue": True})
assert v[2] == 1.0
def test_is_overdue_false(self):
v = build_feature_vector({"is_overdue": False})
assert v[2] == 0.0
def test_defaults_when_no_keys(self):
v = build_feature_vector({})
# hour=12 → sin(π)≈0, cos(π)=-1
assert abs(v[0]) < 1e-10
assert abs(v[1] - (-1.0)) < 1e-10
assert v[2] == 0.0 # is_overdue=False
assert v[3] == 0.0 # task_age_days=0
assert v[4] == 0.0 # priority=1 → (1-1)/3=0
@pytest.mark.asyncio
async def test_health():
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r = await client.get("/health")
assert r.status_code == 200
assert r.json()["ok"] is True
@pytest.mark.asyncio
async def test_score_returns_a_candidate():
payload = {
"user_id": "test-user",
"candidates": [
{"id": "t:1", "content": "Task A", "source": "todoist", "source_id": "1",
"features": {"is_overdue": True, "task_age_days": 2, "priority": 3}},
{"id": "t:2", "content": "Task B", "source": "todoist", "source_id": "2",
"features": {"is_overdue": False, "task_age_days": 0, "priority": 1}},
],
"context": {"hour_of_day": 9, "day_of_week": 1},
}
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r = await client.post("/score", json=payload)
assert r.status_code == 200
body = r.json()
assert body["tip_id"] in {"t:1", "t:2"}
assert "policy" in body
assert body["policy"] == "linucb-v1"
assert isinstance(body["score"], float)
@pytest.mark.asyncio
async def test_score_single_candidate_always_selected():
"""With a single candidate there is no choice — it must be returned."""
payload = {
"user_id": "solo-user",
"candidates": [
{"id": "only:1", "content": "Only task", "source": "todoist",
"features": {"is_overdue": False, "task_age_days": 0, "priority": 1}},
],
"context": {"hour_of_day": 10, "day_of_week": 0},
}
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r = await client.post("/score", json=payload)
assert r.status_code == 200
assert r.json()["tip_id"] == "only:1"
@pytest.mark.asyncio
async def test_score_empty_candidates_returns_422():
payload = {"user_id": "u", "candidates": [], "context": {"hour_of_day": 9, "day_of_week": 1}}
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r = await client.post("/score", json=payload)
assert r.status_code == 422
@pytest.mark.asyncio
async def test_reward_accepted():
payload = {
"user_id": "reward-user",
"tip_id": "t:1",
"reward": 1.0,
"features": {"hour_of_day": 9, "is_overdue": True, "task_age_days": 2, "priority": 3},
}
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r = await client.post("/reward", json=payload)
assert r.status_code == 200
assert r.json()["ok"] is True
@pytest.mark.asyncio
async def test_reward_updates_stats():
"""Posting a reward should increase cumulative_reward in /stats."""
user_id = "reward-stats-user"
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r0 = await client.get(f"/stats/{user_id}")
before = r0.json()["cumulative_reward"]
await client.post("/reward", json={
"user_id": user_id,
"tip_id": "tip:x",
"reward": 1.0,
"features": {"hour_of_day": 8, "is_overdue": False, "task_age_days": 0, "priority": 2},
})
r1 = await client.get(f"/stats/{user_id}")
assert r1.json()["cumulative_reward"] == pytest.approx(before + 1.0)
@pytest.mark.asyncio
async def test_score_increments_pulls():
user_id = "pull-counter-user"
payload = {
"user_id": user_id,
"candidates": [
{"id": "t:p1", "content": "Pull task", "source": "todoist",
"features": {"is_overdue": False, "task_age_days": 1, "priority": 2}},
],
"context": {"hour_of_day": 10, "day_of_week": 2},
}
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r0 = await client.get(f"/stats/{user_id}")
pulls_before = r0.json()["pulls"]
await client.post("/score", json=payload)
await client.post("/score", json=payload)
r1 = await client.get(f"/stats/{user_id}")
assert r1.json()["pulls"] == pulls_before + 2
@pytest.mark.asyncio
async def test_reset_clears_state():
user_id = "reset-user"
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
# Score once to build state
await client.post("/score", json={
"user_id": user_id,
"candidates": [
{"id": "t:r", "content": "Reset task", "source": "todoist",
"features": {"is_overdue": True, "task_age_days": 5, "priority": 4}},
],
"context": {"hour_of_day": 14, "day_of_week": 3},
})
r_reset = await client.post(f"/reset/{user_id}")
assert r_reset.json()["ok"] is True
r_stats = await client.get(f"/stats/{user_id}")
assert r_stats.json()["pulls"] == 0
@pytest.mark.asyncio
async def test_features_endpoint_returns_history():
user_id = "features-user"
payload = {
"user_id": user_id,
"candidates": [
{"id": "t:f1", "content": "Feature task", "source": "todoist",
"features": {"is_overdue": False, "task_age_days": 0, "priority": 1}},
],
"context": {"hour_of_day": 7, "day_of_week": 0},
}
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
await client.post("/score", json=payload)
r = await client.get(f"/features/{user_id}")
body = r.json()
assert r.status_code == 200
assert "history" in body
assert len(body["history"]) >= 1
entry = body["history"][-1]
assert "ts" in entry
assert "score" in entry
assert "tip_id" in entry
@pytest.mark.asyncio
async def test_stats_for_fresh_user():
"""A user with no history should return zero/default stats without error."""
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r = await client.get("/stats/brand-new-user-xyz-abc")
body = r.json()
assert r.status_code == 200
assert body["pulls"] == 0
assert body["cumulative_reward"] == 0.0
assert body["estimated_mean_reward"] == 0.0
@pytest.mark.asyncio
async def test_reward_negative_value():
"""Dismissing a tip should decrease cumulative_reward."""
user_id = "dismiss-user-neg"
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r0 = await client.get(f"/stats/{user_id}")
before = r0.json()["cumulative_reward"]
await client.post("/reward", json={
"user_id": user_id,
"tip_id": "t:neg",
"reward": -1.0,
"features": {"hour_of_day": 20, "is_overdue": False, "task_age_days": 0, "priority": 1},
})
r1 = await client.get(f"/stats/{user_id}")
assert r1.json()["cumulative_reward"] == pytest.approx(before - 1.0)