Files
oO/ml/agents/recent_patterns.py
alvis 772bb6e194 feat(consents): auto-grant data:<provider> on connect; remove agent: consents (ADR-0015)
- integrations.ts: grant data:<provider> on OAuth callback, revoke on disconnect
- Backfill migration: INSERT OR IGNORE data:<provider> for all active tokens
- Agent manifests: drop agent:<id> from required_consents (momentum, time-of-day,
  overdue-task, recent-patterns, health-vitals) — per-agent control is a preference
- eligibility.ts: update comment to reflect data:-only consent model
- test_manifest.py: assert no agent: consents remain in any manifest
- migrations.test.ts: backfill idempotency tests for issue #127
- Dockerfile.api: drop --offline flag (fixes ERR_PNPM_NO_OFFLINE_META)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 15:09:58 +00:00

272 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import math
from collections import Counter
from datetime import datetime, timezone
from typing import ClassVar
from .base import BaseAgent, AgentInput, AgentOutput
from .inference.history import UserHistory
from .manifest import AgentManifest, InferredParam
# Weekday names indexed the same way as ``datetime.weekday()``:
# position 0 is Monday, position 6 is Sunday.
_DOW_NAMES = [
    "Monday",
    "Tuesday",
    "Wednesday",
    "Thursday",
    "Friday",
    "Saturday",
    "Sunday",
]
def _parse_dt(iso: str) -> datetime:
    """Parse an ISO-8601 timestamp into a timezone-aware ``datetime``.

    ``Z`` is rewritten to ``+00:00`` before parsing so the UTC designator is
    accepted by ``datetime.fromisoformat`` on interpreters that predate
    native ``Z`` support. Timestamps without an offset are treated as UTC.

    Args:
        iso: Timestamp text. Non-string values are tolerated and treated as
            unparseable rather than raising.

    Returns:
        The parsed aware datetime, or ``datetime.min`` in UTC as a sentinel
        when *iso* cannot be parsed.
    """
    try:
        parsed = datetime.fromisoformat(iso.replace("Z", "+00:00"))
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError cover None, numbers and bytes; ValueError
        # covers malformed text. All map to the same "oldest possible" sentinel.
        return datetime.min.replace(tzinfo=timezone.utc)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
def _infer_lookback_days(history: UserHistory) -> int:
    """Find the minimum window (days) that captures ≥30 done events, capped at 30.

    Done events are ordered newest-first by their *parsed* timestamp, so
    timestamps written with different UTC offsets (or ``Z`` vs ``+00:00``)
    are compared chronologically rather than lexicographically. The span from
    the newest to the 30th event is then rounded up to whole days.

    Args:
        history: User history whose ``events`` expose ``action`` and
            ``created_at``.

    Returns:
        An integer between 1 and 30 inclusive. If fewer than 30 done events
        exist, returns 30 (use the full cap).
    """
    # Parse each timestamp once; only the datetimes are needed from here on.
    stamps = sorted(
        (_parse_dt(e.created_at) for e in history.events if e.action == "done"),
        reverse=True,
    )
    if len(stamps) < 30:
        return 30
    span_days = (stamps[0] - stamps[29]).total_seconds() / 86_400
    # Clamp to [1, 30]: a burst of 30 events within a day still needs a 1-day window.
    return max(1, min(30, math.ceil(span_days)))
def _infer_weekly_cycle(history: UserHistory) -> list[dict]:
    """Peak-to-mean ratio of done events per day-of-week (0=Monday … 6=Sunday).

    Each entry's ``strength`` is that weekday's done-event count divided by
    the mean count per weekday (total / 7), rounded to three decimals.
    Events whose timestamp cannot be parsed are skipped; otherwise the
    ``datetime.min`` sentinel returned by ``_parse_dt`` would be counted as a
    Monday and skew the distribution.

    The weekday is taken from the timestamp as parsed (its own offset, or UTC
    when none is given) — NOTE(review): no conversion to the user's local
    zone happens here; confirm that is intended.

    Returns:
        All 7 DOW entries so the caller can filter by strength threshold, or
        an empty list when there are no usable done events.
    """
    unparseable = datetime.min.replace(tzinfo=timezone.utc)
    by_dow: Counter[int] = Counter()
    for event in history.events:
        if event.action != "done":
            continue
        stamp = _parse_dt(event.created_at)
        if stamp != unparseable:
            by_dow[stamp.weekday()] += 1

    total = sum(by_dow.values())
    if total == 0:
        return []
    mean = total / 7
    return [
        {
            "dow": dow,
            # Counter lookups return 0 for weekdays that never occurred.
            "strength": round(by_dow[dow] / mean, 3),
            "sample": f"completes most {_DOW_NAMES[dow]}s",
        }
        for dow in range(7)
    ]
def _infer_daily_cycle(history: UserHistory) -> list[dict]:
    """Peak-to-mean ratio of done events per hour-of-day (0–23).

    Each entry's ``strength`` is that hour's done-event count divided by the
    mean count per hour (total / 24), rounded to three decimals. Events whose
    timestamp cannot be parsed are skipped; otherwise the ``datetime.min``
    sentinel returned by ``_parse_dt`` would be counted as hour 0 and skew
    the distribution.

    The hour is taken from the timestamp as parsed (its own offset, or UTC
    when none is given) — NOTE(review): no conversion to the user's local
    zone happens here; confirm that is intended.

    Returns:
        Entries, in ascending hour order, for hours that have at least one
        done event; an empty list when there are no usable done events.
    """
    unparseable = datetime.min.replace(tzinfo=timezone.utc)
    by_hour: Counter[int] = Counter()
    for event in history.events:
        if event.action != "done":
            continue
        stamp = _parse_dt(event.created_at)
        if stamp != unparseable:
            by_hour[stamp.hour] += 1

    total = sum(by_hour.values())
    if total == 0:
        return []
    mean = total / 24
    return [
        {
            "hour": hour,
            "strength": round(by_hour[hour] / mean, 3),
        }
        for hour in sorted(by_hour)
    ]
# Declarative manifest for the recent-patterns agent: identity and version,
# the schema of its per-user prefs, the context it reads, the consents it
# requires, its output contract, and the prefs that are inferred from history.
MANIFEST = AgentManifest(
    id="recent-patterns",
    version="1.2.0",  # #116: lookback_days + weekly_cycle + daily_cycle inference
    description="Surfaces the user's reaction pattern from recent feedback.",
    # JSON-Schema-style description of the prefs; unknown keys are rejected
    # via additionalProperties=False.
    pref_schema={
        "type": "object",
        "additionalProperties": False,
        "properties": {
            # Bounds mirror the [1, 30] clamp applied by _infer_lookback_days.
            "lookback_days": {
                "type": "integer",
                "minimum": 1,
                "maximum": 30,
                "default": 7,
                "description": "Lookback window sized to capture ≥30 done events.",
            },
            # Entry shape matches the dicts produced by _infer_weekly_cycle.
            "weekly_cycle": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "dow": {"type": "integer"},
                        "strength": {"type": "number"},
                        "sample": {"type": "string"},
                    },
                },
                "default": [],
                "description": "Per-DOW completion strength (peak-to-mean ratio).",
            },
            # Entry shape matches the dicts produced by _infer_daily_cycle.
            "daily_cycle": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "hour": {"type": "integer"},
                        "strength": {"type": "number"},
                    },
                },
                "default": [],
                "description": "Per-hour completion strength (peak-to-mean ratio).",
            },
        },
    },
    context_schema=["tip_feedback", "profile.features"],
    # Only a data: consent is listed; there is no agent:<id> consent here.
    required_consents=["data:core"],
    output_contract={"type": "snippet", "format": "free_text"},
    ttl_sec=86_400,  # 86_400 seconds = one day
    # Each inferred param is wired to the module-level _infer_* function of the
    # same name. NOTE(review): the exact meaning of min_history (events vs.
    # days) and when cold_start_default applies are defined by InferredParam,
    # which is not visible in this file — confirm there.
    inferred_params=[
        InferredParam(
            key="lookback_days",
            ttl_sec=86_400,
            cold_start_default=7,  # same value as the schema default above
            min_history=5,
            infer=_infer_lookback_days,
        ),
        InferredParam(
            key="weekly_cycle",
            ttl_sec=86_400,
            cold_start_default=[],
            min_history=21,  # need ≥3 weeks to see a weekly signal
            infer=_infer_weekly_cycle,
        ),
        InferredParam(
            key="daily_cycle",
            ttl_sec=86_400,
            cold_start_default=[],
            min_history=14,
            infer=_infer_daily_cycle,
        ),
    ],
)
# Minimum strength an entry must exceed (strictly) to count as a strong signal.
_STRENGTH_THRESHOLD = 0.5


def _strong(entries: list[dict], key: str) -> list[dict]:
    """Return the entries whose *key* value is strictly above the threshold.

    Args:
        entries: Cycle entries (dicts) to filter; order is preserved.
        key: Name of the field holding the strength value. Entries that lack
            the field are treated as strength 0 and therefore dropped.

    Returns:
        A new list with the entries whose ``entry[key]`` exceeds
        ``_STRENGTH_THRESHOLD``.
    """
    # Honour the caller-supplied field name instead of a hard-coded one.
    return [e for e in entries if e.get(key, 0) > _STRENGTH_THRESHOLD]
def _hour_label(hour: int) -> str:
    """Render an hour-of-day as a short 12-hour label.

    Hours 0 and 12 get the names "midnight" and "noon"; anything below 12 is
    suffixed with "am", and anything above has 12 subtracted and is suffixed
    with "pm".
    """
    named = {0: "midnight", 12: "noon"}
    if hour in named:
        return named[hour]
    if hour < 12:
        shown, suffix = hour, "am"
    else:
        shown, suffix = hour - 12, "pm"
    return f"{shown}{suffix}"
class RecentPatternsAgent(BaseAgent):
    """Surfaces the user's reaction pattern from recent feedback.

    ``compute`` summarises the tip feedback that falls inside the lookback
    window, adds a note about mean dwell time when the profile provides one,
    and appends weekly / daily cycle hints for entries that ``_strong`` keeps.
    """

    agent_id: ClassVar[str] = MANIFEST.id
    ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec
    version: ClassVar[str] = MANIFEST.version

    def compute(self, inp: AgentInput) -> AgentOutput:
        """Build the free-text snippet and its snapshot for one user.

        Reads ``inp.agent_prefs`` (``lookback_days`` / legacy ``window_days``,
        ``weekly_cycle``, ``daily_cycle``), ``inp.feedback_history``,
        ``inp.profile`` and ``inp.now``.

        Returns:
            Whatever ``self._make_output`` produces for the prompt text and
            the snapshot dict assembled here.
        """
        prefs = inp.agent_prefs
        # Support legacy window_days pref key for backward compat.
        lookback_days = max(
            1,
            int(prefs.get("lookback_days", prefs.get("window_days", 7))),
        )
        # ``or []`` tolerates an explicit null stored for either cycle pref.
        weekly_cycle: list[dict] = prefs.get("weekly_cycle") or []
        daily_cycle: list[dict] = prefs.get("daily_cycle") or []

        window_s = lookback_days * 86_400
        now_ts = inp.now.timestamp()
        # Feedback with a missing/unparseable timestamp has infinite age and
        # is therefore never inside the window.
        recent = [
            f for f in inp.feedback_history
            if self._age_s(f.get("created_at", ""), now_ts) <= window_s
        ]
        counts: Counter[str] = Counter(f.get("action") for f in recent)
        total = len(recent)
        dwell_ms = inp.profile.get("mean_dwell_ms_30d")

        parts: list[str] = []
        if total == 0:
            parts.append(f"No tip reactions recorded in the last {lookback_days} days.")
        else:
            done = counts.get("done", 0)
            dismissed = counts.get("dismiss", 0)
            snoozed = counts.get("snooze", 0)
            # The trailing " — " separates the total from the breakdown; without
            # it the two f-string pieces run together ("reactions2 completed").
            parts.append(
                f"Last {lookback_days} days: {total} tip reaction{'s' if total != 1 else ''} — "
                f"{done} completed, {dismissed} dismissed, {snoozed} snoozed."
            )

        if dwell_ms is not None:
            dwell_s = round(dwell_ms / 1000)
            if dwell_s < 15:
                parts.append(
                    "Average dwell is very short — user may be acting on auto-pilot; vary tip content."
                )
            elif dwell_s < 60:
                parts.append(f"Average dwell {dwell_s}s — tips are being read.")
            else:
                parts.append(
                    f"Average dwell {dwell_s}s — user deliberates; prefer tips that reward reflection."
                )

        # Cycle hints — only when strength > threshold.
        strong_weekly = _strong(weekly_cycle, "strength")
        if strong_weekly:
            # Pluralise every day name so multi-day hints read
            # "Mondays and Fridays" rather than "Monday and Fridays".
            day_names = [f"{_DOW_NAMES[e['dow']]}s" for e in strong_weekly]
            parts.append(f"User tends to complete tips on {self._join_labels(day_names)}.")

        strong_daily = _strong(daily_cycle, "strength")
        if strong_daily:
            hour_labels = [_hour_label(e["hour"]) for e in strong_daily]
            parts.append(f"User is most active around {self._join_labels(hour_labels)}.")

        # ``parts`` always holds at least the reaction summary, so the
        # fallback text is purely defensive.
        prompt = " ".join(parts) if parts else "No engagement data available yet."
        snapshot = {
            "lookback_days": lookback_days,
            "recent_total": total,
            "action_counts": dict(counts),
            "mean_dwell_ms_30d": dwell_ms,
            "strong_weekly_days": [e["dow"] for e in strong_weekly],
            "strong_daily_hours": [e["hour"] for e in strong_daily],
        }
        return self._make_output(inp, prompt, snapshot)

    @staticmethod
    def _join_labels(labels: list[str]) -> str:
        """Join labels as ``"a"``, ``"a and b"`` or ``"a, b and c"``."""
        if len(labels) <= 1:
            return "".join(labels)
        return ", ".join(labels[:-1]) + f" and {labels[-1]}"

    @staticmethod
    def _age_s(iso: str, now_ts: float) -> float:
        """Return how many seconds before *now_ts* the timestamp *iso* lies.

        ``Z`` is rewritten to ``+00:00`` before parsing and timestamps without
        an offset are treated as UTC. A missing or unparseable timestamp
        yields ``inf`` so the item falls outside every finite window.
        """
        if not iso:
            return float("inf")
        try:
            dt = datetime.fromisoformat(iso.replace("Z", "+00:00"))
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return now_ts - dt.timestamp()
        except (AttributeError, TypeError, ValueError, OverflowError):
            # Non-string values, malformed text or out-of-range dates are all
            # treated as "infinitely old" rather than aborting the whole run.
            return float("inf")