""" oO ML Serving — Phase 1: LinUCB contextual bandit. Contract: POST /score { user_id, candidates, context } → { tip_id, score, policy } POST /reward { user_id, tip_id, reward, features } → { ok } POST /reset/{user_id} → { ok } GET /stats/{user_id} → { pulls, cumulative_reward, estimated_mean, last_updated } GET /features/{user_id} → { history: [{ ts, features, score }] } GET /health → { ok } Features (d=5): hour_sin, hour_cos — cyclical time-of-day encoding is_overdue — 0 or 1 task_age_days — days since due date (clipped 0–30, normalised 0–1) priority_norm — Todoist priority 1–4, normalised to 0–1 """ from __future__ import annotations import json import math import os import time from collections import deque from pathlib import Path from typing import Optional, Deque import httpx import numpy as np from fastapi import FastAPI, HTTPException from pydantic import BaseModel from prompts import get_prompt app = FastAPI(title="oO ML Serving", version="1.0.0") LITELLM_URL = os.getenv("LITELLM_URL", "http://localhost:4000") LITELLM_MASTER_KEY = os.getenv("LITELLM_MASTER_KEY", "sk-oo-dev") STATE_DIR = Path(os.getenv("STATE_DIR", "/tmp/oo-bandit-state")) STATE_DIR.mkdir(parents=True, exist_ok=True) ALPHA = 1.0 # LinUCB exploration coefficient D = 5 # LinUCB feature dimension D7 = 7 # ε-greedy feature dimension (adds day-of-week cyclical encoding) EPSILON = 0.1 # ε-greedy exploration rate FEATURE_HISTORY_SIZE = 100 # per-user ring buffer # ── Per-user in-memory feature history ──────────────────────────────────── _feature_history: dict[str, deque] = {} def get_feature_history(user_id: str) -> deque: if user_id not in _feature_history: _feature_history[user_id] = deque(maxlen=FEATURE_HISTORY_SIZE) return _feature_history[user_id] # ── Feature helpers ──────────────────────────────────────────────────────── def build_feature_vector(features: dict) -> np.ndarray: hour = features.get("hour_of_day", 12) hour_sin = math.sin(2 * math.pi * hour / 24) hour_cos = math.cos(2 * math.pi * hour / 24) is_overdue = float(bool(features.get("is_overdue", False))) age = min(float(features.get("task_age_days", 0)), 30.0) / 30.0 priority = (float(features.get("priority", 1)) - 1.0) / 3.0 return np.array([hour_sin, hour_cos, is_overdue, age, priority], dtype=np.float64) # ── Per-user bandit state (disjoint LinUCB, global arm) ─────────────────── # ── LinUCB state helpers ─────────────────────────────────────────────────── def state_path(user_id: str) -> Path: safe = "".join(c if c.isalnum() else "_" for c in user_id) return STATE_DIR / f"{safe}.json" def load_state(user_id: str) -> tuple[np.ndarray, np.ndarray, dict]: """Returns (A, b, meta). A is DxD, b is D-vector.""" p = state_path(user_id) if p.exists(): raw = json.loads(p.read_text()) A = np.array(raw["A"], dtype=np.float64) b = np.array(raw["b"], dtype=np.float64) meta = raw.get("meta", {}) return A, b, meta return np.identity(D, dtype=np.float64), np.zeros(D, dtype=np.float64), {} def save_state(user_id: str, A: np.ndarray, b: np.ndarray, meta: dict) -> None: p = state_path(user_id) p.write_text(json.dumps({"A": A.tolist(), "b": b.tolist(), "meta": meta})) # ── ε-greedy state helpers (d=7, extended features) ─────────────────────── def build_feature_vector_7(features: dict, day_of_week: int = 0) -> np.ndarray: """d=7: base 5 features + day-of-week cyclical encoding.""" base = build_feature_vector(features) dow_sin = math.sin(2 * math.pi * day_of_week / 7) dow_cos = math.cos(2 * math.pi * day_of_week / 7) return np.append(base, [dow_sin, dow_cos]) def state7_path(user_id: str) -> Path: safe = "".join(c if c.isalnum() else "_" for c in user_id) return STATE_DIR / f"{safe}_egreedy.json" def load_state7(user_id: str) -> tuple[np.ndarray, np.ndarray, dict]: """Returns (A, b, meta) for ε-greedy d=7 policy.""" p = state7_path(user_id) if p.exists(): raw = json.loads(p.read_text()) A = np.array(raw["A"], dtype=np.float64) b = np.array(raw["b"], dtype=np.float64) return A, b, raw.get("meta", {}) return np.identity(D7, dtype=np.float64), np.zeros(D7, dtype=np.float64), {} def save_state7(user_id: str, A: np.ndarray, b: np.ndarray, meta: dict) -> None: p = state7_path(user_id) p.write_text(json.dumps({"A": A.tolist(), "b": b.tolist(), "meta": meta})) # ── API models ───────────────────────────────────────────────────────────── class CandidateFeatures(BaseModel): hour_of_day: int = 12 is_overdue: bool = False task_age_days: float = 0.0 priority: int = 1 class Candidate(BaseModel): id: str content: str source: str source_id: Optional[str] = None features: CandidateFeatures = CandidateFeatures() class Context(BaseModel): hour_of_day: int = 12 day_of_week: int = 0 class ScoreRequest(BaseModel): user_id: str candidates: list[Candidate] context: Context = Context() class ScoreResponse(BaseModel): tip_id: str score: float policy: str class RewardRequest(BaseModel): user_id: str tip_id: str reward: float # +1 done, +0.5 helpful, 0 snooze, -0.5 not_helpful, -1 dismiss features: CandidateFeatures day_of_week: int = 0 # included so egreedy can train dow features correctly class RewardResponse(BaseModel): ok: bool class PromptContext(BaseModel): tasks: list[dict] = [] hour_of_day: int = 12 day_of_week: int = 0 extra: dict = {} class GenerateRequest(BaseModel): user_id: str context: PromptContext = PromptContext() n: int = 3 prompt_version: Optional[str] = None # None → server default (env DEFAULT_PROMPT_VERSION) class TipCandidate(BaseModel): id: str content: str source: str = "llm" rationale: Optional[str] = None class GenerateResponse(BaseModel): candidates: list[TipCandidate] model: str prompt_version: str prompt_tokens: int = 0 completion_tokens: int = 0 # ── Endpoints ────────────────────────────────────────────────────────────── @app.get("/health") def health(): return {"ok": True} _RETRY_SUFFIX = ( "\n\nYour previous response was not valid JSON. " "Reply ONLY with the JSON array — no prose, no markdown fences." ) _MAX_GENERATE_RETRIES = 2 def _parse_llm_json(raw: str) -> list[dict]: """Strip markdown fences and parse JSON array. Raises ValueError on failure.""" text = raw.strip() if text.startswith("```"): parts = text.split("```") text = parts[1] if len(parts) > 1 else text if text.startswith("json"): text = text[4:] return json.loads(text) @app.post("/generate", response_model=GenerateResponse) async def generate(req: GenerateRequest) -> GenerateResponse: """Generate tip candidates via LiteLLM → tip-generator alias. Retries up to _MAX_GENERATE_RETRIES times on malformed JSON, appending a correction hint to the conversation so the model can self-correct. """ try: prompt_template = get_prompt(req.prompt_version) except KeyError as e: raise HTTPException(status_code=422, detail=f"Unknown prompt_version: {e.args[0]}") user_msg = prompt_template.build_user(req.context, req.n) messages: list[dict] = [ {"role": "system", "content": prompt_template.system}, {"role": "user", "content": user_msg}, ] headers = {"Authorization": f"Bearer {LITELLM_MASTER_KEY}"} last_parse_error: str = "" last_raw: str = "" total_usage: dict = {"prompt_tokens": 0, "completion_tokens": 0} model_used = "tip-generator" async with httpx.AsyncClient(timeout=30.0) as client: for attempt in range(1 + _MAX_GENERATE_RETRIES): payload = {"model": "tip-generator", "messages": messages, "temperature": 0.7} try: resp = await client.post( f"{LITELLM_URL}/chat/completions", json=payload, headers=headers, ) resp.raise_for_status() except httpx.HTTPStatusError as e: raise HTTPException(status_code=502, detail=f"LiteLLM error: {e.response.text}") except httpx.RequestError as e: raise HTTPException(status_code=503, detail=f"LiteLLM unreachable: {e}") data = resp.json() usage = data.get("usage", {}) total_usage["prompt_tokens"] += usage.get("prompt_tokens", 0) total_usage["completion_tokens"] += usage.get("completion_tokens", 0) model_used = data.get("model", "tip-generator") last_raw = data["choices"][0]["message"]["content"] try: items = _parse_llm_json(last_raw) break except (json.JSONDecodeError, ValueError) as e: last_parse_error = str(e) # Feed the bad reply back so the model can self-correct messages.append({"role": "assistant", "content": last_raw}) messages.append({"role": "user", "content": _RETRY_SUFFIX}) else: raise HTTPException( status_code=502, detail=f"LLM returned invalid JSON after {_MAX_GENERATE_RETRIES} retries: " f"{last_parse_error}\n{last_raw[:200]}", ) candidates = [ TipCandidate( id=item.get("id", f"tip-{i}"), content=item.get("content", ""), rationale=item.get("rationale"), ) for i, item in enumerate(items) ] return GenerateResponse( candidates=candidates, model=model_used, prompt_version=prompt_template.version, prompt_tokens=total_usage["prompt_tokens"], completion_tokens=total_usage["completion_tokens"], ) @app.post("/score", response_model=ScoreResponse) def score(req: ScoreRequest) -> ScoreResponse: if not req.candidates: raise HTTPException(status_code=422, detail="No candidates") A, b, meta = load_state(req.user_id) try: A_inv = np.linalg.inv(A) except np.linalg.LinAlgError: A_inv = np.identity(D, dtype=np.float64) theta = A_inv @ b best_id = None best_score = -float("inf") best_features: dict = {} for candidate in req.candidates: feat_dict = { "hour_of_day": req.context.hour_of_day, "is_overdue": candidate.features.is_overdue, "task_age_days": candidate.features.task_age_days, "priority": candidate.features.priority, } x = build_feature_vector(feat_dict) exploit = float(theta @ x) explore = ALPHA * math.sqrt(float(x @ A_inv @ x)) ucb = exploit + explore if ucb > best_score: best_score = ucb best_id = candidate.id best_features = feat_dict # Log to feature history ring buffer history = get_feature_history(req.user_id) history.append({ "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "features": best_features, "score": best_score, "tip_id": best_id, }) # Update meta stats meta["pulls"] = meta.get("pulls", 0) + 1 meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) save_state(req.user_id, A, b, meta) return ScoreResponse(tip_id=best_id, score=best_score, policy="linucb-v1") @app.post("/reward", response_model=RewardResponse) def reward(req: RewardRequest) -> RewardResponse: A, b, meta = load_state(req.user_id) feat_dict = { "hour_of_day": req.features.hour_of_day, "is_overdue": req.features.is_overdue, "task_age_days": req.features.task_age_days, "priority": req.features.priority, } x = build_feature_vector(feat_dict) A += np.outer(x, x) b += req.reward * x # Track cumulative reward in meta meta["cumulative_reward"] = meta.get("cumulative_reward", 0.0) + req.reward meta["reward_count"] = meta.get("reward_count", 0) + 1 meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) save_state(req.user_id, A, b, meta) return RewardResponse(ok=True) @app.post("/score/egreedy", response_model=ScoreResponse) def score_egreedy(req: ScoreRequest) -> ScoreResponse: """ε-greedy policy with d=7 features (adds day-of-week encoding). Exploration: pick uniformly at random with probability ε. Exploitation: pick argmax of linear payoff estimate θ·x. Differs from LinUCB in: no UCB bonus, richer feature space. """ if not req.candidates: raise HTTPException(status_code=422, detail="No candidates") A, b, meta = load_state7(req.user_id) try: A_inv = np.linalg.inv(A) except np.linalg.LinAlgError: A_inv = np.identity(D7, dtype=np.float64) theta = A_inv @ b dow = req.context.day_of_week exploring = np.random.random() < EPSILON if exploring: chosen = req.candidates[np.random.randint(len(req.candidates))] feat_dict = { "hour_of_day": req.context.hour_of_day, "is_overdue": chosen.features.is_overdue, "task_age_days": chosen.features.task_age_days, "priority": chosen.features.priority, } x = build_feature_vector_7(feat_dict, dow) best_score = float(theta @ x) best_id = chosen.id else: best_id = None best_score = -float("inf") feat_dict = {} for candidate in req.candidates: fd = { "hour_of_day": req.context.hour_of_day, "is_overdue": candidate.features.is_overdue, "task_age_days": candidate.features.task_age_days, "priority": candidate.features.priority, } x = build_feature_vector_7(fd, dow) s = float(theta @ x) if s > best_score: best_score = s best_id = candidate.id feat_dict = fd history = get_feature_history(req.user_id) history.append({ "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "features": {**feat_dict, "day_of_week": dow, "exploring": exploring}, "score": best_score, "tip_id": best_id, "policy": "egreedy-v1", }) meta["pulls"] = meta.get("pulls", 0) + 1 meta["explore_count"] = meta.get("explore_count", 0) + int(exploring) meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) save_state7(req.user_id, A, b, meta) return ScoreResponse(tip_id=best_id, score=best_score, policy="egreedy-v1") @app.post("/reward/egreedy", response_model=RewardResponse) def reward_egreedy(req: RewardRequest) -> RewardResponse: """Update ε-greedy ridge estimator with observed reward.""" A, b, meta = load_state7(req.user_id) feat_dict = { "hour_of_day": req.features.hour_of_day, "is_overdue": req.features.is_overdue, "task_age_days": req.features.task_age_days, "priority": req.features.priority, } x = build_feature_vector_7(feat_dict, day_of_week=req.day_of_week) A += np.outer(x, x) b += req.reward * x meta["cumulative_reward"] = meta.get("cumulative_reward", 0.0) + req.reward meta["reward_count"] = meta.get("reward_count", 0) + 1 meta["last_updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) save_state7(req.user_id, A, b, meta) return RewardResponse(ok=True) @app.get("/stats/egreedy/{user_id}") def stats_egreedy(user_id: str): """ε-greedy policy stats — pulls, cumulative reward, θ vector.""" A, b, meta = load_state7(user_id) try: theta = (np.linalg.inv(A) @ b).tolist() except np.linalg.LinAlgError: theta = [0.0] * D7 pulls = meta.get("pulls", 0) cumulative_reward = meta.get("cumulative_reward", 0.0) reward_count = meta.get("reward_count", 0) explore_count = meta.get("explore_count", 0) return { "user_id": user_id, "policy": "egreedy-v1", "pulls": pulls, "reward_count": reward_count, "cumulative_reward": cumulative_reward, "estimated_mean_reward": cumulative_reward / reward_count if reward_count > 0 else 0.0, "exploration_rate": explore_count / pulls if pulls > 0 else 0.0, "theta": theta, "feature_labels": ["hour_sin", "hour_cos", "is_overdue", "task_age", "priority", "dow_sin", "dow_cos"], "last_updated": meta.get("last_updated"), } @app.post("/reset/{user_id}", response_model=RewardResponse) def reset(user_id: str) -> RewardResponse: """Reset per-user bandit state (admin action).""" p = state_path(user_id) if p.exists(): p.unlink() p7 = state7_path(user_id) if p7.exists(): p7.unlink() if user_id in _feature_history: _feature_history[user_id].clear() return RewardResponse(ok=True) @app.get("/stats/{user_id}") def stats(user_id: str): """Return current LinUCB state summary for a user.""" A, b, meta = load_state(user_id) try: A_inv = np.linalg.inv(A) theta = (A_inv @ b).tolist() except np.linalg.LinAlgError: theta = [0.0] * D pulls = meta.get("pulls", 0) cumulative_reward = meta.get("cumulative_reward", 0.0) reward_count = meta.get("reward_count", 0) estimated_mean = cumulative_reward / reward_count if reward_count > 0 else 0.0 return { "user_id": user_id, "pulls": pulls, "reward_count": reward_count, "cumulative_reward": cumulative_reward, "estimated_mean_reward": estimated_mean, "theta": theta, "last_updated": meta.get("last_updated"), } @app.get("/features/{user_id}") def features(user_id: str): """Return recent feature vectors logged at scoring time.""" history = get_feature_history(user_id) return { "user_id": user_id, "history": list(history), }