diff --git a/ml/serving/main.py b/ml/serving/main.py index 878caf0..5fb4c6c 100644 --- a/ml/serving/main.py +++ b/ml/serving/main.py @@ -1,41 +1,24 @@ """ -oO ML Serving — Phase 1: LinUCB contextual bandit. +oO ML Serving — multi-agent orchestrator (ADR-0013). Contract: - POST /score LinUCB d=5 (baseline, kept as shadow-eligible) - POST /score/egreedy ε-greedy v1, d=7 (active — ADR-0007) - POST /score/egreedy/v2 ε-greedy v2, d=12, profile features (shadow — ADR-0012) - POST /reward, /reward/egreedy, /reward/egreedy/v2 - GET /stats/{user_id} LinUCB stats - GET /stats/egreedy/{user_id} ε-greedy v1 stats - GET /stats/egreedy/v2/{user_id} ε-greedy v2 stats - POST /reset/{user_id} clear all per-user bandit state - GET /features/{user_id} last 100 scored feature vectors - POST /generate LLM tip candidates via LiteLLM - GET /health { ok } - -Features (d=5): - hour_sin, hour_cos — cyclical time-of-day encoding - is_overdue — 0 or 1 - task_age_days — days since due date (clipped 0–30, normalised 0–1) - priority_norm — Todoist priority 1–4, normalised to 0–1 + POST /agents/{agent_id}/compute run a sub-agent, return prompt snippet + POST /recommend orchestrate agent snippets → one tip via LiteLLM + POST /generate LLM tip candidates (legacy; kept for bench/eval) + GET /health { ok, agents: [...] } """ from __future__ import annotations import json -import math import os import sys -import time -from collections import deque from contextlib import asynccontextmanager from datetime import datetime, timezone from pathlib import Path -from typing import Optional, Deque +from typing import Optional import httpx -import numpy as np import sentry_sdk import structlog import structlog.contextvars @@ -93,242 +76,12 @@ app.add_middleware(_TracingMiddleware) LITELLM_URL = os.getenv("LITELLM_URL", "http://localhost:4000") LITELLM_MASTER_KEY = os.getenv("LITELLM_MASTER_KEY", "sk-oo-dev") -STATE_DIR = Path(os.getenv("STATE_DIR", "/tmp/oo-bandit-state")) +STATE_DIR = Path(os.getenv("STATE_DIR", "/tmp/oo-serving-state")) STATE_DIR.mkdir(parents=True, exist_ok=True) -ALPHA = 1.0 # LinUCB exploration coefficient -D = 5 # LinUCB feature dimension -D7 = 7 # ε-greedy v1 feature dimension (adds day-of-week cyclical encoding) -D12 = 12 # ε-greedy v2 feature dimension (adds 5 profile features — ADR-0012) -EPSILON = 0.1 # ε-greedy exploration rate -FEATURE_HISTORY_SIZE = 100 # per-user ring buffer - - -# ── Per-user in-memory feature history ──────────────────────────────────── -_feature_history: dict[str, deque] = {} - -def get_feature_history(user_id: str) -> deque: - if user_id not in _feature_history: - _feature_history[user_id] = deque(maxlen=FEATURE_HISTORY_SIZE) - return _feature_history[user_id] - - -# ── Feature helpers ──────────────────────────────────────────────────────── - -def build_feature_vector(features: dict) -> np.ndarray: - hour = features.get("hour_of_day", 12) - hour_sin = math.sin(2 * math.pi * hour / 24) - hour_cos = math.cos(2 * math.pi * hour / 24) - is_overdue = float(bool(features.get("is_overdue", False))) - age = min(float(features.get("task_age_days", 0)), 30.0) / 30.0 - priority = (float(features.get("priority", 1)) - 1.0) / 3.0 - return np.array([hour_sin, hour_cos, is_overdue, age, priority], dtype=np.float64) - - -# ── Per-user bandit state (disjoint LinUCB, global arm) ─────────────────── - -# ── LinUCB state helpers ─────────────────────────────────────────────────── - -def state_path(user_id: str) -> Path: - safe = "".join(c if c.isalnum() else "_" for c in user_id) - return STATE_DIR / f"{safe}.json" - - -def load_state(user_id: str) -> tuple[np.ndarray, np.ndarray, dict]: - """Returns (A, b, meta). A is DxD, b is D-vector.""" - p = state_path(user_id) - if p.exists(): - raw = json.loads(p.read_text()) - A = np.array(raw["A"], dtype=np.float64) - b = np.array(raw["b"], dtype=np.float64) - meta = raw.get("meta", {}) - return A, b, meta - return np.identity(D, dtype=np.float64), np.zeros(D, dtype=np.float64), {} - - -def save_state(user_id: str, A: np.ndarray, b: np.ndarray, meta: dict) -> None: - p = state_path(user_id) - p.write_text(json.dumps({"A": A.tolist(), "b": b.tolist(), "meta": meta})) - - -# ── ε-greedy state helpers (d=7, extended features) ─────────────────────── - -def build_feature_vector_7(features: dict, day_of_week: int = 0) -> np.ndarray: - """d=7: base 5 features + day-of-week cyclical encoding.""" - base = build_feature_vector(features) - dow_sin = math.sin(2 * math.pi * day_of_week / 7) - dow_cos = math.cos(2 * math.pi * day_of_week / 7) - return np.append(base, [dow_sin, dow_cos]) - - -def state7_path(user_id: str) -> Path: - safe = "".join(c if c.isalnum() else "_" for c in user_id) - return STATE_DIR / f"{safe}_egreedy.json" - - -def load_state7(user_id: str) -> tuple[np.ndarray, np.ndarray, dict]: - """Returns (A, b, meta) for ε-greedy d=7 policy.""" - p = state7_path(user_id) - if p.exists(): - raw = json.loads(p.read_text()) - A = np.array(raw["A"], dtype=np.float64) - b = np.array(raw["b"], dtype=np.float64) - return A, b, raw.get("meta", {}) - return np.identity(D7, dtype=np.float64), np.zeros(D7, dtype=np.float64), {} - - -def save_state7(user_id: str, A: np.ndarray, b: np.ndarray, meta: dict) -> None: - p = state7_path(user_id) - p.write_text(json.dumps({"A": A.tolist(), "b": b.tolist(), "meta": meta})) - - -# ── ε-greedy v2 state helpers (d=12, profile features — ADR-0012) ───────── -# -# Normalization choices (see ADR-0012): -# completion_rate_30d — already 0–1, passthrough; null → 0 -# dismiss_rate_30d — already 0–1, passthrough; null → 0 -# mean_dwell_ms_30d — clipped to [0, 600_000 ms] (10 min), then /600_000 -# preferred_hour — circular alignment with context hour: -# (cos(2π·(now − pref)/24) + 1) / 2 → 0–1 -# captures "is the user's habitual peak near now?" -# null → 0.5 (neutral) -# tip_volume_30d — log1p(n) / log1p(100), clipped to [0, 1] - -_DWELL_CLIP_MS = 600_000.0 # 10 minutes -_VOLUME_LOG_MAX = math.log1p(100.0) - - -def _profile_value(profile: Optional[dict], key: str) -> Optional[float]: - if not profile: - return None - v = profile.get(key) - if v is None: - return None - try: - return float(v) - except (TypeError, ValueError): - return None - - -def _norm_rate(v: Optional[float]) -> float: - return 0.0 if v is None else max(0.0, min(1.0, v)) - - -def _norm_dwell(v: Optional[float]) -> float: - if v is None: - return 0.0 - return max(0.0, min(1.0, v / _DWELL_CLIP_MS)) - - -def _norm_volume(v: Optional[float]) -> float: - if v is None or v <= 0: - return 0.0 - return min(1.0, math.log1p(float(v)) / _VOLUME_LOG_MAX) - - -def _norm_preferred_hour(pref: Optional[float], now_hour: int) -> float: - if pref is None: - return 0.5 # neutral when the user has no established peak yet - delta = (float(pref) - float(now_hour)) * (2.0 * math.pi / 24.0) - return (math.cos(delta) + 1.0) / 2.0 - - -def build_feature_vector_12( - features: dict, - day_of_week: int = 0, - profile: Optional[dict] = None, -) -> np.ndarray: - """d=12: egreedy-v1's 7 dims + 5 normalized profile features (ADR-0012).""" - base7 = build_feature_vector_7(features, day_of_week) - now_hour = int(features.get("hour_of_day", 12)) - profile_dims = np.array( - [ - _norm_rate(_profile_value(profile, "completion_rate_30d")), - _norm_rate(_profile_value(profile, "dismiss_rate_30d")), - _norm_dwell(_profile_value(profile, "mean_dwell_ms_30d")), - _norm_preferred_hour(_profile_value(profile, "preferred_hour"), now_hour), - _norm_volume(_profile_value(profile, "tip_volume_30d")), - ], - dtype=np.float64, - ) - return np.concatenate([base7, profile_dims]) - - -def state12_path(user_id: str) -> Path: - safe = "".join(c if c.isalnum() else "_" for c in user_id) - return STATE_DIR / f"{safe}_egreedy_v2.json" - - -def load_state12(user_id: str) -> tuple[np.ndarray, np.ndarray, dict]: - p = state12_path(user_id) - if p.exists(): - raw = json.loads(p.read_text()) - A = np.array(raw["A"], dtype=np.float64) - b = np.array(raw["b"], dtype=np.float64) - return A, b, raw.get("meta", {}) - return np.identity(D12, dtype=np.float64), np.zeros(D12, dtype=np.float64), {} - - -def save_state12(user_id: str, A: np.ndarray, b: np.ndarray, meta: dict) -> None: - p = state12_path(user_id) - p.write_text(json.dumps({"A": A.tolist(), "b": b.tolist(), "meta": meta})) - # ── API models ───────────────────────────────────────────────────────────── -class CandidateFeatures(BaseModel): - hour_of_day: int = 12 - is_overdue: bool = False - task_age_days: float = 0.0 - priority: int = 1 - - -class Candidate(BaseModel): - id: str - content: str - source: str - source_id: Optional[str] = None - features: CandidateFeatures = CandidateFeatures() - - -class Context(BaseModel): - hour_of_day: int = 12 - day_of_week: int = 0 - - -class ScoreRequest(BaseModel): - user_id: str - candidates: list[Candidate] - context: Context = Context() - # User-level features computed by the API (#81 phase A). Accepted, logged, - # but not yet consumed by the bandit — extending the feature vector - # changes `D` and resets every user's learned state, which is a deliberate - # follow-up (phase B), not a side effect of this PR. - profile_features: Optional[dict] = None - - -class ScoreResponse(BaseModel): - tip_id: str - score: float - policy: str - - -class RewardRequest(BaseModel): - user_id: str - tip_id: str - reward: float # +1 done, +0.5 helpful, 0 snooze, -0.5 not_helpful, -1 dismiss - features: CandidateFeatures - day_of_week: int = 0 # included so egreedy can train dow features correctly - # Profile features at the time the tip was served. Ignored by /reward and - # /reward/egreedy; consumed by /reward/egreedy/v2 so the ridge update uses - # the same feature vector as the matching /score/egreedy/v2 call. - profile_features: Optional[dict] = None - - -class RewardResponse(BaseModel): - ok: bool - - class PromptContext(BaseModel): tasks: list[dict] = [] hour_of_day: int = 12 @@ -652,364 +405,3 @@ async def generate(req: GenerateRequest) -> GenerateResponse: ) -@app.post("/score", response_model=ScoreResponse) -def score(req: ScoreRequest) -> ScoreResponse: - if not req.candidates: - raise HTTPException(status_code=422, detail="No candidates") - - A, b, meta = load_state(req.user_id) - try: - A_inv = np.linalg.inv(A) - except np.linalg.LinAlgError: - A_inv = np.identity(D, dtype=np.float64) - - theta = A_inv @ b - - best_id = None - best_score = -float("inf") - best_features: dict = {} - - for candidate in req.candidates: - feat_dict = { - "hour_of_day": req.context.hour_of_day, - "is_overdue": candidate.features.is_overdue, - "task_age_days": candidate.features.task_age_days, - "priority": candidate.features.priority, - } - x = build_feature_vector(feat_dict) - exploit = float(theta @ x) - explore = ALPHA * math.sqrt(float(x @ A_inv @ x)) - ucb = exploit + explore - if ucb > best_score: - best_score = ucb - best_id = candidate.id - best_features = feat_dict - - # Log to feature history ring buffer - history = get_feature_history(req.user_id) - history.append({ - "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), - "features": best_features, - "score": best_score, - "tip_id": best_id, - }) - - # Update meta stats - meta["pulls"] = meta.get("pulls", 0) + 1 - meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) - save_state(req.user_id, A, b, meta) - - return ScoreResponse(tip_id=best_id, score=best_score, policy="linucb-v1") - - -@app.post("/reward", response_model=RewardResponse) -def reward(req: RewardRequest) -> RewardResponse: - A, b, meta = load_state(req.user_id) - feat_dict = { - "hour_of_day": req.features.hour_of_day, - "is_overdue": req.features.is_overdue, - "task_age_days": req.features.task_age_days, - "priority": req.features.priority, - } - x = build_feature_vector(feat_dict) - A += np.outer(x, x) - b += req.reward * x - - # Track cumulative reward in meta - meta["cumulative_reward"] = meta.get("cumulative_reward", 0.0) + req.reward - meta["reward_count"] = meta.get("reward_count", 0) + 1 - meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) - - save_state(req.user_id, A, b, meta) - return RewardResponse(ok=True) - - -@app.post("/score/egreedy", response_model=ScoreResponse) -def score_egreedy(req: ScoreRequest) -> ScoreResponse: - """ε-greedy policy with d=7 features (adds day-of-week encoding). - - Exploration: pick uniformly at random with probability ε. - Exploitation: pick argmax of linear payoff estimate θ·x. - Differs from LinUCB in: no UCB bonus, richer feature space. - """ - if not req.candidates: - raise HTTPException(status_code=422, detail="No candidates") - - A, b, meta = load_state7(req.user_id) - try: - A_inv = np.linalg.inv(A) - except np.linalg.LinAlgError: - A_inv = np.identity(D7, dtype=np.float64) - theta = A_inv @ b - - dow = req.context.day_of_week - exploring = np.random.random() < EPSILON - - if exploring: - chosen = req.candidates[np.random.randint(len(req.candidates))] - feat_dict = { - "hour_of_day": req.context.hour_of_day, - "is_overdue": chosen.features.is_overdue, - "task_age_days": chosen.features.task_age_days, - "priority": chosen.features.priority, - } - x = build_feature_vector_7(feat_dict, dow) - best_score = float(theta @ x) - best_id = chosen.id - else: - best_id = None - best_score = -float("inf") - feat_dict = {} - for candidate in req.candidates: - fd = { - "hour_of_day": req.context.hour_of_day, - "is_overdue": candidate.features.is_overdue, - "task_age_days": candidate.features.task_age_days, - "priority": candidate.features.priority, - } - x = build_feature_vector_7(fd, dow) - s = float(theta @ x) - if s > best_score: - best_score = s - best_id = candidate.id - feat_dict = fd - - history = get_feature_history(req.user_id) - history.append({ - "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), - "features": {**feat_dict, "day_of_week": dow, "exploring": exploring}, - "score": best_score, - "tip_id": best_id, - "policy": "egreedy-v1", - }) - - meta["pulls"] = meta.get("pulls", 0) + 1 - meta["explore_count"] = meta.get("explore_count", 0) + int(exploring) - meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) - save_state7(req.user_id, A, b, meta) - - return ScoreResponse(tip_id=best_id, score=best_score, policy="egreedy-v1") - - -@app.post("/reward/egreedy", response_model=RewardResponse) -def reward_egreedy(req: RewardRequest) -> RewardResponse: - """Update ε-greedy ridge estimator with observed reward.""" - A, b, meta = load_state7(req.user_id) - feat_dict = { - "hour_of_day": req.features.hour_of_day, - "is_overdue": req.features.is_overdue, - "task_age_days": req.features.task_age_days, - "priority": req.features.priority, - } - x = build_feature_vector_7(feat_dict, day_of_week=req.day_of_week) - A += np.outer(x, x) - b += req.reward * x - - meta["cumulative_reward"] = meta.get("cumulative_reward", 0.0) + req.reward - meta["reward_count"] = meta.get("reward_count", 0) + 1 - meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) - save_state7(req.user_id, A, b, meta) - return RewardResponse(ok=True) - - -@app.post("/score/egreedy/v2", response_model=ScoreResponse) -def score_egreedy_v2(req: ScoreRequest) -> ScoreResponse: - """ε-greedy v2 — d=12, adds 5 normalized profile features (ADR-0012). - - Shadow-only until offline sim + rollout per ADR-0002 completes. - Accepts the same ScoreRequest shape as v1; `profile_features` drives the - extra 5 dims (defaults: zeros for rates/volume/dwell, 0.5 neutral for - preferred_hour alignment). - """ - if not req.candidates: - raise HTTPException(status_code=422, detail="No candidates") - - A, b, meta = load_state12(req.user_id) - try: - A_inv = np.linalg.inv(A) - except np.linalg.LinAlgError: - A_inv = np.identity(D12, dtype=np.float64) - theta = A_inv @ b - - dow = req.context.day_of_week - exploring = np.random.random() < EPSILON - - if exploring: - chosen = req.candidates[np.random.randint(len(req.candidates))] - feat_dict = { - "hour_of_day": req.context.hour_of_day, - "is_overdue": chosen.features.is_overdue, - "task_age_days": chosen.features.task_age_days, - "priority": chosen.features.priority, - } - x = build_feature_vector_12(feat_dict, dow, req.profile_features) - best_score = float(theta @ x) - best_id = chosen.id - else: - best_id = None - best_score = -float("inf") - feat_dict = {} - for candidate in req.candidates: - fd = { - "hour_of_day": req.context.hour_of_day, - "is_overdue": candidate.features.is_overdue, - "task_age_days": candidate.features.task_age_days, - "priority": candidate.features.priority, - } - x = build_feature_vector_12(fd, dow, req.profile_features) - s = float(theta @ x) - if s > best_score: - best_score = s - best_id = candidate.id - feat_dict = fd - - history = get_feature_history(req.user_id) - history.append({ - "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), - "features": {**feat_dict, "day_of_week": dow, "exploring": exploring}, - "score": best_score, - "tip_id": best_id, - "policy": "egreedy-v2", - }) - - meta["pulls"] = meta.get("pulls", 0) + 1 - meta["explore_count"] = meta.get("explore_count", 0) + int(exploring) - meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) - save_state12(req.user_id, A, b, meta) - - return ScoreResponse(tip_id=best_id, score=best_score, policy="egreedy-v2") - - -@app.post("/reward/egreedy/v2", response_model=RewardResponse) -def reward_egreedy_v2(req: RewardRequest) -> RewardResponse: - """Update ε-greedy v2 ridge estimator using the d=12 feature vector.""" - A, b, meta = load_state12(req.user_id) - feat_dict = { - "hour_of_day": req.features.hour_of_day, - "is_overdue": req.features.is_overdue, - "task_age_days": req.features.task_age_days, - "priority": req.features.priority, - } - x = build_feature_vector_12(feat_dict, req.day_of_week, req.profile_features) - A += np.outer(x, x) - b += req.reward * x - - meta["cumulative_reward"] = meta.get("cumulative_reward", 0.0) + req.reward - meta["reward_count"] = meta.get("reward_count", 0) + 1 - meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) - save_state12(req.user_id, A, b, meta) - return RewardResponse(ok=True) - - -@app.get("/stats/egreedy/v2/{user_id}") -def stats_egreedy_v2(user_id: str): - """ε-greedy v2 policy stats — pulls, cumulative reward, θ vector.""" - A, b, meta = load_state12(user_id) - try: - theta = (np.linalg.inv(A) @ b).tolist() - except np.linalg.LinAlgError: - theta = [0.0] * D12 - - pulls = meta.get("pulls", 0) - cumulative_reward = meta.get("cumulative_reward", 0.0) - reward_count = meta.get("reward_count", 0) - explore_count = meta.get("explore_count", 0) - - return { - "user_id": user_id, - "policy": "egreedy-v2", - "pulls": pulls, - "reward_count": reward_count, - "cumulative_reward": cumulative_reward, - "estimated_mean_reward": cumulative_reward / reward_count if reward_count > 0 else 0.0, - "exploration_rate": explore_count / pulls if pulls > 0 else 0.0, - "theta": theta, - "feature_labels": [ - "hour_sin", "hour_cos", "is_overdue", "task_age", "priority", - "dow_sin", "dow_cos", - "completion_rate_30d", "dismiss_rate_30d", "mean_dwell_norm", - "preferred_hour_alignment", "tip_volume_norm", - ], - "last_updated": meta.get("last_updated"), - } - - -@app.get("/stats/egreedy/{user_id}") -def stats_egreedy(user_id: str): - """ε-greedy policy stats — pulls, cumulative reward, θ vector.""" - A, b, meta = load_state7(user_id) - try: - theta = (np.linalg.inv(A) @ b).tolist() - except np.linalg.LinAlgError: - theta = [0.0] * D7 - - pulls = meta.get("pulls", 0) - cumulative_reward = meta.get("cumulative_reward", 0.0) - reward_count = meta.get("reward_count", 0) - explore_count = meta.get("explore_count", 0) - - return { - "user_id": user_id, - "policy": "egreedy-v1", - "pulls": pulls, - "reward_count": reward_count, - "cumulative_reward": cumulative_reward, - "estimated_mean_reward": cumulative_reward / reward_count if reward_count > 0 else 0.0, - "exploration_rate": explore_count / pulls if pulls > 0 else 0.0, - "theta": theta, - "feature_labels": ["hour_sin", "hour_cos", "is_overdue", "task_age", "priority", "dow_sin", "dow_cos"], - "last_updated": meta.get("last_updated"), - } - - -@app.post("/reset/{user_id}", response_model=RewardResponse) -def reset(user_id: str) -> RewardResponse: - """Reset per-user bandit state (admin action).""" - p = state_path(user_id) - if p.exists(): - p.unlink() - p7 = state7_path(user_id) - if p7.exists(): - p7.unlink() - p12 = state12_path(user_id) - if p12.exists(): - p12.unlink() - if user_id in _feature_history: - _feature_history[user_id].clear() - return RewardResponse(ok=True) - - -@app.get("/stats/{user_id}") -def stats(user_id: str): - """Return current LinUCB state summary for a user.""" - A, b, meta = load_state(user_id) - try: - A_inv = np.linalg.inv(A) - theta = (A_inv @ b).tolist() - except np.linalg.LinAlgError: - theta = [0.0] * D - - pulls = meta.get("pulls", 0) - cumulative_reward = meta.get("cumulative_reward", 0.0) - reward_count = meta.get("reward_count", 0) - estimated_mean = cumulative_reward / reward_count if reward_count > 0 else 0.0 - - return { - "user_id": user_id, - "pulls": pulls, - "reward_count": reward_count, - "cumulative_reward": cumulative_reward, - "estimated_mean_reward": estimated_mean, - "theta": theta, - "last_updated": meta.get("last_updated"), - } - - -@app.get("/features/{user_id}") -def features(user_id: str): - """Return recent feature vectors logged at scoring time.""" - history = get_feature_history(user_id) - return { - "user_id": user_id, - "history": list(history), - } diff --git a/ml/serving/tests/test_score.py b/ml/serving/tests/test_score.py deleted file mode 100644 index e32b1b5..0000000 --- a/ml/serving/tests/test_score.py +++ /dev/null @@ -1,439 +0,0 @@ -""" -Unit tests for ml/serving — feature building and scoring contract. -Run with: pytest ml/serving/tests/ -""" -import math -import pytest -from httpx import AsyncClient, ASGITransport - -from main import ( - app, - build_feature_vector, - build_feature_vector_12, - _norm_dwell, - _norm_preferred_hour, - _norm_rate, - _norm_volume, -) - - -class TestFeatureVector: - def test_shape(self): - v = build_feature_vector({"hour_of_day": 8, "is_overdue": True, "task_age_days": 3, "priority": 3}) - assert v.shape == (5,) - - def test_hour_encoding_noon(self): - v = build_feature_vector({"hour_of_day": 12}) - # sin(2π * 12/24) = sin(π) ≈ 0 - assert abs(v[0]) < 1e-10 - # cos(2π * 12/24) = cos(π) = -1 - assert abs(v[1] - (-1.0)) < 1e-10 - - def test_hour_encoding_midnight(self): - v = build_feature_vector({"hour_of_day": 0}) - # sin(0) = 0 - assert abs(v[0]) < 1e-10 - # cos(0) = 1 - assert abs(v[1] - 1.0) < 1e-10 - - def test_hour_encoding_6am(self): - v = build_feature_vector({"hour_of_day": 6}) - # sin(2π * 6/24) = sin(π/2) = 1 - assert abs(v[0] - 1.0) < 1e-10 - # cos(π/2) = 0 - assert abs(v[1]) < 1e-10 - - def test_age_clipped_at_30(self): - v_long = build_feature_vector({"task_age_days": 100}) - v_cap = build_feature_vector({"task_age_days": 30}) - assert v_long[3] == v_cap[3] == 1.0 - - def test_age_zero(self): - v = build_feature_vector({"task_age_days": 0}) - assert v[3] == pytest.approx(0.0) - - def test_age_15_days_normalised(self): - v = build_feature_vector({"task_age_days": 15}) - assert v[3] == pytest.approx(0.5) - - def test_priority_normalised(self): - v1 = build_feature_vector({"priority": 1}) - v4 = build_feature_vector({"priority": 4}) - assert v1[4] == pytest.approx(0.0) - assert v4[4] == pytest.approx(1.0) - - def test_priority_2_and_3(self): - v2 = build_feature_vector({"priority": 2}) - v3 = build_feature_vector({"priority": 3}) - assert v2[4] == pytest.approx(1 / 3) - assert v3[4] == pytest.approx(2 / 3) - - def test_is_overdue_true(self): - v = build_feature_vector({"is_overdue": True}) - assert v[2] == 1.0 - - def test_is_overdue_false(self): - v = build_feature_vector({"is_overdue": False}) - assert v[2] == 0.0 - - def test_defaults_when_no_keys(self): - v = build_feature_vector({}) - # hour=12 → sin(π)≈0, cos(π)=-1 - assert abs(v[0]) < 1e-10 - assert abs(v[1] - (-1.0)) < 1e-10 - assert v[2] == 0.0 # is_overdue=False - assert v[3] == 0.0 # task_age_days=0 - assert v[4] == 0.0 # priority=1 → (1-1)/3=0 - - -@pytest.mark.asyncio -async def test_health(): - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: - r = await client.get("/health") - assert r.status_code == 200 - assert r.json()["ok"] is True - - -@pytest.mark.asyncio -async def test_score_returns_a_candidate(): - payload = { - "user_id": "test-user", - "candidates": [ - {"id": "t:1", "content": "Task A", "source": "todoist", "source_id": "1", - "features": {"is_overdue": True, "task_age_days": 2, "priority": 3}}, - {"id": "t:2", "content": "Task B", "source": "todoist", "source_id": "2", - "features": {"is_overdue": False, "task_age_days": 0, "priority": 1}}, - ], - "context": {"hour_of_day": 9, "day_of_week": 1}, - } - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: - r = await client.post("/score", json=payload) - assert r.status_code == 200 - body = r.json() - assert body["tip_id"] in {"t:1", "t:2"} - assert "policy" in body - assert body["policy"] == "linucb-v1" - assert isinstance(body["score"], float) - - -@pytest.mark.asyncio -async def test_score_single_candidate_always_selected(): - """With a single candidate there is no choice — it must be returned.""" - payload = { - "user_id": "solo-user", - "candidates": [ - {"id": "only:1", "content": "Only task", "source": "todoist", - "features": {"is_overdue": False, "task_age_days": 0, "priority": 1}}, - ], - "context": {"hour_of_day": 10, "day_of_week": 0}, - } - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: - r = await client.post("/score", json=payload) - assert r.status_code == 200 - assert r.json()["tip_id"] == "only:1" - - -@pytest.mark.asyncio -async def test_score_empty_candidates_returns_422(): - payload = {"user_id": "u", "candidates": [], "context": {"hour_of_day": 9, "day_of_week": 1}} - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: - r = await client.post("/score", json=payload) - assert r.status_code == 422 - - -@pytest.mark.asyncio -async def test_reward_accepted(): - payload = { - "user_id": "reward-user", - "tip_id": "t:1", - "reward": 1.0, - "features": {"hour_of_day": 9, "is_overdue": True, "task_age_days": 2, "priority": 3}, - } - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: - r = await client.post("/reward", json=payload) - assert r.status_code == 200 - assert r.json()["ok"] is True - - -@pytest.mark.asyncio -async def test_reward_updates_stats(): - """Posting a reward should increase cumulative_reward in /stats.""" - user_id = "reward-stats-user" - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: - r0 = await client.get(f"/stats/{user_id}") - before = r0.json()["cumulative_reward"] - - await client.post("/reward", json={ - "user_id": user_id, - "tip_id": "tip:x", - "reward": 1.0, - "features": {"hour_of_day": 8, "is_overdue": False, "task_age_days": 0, "priority": 2}, - }) - r1 = await client.get(f"/stats/{user_id}") - assert r1.json()["cumulative_reward"] == pytest.approx(before + 1.0) - - -@pytest.mark.asyncio -async def test_score_increments_pulls(): - user_id = "pull-counter-user" - payload = { - "user_id": user_id, - "candidates": [ - {"id": "t:p1", "content": "Pull task", "source": "todoist", - "features": {"is_overdue": False, "task_age_days": 1, "priority": 2}}, - ], - "context": {"hour_of_day": 10, "day_of_week": 2}, - } - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: - r0 = await client.get(f"/stats/{user_id}") - pulls_before = r0.json()["pulls"] - - await client.post("/score", json=payload) - await client.post("/score", json=payload) - - r1 = await client.get(f"/stats/{user_id}") - assert r1.json()["pulls"] == pulls_before + 2 - - -@pytest.mark.asyncio -async def test_reset_clears_state(): - user_id = "reset-user" - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: - # Score once to build state - await client.post("/score", json={ - "user_id": user_id, - "candidates": [ - {"id": "t:r", "content": "Reset task", "source": "todoist", - "features": {"is_overdue": True, "task_age_days": 5, "priority": 4}}, - ], - "context": {"hour_of_day": 14, "day_of_week": 3}, - }) - r_reset = await client.post(f"/reset/{user_id}") - assert r_reset.json()["ok"] is True - - r_stats = await client.get(f"/stats/{user_id}") - assert r_stats.json()["pulls"] == 0 - - -@pytest.mark.asyncio -async def test_features_endpoint_returns_history(): - user_id = "features-user" - payload = { - "user_id": user_id, - "candidates": [ - {"id": "t:f1", "content": "Feature task", "source": "todoist", - "features": {"is_overdue": False, "task_age_days": 0, "priority": 1}}, - ], - "context": {"hour_of_day": 7, "day_of_week": 0}, - } - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: - await client.post("/score", json=payload) - r = await client.get(f"/features/{user_id}") - body = r.json() - assert r.status_code == 200 - assert "history" in body - assert len(body["history"]) >= 1 - entry = body["history"][-1] - assert "ts" in entry - assert "score" in entry - assert "tip_id" in entry - - -@pytest.mark.asyncio -async def test_stats_for_fresh_user(): - """A user with no history should return zero/default stats without error.""" - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: - r = await client.get("/stats/brand-new-user-xyz-abc") - body = r.json() - assert r.status_code == 200 - assert body["pulls"] == 0 - assert body["cumulative_reward"] == 0.0 - assert body["estimated_mean_reward"] == 0.0 - - -class TestV2Normalization: - def test_rate_passthrough(self): - assert _norm_rate(0.0) == 0.0 - assert _norm_rate(0.42) == 0.42 - assert _norm_rate(1.0) == 1.0 - - def test_rate_none_zero(self): - assert _norm_rate(None) == 0.0 - - def test_rate_clipped(self): - assert _norm_rate(1.5) == 1.0 - assert _norm_rate(-0.1) == 0.0 - - def test_dwell_none_zero(self): - assert _norm_dwell(None) == 0.0 - - def test_dwell_scales_to_0_1(self): - assert _norm_dwell(0) == 0.0 - # 600_000 ms (10 min) is the clip ceiling - assert _norm_dwell(600_000) == 1.0 - assert _norm_dwell(1_200_000) == 1.0 - assert _norm_dwell(60_000) == pytest.approx(0.1) - - def test_volume_monotonic_and_clipped(self): - assert _norm_volume(None) == 0.0 - assert _norm_volume(0) == 0.0 - assert _norm_volume(10) < _norm_volume(100) - # 100 tips ≈ full saturation - assert _norm_volume(100) == pytest.approx(1.0) - assert _norm_volume(10_000) == 1.0 - - def test_preferred_hour_alignment(self): - # Exact match → 1.0 - assert _norm_preferred_hour(9, 9) == pytest.approx(1.0) - # 12h opposite → 0.0 - assert _norm_preferred_hour(21, 9) == pytest.approx(0.0, abs=1e-10) - # 6h off → 0.5 (cos(π/2) = 0, scaled to 0.5) - assert _norm_preferred_hour(15, 9) == pytest.approx(0.5, abs=1e-10) - - def test_preferred_hour_null_neutral(self): - # Null preference → neutral 0.5 rather than misleading "alignment at 0" - assert _norm_preferred_hour(None, 9) == 0.5 - - -class TestFeatureVector12: - def test_shape(self): - v = build_feature_vector_12( - {"hour_of_day": 9, "is_overdue": True, "task_age_days": 2, "priority": 3}, - day_of_week=2, - profile={ - "completion_rate_30d": 0.5, - "dismiss_rate_30d": 0.1, - "mean_dwell_ms_30d": 60_000, - "preferred_hour": 9, - "tip_volume_30d": 20, - }, - ) - assert v.shape == (12,) - - def test_first_seven_match_v1(self): - """v2 must reduce to v1-style features on the first 7 dims so rollout - behaviour is predictable when profile is absent.""" - from main import build_feature_vector_7 - feat = {"hour_of_day": 14, "is_overdue": True, "task_age_days": 5, "priority": 2} - v1 = build_feature_vector_7(feat, day_of_week=3) - v2 = build_feature_vector_12(feat, day_of_week=3, profile=None) - assert (v1 == v2[:7]).all() - - def test_missing_profile_defaults(self): - v = build_feature_vector_12({"hour_of_day": 9}, day_of_week=0, profile=None) - # completion, dismiss, dwell, volume → 0; preferred_hour → 0.5 neutral - assert v[7] == 0.0 - assert v[8] == 0.0 - assert v[9] == 0.0 - assert v[10] == pytest.approx(0.5) - assert v[11] == 0.0 - - -@pytest.mark.asyncio -async def test_score_egreedy_v2_returns_candidate(): - payload = { - "user_id": "v2-user", - "candidates": [ - {"id": "t:a", "content": "A", "source": "todoist", - "features": {"is_overdue": True, "task_age_days": 2, "priority": 3}}, - {"id": "t:b", "content": "B", "source": "todoist", - "features": {"is_overdue": False, "task_age_days": 0, "priority": 1}}, - ], - "context": {"hour_of_day": 9, "day_of_week": 1}, - "profile_features": { - "completion_rate_30d": 0.4, - "dismiss_rate_30d": 0.1, - "mean_dwell_ms_30d": 45_000, - "preferred_hour": 9, - "tip_volume_30d": 8, - }, - } - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: - r = await client.post("/score/egreedy/v2", json=payload) - assert r.status_code == 200 - body = r.json() - assert body["tip_id"] in {"t:a", "t:b"} - assert body["policy"] == "egreedy-v2" - - -@pytest.mark.asyncio -async def test_score_egreedy_v2_accepts_missing_profile(): - payload = { - "user_id": "v2-no-profile", - "candidates": [ - {"id": "t:solo", "content": "Solo", "source": "todoist", - "features": {"is_overdue": False, "task_age_days": 0, "priority": 1}}, - ], - "context": {"hour_of_day": 10, "day_of_week": 0}, - } - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: - r = await client.post("/score/egreedy/v2", json=payload) - assert r.status_code == 200 - assert r.json()["tip_id"] == "t:solo" - - -@pytest.mark.asyncio -async def test_reward_egreedy_v2_updates_stats(): - user_id = "v2-reward-stats" - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: - r0 = await client.get(f"/stats/egreedy/v2/{user_id}") - before = r0.json()["cumulative_reward"] - - await client.post("/reward/egreedy/v2", json={ - "user_id": user_id, - "tip_id": "t:r", - "reward": 1.0, - "features": {"hour_of_day": 9, "is_overdue": True, "task_age_days": 2, "priority": 3}, - "day_of_week": 1, - "profile_features": { - "completion_rate_30d": 0.3, - "dismiss_rate_30d": 0.2, - "mean_dwell_ms_30d": 30_000, - "preferred_hour": 9, - "tip_volume_30d": 5, - }, - }) - r1 = await client.get(f"/stats/egreedy/v2/{user_id}") - body = r1.json() - assert body["cumulative_reward"] == pytest.approx(before + 1.0) - assert body["policy"] == "egreedy-v2" - assert len(body["theta"]) == 12 - assert len(body["feature_labels"]) == 12 - - -@pytest.mark.asyncio -async def test_reset_clears_v2_state(): - user_id = "v2-reset" - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: - await client.post("/score/egreedy/v2", json={ - "user_id": user_id, - "candidates": [ - {"id": "t:v2r", "content": "x", "source": "todoist", - "features": {"is_overdue": False, "task_age_days": 0, "priority": 1}}, - ], - "context": {"hour_of_day": 10, "day_of_week": 0}, - }) - r0 = await client.get(f"/stats/egreedy/v2/{user_id}") - assert r0.json()["pulls"] >= 1 - - await client.post(f"/reset/{user_id}") - r1 = await client.get(f"/stats/egreedy/v2/{user_id}") - assert r1.json()["pulls"] == 0 - - -@pytest.mark.asyncio -async def test_reward_negative_value(): - """Dismissing a tip should decrease cumulative_reward.""" - user_id = "dismiss-user-neg" - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: - r0 = await client.get(f"/stats/{user_id}") - before = r0.json()["cumulative_reward"] - - await client.post("/reward", json={ - "user_id": user_id, - "tip_id": "t:neg", - "reward": -1.0, - "features": {"hour_of_day": 20, "is_overdue": False, "task_age_days": 0, "priority": 1}, - }) - r1 = await client.get(f"/stats/{user_id}") - assert r1.json()["cumulative_reward"] == pytest.approx(before - 1.0)