From 4cade4868bd5f6eeb86b3eb581163a6a32eaa430 Mon Sep 17 00:00:00 2001 From: alvis Date: Wed, 6 May 2026 05:18:29 +0000 Subject: [PATCH] feat(agents): per-user baseline + stdev inference for momentum agent (#114) Adds two InferredParams (TTL=7d) computed from 28-day rolling daily done counts: - baseline_completions_per_day: mean done events/day over the window - stdev: stdev of daily counts (floored at 0.1 to avoid division by zero) MomentumAgent.compute() now calculates a z-score from recent done events in inp.feedback_history vs the inferred baseline. Snippet language switches to z-score framing ("above your usual pace", "slowing down") when |z| >= 1.0, falling back to engagement_trend labels when in the normal range. - engagement_trend InferredParam preserved for backward compatibility - momentum_window pref added (default 7, user-overridable) - 14 new tests covering power user, casual user, returning-from-break, and relative stdev comparison; engagement_trend tests updated for z-score priority - Agent bumped to v1.2.0 Closes #114 Co-Authored-By: Claude Sonnet 4.6 --- ml/agents/momentum.py | 142 ++++++++++++++++++-- ml/agents/tests/test_per_agent_inference.py | 127 +++++++++++++++-- 2 files changed, 245 insertions(+), 24 deletions(-) diff --git a/ml/agents/momentum.py b/ml/agents/momentum.py index 7d6da50..0f821b6 100644 --- a/ml/agents/momentum.py +++ b/ml/agents/momentum.py @@ -1,5 +1,8 @@ from __future__ import annotations +import math +import statistics +from collections import defaultdict from datetime import datetime, timedelta, timezone from typing import ClassVar @@ -8,6 +11,49 @@ from .inference.history import UserHistory from .manifest import AgentManifest, InferredParam +def _parse_dt(iso: str) -> datetime: + try: + dt = datetime.fromisoformat(iso.replace("Z", "+00:00")) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + except ValueError: + return datetime.min.replace(tzinfo=timezone.utc) + + +def _daily_done_counts(history: 
UserHistory, window_days: int = 28) -> list[int]: + """Count done-action events per calendar day over the last window_days days.""" + if not history.events: + return [] + latest = max(_parse_dt(e.created_at) for e in history.events) + cutoff = latest - timedelta(days=window_days) + by_day: dict[tuple[int, int, int], int] = defaultdict(int) + for e in history.events: + if e.action == "done": + dt = _parse_dt(e.created_at) + if dt >= cutoff: + by_day[(dt.year, dt.month, dt.day)] += 1 + # Return counts for every day in the window, including zero-completion days. + counts = [] + for offset in range(window_days): + day = (latest - timedelta(days=offset)).date() + counts.append(by_day.get((day.year, day.month, day.day), 0)) + return counts + + +def _infer_baseline_completions_per_day(history: UserHistory) -> float: + counts = _daily_done_counts(history) + return statistics.mean(counts) if counts else 1.0 + + +def _infer_stdev(history: UserHistory) -> float: + counts = _daily_done_counts(history) + if len(counts) < 2: + return 1.0 + sd = statistics.stdev(counts) + return max(sd, 0.1) # floor so we never divide by zero in z-score + + def _infer_engagement_trend(history: UserHistory) -> str: """Compare done-rate in the most recent 7 days vs the 7 days before that.""" events = sorted(history.events, key=lambda e: e.created_at) @@ -26,7 +72,7 @@ def _infer_engagement_trend(history: UserHistory) -> str: older = [e for e in events if cutoff_older <= _parse_dt(e.created_at) < cutoff_recent] if len(older) < 3: - return "stable" # not enough baseline to compare + return "stable" recent_rate = sum(1 for e in recent if e.action == "done") / max(len(recent), 1) older_rate = sum(1 for e in older if e.action == "done") / max(len(older), 1) @@ -39,19 +85,9 @@ def _infer_engagement_trend(history: UserHistory) -> str: return "stable" -def _parse_dt(iso: str) -> datetime: - try: - dt = datetime.fromisoformat(iso.replace("Z", "+00:00")) - if dt.tzinfo is None: - dt = 
dt.replace(tzinfo=timezone.utc) - return dt - except ValueError: - return datetime.min.replace(tzinfo=timezone.utc) - - MANIFEST = AgentManifest( id="momentum", - version="1.1.0", # bumped: engagement_trend InferredParam added (#114) + version="1.2.0", # #114: baseline + stdev inferred params; z-score snippet language description="Characterises the user's recent engagement trend from profile features.", pref_schema={ "type": "object", @@ -64,6 +100,24 @@ MANIFEST = AgentManifest( "default": 25, "description": "Completion rate below which momentum hints at low engagement.", }, + "baseline_completions_per_day": { + "type": "number", + "minimum": 0, + "default": 1.0, + "description": "User's normal daily done-task rate (inferred from 28d history).", + }, + "stdev": { + "type": "number", + "minimum": 0, + "default": 1.0, + "description": "Stdev of daily completion counts; used for z-score normalisation.", + }, + "momentum_window": { + "type": "integer", + "minimum": 1, + "default": 7, + "description": "Days of recent history to measure current momentum against baseline.", + }, }, }, context_schema=["profile.features"], @@ -73,15 +127,42 @@ MANIFEST = AgentManifest( inferred_params=[ InferredParam( key="engagement_trend", - ttl_sec=21_600, # recompute every 6 hours alongside snippet + ttl_sec=21_600, cold_start_default="stable", min_history=10, infer=_infer_engagement_trend, ), + InferredParam( + key="baseline_completions_per_day", + ttl_sec=7 * 86_400, + cold_start_default=1.0, + min_history=14, + infer=_infer_baseline_completions_per_day, + ), + InferredParam( + key="stdev", + ttl_sec=7 * 86_400, + cold_start_default=1.0, + min_history=14, + infer=_infer_stdev, + ), ], ) +def _z_score_label(z: float) -> str | None: + """Map z-score to a human-readable momentum label, or None if within normal range.""" + if z >= 2.0: + return "well above your usual pace" + if z >= 1.0: + return "above your usual pace" + if z <= -2.0: + return "well below your usual pace" + if z <= 
-1.0: + return "below your usual pace" + return None + + class MomentumAgent(BaseAgent): """Characterises the user's recent engagement trend from profile features.""" agent_id: ClassVar[str] = MANIFEST.id @@ -93,6 +174,20 @@ class MomentumAgent(BaseAgent): dismiss = inp.profile.get("dismiss_rate_30d") volume = inp.profile.get("tip_volume_30d") trend: str = inp.agent_prefs.get("engagement_trend", "stable") + baseline: float = float(inp.agent_prefs.get("baseline_completions_per_day", 1.0)) + stdev: float = max(float(inp.agent_prefs.get("stdev", 1.0)), 0.1) + window: int = int(inp.agent_prefs.get("momentum_window", 7)) + + # Count done events in the recent window from feedback_history. + now = inp.now.astimezone(timezone.utc) + cutoff = now - timedelta(days=window) + recent_done = sum( + 1 for e in inp.feedback_history + if e.get("action") == "done" and _parse_dt(e.get("created_at", "")) >= cutoff + ) + recent_rate = recent_done / window # completions/day over the window + z = (recent_rate - baseline) / stdev + z_label = _z_score_label(z) parts: list[str] = [] @@ -120,7 +215,21 @@ class MomentumAgent(BaseAgent): if volume is not None and int(volume) < 5: parts.append("Very few tips served so far — this is an early-stage user.") - if trend == "up": + # Z-score takes precedence over trend label when we have a baseline. + if z_label: + if z > 0: + parts.append( + f"Completion pace is {z_label} " + f"({recent_done} done in the last {window}d vs " + f"~{baseline * window:.1f} expected) — build on the momentum." + ) + else: + parts.append( + f"Completion pace is {z_label} " + f"({recent_done} done in the last {window}d vs " + f"~{baseline * window:.1f} expected) — a motivational or easy-win tip may help." 
# ── momentum helpers ─────────────────────────────────────────────────────────

def _neutral_prefs(**extra) -> dict:
    """Return agent prefs that keep the z-score in the normal range.

    With a baseline of 0.0 and a stdev of 1.0, an input with no recent done
    events produces z = 0, so no z-score label is emitted and the
    engagement_trend label can show. Keyword arguments are merged on top and
    override the three defaults.
    """
    return {"baseline_completions_per_day": 0.0, "stdev": 1.0, "momentum_window": 7, **extra}


def _feedback_done(n: int, days_ago: float = 1.0) -> list[dict]:
    """Build *n* done-feedback dicts, all timestamped *days_ago* days before _NOW.

    Each element is a distinct dict, so a test that mutates one event does not
    silently change the others.
    """
    from datetime import timedelta

    ts = (_NOW - timedelta(days=days_ago)).isoformat()
    return [{"action": "done", "dwell_ms": 60_000, "created_at": ts} for _ in range(n)]
# ── momentum: baseline + stdev inference (#114) ───────────────────────────────

class TestMomentumBaselineInference:
    """Inference of baseline_completions_per_day and stdev from event history."""

    def _events_n_per_day(self, done_per_day: int, n_days: int) -> list[FeedbackEvent]:
        """Return done_per_day done events for each of the last n_days days."""
        return [
            _event("done", days_ago=day + 0.5)
            for day in range(n_days)
            for _ in range(done_per_day)
        ]

    def test_cold_start_when_few_events(self):
        # Five events is below min_history=14, so both cold-start defaults apply.
        sparse = _history(*[_event("done", days_ago=i) for i in range(5)])
        inferred = run_inference(MOMENTUM_MANIFEST, sparse)
        assert inferred["baseline_completions_per_day"] == 1.0
        assert inferred["stdev"] == 1.0

    def test_power_user_baseline_high(self):
        # 5 done/day for 20 days = 100 done; the 28-day window is zero-filled,
        # so the expected baseline is 100 / 28 ≈ 3.57/day rather than 5/day.
        history = _history(*self._events_n_per_day(5, 20))
        inferred = run_inference(MOMENTUM_MANIFEST, history)
        assert inferred["baseline_completions_per_day"] > 2.0

    def test_casual_user_baseline_low(self):
        # One done every 3 days (7 in total) plus 10 dismiss events to clear
        # min_history=14; 7 done over the 28-day window is 0.25/day.
        sparse_done = [_event("done", days_ago=step * 3 + 0.5) for step in range(7)]
        padding = [_event("dismiss", days_ago=day + 0.5) for day in range(10)]
        inferred = run_inference(MOMENTUM_MANIFEST, _history(*sparse_done, *padding))
        assert inferred["baseline_completions_per_day"] < 0.5

    def test_stdev_reflects_variability(self):
        # Four done events on every second day and none in between → high stdev.
        bursts = [
            _event("done", days_ago=day + 0.5)
            for day in range(0, 14, 2)
            for _ in range(4)
        ]
        inferred = run_inference(MOMENTUM_MANIFEST, _history(*bursts))
        assert inferred["stdev"] > 1.0

    def test_consistent_user_lower_stdev_than_variable(self):
        # A steady 2/day for 28 days must score a lower stdev than a 0/4 alternation.
        consistent = self._events_n_per_day(2, 28)
        variable = []
        for day in range(14):
            when = day + 0.5
            if day % 2:
                variable.append(_event("dismiss", days_ago=when))
            else:
                variable.extend(_event("done", days_ago=when) for _ in range(4))
        steady_result = run_inference(MOMENTUM_MANIFEST, _history(*consistent))
        bursty_result = run_inference(MOMENTUM_MANIFEST, _history(*variable))
        assert steady_result["stdev"] < bursty_result["stdev"]


# ── momentum: z-score snippet language ───────────────────────────────────────

class TestMomentumZScore:
    """Snippet wording driven by the z-score of recent pace against the baseline."""

    def _prefs(self, baseline: float, stdev: float = 1.0) -> dict:
        """Return agent prefs with a 7-day window and a stable trend."""
        return {
            "baseline_completions_per_day": baseline,
            "stdev": stdev,
            "momentum_window": 7,
            "engagement_trend": "stable",
        }

    def test_power_user_above_baseline_says_above_usual(self):
        # 35 done in a 7-day window is 5/day; against baseline 3/day and
        # stdev 1.0 that is z = 2.
        agent_input = _inp(
            feedback_history=_feedback_done(35, days_ago=1.0),
            agent_prefs=self._prefs(baseline=3.0, stdev=1.0),
        )
        out = MomentumAgent().compute(agent_input)
        assert "above your usual" in out.prompt_text

    def test_casual_user_slowing_down(self):
        # No done events against baseline 1/day: z = (0 - 1) / 1 = -1, which
        # sits exactly on the "below usual" threshold.
        agent_input = _inp(
            feedback_history=[],
            agent_prefs=self._prefs(baseline=1.0, stdev=1.0),
        )
        out = MomentumAgent().compute(agent_input)
        assert "below your usual" in out.prompt_text

    def test_returning_from_break_at_normal_rate(self):
        # A single done event in the window: z = (1/7 - 1) / 1 ≈ -0.86, inside
        # the normal band, so neither z-score phrase may appear. The trend is
        # "stable", which adds no sentence of its own.
        agent_input = _inp(
            feedback_history=_feedback_done(1, days_ago=0.5),
            agent_prefs=self._prefs(baseline=1.0, stdev=1.0),
        )
        out = MomentumAgent().compute(agent_input)
        assert "above your usual" not in out.prompt_text
        assert "below your usual" not in out.prompt_text

    def test_snapshot_includes_z_score(self):
        out = MomentumAgent().compute(_inp(agent_prefs=self._prefs(baseline=1.0)))
        snapshot = out.signals_snapshot
        assert "z_score" in snapshot
        assert "recent_done_count" in snapshot

    def test_version_bumped(self):
        assert MOMENTUM_MANIFEST.version == "1.2.0"