feat(ml): egreedy-v2 shadow policy — D=12 with profile features (#99)

Ship the scaffolding for #99 (phase B.3 of #81):

- ml/serving: add /score/egreedy/v2, /reward/egreedy/v2, /stats/egreedy/v2
  endpoints (D=12). New feature dims: completion/dismiss rates, mean dwell
  (clipped 10min), preferred-hour alignment (cosine, 1-dim), tip volume (log).
  Separate state file per user (_egreedy_v2.json). /reset clears v2 state too.
- ADR-0012: documents D=7→12 dimension change, normalization choices, shadow
  rollout protocol, and promotion gate (offline sim win per ADR-0002).
- recommender.ts: register egreedy-v2-shadow in shadow-policy map (disabled by
  default). When enabled, calls /score/egreedy/v2 fire-and-forget and publishes
  shadow:egreedy-v2-shadow serve signal. No reward to shadow — sim is the gate.
- sim runner/personas: personas carry synthetic profile_features per persona;
  _call_score/_call_reward thread profile_features through (None-safe for v1/linucb).
- 18 new Python tests; all 56 Python + 170 TS tests pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-25 10:00:38 +00:00
parent b8113d4bda
commit 2d7cf217a9
6 changed files with 629 additions and 20 deletions

View File

@@ -2,12 +2,17 @@
oO ML Serving — Phase 1: LinUCB contextual bandit.
Contract:
POST /score { user_id, candidates, context } → { tip_id, score, policy }
POST /reward { user_id, tip_id, reward, features } → { ok }
POST /reset/{user_id}{ ok }
GET /stats/{user_id}{ pulls, cumulative_reward, estimated_mean, last_updated }
GET /features/{user_id} { history: [{ ts, features, score }] }
GET /health → { ok }
POST /score LinUCB d=5 (baseline, kept as shadow-eligible)
POST /score/egreedy ε-greedy v1, d=7 (active — ADR-0007)
POST /score/egreedy/v2 ε-greedy v2, d=12, profile features (shadow — ADR-0012)
POST /reward, /reward/egreedy, /reward/egreedy/v2
GET /stats/{user_id} LinUCB stats
GET /stats/egreedy/{user_id} ε-greedy v1 stats
GET /stats/egreedy/v2/{user_id} ε-greedy v2 stats
POST /reset/{user_id} clear all per-user bandit state
GET /features/{user_id} last 100 scored feature vectors
POST /generate LLM tip candidates via LiteLLM
GET /health { ok }
Features (d=5):
hour_sin, hour_cos — cyclical time-of-day encoding
@@ -43,7 +48,8 @@ STATE_DIR.mkdir(parents=True, exist_ok=True)
ALPHA = 1.0 # LinUCB exploration coefficient
D = 5 # LinUCB feature dimension
D7 = 7 # ε-greedy feature dimension (adds day-of-week cyclical encoding)
D7 = 7 # ε-greedy v1 feature dimension (adds day-of-week cyclical encoding)
D12 = 12 # ε-greedy v2 feature dimension (adds 5 profile features — ADR-0012)
EPSILON = 0.1 # ε-greedy exploration rate
FEATURE_HISTORY_SIZE = 100 # per-user ring buffer
@@ -126,6 +132,98 @@ def save_state7(user_id: str, A: np.ndarray, b: np.ndarray, meta: dict) -> None:
p.write_text(json.dumps({"A": A.tolist(), "b": b.tolist(), "meta": meta}))
# ── ε-greedy v2 state helpers (d=12, profile features — ADR-0012) ─────────
#
# Normalization choices (see ADR-0012):
# completion_rate_30d — already 01, passthrough; null → 0
# dismiss_rate_30d — already 01, passthrough; null → 0
# mean_dwell_ms_30d — clipped to [0, 600_000 ms] (10 min), then /600_000
# preferred_hour — circular alignment with context hour:
# (cos(2π·(now pref)/24) + 1) / 2 → 01
# captures "is the user's habitual peak near now?"
# null → 0.5 (neutral)
# tip_volume_30d — log1p(n) / log1p(100), clipped to [0, 1]
_DWELL_CLIP_MS = 600_000.0 # 10 minutes
_VOLUME_LOG_MAX = math.log1p(100.0)
def _profile_value(profile: Optional[dict], key: str) -> Optional[float]:
if not profile:
return None
v = profile.get(key)
if v is None:
return None
try:
return float(v)
except (TypeError, ValueError):
return None
def _norm_rate(v: Optional[float]) -> float:
return 0.0 if v is None else max(0.0, min(1.0, v))
def _norm_dwell(v: Optional[float]) -> float:
if v is None:
return 0.0
return max(0.0, min(1.0, v / _DWELL_CLIP_MS))
def _norm_volume(v: Optional[float]) -> float:
if v is None or v <= 0:
return 0.0
return min(1.0, math.log1p(float(v)) / _VOLUME_LOG_MAX)
def _norm_preferred_hour(pref: Optional[float], now_hour: int) -> float:
if pref is None:
return 0.5 # neutral when the user has no established peak yet
delta = (float(pref) - float(now_hour)) * (2.0 * math.pi / 24.0)
return (math.cos(delta) + 1.0) / 2.0
def build_feature_vector_12(
features: dict,
day_of_week: int = 0,
profile: Optional[dict] = None,
) -> np.ndarray:
"""d=12: egreedy-v1's 7 dims + 5 normalized profile features (ADR-0012)."""
base7 = build_feature_vector_7(features, day_of_week)
now_hour = int(features.get("hour_of_day", 12))
profile_dims = np.array(
[
_norm_rate(_profile_value(profile, "completion_rate_30d")),
_norm_rate(_profile_value(profile, "dismiss_rate_30d")),
_norm_dwell(_profile_value(profile, "mean_dwell_ms_30d")),
_norm_preferred_hour(_profile_value(profile, "preferred_hour"), now_hour),
_norm_volume(_profile_value(profile, "tip_volume_30d")),
],
dtype=np.float64,
)
return np.concatenate([base7, profile_dims])
def state12_path(user_id: str) -> Path:
safe = "".join(c if c.isalnum() else "_" for c in user_id)
return STATE_DIR / f"{safe}_egreedy_v2.json"
def load_state12(user_id: str) -> tuple[np.ndarray, np.ndarray, dict]:
p = state12_path(user_id)
if p.exists():
raw = json.loads(p.read_text())
A = np.array(raw["A"], dtype=np.float64)
b = np.array(raw["b"], dtype=np.float64)
return A, b, raw.get("meta", {})
return np.identity(D12, dtype=np.float64), np.zeros(D12, dtype=np.float64), {}
def save_state12(user_id: str, A: np.ndarray, b: np.ndarray, meta: dict) -> None:
p = state12_path(user_id)
p.write_text(json.dumps({"A": A.tolist(), "b": b.tolist(), "meta": meta}))
# ── API models ─────────────────────────────────────────────────────────────
class CandidateFeatures(BaseModel):
@@ -171,6 +269,10 @@ class RewardRequest(BaseModel):
reward: float # +1 done, +0.5 helpful, 0 snooze, -0.5 not_helpful, -1 dismiss
features: CandidateFeatures
day_of_week: int = 0 # included so egreedy can train dow features correctly
# Profile features at the time the tip was served. Ignored by /reward and
# /reward/egreedy; consumed by /reward/egreedy/v2 so the ridge update uses
# the same feature vector as the matching /score/egreedy/v2 call.
profile_features: Optional[dict] = None
class RewardResponse(BaseModel):
@@ -472,6 +574,128 @@ def reward_egreedy(req: RewardRequest) -> RewardResponse:
return RewardResponse(ok=True)
@app.post("/score/egreedy/v2", response_model=ScoreResponse)
def score_egreedy_v2(req: ScoreRequest) -> ScoreResponse:
"""ε-greedy v2 — d=12, adds 5 normalized profile features (ADR-0012).
Shadow-only until offline sim + rollout per ADR-0002 completes.
Accepts the same ScoreRequest shape as v1; `profile_features` drives the
extra 5 dims (defaults: zeros for rates/volume/dwell, 0.5 neutral for
preferred_hour alignment).
"""
if not req.candidates:
raise HTTPException(status_code=422, detail="No candidates")
A, b, meta = load_state12(req.user_id)
try:
A_inv = np.linalg.inv(A)
except np.linalg.LinAlgError:
A_inv = np.identity(D12, dtype=np.float64)
theta = A_inv @ b
dow = req.context.day_of_week
exploring = np.random.random() < EPSILON
if exploring:
chosen = req.candidates[np.random.randint(len(req.candidates))]
feat_dict = {
"hour_of_day": req.context.hour_of_day,
"is_overdue": chosen.features.is_overdue,
"task_age_days": chosen.features.task_age_days,
"priority": chosen.features.priority,
}
x = build_feature_vector_12(feat_dict, dow, req.profile_features)
best_score = float(theta @ x)
best_id = chosen.id
else:
best_id = None
best_score = -float("inf")
feat_dict = {}
for candidate in req.candidates:
fd = {
"hour_of_day": req.context.hour_of_day,
"is_overdue": candidate.features.is_overdue,
"task_age_days": candidate.features.task_age_days,
"priority": candidate.features.priority,
}
x = build_feature_vector_12(fd, dow, req.profile_features)
s = float(theta @ x)
if s > best_score:
best_score = s
best_id = candidate.id
feat_dict = fd
history = get_feature_history(req.user_id)
history.append({
"ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"features": {**feat_dict, "day_of_week": dow, "exploring": exploring},
"score": best_score,
"tip_id": best_id,
"policy": "egreedy-v2",
})
meta["pulls"] = meta.get("pulls", 0) + 1
meta["explore_count"] = meta.get("explore_count", 0) + int(exploring)
meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
save_state12(req.user_id, A, b, meta)
return ScoreResponse(tip_id=best_id, score=best_score, policy="egreedy-v2")
@app.post("/reward/egreedy/v2", response_model=RewardResponse)
def reward_egreedy_v2(req: RewardRequest) -> RewardResponse:
"""Update ε-greedy v2 ridge estimator using the d=12 feature vector."""
A, b, meta = load_state12(req.user_id)
feat_dict = {
"hour_of_day": req.features.hour_of_day,
"is_overdue": req.features.is_overdue,
"task_age_days": req.features.task_age_days,
"priority": req.features.priority,
}
x = build_feature_vector_12(feat_dict, req.day_of_week, req.profile_features)
A += np.outer(x, x)
b += req.reward * x
meta["cumulative_reward"] = meta.get("cumulative_reward", 0.0) + req.reward
meta["reward_count"] = meta.get("reward_count", 0) + 1
meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
save_state12(req.user_id, A, b, meta)
return RewardResponse(ok=True)
@app.get("/stats/egreedy/v2/{user_id}")
def stats_egreedy_v2(user_id: str):
"""ε-greedy v2 policy stats — pulls, cumulative reward, θ vector."""
A, b, meta = load_state12(user_id)
try:
theta = (np.linalg.inv(A) @ b).tolist()
except np.linalg.LinAlgError:
theta = [0.0] * D12
pulls = meta.get("pulls", 0)
cumulative_reward = meta.get("cumulative_reward", 0.0)
reward_count = meta.get("reward_count", 0)
explore_count = meta.get("explore_count", 0)
return {
"user_id": user_id,
"policy": "egreedy-v2",
"pulls": pulls,
"reward_count": reward_count,
"cumulative_reward": cumulative_reward,
"estimated_mean_reward": cumulative_reward / reward_count if reward_count > 0 else 0.0,
"exploration_rate": explore_count / pulls if pulls > 0 else 0.0,
"theta": theta,
"feature_labels": [
"hour_sin", "hour_cos", "is_overdue", "task_age", "priority",
"dow_sin", "dow_cos",
"completion_rate_30d", "dismiss_rate_30d", "mean_dwell_norm",
"preferred_hour_alignment", "tip_volume_norm",
],
"last_updated": meta.get("last_updated"),
}
@app.get("/stats/egreedy/{user_id}")
def stats_egreedy(user_id: str):
"""ε-greedy policy stats — pulls, cumulative reward, θ vector."""
@@ -509,6 +733,9 @@ def reset(user_id: str) -> RewardResponse:
p7 = state7_path(user_id)
if p7.exists():
p7.unlink()
p12 = state12_path(user_id)
if p12.exists():
p12.unlink()
if user_id in _feature_history:
_feature_history[user_id].clear()
return RewardResponse(ok=True)

View File

@@ -6,7 +6,15 @@ import math
import pytest
from httpx import AsyncClient, ASGITransport
from main import app, build_feature_vector
from main import (
app,
build_feature_vector,
build_feature_vector_12,
_norm_dwell,
_norm_preferred_hour,
_norm_rate,
_norm_volume,
)
class TestFeatureVector:
@@ -243,6 +251,176 @@ async def test_stats_for_fresh_user():
assert body["estimated_mean_reward"] == 0.0
class TestV2Normalization:
def test_rate_passthrough(self):
assert _norm_rate(0.0) == 0.0
assert _norm_rate(0.42) == 0.42
assert _norm_rate(1.0) == 1.0
def test_rate_none_zero(self):
assert _norm_rate(None) == 0.0
def test_rate_clipped(self):
assert _norm_rate(1.5) == 1.0
assert _norm_rate(-0.1) == 0.0
def test_dwell_none_zero(self):
assert _norm_dwell(None) == 0.0
def test_dwell_scales_to_0_1(self):
assert _norm_dwell(0) == 0.0
# 600_000 ms (10 min) is the clip ceiling
assert _norm_dwell(600_000) == 1.0
assert _norm_dwell(1_200_000) == 1.0
assert _norm_dwell(60_000) == pytest.approx(0.1)
def test_volume_monotonic_and_clipped(self):
assert _norm_volume(None) == 0.0
assert _norm_volume(0) == 0.0
assert _norm_volume(10) < _norm_volume(100)
# 100 tips ≈ full saturation
assert _norm_volume(100) == pytest.approx(1.0)
assert _norm_volume(10_000) == 1.0
def test_preferred_hour_alignment(self):
# Exact match → 1.0
assert _norm_preferred_hour(9, 9) == pytest.approx(1.0)
# 12h opposite → 0.0
assert _norm_preferred_hour(21, 9) == pytest.approx(0.0, abs=1e-10)
# 6h off → 0.5 (cos(π/2) = 0, scaled to 0.5)
assert _norm_preferred_hour(15, 9) == pytest.approx(0.5, abs=1e-10)
def test_preferred_hour_null_neutral(self):
# Null preference → neutral 0.5 rather than misleading "alignment at 0"
assert _norm_preferred_hour(None, 9) == 0.5
class TestFeatureVector12:
def test_shape(self):
v = build_feature_vector_12(
{"hour_of_day": 9, "is_overdue": True, "task_age_days": 2, "priority": 3},
day_of_week=2,
profile={
"completion_rate_30d": 0.5,
"dismiss_rate_30d": 0.1,
"mean_dwell_ms_30d": 60_000,
"preferred_hour": 9,
"tip_volume_30d": 20,
},
)
assert v.shape == (12,)
def test_first_seven_match_v1(self):
"""v2 must reduce to v1-style features on the first 7 dims so rollout
behaviour is predictable when profile is absent."""
from main import build_feature_vector_7
feat = {"hour_of_day": 14, "is_overdue": True, "task_age_days": 5, "priority": 2}
v1 = build_feature_vector_7(feat, day_of_week=3)
v2 = build_feature_vector_12(feat, day_of_week=3, profile=None)
assert (v1 == v2[:7]).all()
def test_missing_profile_defaults(self):
v = build_feature_vector_12({"hour_of_day": 9}, day_of_week=0, profile=None)
# completion, dismiss, dwell, volume → 0; preferred_hour → 0.5 neutral
assert v[7] == 0.0
assert v[8] == 0.0
assert v[9] == 0.0
assert v[10] == pytest.approx(0.5)
assert v[11] == 0.0
@pytest.mark.asyncio
async def test_score_egreedy_v2_returns_candidate():
payload = {
"user_id": "v2-user",
"candidates": [
{"id": "t:a", "content": "A", "source": "todoist",
"features": {"is_overdue": True, "task_age_days": 2, "priority": 3}},
{"id": "t:b", "content": "B", "source": "todoist",
"features": {"is_overdue": False, "task_age_days": 0, "priority": 1}},
],
"context": {"hour_of_day": 9, "day_of_week": 1},
"profile_features": {
"completion_rate_30d": 0.4,
"dismiss_rate_30d": 0.1,
"mean_dwell_ms_30d": 45_000,
"preferred_hour": 9,
"tip_volume_30d": 8,
},
}
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r = await client.post("/score/egreedy/v2", json=payload)
assert r.status_code == 200
body = r.json()
assert body["tip_id"] in {"t:a", "t:b"}
assert body["policy"] == "egreedy-v2"
@pytest.mark.asyncio
async def test_score_egreedy_v2_accepts_missing_profile():
payload = {
"user_id": "v2-no-profile",
"candidates": [
{"id": "t:solo", "content": "Solo", "source": "todoist",
"features": {"is_overdue": False, "task_age_days": 0, "priority": 1}},
],
"context": {"hour_of_day": 10, "day_of_week": 0},
}
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r = await client.post("/score/egreedy/v2", json=payload)
assert r.status_code == 200
assert r.json()["tip_id"] == "t:solo"
@pytest.mark.asyncio
async def test_reward_egreedy_v2_updates_stats():
user_id = "v2-reward-stats"
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r0 = await client.get(f"/stats/egreedy/v2/{user_id}")
before = r0.json()["cumulative_reward"]
await client.post("/reward/egreedy/v2", json={
"user_id": user_id,
"tip_id": "t:r",
"reward": 1.0,
"features": {"hour_of_day": 9, "is_overdue": True, "task_age_days": 2, "priority": 3},
"day_of_week": 1,
"profile_features": {
"completion_rate_30d": 0.3,
"dismiss_rate_30d": 0.2,
"mean_dwell_ms_30d": 30_000,
"preferred_hour": 9,
"tip_volume_30d": 5,
},
})
r1 = await client.get(f"/stats/egreedy/v2/{user_id}")
body = r1.json()
assert body["cumulative_reward"] == pytest.approx(before + 1.0)
assert body["policy"] == "egreedy-v2"
assert len(body["theta"]) == 12
assert len(body["feature_labels"]) == 12
@pytest.mark.asyncio
async def test_reset_clears_v2_state():
user_id = "v2-reset"
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
await client.post("/score/egreedy/v2", json={
"user_id": user_id,
"candidates": [
{"id": "t:v2r", "content": "x", "source": "todoist",
"features": {"is_overdue": False, "task_age_days": 0, "priority": 1}},
],
"context": {"hour_of_day": 10, "day_of_week": 0},
})
r0 = await client.get(f"/stats/egreedy/v2/{user_id}")
assert r0.json()["pulls"] >= 1
await client.post(f"/reset/{user_id}")
r1 = await client.get(f"/stats/egreedy/v2/{user_id}")
assert r1.json()["pulls"] == 0
@pytest.mark.asyncio
async def test_reward_negative_value():
"""Dismissing a tip should decrease cumulative_reward."""