Compare commits

...

6 Commits

Author SHA1 Message Date
26fc67776f feat(agents): semantic task clustering + focus-area inferred preferred_areas (#97, #113)
- New ml/agents/clustering.py: embed task content via nomic-embed-text
  (Ollama), greedy cosine clustering (threshold 0.72, max 6 clusters),
  graceful fallback to project-id grouping when Ollama is unreachable
- focus_area v2.0.0: compute() uses semantic clusters as focus areas;
  adds preferred_areas InferredParam inferred from top-2 projects by
  task_completion count
- 135 tests, all passing

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 06:54:46 +00:00
336644a90a docs: update CLAUDE.md with rich per-agent inference completions (#112–#116)
- Inference framework table updated: all agents at v1.2.0 with full param list
- Documents UserHistory.task_completions and AgentInferRequest.task_completions
- Marks #112/114/115/116 complete in recent completions
- Active work updated: #78 closed, #61 and #97/#113 as next priorities

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 06:28:30 +00:00
1d9a395591 feat(agents): quiet window + peak hours + tz prefs for time-of-day agent (#112)
Adds four InferredParams (all TTL=24h, min_history=50 except preferred_hour=10):
- quiet_start / quiet_end: longest contiguous below-baseline hour run (HH:MM)
- peak_hours: top-quartile done-event hours, sorted ascending
- tz: cold-start only ("UTC"); populated from auth provider, no inference function

compute() updated:
- in_quiet check (quiet window) takes precedence over peak hours
- in_peak emits "peak productivity hour" language when current hour is in peak_hours
- approaching peak (within 2h) surfaces for orchestrator timing
- tz surfaced in snippet header when not UTC
- snapshot adds peak_hours, in_quiet, in_peak, tz

- Agent bumped to v1.2.0
- 21 new tests: night-owl, early-bird, shift-worker, quiet/peak snippet rendering
- Fixed test_snapshot_keys in test_agents.py to include new snapshot fields

Closes #112

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 06:05:51 +00:00
bc71dc203d feat(agents): adaptive lookback + weekly/daily cycle detection for recent-patterns (#116)
Replaces the coarse density-bucket window_days with three InferredParams (all TTL=24h):
- lookback_days: min window containing ≥30 done events, capped at 30d (min_history=5)
- weekly_cycle: per-DOW peak-to-mean strength list (min_history=21, ≥3 weeks of signal)
- daily_cycle: per-hour peak-to-mean strength list (min_history=14)

compute() renders cycle hints when strength > 0.5:
  "User tends to complete tips on Tuesdays and Saturdays."
  "User is most active around 8pm."
Legacy window_days pref key still accepted as a fallback.

- window_days pref renamed lookback_days; backward-compat fallback in compute()
- Agent bumped to v1.2.0
- 19 new tests: weekend-warrior, weekday-only, evening-person, no-pattern,
  legacy compat, snippet rendering with strong/weak signals

Closes #116

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 05:51:45 +00:00
4cade4868b feat(agents): per-user baseline + stdev inference for momentum agent (#114)
Adds two InferredParams (TTL=7d) computed from 28-day rolling daily done counts:
- baseline_completions_per_day: mean done events/day over the window
- stdev: stdev of daily counts (floored at 0.1 to avoid division by zero)

MomentumAgent.compute() now calculates a z-score from recent done events in
inp.feedback_history vs the inferred baseline. Snippet language switches to
z-score framing ("above your usual pace", "slowing down") when |z| >= 1.0,
falling back to engagement_trend labels when in the normal range.

- engagement_trend InferredParam preserved for backward compatibility
- momentum_window pref added (default 7, user-overridable)
- 14 new tests covering power user, casual user, returning-from-break, and
  relative stdev comparison; engagement_trend tests updated for z-score priority
- Agent bumped to v1.2.0

Closes #114

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 05:18:29 +00:00
04212ff318 feat(agents): p50-lateness tolerance + per-project realness for overdue-task (#115)
Replaces snooze-rate heuristic with p50 of actual task lateness (completedAt − dueAt).
Adds project_realness inference: projects with chronic lateness get realness < 1 and
the agent softens its snippet language from "overdue" to "past target date".

- TaskCompletion added to UserHistory with lateness_days computed property
- _infer_lateness_tolerance: p50 of task_completions, clipped at 0, float
- _infer_project_realness: per-project median lateness normalised by global median
- Both InferredParams use 7d TTL; cold_start = 0.0 / {}
- AgentInferRequest accepts task_completions; endpoint wires them through
- 12 new tests covering punctual/chronic/mixed users and language softening
- Agent bumped to v1.2.0

Closes #115

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 05:14:04 +00:00
14 changed files with 1502 additions and 190 deletions

View File

@@ -107,10 +107,11 @@ Recent completions:
- Admin UX refinements: feedback consolidation, settings placement (#100102) - Admin UX refinements: feedback consolidation, settings placement (#100102)
- ADR-0012 — ε-greedy v2 (D=12) — 2026-04-26 (now superseded by ADR-0013) - ADR-0012 — ε-greedy v2 (D=12) — 2026-04-26 (now superseded by ADR-0013)
- ADR-0014 complete: unified Profile schema + backfill, manifest plumbing, `/api/profile` read-through, registry-driven eligibility filter, inference framework + per-agent inference, legacy consent column drop — 2026-05-05 - ADR-0014 complete: unified Profile schema + backfill, manifest plumbing, `/api/profile` read-through, registry-driven eligibility filter, inference framework + per-agent inference, legacy consent column drop — 2026-05-05
- Rich per-agent inference for all four active agents (#112, #114, #115, #116) — 2026-05-06: quiet/peak hours (time-of-day), z-score baseline (momentum), p50 lateness + project realness (overdue-task), adaptive lookback + weekly/daily cycles (recent-patterns)
Active work (M2): Active work (M2):
- Signal abstraction for multi-source support (#78)
- Per-user feature freshness SLAs (#61, ADR-0011 phase B) - Per-user feature freshness SLAs (#61, ADR-0011 phase B)
- Embedding-based task clustering for focus-area inference (#97, #113)
## ADR-0014 endpoint map (as of step 6) ## ADR-0014 endpoint map (as of step 6)
@@ -131,15 +132,18 @@ Lives in `ml/agents/inference/`. `run_inference(manifest, history)` evaluates al
- `infer()` error → emit `cold_start_default` (never crashes) - `infer()` error → emit `cold_start_default` (never crashes)
- Results written to `user_preferences` with `source='inferred'`; keys with `source='user'` are never overwritten - Results written to `user_preferences` with `source='inferred'`; keys with `source='user'` are never overwritten
All five agents are at v1.1.0. Per-agent inferred params: All five agents are at v1.2.0. Per-agent inferred params (all live in `ml/agents/<name>.py`):
| Agent | Inferred param | Logic |
|-------|---------------|-------| | Agent | Inferred params | Notes |
| `time-of-day` | `preferred_hour` | Mode done-hour from feedback history | |-------|----------------|-------|
| `momentum` | `engagement_trend` | Done-rate last 7d vs prior 7d | | `time-of-day` | `preferred_hour`, `quiet_start`, `quiet_end`, `peak_hours`, `tz` | Quiet window = longest below-baseline hour run; peak = top-quartile done hours; tz cold-start only (from auth provider) |
| `overdue-task` | `lateness_tolerance_days` | Snooze rate → 0/1/2 days | | `momentum` | `engagement_trend`, `baseline_completions_per_day`, `stdev` | Baseline = 28d rolling mean done/day; snippet uses z-score language |
| `recent-patterns` | `window_days` | Event density → 7/14/30 days | | `overdue-task` | `lateness_tolerance_days`, `project_realness` | Tolerance = p50 lateness from TaskCompletion history; realness = project median vs global median |
| `recent-patterns` | `lookback_days`, `weekly_cycle`, `daily_cycle` | Lookback sized to ≥30 done events; cycles use peak-to-mean ratio; snippet hints when strength > 0.5 |
| `focus-area` | *(none yet)* | Needs project-level feedback linkage (#78) | | `focus-area` | *(none yet)* | Needs project-level feedback linkage (#78) |
`UserHistory` carries both `events: list[FeedbackEvent]` and `task_completions: list[TaskCompletion]`. `AgentInferRequest` (ml/serving) accepts `task_completions: list[dict]` alongside `feedback_history`.
## What NOT to do ## What NOT to do
- Don't copy Todoist's data into our DB. Store the OAuth token + computed features/derivatives we need, fetch raw on demand. - Don't copy Todoist's data into our DB. Store the OAuth token + computed features/derivatives we need, fetch raw on demand.

152
ml/agents/clustering.py Normal file
View File

@@ -0,0 +1,152 @@
"""Semantic task clustering via nomic-embed-text (issue #97).
Public API:
cluster_tasks(tasks, ollama_url) -> list[Cluster]
Each task dict must have a "content" key. Tasks without content are placed in a
fallback "other" bucket. If Ollama is unreachable, falls back to grouping by
project_id so compute() always returns something useful.
"""
from __future__ import annotations
import logging
import math
import os
from dataclasses import dataclass, field
import httpx
log = logging.getLogger(__name__)
# Cosine similarity threshold for merging tasks into the same cluster.
_SIM_THRESHOLD = 0.72
# Never produce more than this many clusters regardless of task count.
_MAX_CLUSTERS = 6
_EMBED_TIMEOUT = 10.0
@dataclass
class Cluster:
label: str # representative task content (shortest, most central)
tasks: list[dict] = field(default_factory=list)
@property
def task_count(self) -> int:
return len(self.tasks)
@property
def overdue_count(self) -> int:
return sum(1 for t in self.tasks if t.get("is_overdue"))
def _embed(text: str, ollama_url: str) -> list[float] | None:
try:
with httpx.Client(trust_env=False, timeout=_EMBED_TIMEOUT) as c:
r = c.post(
f"{ollama_url}/api/embeddings",
json={"model": "nomic-embed-text", "prompt": text, "keep_alive": 0},
)
r.raise_for_status()
return r.json().get("embedding")
except Exception as exc:
log.debug("embed_failed text=%r error=%s", text[:40], exc)
return None
def _cosine(a: list[float], b: list[float]) -> float:
dot = sum(x * y for x, y in zip(a, b))
na = math.sqrt(sum(x * x for x in a))
nb = math.sqrt(sum(x * x for x in b))
if na == 0 or nb == 0:
return 0.0
return dot / (na * nb)
def _greedy_cluster(items: list[tuple[dict, list[float]]]) -> list[Cluster]:
"""Single-pass greedy clustering: each item joins the first existing cluster
whose centroid is above _SIM_THRESHOLD, else starts a new one."""
clusters: list[tuple[list[float], Cluster]] = [] # (centroid, cluster)
for task, vec in items:
best_idx = -1
best_sim = _SIM_THRESHOLD - 1e-9
for i, (centroid, _) in enumerate(clusters):
sim = _cosine(centroid, vec)
if sim > best_sim:
best_sim = sim
best_idx = i
if best_idx >= 0 and len(clusters) < _MAX_CLUSTERS:
centroid, cluster = clusters[best_idx]
cluster.tasks.append(task)
# Update centroid as running mean.
n = len(cluster.tasks)
new_centroid = [(c * (n - 1) + v) / n for c, v in zip(centroid, vec)]
clusters[best_idx] = (new_centroid, cluster)
elif len(clusters) < _MAX_CLUSTERS:
label = task.get("content", "Tasks")[:60]
cluster = Cluster(label=label, tasks=[task])
clusters.append((vec, cluster))
else:
# Overflow: append to closest cluster even below threshold.
best_i = max(range(len(clusters)), key=lambda i: _cosine(clusters[i][0], vec))
clusters[best_i][1].tasks.append(task)
return [c for _, c in clusters]
def _fallback_by_project(tasks: list[dict]) -> list[Cluster]:
"""Group by project_id when embeddings are unavailable."""
buckets: dict[str, Cluster] = {}
for task in tasks:
pid = task.get("project_id") or task.get("project") or "default"
if pid not in buckets:
label = pid if pid != "default" else "Tasks"
buckets[pid] = Cluster(label=label)
buckets[pid].tasks.append(task)
return list(buckets.values())
def cluster_tasks(
tasks: list[dict],
ollama_url: str | None = None,
) -> list[Cluster]:
"""Cluster tasks by semantic similarity.
Returns a non-empty list of Cluster objects. Falls back to project-based
grouping if Ollama is unavailable or tasks have no content.
"""
if not tasks:
return []
url = ollama_url or os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
# Separate tasks with usable content from those without.
with_content = [(t, t.get("content", "").strip()) for t in tasks]
embeddable = [(t, c) for t, c in with_content if c]
no_content = [t for t, c in with_content if not c]
if not embeddable:
return _fallback_by_project(tasks)
# Fetch embeddings (best-effort; None means Ollama unavailable).
embedded: list[tuple[dict, list[float]]] = []
failed = False
for task, content in embeddable:
vec = _embed(content, url)
if vec is None:
failed = True
break
embedded.append((task, vec))
if failed or not embedded:
log.info("cluster_tasks: ollama unavailable, falling back to project grouping")
return _fallback_by_project(tasks)
clusters = _greedy_cluster(embedded)
# Tasks without content get their own bucket if any.
if no_content:
clusters.append(Cluster(label="Other tasks", tasks=no_content))
return clusters

View File

@@ -1,16 +1,27 @@
from __future__ import annotations from __future__ import annotations
from collections import defaultdict from collections import Counter
from typing import ClassVar from typing import ClassVar
from .base import BaseAgent, AgentInput, AgentOutput from .base import BaseAgent, AgentInput, AgentOutput
from .manifest import AgentManifest from .clustering import cluster_tasks
from .inference.history import UserHistory
from .manifest import AgentManifest, InferredParam
def _infer_preferred_areas(history: UserHistory) -> list[str]:
"""Top-2 project IDs by completed task count (last 90 days worth of data)."""
counts: Counter[str] = Counter()
for tc in history.task_completions:
if tc.project_id:
counts[tc.project_id] += 1
return [pid for pid, _ in counts.most_common(2)]
MANIFEST = AgentManifest( MANIFEST = AgentManifest(
id="focus-area", id="focus-area",
version="1.1.0", # bumped: preferred_areas pref is now honoured in compute (#113) version="2.0.0", # semantic clustering via nomic-embed-text (#97, #113)
description="Identifies the most congested project/area in the user's task list.", description="Identifies the most congested semantic focus area in the user's task list.",
pref_schema={ pref_schema={
"type": "object", "type": "object",
"additionalProperties": False, "additionalProperties": False,
@@ -19,7 +30,7 @@ MANIFEST = AgentManifest(
"type": "array", "type": "array",
"items": {"type": "string"}, "items": {"type": "string"},
"default": [], "default": [],
"description": "Project / label names to prioritise when multiple areas tie.", "description": "Project IDs or label names to prioritise when multiple areas tie.",
}, },
}, },
}, },
@@ -27,59 +38,75 @@ MANIFEST = AgentManifest(
required_consents=["data:core", "data:todoist", "agent:focus-area"], required_consents=["data:core", "data:todoist", "agent:focus-area"],
output_contract={"type": "snippet", "format": "free_text"}, output_contract={"type": "snippet", "format": "free_text"},
ttl_sec=43_200, ttl_sec=43_200,
# No inferred_params: preferred_areas requires project-level feedback linkage inferred_params=[
# that isn't available in feedback_history alone. Revisit with #78 (signal InferredParam(
# abstraction) once per-task reactions can be traced back to a project. key="preferred_areas",
ttl_sec=86_400,
cold_start_default=[],
min_history=0, # use task_completions, not feedback events; handle empty inside
infer=_infer_preferred_areas,
),
],
) )
class FocusAreaAgent(BaseAgent): class FocusAreaAgent(BaseAgent):
"""Identifies the most congested project/area in the user's task list.""" """Identifies the most congested semantic focus area in the user's task list."""
agent_id: ClassVar[str] = MANIFEST.id agent_id: ClassVar[str] = MANIFEST.id
ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec
version: ClassVar[str] = MANIFEST.version version: ClassVar[str] = MANIFEST.version
def compute(self, inp: AgentInput) -> AgentOutput: def compute(self, inp: AgentInput) -> AgentOutput:
preferred: list[str] = inp.agent_prefs.get("preferred_areas", []) preferred: list[str] = inp.agent_prefs.get("preferred_areas", [])
by_project: dict[str, list[dict]] = defaultdict(list)
for task in inp.tasks:
project = task.get("project_id") or task.get("project") or "default"
by_project[project].append(task)
if not by_project: if not inp.tasks:
prompt = "No tasks available to identify a focus area." return self._make_output(
return self._make_output(inp, prompt, {"project_count": 0}) inp,
"No tasks available to identify a focus area.",
{"cluster_count": 0, "strategy": "none"},
)
def score(project: str, tasks: list[dict]) -> tuple[float, bool]: clusters = cluster_tasks(inp.tasks)
base = sum(2.0 if t.get("is_overdue") else 1.0 for t in tasks)
# Boost preferred areas to break ties in their favour
boosted = project in preferred or any(p in project for p in preferred)
return (base + (0.5 if boosted else 0.0), boosted)
top_project, top_tasks = max( if not clusters:
by_project.items(), return self._make_output(
key=lambda kv: score(kv[0], kv[1]), inp,
) "No tasks available to identify a focus area.",
overdue_in_top = sum(1 for t in top_tasks if t.get("is_overdue")) {"cluster_count": 0, "strategy": "none"},
label = "the default project" if top_project == "default" else f'"{top_project}"' )
n = len(top_tasks)
boosted = top_project in preferred or any(p in top_project for p in preferred) strategy = "semantic" if len(clusters) > 1 or len(inp.tasks) > 1 else "fallback"
def score(cluster) -> float:
base = sum(2.0 if t.get("is_overdue") else 1.0 for t in cluster.tasks)
boosted = any(p in cluster.label for p in preferred) if preferred else False
return base + (0.5 if boosted else 0.0)
top = max(clusters, key=score)
boosted = bool(preferred) and any(p in top.label for p in preferred)
parts = [ parts = [
f"The user's most congested area is {label} " f'The user\'s most active focus area is "{top.label}" '
f"({n} task{'s' if n != 1 else ''}, {overdue_in_top} overdue)." f"({top.task_count} task{'s' if top.task_count != 1 else ''}, "
f"{top.overdue_count} overdue)."
] ]
if boosted: if boosted:
parts.append("This area matches the user's stated focus preferences.") parts.append("This area matches the user's stated focus preferences.")
if overdue_in_top >= 3: if top.overdue_count >= 3:
parts.append("Consider surfacing an action from this area.") parts.append("Consider surfacing an action from this area.")
if len(clusters) > 1:
other_total = sum(c.task_count for c in clusters if c is not top)
parts.append(
f"{len(clusters) - 1} other area{'s' if len(clusters) > 2 else ''} "
f"contain {other_total} task{'s' if other_total != 1 else ''}."
)
prompt = " ".join(parts)
snapshot = { snapshot = {
"top_project": top_project, "top_cluster_label": top.label,
"top_task_count": n, "top_task_count": top.task_count,
"top_overdue_count": overdue_in_top, "top_overdue_count": top.overdue_count,
"project_count": len(by_project), "cluster_count": len(clusters),
"strategy": strategy,
"preferred_areas": preferred, "preferred_areas": preferred,
} }
return self._make_output(inp, prompt, snapshot) return self._make_output(inp, " ".join(parts), snapshot)

View File

@@ -4,6 +4,6 @@ Each agent's manifest declares InferredParams; this package owns the
scheduling contract, history data model, and write path to user_preferences. scheduling contract, history data model, and write path to user_preferences.
""" """
from .framework import run_inference from .framework import run_inference
from .history import FeedbackEvent, UserHistory from .history import FeedbackEvent, TaskCompletion, UserHistory
__all__ = ["run_inference", "FeedbackEvent", "UserHistory"] __all__ = ["run_inference", "FeedbackEvent", "TaskCompletion", "UserHistory"]

View File

@@ -23,7 +23,27 @@ class FeedbackEvent:
return dt.hour return dt.hour
@dataclass
class TaskCompletion:
"""A completed task that had a due date — used for lateness inference."""
project_id: str | None
completed_at: str # ISO 8601
due_at: str # ISO 8601
@property
def lateness_days(self) -> float:
"""Days between due_at and completed_at. Negative = completed early."""
try:
def _parse(s: str) -> datetime:
dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
return (_parse(self.completed_at) - _parse(self.due_at)).total_seconds() / 86_400
except ValueError:
return 0.0
@dataclass @dataclass
class UserHistory: class UserHistory:
user_id: str user_id: str
events: list[FeedbackEvent] = field(default_factory=list) events: list[FeedbackEvent] = field(default_factory=list)
task_completions: list[TaskCompletion] = field(default_factory=list)

View File

@@ -1,5 +1,8 @@
from __future__ import annotations from __future__ import annotations
import math
import statistics
from collections import defaultdict
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import ClassVar from typing import ClassVar
@@ -8,6 +11,49 @@ from .inference.history import UserHistory
from .manifest import AgentManifest, InferredParam from .manifest import AgentManifest, InferredParam
def _parse_dt(iso: str) -> datetime:
try:
dt = datetime.fromisoformat(iso.replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except ValueError:
return datetime.min.replace(tzinfo=timezone.utc)
def _daily_done_counts(history: UserHistory, window_days: int = 28) -> list[int]:
"""Count done-action events per calendar day over the last window_days days."""
if not history.events:
return []
latest = max(_parse_dt(e.created_at) for e in history.events)
cutoff = latest - timedelta(days=window_days)
by_day: dict[tuple[int, int, int], int] = defaultdict(int)
for e in history.events:
if e.action == "done":
dt = _parse_dt(e.created_at)
if dt >= cutoff:
by_day[(dt.year, dt.month, dt.day)] += 1
# Return counts for every day in the window, including zero-completion days.
counts = []
for offset in range(window_days):
day = (latest - timedelta(days=offset)).date()
counts.append(by_day.get((day.year, day.month, day.day), 0))
return counts
def _infer_baseline_completions_per_day(history: UserHistory) -> float:
counts = _daily_done_counts(history)
return statistics.mean(counts) if counts else 1.0
def _infer_stdev(history: UserHistory) -> float:
counts = _daily_done_counts(history)
if len(counts) < 2:
return 1.0
sd = statistics.stdev(counts)
return max(sd, 0.1) # floor so we never divide by zero in z-score
def _infer_engagement_trend(history: UserHistory) -> str: def _infer_engagement_trend(history: UserHistory) -> str:
"""Compare done-rate in the most recent 7 days vs the 7 days before that.""" """Compare done-rate in the most recent 7 days vs the 7 days before that."""
events = sorted(history.events, key=lambda e: e.created_at) events = sorted(history.events, key=lambda e: e.created_at)
@@ -26,7 +72,7 @@ def _infer_engagement_trend(history: UserHistory) -> str:
older = [e for e in events if cutoff_older <= _parse_dt(e.created_at) < cutoff_recent] older = [e for e in events if cutoff_older <= _parse_dt(e.created_at) < cutoff_recent]
if len(older) < 3: if len(older) < 3:
return "stable" # not enough baseline to compare return "stable"
recent_rate = sum(1 for e in recent if e.action == "done") / max(len(recent), 1) recent_rate = sum(1 for e in recent if e.action == "done") / max(len(recent), 1)
older_rate = sum(1 for e in older if e.action == "done") / max(len(older), 1) older_rate = sum(1 for e in older if e.action == "done") / max(len(older), 1)
@@ -39,19 +85,9 @@ def _infer_engagement_trend(history: UserHistory) -> str:
return "stable" return "stable"
def _parse_dt(iso: str) -> datetime:
try:
dt = datetime.fromisoformat(iso.replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except ValueError:
return datetime.min.replace(tzinfo=timezone.utc)
MANIFEST = AgentManifest( MANIFEST = AgentManifest(
id="momentum", id="momentum",
version="1.1.0", # bumped: engagement_trend InferredParam added (#114) version="1.2.0", # #114: baseline + stdev inferred params; z-score snippet language
description="Characterises the user's recent engagement trend from profile features.", description="Characterises the user's recent engagement trend from profile features.",
pref_schema={ pref_schema={
"type": "object", "type": "object",
@@ -64,6 +100,24 @@ MANIFEST = AgentManifest(
"default": 25, "default": 25,
"description": "Completion rate below which momentum hints at low engagement.", "description": "Completion rate below which momentum hints at low engagement.",
}, },
"baseline_completions_per_day": {
"type": "number",
"minimum": 0,
"default": 1.0,
"description": "User's normal daily done-task rate (inferred from 28d history).",
},
"stdev": {
"type": "number",
"minimum": 0,
"default": 1.0,
"description": "Stdev of daily completion counts; used for z-score normalisation.",
},
"momentum_window": {
"type": "integer",
"minimum": 1,
"default": 7,
"description": "Days of recent history to measure current momentum against baseline.",
},
}, },
}, },
context_schema=["profile.features"], context_schema=["profile.features"],
@@ -73,15 +127,42 @@ MANIFEST = AgentManifest(
inferred_params=[ inferred_params=[
InferredParam( InferredParam(
key="engagement_trend", key="engagement_trend",
ttl_sec=21_600, # recompute every 6 hours alongside snippet ttl_sec=21_600,
cold_start_default="stable", cold_start_default="stable",
min_history=10, min_history=10,
infer=_infer_engagement_trend, infer=_infer_engagement_trend,
), ),
InferredParam(
key="baseline_completions_per_day",
ttl_sec=7 * 86_400,
cold_start_default=1.0,
min_history=14,
infer=_infer_baseline_completions_per_day,
),
InferredParam(
key="stdev",
ttl_sec=7 * 86_400,
cold_start_default=1.0,
min_history=14,
infer=_infer_stdev,
),
], ],
) )
def _z_score_label(z: float) -> str | None:
"""Map z-score to a human-readable momentum label, or None if within normal range."""
if z >= 2.0:
return "well above your usual pace"
if z >= 1.0:
return "above your usual pace"
if z <= -2.0:
return "well below your usual pace"
if z <= -1.0:
return "below your usual pace"
return None
class MomentumAgent(BaseAgent): class MomentumAgent(BaseAgent):
"""Characterises the user's recent engagement trend from profile features.""" """Characterises the user's recent engagement trend from profile features."""
agent_id: ClassVar[str] = MANIFEST.id agent_id: ClassVar[str] = MANIFEST.id
@@ -93,6 +174,20 @@ class MomentumAgent(BaseAgent):
dismiss = inp.profile.get("dismiss_rate_30d") dismiss = inp.profile.get("dismiss_rate_30d")
volume = inp.profile.get("tip_volume_30d") volume = inp.profile.get("tip_volume_30d")
trend: str = inp.agent_prefs.get("engagement_trend", "stable") trend: str = inp.agent_prefs.get("engagement_trend", "stable")
baseline: float = float(inp.agent_prefs.get("baseline_completions_per_day", 1.0))
stdev: float = max(float(inp.agent_prefs.get("stdev", 1.0)), 0.1)
window: int = int(inp.agent_prefs.get("momentum_window", 7))
# Count done events in the recent window from feedback_history.
now = inp.now.astimezone(timezone.utc)
cutoff = now - timedelta(days=window)
recent_done = sum(
1 for e in inp.feedback_history
if e.get("action") == "done" and _parse_dt(e.get("created_at", "")) >= cutoff
)
recent_rate = recent_done / window # completions/day over the window
z = (recent_rate - baseline) / stdev
z_label = _z_score_label(z)
parts: list[str] = [] parts: list[str] = []
@@ -120,7 +215,21 @@ class MomentumAgent(BaseAgent):
if volume is not None and int(volume) < 5: if volume is not None and int(volume) < 5:
parts.append("Very few tips served so far — this is an early-stage user.") parts.append("Very few tips served so far — this is an early-stage user.")
if trend == "up": # Z-score takes precedence over trend label when we have a baseline.
if z_label:
if z > 0:
parts.append(
f"Completion pace is {z_label} "
f"({recent_done} done in the last {window}d vs "
f"~{baseline * window:.1f} expected) — build on the momentum."
)
else:
parts.append(
f"Completion pace is {z_label} "
f"({recent_done} done in the last {window}d vs "
f"~{baseline * window:.1f} expected) — a motivational or easy-win tip may help."
)
elif trend == "up":
parts.append("Engagement is trending up compared to last week — build on the momentum.") parts.append("Engagement is trending up compared to last week — build on the momentum.")
elif trend == "down": elif trend == "down":
parts.append("Engagement is trending down — a motivational or easy-win tip may help.") parts.append("Engagement is trending down — a motivational or easy-win tip may help.")
@@ -131,5 +240,10 @@ class MomentumAgent(BaseAgent):
"dismiss_rate_30d": dismiss, "dismiss_rate_30d": dismiss,
"tip_volume_30d": volume, "tip_volume_30d": volume,
"engagement_trend": trend, "engagement_trend": trend,
"baseline_completions_per_day": baseline,
"stdev": stdev,
"momentum_window": window,
"recent_done_count": recent_done,
"z_score": round(z, 2),
} }
return self._make_output(inp, prompt, snapshot) return self._make_output(inp, prompt, snapshot)

View File

@@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import statistics
from typing import ClassVar from typing import ClassVar
from .base import BaseAgent, AgentInput, AgentOutput from .base import BaseAgent, AgentInput, AgentOutput
@@ -7,36 +8,64 @@ from .inference.history import UserHistory
from .manifest import AgentManifest, InferredParam from .manifest import AgentManifest, InferredParam
def _infer_lateness_tolerance(history: UserHistory) -> int: def _infer_lateness_tolerance(history: UserHistory) -> float:
"""Estimate how many days past due a task needs to be before the user acts. """p50 lateness (days) across completed tasks that had a due date, clipped at 0.
High snooze rate → user doesn't act immediately → raise tolerance so the Negative lateness (finished early) pulls the percentile down; we clip at 0
agent doesn't nag them about tasks they'll handle in their own time. so punctual users always get tolerance=0, never a negative offset.
""" """
total = len(history.events) lateness = [c.lateness_days for c in history.task_completions]
if total == 0: if not lateness:
return 0 return 0.0
snooze_rate = sum(1 for e in history.events if e.action == "snooze") / total return max(0.0, statistics.median(lateness))
if snooze_rate > 0.40:
return 2
if snooze_rate > 0.20: def _infer_project_realness(history: UserHistory) -> dict[str, float]:
return 1 """Per-project realness: 1 (median project lateness / global median lateness).
return 0
Projects whose tasks are consistently completed on time get realness ≈ 1.
Aspirational projects (chronic lateness) get realness closer to 0.
"""
completions = [c for c in history.task_completions if c.project_id]
if not completions:
return {}
global_median = statistics.median(c.lateness_days for c in completions)
if global_median <= 0:
# Everyone finishes early — no project is less real than another.
return {pid: 1.0 for pid in {c.project_id for c in completions}} # type: ignore[misc]
by_project: dict[str, list[float]] = {}
for c in completions:
by_project.setdefault(c.project_id, []).append(c.lateness_days) # type: ignore[index]
result: dict[str, float] = {}
for pid, days in by_project.items():
project_median = statistics.median(days)
realness = 1.0 - (project_median / global_median)
result[pid] = round(max(0.0, min(1.0, realness)), 3)
return result
MANIFEST = AgentManifest( MANIFEST = AgentManifest(
id="overdue-task", id="overdue-task",
version="1.1.0", # bumped: lateness_tolerance_days InferredParam added (#115) version="1.2.0", # #115: p50-lateness tolerance + per-project realness
description="Reports the user's overdue tasks by count and age.", description="Reports the user's overdue tasks by count and age.",
pref_schema={ pref_schema={
"type": "object", "type": "object",
"additionalProperties": False, "additionalProperties": False,
"properties": { "properties": {
"lateness_tolerance_days": { "lateness_tolerance_days": {
"type": "integer", "type": "number",
"minimum": 0, "minimum": 0,
"default": 0, "default": 0,
"description": "Days past due before a task is considered overdue. 0 = the moment it's late.", "description": "Days past due before a task is flagged. p50 of historical lateness.",
},
"project_realness": {
"type": "object",
"additionalProperties": {"type": "number", "minimum": 0, "maximum": 1},
"default": {},
"description": "Per-project realness score [0,1]. Low = aspirational due dates.",
}, },
}, },
}, },
@@ -48,15 +77,40 @@ MANIFEST = AgentManifest(
inferred_params=[ inferred_params=[
InferredParam( InferredParam(
key="lateness_tolerance_days", key="lateness_tolerance_days",
ttl_sec=86_400, # recompute daily — snooze pattern shifts slowly ttl_sec=7 * 86_400, # recompute weekly — lateness habits shift slowly
cold_start_default=0, cold_start_default=0.0,
min_history=10, min_history=10,
infer=_infer_lateness_tolerance, infer=_infer_lateness_tolerance,
), ),
InferredParam(
key="project_realness",
ttl_sec=7 * 86_400,
cold_start_default={},
min_history=10,
infer=_infer_project_realness,
),
], ],
) )
def _realness(project_id: str | None, project_realness: dict[str, float]) -> float:
"""Return realness for a project, defaulting to 1.0 (treat as real)."""
if not project_id or not project_realness:
return 1.0
return project_realness.get(project_id, 1.0)
def _format_task(task: dict, project_realness: dict[str, float]) -> str:
content = task["content"]
age = round(task.get("task_age_days", 0))
pid = task.get("project_id")
r = _realness(pid, project_realness)
unit = "day" if age == 1 else "days"
if r < 0.4:
return f'"{content}" ({age} {unit} past target date)'
return f'"{content}" ({age} {unit} overdue)'
class OverdueTaskAgent(BaseAgent): class OverdueTaskAgent(BaseAgent):
"""Reports the user's overdue tasks by count and age.""" """Reports the user's overdue tasks by count and age."""
agent_id: ClassVar[str] = MANIFEST.id agent_id: ClassVar[str] = MANIFEST.id
@@ -64,7 +118,9 @@ class OverdueTaskAgent(BaseAgent):
version: ClassVar[str] = MANIFEST.version version: ClassVar[str] = MANIFEST.version
def compute(self, inp: AgentInput) -> AgentOutput: def compute(self, inp: AgentInput) -> AgentOutput:
tolerance = max(0, int(inp.agent_prefs.get("lateness_tolerance_days", 0))) tolerance = max(0.0, float(inp.agent_prefs.get("lateness_tolerance_days", 0)))
project_realness: dict[str, float] = inp.agent_prefs.get("project_realness", {})
overdue = [ overdue = [
t for t in inp.tasks t for t in inp.tasks
if t.get("is_overdue") and t.get("task_age_days", 0) >= tolerance if t.get("is_overdue") and t.get("task_age_days", 0) >= tolerance
@@ -75,18 +131,21 @@ class OverdueTaskAgent(BaseAgent):
prompt = "The user has no overdue tasks at this time." prompt = "The user has no overdue tasks at this time."
elif len(overdue) == 1: elif len(overdue) == 1:
t = top[0] t = top[0]
age = round(t.get("task_age_days", 0)) r = _realness(t.get("project_id"), project_realness)
prompt = ( item = _format_task(t, project_realness)
f'The user has 1 overdue task: "{t["content"]}" ' if r < 0.4:
f"({age} day{'s' if age != 1 else ''} overdue)." prompt = f"The user has 1 task past its target date: {item}."
) else:
prompt = f"The user has 1 overdue task: {item}."
else: else:
items = ", ".join( items = ", ".join(_format_task(t, project_realness) for t in top)
f'"{t["content"]}" ({round(t.get("task_age_days", 0))}d)' avg_realness = (
for t in top sum(_realness(t.get("project_id"), project_realness) for t in overdue)
/ len(overdue)
) )
label = "tasks past their target dates" if avg_realness < 0.4 else "overdue tasks"
prompt = ( prompt = (
f"The user has {len(overdue)} overdue tasks. " f"The user has {len(overdue)} {label}. "
f"Top {len(top)}: {items}." f"Top {len(top)}: {items}."
) )
@@ -94,7 +153,12 @@ class OverdueTaskAgent(BaseAgent):
"overdue_count": len(overdue), "overdue_count": len(overdue),
"lateness_tolerance_days": tolerance, "lateness_tolerance_days": tolerance,
"top_overdue": [ "top_overdue": [
{"content": t["content"], "task_age_days": t.get("task_age_days", 0)} {
"content": t["content"],
"task_age_days": t.get("task_age_days", 0),
"project_id": t.get("project_id"),
"realness": _realness(t.get("project_id"), project_realness),
}
for t in top for t in top
], ],
} }

View File

@@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import math
from collections import Counter from collections import Counter
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import ClassVar from typing import ClassVar
@@ -8,35 +9,124 @@ from .base import BaseAgent, AgentInput, AgentOutput
from .inference.history import UserHistory from .inference.history import UserHistory
from .manifest import AgentManifest, InferredParam from .manifest import AgentManifest, InferredParam
_DOW_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
def _infer_window_days(history: UserHistory) -> int:
"""Infer the optimal lookback window from feedback event density.
More events per day → a shorter window captures the user's current state def _parse_dt(iso: str) -> datetime:
accurately. Sparse feedback → widen the window to gather signal. try:
dt = datetime.fromisoformat(iso.replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except ValueError:
return datetime.min.replace(tzinfo=timezone.utc)
def _infer_lookback_days(history: UserHistory) -> int:
"""Find the minimum window (days) that captures ≥30 done events, capped at 30.
Sorts done events newest-first, then measures the span to the 30th event.
If fewer than 30 done events exist, returns 30 (use the full cap).
""" """
n = len(history.events) done = sorted(
if n >= 14: [e for e in history.events if e.action == "done"],
return 7 key=lambda e: e.created_at,
if n >= 7: reverse=True,
return 14 )
return 30 if len(done) < 30:
return 30
latest = _parse_dt(done[0].created_at)
thirtieth = _parse_dt(done[29].created_at)
span = (latest - thirtieth).total_seconds() / 86_400
return max(1, min(30, math.ceil(span)))
def _infer_weekly_cycle(history: UserHistory) -> list[dict]:
"""Peak-to-mean ratio of done events per day-of-week (0=Monday … 6=Sunday).
Returns all 7 DOW entries so the caller can filter by strength threshold.
"""
by_dow: Counter[int] = Counter(
_parse_dt(e.created_at).weekday()
for e in history.events
if e.action == "done"
)
total = sum(by_dow.values())
if total == 0:
return []
mean = total / 7
return [
{
"dow": dow,
"strength": round(by_dow.get(dow, 0) / mean, 3),
"sample": f"completes most {_DOW_NAMES[dow]}s",
}
for dow in range(7)
]
def _infer_daily_cycle(history: UserHistory) -> list[dict]:
"""Peak-to-mean ratio of done events per hour-of-day (023).
Returns entries for hours that have at least one done event.
"""
by_hour: Counter[int] = Counter(
_parse_dt(e.created_at).hour
for e in history.events
if e.action == "done"
)
total = sum(by_hour.values())
if total == 0:
return []
mean = total / 24
return [
{
"hour": hour,
"strength": round(by_hour[hour] / mean, 3),
}
for hour in sorted(by_hour)
]
MANIFEST = AgentManifest( MANIFEST = AgentManifest(
id="recent-patterns", id="recent-patterns",
version="1.1.0", # bumped: window_days InferredParam added (#116) version="1.2.0", # #116: lookback_days + weekly_cycle + daily_cycle inference
description="Surfaces the user's reaction pattern from recent feedback.", description="Surfaces the user's reaction pattern from recent feedback.",
pref_schema={ pref_schema={
"type": "object", "type": "object",
"additionalProperties": False, "additionalProperties": False,
"properties": { "properties": {
"window_days": { "lookback_days": {
"type": "integer", "type": "integer",
"minimum": 1, "minimum": 1,
"maximum": 30, "maximum": 30,
"default": 7, "default": 7,
"description": "Lookback window for pattern analysis.", "description": "Lookback window sized to capture ≥30 done events.",
},
"weekly_cycle": {
"type": "array",
"items": {
"type": "object",
"properties": {
"dow": {"type": "integer"},
"strength": {"type": "number"},
"sample": {"type": "string"},
},
},
"default": [],
"description": "Per-DOW completion strength (peak-to-mean ratio).",
},
"daily_cycle": {
"type": "array",
"items": {
"type": "object",
"properties": {
"hour": {"type": "integer"},
"strength": {"type": "number"},
},
},
"default": [],
"description": "Per-hour completion strength (peak-to-mean ratio).",
}, },
}, },
}, },
@@ -46,15 +136,45 @@ MANIFEST = AgentManifest(
ttl_sec=86_400, ttl_sec=86_400,
inferred_params=[ inferred_params=[
InferredParam( InferredParam(
key="window_days", key="lookback_days",
ttl_sec=86_400, # recompute daily alongside snippet ttl_sec=86_400,
cold_start_default=7, cold_start_default=7,
min_history=5, min_history=5,
infer=_infer_window_days, infer=_infer_lookback_days,
),
InferredParam(
key="weekly_cycle",
ttl_sec=86_400,
cold_start_default=[],
min_history=21, # need ≥3 weeks to see a weekly signal
infer=_infer_weekly_cycle,
),
InferredParam(
key="daily_cycle",
ttl_sec=86_400,
cold_start_default=[],
min_history=14,
infer=_infer_daily_cycle,
), ),
], ],
) )
_STRENGTH_THRESHOLD = 0.5
def _strong(entries: list[dict], key: str) -> list[dict]:
return [e for e in entries if e.get("strength", 0) > _STRENGTH_THRESHOLD]
def _hour_label(hour: int) -> str:
if hour == 0:
return "midnight"
if hour < 12:
return f"{hour}am"
if hour == 12:
return "noon"
return f"{hour - 12}pm"
class RecentPatternsAgent(BaseAgent): class RecentPatternsAgent(BaseAgent):
"""Surfaces the user's reaction pattern from recent feedback.""" """Surfaces the user's reaction pattern from recent feedback."""
@@ -63,8 +183,15 @@ class RecentPatternsAgent(BaseAgent):
version: ClassVar[str] = MANIFEST.version version: ClassVar[str] = MANIFEST.version
def compute(self, inp: AgentInput) -> AgentOutput: def compute(self, inp: AgentInput) -> AgentOutput:
window_days = max(1, int(inp.agent_prefs.get("window_days", 7))) # Support legacy window_days pref key for backward compat.
window_s = window_days * 86_400 lookback_days = max(
1,
int(inp.agent_prefs.get("lookback_days", inp.agent_prefs.get("window_days", 7))),
)
weekly_cycle: list[dict] = inp.agent_prefs.get("weekly_cycle", [])
daily_cycle: list[dict] = inp.agent_prefs.get("daily_cycle", [])
window_s = lookback_days * 86_400
now_ts = inp.now.timestamp() now_ts = inp.now.timestamp()
recent = [ recent = [
@@ -76,16 +203,18 @@ class RecentPatternsAgent(BaseAgent):
total = len(recent) total = len(recent)
dwell_ms = inp.profile.get("mean_dwell_ms_30d") dwell_ms = inp.profile.get("mean_dwell_ms_30d")
parts: list[str] = []
if total == 0: if total == 0:
prompt = f"No tip reactions recorded in the last {window_days} days." parts.append(f"No tip reactions recorded in the last {lookback_days} days.")
else: else:
done = counts.get("done", 0) done = counts.get("done", 0)
dismissed = counts.get("dismiss", 0) dismissed = counts.get("dismiss", 0)
snoozed = counts.get("snooze", 0) snoozed = counts.get("snooze", 0)
parts = [ parts.append(
f"Last {window_days} days: {total} tip reaction{'s' if total != 1 else ''}" f"Last {lookback_days} days: {total} tip reaction{'s' if total != 1 else ''}"
f"{done} completed, {dismissed} dismissed, {snoozed} snoozed." f"{done} completed, {dismissed} dismissed, {snoozed} snoozed."
] )
if dwell_ms is not None: if dwell_ms is not None:
dwell_s = round(dwell_ms / 1000) dwell_s = round(dwell_ms / 1000)
if dwell_s < 15: if dwell_s < 15:
@@ -98,13 +227,34 @@ class RecentPatternsAgent(BaseAgent):
parts.append( parts.append(
f"Average dwell {dwell_s}s — user deliberates; prefer tips that reward reflection." f"Average dwell {dwell_s}s — user deliberates; prefer tips that reward reflection."
) )
prompt = " ".join(parts)
# Cycle hints — only when strength > threshold.
strong_weekly = _strong(weekly_cycle, "strength")
if strong_weekly:
day_names = [_DOW_NAMES[e["dow"]] for e in strong_weekly]
if len(day_names) == 1:
parts.append(f"User tends to complete tips on {day_names[0]}s.")
else:
joined = ", ".join(day_names[:-1]) + f" and {day_names[-1]}"
parts.append(f"User tends to complete tips on {joined}s.")
strong_daily = _strong(daily_cycle, "strength")
if strong_daily:
hour_labels = [_hour_label(e["hour"]) for e in strong_daily]
if len(hour_labels) == 1:
parts.append(f"User is most active around {hour_labels[0]}.")
else:
joined = ", ".join(hour_labels[:-1]) + f" and {hour_labels[-1]}"
parts.append(f"User is most active around {joined}.")
prompt = " ".join(parts) if parts else "No engagement data available yet."
snapshot = { snapshot = {
"window_days": window_days, "lookback_days": lookback_days,
"recent_total": total, "recent_total": total,
"action_counts": dict(counts), "action_counts": dict(counts),
"mean_dwell_ms_30d": dwell_ms, "mean_dwell_ms_30d": dwell_ms,
"strong_weekly_days": [e["dow"] for e in strong_weekly],
"strong_daily_hours": [e["hour"] for e in strong_daily],
} }
return self._make_output(inp, prompt, snapshot) return self._make_output(inp, prompt, snapshot)

View File

@@ -153,7 +153,8 @@ class TestTimeOfDayAgent:
def test_snapshot_keys(self): def test_snapshot_keys(self):
out = self.agent.compute(_inp()) out = self.agent.compute(_inp())
assert {"hour", "day_of_week", "preferred_hour", "quiet_start", "quiet_end"} == set(out.signals_snapshot) assert {"hour", "day_of_week", "preferred_hour", "quiet_start", "quiet_end",
"peak_hours", "in_quiet", "in_peak", "tz"} == set(out.signals_snapshot)
# ── RecentPatternsAgent ─────────────────────────────────────────────────────── # ── RecentPatternsAgent ───────────────────────────────────────────────────────
@@ -239,11 +240,13 @@ class TestFocusAreaAgent:
def test_default_project_fallback(self): def test_default_project_fallback(self):
out = self.agent.compute(_inp(tasks=[_task("No project task")])) out = self.agent.compute(_inp(tasks=[_task("No project task")]))
assert "default project" in out.prompt_text # Tasks without project_id fall back to a "Tasks" bucket
assert "Tasks" in out.prompt_text
def test_snapshot_keys(self): def test_snapshot_keys(self):
out = self.agent.compute(_inp(tasks=[_task("T1", project_id="A")])) out = self.agent.compute(_inp(tasks=[_task("T1", project_id="A")]))
assert {"top_project", "top_task_count", "top_overdue_count", "project_count", "preferred_areas"} == set(out.signals_snapshot) assert {"top_cluster_label", "top_task_count", "top_overdue_count", "cluster_count",
"strategy", "preferred_areas"} == set(out.signals_snapshot)
# ── Registry ───────────────────────────────────────────────────────────────── # ── Registry ─────────────────────────────────────────────────────────────────

View File

@@ -0,0 +1,135 @@
"""Unit tests for ml.agents.clustering (issue #97).
Embedding calls are mocked so tests run without Ollama.
"""
from __future__ import annotations
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
from unittest.mock import patch
from ml.agents.clustering import cluster_tasks, Cluster, _greedy_cluster, _cosine
# ── helpers ──────────────────────────────────────────────────────────────────
def _task(content: str, project_id: str | None = None, is_overdue: bool = False) -> dict:
t: dict = {"content": content, "is_overdue": is_overdue}
if project_id:
t["project_id"] = project_id
return t
def _embed_seq(*vecs):
"""Return a side_effect list so successive _embed calls return these vectors."""
return list(vecs)
# ── Cluster dataclass ─────────────────────────────────────────────────────────
class TestCluster:
def test_task_count(self):
c = Cluster(label="X", tasks=[_task("a"), _task("b")])
assert c.task_count == 2
def test_overdue_count(self):
c = Cluster(label="X", tasks=[_task("a", is_overdue=True), _task("b")])
assert c.overdue_count == 1
# ── cosine similarity ─────────────────────────────────────────────────────────
class TestCosine:
def test_identical_vectors(self):
v = [1.0, 0.0, 0.0]
assert _cosine(v, v) == 1.0
def test_orthogonal_vectors(self):
assert _cosine([1.0, 0.0], [0.0, 1.0]) == 0.0
def test_zero_vector(self):
assert _cosine([0.0, 0.0], [1.0, 0.0]) == 0.0
# ── greedy clustering ─────────────────────────────────────────────────────────
class TestGreedyClustering:
def _similar_vec(self, base: list[float], noise: float = 0.01) -> list[float]:
return [x + noise for x in base]
def test_similar_tasks_grouped(self):
v = [1.0, 0.0, 0.0]
v2 = [0.999, 0.001, 0.0]
items = [
(_task("A"), v),
(_task("B"), v2),
]
clusters = _greedy_cluster(items)
assert len(clusters) == 1
assert clusters[0].task_count == 2
def test_dissimilar_tasks_separate(self):
v1 = [1.0, 0.0, 0.0]
v2 = [0.0, 1.0, 0.0]
items = [(_task("A"), v1), (_task("B"), v2)]
clusters = _greedy_cluster(items)
assert len(clusters) == 2
def test_label_from_first_task(self):
v = [1.0, 0.0]
clusters = _greedy_cluster([(_task("Write report"), v)])
assert clusters[0].label == "Write report"
# ── cluster_tasks integration ─────────────────────────────────────────────────
class TestClusterTasks:
def test_empty_tasks(self):
result = cluster_tasks([])
assert result == []
def test_fallback_when_ollama_unavailable(self):
with patch("ml.agents.clustering._embed", return_value=None):
tasks = [_task("A", "p1"), _task("B", "p2"), _task("C", "p1")]
clusters = cluster_tasks(tasks)
assert len(clusters) == 2
labels = {c.label for c in clusters}
assert "p1" in labels and "p2" in labels
def test_fallback_groups_by_project(self):
with patch("ml.agents.clustering._embed", return_value=None):
tasks = [_task("A", "work")] * 3 + [_task("B", "home")] * 2
clusters = cluster_tasks(tasks)
by_label = {c.label: c.task_count for c in clusters}
assert by_label["work"] == 3
assert by_label["home"] == 2
def test_tasks_without_content_go_to_other(self):
v = [1.0, 0.0]
with patch("ml.agents.clustering._embed", return_value=v):
tasks = [_task("Has content"), {"is_overdue": False}]
clusters = cluster_tasks(tasks)
labels = {c.label for c in clusters}
assert "Other tasks" in labels
def test_semantic_clustering_groups_similar(self):
v_work = [1.0, 0.0, 0.0]
v_home = [0.0, 1.0, 0.0]
side_effects = [v_work, v_work, v_home, v_home]
with patch("ml.agents.clustering._embed", side_effect=side_effects):
tasks = [
_task("Write report"),
_task("Review PR"),
_task("Buy groceries"),
_task("Cook dinner"),
]
clusters = cluster_tasks(tasks)
assert len(clusters) == 2
assert all(c.task_count == 2 for c in clusters)
def test_all_tasks_no_content_fallback_by_project(self):
tasks = [{"project_id": "p1", "is_overdue": False},
{"project_id": "p2", "is_overdue": False}]
clusters = cluster_tasks(tasks)
assert len(clusters) == 2

View File

@@ -113,7 +113,7 @@ class TestTimeOfDayAgentWithInference:
assert "peak" in out.prompt_text assert "peak" in out.prompt_text
def test_version_bumped(self): def test_version_bumped(self):
assert MANIFEST.version == "1.1.0" assert MANIFEST.version == "1.2.0"
def test_manifest_has_preferred_hour_param(self): def test_manifest_has_preferred_hour_param(self):
keys = {p.key for p in MANIFEST.inferred_params} keys = {p.key for p in MANIFEST.inferred_params}

View File

@@ -1,5 +1,5 @@
"""Per-agent inference tests: momentum (#114), overdue-task (#115), recent-patterns (#116), """Per-agent inference tests: momentum (#114), overdue-task (#115), recent-patterns (#116),
and focus-area (#113) preferred_areas wiring.""" time-of-day (#112), and focus-area (#113) preferred_areas wiring."""
from __future__ import annotations from __future__ import annotations
import sys, os import sys, os
@@ -8,11 +8,12 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
from datetime import datetime, timezone from datetime import datetime, timezone
import pytest import pytest
from ml.agents.inference.history import FeedbackEvent, UserHistory from ml.agents.inference.history import FeedbackEvent, TaskCompletion, UserHistory
from ml.agents.inference.framework import run_inference from ml.agents.inference.framework import run_inference
from ml.agents.momentum import MomentumAgent, MANIFEST as MOMENTUM_MANIFEST from ml.agents.momentum import MomentumAgent, MANIFEST as MOMENTUM_MANIFEST
from ml.agents.overdue_task import OverdueTaskAgent, MANIFEST as OVERDUE_MANIFEST from ml.agents.overdue_task import OverdueTaskAgent, MANIFEST as OVERDUE_MANIFEST
from ml.agents.recent_patterns import RecentPatternsAgent, MANIFEST as RECENT_MANIFEST from ml.agents.recent_patterns import RecentPatternsAgent, MANIFEST as RECENT_MANIFEST
from ml.agents.time_of_day import TimeOfDayAgent, MANIFEST as TOD_MANIFEST
from ml.agents.focus_area import FocusAreaAgent from ml.agents.focus_area import FocusAreaAgent
from ml.agents.base import AgentInput from ml.agents.base import AgentInput
@@ -32,23 +33,47 @@ def _event(action: str, days_ago: float = 1.0) -> FeedbackEvent:
return FeedbackEvent(action=action, dwell_ms=dwell, created_at=ts) return FeedbackEvent(action=action, dwell_ms=dwell, created_at=ts)
def _history(*events: FeedbackEvent) -> UserHistory: def _history(*events: FeedbackEvent, completions: list[TaskCompletion] | None = None) -> UserHistory:
return UserHistory(user_id="u1", events=list(events)) return UserHistory(user_id="u1", events=list(events), task_completions=completions or [])
# ── momentum: engagement_trend ─────────────────────────────────────────────── def _completion(project_id: str | None, lateness_days: float) -> TaskCompletion:
"""Build a TaskCompletion where completed_at is lateness_days after due_at."""
from datetime import timedelta
due = _NOW - timedelta(days=30)
completed = due + timedelta(days=lateness_days)
return TaskCompletion(
project_id=project_id,
completed_at=completed.isoformat(),
due_at=due.isoformat(),
)
class TestMomentumInference:
# ── momentum helpers ─────────────────────────────────────────────────────────
def _neutral_prefs(**extra) -> dict:
"""Prefs that put z-score in the normal range so trend label can show."""
return {"baseline_completions_per_day": 0.0, "stdev": 1.0, "momentum_window": 7, **extra}
def _feedback_done(n: int, days_ago: float = 1.0) -> list[dict]:
from datetime import timedelta
ts = (_NOW - timedelta(days=days_ago)).isoformat()
return [{"action": "done", "dwell_ms": 60_000, "created_at": ts}] * n
# ── momentum: engagement_trend inference ─────────────────────────────────────
class TestMomentumTrendInference:
def test_cold_start_below_min_history(self): def test_cold_start_below_min_history(self):
history = _history(*[_event("done", days_ago=i) for i in range(5)]) history = _history(*[_event("done", days_ago=i) for i in range(5)])
result = run_inference(MOMENTUM_MANIFEST, history) result = run_inference(MOMENTUM_MANIFEST, history)
assert result["engagement_trend"] == "stable" # cold_start_default assert result["engagement_trend"] == "stable" # cold_start_default
def test_trend_up_when_recent_done_rate_higher(self): def test_trend_up_when_recent_done_rate_higher(self):
# 8 done in last 7 days, 1 done in prior 7 days → trending up
recent = [_event("done", days_ago=i) for i in range(1, 9)] recent = [_event("done", days_ago=i) for i in range(1, 9)]
older = [_event("dismiss", days_ago=i) for i in range(8, 15)] older = [_event("dismiss", days_ago=i) for i in range(8, 15)]
older[0] = _event("done", days_ago=8) # one done in older window older[0] = _event("done", days_ago=8)
history = _history(*recent, *older) history = _history(*recent, *older)
result = run_inference(MOMENTUM_MANIFEST, history) result = run_inference(MOMENTUM_MANIFEST, history)
assert result["engagement_trend"] == "up" assert result["engagement_trend"] == "up"
@@ -66,113 +91,540 @@ class TestMomentumInference:
result = run_inference(MOMENTUM_MANIFEST, history) result = run_inference(MOMENTUM_MANIFEST, history)
assert result["engagement_trend"] == "stable" assert result["engagement_trend"] == "stable"
def test_agent_uses_trend_in_snippet(self): def test_trend_shown_when_z_score_normal(self):
out = MomentumAgent().compute(_inp(agent_prefs={"engagement_trend": "up"})) # baseline=0 so z≈0 → no z label → trend label falls through
out = MomentumAgent().compute(_inp(agent_prefs=_neutral_prefs(engagement_trend="up")))
assert "trending up" in out.prompt_text assert "trending up" in out.prompt_text
def test_agent_uses_down_trend_in_snippet(self): def test_trend_down_shown_when_z_score_normal(self):
out = MomentumAgent().compute(_inp(agent_prefs={"engagement_trend": "down"})) out = MomentumAgent().compute(_inp(agent_prefs=_neutral_prefs(engagement_trend="down")))
assert "trending down" in out.prompt_text assert "trending down" in out.prompt_text
def test_snapshot_includes_trend(self): def test_snapshot_includes_trend(self):
out = MomentumAgent().compute(_inp(agent_prefs={"engagement_trend": "stable"})) out = MomentumAgent().compute(_inp(agent_prefs=_neutral_prefs(engagement_trend="stable")))
assert "engagement_trend" in out.signals_snapshot assert "engagement_trend" in out.signals_snapshot
# ── momentum: baseline + stdev inference (#114) ───────────────────────────────
class TestMomentumBaselineInference:
def _events_n_per_day(self, done_per_day: int, n_days: int) -> list[FeedbackEvent]:
"""Generate done events spread across n_days."""
events = []
for d in range(n_days):
for _ in range(done_per_day):
events.append(_event("done", days_ago=d + 0.5))
return events
def test_cold_start_when_few_events(self):
history = _history(*[_event("done", days_ago=i) for i in range(5)])
result = run_inference(MOMENTUM_MANIFEST, history)
assert result["baseline_completions_per_day"] == 1.0
assert result["stdev"] == 1.0
def test_power_user_baseline_high(self):
# 5 done events per day for 20 days → baseline ≈ 5/day (over 28d window, zeros fill rest)
events = self._events_n_per_day(5, 20)
history = _history(*events)
result = run_inference(MOMENTUM_MANIFEST, history)
assert result["baseline_completions_per_day"] > 2.0
def test_casual_user_baseline_low(self):
# 1 done every 3 days + dismiss filler to clear min_history=14 → baseline ≈ 0.33/day
done_events = [_event("done", days_ago=d * 3 + 0.5) for d in range(7)]
filler = [_event("dismiss", days_ago=d + 0.5) for d in range(10)]
history = _history(*done_events, *filler)
result = run_inference(MOMENTUM_MANIFEST, history)
assert result["baseline_completions_per_day"] < 0.5
def test_stdev_reflects_variability(self):
# Alternating 0 and 4 done events → high stdev
events = []
for d in range(14):
if d % 2 == 0:
for _ in range(4):
events.append(_event("done", days_ago=d + 0.5))
history = _history(*events)
result = run_inference(MOMENTUM_MANIFEST, history)
assert result["stdev"] > 1.0
def test_consistent_user_lower_stdev_than_variable(self):
# Consistent 2/day for 28 days has lower stdev than alternating 0/4
consistent = self._events_n_per_day(2, 28)
variable = []
for d in range(14):
if d % 2 == 0:
for _ in range(4):
variable.append(_event("done", days_ago=d + 0.5))
else:
variable.append(_event("dismiss", days_ago=d + 0.5))
r_consistent = run_inference(MOMENTUM_MANIFEST, _history(*consistent))
r_variable = run_inference(MOMENTUM_MANIFEST, _history(*variable))
assert r_consistent["stdev"] < r_variable["stdev"]
# ── momentum: z-score snippet language ───────────────────────────────────────
class TestMomentumZScore:
def _prefs(self, baseline: float, stdev: float = 1.0) -> dict:
return {"baseline_completions_per_day": baseline, "stdev": stdev,
"momentum_window": 7, "engagement_trend": "stable"}
def test_power_user_above_baseline_says_above_usual(self):
# baseline=3/day, stdev=1.0, window=7 → expected rate=3; user did 35 → rate=5, z=2
prefs = self._prefs(baseline=3.0, stdev=1.0)
feedback = _feedback_done(35, days_ago=1.0)
out = MomentumAgent().compute(_inp(feedback_history=feedback, agent_prefs=prefs))
assert "above your usual" in out.prompt_text
def test_casual_user_slowing_down(self):
# baseline=1/day, user did 0 in 7d → z = (0 - 1) / 1 = -1 → below usual
prefs = self._prefs(baseline=1.0, stdev=1.0)
out = MomentumAgent().compute(_inp(feedback_history=[], agent_prefs=prefs))
assert "below your usual" in out.prompt_text
def test_returning_from_break_at_normal_rate(self):
# User just came back: 1 done, baseline=1/day, window=7 → z=(1/7-1)/1≈-0.86, within normal
prefs = self._prefs(baseline=1.0, stdev=1.0)
feedback = _feedback_done(1, days_ago=0.5)
out = MomentumAgent().compute(_inp(feedback_history=feedback, agent_prefs=prefs))
# z ≈ -0.86 → no z label, falls back to trend (stable → no extra sentence)
assert "above your usual" not in out.prompt_text
assert "below your usual" not in out.prompt_text
def test_snapshot_includes_z_score(self):
prefs = self._prefs(baseline=1.0)
out = MomentumAgent().compute(_inp(agent_prefs=prefs))
assert "z_score" in out.signals_snapshot
assert "recent_done_count" in out.signals_snapshot
def test_version_bumped(self): def test_version_bumped(self):
assert MOMENTUM_MANIFEST.version == "1.1.0" assert MOMENTUM_MANIFEST.version == "1.2.0"
# ── overdue-task: lateness_tolerance_days ──────────────────────────────────── # ── overdue-task: lateness_tolerance_days + project_realness (#115) ──────────
class TestOverdueTaskInference: class TestOverdueTaskInference:
def test_cold_start_returns_zero(self): # -- lateness_tolerance_days inference --
history = _history(*[_event("done") for _ in range(5)])
result = run_inference(OVERDUE_MANIFEST, history)
assert result["lateness_tolerance_days"] == 0
def test_high_snooze_rate_returns_two(self): def test_cold_start_returns_zero_when_few_completions(self):
events = [_event("snooze")] * 8 + [_event("done")] * 2 # Below min_history=10 task completions → cold start
history = _history(*events) cs = [_completion("p1", 2.0) for _ in range(5)]
history = _history(*[_event("done")] * 5, completions=cs)
result = run_inference(OVERDUE_MANIFEST, history) result = run_inference(OVERDUE_MANIFEST, history)
assert result["lateness_tolerance_days"] == 2 assert result["lateness_tolerance_days"] == 0.0
def test_moderate_snooze_returns_one(self): def test_punctual_user_zero_tolerance(self):
events = [_event("snooze")] * 3 + [_event("done")] * 7 # User always finishes early or on time (negative lateness) → tolerance 0
history = _history(*events) cs = [_completion("p1", -1.0) for _ in range(12)]
history = _history(*[_event("done")] * 12, completions=cs)
result = run_inference(OVERDUE_MANIFEST, history) result = run_inference(OVERDUE_MANIFEST, history)
assert result["lateness_tolerance_days"] == 1 assert result["lateness_tolerance_days"] == 0.0
def test_low_snooze_returns_zero(self): def test_chronic_late_user_positive_tolerance(self):
events = [_event("done")] * 9 + [_event("snooze")] * 1 # User consistently finishes 5 days late → p50 = 5
history = _history(*events) cs = [_completion("p1", 5.0) for _ in range(12)]
history = _history(*[_event("done")] * 12, completions=cs)
result = run_inference(OVERDUE_MANIFEST, history) result = run_inference(OVERDUE_MANIFEST, history)
assert result["lateness_tolerance_days"] == 0 assert result["lateness_tolerance_days"] == pytest.approx(5.0)
def test_mixed_lateness_uses_median(self):
# 6 tasks at +1d, 6 tasks at +3d → median = 2
cs = [_completion("p1", 1.0)] * 6 + [_completion("p1", 3.0)] * 6
history = _history(*[_event("done")] * 12, completions=cs)
result = run_inference(OVERDUE_MANIFEST, history)
assert result["lateness_tolerance_days"] == pytest.approx(2.0)
# -- project_realness inference --
def test_project_realness_cold_start_empty(self):
cs = [_completion("p1", 1.0) for _ in range(5)] # below min_history
history = _history(*[_event("done")] * 5, completions=cs)
result = run_inference(OVERDUE_MANIFEST, history)
assert result["project_realness"] == {}
def test_project_realness_punctual_project_scores_high(self):
# p1 always on time (0d late), p2 always 10d late → p1 should be realness ≈ 1
cs = [_completion("p1", 0.0)] * 6 + [_completion("p2", 10.0)] * 6
history = _history(*[_event("done")] * 12, completions=cs)
result = run_inference(OVERDUE_MANIFEST, history)
assert result["project_realness"]["p1"] > result["project_realness"]["p2"]
def test_project_realness_values_clipped_01(self):
cs = [_completion("p1", 0.0)] * 6 + [_completion("p2", 100.0)] * 6
history = _history(*[_event("done")] * 12, completions=cs)
result = run_inference(OVERDUE_MANIFEST, history)
for v in result["project_realness"].values():
assert 0.0 <= v <= 1.0
# -- compute() reads inferred prefs --
def test_tolerance_filters_tasks(self): def test_tolerance_filters_tasks(self):
tasks = [ tasks = [
{"content": "Fresh overdue", "is_overdue": True, "task_age_days": 0.5}, {"content": "Fresh overdue", "is_overdue": True, "task_age_days": 0.5},
{"content": "Old overdue", "is_overdue": True, "task_age_days": 3.0}, {"content": "Old overdue", "is_overdue": True, "task_age_days": 3.0},
] ]
# tolerance=2 → only the 3-day task should count
out = OverdueTaskAgent().compute(_inp(tasks=tasks, agent_prefs={"lateness_tolerance_days": 2})) out = OverdueTaskAgent().compute(_inp(tasks=tasks, agent_prefs={"lateness_tolerance_days": 2}))
assert "1 overdue task" in out.prompt_text assert "1 overdue task" in out.prompt_text
assert "Old overdue" in out.prompt_text assert "Old overdue" in out.prompt_text
def test_snapshot_includes_tolerance(self): def test_low_realness_softens_language(self):
tasks = [{"content": "T", "is_overdue": True, "task_age_days": 1.0}] tasks = [{"content": "Wishlist", "is_overdue": True, "task_age_days": 3.0,
out = OverdueTaskAgent().compute(_inp(tasks=tasks, agent_prefs={"lateness_tolerance_days": 0})) "project_id": "aspirational"}]
assert "lateness_tolerance_days" in out.signals_snapshot prefs = {"lateness_tolerance_days": 0, "project_realness": {"aspirational": 0.2}}
out = OverdueTaskAgent().compute(_inp(tasks=tasks, agent_prefs=prefs))
assert "target date" in out.prompt_text
def test_high_realness_uses_overdue_language(self):
tasks = [{"content": "Critical", "is_overdue": True, "task_age_days": 3.0,
"project_id": "work"}]
prefs = {"lateness_tolerance_days": 0, "project_realness": {"work": 0.9}}
out = OverdueTaskAgent().compute(_inp(tasks=tasks, agent_prefs=prefs))
assert "overdue" in out.prompt_text
def test_snapshot_includes_realness(self):
tasks = [{"content": "T", "is_overdue": True, "task_age_days": 1.0, "project_id": "p1"}]
prefs = {"lateness_tolerance_days": 0, "project_realness": {"p1": 0.8}}
out = OverdueTaskAgent().compute(_inp(tasks=tasks, agent_prefs=prefs))
assert "realness" in out.signals_snapshot["top_overdue"][0]
def test_version_bumped(self): def test_version_bumped(self):
assert OVERDUE_MANIFEST.version == "1.1.0" assert OVERDUE_MANIFEST.version == "1.2.0"
# ── recent-patterns: window_days ───────────────────────────────────────────── # ── recent-patterns: lookback_days + weekly_cycle + daily_cycle (#116) ────────
class TestRecentPatternsInference: def _done_at(days_ago: float, hour: int = 10) -> FeedbackEvent:
def test_cold_start_default_7(self): """Done event at a specific hour, N days ago."""
history = _history(*[_event("done") for _ in range(3)]) # below min_history=5 from datetime import timedelta
ts = (_NOW - timedelta(days=days_ago)).replace(hour=hour, minute=0, second=0, microsecond=0)
return FeedbackEvent(action="done", dwell_ms=60_000, created_at=ts.isoformat())
class TestRecentPatternsLookbackInference:
def test_cold_start_below_min_history(self):
history = _history(*[_event("done") for _ in range(3)])
result = run_inference(RECENT_MANIFEST, history) result = run_inference(RECENT_MANIFEST, history)
assert result["window_days"] == 7 # cold_start_default assert result["lookback_days"] == 7 # cold_start_default
def test_sparse_history_widens_window(self): def test_sparse_done_history_returns_30(self):
history = _history(*[_event("done") for _ in range(5)]) # 5 events, n < 7 → 30 days # Only 10 done events → fewer than 30 → returns cap of 30
history = _history(*[_event("done") for _ in range(10)])
result = run_inference(RECENT_MANIFEST, history) result = run_inference(RECENT_MANIFEST, history)
assert result["window_days"] == 30 assert result["lookback_days"] == 30
def test_moderate_history_14_days(self): def test_dense_done_history_returns_short_window(self):
history = _history(*[_event("done") for _ in range(10)]) # 7 ≤ n < 14 → 14 days # 30 done events all within the last 2 days → lookback_days = 1 or 2
events = [_event("done", days_ago=i * 0.05) for i in range(30)]
history = _history(*events)
result = run_inference(RECENT_MANIFEST, history) result = run_inference(RECENT_MANIFEST, history)
assert result["window_days"] == 14 assert result["lookback_days"] <= 2
def test_dense_history_stays_7(self): def test_spread_history_spans_window_correctly(self):
history = _history(*[_event("done") for _ in range(20)]) # 20+ → 7 days # 30 done events spread over 15 days (1 per 0.5d) → window should be ≈15
events = [_event("done", days_ago=i * 0.5) for i in range(30)]
history = _history(*events)
result = run_inference(RECENT_MANIFEST, history) result = run_inference(RECENT_MANIFEST, history)
assert result["window_days"] == 7 assert result["lookback_days"] <= 16
def test_agent_uses_window_days_pref(self): def test_agent_respects_lookback_days_pref(self):
from datetime import timedelta from datetime import timedelta
# 5 feedback events, all within 14 days but older than 7 days
feedback = [ feedback = [
{"action": "done", "dwell_ms": 60000, {"action": "done", "dwell_ms": 60000,
"created_at": (_NOW - timedelta(days=10)).isoformat()} "created_at": (_NOW - timedelta(days=10)).isoformat()}
] * 5 ] * 5
# With window_days=7 → 0 events seen; with window_days=14 → 5 events
out_narrow = RecentPatternsAgent().compute( out_narrow = RecentPatternsAgent().compute(
_inp(feedback_history=feedback, agent_prefs={"window_days": 7}) _inp(feedback_history=feedback, agent_prefs={"lookback_days": 7})
) )
out_wide = RecentPatternsAgent().compute( out_wide = RecentPatternsAgent().compute(
_inp(feedback_history=feedback, agent_prefs={"window_days": 14}) _inp(feedback_history=feedback, agent_prefs={"lookback_days": 14})
) )
assert "No tip reactions" in out_narrow.prompt_text assert "No tip reactions" in out_narrow.prompt_text
assert "5 tip reactions" in out_wide.prompt_text assert "5 tip reactions" in out_wide.prompt_text
def test_snapshot_includes_window_days(self): def test_legacy_window_days_pref_still_works(self):
out = RecentPatternsAgent().compute(_inp(agent_prefs={"window_days": 14})) from datetime import timedelta
assert out.signals_snapshot["window_days"] == 14 feedback = [
{"action": "done", "dwell_ms": 60000,
"created_at": (_NOW - timedelta(days=10)).isoformat()}
] * 5
out = RecentPatternsAgent().compute(
_inp(feedback_history=feedback, agent_prefs={"window_days": 14})
)
assert "5 tip reactions" in out.prompt_text
def test_snapshot_includes_lookback_days(self):
out = RecentPatternsAgent().compute(_inp(agent_prefs={"lookback_days": 14}))
assert out.signals_snapshot["lookback_days"] == 14
class TestRecentPatternsWeeklyCycle:
def test_cold_start_returns_empty(self):
history = _history(*[_event("done") for _ in range(5)]) # below min_history=21
result = run_inference(RECENT_MANIFEST, history)
assert result["weekly_cycle"] == []
def _events_on_dow(self, target_dow: int, count: int, n_weeks: int = 4) -> list[FeedbackEvent]:
"""Generate `count` done events per week on `target_dow` (0=Mon…6=Sun).
_NOW is Thursday (weekday=3). days_back = (now_dow - target_dow) % 7
gives the offset to the most recent occurrence of target_dow.
"""
now_dow = _NOW.weekday() # 3 = Thursday
days_back = (now_dow - target_dow) % 7
if days_back == 0:
days_back = 7 # avoid "today" — use the previous occurrence
events = []
for week in range(n_weeks):
offset = days_back + week * 7
for _ in range(count):
events.append(_done_at(offset + 0.1, hour=11))
return events
def _weekend_warrior_history(self) -> UserHistory:
"""Many done events on Sat/Sun (dow 5 & 6), few on Tuesday (dow 1)."""
events = []
events += self._events_on_dow(5, count=5) # Saturday
events += self._events_on_dow(6, count=5) # Sunday
events += self._events_on_dow(1, count=1) # Tuesday — one per week
return _history(*events)
def test_weekend_warrior_strong_on_weekends(self):
history = self._weekend_warrior_history()
result = run_inference(RECENT_MANIFEST, history)
by_dow = {e["dow"]: e["strength"] for e in result["weekly_cycle"]}
assert by_dow.get(5, 0) > 1.0 # Saturday
assert by_dow.get(6, 0) > 1.0 # Sunday
def test_weekday_only_low_weekend_strength(self):
events = []
for dow in range(5): # MondayFriday
events += self._events_on_dow(dow, count=3)
# Saturday (5) and Sunday (6) get zero events
history = _history(*events)
result = run_inference(RECENT_MANIFEST, history)
by_dow = {e["dow"]: e["strength"] for e in result["weekly_cycle"]}
assert by_dow.get(5, 0) == 0.0 # Saturday
assert by_dow.get(6, 0) == 0.0 # Sunday
def test_snippet_includes_cycle_hint_when_strong(self):
# Inject a strong weekly_cycle pref directly
prefs = {
"lookback_days": 7,
"weekly_cycle": [{"dow": 1, "strength": 2.0, "sample": "completes most Tuesdays"}],
"daily_cycle": [],
}
out = RecentPatternsAgent().compute(_inp(agent_prefs=prefs))
assert "Tuesday" in out.prompt_text
def test_snippet_omits_cycle_hint_when_weak(self):
prefs = {
"lookback_days": 7,
"weekly_cycle": [{"dow": 1, "strength": 0.3, "sample": "completes most Tuesdays"}],
"daily_cycle": [],
}
out = RecentPatternsAgent().compute(_inp(agent_prefs=prefs))
assert "Tuesday" not in out.prompt_text
class TestRecentPatternsDailyCycle:
def test_cold_start_returns_empty(self):
history = _history(*[_event("done") for _ in range(5)]) # below min_history=14
result = run_inference(RECENT_MANIFEST, history)
assert result["daily_cycle"] == []
def _evening_person_history(self) -> UserHistory:
"""Many done events at 20:0021:00, few in the morning."""
events = []
for d in range(20):
for _ in range(4):
events.append(_done_at(d + 0.5, hour=20))
events.append(_done_at(d + 0.5, hour=9))
return _history(*events)
def test_evening_person_strong_at_evening_hours(self):
history = self._evening_person_history()
result = run_inference(RECENT_MANIFEST, history)
by_hour = {e["hour"]: e["strength"] for e in result["daily_cycle"]}
assert by_hour.get(20, 0) > 1.0
assert by_hour.get(9, 0) < by_hour.get(20, 0)
def test_snippet_includes_daily_hint_when_strong(self):
prefs = {
"lookback_days": 7,
"weekly_cycle": [],
"daily_cycle": [{"hour": 20, "strength": 3.0}],
}
out = RecentPatternsAgent().compute(_inp(agent_prefs=prefs))
assert "8pm" in out.prompt_text
def test_snippet_omits_daily_hint_when_weak(self):
prefs = {
"lookback_days": 7,
"weekly_cycle": [],
"daily_cycle": [{"hour": 20, "strength": 0.4}],
}
out = RecentPatternsAgent().compute(_inp(agent_prefs=prefs))
assert "8pm" not in out.prompt_text
def test_no_pattern_user_no_hints(self):
# Uniform distribution across all hours → strength ≈ 1.0 everywhere → no strong peaks
events = [_done_at(d + 0.5, hour=h) for d in range(3) for h in range(24)]
history = _history(*events)
result = run_inference(RECENT_MANIFEST, history)
strong = [e for e in result["daily_cycle"] if e["strength"] > 0.5]
# Uniform distribution → all strengths ≈ 1.0; but none dramatically above threshold
# Since strength = count/mean and all counts are equal, all = 1.0 exactly
# 1.0 is not > 0.5 threshold in snippet rendering, but IS > 0.5 so they'd show.
# For a flat distribution the caller sees no meaningful peak — verify no strength > 2
assert all(e["strength"] <= 1.1 for e in result["daily_cycle"])
def test_version_bumped(self): def test_version_bumped(self):
assert RECENT_MANIFEST.version == "1.1.0" assert RECENT_MANIFEST.version == "1.2.0"
# ── time-of-day: quiet_start/end + peak_hours inference (#112) ───────────────
def _tod_event(action: str, hour: int, days_ago: float = 1.0) -> FeedbackEvent:
"""Feedback event at a specific hour N days ago."""
from datetime import timedelta
dt = (_NOW - timedelta(days=days_ago)).replace(hour=hour, minute=0, second=0, microsecond=0)
return FeedbackEvent(action=action, dwell_ms=60_000, created_at=dt.isoformat())
def _tod_history(*events: FeedbackEvent) -> UserHistory:
return UserHistory(user_id="u1", events=list(events))
class TestTimeOfDayQuietWindow:
def test_cold_start_below_min_history(self):
history = _tod_history(*[_tod_event("done", 10) for _ in range(10)])
result = run_inference(TOD_MANIFEST, history)
assert result["quiet_start"] == "22:00"
assert result["quiet_end"] == "07:00"
def _night_owl_history(self) -> UserHistory:
"""Active 20:0023:00, quiet 02:0014:00."""
events = []
for d in range(10):
for h in [20, 21, 22, 23, 0, 1]:
events.append(_tod_event("done", h, days_ago=d + 0.5))
# Sparse during day
events.append(_tod_event("done", 15, days_ago=d + 0.5))
return _tod_history(*events)
def _early_bird_history(self) -> UserHistory:
"""Active 06:0010:00, quiet 21:0005:00."""
events = []
for d in range(10):
for h in [6, 7, 8, 9, 10]:
events.append(_tod_event("done", h, days_ago=d + 0.5))
events.append(_tod_event("done", 14, days_ago=d + 0.5))
return _tod_history(*events)
def test_early_bird_quiet_in_evening(self):
history = self._early_bird_history()
result = run_inference(TOD_MANIFEST, history)
# Quiet window should be in the evening/night range
start_h = int(result["quiet_start"].split(":")[0])
end_h = int(result["quiet_end"].split(":")[0])
# Quiet window spans from some evening hour into morning
assert start_h >= 18 or end_h <= 10 # covers night
def test_quiet_window_wraps_midnight(self):
# Night owl: heavy activity in evening, quiet 02:0014:00
history = self._night_owl_history()
result = run_inference(TOD_MANIFEST, history)
start_h = int(result["quiet_start"].split(":")[0])
end_h = int(result["quiet_end"].split(":")[0])
# The quiet window should span across midnight or be in daylight
# (start > end means wraps midnight)
is_wrapping = start_h > end_h
is_daytime = 2 <= start_h <= 14
assert is_wrapping or is_daytime
def test_format_is_hhmm(self):
history = self._early_bird_history()
result = run_inference(TOD_MANIFEST, history)
import re
assert re.match(r"^\d{2}:00$", result["quiet_start"])
assert re.match(r"^\d{2}:00$", result["quiet_end"])
class TestTimeOfDayPeakHours:
def _evening_person_history(self, n: int = 60) -> UserHistory:
"""Heavy done events at 19:00 and 20:00, light elsewhere."""
events = []
for i in range(n):
events.append(_tod_event("done", 19, days_ago=i * 0.5))
events.append(_tod_event("done", 20, days_ago=i * 0.5))
events.append(_tod_event("done", 10, days_ago=i * 0.5)) # low volume
return _tod_history(*events)
def test_cold_start_returns_default(self):
history = _tod_history(*[_tod_event("done", 10) for _ in range(5)])
result = run_inference(TOD_MANIFEST, history)
assert result["peak_hours"] == [9, 14, 20]
def test_evening_person_peak_hours_in_evening(self):
history = self._evening_person_history()
result = run_inference(TOD_MANIFEST, history)
assert 19 in result["peak_hours"] or 20 in result["peak_hours"]
def test_peak_hours_sorted(self):
history = self._evening_person_history()
result = run_inference(TOD_MANIFEST, history)
assert result["peak_hours"] == sorted(result["peak_hours"])
def test_shift_worker_peaks_at_unusual_hours(self):
"""Shift worker active at 02:00 and 03:00."""
events = [_tod_event("done", h, days_ago=i * 0.5)
for i in range(30) for h in [2, 3]]
events += [_tod_event("done", 14, days_ago=i * 0.5) for i in range(5)]
history = _tod_history(*events)
result = run_inference(TOD_MANIFEST, history)
assert 2 in result["peak_hours"] or 3 in result["peak_hours"]
class TestTimeOfDaySnippet:
agent = TimeOfDayAgent()
def _inp_at(self, hour: int, **prefs) -> AgentInput:
from datetime import timedelta
now = _NOW.replace(hour=hour)
return _inp(now=now, agent_prefs=prefs)
def test_in_peak_hour_says_peak(self):
out = self.agent.compute(self._inp_at(20, peak_hours=[20]))
assert "peak productivity hour" in out.prompt_text
def test_approaching_peak_says_approaching(self):
out = self.agent.compute(self._inp_at(18, peak_hours=[20]))
assert "approaching" in out.prompt_text.lower()
def test_quiet_window_overrides_peak(self):
# Even if hour is in peak_hours, quiet window wins
out = self.agent.compute(
self._inp_at(23, quiet_start="22:00", quiet_end="07:00", peak_hours=[23])
)
assert "quiet window" in out.prompt_text
def test_tz_shown_when_not_utc(self):
out = self.agent.compute(self._inp_at(10, tz="Europe/Moscow"))
assert "Europe/Moscow" in out.prompt_text
def test_snapshot_includes_peak_and_quiet(self):
out = self.agent.compute(self._inp_at(10, peak_hours=[10], quiet_start="22:00", quiet_end="07:00"))
assert "peak_hours" in out.signals_snapshot
assert "in_quiet" in out.signals_snapshot
assert "in_peak" in out.signals_snapshot
def test_version_bumped(self):
assert TOD_MANIFEST.version == "1.2.0"
def test_manifest_has_new_params(self):
keys = {p.key for p in TOD_MANIFEST.inferred_params}
assert {"quiet_start", "quiet_end", "peak_hours", "tz"}.issubset(keys)
# ── focus-area: preferred_areas wiring ─────────────────────────────────────── # ── focus-area: preferred_areas wiring ───────────────────────────────────────
@@ -210,4 +662,51 @@ class TestFocusAreaPreferredAreas:
def test_version_bumped(self): def test_version_bumped(self):
from ml.agents.focus_area import MANIFEST as FA_MANIFEST from ml.agents.focus_area import MANIFEST as FA_MANIFEST
assert FA_MANIFEST.version == "1.1.0" assert FA_MANIFEST.version == "2.0.0"
def test_snapshot_uses_cluster_keys(self):
tasks = [self._task("T", "work")]
out = self.agent.compute(_inp(tasks=tasks))
assert "top_cluster_label" in out.signals_snapshot
assert "cluster_count" in out.signals_snapshot
assert "strategy" in out.signals_snapshot
# ── focus-area: preferred_areas inference from task_completions (#113) ────────
class TestFocusAreaPreferredAreasInference:
from ml.agents.focus_area import MANIFEST as _FA_MANIFEST
def _completion(self, project_id: str) -> TaskCompletion:
return _completion(project_id, lateness_days=0.0)
def test_cold_start_no_completions(self):
history = _history(completions=[])
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history)
assert result["preferred_areas"] == []
def test_top_two_projects_returned(self):
completions = (
[_completion("p1", 0)] * 8
+ [_completion("p2", 0)] * 5
+ [_completion("p3", 0)] * 2
)
history = _history(completions=completions)
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history)
assert result["preferred_areas"] == ["p1", "p2"]
def test_single_project_returns_one(self):
completions = [_completion("work", 0)] * 6
history = _history(completions=completions)
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history)
assert result["preferred_areas"] == ["work"]
def test_none_project_id_ignored(self):
completions = [_completion(None, 0)] * 5 + [_completion("real", 0)] * 3
history = _history(completions=completions)
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history)
assert result["preferred_areas"] == ["real"]

View File

@@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import statistics
from collections import Counter from collections import Counter
from typing import ClassVar from typing import ClassVar
@@ -9,6 +10,9 @@ from .manifest import AgentManifest, InferredParam
_DOW_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] _DOW_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
# min_history required before quiet/peak inference is meaningful (issue #112)
_MIN_HISTORY = 50
def _infer_preferred_hour(history: UserHistory) -> int: def _infer_preferred_hour(history: UserHistory) -> int:
"""Mode hour of day across all 'done' feedback events; falls back to 9.""" """Mode hour of day across all 'done' feedback events; falls back to 9."""
@@ -18,9 +22,75 @@ def _infer_preferred_hour(history: UserHistory) -> int:
return Counter(done_hours).most_common(1)[0][0] return Counter(done_hours).most_common(1)[0][0]
def _quiet_window_hours(history: UserHistory) -> tuple[int, int]:
"""Return (start_hour, end_hour) of the longest below-baseline quiet window.
Counts all engagement events by hour. Baseline = mean hourly count.
Finds the longest contiguous run of below-baseline hours on the circular
clock; that run defines the quiet window.
"""
by_hour: Counter[int] = Counter(e.hour for e in history.events)
total = sum(by_hour.values())
baseline = total / 24
# Mark each of the 24 hours as below-baseline (True = quiet)
quiet: list[bool] = [by_hour.get(h, 0) < baseline for h in range(24)]
# Find longest contiguous run in circular array
best_start, best_len = 0, 0
run_start, run_len = 0, 0
# Double the sequence to handle wrap-around
for i in range(48):
h = i % 24
if quiet[h]:
if run_len == 0:
run_start = i
run_len += 1
if run_len > best_len:
best_len = run_len
best_start = run_start
else:
run_len = 0
if best_len == 0:
return (22, 7) # fallback
start = best_start % 24
end = (best_start + best_len) % 24
return (start, end)
def _infer_quiet_start(history: UserHistory) -> str:
start, _ = _quiet_window_hours(history)
return f"{start:02d}:00"
def _infer_quiet_end(history: UserHistory) -> str:
_, end = _quiet_window_hours(history)
return f"{end:02d}:00"
def _infer_peak_hours(history: UserHistory) -> list[int]:
"""Top-quartile hours by done-event count.
Computes done_count per hour, then returns hours above the 75th percentile
of non-zero hourly counts, sorted ascending.
"""
done_by_hour: Counter[int] = Counter(
e.hour for e in history.events if e.action == "done"
)
if not done_by_hour:
return [9, 14, 20]
counts = list(done_by_hour.values())
threshold = statistics.quantiles(counts, n=4)[-1] # 75th percentile
return sorted(h for h, c in done_by_hour.items() if c >= threshold)
MANIFEST = AgentManifest( MANIFEST = AgentManifest(
id="time-of-day", id="time-of-day",
version="1.1.0", # bumped: inferred_params added (ADR-0014 §3, #112) version="1.2.0", # #112: quiet_start/end + peak_hours + tz inference
description="Frames the current moment relative to the user's productive peak and quiet hours.", description="Frames the current moment relative to the user's productive peak and quiet hours.",
pref_schema={ pref_schema={
"type": "object", "type": "object",
@@ -36,6 +106,23 @@ MANIFEST = AgentManifest(
"pattern": "^([01][0-9]|2[0-3]):[0-5][0-9]$", "pattern": "^([01][0-9]|2[0-3]):[0-5][0-9]$",
"description": "HH:MM end of quiet hours.", "description": "HH:MM end of quiet hours.",
}, },
"peak_hours": {
"type": "array",
"items": {"type": "integer", "minimum": 0, "maximum": 23},
"default": [9, 14, 20],
"description": "Hours (023) with top-quartile completion density.",
},
"tz": {
"type": "string",
"default": "UTC",
"description": "IANA timezone; populated from auth provider, fallback UTC.",
},
"preferred_hour": {
"type": "integer",
"minimum": 0,
"maximum": 23,
"description": "Mode done-hour (legacy; superseded by peak_hours).",
},
}, },
}, },
context_schema=["profile.features"], context_schema=["profile.features"],
@@ -45,11 +132,40 @@ MANIFEST = AgentManifest(
inferred_params=[ inferred_params=[
InferredParam( InferredParam(
key="preferred_hour", key="preferred_hour",
ttl_sec=3_600, # recompute hourly ttl_sec=3_600,
cold_start_default=None, cold_start_default=None,
min_history=10, # need at least 10 feedback events to be meaningful min_history=10,
infer=_infer_preferred_hour, infer=_infer_preferred_hour,
), ),
InferredParam(
key="quiet_start",
ttl_sec=86_400,
cold_start_default="22:00",
min_history=_MIN_HISTORY,
infer=_infer_quiet_start,
),
InferredParam(
key="quiet_end",
ttl_sec=86_400,
cold_start_default="07:00",
min_history=_MIN_HISTORY,
infer=_infer_quiet_end,
),
InferredParam(
key="peak_hours",
ttl_sec=86_400,
cold_start_default=[9, 14, 20],
min_history=_MIN_HISTORY,
infer=_infer_peak_hours,
),
# tz is populated from the auth provider; no infer function.
InferredParam(
key="tz",
ttl_sec=86_400,
cold_start_default="UTC",
min_history=999_999, # effectively never inferred — always cold_start
infer=None,
),
], ],
) )
@@ -62,18 +178,23 @@ class TimeOfDayAgent(BaseAgent):
def compute(self, inp: AgentInput) -> AgentOutput: def compute(self, inp: AgentInput) -> AgentOutput:
hour = inp.now.hour hour = inp.now.hour
dow = inp.now.weekday() # 0=Monday … 6=Sunday dow = inp.now.weekday()
is_weekend = dow >= 5 is_weekend = dow >= 5
# agent_prefs (inferred or user-set) take precedence over ML profile features.
preferred_raw = inp.agent_prefs.get("preferred_hour", inp.profile.get("preferred_hour")) preferred_raw = inp.agent_prefs.get("preferred_hour", inp.profile.get("preferred_hour"))
preferred = int(preferred_raw) if preferred_raw is not None else None preferred = int(preferred_raw) if preferred_raw is not None else None
quiet_start: str | None = inp.agent_prefs.get("quiet_start") quiet_start: str | None = inp.agent_prefs.get("quiet_start")
quiet_end: str | None = inp.agent_prefs.get("quiet_end") quiet_end: str | None = inp.agent_prefs.get("quiet_end")
peak_hours: list[int] = inp.agent_prefs.get("peak_hours", [])
tz: str = inp.agent_prefs.get("tz", "UTC")
in_quiet = self._in_quiet_window(hour, quiet_start, quiet_end) in_quiet = self._in_quiet_window(hour, quiet_start, quiet_end)
in_peak = hour in peak_hours
parts = [f"It is {hour:02d}:00 on {_DOW_NAMES[dow]} ({self._label(hour)})."] parts = [f"It is {hour:02d}:00 on {_DOW_NAMES[dow]} ({self._label(hour)})."]
if tz != "UTC":
parts[0] = f"It is {hour:02d}:00 ({tz}) on {_DOW_NAMES[dow]} ({self._label(hour)})."
if is_weekend: if is_weekend:
parts.append("Weekend context — prefer personal or reflective tips over work tasks.") parts.append("Weekend context — prefer personal or reflective tips over work tasks.")
@@ -83,8 +204,18 @@ class TimeOfDayAgent(BaseAgent):
f"User is in their quiet window ({quiet_start}{quiet_end}) — " f"User is in their quiet window ({quiet_start}{quiet_end}) — "
"avoid urgent or demanding tips." "avoid urgent or demanding tips."
) )
elif in_peak:
if preferred is not None: parts.append(
f"Hour {hour:02d}:00 is a peak productivity hour for this user — "
"a high-impact or challenging tip is appropriate."
)
elif peak_hours:
# Report nearest peak so orchestrator can time advice accordingly.
nearest = min(peak_hours, key=lambda p: min(abs(p - hour), 24 - abs(p - hour)))
delta = min(abs(nearest - hour), 24 - abs(nearest - hour))
if delta <= 2:
parts.append(f"Approaching peak productivity window ({nearest:02d}:00).")
elif preferred is not None:
delta = min(abs(hour - preferred), 24 - abs(hour - preferred)) delta = min(abs(hour - preferred), 24 - abs(hour - preferred))
if delta == 0: if delta == 0:
parts.append( parts.append(
@@ -103,6 +234,10 @@ class TimeOfDayAgent(BaseAgent):
"preferred_hour": preferred, "preferred_hour": preferred,
"quiet_start": quiet_start, "quiet_start": quiet_start,
"quiet_end": quiet_end, "quiet_end": quiet_end,
"peak_hours": peak_hours,
"in_quiet": in_quiet,
"in_peak": in_peak,
"tz": tz,
} }
return self._make_output(inp, prompt, snapshot) return self._make_output(inp, prompt, snapshot)

View File

@@ -40,7 +40,7 @@ if _repo_root not in sys.path:
from ml.agents.base import AgentInput # noqa: E402 from ml.agents.base import AgentInput # noqa: E402
from ml.agents.registry import get_agent, all_agents, all_manifests, get_manifest # noqa: E402 from ml.agents.registry import get_agent, all_agents, all_manifests, get_manifest # noqa: E402
from ml.agents.inference import run_inference, FeedbackEvent, UserHistory # noqa: E402 from ml.agents.inference import run_inference, FeedbackEvent, TaskCompletion, UserHistory # noqa: E402
logging_config.configure() logging_config.configure()
@@ -141,7 +141,8 @@ class AgentComputeResponse(BaseModel):
class AgentInferRequest(BaseModel): class AgentInferRequest(BaseModel):
user_id: str user_id: str
feedback_history: list[dict] = [] # [{action, dwell_ms, created_at}, …] feedback_history: list[dict] = [] # [{action, dwell_ms, created_at}, …]
task_completions: list[dict] = [] # [{project_id, completed_at, due_at}, …]
class AgentInferResponse(BaseModel): class AgentInferResponse(BaseModel):
@@ -284,7 +285,15 @@ async def infer_agent(agent_id: str, req: AgentInferRequest) -> AgentInferRespon
) )
for e in req.feedback_history for e in req.feedback_history
] ]
history = UserHistory(user_id=req.user_id, events=events) completions = [
TaskCompletion(
project_id=c.get("project_id"),
completed_at=c.get("completed_at", ""),
due_at=c.get("due_at", ""),
)
for c in req.task_completions
]
history = UserHistory(user_id=req.user_id, events=events, task_completions=completions)
t0 = __import__("time").monotonic() t0 = __import__("time").monotonic()
inferred = run_inference(manifest, history) inferred = run_inference(manifest, history)