chore(ml): remove bandit endpoints + helpers (ADR-0013 step 9)

Deletes all LinUCB and ε-greedy code from ml/serving: score, reward,
stats, reset, features endpoints; feature vector builders; per-user state
file helpers; related Pydantic models; numpy/math/time imports.

Removes test_score.py (pure bandit unit tests). 40 remaining tests pass.
STATE_DIR kept — nats_consumer still writes sync metadata there.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-04 10:41:58 +00:00
parent c65bedcf68
commit 8e9718e8ba
2 changed files with 7 additions and 1054 deletions

View File

@@ -1,41 +1,24 @@
"""
oO ML Serving — Phase 1: LinUCB contextual bandit.
oO ML Serving — multi-agent orchestrator (ADR-0013).
Contract:
POST /score LinUCB d=5 (baseline, kept as shadow-eligible)
POST /score/egreedy ε-greedy v1, d=7 (active — ADR-0007)
POST /score/egreedy/v2 ε-greedy v2, d=12, profile features (shadow — ADR-0012)
POST /reward, /reward/egreedy, /reward/egreedy/v2
GET /stats/{user_id} LinUCB stats
GET /stats/egreedy/{user_id} ε-greedy v1 stats
GET /stats/egreedy/v2/{user_id} ε-greedy v2 stats
POST /reset/{user_id} clear all per-user bandit state
GET /features/{user_id} last 100 scored feature vectors
POST /generate LLM tip candidates via LiteLLM
GET /health { ok }
Features (d=5):
hour_sin, hour_cos — cyclical time-of-day encoding
is_overdue — 0 or 1
task_age_days — days since due date (clipped 030, normalised 01)
priority_norm — Todoist priority 14, normalised to 01
POST /agents/{agent_id}/compute run a sub-agent, return prompt snippet
POST /recommend orchestrate agent snippets → one tip via LiteLLM
POST /generate LLM tip candidates (legacy; kept for bench/eval)
GET /health { ok, agents: [...] }
"""
from __future__ import annotations
import json
import math
import os
import sys
import time
from collections import deque
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Deque
from typing import Optional
import httpx
import numpy as np
import sentry_sdk
import structlog
import structlog.contextvars
@@ -93,242 +76,12 @@ app.add_middleware(_TracingMiddleware)
LITELLM_URL = os.getenv("LITELLM_URL", "http://localhost:4000")
LITELLM_MASTER_KEY = os.getenv("LITELLM_MASTER_KEY", "sk-oo-dev")
STATE_DIR = Path(os.getenv("STATE_DIR", "/tmp/oo-bandit-state"))
STATE_DIR = Path(os.getenv("STATE_DIR", "/tmp/oo-serving-state"))
STATE_DIR.mkdir(parents=True, exist_ok=True)
ALPHA = 1.0 # LinUCB exploration coefficient
D = 5 # LinUCB feature dimension
D7 = 7 # ε-greedy v1 feature dimension (adds day-of-week cyclical encoding)
D12 = 12 # ε-greedy v2 feature dimension (adds 5 profile features — ADR-0012)
EPSILON = 0.1 # ε-greedy exploration rate
FEATURE_HISTORY_SIZE = 100 # per-user ring buffer
# ── Per-user in-memory feature history ────────────────────────────────────
_feature_history: dict[str, deque] = {}
def get_feature_history(user_id: str) -> deque:
if user_id not in _feature_history:
_feature_history[user_id] = deque(maxlen=FEATURE_HISTORY_SIZE)
return _feature_history[user_id]
# ── Feature helpers ────────────────────────────────────────────────────────
def build_feature_vector(features: dict) -> np.ndarray:
hour = features.get("hour_of_day", 12)
hour_sin = math.sin(2 * math.pi * hour / 24)
hour_cos = math.cos(2 * math.pi * hour / 24)
is_overdue = float(bool(features.get("is_overdue", False)))
age = min(float(features.get("task_age_days", 0)), 30.0) / 30.0
priority = (float(features.get("priority", 1)) - 1.0) / 3.0
return np.array([hour_sin, hour_cos, is_overdue, age, priority], dtype=np.float64)
# ── Per-user bandit state (disjoint LinUCB, global arm) ───────────────────
# ── LinUCB state helpers ───────────────────────────────────────────────────
def state_path(user_id: str) -> Path:
safe = "".join(c if c.isalnum() else "_" for c in user_id)
return STATE_DIR / f"{safe}.json"
def load_state(user_id: str) -> tuple[np.ndarray, np.ndarray, dict]:
"""Returns (A, b, meta). A is DxD, b is D-vector."""
p = state_path(user_id)
if p.exists():
raw = json.loads(p.read_text())
A = np.array(raw["A"], dtype=np.float64)
b = np.array(raw["b"], dtype=np.float64)
meta = raw.get("meta", {})
return A, b, meta
return np.identity(D, dtype=np.float64), np.zeros(D, dtype=np.float64), {}
def save_state(user_id: str, A: np.ndarray, b: np.ndarray, meta: dict) -> None:
p = state_path(user_id)
p.write_text(json.dumps({"A": A.tolist(), "b": b.tolist(), "meta": meta}))
# ── ε-greedy state helpers (d=7, extended features) ───────────────────────
def build_feature_vector_7(features: dict, day_of_week: int = 0) -> np.ndarray:
"""d=7: base 5 features + day-of-week cyclical encoding."""
base = build_feature_vector(features)
dow_sin = math.sin(2 * math.pi * day_of_week / 7)
dow_cos = math.cos(2 * math.pi * day_of_week / 7)
return np.append(base, [dow_sin, dow_cos])
def state7_path(user_id: str) -> Path:
safe = "".join(c if c.isalnum() else "_" for c in user_id)
return STATE_DIR / f"{safe}_egreedy.json"
def load_state7(user_id: str) -> tuple[np.ndarray, np.ndarray, dict]:
"""Returns (A, b, meta) for ε-greedy d=7 policy."""
p = state7_path(user_id)
if p.exists():
raw = json.loads(p.read_text())
A = np.array(raw["A"], dtype=np.float64)
b = np.array(raw["b"], dtype=np.float64)
return A, b, raw.get("meta", {})
return np.identity(D7, dtype=np.float64), np.zeros(D7, dtype=np.float64), {}
def save_state7(user_id: str, A: np.ndarray, b: np.ndarray, meta: dict) -> None:
p = state7_path(user_id)
p.write_text(json.dumps({"A": A.tolist(), "b": b.tolist(), "meta": meta}))
# ── ε-greedy v2 state helpers (d=12, profile features — ADR-0012) ─────────
#
# Normalization choices (see ADR-0012):
# completion_rate_30d — already 01, passthrough; null → 0
# dismiss_rate_30d — already 01, passthrough; null → 0
# mean_dwell_ms_30d — clipped to [0, 600_000 ms] (10 min), then /600_000
# preferred_hour — circular alignment with context hour:
# (cos(2π·(now pref)/24) + 1) / 2 → 01
# captures "is the user's habitual peak near now?"
# null → 0.5 (neutral)
# tip_volume_30d — log1p(n) / log1p(100), clipped to [0, 1]
_DWELL_CLIP_MS = 600_000.0 # 10 minutes
_VOLUME_LOG_MAX = math.log1p(100.0)
def _profile_value(profile: Optional[dict], key: str) -> Optional[float]:
if not profile:
return None
v = profile.get(key)
if v is None:
return None
try:
return float(v)
except (TypeError, ValueError):
return None
def _norm_rate(v: Optional[float]) -> float:
return 0.0 if v is None else max(0.0, min(1.0, v))
def _norm_dwell(v: Optional[float]) -> float:
if v is None:
return 0.0
return max(0.0, min(1.0, v / _DWELL_CLIP_MS))
def _norm_volume(v: Optional[float]) -> float:
if v is None or v <= 0:
return 0.0
return min(1.0, math.log1p(float(v)) / _VOLUME_LOG_MAX)
def _norm_preferred_hour(pref: Optional[float], now_hour: int) -> float:
if pref is None:
return 0.5 # neutral when the user has no established peak yet
delta = (float(pref) - float(now_hour)) * (2.0 * math.pi / 24.0)
return (math.cos(delta) + 1.0) / 2.0
def build_feature_vector_12(
features: dict,
day_of_week: int = 0,
profile: Optional[dict] = None,
) -> np.ndarray:
"""d=12: egreedy-v1's 7 dims + 5 normalized profile features (ADR-0012)."""
base7 = build_feature_vector_7(features, day_of_week)
now_hour = int(features.get("hour_of_day", 12))
profile_dims = np.array(
[
_norm_rate(_profile_value(profile, "completion_rate_30d")),
_norm_rate(_profile_value(profile, "dismiss_rate_30d")),
_norm_dwell(_profile_value(profile, "mean_dwell_ms_30d")),
_norm_preferred_hour(_profile_value(profile, "preferred_hour"), now_hour),
_norm_volume(_profile_value(profile, "tip_volume_30d")),
],
dtype=np.float64,
)
return np.concatenate([base7, profile_dims])
def state12_path(user_id: str) -> Path:
safe = "".join(c if c.isalnum() else "_" for c in user_id)
return STATE_DIR / f"{safe}_egreedy_v2.json"
def load_state12(user_id: str) -> tuple[np.ndarray, np.ndarray, dict]:
p = state12_path(user_id)
if p.exists():
raw = json.loads(p.read_text())
A = np.array(raw["A"], dtype=np.float64)
b = np.array(raw["b"], dtype=np.float64)
return A, b, raw.get("meta", {})
return np.identity(D12, dtype=np.float64), np.zeros(D12, dtype=np.float64), {}
def save_state12(user_id: str, A: np.ndarray, b: np.ndarray, meta: dict) -> None:
p = state12_path(user_id)
p.write_text(json.dumps({"A": A.tolist(), "b": b.tolist(), "meta": meta}))
# ── API models ─────────────────────────────────────────────────────────────
class CandidateFeatures(BaseModel):
hour_of_day: int = 12
is_overdue: bool = False
task_age_days: float = 0.0
priority: int = 1
class Candidate(BaseModel):
id: str
content: str
source: str
source_id: Optional[str] = None
features: CandidateFeatures = CandidateFeatures()
class Context(BaseModel):
hour_of_day: int = 12
day_of_week: int = 0
class ScoreRequest(BaseModel):
user_id: str
candidates: list[Candidate]
context: Context = Context()
# User-level features computed by the API (#81 phase A). Accepted, logged,
# but not yet consumed by the bandit — extending the feature vector
# changes `D` and resets every user's learned state, which is a deliberate
# follow-up (phase B), not a side effect of this PR.
profile_features: Optional[dict] = None
class ScoreResponse(BaseModel):
tip_id: str
score: float
policy: str
class RewardRequest(BaseModel):
user_id: str
tip_id: str
reward: float # +1 done, +0.5 helpful, 0 snooze, -0.5 not_helpful, -1 dismiss
features: CandidateFeatures
day_of_week: int = 0 # included so egreedy can train dow features correctly
# Profile features at the time the tip was served. Ignored by /reward and
# /reward/egreedy; consumed by /reward/egreedy/v2 so the ridge update uses
# the same feature vector as the matching /score/egreedy/v2 call.
profile_features: Optional[dict] = None
class RewardResponse(BaseModel):
ok: bool
class PromptContext(BaseModel):
tasks: list[dict] = []
hour_of_day: int = 12
@@ -652,364 +405,3 @@ async def generate(req: GenerateRequest) -> GenerateResponse:
)
@app.post("/score", response_model=ScoreResponse)
def score(req: ScoreRequest) -> ScoreResponse:
if not req.candidates:
raise HTTPException(status_code=422, detail="No candidates")
A, b, meta = load_state(req.user_id)
try:
A_inv = np.linalg.inv(A)
except np.linalg.LinAlgError:
A_inv = np.identity(D, dtype=np.float64)
theta = A_inv @ b
best_id = None
best_score = -float("inf")
best_features: dict = {}
for candidate in req.candidates:
feat_dict = {
"hour_of_day": req.context.hour_of_day,
"is_overdue": candidate.features.is_overdue,
"task_age_days": candidate.features.task_age_days,
"priority": candidate.features.priority,
}
x = build_feature_vector(feat_dict)
exploit = float(theta @ x)
explore = ALPHA * math.sqrt(float(x @ A_inv @ x))
ucb = exploit + explore
if ucb > best_score:
best_score = ucb
best_id = candidate.id
best_features = feat_dict
# Log to feature history ring buffer
history = get_feature_history(req.user_id)
history.append({
"ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"features": best_features,
"score": best_score,
"tip_id": best_id,
})
# Update meta stats
meta["pulls"] = meta.get("pulls", 0) + 1
meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
save_state(req.user_id, A, b, meta)
return ScoreResponse(tip_id=best_id, score=best_score, policy="linucb-v1")
@app.post("/reward", response_model=RewardResponse)
def reward(req: RewardRequest) -> RewardResponse:
A, b, meta = load_state(req.user_id)
feat_dict = {
"hour_of_day": req.features.hour_of_day,
"is_overdue": req.features.is_overdue,
"task_age_days": req.features.task_age_days,
"priority": req.features.priority,
}
x = build_feature_vector(feat_dict)
A += np.outer(x, x)
b += req.reward * x
# Track cumulative reward in meta
meta["cumulative_reward"] = meta.get("cumulative_reward", 0.0) + req.reward
meta["reward_count"] = meta.get("reward_count", 0) + 1
meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
save_state(req.user_id, A, b, meta)
return RewardResponse(ok=True)
@app.post("/score/egreedy", response_model=ScoreResponse)
def score_egreedy(req: ScoreRequest) -> ScoreResponse:
"""ε-greedy policy with d=7 features (adds day-of-week encoding).
Exploration: pick uniformly at random with probability ε.
Exploitation: pick argmax of linear payoff estimate θ·x.
Differs from LinUCB in: no UCB bonus, richer feature space.
"""
if not req.candidates:
raise HTTPException(status_code=422, detail="No candidates")
A, b, meta = load_state7(req.user_id)
try:
A_inv = np.linalg.inv(A)
except np.linalg.LinAlgError:
A_inv = np.identity(D7, dtype=np.float64)
theta = A_inv @ b
dow = req.context.day_of_week
exploring = np.random.random() < EPSILON
if exploring:
chosen = req.candidates[np.random.randint(len(req.candidates))]
feat_dict = {
"hour_of_day": req.context.hour_of_day,
"is_overdue": chosen.features.is_overdue,
"task_age_days": chosen.features.task_age_days,
"priority": chosen.features.priority,
}
x = build_feature_vector_7(feat_dict, dow)
best_score = float(theta @ x)
best_id = chosen.id
else:
best_id = None
best_score = -float("inf")
feat_dict = {}
for candidate in req.candidates:
fd = {
"hour_of_day": req.context.hour_of_day,
"is_overdue": candidate.features.is_overdue,
"task_age_days": candidate.features.task_age_days,
"priority": candidate.features.priority,
}
x = build_feature_vector_7(fd, dow)
s = float(theta @ x)
if s > best_score:
best_score = s
best_id = candidate.id
feat_dict = fd
history = get_feature_history(req.user_id)
history.append({
"ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"features": {**feat_dict, "day_of_week": dow, "exploring": exploring},
"score": best_score,
"tip_id": best_id,
"policy": "egreedy-v1",
})
meta["pulls"] = meta.get("pulls", 0) + 1
meta["explore_count"] = meta.get("explore_count", 0) + int(exploring)
meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
save_state7(req.user_id, A, b, meta)
return ScoreResponse(tip_id=best_id, score=best_score, policy="egreedy-v1")
@app.post("/reward/egreedy", response_model=RewardResponse)
def reward_egreedy(req: RewardRequest) -> RewardResponse:
"""Update ε-greedy ridge estimator with observed reward."""
A, b, meta = load_state7(req.user_id)
feat_dict = {
"hour_of_day": req.features.hour_of_day,
"is_overdue": req.features.is_overdue,
"task_age_days": req.features.task_age_days,
"priority": req.features.priority,
}
x = build_feature_vector_7(feat_dict, day_of_week=req.day_of_week)
A += np.outer(x, x)
b += req.reward * x
meta["cumulative_reward"] = meta.get("cumulative_reward", 0.0) + req.reward
meta["reward_count"] = meta.get("reward_count", 0) + 1
meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
save_state7(req.user_id, A, b, meta)
return RewardResponse(ok=True)
@app.post("/score/egreedy/v2", response_model=ScoreResponse)
def score_egreedy_v2(req: ScoreRequest) -> ScoreResponse:
"""ε-greedy v2 — d=12, adds 5 normalized profile features (ADR-0012).
Shadow-only until offline sim + rollout per ADR-0002 completes.
Accepts the same ScoreRequest shape as v1; `profile_features` drives the
extra 5 dims (defaults: zeros for rates/volume/dwell, 0.5 neutral for
preferred_hour alignment).
"""
if not req.candidates:
raise HTTPException(status_code=422, detail="No candidates")
A, b, meta = load_state12(req.user_id)
try:
A_inv = np.linalg.inv(A)
except np.linalg.LinAlgError:
A_inv = np.identity(D12, dtype=np.float64)
theta = A_inv @ b
dow = req.context.day_of_week
exploring = np.random.random() < EPSILON
if exploring:
chosen = req.candidates[np.random.randint(len(req.candidates))]
feat_dict = {
"hour_of_day": req.context.hour_of_day,
"is_overdue": chosen.features.is_overdue,
"task_age_days": chosen.features.task_age_days,
"priority": chosen.features.priority,
}
x = build_feature_vector_12(feat_dict, dow, req.profile_features)
best_score = float(theta @ x)
best_id = chosen.id
else:
best_id = None
best_score = -float("inf")
feat_dict = {}
for candidate in req.candidates:
fd = {
"hour_of_day": req.context.hour_of_day,
"is_overdue": candidate.features.is_overdue,
"task_age_days": candidate.features.task_age_days,
"priority": candidate.features.priority,
}
x = build_feature_vector_12(fd, dow, req.profile_features)
s = float(theta @ x)
if s > best_score:
best_score = s
best_id = candidate.id
feat_dict = fd
history = get_feature_history(req.user_id)
history.append({
"ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"features": {**feat_dict, "day_of_week": dow, "exploring": exploring},
"score": best_score,
"tip_id": best_id,
"policy": "egreedy-v2",
})
meta["pulls"] = meta.get("pulls", 0) + 1
meta["explore_count"] = meta.get("explore_count", 0) + int(exploring)
meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
save_state12(req.user_id, A, b, meta)
return ScoreResponse(tip_id=best_id, score=best_score, policy="egreedy-v2")
@app.post("/reward/egreedy/v2", response_model=RewardResponse)
def reward_egreedy_v2(req: RewardRequest) -> RewardResponse:
"""Update ε-greedy v2 ridge estimator using the d=12 feature vector."""
A, b, meta = load_state12(req.user_id)
feat_dict = {
"hour_of_day": req.features.hour_of_day,
"is_overdue": req.features.is_overdue,
"task_age_days": req.features.task_age_days,
"priority": req.features.priority,
}
x = build_feature_vector_12(feat_dict, req.day_of_week, req.profile_features)
A += np.outer(x, x)
b += req.reward * x
meta["cumulative_reward"] = meta.get("cumulative_reward", 0.0) + req.reward
meta["reward_count"] = meta.get("reward_count", 0) + 1
meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
save_state12(req.user_id, A, b, meta)
return RewardResponse(ok=True)
@app.get("/stats/egreedy/v2/{user_id}")
def stats_egreedy_v2(user_id: str):
"""ε-greedy v2 policy stats — pulls, cumulative reward, θ vector."""
A, b, meta = load_state12(user_id)
try:
theta = (np.linalg.inv(A) @ b).tolist()
except np.linalg.LinAlgError:
theta = [0.0] * D12
pulls = meta.get("pulls", 0)
cumulative_reward = meta.get("cumulative_reward", 0.0)
reward_count = meta.get("reward_count", 0)
explore_count = meta.get("explore_count", 0)
return {
"user_id": user_id,
"policy": "egreedy-v2",
"pulls": pulls,
"reward_count": reward_count,
"cumulative_reward": cumulative_reward,
"estimated_mean_reward": cumulative_reward / reward_count if reward_count > 0 else 0.0,
"exploration_rate": explore_count / pulls if pulls > 0 else 0.0,
"theta": theta,
"feature_labels": [
"hour_sin", "hour_cos", "is_overdue", "task_age", "priority",
"dow_sin", "dow_cos",
"completion_rate_30d", "dismiss_rate_30d", "mean_dwell_norm",
"preferred_hour_alignment", "tip_volume_norm",
],
"last_updated": meta.get("last_updated"),
}
@app.get("/stats/egreedy/{user_id}")
def stats_egreedy(user_id: str):
"""ε-greedy policy stats — pulls, cumulative reward, θ vector."""
A, b, meta = load_state7(user_id)
try:
theta = (np.linalg.inv(A) @ b).tolist()
except np.linalg.LinAlgError:
theta = [0.0] * D7
pulls = meta.get("pulls", 0)
cumulative_reward = meta.get("cumulative_reward", 0.0)
reward_count = meta.get("reward_count", 0)
explore_count = meta.get("explore_count", 0)
return {
"user_id": user_id,
"policy": "egreedy-v1",
"pulls": pulls,
"reward_count": reward_count,
"cumulative_reward": cumulative_reward,
"estimated_mean_reward": cumulative_reward / reward_count if reward_count > 0 else 0.0,
"exploration_rate": explore_count / pulls if pulls > 0 else 0.0,
"theta": theta,
"feature_labels": ["hour_sin", "hour_cos", "is_overdue", "task_age", "priority", "dow_sin", "dow_cos"],
"last_updated": meta.get("last_updated"),
}
@app.post("/reset/{user_id}", response_model=RewardResponse)
def reset(user_id: str) -> RewardResponse:
"""Reset per-user bandit state (admin action)."""
p = state_path(user_id)
if p.exists():
p.unlink()
p7 = state7_path(user_id)
if p7.exists():
p7.unlink()
p12 = state12_path(user_id)
if p12.exists():
p12.unlink()
if user_id in _feature_history:
_feature_history[user_id].clear()
return RewardResponse(ok=True)
@app.get("/stats/{user_id}")
def stats(user_id: str):
"""Return current LinUCB state summary for a user."""
A, b, meta = load_state(user_id)
try:
A_inv = np.linalg.inv(A)
theta = (A_inv @ b).tolist()
except np.linalg.LinAlgError:
theta = [0.0] * D
pulls = meta.get("pulls", 0)
cumulative_reward = meta.get("cumulative_reward", 0.0)
reward_count = meta.get("reward_count", 0)
estimated_mean = cumulative_reward / reward_count if reward_count > 0 else 0.0
return {
"user_id": user_id,
"pulls": pulls,
"reward_count": reward_count,
"cumulative_reward": cumulative_reward,
"estimated_mean_reward": estimated_mean,
"theta": theta,
"last_updated": meta.get("last_updated"),
}
@app.get("/features/{user_id}")
def features(user_id: str):
"""Return recent feature vectors logged at scoring time."""
history = get_feature_history(user_id)
return {
"user_id": user_id,
"history": list(history),
}

View File

@@ -1,439 +0,0 @@
"""
Unit tests for ml/serving — feature building and scoring contract.
Run with: pytest ml/serving/tests/
"""
import math
import pytest
from httpx import AsyncClient, ASGITransport
from main import (
app,
build_feature_vector,
build_feature_vector_12,
_norm_dwell,
_norm_preferred_hour,
_norm_rate,
_norm_volume,
)
class TestFeatureVector:
def test_shape(self):
v = build_feature_vector({"hour_of_day": 8, "is_overdue": True, "task_age_days": 3, "priority": 3})
assert v.shape == (5,)
def test_hour_encoding_noon(self):
v = build_feature_vector({"hour_of_day": 12})
# sin(2π * 12/24) = sin(π) ≈ 0
assert abs(v[0]) < 1e-10
# cos(2π * 12/24) = cos(π) = -1
assert abs(v[1] - (-1.0)) < 1e-10
def test_hour_encoding_midnight(self):
v = build_feature_vector({"hour_of_day": 0})
# sin(0) = 0
assert abs(v[0]) < 1e-10
# cos(0) = 1
assert abs(v[1] - 1.0) < 1e-10
def test_hour_encoding_6am(self):
v = build_feature_vector({"hour_of_day": 6})
# sin(2π * 6/24) = sin(π/2) = 1
assert abs(v[0] - 1.0) < 1e-10
# cos(π/2) = 0
assert abs(v[1]) < 1e-10
def test_age_clipped_at_30(self):
v_long = build_feature_vector({"task_age_days": 100})
v_cap = build_feature_vector({"task_age_days": 30})
assert v_long[3] == v_cap[3] == 1.0
def test_age_zero(self):
v = build_feature_vector({"task_age_days": 0})
assert v[3] == pytest.approx(0.0)
def test_age_15_days_normalised(self):
v = build_feature_vector({"task_age_days": 15})
assert v[3] == pytest.approx(0.5)
def test_priority_normalised(self):
v1 = build_feature_vector({"priority": 1})
v4 = build_feature_vector({"priority": 4})
assert v1[4] == pytest.approx(0.0)
assert v4[4] == pytest.approx(1.0)
def test_priority_2_and_3(self):
v2 = build_feature_vector({"priority": 2})
v3 = build_feature_vector({"priority": 3})
assert v2[4] == pytest.approx(1 / 3)
assert v3[4] == pytest.approx(2 / 3)
def test_is_overdue_true(self):
v = build_feature_vector({"is_overdue": True})
assert v[2] == 1.0
def test_is_overdue_false(self):
v = build_feature_vector({"is_overdue": False})
assert v[2] == 0.0
def test_defaults_when_no_keys(self):
v = build_feature_vector({})
# hour=12 → sin(π)≈0, cos(π)=-1
assert abs(v[0]) < 1e-10
assert abs(v[1] - (-1.0)) < 1e-10
assert v[2] == 0.0 # is_overdue=False
assert v[3] == 0.0 # task_age_days=0
assert v[4] == 0.0 # priority=1 → (1-1)/3=0
@pytest.mark.asyncio
async def test_health():
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r = await client.get("/health")
assert r.status_code == 200
assert r.json()["ok"] is True
@pytest.mark.asyncio
async def test_score_returns_a_candidate():
payload = {
"user_id": "test-user",
"candidates": [
{"id": "t:1", "content": "Task A", "source": "todoist", "source_id": "1",
"features": {"is_overdue": True, "task_age_days": 2, "priority": 3}},
{"id": "t:2", "content": "Task B", "source": "todoist", "source_id": "2",
"features": {"is_overdue": False, "task_age_days": 0, "priority": 1}},
],
"context": {"hour_of_day": 9, "day_of_week": 1},
}
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r = await client.post("/score", json=payload)
assert r.status_code == 200
body = r.json()
assert body["tip_id"] in {"t:1", "t:2"}
assert "policy" in body
assert body["policy"] == "linucb-v1"
assert isinstance(body["score"], float)
@pytest.mark.asyncio
async def test_score_single_candidate_always_selected():
"""With a single candidate there is no choice — it must be returned."""
payload = {
"user_id": "solo-user",
"candidates": [
{"id": "only:1", "content": "Only task", "source": "todoist",
"features": {"is_overdue": False, "task_age_days": 0, "priority": 1}},
],
"context": {"hour_of_day": 10, "day_of_week": 0},
}
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r = await client.post("/score", json=payload)
assert r.status_code == 200
assert r.json()["tip_id"] == "only:1"
@pytest.mark.asyncio
async def test_score_empty_candidates_returns_422():
payload = {"user_id": "u", "candidates": [], "context": {"hour_of_day": 9, "day_of_week": 1}}
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r = await client.post("/score", json=payload)
assert r.status_code == 422
@pytest.mark.asyncio
async def test_reward_accepted():
payload = {
"user_id": "reward-user",
"tip_id": "t:1",
"reward": 1.0,
"features": {"hour_of_day": 9, "is_overdue": True, "task_age_days": 2, "priority": 3},
}
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r = await client.post("/reward", json=payload)
assert r.status_code == 200
assert r.json()["ok"] is True
@pytest.mark.asyncio
async def test_reward_updates_stats():
"""Posting a reward should increase cumulative_reward in /stats."""
user_id = "reward-stats-user"
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r0 = await client.get(f"/stats/{user_id}")
before = r0.json()["cumulative_reward"]
await client.post("/reward", json={
"user_id": user_id,
"tip_id": "tip:x",
"reward": 1.0,
"features": {"hour_of_day": 8, "is_overdue": False, "task_age_days": 0, "priority": 2},
})
r1 = await client.get(f"/stats/{user_id}")
assert r1.json()["cumulative_reward"] == pytest.approx(before + 1.0)
@pytest.mark.asyncio
async def test_score_increments_pulls():
user_id = "pull-counter-user"
payload = {
"user_id": user_id,
"candidates": [
{"id": "t:p1", "content": "Pull task", "source": "todoist",
"features": {"is_overdue": False, "task_age_days": 1, "priority": 2}},
],
"context": {"hour_of_day": 10, "day_of_week": 2},
}
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r0 = await client.get(f"/stats/{user_id}")
pulls_before = r0.json()["pulls"]
await client.post("/score", json=payload)
await client.post("/score", json=payload)
r1 = await client.get(f"/stats/{user_id}")
assert r1.json()["pulls"] == pulls_before + 2
@pytest.mark.asyncio
async def test_reset_clears_state():
user_id = "reset-user"
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
# Score once to build state
await client.post("/score", json={
"user_id": user_id,
"candidates": [
{"id": "t:r", "content": "Reset task", "source": "todoist",
"features": {"is_overdue": True, "task_age_days": 5, "priority": 4}},
],
"context": {"hour_of_day": 14, "day_of_week": 3},
})
r_reset = await client.post(f"/reset/{user_id}")
assert r_reset.json()["ok"] is True
r_stats = await client.get(f"/stats/{user_id}")
assert r_stats.json()["pulls"] == 0
@pytest.mark.asyncio
async def test_features_endpoint_returns_history():
user_id = "features-user"
payload = {
"user_id": user_id,
"candidates": [
{"id": "t:f1", "content": "Feature task", "source": "todoist",
"features": {"is_overdue": False, "task_age_days": 0, "priority": 1}},
],
"context": {"hour_of_day": 7, "day_of_week": 0},
}
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
await client.post("/score", json=payload)
r = await client.get(f"/features/{user_id}")
body = r.json()
assert r.status_code == 200
assert "history" in body
assert len(body["history"]) >= 1
entry = body["history"][-1]
assert "ts" in entry
assert "score" in entry
assert "tip_id" in entry
@pytest.mark.asyncio
async def test_stats_for_fresh_user():
"""A user with no history should return zero/default stats without error."""
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r = await client.get("/stats/brand-new-user-xyz-abc")
body = r.json()
assert r.status_code == 200
assert body["pulls"] == 0
assert body["cumulative_reward"] == 0.0
assert body["estimated_mean_reward"] == 0.0
class TestV2Normalization:
def test_rate_passthrough(self):
assert _norm_rate(0.0) == 0.0
assert _norm_rate(0.42) == 0.42
assert _norm_rate(1.0) == 1.0
def test_rate_none_zero(self):
assert _norm_rate(None) == 0.0
def test_rate_clipped(self):
assert _norm_rate(1.5) == 1.0
assert _norm_rate(-0.1) == 0.0
def test_dwell_none_zero(self):
assert _norm_dwell(None) == 0.0
def test_dwell_scales_to_0_1(self):
assert _norm_dwell(0) == 0.0
# 600_000 ms (10 min) is the clip ceiling
assert _norm_dwell(600_000) == 1.0
assert _norm_dwell(1_200_000) == 1.0
assert _norm_dwell(60_000) == pytest.approx(0.1)
def test_volume_monotonic_and_clipped(self):
assert _norm_volume(None) == 0.0
assert _norm_volume(0) == 0.0
assert _norm_volume(10) < _norm_volume(100)
# 100 tips ≈ full saturation
assert _norm_volume(100) == pytest.approx(1.0)
assert _norm_volume(10_000) == 1.0
def test_preferred_hour_alignment(self):
# Exact match → 1.0
assert _norm_preferred_hour(9, 9) == pytest.approx(1.0)
# 12h opposite → 0.0
assert _norm_preferred_hour(21, 9) == pytest.approx(0.0, abs=1e-10)
# 6h off → 0.5 (cos(π/2) = 0, scaled to 0.5)
assert _norm_preferred_hour(15, 9) == pytest.approx(0.5, abs=1e-10)
def test_preferred_hour_null_neutral(self):
# Null preference → neutral 0.5 rather than misleading "alignment at 0"
assert _norm_preferred_hour(None, 9) == 0.5
class TestFeatureVector12:
def test_shape(self):
v = build_feature_vector_12(
{"hour_of_day": 9, "is_overdue": True, "task_age_days": 2, "priority": 3},
day_of_week=2,
profile={
"completion_rate_30d": 0.5,
"dismiss_rate_30d": 0.1,
"mean_dwell_ms_30d": 60_000,
"preferred_hour": 9,
"tip_volume_30d": 20,
},
)
assert v.shape == (12,)
def test_first_seven_match_v1(self):
"""v2 must reduce to v1-style features on the first 7 dims so rollout
behaviour is predictable when profile is absent."""
from main import build_feature_vector_7
feat = {"hour_of_day": 14, "is_overdue": True, "task_age_days": 5, "priority": 2}
v1 = build_feature_vector_7(feat, day_of_week=3)
v2 = build_feature_vector_12(feat, day_of_week=3, profile=None)
assert (v1 == v2[:7]).all()
def test_missing_profile_defaults(self):
v = build_feature_vector_12({"hour_of_day": 9}, day_of_week=0, profile=None)
# completion, dismiss, dwell, volume → 0; preferred_hour → 0.5 neutral
assert v[7] == 0.0
assert v[8] == 0.0
assert v[9] == 0.0
assert v[10] == pytest.approx(0.5)
assert v[11] == 0.0
@pytest.mark.asyncio
async def test_score_egreedy_v2_returns_candidate():
payload = {
"user_id": "v2-user",
"candidates": [
{"id": "t:a", "content": "A", "source": "todoist",
"features": {"is_overdue": True, "task_age_days": 2, "priority": 3}},
{"id": "t:b", "content": "B", "source": "todoist",
"features": {"is_overdue": False, "task_age_days": 0, "priority": 1}},
],
"context": {"hour_of_day": 9, "day_of_week": 1},
"profile_features": {
"completion_rate_30d": 0.4,
"dismiss_rate_30d": 0.1,
"mean_dwell_ms_30d": 45_000,
"preferred_hour": 9,
"tip_volume_30d": 8,
},
}
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r = await client.post("/score/egreedy/v2", json=payload)
assert r.status_code == 200
body = r.json()
assert body["tip_id"] in {"t:a", "t:b"}
assert body["policy"] == "egreedy-v2"
@pytest.mark.asyncio
async def test_score_egreedy_v2_accepts_missing_profile():
payload = {
"user_id": "v2-no-profile",
"candidates": [
{"id": "t:solo", "content": "Solo", "source": "todoist",
"features": {"is_overdue": False, "task_age_days": 0, "priority": 1}},
],
"context": {"hour_of_day": 10, "day_of_week": 0},
}
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r = await client.post("/score/egreedy/v2", json=payload)
assert r.status_code == 200
assert r.json()["tip_id"] == "t:solo"
@pytest.mark.asyncio
async def test_reward_egreedy_v2_updates_stats():
user_id = "v2-reward-stats"
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r0 = await client.get(f"/stats/egreedy/v2/{user_id}")
before = r0.json()["cumulative_reward"]
await client.post("/reward/egreedy/v2", json={
"user_id": user_id,
"tip_id": "t:r",
"reward": 1.0,
"features": {"hour_of_day": 9, "is_overdue": True, "task_age_days": 2, "priority": 3},
"day_of_week": 1,
"profile_features": {
"completion_rate_30d": 0.3,
"dismiss_rate_30d": 0.2,
"mean_dwell_ms_30d": 30_000,
"preferred_hour": 9,
"tip_volume_30d": 5,
},
})
r1 = await client.get(f"/stats/egreedy/v2/{user_id}")
body = r1.json()
assert body["cumulative_reward"] == pytest.approx(before + 1.0)
assert body["policy"] == "egreedy-v2"
assert len(body["theta"]) == 12
assert len(body["feature_labels"]) == 12
@pytest.mark.asyncio
async def test_reset_clears_v2_state():
user_id = "v2-reset"
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
await client.post("/score/egreedy/v2", json={
"user_id": user_id,
"candidates": [
{"id": "t:v2r", "content": "x", "source": "todoist",
"features": {"is_overdue": False, "task_age_days": 0, "priority": 1}},
],
"context": {"hour_of_day": 10, "day_of_week": 0},
})
r0 = await client.get(f"/stats/egreedy/v2/{user_id}")
assert r0.json()["pulls"] >= 1
await client.post(f"/reset/{user_id}")
r1 = await client.get(f"/stats/egreedy/v2/{user_id}")
assert r1.json()["pulls"] == 0
@pytest.mark.asyncio
async def test_reward_negative_value():
"""Dismissing a tip should decrease cumulative_reward."""
user_id = "dismiss-user-neg"
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
r0 = await client.get(f"/stats/{user_id}")
before = r0.json()["cumulative_reward"]
await client.post("/reward", json={
"user_id": user_id,
"tip_id": "t:neg",
"reward": -1.0,
"features": {"hour_of_day": 20, "is_overdue": False, "task_age_days": 0, "priority": 1},
})
r1 = await client.get(f"/stats/{user_id}")
assert r1.json()["cumulative_reward"] == pytest.approx(before - 1.0)