# ml/serving

FastAPI online scorer, tip generator, and JetStream consumer.

## Contract

| Endpoint | Description |
|----------|-------------|
| `POST /score` | LinUCB d=5 (baseline, shadow-eligible) |
| `POST /score/egreedy` | ε-greedy v1, d=7 (active policy, ADR-0007) |
| `POST /score/egreedy/v2` | ε-greedy v2, d=12 + profile features (shadow, ADR-0012) |
| `POST /reward` / `/reward/egreedy` / `/reward/egreedy/v2` | Online reward update per policy |
| `POST /generate` | LLM tip candidates via LiteLLM `tip-generator` alias |
| `GET /stats/{user_id}` / `/stats/egreedy/{user_id}` / `/stats/egreedy/v2/{user_id}` | Per-user policy stats |
| `GET /features/{user_id}` | Last 100 scored feature vectors (ring buffer) |
| `POST /reset/{user_id}` | Clear all per-user bandit state (admin) |
| `GET /health` | `{ ok, nats: { enabled, consumers: { signals, feedback } } }` |

Called by `services/api/src/recommender/` over HTTP. Contract is stable across policy swaps.

## Feature dimensions

| Policy | d | Extra dims vs previous |
|--------|---|------------------------|
| LinUCB v1 | 5 | hour_sin/cos, is_overdue, task_age, priority |
| ε-greedy v1 | 7 | + dow_sin/cos |
| ε-greedy v2 | 12 | + 5 profile features (ADR-0012) |

Profile features are computed by the TypeScript API and shipped on each `/score` call as `profile_features`. See `ml/README.md` and ADR-0011.

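
To make the dimension counts above concrete, here is a minimal sketch of how the d=7 and d=12 vectors could be assembled. The function names, the feature ordering, and the units of `task_age_days` and `priority` are assumptions for illustration only; the actual encoding lives in the service code and the ADRs.

```python
import math
from datetime import datetime
from typing import List, Sequence, Tuple


def cyclical(value: float, period: float) -> Tuple[float, float]:
    """Encode a periodic value as (sin, cos) so the end of a period sits next to its start."""
    angle = 2.0 * math.pi * value / period
    return math.sin(angle), math.cos(angle)


def build_features_v1(
    now: datetime, is_overdue: bool, task_age_days: float, priority: float
) -> List[float]:
    """Hypothetical d=7 vector: the five LinUCB dims plus day-of-week sin/cos."""
    hour_sin, hour_cos = cyclical(now.hour, 24)
    dow_sin, dow_cos = cyclical(now.weekday(), 7)
    # Ordering is an assumption; only the set of features comes from the table above.
    return [hour_sin, hour_cos, float(is_overdue), task_age_days, priority, dow_sin, dow_cos]


def build_features_v2(base: Sequence[float], profile_features: Sequence[float]) -> List[float]:
    """Hypothetical d=12 vector: the d=7 vector followed by 5 profile features."""
    if len(base) != 7:
        raise ValueError(f"expected 7 base features, got {len(base)}")
    if len(profile_features) != 5:
        raise ValueError(f"expected 5 profile features, got {len(profile_features)}")
    return list(base) + list(profile_features)
```

A caller would build the d=7 vector first and pass it, together with the five values received in `profile_features`, to `build_features_v2`. Rejecting a wrong-length input early avoids silently scoring against a mis-shaped vector.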
## JetStream consumers

On startup, `nats_consumer.py` registers two durable push consumers against NATS JetStream:

| Consumer | Stream | Subjects | Durable name |
|----------|--------|----------|--------------|
| signals | `signals` | `signals.>` | `feature-pipeline-signals` |
| feedback | `feedback` | `feedback.>` | `feature-pipeline-feedback` |

**Handled subjects:**

- `signals.task.synced`: writes `{last_sync_ts, task_count}` to `{STATE_DIR}/{user}_sync.json`
- `signals.tip.feedback`: logged for observability; reward update happens via the HTTP path in the recommender

**Payload validation:** each message is validated against the pydantic models in `schemas.py` (mirroring `packages/shared-types/events/oo/events/v1/`). A `ValidationError` triggers a nak so the message is redelivered rather than silently dropped.

**Ack semantics:** explicit ack on success; nak for redelivery on error; dead-lettered after `NATS_MAX_DELIVER` attempts.

**Disabled** when `NATS_URL` is unset (default in local dev without NATS). No import of `nats-py` occurs in that case.

## Observability

Logs are structured JSON via **structlog**. Every line includes `level`, `logger`, and `timestamp`. When a W3C `traceparent` header is present on the incoming request, `trace_id` is also bound via Python `contextvars`, so all log lines within a request carry the same trace ID as the upstream API call.

Sentry error capture is active when `SENTRY_DSN` is set.

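
The trace propagation described above can be sketched with the standard library alone. This is not the service's structlog wiring: `trace_id_var`, `extract_trace_id`, `bind_trace_id`, and `log_fields` are hypothetical names, and the parser only accepts the four-field `traceparent` layout from the W3C Trace Context specification.

```python
import contextvars
import re
from typing import Dict, Optional

# Hypothetical context variable; the real service binds trace_id through structlog.
trace_id_var = contextvars.ContextVar("trace_id", default=None)

# traceparent layout: version(2 hex)-trace-id(32 hex)-parent-id(16 hex)-flags(2 hex)
_TRACEPARENT = re.compile(r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def extract_trace_id(traceparent: Optional[str]) -> Optional[str]:
    """Return the 32-hex trace ID, or None if the header is absent or malformed."""
    if not traceparent:
        return None
    match = _TRACEPARENT.match(traceparent.strip())
    if match is None:
        return None
    trace_id = match.group(2)
    # An all-zero trace ID is invalid per the specification.
    if trace_id == "0" * 32:
        return None
    return trace_id


def bind_trace_id(traceparent: Optional[str]) -> Optional[str]:
    """Store the trace ID for the current context (one request in an async server)."""
    trace_id = extract_trace_id(traceparent)
    trace_id_var.set(trace_id)
    return trace_id


def log_fields(level: str, event: str) -> Dict[str, str]:
    """Build the fields of one log line, adding trace_id only when one is bound."""
    fields = {"level": level, "event": event}
    trace_id = trace_id_var.get()
    if trace_id is not None:
        fields["trace_id"] = trace_id
    return fields
```

In a FastAPI app, `bind_trace_id(request.headers.get("traceparent"))` would typically run in middleware before the route handler. Because each asyncio task gets its own copy of the context, concurrent requests do not see each other's trace IDs.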
## Config

| Env var | Default | Description |
|---------|---------|-------------|
| `STATE_DIR` | `/tmp/oo-bandit-state` | Directory for per-user bandit state JSON files |
| `LITELLM_URL` | `http://localhost:4000` | LiteLLM gateway |
| `LITELLM_MASTER_KEY` | `sk-oo-dev` | LiteLLM auth key |
| `NATS_URL` | (empty) | NATS broker URL; empty = consumers disabled |
| `NATS_DURABLE_PREFIX` | `feature-pipeline` | Prefix for durable consumer names |
| `NATS_MAX_DELIVER` | `5` | Max redelivery attempts before dropping |
| `DEFAULT_PROMPT_VERSION` | `v1` | Fallback prompt version for `/generate` |
| `ENV` | `development` | Environment label (passed to Sentry) |
| `SENTRY_DSN` | (empty) | Sentry DSN; empty = Sentry disabled |

## Health story

`GET /health` returns `{ ok: true }` plus NATS consumer state:

```json
{
  "ok": true,
  "nats": {
    "enabled": true,
    "consumers": {
      "signals": { "last_msg_ts": "2026-04-25T10:00:00Z", "processed": 42, "errors": 0 },
      "feedback": { "last_msg_ts": null, "processed": 0, "errors": 0 }
    }
  }
}
```

`last_msg_ts` is `null` until the first message arrives. The endpoint is used by the docker-compose healthcheck.

## Extraction criteria

This service already runs as its own process. Extract it to a dedicated host / GPU node when:

- p99 scoring latency exceeds 50 ms under load, **or**
- model weights are too large to share memory with the Python process on the current host.

## State

Per-user bandit state is stored as JSON files in `STATE_DIR`:

| File pattern | Policy |
|---|---|
| `{user}.json` | LinUCB v1 |
| `{user}_egreedy.json` | ε-greedy v1 |
| `{user}_egreedy_v2.json` | ε-greedy v2 |
| `{user}_sync.json` | Last task sync metadata (written by JetStream consumer) |
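
As an illustration of the file patterns above, the following sketch resolves a state file path from a user ID and a state kind. The `state_path` helper and the `kind` keys are hypothetical and not part of the service; only the suffixes and the `STATE_DIR` default are taken from the tables in this README.

```python
import os
from pathlib import Path
from typing import Optional

# Suffixes copied from the file-pattern table; the dictionary keys are illustrative.
STATE_SUFFIXES = {
    "linucb": "",
    "egreedy": "_egreedy",
    "egreedy_v2": "_egreedy_v2",
    "sync": "_sync",
}


def state_path(user_id: str, kind: str, state_dir: Optional[str] = None) -> Path:
    """Return the JSON state file for a user, e.g. {STATE_DIR}/{user}_egreedy.json."""
    if kind not in STATE_SUFFIXES:
        raise ValueError(f"unknown state kind: {kind!r}")
    # Fall back to the STATE_DIR env var, then to the documented default.
    base = Path(state_dir or os.environ.get("STATE_DIR", "/tmp/oo-bandit-state"))
    return base / f"{user_id}{STATE_SUFFIXES[kind]}.json"
```

For example, `state_path("u1", "egreedy_v2", "/tmp/x")` resolves to `/tmp/x/u1_egreedy_v2.json`. A real implementation would likely also sanitize `user_id` before using it in a file name.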