Files
oO/ml/agents/momentum.py
alvis 4cade4868b feat(agents): per-user baseline + stdev inference for momentum agent (#114)
Adds two InferredParams (TTL=7d) computed from 28-day rolling daily done counts:
- baseline_completions_per_day: mean done events/day over the window
- stdev: stdev of daily counts (floored at 0.1 to avoid division by zero)

MomentumAgent.compute() now calculates a z-score from recent done events in
inp.feedback_history vs the inferred baseline. Snippet language switches to
z-score framing ("above your usual pace", "slowing down") when |z| >= 1.0,
falling back to engagement_trend labels when in the normal range.

- engagement_trend InferredParam preserved for backward compatibility
- momentum_window pref added (default 7, user-overridable)
- 14 new tests covering power user, casual user, returning-from-break, and
  relative stdev comparison; engagement_trend tests updated for z-score priority
- Agent bumped to v1.2.0

Closes #114

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 05:18:29 +00:00

250 lines
9.1 KiB
Python

from __future__ import annotations
import math
import statistics
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import ClassVar
from .base import BaseAgent, AgentInput, AgentOutput
from .inference.history import UserHistory
from .manifest import AgentManifest, InferredParam
def _parse_dt(iso: str) -> datetime:
    """Parse an ISO-8601 timestamp into a timezone-aware ``datetime``.

    Any ``"Z"`` in the text is rewritten to ``"+00:00"`` before parsing, and
    a timestamp that carries no offset is tagged as UTC (the wall-clock value
    is kept, not shifted).

    Args:
        iso: Timestamp text such as ``"2026-05-06T05:18:29Z"``.

    Returns:
        The parsed aware datetime. Input that cannot be parsed -- malformed
        text, an empty string, ``None`` or any other non-string value --
        yields the sentinel ``datetime.min`` tagged as UTC, so a caller that
        filters with ``>= cutoff`` drops the event instead of crashing.
    """
    try:
        parsed = datetime.fromisoformat(iso.replace("Z", "+00:00"))
    except (AttributeError, TypeError, ValueError):
        # ValueError: malformed text. AttributeError/TypeError: the value is
        # not a string at all (e.g. a null ``created_at`` in an event dict).
        return datetime.min.replace(tzinfo=timezone.utc)
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed
def _daily_done_counts(history: UserHistory, window_days: int = 28) -> list[int]:
    """Count ``"done"`` events per calendar day over the trailing window.

    The window is anchored on the newest event in ``history`` (of any
    action), not on the current time.

    Args:
        history: Source of events; each event is read for its ``created_at``
            timestamp text and its ``action``.
        window_days: Number of calendar days to report, counting back from
            the day of the newest event.

    Returns:
        ``window_days`` counts ordered newest day first, with ``0`` for days
        that have no done events. An empty list when there are no events, or
        when the window cannot be anchored because no timestamp could be
        parsed.
    """
    if not history.events:
        return []

    # Parse each timestamp once; it is needed for both the anchor and the tally.
    stamped = [(_parse_dt(e.created_at), e.action) for e in history.events]
    latest = max(when for when, _ in stamped)
    try:
        cutoff = latest - timedelta(days=window_days)
    except OverflowError:
        # ``latest`` is the ``datetime.min`` sentinel that ``_parse_dt`` returns
        # for unparseable input, so there is no usable anchor for the window.
        return []

    done_per_day = defaultdict(int)
    for when, action in stamped:
        if action == "done" and when >= cutoff:
            done_per_day[when.date()] += 1

    # Emit one entry per day in the window so zero-completion days are kept.
    newest_day = latest.date()
    return [
        done_per_day.get(newest_day - timedelta(days=offset), 0)
        for offset in range(window_days)
    ]
def _infer_baseline_completions_per_day(history: UserHistory) -> float:
    """Return the mean number of done events per day over the default window.

    Falls back to ``1.0`` when ``_daily_done_counts`` yields no daily counts.
    """
    daily = _daily_done_counts(history)
    if not daily:
        return 1.0
    return statistics.mean(daily)
def _infer_stdev(history: UserHistory) -> float:
    """Return the sample standard deviation of the daily done counts.

    Falls back to ``1.0`` when fewer than two daily counts are available,
    since a sample standard deviation needs at least two data points.
    """
    daily = _daily_done_counts(history)
    if len(daily) >= 2:
        # Floor the spread so the z-score never divides by zero.
        return max(statistics.stdev(daily), 0.1)
    return 1.0
def _infer_engagement_trend(history: UserHistory) -> str:
    """Compare the done-rate of the latest 7 days with the 7 days before that.

    The done-rate of a period is the share of its events whose action is
    ``"done"``. Both periods are anchored on the chronologically newest
    event, and every timestamp goes through ``_parse_dt`` so that naive and
    offset-carrying timestamps can be compared with each other.

    Args:
        history: Source of events; each event is read for its ``created_at``
            timestamp text and its ``action``.

    Returns:
        ``"up"`` when the recent done-rate exceeds the older one by more than
        0.10, ``"down"`` when it trails it by more than 0.10, and ``"stable"``
        otherwise -- including when there are no events, no usable anchor, or
        fewer than 3 events in the older period.
    """
    # Parse each timestamp once; it is reused for the anchor and both periods.
    stamped = [(_parse_dt(e.created_at), e.action) for e in history.events]
    if not stamped:
        return "stable"

    latest = max(when for when, _ in stamped)
    try:
        cutoff_recent = latest - timedelta(days=7)
        cutoff_older = latest - timedelta(days=14)
    except OverflowError:
        # ``latest`` is the ``datetime.min`` sentinel that ``_parse_dt`` returns
        # for unparseable input: nothing to anchor the comparison on.
        return "stable"

    recent = [action for when, action in stamped if when >= cutoff_recent]
    older = [
        action for when, action in stamped if cutoff_older <= when < cutoff_recent
    ]
    if len(older) < 3:
        # Too little older activity for the comparison to mean anything.
        return "stable"

    # ``recent`` always holds at least the anchor event; the guard is defensive.
    recent_rate = recent.count("done") / max(len(recent), 1)
    older_rate = older.count("done") / len(older)
    delta = recent_rate - older_rate
    if delta > 0.10:
        return "up"
    if delta < -0.10:
        return "down"
    return "stable"
# Cache lifetimes used by the manifest entries below, in seconds.
_SHORT_TTL_SEC = 21_600  # 6 hours
_BASELINE_TTL_SEC = 7 * 86_400  # 7 days

# Per-user preferences accepted by the agent, keyed by preference name.
_PREF_PROPERTIES = {
    "low_engagement_threshold_pct": {
        "type": "integer",
        "minimum": 0,
        "maximum": 100,
        "default": 25,
        "description": "Completion rate below which momentum hints at low engagement.",
    },
    "baseline_completions_per_day": {
        "type": "number",
        "minimum": 0,
        "default": 1.0,
        "description": "User's normal daily done-task rate (inferred from 28d history).",
    },
    "stdev": {
        "type": "number",
        "minimum": 0,
        "default": 1.0,
        "description": "Stdev of daily completion counts; used for z-score normalisation.",
    },
    "momentum_window": {
        "type": "integer",
        "minimum": 1,
        "default": 7,
        "description": "Days of recent history to measure current momentum against baseline.",
    },
}

# Declarative description of the momentum agent: identity, accepted prefs,
# required consents, output contract and the parameters inferred from history.
MANIFEST = AgentManifest(
    id="momentum",
    # #114: baseline + stdev inferred params; z-score snippet language
    version="1.2.0",
    description="Characterises the user's recent engagement trend from profile features.",
    pref_schema={
        "type": "object",
        "additionalProperties": False,
        "properties": _PREF_PROPERTIES,
    },
    context_schema=["profile.features"],
    required_consents=["data:core", "agent:momentum"],
    output_contract={"type": "snippet", "format": "free_text"},
    ttl_sec=_SHORT_TTL_SEC,
    inferred_params=[
        # Week-over-week done-rate direction; kept for backward compatibility.
        InferredParam(
            key="engagement_trend",
            ttl_sec=_SHORT_TTL_SEC,
            cold_start_default="stable",
            min_history=10,
            infer=_infer_engagement_trend,
        ),
        # Mean done events per day over the rolling window.
        InferredParam(
            key="baseline_completions_per_day",
            ttl_sec=_BASELINE_TTL_SEC,
            cold_start_default=1.0,
            min_history=14,
            infer=_infer_baseline_completions_per_day,
        ),
        # Spread of the daily done counts, used to scale the z-score.
        InferredParam(
            key="stdev",
            ttl_sec=_BASELINE_TTL_SEC,
            cold_start_default=1.0,
            min_history=14,
            infer=_infer_stdev,
        ),
    ],
)
def _z_score_label(z: float) -> str | None:
    """Translate a z-score into momentum wording.

    Returns ``None`` when ``z`` lies strictly between -1.0 and 1.0 (the
    normal range); otherwise a phrase whose strength steps up once the
    magnitude reaches 2.0.
    """
    if z >= 1.0:
        return "well above your usual pace" if z >= 2.0 else "above your usual pace"
    if z <= -1.0:
        return "well below your usual pace" if z <= -2.0 else "below your usual pace"
    return None
class MomentumAgent(BaseAgent):
    """Characterises the user's recent engagement trend from profile features.

    The agent turns 30-day profile rates plus the recent done events in the
    feedback history into a free-text snippet, and records the numbers it
    used in a snapshot.
    """

    agent_id: ClassVar[str] = MANIFEST.id
    ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec
    version: ClassVar[str] = MANIFEST.version

    def compute(self, inp: AgentInput) -> AgentOutput:
        """Build the momentum snippet and its diagnostic snapshot.

        Reads ``completion_rate_30d``, ``dismiss_rate_30d`` and
        ``tip_volume_30d`` from ``inp.profile``; reads ``engagement_trend``,
        ``baseline_completions_per_day``, ``stdev``, ``momentum_window`` and
        ``low_engagement_threshold_pct`` from ``inp.agent_prefs``; and counts
        the ``"done"`` entries of ``inp.feedback_history`` that fall within
        the last ``momentum_window`` days before ``inp.now``.

        The recent completions-per-day rate is converted to a z-score against
        the baseline. When the z-score maps to a label (magnitude of at least
        1.0) the snippet uses the pace wording; otherwise it falls back to
        the ``engagement_trend`` wording.

        Args:
            inp: Agent input carrying ``profile``, ``agent_prefs``,
                ``feedback_history`` and ``now``.

        Returns:
            Whatever ``self._make_output`` builds from the input, the snippet
            text and the snapshot dict.
        """
        profile = inp.profile
        prefs = inp.agent_prefs

        completion = profile.get("completion_rate_30d")
        dismiss = profile.get("dismiss_rate_30d")
        volume = profile.get("tip_volume_30d")

        trend: str = prefs.get("engagement_trend", "stable")
        baseline: float = float(prefs.get("baseline_completions_per_day", 1.0))
        # Floor the spread so the z-score below never divides by zero.
        stdev: float = max(float(prefs.get("stdev", 1.0)), 0.1)
        # The window is a divisor below, so clamp it to at least one day: a
        # zero would raise ZeroDivisionError and a negative value would flip
        # the sign of the rate.
        window: int = max(int(prefs.get("momentum_window", 7)), 1)
        # Completion percentage below which the low-engagement wording is used.
        low_threshold: int = int(prefs.get("low_engagement_threshold_pct", 25))

        # Count done events in the recent window from feedback_history.
        now = inp.now.astimezone(timezone.utc)
        cutoff = now - timedelta(days=window)
        recent_done = sum(
            1
            for e in inp.feedback_history
            if e.get("action") == "done"
            and _parse_dt(e.get("created_at", "")) >= cutoff
        )
        recent_rate = recent_done / window  # completions/day over the window
        # NOTE(review): with the fallback baseline of 1.0 and stdev of 1.0, a
        # user with no done events in the window scores z = -1.0 and gets the
        # "below your usual pace" wording. Confirm this is intended for users
        # who have no inferred baseline yet.
        z = (recent_rate - baseline) / stdev
        z_label = _z_score_label(z)

        parts: list[str] = []

        if completion is None:
            parts.append("No completion-rate data yet (new user).")
        else:
            pct = round(completion * 100)
            # Low engagement is checked first so a user-set threshold is
            # honoured; with the default of 25 the bands are <25 / 25-49 / >=50.
            if pct < low_threshold:
                parts.append(
                    f"The user completes {pct}% of tips "
                    f"(low engagement — prefer simple, immediately actionable tips)."
                )
            elif pct >= 50:
                parts.append(f"The user completes {pct}% of tips (strong engagement).")
            else:
                parts.append(f"The user completes {pct}% of tips (moderate engagement).")

        if dismiss is not None:
            dpct = round(dismiss * 100)
            if dpct >= 40:
                parts.append(
                    f"Dismiss rate is high ({dpct}%) — avoid repetitive or irrelevant tips."
                )
            elif dpct <= 10:
                parts.append(f"Dismiss rate is low ({dpct}%).")

        if volume is not None and int(volume) < 5:
            parts.append("Very few tips served so far — this is an early-stage user.")

        # The z-score wording takes precedence over the trend label.
        if z_label:
            advice = (
                "build on the momentum."
                if z > 0
                else "a motivational or easy-win tip may help."
            )
            parts.append(
                f"Completion pace is {z_label} "
                f"({recent_done} done in the last {window}d vs "
                f"~{baseline * window:.1f} expected) — {advice}"
            )
        elif trend == "up":
            parts.append(
                "Engagement is trending up compared to last week — build on the momentum."
            )
        elif trend == "down":
            parts.append(
                "Engagement is trending down — a motivational or easy-win tip may help."
            )

        # ``parts`` is never empty: the completion branch above always adds one
        # sentence, so no fallback text is needed.
        prompt = " ".join(parts)
        snapshot = {
            "completion_rate_30d": completion,
            "dismiss_rate_30d": dismiss,
            "tip_volume_30d": volume,
            "engagement_trend": trend,
            "baseline_completions_per_day": baseline,
            "stdev": stdev,
            "momentum_window": window,
            "recent_done_count": recent_done,
            "z_score": round(z, 2),
        }
        return self._make_output(inp, prompt, snapshot)