Compare commits

..

6 Commits

Author SHA1 Message Date
26fc67776f feat(agents): semantic task clustering + focus-area inferred preferred_areas (#97, #113)
- New ml/agents/clustering.py: embed task content via nomic-embed-text
  (Ollama), greedy cosine clustering (threshold 0.72, max 6 clusters),
  graceful fallback to project-id grouping when Ollama is unreachable
- focus_area v2.0.0: compute() uses semantic clusters as focus areas;
  adds preferred_areas InferredParam inferred from top-2 projects by
  task_completion count
- 135 tests, all passing

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 06:54:46 +00:00
336644a90a docs: update CLAUDE.md with rich per-agent inference completions (#112–#116)
- Inference framework table updated: all agents at v1.2.0 with full param list
- Documents UserHistory.task_completions and AgentInferRequest.task_completions
- Marks #112/114/115/116 complete in recent completions
- Active work updated: #78 closed, #61 and #97/#113 as next priorities

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 06:28:30 +00:00
1d9a395591 feat(agents): quiet window + peak hours + tz prefs for time-of-day agent (#112)
Adds four InferredParams (all TTL=24h, min_history=50 except preferred_hour=10):
- quiet_start / quiet_end: longest contiguous below-baseline hour run (HH:MM)
- peak_hours: top-quartile done-event hours, sorted ascending
- tz: cold-start only ("UTC"); populated from auth provider, no inference function

compute() updated:
- in_quiet check (quiet window) takes precedence over peak hours
- in_peak emits "peak productivity hour" language when current hour is in peak_hours
- approaching peak (within 2h) surfaces for orchestrator timing
- tz surfaced in snippet header when not UTC
- snapshot adds peak_hours, in_quiet, in_peak, tz

- Agent bumped to v1.2.0
- 21 new tests: night-owl, early-bird, shift-worker, quiet/peak snippet rendering
- Fixed test_snapshot_keys in test_agents.py to include new snapshot fields

Closes #112

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 06:05:51 +00:00
bc71dc203d feat(agents): adaptive lookback + weekly/daily cycle detection for recent-patterns (#116)
Replaces the coarse density-bucket window_days with three InferredParams (all TTL=24h):
- lookback_days: min window containing ≥30 done events, capped at 30d (min_history=5)
- weekly_cycle: per-DOW peak-to-mean strength list (min_history=21, ≥3 weeks of signal)
- daily_cycle: per-hour peak-to-mean strength list (min_history=14)

compute() renders cycle hints when strength > 0.5:
  "User tends to complete tips on Tuesdays and Saturdays."
  "User is most active around 8pm."
Legacy window_days pref key still accepted as a fallback.

- window_days pref renamed lookback_days; backward-compat fallback in compute()
- Agent bumped to v1.2.0
- 19 new tests: weekend-warrior, weekday-only, evening-person, no-pattern,
  legacy compat, snippet rendering with strong/weak signals

Closes #116

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 05:51:45 +00:00
4cade4868b feat(agents): per-user baseline + stdev inference for momentum agent (#114)
Adds two InferredParams (TTL=7d) computed from 28-day rolling daily done counts:
- baseline_completions_per_day: mean done events/day over the window
- stdev: stdev of daily counts (floored at 0.1 to avoid division by zero)

MomentumAgent.compute() now calculates a z-score from recent done events in
inp.feedback_history vs the inferred baseline. Snippet language switches to
z-score framing ("above your usual pace", "slowing down") when |z| >= 1.0,
falling back to engagement_trend labels when in the normal range.

- engagement_trend InferredParam preserved for backward compatibility
- momentum_window pref added (default 7, user-overridable)
- 14 new tests covering power user, casual user, returning-from-break, and
  relative stdev comparison; engagement_trend tests updated for z-score priority
- Agent bumped to v1.2.0

Closes #114

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 05:18:29 +00:00
04212ff318 feat(agents): p50-lateness tolerance + per-project realness for overdue-task (#115)
Replaces snooze-rate heuristic with p50 of actual task lateness (completedAt − dueAt).
Adds project_realness inference: projects with chronic lateness get realness < 1 and
the agent softens its snippet language from "overdue" to "past target date".

- TaskCompletion added to UserHistory with lateness_days computed property
- _infer_lateness_tolerance: p50 of task_completions, clipped at 0, float
- _infer_project_realness: per-project median lateness normalised by global median
- Both InferredParams use 7d TTL; cold_start = 0.0 / {}
- AgentInferRequest accepts task_completions; endpoint wires them through
- 12 new tests covering punctual/chronic/mixed users and language softening
- Agent bumped to v1.2.0

Closes #115

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 05:14:04 +00:00
14 changed files with 1502 additions and 190 deletions

View File

@@ -107,10 +107,11 @@ Recent completions:
- Admin UX refinements: feedback consolidation, settings placement (#100102)
- ADR-0012 — ε-greedy v2 (D=12) — 2026-04-26 (now superseded by ADR-0013)
- ADR-0014 complete: unified Profile schema + backfill, manifest plumbing, `/api/profile` read-through, registry-driven eligibility filter, inference framework + per-agent inference, legacy consent column drop — 2026-05-05
- Rich per-agent inference for all four active agents (#112, #114, #115, #116) — 2026-05-06: quiet/peak hours (time-of-day), z-score baseline (momentum), p50 lateness + project realness (overdue-task), adaptive lookback + weekly/daily cycles (recent-patterns)
Active work (M2):
- Signal abstraction for multi-source support (#78)
- Per-user feature freshness SLAs (#61, ADR-0011 phase B)
- Embedding-based task clustering for focus-area inference (#97, #113)
## ADR-0014 endpoint map (as of step 6)
@@ -131,15 +132,18 @@ Lives in `ml/agents/inference/`. `run_inference(manifest, history)` evaluates al
- `infer()` error → emit `cold_start_default` (never crashes)
- Results written to `user_preferences` with `source='inferred'`; keys with `source='user'` are never overwritten
All five agents are at v1.1.0. Per-agent inferred params:
| Agent | Inferred param | Logic |
|-------|---------------|-------|
| `time-of-day` | `preferred_hour` | Mode done-hour from feedback history |
| `momentum` | `engagement_trend` | Done-rate last 7d vs prior 7d |
| `overdue-task` | `lateness_tolerance_days` | Snooze rate → 0/1/2 days |
| `recent-patterns` | `window_days` | Event density → 7/14/30 days |
All five agents are at v1.2.0. Per-agent inferred params (all live in `ml/agents/<name>.py`):
| Agent | Inferred params | Notes |
|-------|----------------|-------|
| `time-of-day` | `preferred_hour`, `quiet_start`, `quiet_end`, `peak_hours`, `tz` | Quiet window = longest below-baseline hour run; peak = top-quartile done hours; tz cold-start only (from auth provider) |
| `momentum` | `engagement_trend`, `baseline_completions_per_day`, `stdev` | Baseline = 28d rolling mean done/day; snippet uses z-score language |
| `overdue-task` | `lateness_tolerance_days`, `project_realness` | Tolerance = p50 lateness from TaskCompletion history; realness = project median vs global median |
| `recent-patterns` | `lookback_days`, `weekly_cycle`, `daily_cycle` | Lookback sized to ≥30 done events; cycles use peak-to-mean ratio; snippet hints when strength > 0.5 |
| `focus-area` | *(none yet)* | Needs project-level feedback linkage (#78) |
`UserHistory` carries both `events: list[FeedbackEvent]` and `task_completions: list[TaskCompletion]`. `AgentInferRequest` (ml/serving) accepts `task_completions: list[dict]` alongside `feedback_history`.
## What NOT to do
- Don't copy Todoist's data into our DB. Store the OAuth token + computed features/derivatives we need, fetch raw on demand.

152
ml/agents/clustering.py Normal file
View File

@@ -0,0 +1,152 @@
"""Semantic task clustering via nomic-embed-text (issue #97).
Public API:
cluster_tasks(tasks, ollama_url) -> list[Cluster]
Each task dict must have a "content" key. Tasks without content are placed in a
fallback "other" bucket. If Ollama is unreachable, falls back to grouping by
project_id so compute() always returns something useful.
"""
from __future__ import annotations
import logging
import math
import os
from dataclasses import dataclass, field
import httpx
log = logging.getLogger(__name__)
# Cosine similarity threshold for merging tasks into the same cluster.
_SIM_THRESHOLD = 0.72
# Never produce more than this many clusters regardless of task count.
_MAX_CLUSTERS = 6
_EMBED_TIMEOUT = 10.0
@dataclass
class Cluster:
label: str # representative task content (shortest, most central)
tasks: list[dict] = field(default_factory=list)
@property
def task_count(self) -> int:
return len(self.tasks)
@property
def overdue_count(self) -> int:
return sum(1 for t in self.tasks if t.get("is_overdue"))
def _embed(text: str, ollama_url: str) -> list[float] | None:
try:
with httpx.Client(trust_env=False, timeout=_EMBED_TIMEOUT) as c:
r = c.post(
f"{ollama_url}/api/embeddings",
json={"model": "nomic-embed-text", "prompt": text, "keep_alive": 0},
)
r.raise_for_status()
return r.json().get("embedding")
except Exception as exc:
log.debug("embed_failed text=%r error=%s", text[:40], exc)
return None
def _cosine(a: list[float], b: list[float]) -> float:
dot = sum(x * y for x, y in zip(a, b))
na = math.sqrt(sum(x * x for x in a))
nb = math.sqrt(sum(x * x for x in b))
if na == 0 or nb == 0:
return 0.0
return dot / (na * nb)
def _greedy_cluster(items: list[tuple[dict, list[float]]]) -> list[Cluster]:
"""Single-pass greedy clustering: each item joins the first existing cluster
whose centroid is above _SIM_THRESHOLD, else starts a new one."""
clusters: list[tuple[list[float], Cluster]] = [] # (centroid, cluster)
for task, vec in items:
best_idx = -1
best_sim = _SIM_THRESHOLD - 1e-9
for i, (centroid, _) in enumerate(clusters):
sim = _cosine(centroid, vec)
if sim > best_sim:
best_sim = sim
best_idx = i
if best_idx >= 0 and len(clusters) < _MAX_CLUSTERS:
centroid, cluster = clusters[best_idx]
cluster.tasks.append(task)
# Update centroid as running mean.
n = len(cluster.tasks)
new_centroid = [(c * (n - 1) + v) / n for c, v in zip(centroid, vec)]
clusters[best_idx] = (new_centroid, cluster)
elif len(clusters) < _MAX_CLUSTERS:
label = task.get("content", "Tasks")[:60]
cluster = Cluster(label=label, tasks=[task])
clusters.append((vec, cluster))
else:
# Overflow: append to closest cluster even below threshold.
best_i = max(range(len(clusters)), key=lambda i: _cosine(clusters[i][0], vec))
clusters[best_i][1].tasks.append(task)
return [c for _, c in clusters]
def _fallback_by_project(tasks: list[dict]) -> list[Cluster]:
"""Group by project_id when embeddings are unavailable."""
buckets: dict[str, Cluster] = {}
for task in tasks:
pid = task.get("project_id") or task.get("project") or "default"
if pid not in buckets:
label = pid if pid != "default" else "Tasks"
buckets[pid] = Cluster(label=label)
buckets[pid].tasks.append(task)
return list(buckets.values())
def cluster_tasks(
tasks: list[dict],
ollama_url: str | None = None,
) -> list[Cluster]:
"""Cluster tasks by semantic similarity.
Returns a non-empty list of Cluster objects. Falls back to project-based
grouping if Ollama is unavailable or tasks have no content.
"""
if not tasks:
return []
url = ollama_url or os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
# Separate tasks with usable content from those without.
with_content = [(t, t.get("content", "").strip()) for t in tasks]
embeddable = [(t, c) for t, c in with_content if c]
no_content = [t for t, c in with_content if not c]
if not embeddable:
return _fallback_by_project(tasks)
# Fetch embeddings (best-effort; None means Ollama unavailable).
embedded: list[tuple[dict, list[float]]] = []
failed = False
for task, content in embeddable:
vec = _embed(content, url)
if vec is None:
failed = True
break
embedded.append((task, vec))
if failed or not embedded:
log.info("cluster_tasks: ollama unavailable, falling back to project grouping")
return _fallback_by_project(tasks)
clusters = _greedy_cluster(embedded)
# Tasks without content get their own bucket if any.
if no_content:
clusters.append(Cluster(label="Other tasks", tasks=no_content))
return clusters

View File

@@ -1,16 +1,27 @@
from __future__ import annotations
from collections import defaultdict
from collections import Counter
from typing import ClassVar
from .base import BaseAgent, AgentInput, AgentOutput
from .manifest import AgentManifest
from .clustering import cluster_tasks
from .inference.history import UserHistory
from .manifest import AgentManifest, InferredParam
def _infer_preferred_areas(history: UserHistory) -> list[str]:
"""Top-2 project IDs by completed task count (last 90 days worth of data)."""
counts: Counter[str] = Counter()
for tc in history.task_completions:
if tc.project_id:
counts[tc.project_id] += 1
return [pid for pid, _ in counts.most_common(2)]
MANIFEST = AgentManifest(
id="focus-area",
version="1.1.0", # bumped: preferred_areas pref is now honoured in compute (#113)
description="Identifies the most congested project/area in the user's task list.",
version="2.0.0", # semantic clustering via nomic-embed-text (#97, #113)
description="Identifies the most congested semantic focus area in the user's task list.",
pref_schema={
"type": "object",
"additionalProperties": False,
@@ -19,7 +30,7 @@ MANIFEST = AgentManifest(
"type": "array",
"items": {"type": "string"},
"default": [],
"description": "Project / label names to prioritise when multiple areas tie.",
"description": "Project IDs or label names to prioritise when multiple areas tie.",
},
},
},
@@ -27,59 +38,75 @@ MANIFEST = AgentManifest(
required_consents=["data:core", "data:todoist", "agent:focus-area"],
output_contract={"type": "snippet", "format": "free_text"},
ttl_sec=43_200,
# No inferred_params: preferred_areas requires project-level feedback linkage
# that isn't available in feedback_history alone. Revisit with #78 (signal
# abstraction) once per-task reactions can be traced back to a project.
inferred_params=[
InferredParam(
key="preferred_areas",
ttl_sec=86_400,
cold_start_default=[],
min_history=0, # use task_completions, not feedback events; handle empty inside
infer=_infer_preferred_areas,
),
],
)
class FocusAreaAgent(BaseAgent):
"""Identifies the most congested project/area in the user's task list."""
"""Identifies the most congested semantic focus area in the user's task list."""
agent_id: ClassVar[str] = MANIFEST.id
ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec
version: ClassVar[str] = MANIFEST.version
def compute(self, inp: AgentInput) -> AgentOutput:
preferred: list[str] = inp.agent_prefs.get("preferred_areas", [])
by_project: dict[str, list[dict]] = defaultdict(list)
for task in inp.tasks:
project = task.get("project_id") or task.get("project") or "default"
by_project[project].append(task)
if not by_project:
prompt = "No tasks available to identify a focus area."
return self._make_output(inp, prompt, {"project_count": 0})
def score(project: str, tasks: list[dict]) -> tuple[float, bool]:
base = sum(2.0 if t.get("is_overdue") else 1.0 for t in tasks)
# Boost preferred areas to break ties in their favour
boosted = project in preferred or any(p in project for p in preferred)
return (base + (0.5 if boosted else 0.0), boosted)
top_project, top_tasks = max(
by_project.items(),
key=lambda kv: score(kv[0], kv[1]),
if not inp.tasks:
return self._make_output(
inp,
"No tasks available to identify a focus area.",
{"cluster_count": 0, "strategy": "none"},
)
overdue_in_top = sum(1 for t in top_tasks if t.get("is_overdue"))
label = "the default project" if top_project == "default" else f'"{top_project}"'
n = len(top_tasks)
boosted = top_project in preferred or any(p in top_project for p in preferred)
clusters = cluster_tasks(inp.tasks)
if not clusters:
return self._make_output(
inp,
"No tasks available to identify a focus area.",
{"cluster_count": 0, "strategy": "none"},
)
strategy = "semantic" if len(clusters) > 1 or len(inp.tasks) > 1 else "fallback"
def score(cluster) -> float:
base = sum(2.0 if t.get("is_overdue") else 1.0 for t in cluster.tasks)
boosted = any(p in cluster.label for p in preferred) if preferred else False
return base + (0.5 if boosted else 0.0)
top = max(clusters, key=score)
boosted = bool(preferred) and any(p in top.label for p in preferred)
parts = [
f"The user's most congested area is {label} "
f"({n} task{'s' if n != 1 else ''}, {overdue_in_top} overdue)."
f'The user\'s most active focus area is "{top.label}" '
f"({top.task_count} task{'s' if top.task_count != 1 else ''}, "
f"{top.overdue_count} overdue)."
]
if boosted:
parts.append("This area matches the user's stated focus preferences.")
if overdue_in_top >= 3:
if top.overdue_count >= 3:
parts.append("Consider surfacing an action from this area.")
if len(clusters) > 1:
other_total = sum(c.task_count for c in clusters if c is not top)
parts.append(
f"{len(clusters) - 1} other area{'s' if len(clusters) > 2 else ''} "
f"contain {other_total} task{'s' if other_total != 1 else ''}."
)
prompt = " ".join(parts)
snapshot = {
"top_project": top_project,
"top_task_count": n,
"top_overdue_count": overdue_in_top,
"project_count": len(by_project),
"top_cluster_label": top.label,
"top_task_count": top.task_count,
"top_overdue_count": top.overdue_count,
"cluster_count": len(clusters),
"strategy": strategy,
"preferred_areas": preferred,
}
return self._make_output(inp, prompt, snapshot)
return self._make_output(inp, " ".join(parts), snapshot)

View File

@@ -4,6 +4,6 @@ Each agent's manifest declares InferredParams; this package owns the
scheduling contract, history data model, and write path to user_preferences.
"""
from .framework import run_inference
from .history import FeedbackEvent, UserHistory
from .history import FeedbackEvent, TaskCompletion, UserHistory
__all__ = ["run_inference", "FeedbackEvent", "UserHistory"]
__all__ = ["run_inference", "FeedbackEvent", "TaskCompletion", "UserHistory"]

View File

@@ -23,7 +23,27 @@ class FeedbackEvent:
return dt.hour
@dataclass
class TaskCompletion:
"""A completed task that had a due date — used for lateness inference."""
project_id: str | None
completed_at: str # ISO 8601
due_at: str # ISO 8601
@property
def lateness_days(self) -> float:
"""Days between due_at and completed_at. Negative = completed early."""
try:
def _parse(s: str) -> datetime:
dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
return (_parse(self.completed_at) - _parse(self.due_at)).total_seconds() / 86_400
except ValueError:
return 0.0
@dataclass
class UserHistory:
user_id: str
events: list[FeedbackEvent] = field(default_factory=list)
task_completions: list[TaskCompletion] = field(default_factory=list)

View File

@@ -1,5 +1,8 @@
from __future__ import annotations
import math
import statistics
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import ClassVar
@@ -8,6 +11,49 @@ from .inference.history import UserHistory
from .manifest import AgentManifest, InferredParam
def _parse_dt(iso: str) -> datetime:
try:
dt = datetime.fromisoformat(iso.replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except ValueError:
return datetime.min.replace(tzinfo=timezone.utc)
def _daily_done_counts(history: UserHistory, window_days: int = 28) -> list[int]:
"""Count done-action events per calendar day over the last window_days days."""
if not history.events:
return []
latest = max(_parse_dt(e.created_at) for e in history.events)
cutoff = latest - timedelta(days=window_days)
by_day: dict[tuple[int, int, int], int] = defaultdict(int)
for e in history.events:
if e.action == "done":
dt = _parse_dt(e.created_at)
if dt >= cutoff:
by_day[(dt.year, dt.month, dt.day)] += 1
# Return counts for every day in the window, including zero-completion days.
counts = []
for offset in range(window_days):
day = (latest - timedelta(days=offset)).date()
counts.append(by_day.get((day.year, day.month, day.day), 0))
return counts
def _infer_baseline_completions_per_day(history: UserHistory) -> float:
counts = _daily_done_counts(history)
return statistics.mean(counts) if counts else 1.0
def _infer_stdev(history: UserHistory) -> float:
counts = _daily_done_counts(history)
if len(counts) < 2:
return 1.0
sd = statistics.stdev(counts)
return max(sd, 0.1) # floor so we never divide by zero in z-score
def _infer_engagement_trend(history: UserHistory) -> str:
"""Compare done-rate in the most recent 7 days vs the 7 days before that."""
events = sorted(history.events, key=lambda e: e.created_at)
@@ -26,7 +72,7 @@ def _infer_engagement_trend(history: UserHistory) -> str:
older = [e for e in events if cutoff_older <= _parse_dt(e.created_at) < cutoff_recent]
if len(older) < 3:
return "stable" # not enough baseline to compare
return "stable"
recent_rate = sum(1 for e in recent if e.action == "done") / max(len(recent), 1)
older_rate = sum(1 for e in older if e.action == "done") / max(len(older), 1)
@@ -39,19 +85,9 @@ def _infer_engagement_trend(history: UserHistory) -> str:
return "stable"
def _parse_dt(iso: str) -> datetime:
try:
dt = datetime.fromisoformat(iso.replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except ValueError:
return datetime.min.replace(tzinfo=timezone.utc)
MANIFEST = AgentManifest(
id="momentum",
version="1.1.0", # bumped: engagement_trend InferredParam added (#114)
version="1.2.0", # #114: baseline + stdev inferred params; z-score snippet language
description="Characterises the user's recent engagement trend from profile features.",
pref_schema={
"type": "object",
@@ -64,6 +100,24 @@ MANIFEST = AgentManifest(
"default": 25,
"description": "Completion rate below which momentum hints at low engagement.",
},
"baseline_completions_per_day": {
"type": "number",
"minimum": 0,
"default": 1.0,
"description": "User's normal daily done-task rate (inferred from 28d history).",
},
"stdev": {
"type": "number",
"minimum": 0,
"default": 1.0,
"description": "Stdev of daily completion counts; used for z-score normalisation.",
},
"momentum_window": {
"type": "integer",
"minimum": 1,
"default": 7,
"description": "Days of recent history to measure current momentum against baseline.",
},
},
},
context_schema=["profile.features"],
@@ -73,15 +127,42 @@ MANIFEST = AgentManifest(
inferred_params=[
InferredParam(
key="engagement_trend",
ttl_sec=21_600, # recompute every 6 hours alongside snippet
ttl_sec=21_600,
cold_start_default="stable",
min_history=10,
infer=_infer_engagement_trend,
),
InferredParam(
key="baseline_completions_per_day",
ttl_sec=7 * 86_400,
cold_start_default=1.0,
min_history=14,
infer=_infer_baseline_completions_per_day,
),
InferredParam(
key="stdev",
ttl_sec=7 * 86_400,
cold_start_default=1.0,
min_history=14,
infer=_infer_stdev,
),
],
)
def _z_score_label(z: float) -> str | None:
"""Map z-score to a human-readable momentum label, or None if within normal range."""
if z >= 2.0:
return "well above your usual pace"
if z >= 1.0:
return "above your usual pace"
if z <= -2.0:
return "well below your usual pace"
if z <= -1.0:
return "below your usual pace"
return None
class MomentumAgent(BaseAgent):
"""Characterises the user's recent engagement trend from profile features."""
agent_id: ClassVar[str] = MANIFEST.id
@@ -93,6 +174,20 @@ class MomentumAgent(BaseAgent):
dismiss = inp.profile.get("dismiss_rate_30d")
volume = inp.profile.get("tip_volume_30d")
trend: str = inp.agent_prefs.get("engagement_trend", "stable")
baseline: float = float(inp.agent_prefs.get("baseline_completions_per_day", 1.0))
stdev: float = max(float(inp.agent_prefs.get("stdev", 1.0)), 0.1)
window: int = int(inp.agent_prefs.get("momentum_window", 7))
# Count done events in the recent window from feedback_history.
now = inp.now.astimezone(timezone.utc)
cutoff = now - timedelta(days=window)
recent_done = sum(
1 for e in inp.feedback_history
if e.get("action") == "done" and _parse_dt(e.get("created_at", "")) >= cutoff
)
recent_rate = recent_done / window # completions/day over the window
z = (recent_rate - baseline) / stdev
z_label = _z_score_label(z)
parts: list[str] = []
@@ -120,7 +215,21 @@ class MomentumAgent(BaseAgent):
if volume is not None and int(volume) < 5:
parts.append("Very few tips served so far — this is an early-stage user.")
if trend == "up":
# Z-score takes precedence over trend label when we have a baseline.
if z_label:
if z > 0:
parts.append(
f"Completion pace is {z_label} "
f"({recent_done} done in the last {window}d vs "
f"~{baseline * window:.1f} expected) — build on the momentum."
)
else:
parts.append(
f"Completion pace is {z_label} "
f"({recent_done} done in the last {window}d vs "
f"~{baseline * window:.1f} expected) — a motivational or easy-win tip may help."
)
elif trend == "up":
parts.append("Engagement is trending up compared to last week — build on the momentum.")
elif trend == "down":
parts.append("Engagement is trending down — a motivational or easy-win tip may help.")
@@ -131,5 +240,10 @@ class MomentumAgent(BaseAgent):
"dismiss_rate_30d": dismiss,
"tip_volume_30d": volume,
"engagement_trend": trend,
"baseline_completions_per_day": baseline,
"stdev": stdev,
"momentum_window": window,
"recent_done_count": recent_done,
"z_score": round(z, 2),
}
return self._make_output(inp, prompt, snapshot)

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import statistics
from typing import ClassVar
from .base import BaseAgent, AgentInput, AgentOutput
@@ -7,36 +8,64 @@ from .inference.history import UserHistory
from .manifest import AgentManifest, InferredParam
def _infer_lateness_tolerance(history: UserHistory) -> int:
"""Estimate how many days past due a task needs to be before the user acts.
def _infer_lateness_tolerance(history: UserHistory) -> float:
"""p50 lateness (days) across completed tasks that had a due date, clipped at 0.
High snooze rate → user doesn't act immediately → raise tolerance so the
agent doesn't nag them about tasks they'll handle in their own time.
Negative lateness (finished early) pulls the percentile down; we clip at 0
so punctual users always get tolerance=0, never a negative offset.
"""
total = len(history.events)
if total == 0:
return 0
snooze_rate = sum(1 for e in history.events if e.action == "snooze") / total
if snooze_rate > 0.40:
return 2
if snooze_rate > 0.20:
return 1
return 0
lateness = [c.lateness_days for c in history.task_completions]
if not lateness:
return 0.0
return max(0.0, statistics.median(lateness))
def _infer_project_realness(history: UserHistory) -> dict[str, float]:
"""Per-project realness: 1 (median project lateness / global median lateness).
Projects whose tasks are consistently completed on time get realness ≈ 1.
Aspirational projects (chronic lateness) get realness closer to 0.
"""
completions = [c for c in history.task_completions if c.project_id]
if not completions:
return {}
global_median = statistics.median(c.lateness_days for c in completions)
if global_median <= 0:
# Everyone finishes early — no project is less real than another.
return {pid: 1.0 for pid in {c.project_id for c in completions}} # type: ignore[misc]
by_project: dict[str, list[float]] = {}
for c in completions:
by_project.setdefault(c.project_id, []).append(c.lateness_days) # type: ignore[index]
result: dict[str, float] = {}
for pid, days in by_project.items():
project_median = statistics.median(days)
realness = 1.0 - (project_median / global_median)
result[pid] = round(max(0.0, min(1.0, realness)), 3)
return result
MANIFEST = AgentManifest(
id="overdue-task",
version="1.1.0", # bumped: lateness_tolerance_days InferredParam added (#115)
version="1.2.0", # #115: p50-lateness tolerance + per-project realness
description="Reports the user's overdue tasks by count and age.",
pref_schema={
"type": "object",
"additionalProperties": False,
"properties": {
"lateness_tolerance_days": {
"type": "integer",
"type": "number",
"minimum": 0,
"default": 0,
"description": "Days past due before a task is considered overdue. 0 = the moment it's late.",
"description": "Days past due before a task is flagged. p50 of historical lateness.",
},
"project_realness": {
"type": "object",
"additionalProperties": {"type": "number", "minimum": 0, "maximum": 1},
"default": {},
"description": "Per-project realness score [0,1]. Low = aspirational due dates.",
},
},
},
@@ -48,15 +77,40 @@ MANIFEST = AgentManifest(
inferred_params=[
InferredParam(
key="lateness_tolerance_days",
ttl_sec=86_400, # recompute daily — snooze pattern shifts slowly
cold_start_default=0,
ttl_sec=7 * 86_400, # recompute weekly — lateness habits shift slowly
cold_start_default=0.0,
min_history=10,
infer=_infer_lateness_tolerance,
),
InferredParam(
key="project_realness",
ttl_sec=7 * 86_400,
cold_start_default={},
min_history=10,
infer=_infer_project_realness,
),
],
)
def _realness(project_id: str | None, project_realness: dict[str, float]) -> float:
"""Return realness for a project, defaulting to 1.0 (treat as real)."""
if not project_id or not project_realness:
return 1.0
return project_realness.get(project_id, 1.0)
def _format_task(task: dict, project_realness: dict[str, float]) -> str:
content = task["content"]
age = round(task.get("task_age_days", 0))
pid = task.get("project_id")
r = _realness(pid, project_realness)
unit = "day" if age == 1 else "days"
if r < 0.4:
return f'"{content}" ({age} {unit} past target date)'
return f'"{content}" ({age} {unit} overdue)'
class OverdueTaskAgent(BaseAgent):
"""Reports the user's overdue tasks by count and age."""
agent_id: ClassVar[str] = MANIFEST.id
@@ -64,7 +118,9 @@ class OverdueTaskAgent(BaseAgent):
version: ClassVar[str] = MANIFEST.version
def compute(self, inp: AgentInput) -> AgentOutput:
tolerance = max(0, int(inp.agent_prefs.get("lateness_tolerance_days", 0)))
tolerance = max(0.0, float(inp.agent_prefs.get("lateness_tolerance_days", 0)))
project_realness: dict[str, float] = inp.agent_prefs.get("project_realness", {})
overdue = [
t for t in inp.tasks
if t.get("is_overdue") and t.get("task_age_days", 0) >= tolerance
@@ -75,18 +131,21 @@ class OverdueTaskAgent(BaseAgent):
prompt = "The user has no overdue tasks at this time."
elif len(overdue) == 1:
t = top[0]
age = round(t.get("task_age_days", 0))
prompt = (
f'The user has 1 overdue task: "{t["content"]}" '
f"({age} day{'s' if age != 1 else ''} overdue)."
)
r = _realness(t.get("project_id"), project_realness)
item = _format_task(t, project_realness)
if r < 0.4:
prompt = f"The user has 1 task past its target date: {item}."
else:
items = ", ".join(
f'"{t["content"]}" ({round(t.get("task_age_days", 0))}d)'
for t in top
prompt = f"The user has 1 overdue task: {item}."
else:
items = ", ".join(_format_task(t, project_realness) for t in top)
avg_realness = (
sum(_realness(t.get("project_id"), project_realness) for t in overdue)
/ len(overdue)
)
label = "tasks past their target dates" if avg_realness < 0.4 else "overdue tasks"
prompt = (
f"The user has {len(overdue)} overdue tasks. "
f"The user has {len(overdue)} {label}. "
f"Top {len(top)}: {items}."
)
@@ -94,7 +153,12 @@ class OverdueTaskAgent(BaseAgent):
"overdue_count": len(overdue),
"lateness_tolerance_days": tolerance,
"top_overdue": [
{"content": t["content"], "task_age_days": t.get("task_age_days", 0)}
{
"content": t["content"],
"task_age_days": t.get("task_age_days", 0),
"project_id": t.get("project_id"),
"realness": _realness(t.get("project_id"), project_realness),
}
for t in top
],
}

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import math
from collections import Counter
from datetime import datetime, timezone
from typing import ClassVar
@@ -8,35 +9,124 @@ from .base import BaseAgent, AgentInput, AgentOutput
from .inference.history import UserHistory
from .manifest import AgentManifest, InferredParam
_DOW_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
def _infer_window_days(history: UserHistory) -> int:
"""Infer the optimal lookback window from feedback event density.
More events per day → a shorter window captures the user's current state
accurately. Sparse feedback → widen the window to gather signal.
def _parse_dt(iso: str) -> datetime:
try:
dt = datetime.fromisoformat(iso.replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except ValueError:
return datetime.min.replace(tzinfo=timezone.utc)
def _infer_lookback_days(history: UserHistory) -> int:
"""Find the minimum window (days) that captures ≥30 done events, capped at 30.
Sorts done events newest-first, then measures the span to the 30th event.
If fewer than 30 done events exist, returns 30 (use the full cap).
"""
n = len(history.events)
if n >= 14:
return 7
if n >= 7:
return 14
done = sorted(
[e for e in history.events if e.action == "done"],
key=lambda e: e.created_at,
reverse=True,
)
if len(done) < 30:
return 30
latest = _parse_dt(done[0].created_at)
thirtieth = _parse_dt(done[29].created_at)
span = (latest - thirtieth).total_seconds() / 86_400
return max(1, min(30, math.ceil(span)))
def _infer_weekly_cycle(history: UserHistory) -> list[dict]:
"""Peak-to-mean ratio of done events per day-of-week (0=Monday … 6=Sunday).
Returns all 7 DOW entries so the caller can filter by strength threshold.
"""
by_dow: Counter[int] = Counter(
_parse_dt(e.created_at).weekday()
for e in history.events
if e.action == "done"
)
total = sum(by_dow.values())
if total == 0:
return []
mean = total / 7
return [
{
"dow": dow,
"strength": round(by_dow.get(dow, 0) / mean, 3),
"sample": f"completes most {_DOW_NAMES[dow]}s",
}
for dow in range(7)
]
def _infer_daily_cycle(history: UserHistory) -> list[dict]:
"""Peak-to-mean ratio of done events per hour-of-day (023).
Returns entries for hours that have at least one done event.
"""
by_hour: Counter[int] = Counter(
_parse_dt(e.created_at).hour
for e in history.events
if e.action == "done"
)
total = sum(by_hour.values())
if total == 0:
return []
mean = total / 24
return [
{
"hour": hour,
"strength": round(by_hour[hour] / mean, 3),
}
for hour in sorted(by_hour)
]
MANIFEST = AgentManifest(
id="recent-patterns",
version="1.1.0", # bumped: window_days InferredParam added (#116)
version="1.2.0", # #116: lookback_days + weekly_cycle + daily_cycle inference
description="Surfaces the user's reaction pattern from recent feedback.",
pref_schema={
"type": "object",
"additionalProperties": False,
"properties": {
"window_days": {
"lookback_days": {
"type": "integer",
"minimum": 1,
"maximum": 30,
"default": 7,
"description": "Lookback window for pattern analysis.",
"description": "Lookback window sized to capture ≥30 done events.",
},
"weekly_cycle": {
"type": "array",
"items": {
"type": "object",
"properties": {
"dow": {"type": "integer"},
"strength": {"type": "number"},
"sample": {"type": "string"},
},
},
"default": [],
"description": "Per-DOW completion strength (peak-to-mean ratio).",
},
"daily_cycle": {
"type": "array",
"items": {
"type": "object",
"properties": {
"hour": {"type": "integer"},
"strength": {"type": "number"},
},
},
"default": [],
"description": "Per-hour completion strength (peak-to-mean ratio).",
},
},
},
@@ -46,15 +136,45 @@ MANIFEST = AgentManifest(
ttl_sec=86_400,
inferred_params=[
InferredParam(
key="window_days",
ttl_sec=86_400, # recompute daily alongside snippet
key="lookback_days",
ttl_sec=86_400,
cold_start_default=7,
min_history=5,
infer=_infer_window_days,
infer=_infer_lookback_days,
),
InferredParam(
key="weekly_cycle",
ttl_sec=86_400,
cold_start_default=[],
min_history=21, # need ≥3 weeks to see a weekly signal
infer=_infer_weekly_cycle,
),
InferredParam(
key="daily_cycle",
ttl_sec=86_400,
cold_start_default=[],
min_history=14,
infer=_infer_daily_cycle,
),
],
)
_STRENGTH_THRESHOLD = 0.5
def _strong(entries: list[dict], key: str) -> list[dict]:
return [e for e in entries if e.get("strength", 0) > _STRENGTH_THRESHOLD]
def _hour_label(hour: int) -> str:
if hour == 0:
return "midnight"
if hour < 12:
return f"{hour}am"
if hour == 12:
return "noon"
return f"{hour - 12}pm"
class RecentPatternsAgent(BaseAgent):
"""Surfaces the user's reaction pattern from recent feedback."""
@@ -63,8 +183,15 @@ class RecentPatternsAgent(BaseAgent):
version: ClassVar[str] = MANIFEST.version
def compute(self, inp: AgentInput) -> AgentOutput:
window_days = max(1, int(inp.agent_prefs.get("window_days", 7)))
window_s = window_days * 86_400
# Support legacy window_days pref key for backward compat.
lookback_days = max(
1,
int(inp.agent_prefs.get("lookback_days", inp.agent_prefs.get("window_days", 7))),
)
weekly_cycle: list[dict] = inp.agent_prefs.get("weekly_cycle", [])
daily_cycle: list[dict] = inp.agent_prefs.get("daily_cycle", [])
window_s = lookback_days * 86_400
now_ts = inp.now.timestamp()
recent = [
@@ -76,16 +203,18 @@ class RecentPatternsAgent(BaseAgent):
total = len(recent)
dwell_ms = inp.profile.get("mean_dwell_ms_30d")
parts: list[str] = []
if total == 0:
prompt = f"No tip reactions recorded in the last {window_days} days."
parts.append(f"No tip reactions recorded in the last {lookback_days} days.")
else:
done = counts.get("done", 0)
dismissed = counts.get("dismiss", 0)
snoozed = counts.get("snooze", 0)
parts = [
f"Last {window_days} days: {total} tip reaction{'s' if total != 1 else ''}"
parts.append(
f"Last {lookback_days} days: {total} tip reaction{'s' if total != 1 else ''}"
f"{done} completed, {dismissed} dismissed, {snoozed} snoozed."
]
)
if dwell_ms is not None:
dwell_s = round(dwell_ms / 1000)
if dwell_s < 15:
@@ -98,13 +227,34 @@ class RecentPatternsAgent(BaseAgent):
parts.append(
f"Average dwell {dwell_s}s — user deliberates; prefer tips that reward reflection."
)
prompt = " ".join(parts)
# Cycle hints — only when strength > threshold.
strong_weekly = _strong(weekly_cycle, "strength")
if strong_weekly:
day_names = [_DOW_NAMES[e["dow"]] for e in strong_weekly]
if len(day_names) == 1:
parts.append(f"User tends to complete tips on {day_names[0]}s.")
else:
joined = ", ".join(day_names[:-1]) + f" and {day_names[-1]}"
parts.append(f"User tends to complete tips on {joined}s.")
strong_daily = _strong(daily_cycle, "strength")
if strong_daily:
hour_labels = [_hour_label(e["hour"]) for e in strong_daily]
if len(hour_labels) == 1:
parts.append(f"User is most active around {hour_labels[0]}.")
else:
joined = ", ".join(hour_labels[:-1]) + f" and {hour_labels[-1]}"
parts.append(f"User is most active around {joined}.")
prompt = " ".join(parts) if parts else "No engagement data available yet."
snapshot = {
"window_days": window_days,
"lookback_days": lookback_days,
"recent_total": total,
"action_counts": dict(counts),
"mean_dwell_ms_30d": dwell_ms,
"strong_weekly_days": [e["dow"] for e in strong_weekly],
"strong_daily_hours": [e["hour"] for e in strong_daily],
}
return self._make_output(inp, prompt, snapshot)

View File

@@ -153,7 +153,8 @@ class TestTimeOfDayAgent:
def test_snapshot_keys(self):
out = self.agent.compute(_inp())
assert {"hour", "day_of_week", "preferred_hour", "quiet_start", "quiet_end"} == set(out.signals_snapshot)
assert {"hour", "day_of_week", "preferred_hour", "quiet_start", "quiet_end",
"peak_hours", "in_quiet", "in_peak", "tz"} == set(out.signals_snapshot)
# ── RecentPatternsAgent ───────────────────────────────────────────────────────
@@ -239,11 +240,13 @@ class TestFocusAreaAgent:
def test_default_project_fallback(self):
out = self.agent.compute(_inp(tasks=[_task("No project task")]))
assert "default project" in out.prompt_text
# Tasks without project_id fall back to a "Tasks" bucket
assert "Tasks" in out.prompt_text
def test_snapshot_keys(self):
out = self.agent.compute(_inp(tasks=[_task("T1", project_id="A")]))
assert {"top_project", "top_task_count", "top_overdue_count", "project_count", "preferred_areas"} == set(out.signals_snapshot)
assert {"top_cluster_label", "top_task_count", "top_overdue_count", "cluster_count",
"strategy", "preferred_areas"} == set(out.signals_snapshot)
# ── Registry ─────────────────────────────────────────────────────────────────

View File

@@ -0,0 +1,135 @@
"""Unit tests for ml.agents.clustering (issue #97).
Embedding calls are mocked so tests run without Ollama.
"""
from __future__ import annotations
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
from unittest.mock import patch
from ml.agents.clustering import cluster_tasks, Cluster, _greedy_cluster, _cosine
# ── helpers ──────────────────────────────────────────────────────────────────
def _task(content: str, project_id: str | None = None, is_overdue: bool = False) -> dict:
t: dict = {"content": content, "is_overdue": is_overdue}
if project_id:
t["project_id"] = project_id
return t
def _embed_seq(*vecs):
"""Return a side_effect list so successive _embed calls return these vectors."""
return list(vecs)
# ── Cluster dataclass ─────────────────────────────────────────────────────────
class TestCluster:
def test_task_count(self):
c = Cluster(label="X", tasks=[_task("a"), _task("b")])
assert c.task_count == 2
def test_overdue_count(self):
c = Cluster(label="X", tasks=[_task("a", is_overdue=True), _task("b")])
assert c.overdue_count == 1
# ── cosine similarity ─────────────────────────────────────────────────────────
class TestCosine:
def test_identical_vectors(self):
v = [1.0, 0.0, 0.0]
assert _cosine(v, v) == 1.0
def test_orthogonal_vectors(self):
assert _cosine([1.0, 0.0], [0.0, 1.0]) == 0.0
def test_zero_vector(self):
assert _cosine([0.0, 0.0], [1.0, 0.0]) == 0.0
# ── greedy clustering ─────────────────────────────────────────────────────────
class TestGreedyClustering:
def _similar_vec(self, base: list[float], noise: float = 0.01) -> list[float]:
return [x + noise for x in base]
def test_similar_tasks_grouped(self):
v = [1.0, 0.0, 0.0]
v2 = [0.999, 0.001, 0.0]
items = [
(_task("A"), v),
(_task("B"), v2),
]
clusters = _greedy_cluster(items)
assert len(clusters) == 1
assert clusters[0].task_count == 2
def test_dissimilar_tasks_separate(self):
v1 = [1.0, 0.0, 0.0]
v2 = [0.0, 1.0, 0.0]
items = [(_task("A"), v1), (_task("B"), v2)]
clusters = _greedy_cluster(items)
assert len(clusters) == 2
def test_label_from_first_task(self):
v = [1.0, 0.0]
clusters = _greedy_cluster([(_task("Write report"), v)])
assert clusters[0].label == "Write report"
# ── cluster_tasks integration ─────────────────────────────────────────────────
class TestClusterTasks:
def test_empty_tasks(self):
result = cluster_tasks([])
assert result == []
def test_fallback_when_ollama_unavailable(self):
with patch("ml.agents.clustering._embed", return_value=None):
tasks = [_task("A", "p1"), _task("B", "p2"), _task("C", "p1")]
clusters = cluster_tasks(tasks)
assert len(clusters) == 2
labels = {c.label for c in clusters}
assert "p1" in labels and "p2" in labels
def test_fallback_groups_by_project(self):
with patch("ml.agents.clustering._embed", return_value=None):
tasks = [_task("A", "work")] * 3 + [_task("B", "home")] * 2
clusters = cluster_tasks(tasks)
by_label = {c.label: c.task_count for c in clusters}
assert by_label["work"] == 3
assert by_label["home"] == 2
def test_tasks_without_content_go_to_other(self):
v = [1.0, 0.0]
with patch("ml.agents.clustering._embed", return_value=v):
tasks = [_task("Has content"), {"is_overdue": False}]
clusters = cluster_tasks(tasks)
labels = {c.label for c in clusters}
assert "Other tasks" in labels
def test_semantic_clustering_groups_similar(self):
v_work = [1.0, 0.0, 0.0]
v_home = [0.0, 1.0, 0.0]
side_effects = [v_work, v_work, v_home, v_home]
with patch("ml.agents.clustering._embed", side_effect=side_effects):
tasks = [
_task("Write report"),
_task("Review PR"),
_task("Buy groceries"),
_task("Cook dinner"),
]
clusters = cluster_tasks(tasks)
assert len(clusters) == 2
assert all(c.task_count == 2 for c in clusters)
def test_all_tasks_no_content_fallback_by_project(self):
tasks = [{"project_id": "p1", "is_overdue": False},
{"project_id": "p2", "is_overdue": False}]
clusters = cluster_tasks(tasks)
assert len(clusters) == 2

View File

@@ -113,7 +113,7 @@ class TestTimeOfDayAgentWithInference:
assert "peak" in out.prompt_text
def test_version_bumped(self):
assert MANIFEST.version == "1.1.0"
assert MANIFEST.version == "1.2.0"
def test_manifest_has_preferred_hour_param(self):
keys = {p.key for p in MANIFEST.inferred_params}

View File

@@ -1,5 +1,5 @@
"""Per-agent inference tests: momentum (#114), overdue-task (#115), recent-patterns (#116),
and focus-area (#113) preferred_areas wiring."""
time-of-day (#112), and focus-area (#113) preferred_areas wiring."""
from __future__ import annotations
import sys, os
@@ -8,11 +8,12 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
from datetime import datetime, timezone
import pytest
from ml.agents.inference.history import FeedbackEvent, UserHistory
from ml.agents.inference.history import FeedbackEvent, TaskCompletion, UserHistory
from ml.agents.inference.framework import run_inference
from ml.agents.momentum import MomentumAgent, MANIFEST as MOMENTUM_MANIFEST
from ml.agents.overdue_task import OverdueTaskAgent, MANIFEST as OVERDUE_MANIFEST
from ml.agents.recent_patterns import RecentPatternsAgent, MANIFEST as RECENT_MANIFEST
from ml.agents.time_of_day import TimeOfDayAgent, MANIFEST as TOD_MANIFEST
from ml.agents.focus_area import FocusAreaAgent
from ml.agents.base import AgentInput
@@ -32,23 +33,47 @@ def _event(action: str, days_ago: float = 1.0) -> FeedbackEvent:
return FeedbackEvent(action=action, dwell_ms=dwell, created_at=ts)
def _history(*events: FeedbackEvent) -> UserHistory:
return UserHistory(user_id="u1", events=list(events))
def _history(*events: FeedbackEvent, completions: list[TaskCompletion] | None = None) -> UserHistory:
return UserHistory(user_id="u1", events=list(events), task_completions=completions or [])
# ── momentum: engagement_trend ───────────────────────────────────────────────
def _completion(project_id: str | None, lateness_days: float) -> TaskCompletion:
"""Build a TaskCompletion where completed_at is lateness_days after due_at."""
from datetime import timedelta
due = _NOW - timedelta(days=30)
completed = due + timedelta(days=lateness_days)
return TaskCompletion(
project_id=project_id,
completed_at=completed.isoformat(),
due_at=due.isoformat(),
)
class TestMomentumInference:
# ── momentum helpers ─────────────────────────────────────────────────────────
def _neutral_prefs(**extra) -> dict:
"""Prefs that put z-score in the normal range so trend label can show."""
return {"baseline_completions_per_day": 0.0, "stdev": 1.0, "momentum_window": 7, **extra}
def _feedback_done(n: int, days_ago: float = 1.0) -> list[dict]:
from datetime import timedelta
ts = (_NOW - timedelta(days=days_ago)).isoformat()
return [{"action": "done", "dwell_ms": 60_000, "created_at": ts}] * n
# ── momentum: engagement_trend inference ─────────────────────────────────────
class TestMomentumTrendInference:
def test_cold_start_below_min_history(self):
history = _history(*[_event("done", days_ago=i) for i in range(5)])
result = run_inference(MOMENTUM_MANIFEST, history)
assert result["engagement_trend"] == "stable" # cold_start_default
def test_trend_up_when_recent_done_rate_higher(self):
# 8 done in last 7 days, 1 done in prior 7 days → trending up
recent = [_event("done", days_ago=i) for i in range(1, 9)]
older = [_event("dismiss", days_ago=i) for i in range(8, 15)]
older[0] = _event("done", days_ago=8) # one done in older window
older[0] = _event("done", days_ago=8)
history = _history(*recent, *older)
result = run_inference(MOMENTUM_MANIFEST, history)
assert result["engagement_trend"] == "up"
@@ -66,113 +91,540 @@ class TestMomentumInference:
result = run_inference(MOMENTUM_MANIFEST, history)
assert result["engagement_trend"] == "stable"
def test_agent_uses_trend_in_snippet(self):
out = MomentumAgent().compute(_inp(agent_prefs={"engagement_trend": "up"}))
def test_trend_shown_when_z_score_normal(self):
# baseline=0 so z≈0 → no z label → trend label falls through
out = MomentumAgent().compute(_inp(agent_prefs=_neutral_prefs(engagement_trend="up")))
assert "trending up" in out.prompt_text
def test_agent_uses_down_trend_in_snippet(self):
out = MomentumAgent().compute(_inp(agent_prefs={"engagement_trend": "down"}))
def test_trend_down_shown_when_z_score_normal(self):
out = MomentumAgent().compute(_inp(agent_prefs=_neutral_prefs(engagement_trend="down")))
assert "trending down" in out.prompt_text
def test_snapshot_includes_trend(self):
out = MomentumAgent().compute(_inp(agent_prefs={"engagement_trend": "stable"}))
out = MomentumAgent().compute(_inp(agent_prefs=_neutral_prefs(engagement_trend="stable")))
assert "engagement_trend" in out.signals_snapshot
# ── momentum: baseline + stdev inference (#114) ───────────────────────────────
class TestMomentumBaselineInference:
def _events_n_per_day(self, done_per_day: int, n_days: int) -> list[FeedbackEvent]:
"""Generate done events spread across n_days."""
events = []
for d in range(n_days):
for _ in range(done_per_day):
events.append(_event("done", days_ago=d + 0.5))
return events
def test_cold_start_when_few_events(self):
history = _history(*[_event("done", days_ago=i) for i in range(5)])
result = run_inference(MOMENTUM_MANIFEST, history)
assert result["baseline_completions_per_day"] == 1.0
assert result["stdev"] == 1.0
def test_power_user_baseline_high(self):
# 5 done events per day for 20 days → baseline ≈ 5/day (over 28d window, zeros fill rest)
events = self._events_n_per_day(5, 20)
history = _history(*events)
result = run_inference(MOMENTUM_MANIFEST, history)
assert result["baseline_completions_per_day"] > 2.0
def test_casual_user_baseline_low(self):
# 1 done every 3 days + dismiss filler to clear min_history=14 → baseline ≈ 0.33/day
done_events = [_event("done", days_ago=d * 3 + 0.5) for d in range(7)]
filler = [_event("dismiss", days_ago=d + 0.5) for d in range(10)]
history = _history(*done_events, *filler)
result = run_inference(MOMENTUM_MANIFEST, history)
assert result["baseline_completions_per_day"] < 0.5
def test_stdev_reflects_variability(self):
# Alternating 0 and 4 done events → high stdev
events = []
for d in range(14):
if d % 2 == 0:
for _ in range(4):
events.append(_event("done", days_ago=d + 0.5))
history = _history(*events)
result = run_inference(MOMENTUM_MANIFEST, history)
assert result["stdev"] > 1.0
def test_consistent_user_lower_stdev_than_variable(self):
# Consistent 2/day for 28 days has lower stdev than alternating 0/4
consistent = self._events_n_per_day(2, 28)
variable = []
for d in range(14):
if d % 2 == 0:
for _ in range(4):
variable.append(_event("done", days_ago=d + 0.5))
else:
variable.append(_event("dismiss", days_ago=d + 0.5))
r_consistent = run_inference(MOMENTUM_MANIFEST, _history(*consistent))
r_variable = run_inference(MOMENTUM_MANIFEST, _history(*variable))
assert r_consistent["stdev"] < r_variable["stdev"]
# ── momentum: z-score snippet language ───────────────────────────────────────
class TestMomentumZScore:
def _prefs(self, baseline: float, stdev: float = 1.0) -> dict:
return {"baseline_completions_per_day": baseline, "stdev": stdev,
"momentum_window": 7, "engagement_trend": "stable"}
def test_power_user_above_baseline_says_above_usual(self):
# baseline=3/day, stdev=1.0, window=7 → expected rate=3; user did 35 → rate=5, z=2
prefs = self._prefs(baseline=3.0, stdev=1.0)
feedback = _feedback_done(35, days_ago=1.0)
out = MomentumAgent().compute(_inp(feedback_history=feedback, agent_prefs=prefs))
assert "above your usual" in out.prompt_text
def test_casual_user_slowing_down(self):
# baseline=1/day, user did 0 in 7d → z = (0 - 1) / 1 = -1 → below usual
prefs = self._prefs(baseline=1.0, stdev=1.0)
out = MomentumAgent().compute(_inp(feedback_history=[], agent_prefs=prefs))
assert "below your usual" in out.prompt_text
def test_returning_from_break_at_normal_rate(self):
# User just came back: 1 done, baseline=1/day, window=7 → z=(1/7-1)/1≈-0.86, within normal
prefs = self._prefs(baseline=1.0, stdev=1.0)
feedback = _feedback_done(1, days_ago=0.5)
out = MomentumAgent().compute(_inp(feedback_history=feedback, agent_prefs=prefs))
# z ≈ -0.86 → no z label, falls back to trend (stable → no extra sentence)
assert "above your usual" not in out.prompt_text
assert "below your usual" not in out.prompt_text
def test_snapshot_includes_z_score(self):
prefs = self._prefs(baseline=1.0)
out = MomentumAgent().compute(_inp(agent_prefs=prefs))
assert "z_score" in out.signals_snapshot
assert "recent_done_count" in out.signals_snapshot
def test_version_bumped(self):
assert MOMENTUM_MANIFEST.version == "1.1.0"
assert MOMENTUM_MANIFEST.version == "1.2.0"
# ── overdue-task: lateness_tolerance_days ────────────────────────────────────
# ── overdue-task: lateness_tolerance_days + project_realness (#115) ──────────
class TestOverdueTaskInference:
def test_cold_start_returns_zero(self):
history = _history(*[_event("done") for _ in range(5)])
result = run_inference(OVERDUE_MANIFEST, history)
assert result["lateness_tolerance_days"] == 0
# -- lateness_tolerance_days inference --
def test_high_snooze_rate_returns_two(self):
events = [_event("snooze")] * 8 + [_event("done")] * 2
history = _history(*events)
def test_cold_start_returns_zero_when_few_completions(self):
# Below min_history=10 task completions → cold start
cs = [_completion("p1", 2.0) for _ in range(5)]
history = _history(*[_event("done")] * 5, completions=cs)
result = run_inference(OVERDUE_MANIFEST, history)
assert result["lateness_tolerance_days"] == 2
assert result["lateness_tolerance_days"] == 0.0
def test_moderate_snooze_returns_one(self):
events = [_event("snooze")] * 3 + [_event("done")] * 7
history = _history(*events)
def test_punctual_user_zero_tolerance(self):
# User always finishes early or on time (negative lateness) → tolerance 0
cs = [_completion("p1", -1.0) for _ in range(12)]
history = _history(*[_event("done")] * 12, completions=cs)
result = run_inference(OVERDUE_MANIFEST, history)
assert result["lateness_tolerance_days"] == 1
assert result["lateness_tolerance_days"] == 0.0
def test_low_snooze_returns_zero(self):
events = [_event("done")] * 9 + [_event("snooze")] * 1
history = _history(*events)
def test_chronic_late_user_positive_tolerance(self):
# User consistently finishes 5 days late → p50 = 5
cs = [_completion("p1", 5.0) for _ in range(12)]
history = _history(*[_event("done")] * 12, completions=cs)
result = run_inference(OVERDUE_MANIFEST, history)
assert result["lateness_tolerance_days"] == 0
assert result["lateness_tolerance_days"] == pytest.approx(5.0)
def test_mixed_lateness_uses_median(self):
# 6 tasks at +1d, 6 tasks at +3d → median = 2
cs = [_completion("p1", 1.0)] * 6 + [_completion("p1", 3.0)] * 6
history = _history(*[_event("done")] * 12, completions=cs)
result = run_inference(OVERDUE_MANIFEST, history)
assert result["lateness_tolerance_days"] == pytest.approx(2.0)
# -- project_realness inference --
def test_project_realness_cold_start_empty(self):
cs = [_completion("p1", 1.0) for _ in range(5)] # below min_history
history = _history(*[_event("done")] * 5, completions=cs)
result = run_inference(OVERDUE_MANIFEST, history)
assert result["project_realness"] == {}
def test_project_realness_punctual_project_scores_high(self):
# p1 always on time (0d late), p2 always 10d late → p1 should be realness ≈ 1
cs = [_completion("p1", 0.0)] * 6 + [_completion("p2", 10.0)] * 6
history = _history(*[_event("done")] * 12, completions=cs)
result = run_inference(OVERDUE_MANIFEST, history)
assert result["project_realness"]["p1"] > result["project_realness"]["p2"]
def test_project_realness_values_clipped_01(self):
cs = [_completion("p1", 0.0)] * 6 + [_completion("p2", 100.0)] * 6
history = _history(*[_event("done")] * 12, completions=cs)
result = run_inference(OVERDUE_MANIFEST, history)
for v in result["project_realness"].values():
assert 0.0 <= v <= 1.0
# -- compute() reads inferred prefs --
def test_tolerance_filters_tasks(self):
tasks = [
{"content": "Fresh overdue", "is_overdue": True, "task_age_days": 0.5},
{"content": "Old overdue", "is_overdue": True, "task_age_days": 3.0},
]
# tolerance=2 → only the 3-day task should count
out = OverdueTaskAgent().compute(_inp(tasks=tasks, agent_prefs={"lateness_tolerance_days": 2}))
assert "1 overdue task" in out.prompt_text
assert "Old overdue" in out.prompt_text
def test_snapshot_includes_tolerance(self):
tasks = [{"content": "T", "is_overdue": True, "task_age_days": 1.0}]
out = OverdueTaskAgent().compute(_inp(tasks=tasks, agent_prefs={"lateness_tolerance_days": 0}))
assert "lateness_tolerance_days" in out.signals_snapshot
def test_low_realness_softens_language(self):
tasks = [{"content": "Wishlist", "is_overdue": True, "task_age_days": 3.0,
"project_id": "aspirational"}]
prefs = {"lateness_tolerance_days": 0, "project_realness": {"aspirational": 0.2}}
out = OverdueTaskAgent().compute(_inp(tasks=tasks, agent_prefs=prefs))
assert "target date" in out.prompt_text
def test_high_realness_uses_overdue_language(self):
tasks = [{"content": "Critical", "is_overdue": True, "task_age_days": 3.0,
"project_id": "work"}]
prefs = {"lateness_tolerance_days": 0, "project_realness": {"work": 0.9}}
out = OverdueTaskAgent().compute(_inp(tasks=tasks, agent_prefs=prefs))
assert "overdue" in out.prompt_text
def test_snapshot_includes_realness(self):
tasks = [{"content": "T", "is_overdue": True, "task_age_days": 1.0, "project_id": "p1"}]
prefs = {"lateness_tolerance_days": 0, "project_realness": {"p1": 0.8}}
out = OverdueTaskAgent().compute(_inp(tasks=tasks, agent_prefs=prefs))
assert "realness" in out.signals_snapshot["top_overdue"][0]
def test_version_bumped(self):
assert OVERDUE_MANIFEST.version == "1.1.0"
assert OVERDUE_MANIFEST.version == "1.2.0"
# ── recent-patterns: window_days ─────────────────────────────────────────────
# ── recent-patterns: lookback_days + weekly_cycle + daily_cycle (#116) ────────
class TestRecentPatternsInference:
def test_cold_start_default_7(self):
history = _history(*[_event("done") for _ in range(3)]) # below min_history=5
result = run_inference(RECENT_MANIFEST, history)
assert result["window_days"] == 7 # cold_start_default
def test_sparse_history_widens_window(self):
history = _history(*[_event("done") for _ in range(5)]) # 5 events, n < 7 → 30 days
result = run_inference(RECENT_MANIFEST, history)
assert result["window_days"] == 30
def test_moderate_history_14_days(self):
history = _history(*[_event("done") for _ in range(10)]) # 7 ≤ n < 14 → 14 days
result = run_inference(RECENT_MANIFEST, history)
assert result["window_days"] == 14
def test_dense_history_stays_7(self):
history = _history(*[_event("done") for _ in range(20)]) # 20+ → 7 days
result = run_inference(RECENT_MANIFEST, history)
assert result["window_days"] == 7
def test_agent_uses_window_days_pref(self):
def _done_at(days_ago: float, hour: int = 10) -> FeedbackEvent:
"""Done event at a specific hour, N days ago."""
from datetime import timedelta
ts = (_NOW - timedelta(days=days_ago)).replace(hour=hour, minute=0, second=0, microsecond=0)
return FeedbackEvent(action="done", dwell_ms=60_000, created_at=ts.isoformat())
class TestRecentPatternsLookbackInference:
def test_cold_start_below_min_history(self):
history = _history(*[_event("done") for _ in range(3)])
result = run_inference(RECENT_MANIFEST, history)
assert result["lookback_days"] == 7 # cold_start_default
def test_sparse_done_history_returns_30(self):
# Only 10 done events → fewer than 30 → returns cap of 30
history = _history(*[_event("done") for _ in range(10)])
result = run_inference(RECENT_MANIFEST, history)
assert result["lookback_days"] == 30
def test_dense_done_history_returns_short_window(self):
# 30 done events all within the last 2 days → lookback_days = 1 or 2
events = [_event("done", days_ago=i * 0.05) for i in range(30)]
history = _history(*events)
result = run_inference(RECENT_MANIFEST, history)
assert result["lookback_days"] <= 2
def test_spread_history_spans_window_correctly(self):
# 30 done events spread over 15 days (1 per 0.5d) → window should be ≈15
events = [_event("done", days_ago=i * 0.5) for i in range(30)]
history = _history(*events)
result = run_inference(RECENT_MANIFEST, history)
assert result["lookback_days"] <= 16
def test_agent_respects_lookback_days_pref(self):
from datetime import timedelta
# 5 feedback events, all within 14 days but older than 7 days
feedback = [
{"action": "done", "dwell_ms": 60000,
"created_at": (_NOW - timedelta(days=10)).isoformat()}
] * 5
# With window_days=7 → 0 events seen; with window_days=14 → 5 events
out_narrow = RecentPatternsAgent().compute(
_inp(feedback_history=feedback, agent_prefs={"window_days": 7})
_inp(feedback_history=feedback, agent_prefs={"lookback_days": 7})
)
out_wide = RecentPatternsAgent().compute(
_inp(feedback_history=feedback, agent_prefs={"window_days": 14})
_inp(feedback_history=feedback, agent_prefs={"lookback_days": 14})
)
assert "No tip reactions" in out_narrow.prompt_text
assert "5 tip reactions" in out_wide.prompt_text
def test_snapshot_includes_window_days(self):
out = RecentPatternsAgent().compute(_inp(agent_prefs={"window_days": 14}))
assert out.signals_snapshot["window_days"] == 14
def test_legacy_window_days_pref_still_works(self):
from datetime import timedelta
feedback = [
{"action": "done", "dwell_ms": 60000,
"created_at": (_NOW - timedelta(days=10)).isoformat()}
] * 5
out = RecentPatternsAgent().compute(
_inp(feedback_history=feedback, agent_prefs={"window_days": 14})
)
assert "5 tip reactions" in out.prompt_text
def test_snapshot_includes_lookback_days(self):
out = RecentPatternsAgent().compute(_inp(agent_prefs={"lookback_days": 14}))
assert out.signals_snapshot["lookback_days"] == 14
class TestRecentPatternsWeeklyCycle:
def test_cold_start_returns_empty(self):
history = _history(*[_event("done") for _ in range(5)]) # below min_history=21
result = run_inference(RECENT_MANIFEST, history)
assert result["weekly_cycle"] == []
def _events_on_dow(self, target_dow: int, count: int, n_weeks: int = 4) -> list[FeedbackEvent]:
"""Generate `count` done events per week on `target_dow` (0=Mon…6=Sun).
_NOW is Thursday (weekday=3). days_back = (now_dow - target_dow) % 7
gives the offset to the most recent occurrence of target_dow.
"""
now_dow = _NOW.weekday() # 3 = Thursday
days_back = (now_dow - target_dow) % 7
if days_back == 0:
days_back = 7 # avoid "today" — use the previous occurrence
events = []
for week in range(n_weeks):
offset = days_back + week * 7
for _ in range(count):
events.append(_done_at(offset + 0.1, hour=11))
return events
def _weekend_warrior_history(self) -> UserHistory:
"""Many done events on Sat/Sun (dow 5 & 6), few on Tuesday (dow 1)."""
events = []
events += self._events_on_dow(5, count=5) # Saturday
events += self._events_on_dow(6, count=5) # Sunday
events += self._events_on_dow(1, count=1) # Tuesday — one per week
return _history(*events)
def test_weekend_warrior_strong_on_weekends(self):
history = self._weekend_warrior_history()
result = run_inference(RECENT_MANIFEST, history)
by_dow = {e["dow"]: e["strength"] for e in result["weekly_cycle"]}
assert by_dow.get(5, 0) > 1.0 # Saturday
assert by_dow.get(6, 0) > 1.0 # Sunday
def test_weekday_only_low_weekend_strength(self):
events = []
for dow in range(5): # MondayFriday
events += self._events_on_dow(dow, count=3)
# Saturday (5) and Sunday (6) get zero events
history = _history(*events)
result = run_inference(RECENT_MANIFEST, history)
by_dow = {e["dow"]: e["strength"] for e in result["weekly_cycle"]}
assert by_dow.get(5, 0) == 0.0 # Saturday
assert by_dow.get(6, 0) == 0.0 # Sunday
def test_snippet_includes_cycle_hint_when_strong(self):
# Inject a strong weekly_cycle pref directly
prefs = {
"lookback_days": 7,
"weekly_cycle": [{"dow": 1, "strength": 2.0, "sample": "completes most Tuesdays"}],
"daily_cycle": [],
}
out = RecentPatternsAgent().compute(_inp(agent_prefs=prefs))
assert "Tuesday" in out.prompt_text
def test_snippet_omits_cycle_hint_when_weak(self):
prefs = {
"lookback_days": 7,
"weekly_cycle": [{"dow": 1, "strength": 0.3, "sample": "completes most Tuesdays"}],
"daily_cycle": [],
}
out = RecentPatternsAgent().compute(_inp(agent_prefs=prefs))
assert "Tuesday" not in out.prompt_text
class TestRecentPatternsDailyCycle:
def test_cold_start_returns_empty(self):
history = _history(*[_event("done") for _ in range(5)]) # below min_history=14
result = run_inference(RECENT_MANIFEST, history)
assert result["daily_cycle"] == []
def _evening_person_history(self) -> UserHistory:
"""Many done events at 20:0021:00, few in the morning."""
events = []
for d in range(20):
for _ in range(4):
events.append(_done_at(d + 0.5, hour=20))
events.append(_done_at(d + 0.5, hour=9))
return _history(*events)
def test_evening_person_strong_at_evening_hours(self):
history = self._evening_person_history()
result = run_inference(RECENT_MANIFEST, history)
by_hour = {e["hour"]: e["strength"] for e in result["daily_cycle"]}
assert by_hour.get(20, 0) > 1.0
assert by_hour.get(9, 0) < by_hour.get(20, 0)
def test_snippet_includes_daily_hint_when_strong(self):
prefs = {
"lookback_days": 7,
"weekly_cycle": [],
"daily_cycle": [{"hour": 20, "strength": 3.0}],
}
out = RecentPatternsAgent().compute(_inp(agent_prefs=prefs))
assert "8pm" in out.prompt_text
def test_snippet_omits_daily_hint_when_weak(self):
prefs = {
"lookback_days": 7,
"weekly_cycle": [],
"daily_cycle": [{"hour": 20, "strength": 0.4}],
}
out = RecentPatternsAgent().compute(_inp(agent_prefs=prefs))
assert "8pm" not in out.prompt_text
def test_no_pattern_user_no_hints(self):
# Uniform distribution across all hours → strength ≈ 1.0 everywhere → no strong peaks
events = [_done_at(d + 0.5, hour=h) for d in range(3) for h in range(24)]
history = _history(*events)
result = run_inference(RECENT_MANIFEST, history)
strong = [e for e in result["daily_cycle"] if e["strength"] > 0.5]
# Uniform distribution → all strengths ≈ 1.0; but none dramatically above threshold
# Since strength = count/mean and all counts are equal, all = 1.0 exactly
# 1.0 is not > 0.5 threshold in snippet rendering, but IS > 0.5 so they'd show.
# For a flat distribution the caller sees no meaningful peak — verify no strength > 2
assert all(e["strength"] <= 1.1 for e in result["daily_cycle"])
def test_version_bumped(self):
assert RECENT_MANIFEST.version == "1.1.0"
assert RECENT_MANIFEST.version == "1.2.0"
# ── time-of-day: quiet_start/end + peak_hours inference (#112) ───────────────
def _tod_event(action: str, hour: int, days_ago: float = 1.0) -> FeedbackEvent:
"""Feedback event at a specific hour N days ago."""
from datetime import timedelta
dt = (_NOW - timedelta(days=days_ago)).replace(hour=hour, minute=0, second=0, microsecond=0)
return FeedbackEvent(action=action, dwell_ms=60_000, created_at=dt.isoformat())
def _tod_history(*events: FeedbackEvent) -> UserHistory:
return UserHistory(user_id="u1", events=list(events))
class TestTimeOfDayQuietWindow:
def test_cold_start_below_min_history(self):
history = _tod_history(*[_tod_event("done", 10) for _ in range(10)])
result = run_inference(TOD_MANIFEST, history)
assert result["quiet_start"] == "22:00"
assert result["quiet_end"] == "07:00"
def _night_owl_history(self) -> UserHistory:
"""Active 20:0023:00, quiet 02:0014:00."""
events = []
for d in range(10):
for h in [20, 21, 22, 23, 0, 1]:
events.append(_tod_event("done", h, days_ago=d + 0.5))
# Sparse during day
events.append(_tod_event("done", 15, days_ago=d + 0.5))
return _tod_history(*events)
def _early_bird_history(self) -> UserHistory:
"""Active 06:0010:00, quiet 21:0005:00."""
events = []
for d in range(10):
for h in [6, 7, 8, 9, 10]:
events.append(_tod_event("done", h, days_ago=d + 0.5))
events.append(_tod_event("done", 14, days_ago=d + 0.5))
return _tod_history(*events)
def test_early_bird_quiet_in_evening(self):
history = self._early_bird_history()
result = run_inference(TOD_MANIFEST, history)
# Quiet window should be in the evening/night range
start_h = int(result["quiet_start"].split(":")[0])
end_h = int(result["quiet_end"].split(":")[0])
# Quiet window spans from some evening hour into morning
assert start_h >= 18 or end_h <= 10 # covers night
def test_quiet_window_wraps_midnight(self):
# Night owl: heavy activity in evening, quiet 02:0014:00
history = self._night_owl_history()
result = run_inference(TOD_MANIFEST, history)
start_h = int(result["quiet_start"].split(":")[0])
end_h = int(result["quiet_end"].split(":")[0])
# The quiet window should span across midnight or be in daylight
# (start > end means wraps midnight)
is_wrapping = start_h > end_h
is_daytime = 2 <= start_h <= 14
assert is_wrapping or is_daytime
def test_format_is_hhmm(self):
history = self._early_bird_history()
result = run_inference(TOD_MANIFEST, history)
import re
assert re.match(r"^\d{2}:00$", result["quiet_start"])
assert re.match(r"^\d{2}:00$", result["quiet_end"])
class TestTimeOfDayPeakHours:
def _evening_person_history(self, n: int = 60) -> UserHistory:
"""Heavy done events at 19:00 and 20:00, light elsewhere."""
events = []
for i in range(n):
events.append(_tod_event("done", 19, days_ago=i * 0.5))
events.append(_tod_event("done", 20, days_ago=i * 0.5))
events.append(_tod_event("done", 10, days_ago=i * 0.5)) # low volume
return _tod_history(*events)
def test_cold_start_returns_default(self):
history = _tod_history(*[_tod_event("done", 10) for _ in range(5)])
result = run_inference(TOD_MANIFEST, history)
assert result["peak_hours"] == [9, 14, 20]
def test_evening_person_peak_hours_in_evening(self):
history = self._evening_person_history()
result = run_inference(TOD_MANIFEST, history)
assert 19 in result["peak_hours"] or 20 in result["peak_hours"]
def test_peak_hours_sorted(self):
history = self._evening_person_history()
result = run_inference(TOD_MANIFEST, history)
assert result["peak_hours"] == sorted(result["peak_hours"])
def test_shift_worker_peaks_at_unusual_hours(self):
"""Shift worker active at 02:00 and 03:00."""
events = [_tod_event("done", h, days_ago=i * 0.5)
for i in range(30) for h in [2, 3]]
events += [_tod_event("done", 14, days_ago=i * 0.5) for i in range(5)]
history = _tod_history(*events)
result = run_inference(TOD_MANIFEST, history)
assert 2 in result["peak_hours"] or 3 in result["peak_hours"]
class TestTimeOfDaySnippet:
agent = TimeOfDayAgent()
def _inp_at(self, hour: int, **prefs) -> AgentInput:
from datetime import timedelta
now = _NOW.replace(hour=hour)
return _inp(now=now, agent_prefs=prefs)
def test_in_peak_hour_says_peak(self):
out = self.agent.compute(self._inp_at(20, peak_hours=[20]))
assert "peak productivity hour" in out.prompt_text
def test_approaching_peak_says_approaching(self):
out = self.agent.compute(self._inp_at(18, peak_hours=[20]))
assert "approaching" in out.prompt_text.lower()
def test_quiet_window_overrides_peak(self):
# Even if hour is in peak_hours, quiet window wins
out = self.agent.compute(
self._inp_at(23, quiet_start="22:00", quiet_end="07:00", peak_hours=[23])
)
assert "quiet window" in out.prompt_text
def test_tz_shown_when_not_utc(self):
out = self.agent.compute(self._inp_at(10, tz="Europe/Moscow"))
assert "Europe/Moscow" in out.prompt_text
def test_snapshot_includes_peak_and_quiet(self):
out = self.agent.compute(self._inp_at(10, peak_hours=[10], quiet_start="22:00", quiet_end="07:00"))
assert "peak_hours" in out.signals_snapshot
assert "in_quiet" in out.signals_snapshot
assert "in_peak" in out.signals_snapshot
def test_version_bumped(self):
assert TOD_MANIFEST.version == "1.2.0"
def test_manifest_has_new_params(self):
keys = {p.key for p in TOD_MANIFEST.inferred_params}
assert {"quiet_start", "quiet_end", "peak_hours", "tz"}.issubset(keys)
# ── focus-area: preferred_areas wiring ───────────────────────────────────────
@@ -210,4 +662,51 @@ class TestFocusAreaPreferredAreas:
def test_version_bumped(self):
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
assert FA_MANIFEST.version == "1.1.0"
assert FA_MANIFEST.version == "2.0.0"
def test_snapshot_uses_cluster_keys(self):
tasks = [self._task("T", "work")]
out = self.agent.compute(_inp(tasks=tasks))
assert "top_cluster_label" in out.signals_snapshot
assert "cluster_count" in out.signals_snapshot
assert "strategy" in out.signals_snapshot
# ── focus-area: preferred_areas inference from task_completions (#113) ────────
class TestFocusAreaPreferredAreasInference:
from ml.agents.focus_area import MANIFEST as _FA_MANIFEST
def _completion(self, project_id: str) -> TaskCompletion:
return _completion(project_id, lateness_days=0.0)
def test_cold_start_no_completions(self):
history = _history(completions=[])
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history)
assert result["preferred_areas"] == []
def test_top_two_projects_returned(self):
completions = (
[_completion("p1", 0)] * 8
+ [_completion("p2", 0)] * 5
+ [_completion("p3", 0)] * 2
)
history = _history(completions=completions)
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history)
assert result["preferred_areas"] == ["p1", "p2"]
def test_single_project_returns_one(self):
completions = [_completion("work", 0)] * 6
history = _history(completions=completions)
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history)
assert result["preferred_areas"] == ["work"]
def test_none_project_id_ignored(self):
completions = [_completion(None, 0)] * 5 + [_completion("real", 0)] * 3
history = _history(completions=completions)
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history)
assert result["preferred_areas"] == ["real"]

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import statistics
from collections import Counter
from typing import ClassVar
@@ -9,6 +10,9 @@ from .manifest import AgentManifest, InferredParam
_DOW_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
# min_history required before quiet/peak inference is meaningful (issue #112)
_MIN_HISTORY = 50
def _infer_preferred_hour(history: UserHistory) -> int:
"""Mode hour of day across all 'done' feedback events; falls back to 9."""
@@ -18,9 +22,75 @@ def _infer_preferred_hour(history: UserHistory) -> int:
return Counter(done_hours).most_common(1)[0][0]
def _quiet_window_hours(history: UserHistory) -> tuple[int, int]:
"""Return (start_hour, end_hour) of the longest below-baseline quiet window.
Counts all engagement events by hour. Baseline = mean hourly count.
Finds the longest contiguous run of below-baseline hours on the circular
clock; that run defines the quiet window.
"""
by_hour: Counter[int] = Counter(e.hour for e in history.events)
total = sum(by_hour.values())
baseline = total / 24
# Mark each of the 24 hours as below-baseline (True = quiet)
quiet: list[bool] = [by_hour.get(h, 0) < baseline for h in range(24)]
# Find longest contiguous run in circular array
best_start, best_len = 0, 0
run_start, run_len = 0, 0
# Double the sequence to handle wrap-around
for i in range(48):
h = i % 24
if quiet[h]:
if run_len == 0:
run_start = i
run_len += 1
if run_len > best_len:
best_len = run_len
best_start = run_start
else:
run_len = 0
if best_len == 0:
return (22, 7) # fallback
start = best_start % 24
end = (best_start + best_len) % 24
return (start, end)
def _infer_quiet_start(history: UserHistory) -> str:
start, _ = _quiet_window_hours(history)
return f"{start:02d}:00"
def _infer_quiet_end(history: UserHistory) -> str:
_, end = _quiet_window_hours(history)
return f"{end:02d}:00"
def _infer_peak_hours(history: UserHistory) -> list[int]:
"""Top-quartile hours by done-event count.
Computes done_count per hour, then returns hours above the 75th percentile
of non-zero hourly counts, sorted ascending.
"""
done_by_hour: Counter[int] = Counter(
e.hour for e in history.events if e.action == "done"
)
if not done_by_hour:
return [9, 14, 20]
counts = list(done_by_hour.values())
threshold = statistics.quantiles(counts, n=4)[-1] # 75th percentile
return sorted(h for h, c in done_by_hour.items() if c >= threshold)
MANIFEST = AgentManifest(
id="time-of-day",
version="1.1.0", # bumped: inferred_params added (ADR-0014 §3, #112)
version="1.2.0", # #112: quiet_start/end + peak_hours + tz inference
description="Frames the current moment relative to the user's productive peak and quiet hours.",
pref_schema={
"type": "object",
@@ -36,6 +106,23 @@ MANIFEST = AgentManifest(
"pattern": "^([01][0-9]|2[0-3]):[0-5][0-9]$",
"description": "HH:MM end of quiet hours.",
},
"peak_hours": {
"type": "array",
"items": {"type": "integer", "minimum": 0, "maximum": 23},
"default": [9, 14, 20],
"description": "Hours (023) with top-quartile completion density.",
},
"tz": {
"type": "string",
"default": "UTC",
"description": "IANA timezone; populated from auth provider, fallback UTC.",
},
"preferred_hour": {
"type": "integer",
"minimum": 0,
"maximum": 23,
"description": "Mode done-hour (legacy; superseded by peak_hours).",
},
},
},
context_schema=["profile.features"],
@@ -45,11 +132,40 @@ MANIFEST = AgentManifest(
inferred_params=[
InferredParam(
key="preferred_hour",
ttl_sec=3_600, # recompute hourly
ttl_sec=3_600,
cold_start_default=None,
min_history=10, # need at least 10 feedback events to be meaningful
min_history=10,
infer=_infer_preferred_hour,
),
InferredParam(
key="quiet_start",
ttl_sec=86_400,
cold_start_default="22:00",
min_history=_MIN_HISTORY,
infer=_infer_quiet_start,
),
InferredParam(
key="quiet_end",
ttl_sec=86_400,
cold_start_default="07:00",
min_history=_MIN_HISTORY,
infer=_infer_quiet_end,
),
InferredParam(
key="peak_hours",
ttl_sec=86_400,
cold_start_default=[9, 14, 20],
min_history=_MIN_HISTORY,
infer=_infer_peak_hours,
),
# tz is populated from the auth provider; no infer function.
InferredParam(
key="tz",
ttl_sec=86_400,
cold_start_default="UTC",
min_history=999_999, # effectively never inferred — always cold_start
infer=None,
),
],
)
@@ -62,18 +178,23 @@ class TimeOfDayAgent(BaseAgent):
def compute(self, inp: AgentInput) -> AgentOutput:
hour = inp.now.hour
dow = inp.now.weekday() # 0=Monday … 6=Sunday
dow = inp.now.weekday()
is_weekend = dow >= 5
# agent_prefs (inferred or user-set) take precedence over ML profile features.
preferred_raw = inp.agent_prefs.get("preferred_hour", inp.profile.get("preferred_hour"))
preferred = int(preferred_raw) if preferred_raw is not None else None
quiet_start: str | None = inp.agent_prefs.get("quiet_start")
quiet_end: str | None = inp.agent_prefs.get("quiet_end")
peak_hours: list[int] = inp.agent_prefs.get("peak_hours", [])
tz: str = inp.agent_prefs.get("tz", "UTC")
in_quiet = self._in_quiet_window(hour, quiet_start, quiet_end)
in_peak = hour in peak_hours
parts = [f"It is {hour:02d}:00 on {_DOW_NAMES[dow]} ({self._label(hour)})."]
if tz != "UTC":
parts[0] = f"It is {hour:02d}:00 ({tz}) on {_DOW_NAMES[dow]} ({self._label(hour)})."
if is_weekend:
parts.append("Weekend context — prefer personal or reflective tips over work tasks.")
@@ -83,8 +204,18 @@ class TimeOfDayAgent(BaseAgent):
f"User is in their quiet window ({quiet_start}{quiet_end}) — "
"avoid urgent or demanding tips."
)
if preferred is not None:
elif in_peak:
parts.append(
f"Hour {hour:02d}:00 is a peak productivity hour for this user — "
"a high-impact or challenging tip is appropriate."
)
elif peak_hours:
# Report nearest peak so orchestrator can time advice accordingly.
nearest = min(peak_hours, key=lambda p: min(abs(p - hour), 24 - abs(p - hour)))
delta = min(abs(nearest - hour), 24 - abs(nearest - hour))
if delta <= 2:
parts.append(f"Approaching peak productivity window ({nearest:02d}:00).")
elif preferred is not None:
delta = min(abs(hour - preferred), 24 - abs(hour - preferred))
if delta == 0:
parts.append(
@@ -103,6 +234,10 @@ class TimeOfDayAgent(BaseAgent):
"preferred_hour": preferred,
"quiet_start": quiet_start,
"quiet_end": quiet_end,
"peak_hours": peak_hours,
"in_quiet": in_quiet,
"in_peak": in_peak,
"tz": tz,
}
return self._make_output(inp, prompt, snapshot)

View File

@@ -40,7 +40,7 @@ if _repo_root not in sys.path:
from ml.agents.base import AgentInput # noqa: E402
from ml.agents.registry import get_agent, all_agents, all_manifests, get_manifest # noqa: E402
from ml.agents.inference import run_inference, FeedbackEvent, UserHistory # noqa: E402
from ml.agents.inference import run_inference, FeedbackEvent, TaskCompletion, UserHistory # noqa: E402
logging_config.configure()
@@ -142,6 +142,7 @@ class AgentComputeResponse(BaseModel):
class AgentInferRequest(BaseModel):
user_id: str
feedback_history: list[dict] = [] # [{action, dwell_ms, created_at}, …]
task_completions: list[dict] = [] # [{project_id, completed_at, due_at}, …]
class AgentInferResponse(BaseModel):
@@ -284,7 +285,15 @@ async def infer_agent(agent_id: str, req: AgentInferRequest) -> AgentInferRespon
)
for e in req.feedback_history
]
history = UserHistory(user_id=req.user_id, events=events)
completions = [
TaskCompletion(
project_id=c.get("project_id"),
completed_at=c.get("completed_at", ""),
due_at=c.get("due_at", ""),
)
for c in req.task_completions
]
history = UserHistory(user_id=req.user_id, events=events, task_completions=completions)
t0 = __import__("time").monotonic()
inferred = run_inference(manifest, history)