"""Momentum agent: summarises a user's recent engagement for tip generation.

The module provides three history-based inference functions (engagement
trend, baseline completions per day, and the standard deviation of daily
completions), the agent manifest that registers them, and ``MomentumAgent``
which turns profile features plus those inferred parameters into a
free-text snippet.
"""
from __future__ import annotations

import math
import statistics
from collections import defaultdict
from datetime import date, datetime, timedelta, timezone
from typing import ClassVar

from .base import BaseAgent, AgentInput, AgentOutput
from .inference.history import UserHistory
from .manifest import AgentManifest, InferredParam

# Sentinel returned by ``_parse_dt`` for timestamps that cannot be parsed.
# It sorts before every real timestamp, so such events never fall inside a
# "recent" window.
_UNPARSEABLE = datetime.min.replace(tzinfo=timezone.utc)


def _parse_dt(iso: str) -> datetime:
    """Parse an ISO-8601 timestamp into a timezone-aware UTC datetime.

    A trailing ``Z`` is accepted as UTC, and naive timestamps are assumed to
    be UTC.  The result is always normalised to UTC so that calendar-day
    bucketing is consistent across events recorded with different offsets.

    Args:
        iso: Timestamp string, e.g. ``"2024-05-01T12:00:00Z"``.

    Returns:
        The parsed instant in UTC, or ``_UNPARSEABLE`` (``datetime.min`` in
        UTC) when the string is not valid ISO-8601 or the UTC conversion
        would fall outside the representable range.
    """
    try:
        dt = datetime.fromisoformat(iso.replace("Z", "+00:00"))
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc)
    except (ValueError, OverflowError):
        return _UNPARSEABLE


def _daily_done_counts(history: UserHistory, window_days: int = 28) -> list[int]:
    """Count done-action events per calendar day over the last window_days days.

    The window is anchored on the most recent event of any action type (not
    on the wall clock), and days are UTC calendar days.

    Args:
        history: User history whose ``events`` expose ``created_at`` and
            ``action``.
        window_days: Number of days to cover, counting back from the day of
            the most recent event.

    Returns:
        One count per day, most recent day first, including zero-completion
        days.  Empty when there are no events or no usable timestamps.
    """
    if not history.events:
        return []

    # Parse each timestamp exactly once.
    stamped = [(_parse_dt(e.created_at), e.action) for e in history.events]
    latest = max(when for when, _ in stamped)
    try:
        cutoff = latest - timedelta(days=window_days)
    except OverflowError:
        # Every timestamp was unparseable (latest is the datetime.min
        # sentinel): there is no usable history to count.
        return []

    by_day: dict[date, int] = defaultdict(int)
    for when, action in stamped:
        if action == "done" and when >= cutoff:
            by_day[when.date()] += 1

    # Return counts for every day in the window, including zero-completion days.
    return [
        by_day.get((latest - timedelta(days=offset)).date(), 0)
        for offset in range(window_days)
    ]


def _infer_baseline_completions_per_day(history: UserHistory) -> float:
    """Return the mean number of done events per day over the default window.

    Falls back to ``1.0`` when there is no usable history.
    """
    counts = _daily_done_counts(history)
    return statistics.mean(counts) if counts else 1.0


def _infer_stdev(history: UserHistory) -> float:
    """Return the sample standard deviation of daily done counts.

    Falls back to ``1.0`` with fewer than two data points, and never returns
    less than ``0.1``.
    """
    counts = _daily_done_counts(history)
    if len(counts) < 2:
        return 1.0
    sd = statistics.stdev(counts)
    return max(sd, 0.1)  # floor so we never divide by zero in z-score


def _infer_engagement_trend(history: UserHistory) -> str:
    """Compare done-rate in the most recent 7 days vs the 7 days before that.

    Both windows are anchored on the most recent event.  The done-rate of a
    window is the share of its events whose action is ``"done"``.

    Args:
        history: User history whose ``events`` expose ``created_at`` and
            ``action``.

    Returns:
        ``"up"`` or ``"down"`` when the recent rate differs from the older
        rate by more than 0.10, otherwise ``"stable"``.  Also ``"stable"``
        when there are no events, no usable timestamps, or fewer than three
        events in the older window.
    """
    # Parse once, via _parse_dt, so naive timestamps become aware and every
    # comparison below is aware-vs-aware.
    stamped = [(_parse_dt(e.created_at), e.action) for e in history.events]
    if not stamped:
        return "stable"

    # Pick the latest event chronologically; string order is unreliable when
    # timestamps mix "Z" and explicit offsets.
    latest = max(when for when, _ in stamped)
    try:
        cutoff_recent = latest - timedelta(days=7)
        cutoff_older = latest - timedelta(days=14)
    except OverflowError:
        # No parseable timestamp at all (latest is the datetime.min sentinel).
        return "stable"

    recent = [action for when, action in stamped if when >= cutoff_recent]
    older = [
        action for when, action in stamped if cutoff_older <= when < cutoff_recent
    ]
    if len(older) < 3:
        return "stable"

    recent_rate = recent.count("done") / max(len(recent), 1)
    older_rate = older.count("done") / max(len(older), 1)
    delta = recent_rate - older_rate
    if delta > 0.10:
        return "up"
    if delta < -0.10:
        return "down"
    return "stable"


MANIFEST = AgentManifest(
    id="momentum",
    version="1.2.0",  # #114: baseline + stdev inferred params; z-score snippet language
    description="Characterises the user's recent engagement trend from profile features.",
    pref_schema={
        "type": "object",
        "additionalProperties": False,
        "properties": {
            "low_engagement_threshold_pct": {
                "type": "integer",
                "minimum": 0,
                "maximum": 100,
                "default": 25,
                "description": "Completion rate below which momentum hints at low engagement.",
            },
            "baseline_completions_per_day": {
                "type": "number",
                "minimum": 0,
                "default": 1.0,
                "description": "User's normal daily done-task rate (inferred from 28d history).",
            },
            "stdev": {
                "type": "number",
                "minimum": 0,
                "default": 1.0,
                "description": "Stdev of daily completion counts; used for z-score normalisation.",
            },
            "momentum_window": {
                "type": "integer",
                "minimum": 1,
                "default": 7,
                "description": "Days of recent history to measure current momentum against baseline.",
            },
        },
    },
    context_schema=["profile.features"],
    required_consents=["data:core", "agent:momentum"],
    output_contract={"type": "snippet", "format": "free_text"},
    ttl_sec=21_600,
    inferred_params=[
        InferredParam(
            key="engagement_trend",
            ttl_sec=21_600,
            cold_start_default="stable",
            min_history=10,
            infer=_infer_engagement_trend,
        ),
        InferredParam(
            key="baseline_completions_per_day",
            ttl_sec=7 * 86_400,
            cold_start_default=1.0,
            min_history=14,
            infer=_infer_baseline_completions_per_day,
        ),
        InferredParam(
            key="stdev",
            ttl_sec=7 * 86_400,
            cold_start_default=1.0,
            min_history=14,
            infer=_infer_stdev,
        ),
    ],
)


def _z_score_label(z: float) -> str | None:
    """Map z-score to a human-readable momentum label, or None if within normal range.

    The normal range is the open interval (-1.0, 1.0); the boundaries
    themselves are labelled.
    """
    if z >= 2.0:
        return "well above your usual pace"
    if z >= 1.0:
        return "above your usual pace"
    if z <= -2.0:
        return "well below your usual pace"
    if z <= -1.0:
        return "below your usual pace"
    return None


class MomentumAgent(BaseAgent):
    """Characterises the user's recent engagement trend from profile features."""

    agent_id: ClassVar[str] = MANIFEST.id
    ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec
    version: ClassVar[str] = MANIFEST.version

    def compute(self, inp: AgentInput) -> AgentOutput:
        """Build the momentum snippet and its supporting snapshot.

        Reads ``completion_rate_30d``, ``dismiss_rate_30d`` and
        ``tip_volume_30d`` from ``inp.profile``; the inferred and
        user-tunable parameters from ``inp.agent_prefs``; and counts
        ``"done"`` events in ``inp.feedback_history`` over the last
        ``momentum_window`` days before ``inp.now``.

        Args:
            inp: Agent input carrying ``profile``, ``agent_prefs``,
                ``feedback_history`` and ``now``.

        Returns:
            The value of ``self._make_output(inp, prompt, snapshot)``, where
            ``prompt`` is the space-joined snippet and ``snapshot`` records
            the inputs and the computed z-score.
        """
        completion = inp.profile.get("completion_rate_30d")
        dismiss = inp.profile.get("dismiss_rate_30d")
        volume = inp.profile.get("tip_volume_30d")

        prefs = inp.agent_prefs
        trend: str = prefs.get("engagement_trend", "stable")
        baseline: float = float(prefs.get("baseline_completions_per_day", 1.0))
        stdev: float = max(float(prefs.get("stdev", 1.0)), 0.1)
        # The schema requires momentum_window >= 1; clamp anyway so a bad
        # stored value cannot cause a division by zero below.
        window: int = max(int(prefs.get("momentum_window", 7)), 1)
        # Honour the declared preference; the default matches the previously
        # hard-coded threshold of 25.
        low_threshold: int = int(prefs.get("low_engagement_threshold_pct", 25))

        # Count done events in the recent window from feedback_history.
        now = inp.now.astimezone(timezone.utc)
        cutoff = now - timedelta(days=window)
        recent_done = sum(
            1
            for e in inp.feedback_history
            if e.get("action") == "done"
            # A missing or None timestamp parses to the sentinel and is excluded.
            and _parse_dt(e.get("created_at") or "") >= cutoff
        )
        recent_rate = recent_done / window  # completions/day over the window
        z = (recent_rate - baseline) / stdev
        # NOTE(review): with the cold-start defaults (baseline 1.0, stdev 1.0)
        # and no done events, z is exactly -1.0 and is labelled "below your
        # usual pace" — confirm this is intended for brand-new users.
        z_label = _z_score_label(z)

        parts: list[str] = []

        if completion is not None:
            pct = round(completion * 100)
            if pct < low_threshold:
                parts.append(
                    f"The user completes {pct}% of tips "
                    f"(low engagement — prefer simple, immediately actionable tips)."
                )
            elif pct >= 50:
                parts.append(f"The user completes {pct}% of tips (strong engagement).")
            else:
                parts.append(f"The user completes {pct}% of tips (moderate engagement).")
        else:
            parts.append("No completion-rate data yet (new user).")

        if dismiss is not None:
            dpct = round(dismiss * 100)
            if dpct >= 40:
                parts.append(
                    f"Dismiss rate is high ({dpct}%) — avoid repetitive or irrelevant tips."
                )
            elif dpct <= 10:
                parts.append(f"Dismiss rate is low ({dpct}%).")

        if volume is not None and int(volume) < 5:
            parts.append("Very few tips served so far — this is an early-stage user.")

        # Z-score takes precedence over trend label when we have a baseline.
        if z_label:
            # A label is only produced for |z| >= 1, so z is never 0 here.
            advice = (
                "build on the momentum."
                if z > 0
                else "a motivational or easy-win tip may help."
            )
            parts.append(
                f"Completion pace is {z_label} "
                f"({recent_done} done in the last {window}d vs "
                f"~{baseline * window:.1f} expected) — {advice}"
            )
        elif trend == "up":
            parts.append(
                "Engagement is trending up compared to last week — build on the momentum."
            )
        elif trend == "down":
            parts.append(
                "Engagement is trending down — a motivational or easy-win tip may help."
            )

        prompt = " ".join(parts) if parts else "No engagement data available yet."

        snapshot = {
            "completion_rate_30d": completion,
            "dismiss_rate_30d": dismiss,
            "tip_volume_30d": volume,
            "engagement_trend": trend,
            "baseline_completions_per_day": baseline,
            "stdev": stdev,
            "momentum_window": window,
            "recent_done_count": recent_done,
            "z_score": round(z, 2),
        }
        return self._make_output(inp, prompt, snapshot)