feat(features): per-feature freshness spec — JIT vs batched (#61)
Each ml/features/*.py now declares freshness, source, and fallback per feature. ProfileFeature gains ttl_sec (mirrored from registry.ts), freshness="batched", source, and fallback. context.py adds ContextFeatureSpec + CONTEXT_FEATURES for the three JIT features (hour_of_day, day_of_week, tasks). CI test parses ttlSec from registry.ts to catch drift. ml/README updated with split JIT/batched feature contract. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
22
ml/README.md
22
ml/README.md
@@ -5,7 +5,7 @@ Python. Owns models, features, training, online scoring.
|
|||||||
| Dir | Role | Phase |
|
| Dir | Role | Phase |
|
||||||
|---|---|---|
|
|---|---|---|
|
||||||
| `serving/` | FastAPI online scorer (`/score`, `/generate`) + LiteLLM gateway + prompt registry (`prompts.py`) + JetStream consumers for `signals.>` / `feedback.>`, called by `recommender` | 1–2 |
|
| `serving/` | FastAPI online scorer (`/score`, `/generate`) + LiteLLM gateway + prompt registry (`prompts.py`) + JetStream consumers for `signals.>` / `feedback.>`, called by `recommender` | 1–2 |
|
||||||
| `features/` | context assembler (`context.py`): signals → `PromptContext`; Feast adapter later | 2 |
|
| `features/` | context assembler (`context.py`): signals → `PromptContext`; profile-feature schema mirror (`profile_schema.py`); Feast adapter later | 2 |
|
||||||
| `pipelines/` | batch feature + training DAGs (Prefect/Airflow) | 4 |
|
| `pipelines/` | batch feature + training DAGs (Prefect/Airflow) | 4 |
|
||||||
| `registry/` | MLflow-backed model registry integration | 4 |
|
| `registry/` | MLflow-backed model registry integration | 4 |
|
||||||
| `experiments/` | A/B assignment + multi-armed bandit policies | 4 |
|
| `experiments/` | A/B assignment + multi-armed bandit policies | 4 |
|
||||||
@@ -18,14 +18,24 @@ Python. Owns models, features, training, online scoring.
|
|||||||
- Training reads from the offline feature store; serving reads from the online feature store; definitions are shared (no train/serve skew).
|
- Training reads from the offline feature store; serving reads from the online feature store; definitions are shared (no train/serve skew).
|
||||||
- Shadow deploys before any policy change that affects real users.
|
- Shadow deploys before any policy change that affects real users.
|
||||||
|
|
||||||
## Profile-feature contract
|
## Feature contract
|
||||||
|
|
||||||
|
### Profile features (batched)
|
||||||
|
|
||||||
User-level features (completion rate, preferred hour, tip volume…) are computed
|
User-level features (completion rate, preferred hour, tip volume…) are computed
|
||||||
by the TypeScript recommender and shipped to ml/serving on every `/score` and
|
by the TypeScript recommender and shipped to `ml/serving` on every `/score` and
|
||||||
`/generate` call as `profile_features: dict | None`. The Python mirror in
|
`/generate` call as `profile_features: dict | None`. The Python mirror in
|
||||||
`features/profile_schema.py` documents the available names + dtypes — keep it
|
`features/profile_schema.py` documents each feature's name, dtype, TTL, source,
|
||||||
in sync with `services/api/src/profile/registry.ts` (a CI-style test asserts
|
and null fallback — keep it in sync with `services/api/src/profile/registry.ts`
|
||||||
the name sets match). See ADR-0011.
|
(a CI-style test asserts names and `ttlSec` values match). See ADR-0011.
|
||||||
|
|
||||||
|
### Context features (JIT)
|
||||||
|
|
||||||
|
Request-time signals assembled by `features/context.py` (`hour_of_day`,
|
||||||
|
`day_of_week`, task list). These are never cached — they are derived from the
|
||||||
|
system clock and the live Todoist feed at the moment of the score call.
|
||||||
|
`CONTEXT_FEATURES` in `context.py` declares freshness, source, and fallback for
|
||||||
|
each field (issue #61).
|
||||||
|
|
||||||
## Prompt registry
|
## Prompt registry
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,8 @@
|
|||||||
from .context import build_context, PromptContext, TaskSignal
|
from .context import build_context, PromptContext, TaskSignal, ContextFeatureSpec, CONTEXT_FEATURES
|
||||||
|
from .profile_schema import ProfileFeature, PROFILE_FEATURES, feature_names
|
||||||
|
|
||||||
__all__ = ["build_context", "PromptContext", "TaskSignal"]
|
__all__ = [
|
||||||
|
"build_context", "PromptContext", "TaskSignal",
|
||||||
|
"ContextFeatureSpec", "CONTEXT_FEATURES",
|
||||||
|
"ProfileFeature", "PROFILE_FEATURES", "feature_names",
|
||||||
|
]
|
||||||
|
|||||||
@@ -2,12 +2,56 @@
|
|||||||
Context assembler — converts raw user signals into a PromptContext for LLM tip generation.
|
Context assembler — converts raw user signals into a PromptContext for LLM tip generation.
|
||||||
|
|
||||||
Usage:
|
Usage:
|
||||||
from ml.features.context import build_context
|
from ml.features.context import build_context, CONTEXT_FEATURES
|
||||||
ctx = build_context(tasks, hour_of_day=9, day_of_week=2)
|
ctx = build_context(tasks, hour_of_day=9, day_of_week=2)
|
||||||
|
|
||||||
|
Feature-spec (issue #61):
|
||||||
|
All context features are JIT — they are assembled at request time from live
|
||||||
|
sources (system clock, caller-supplied task list) rather than read from a
|
||||||
|
cached profile store. They carry no TTL because they are never persisted.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
|
from typing import Literal
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class ContextFeatureSpec:
|
||||||
|
name: str
|
||||||
|
dtype: Literal["numeric", "categorical", "list"]
|
||||||
|
freshness: Literal["jit", "batched"]
|
||||||
|
source: str
|
||||||
|
fallback: str
|
||||||
|
description: str
|
||||||
|
|
||||||
|
|
||||||
|
CONTEXT_FEATURES: tuple[ContextFeatureSpec, ...] = (
|
||||||
|
ContextFeatureSpec(
|
||||||
|
name="hour_of_day",
|
||||||
|
dtype="numeric",
|
||||||
|
freshness="jit",
|
||||||
|
source="request",
|
||||||
|
fallback="12",
|
||||||
|
description="Current hour (0–23), supplied by the caller at score time.",
|
||||||
|
),
|
||||||
|
ContextFeatureSpec(
|
||||||
|
name="day_of_week",
|
||||||
|
dtype="numeric",
|
||||||
|
freshness="jit",
|
||||||
|
source="request",
|
||||||
|
fallback="0",
|
||||||
|
description="ISO weekday (0=Monday … 6=Sunday), supplied by the caller at score time.",
|
||||||
|
),
|
||||||
|
ContextFeatureSpec(
|
||||||
|
name="tasks",
|
||||||
|
dtype="list",
|
||||||
|
freshness="jit",
|
||||||
|
source="todoist-integration",
|
||||||
|
fallback="[]",
|
||||||
|
description="User's open tasks fetched live from the Todoist integration at request time.",
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
|||||||
@@ -8,6 +8,12 @@ code (ml/serving, eval harnesses, notebooks) knows what fields to expect on
|
|||||||
|
|
||||||
Update this file whenever you add or rename a feature in the TS registry.
|
Update this file whenever you add or rename a feature in the TS registry.
|
||||||
The accompanying test asserts the two stay in sync at the name level.
|
The accompanying test asserts the two stay in sync at the name level.
|
||||||
|
|
||||||
|
Feature-spec fields (issue #61):
|
||||||
|
freshness — "batched": value cached in profile store, recomputed on TTL/event.
|
||||||
|
ttl_sec — cache lifetime in seconds; mirrors ``ttlSec`` in registry.ts.
|
||||||
|
source — where the value originates.
|
||||||
|
fallback — raw value returned when the feature is unavailable (null stored).
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
@@ -16,6 +22,10 @@ from typing import Literal
|
|||||||
|
|
||||||
|
|
||||||
Dtype = Literal["numeric", "categorical"]
|
Dtype = Literal["numeric", "categorical"]
|
||||||
|
Freshness = Literal["jit", "batched"]
|
||||||
|
|
||||||
|
_HOUR = 3600
|
||||||
|
_DAY = 86_400
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
@@ -23,28 +33,57 @@ class ProfileFeature:
|
|||||||
name: str
|
name: str
|
||||||
dtype: Dtype
|
dtype: Dtype
|
||||||
description: str
|
description: str
|
||||||
|
freshness: Freshness
|
||||||
|
ttl_sec: int
|
||||||
|
source: str
|
||||||
|
fallback: str
|
||||||
|
|
||||||
|
|
||||||
PROFILE_FEATURES: tuple[ProfileFeature, ...] = (
|
PROFILE_FEATURES: tuple[ProfileFeature, ...] = (
|
||||||
ProfileFeature(
|
ProfileFeature(
|
||||||
"completion_rate_30d", "numeric",
|
name="completion_rate_30d",
|
||||||
'Fraction of tips served in the last 30 days that received a "done" reaction.',
|
dtype="numeric",
|
||||||
|
description='Fraction of tips served in the last 30 days that received a "done" reaction.',
|
||||||
|
freshness="batched",
|
||||||
|
ttl_sec=6 * _HOUR,
|
||||||
|
source="profile_store",
|
||||||
|
fallback="0.0",
|
||||||
),
|
),
|
||||||
ProfileFeature(
|
ProfileFeature(
|
||||||
"dismiss_rate_30d", "numeric",
|
name="dismiss_rate_30d",
|
||||||
'Fraction of tips served in the last 30 days that received a "dismiss" reaction.',
|
dtype="numeric",
|
||||||
|
description='Fraction of tips served in the last 30 days that received a "dismiss" reaction.',
|
||||||
|
freshness="batched",
|
||||||
|
ttl_sec=6 * _HOUR,
|
||||||
|
source="profile_store",
|
||||||
|
fallback="0.0",
|
||||||
),
|
),
|
||||||
ProfileFeature(
|
ProfileFeature(
|
||||||
"mean_dwell_ms_30d", "numeric",
|
name="mean_dwell_ms_30d",
|
||||||
"Average dwell time (ms between served and reacted) over the last 30 days.",
|
dtype="numeric",
|
||||||
|
description="Average dwell time (ms between served and reacted) over the last 30 days.",
|
||||||
|
freshness="batched",
|
||||||
|
ttl_sec=6 * _HOUR,
|
||||||
|
source="profile_store",
|
||||||
|
fallback="null — serving normalises to 0.0",
|
||||||
),
|
),
|
||||||
ProfileFeature(
|
ProfileFeature(
|
||||||
"preferred_hour", "numeric",
|
name="preferred_hour",
|
||||||
'Hour-of-day with the most "done" reactions in the last 30 days (0-23).',
|
dtype="numeric",
|
||||||
|
description='Hour-of-day with the most "done" reactions in the last 30 days (0–23).',
|
||||||
|
freshness="batched",
|
||||||
|
ttl_sec=_DAY,
|
||||||
|
source="profile_store",
|
||||||
|
fallback="null — serving normalises to 0.5 (neutral alignment)",
|
||||||
),
|
),
|
||||||
ProfileFeature(
|
ProfileFeature(
|
||||||
"tip_volume_30d", "numeric",
|
name="tip_volume_30d",
|
||||||
"Number of tips served to the user in the last 30 days.",
|
dtype="numeric",
|
||||||
|
description="Number of tips served to the user in the last 30 days.",
|
||||||
|
freshness="batched",
|
||||||
|
ttl_sec=_HOUR,
|
||||||
|
source="profile_store",
|
||||||
|
fallback="0",
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
"""Tests for ml/features/context.py"""
|
"""Tests for ml/features/context.py"""
|
||||||
import pytest
|
import pytest
|
||||||
import sys, os; sys.path.insert(0, os.path.dirname(__file__))
|
import sys, os; sys.path.insert(0, os.path.dirname(__file__))
|
||||||
from context import build_context, TaskSignal, PromptContext
|
from context import build_context, TaskSignal, PromptContext, CONTEXT_FEATURES
|
||||||
|
|
||||||
|
|
||||||
def test_empty_tasks():
|
def test_empty_tasks():
|
||||||
@@ -62,3 +62,30 @@ def test_due_date_none_preserved():
|
|||||||
tasks = [TaskSignal(id="x", content="No due", due_date=None)]
|
tasks = [TaskSignal(id="x", content="No due", due_date=None)]
|
||||||
ctx = build_context(tasks)
|
ctx = build_context(tasks)
|
||||||
assert ctx.tasks[0]["due_date"] is None
|
assert ctx.tasks[0]["due_date"] is None
|
||||||
|
|
||||||
|
|
||||||
|
# ── CONTEXT_FEATURES spec tests (issue #61) ──────────────────────────────────
|
||||||
|
|
||||||
|
def test_context_features_expected_names():
|
||||||
|
names = {f.name for f in CONTEXT_FEATURES}
|
||||||
|
assert names == {"hour_of_day", "day_of_week", "tasks"}
|
||||||
|
|
||||||
|
|
||||||
|
def test_context_features_all_jit():
|
||||||
|
for f in CONTEXT_FEATURES:
|
||||||
|
assert f.freshness == "jit", f"{f.name}: expected freshness='jit', got {f.freshness!r}"
|
||||||
|
|
||||||
|
|
||||||
|
def test_context_features_source_set():
|
||||||
|
for f in CONTEXT_FEATURES:
|
||||||
|
assert f.source, f"{f.name}: source must not be empty"
|
||||||
|
|
||||||
|
|
||||||
|
def test_context_features_fallback_set():
|
||||||
|
for f in CONTEXT_FEATURES:
|
||||||
|
assert f.fallback, f"{f.name}: fallback must not be empty"
|
||||||
|
|
||||||
|
|
||||||
|
def test_context_features_no_duplicates():
|
||||||
|
names = [f.name for f in CONTEXT_FEATURES]
|
||||||
|
assert len(names) == len(set(names)), f"duplicate names: {names}"
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
"""Smoke test for profile_schema mirror (#81 phase A).
|
"""Smoke test for profile_schema mirror (#81 phase A, #61 freshness spec).
|
||||||
|
|
||||||
The TS registry in services/api/src/profile/registry.ts is the source of truth.
|
The TS registry in services/api/src/profile/registry.ts is the source of truth.
|
||||||
This test checks the names listed here match the registry by reading the TS
|
This test checks the names listed here match the registry by reading the TS
|
||||||
@@ -14,6 +14,18 @@ from ml.features.profile_schema import PROFILE_FEATURES, feature_names
|
|||||||
|
|
||||||
REGISTRY_PATH = Path(__file__).resolve().parents[2] / "services" / "api" / "src" / "profile" / "registry.ts"
|
REGISTRY_PATH = Path(__file__).resolve().parents[2] / "services" / "api" / "src" / "profile" / "registry.ts"
|
||||||
|
|
||||||
|
_HOUR = 3600
|
||||||
|
_DAY = 86_400
|
||||||
|
|
||||||
|
# Expected ttl_sec values mirrored from registry.ts — keeps the two in sync.
|
||||||
|
_EXPECTED_TTL: dict[str, int] = {
|
||||||
|
"completion_rate_30d": 6 * _HOUR,
|
||||||
|
"dismiss_rate_30d": 6 * _HOUR,
|
||||||
|
"mean_dwell_ms_30d": 6 * _HOUR,
|
||||||
|
"preferred_hour": _DAY,
|
||||||
|
"tip_volume_30d": _HOUR,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def _ts_registry_names() -> set[str]:
|
def _ts_registry_names() -> set[str]:
|
||||||
text = REGISTRY_PATH.read_text(encoding="utf-8")
|
text = REGISTRY_PATH.read_text(encoding="utf-8")
|
||||||
@@ -21,6 +33,35 @@ def _ts_registry_names() -> set[str]:
|
|||||||
return set(re.findall(r"name:\s*'([a-zA-Z0-9_]+)'", text))
|
return set(re.findall(r"name:\s*'([a-zA-Z0-9_]+)'", text))
|
||||||
|
|
||||||
|
|
||||||
|
def _ts_registry_ttls() -> dict[str, int]:
|
||||||
|
"""Parse ttlSec values from registry.ts (crude but sufficient for drift detection).
|
||||||
|
|
||||||
|
Handles TS symbolic constants (HOUR, DAY) and expressions like ``6 * HOUR``.
|
||||||
|
"""
|
||||||
|
text = REGISTRY_PATH.read_text(encoding="utf-8")
|
||||||
|
|
||||||
|
# Extract numeric constants: `const HOUR = 3600;` or `const DAY = 86_400;`
|
||||||
|
consts: dict[str, int] = {}
|
||||||
|
for m in re.finditer(r"const\s+([A-Z_]+)\s*=\s*([\d_]+)", text):
|
||||||
|
consts[m.group(1)] = int(m.group(2).replace("_", ""))
|
||||||
|
|
||||||
|
def _eval_expr(expr: str) -> int:
|
||||||
|
tokens = [t.strip() for t in expr.split("*")]
|
||||||
|
result = 1
|
||||||
|
for t in tokens:
|
||||||
|
result *= consts[t] if t in consts else int(t)
|
||||||
|
return result
|
||||||
|
|
||||||
|
result: dict[str, int] = {}
|
||||||
|
for block in re.split(r"\{", text):
|
||||||
|
name_m = re.search(r"name:\s*'([a-zA-Z0-9_]+)'", block)
|
||||||
|
# ttlSec may be a constant name, a number, or `N * CONST`
|
||||||
|
ttl_m = re.search(r"ttlSec:\s*([A-Za-z0-9_]+(?:\s*\*\s*[A-Za-z0-9_]+)?)", block)
|
||||||
|
if name_m and ttl_m:
|
||||||
|
result[name_m.group(1)] = _eval_expr(ttl_m.group(1))
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
def test_python_mirror_matches_ts_registry():
|
def test_python_mirror_matches_ts_registry():
|
||||||
py_names = feature_names()
|
py_names = feature_names()
|
||||||
ts_names = _ts_registry_names()
|
ts_names = _ts_registry_names()
|
||||||
@@ -39,3 +80,34 @@ def test_profile_schema_no_duplicates():
|
|||||||
def test_profile_schema_dtypes_known():
|
def test_profile_schema_dtypes_known():
|
||||||
for f in PROFILE_FEATURES:
|
for f in PROFILE_FEATURES:
|
||||||
assert f.dtype in {"numeric", "categorical"}
|
assert f.dtype in {"numeric", "categorical"}
|
||||||
|
|
||||||
|
|
||||||
|
def test_all_profile_features_are_batched():
|
||||||
|
for f in PROFILE_FEATURES:
|
||||||
|
assert f.freshness == "batched", f"{f.name}: expected freshness='batched', got {f.freshness!r}"
|
||||||
|
|
||||||
|
|
||||||
|
def test_profile_feature_ttl_matches_ts_registry():
|
||||||
|
ts_ttls = _ts_registry_ttls()
|
||||||
|
for f in PROFILE_FEATURES:
|
||||||
|
assert f.name in ts_ttls, f"{f.name} not found in TS registry ttlSec parse"
|
||||||
|
assert f.ttl_sec == ts_ttls[f.name], (
|
||||||
|
f"{f.name}: Python ttl_sec={f.ttl_sec} != TS ttlSec={ts_ttls[f.name]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_profile_feature_ttl_matches_expected():
|
||||||
|
for f in PROFILE_FEATURES:
|
||||||
|
assert f.ttl_sec == _EXPECTED_TTL[f.name], (
|
||||||
|
f"{f.name}: ttl_sec={f.ttl_sec}, expected {_EXPECTED_TTL[f.name]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_profile_feature_source_is_profile_store():
|
||||||
|
for f in PROFILE_FEATURES:
|
||||||
|
assert f.source == "profile_store", f"{f.name}: unexpected source {f.source!r}"
|
||||||
|
|
||||||
|
|
||||||
|
def test_profile_feature_fallback_set():
|
||||||
|
for f in PROFILE_FEATURES:
|
||||||
|
assert f.fallback, f"{f.name}: fallback must not be empty"
|
||||||
|
|||||||
Reference in New Issue
Block a user