from __future__ import annotations import math from collections import Counter from datetime import datetime, timezone from typing import ClassVar from .base import BaseAgent, AgentInput, AgentOutput from .inference.history import UserHistory from .manifest import AgentManifest, InferredParam _DOW_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] def _parse_dt(iso: str) -> datetime: try: dt = datetime.fromisoformat(iso.replace("Z", "+00:00")) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt except ValueError: return datetime.min.replace(tzinfo=timezone.utc) def _infer_lookback_days(history: UserHistory) -> int: """Find the minimum window (days) that captures ≥30 done events, capped at 30. Sorts done events newest-first, then measures the span to the 30th event. If fewer than 30 done events exist, returns 30 (use the full cap). """ done = sorted( [e for e in history.events if e.action == "done"], key=lambda e: e.created_at, reverse=True, ) if len(done) < 30: return 30 latest = _parse_dt(done[0].created_at) thirtieth = _parse_dt(done[29].created_at) span = (latest - thirtieth).total_seconds() / 86_400 return max(1, min(30, math.ceil(span))) def _infer_weekly_cycle(history: UserHistory) -> list[dict]: """Peak-to-mean ratio of done events per day-of-week (0=Monday … 6=Sunday). Returns all 7 DOW entries so the caller can filter by strength threshold. """ by_dow: Counter[int] = Counter( _parse_dt(e.created_at).weekday() for e in history.events if e.action == "done" ) total = sum(by_dow.values()) if total == 0: return [] mean = total / 7 return [ { "dow": dow, "strength": round(by_dow.get(dow, 0) / mean, 3), "sample": f"completes most {_DOW_NAMES[dow]}s", } for dow in range(7) ] def _infer_daily_cycle(history: UserHistory) -> list[dict]: """Peak-to-mean ratio of done events per hour-of-day (0–23). Returns entries for hours that have at least one done event. """ by_hour: Counter[int] = Counter( _parse_dt(e.created_at).hour for e in history.events if e.action == "done" ) total = sum(by_hour.values()) if total == 0: return [] mean = total / 24 return [ { "hour": hour, "strength": round(by_hour[hour] / mean, 3), } for hour in sorted(by_hour) ] MANIFEST = AgentManifest( id="recent-patterns", version="1.2.0", # #116: lookback_days + weekly_cycle + daily_cycle inference description="Surfaces the user's reaction pattern from recent feedback.", pref_schema={ "type": "object", "additionalProperties": False, "properties": { "lookback_days": { "type": "integer", "minimum": 1, "maximum": 30, "default": 7, "description": "Lookback window sized to capture ≥30 done events.", }, "weekly_cycle": { "type": "array", "items": { "type": "object", "properties": { "dow": {"type": "integer"}, "strength": {"type": "number"}, "sample": {"type": "string"}, }, }, "default": [], "description": "Per-DOW completion strength (peak-to-mean ratio).", }, "daily_cycle": { "type": "array", "items": { "type": "object", "properties": { "hour": {"type": "integer"}, "strength": {"type": "number"}, }, }, "default": [], "description": "Per-hour completion strength (peak-to-mean ratio).", }, }, }, context_schema=["tip_feedback", "profile.features"], required_consents=["data:core", "agent:recent-patterns"], output_contract={"type": "snippet", "format": "free_text"}, ttl_sec=86_400, inferred_params=[ InferredParam( key="lookback_days", ttl_sec=86_400, cold_start_default=7, min_history=5, infer=_infer_lookback_days, ), InferredParam( key="weekly_cycle", ttl_sec=86_400, cold_start_default=[], min_history=21, # need ≥3 weeks to see a weekly signal infer=_infer_weekly_cycle, ), InferredParam( key="daily_cycle", ttl_sec=86_400, cold_start_default=[], min_history=14, infer=_infer_daily_cycle, ), ], ) _STRENGTH_THRESHOLD = 0.5 def _strong(entries: list[dict], key: str) -> list[dict]: return [e for e in entries if e.get("strength", 0) > _STRENGTH_THRESHOLD] def _hour_label(hour: int) -> str: if hour == 0: return "midnight" if hour < 12: return f"{hour}am" if hour == 12: return "noon" return f"{hour - 12}pm" class RecentPatternsAgent(BaseAgent): """Surfaces the user's reaction pattern from recent feedback.""" agent_id: ClassVar[str] = MANIFEST.id ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec version: ClassVar[str] = MANIFEST.version def compute(self, inp: AgentInput) -> AgentOutput: # Support legacy window_days pref key for backward compat. lookback_days = max( 1, int(inp.agent_prefs.get("lookback_days", inp.agent_prefs.get("window_days", 7))), ) weekly_cycle: list[dict] = inp.agent_prefs.get("weekly_cycle", []) daily_cycle: list[dict] = inp.agent_prefs.get("daily_cycle", []) window_s = lookback_days * 86_400 now_ts = inp.now.timestamp() recent = [ f for f in inp.feedback_history if self._age_s(f.get("created_at", ""), now_ts) <= window_s ] counts: Counter[str] = Counter(f.get("action") for f in recent) total = len(recent) dwell_ms = inp.profile.get("mean_dwell_ms_30d") parts: list[str] = [] if total == 0: parts.append(f"No tip reactions recorded in the last {lookback_days} days.") else: done = counts.get("done", 0) dismissed = counts.get("dismiss", 0) snoozed = counts.get("snooze", 0) parts.append( f"Last {lookback_days} days: {total} tip reaction{'s' if total != 1 else ''} — " f"{done} completed, {dismissed} dismissed, {snoozed} snoozed." ) if dwell_ms is not None: dwell_s = round(dwell_ms / 1000) if dwell_s < 15: parts.append( "Average dwell is very short — user may be acting on auto-pilot; vary tip content." ) elif dwell_s < 60: parts.append(f"Average dwell {dwell_s}s — tips are being read.") else: parts.append( f"Average dwell {dwell_s}s — user deliberates; prefer tips that reward reflection." ) # Cycle hints — only when strength > threshold. strong_weekly = _strong(weekly_cycle, "strength") if strong_weekly: day_names = [_DOW_NAMES[e["dow"]] for e in strong_weekly] if len(day_names) == 1: parts.append(f"User tends to complete tips on {day_names[0]}s.") else: joined = ", ".join(day_names[:-1]) + f" and {day_names[-1]}" parts.append(f"User tends to complete tips on {joined}s.") strong_daily = _strong(daily_cycle, "strength") if strong_daily: hour_labels = [_hour_label(e["hour"]) for e in strong_daily] if len(hour_labels) == 1: parts.append(f"User is most active around {hour_labels[0]}.") else: joined = ", ".join(hour_labels[:-1]) + f" and {hour_labels[-1]}" parts.append(f"User is most active around {joined}.") prompt = " ".join(parts) if parts else "No engagement data available yet." snapshot = { "lookback_days": lookback_days, "recent_total": total, "action_counts": dict(counts), "mean_dwell_ms_30d": dwell_ms, "strong_weekly_days": [e["dow"] for e in strong_weekly], "strong_daily_hours": [e["hour"] for e in strong_daily], } return self._make_output(inp, prompt, snapshot) @staticmethod def _age_s(iso: str, now_ts: float) -> float: if not iso: return float("inf") try: dt = datetime.fromisoformat(iso.replace("Z", "+00:00")) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return now_ts - dt.timestamp() except Exception: return float("inf")