refactor(focus-area): output all clusters as context; remove scoring and preferred_areas

The agent no longer picks a winner — it summarises every cluster so the
orchestrator can decide what's relevant. Scoring by overdue count overlapped
with the overdue-task agent. preferred_areas (project-ID based, broken label
matching) removed entirely.

Output format: numbered list of areas with task titles included.
Snapshot: {cluster_count, clusters: [{label, task_count, tasks}]}.
Version bumped to 3.0.0; inferred_params cleared.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-12 14:57:04 +00:00
parent 12c956b588
commit f6b89fc849
3 changed files with 69 additions and 168 deletions

View File

@@ -1,69 +1,37 @@
from __future__ import annotations
from collections import Counter
from typing import ClassVar
from .base import BaseAgent, AgentInput, AgentOutput
from .clustering import cluster_tasks
from .inference.history import UserHistory
from .manifest import AgentManifest, InferredParam
def _infer_preferred_areas(history: UserHistory) -> list[str]:
"""Top-2 project IDs by completed task count (last 90 days worth of data)."""
counts: Counter[str] = Counter()
for tc in history.task_completions:
if tc.project_id:
counts[tc.project_id] += 1
return [pid for pid, _ in counts.most_common(2)]
from .manifest import AgentManifest
MANIFEST = AgentManifest(
id="focus-area",
version="2.1.0", # 1h TTL + task-change detection (#129)
description="Identifies the most congested semantic focus area in the user's task list.",
pref_schema={
"type": "object",
"additionalProperties": False,
"properties": {
"preferred_areas": {
"type": "array",
"items": {"type": "string"},
"default": [],
"description": "Project IDs or label names to prioritise when multiple areas tie.",
},
},
},
version="3.0.0", # output all clusters as context; no scoring (#129)
description="Clusters the user's task list and summarises all areas for the orchestrator.",
pref_schema={"type": "object", "additionalProperties": False, "properties": {}},
context_schema=["todoist.tasks"],
required_consents=["data:core", "data:todoist"],
output_contract={"type": "snippet", "format": "free_text"},
ttl_sec=86_400,
inferred_params=[
InferredParam(
key="preferred_areas",
ttl_sec=86_400,
cold_start_default=[],
min_history=0, # use task_completions, not feedback events; handle empty inside
infer=_infer_preferred_areas,
),
],
inferred_params=[],
)
class FocusAreaAgent(BaseAgent):
"""Identifies the most congested semantic focus area in the user's task list."""
"""Clusters tasks and outputs a full area summary for the orchestrator."""
agent_id: ClassVar[str] = MANIFEST.id
ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec
version: ClassVar[str] = MANIFEST.version # 2.1.0
version: ClassVar[str] = MANIFEST.version # 3.0.0
def compute(self, inp: AgentInput) -> AgentOutput:
preferred: list[str] = inp.agent_prefs.get("preferred_areas", [])
if not inp.tasks:
return self._make_output(
inp,
"No tasks available to identify a focus area.",
{"cluster_count": 0, "strategy": "none"},
"No tasks available to identify focus areas.",
{"cluster_count": 0},
)
clusters, new_enrichments = cluster_tasks(inp.tasks, enrichment_cache=inp.enrichment_cache)
@@ -71,45 +39,27 @@ class FocusAreaAgent(BaseAgent):
if not clusters:
return self._make_output(
inp,
"No tasks available to identify a focus area.",
{"cluster_count": 0, "strategy": "none"},
"No tasks available to identify focus areas.",
{"cluster_count": 0},
)
strategy = "semantic" if len(clusters) > 1 or len(inp.tasks) > 1 else "fallback"
lines = [f"The user's tasks are grouped into {len(clusters)} area(s):"]
for i, cluster in enumerate(clusters, 1):
titles = [t.get("content", "").strip() for t in cluster.tasks if t.get("content")]
titles_str = "; ".join(f'"{t}"' for t in titles[:8])
if len(titles) > 8:
titles_str += f" (and {len(titles) - 8} more)"
lines.append(f"{i}. {cluster.label}{cluster.task_count} task(s): {titles_str}")
def score(cluster) -> float:
base = sum(2.0 if t.get("is_overdue") else 1.0 for t in cluster.tasks)
boosted = any(p in cluster.label for p in preferred) if preferred else False
return base + (0.5 if boosted else 0.0)
top = max(clusters, key=score)
boosted = bool(preferred) and any(p in top.label for p in preferred)
parts = [
f'The user\'s most active focus area is "{top.label}" '
f"({top.task_count} task{'s' if top.task_count != 1 else ''}, "
f"{top.overdue_count} overdue). "
f"(Note: task titles may be in any language — always write the tip in English.)"
]
if boosted:
parts.append("This area matches the user's stated focus preferences.")
if top.overdue_count >= 3:
parts.append("Consider surfacing an action from this area.")
if len(clusters) > 1:
other_total = sum(c.task_count for c in clusters if c is not top)
parts.append(
f"{len(clusters) - 1} other area{'s' if len(clusters) > 2 else ''} "
f"contain {other_total} task{'s' if other_total != 1 else ''}."
)
lines.append("(Task titles may be in any language — always write the tip in English.)")
snapshot = {
"top_cluster_label": top.label,
"top_task_count": top.task_count,
"top_overdue_count": top.overdue_count,
"cluster_count": len(clusters),
"strategy": strategy,
"preferred_areas": preferred,
# Consumed by compute_agent endpoint; stripped before storing the snapshot.
"clusters": [
{"label": c.label, "task_count": c.task_count,
"tasks": [t.get("content", "") for t in c.tasks]}
for c in clusters
],
"_new_enrichments": new_enrichments,
}
return self._make_output(inp, " ".join(parts), snapshot)
return self._make_output(inp, "\n".join(lines), snapshot)

View File

@@ -213,41 +213,41 @@ class TestFocusAreaAgent:
out = self.agent.compute(_inp())
assert "no tasks" in out.prompt_text.lower()
def test_single_project(self):
tasks = [_task(f"T{i}", project_id="Work") for i in range(3)]
out = self.agent.compute(_inp(tasks=tasks))
assert '"Work"' in out.prompt_text
assert "3 tasks" in out.prompt_text
def test_most_congested_wins(self):
def test_lists_all_clusters(self):
tasks = (
[_task(f"W{i}", project_id="Work") for i in range(5)]
[_task(f"W{i}", project_id="Work") for i in range(3)]
+ [_task(f"H{i}", project_id="Home") for i in range(2)]
)
out = self.agent.compute(_inp(tasks=tasks))
assert '"Work"' in out.prompt_text
assert "Work" in out.prompt_text
assert "Home" in out.prompt_text
def test_overdue_weighting(self):
# Home has 2 tasks (1 overdue), Work has 3 non-overdue tasks
# Home score = 2+1 = 3; Work score = 3 — Home should win due to overdue weight
tasks = (
[_task("Home1", project_id="Home", is_overdue=True),
_task("Home2", project_id="Home")]
+ [_task(f"W{i}", project_id="Work") for i in range(3)]
)
def test_includes_task_titles(self):
tasks = [_task("Buy milk", project_id="Personal"), _task("Write report", project_id="Personal")]
out = self.agent.compute(_inp(tasks=tasks))
assert '"Work"' not in out.prompt_text or '"Home"' in out.prompt_text
assert '"Buy milk"' in out.prompt_text
assert '"Write report"' in out.prompt_text
def test_task_count_in_output(self):
tasks = [_task(f"T{i}", project_id="Work") for i in range(3)]
out = self.agent.compute(_inp(tasks=tasks))
assert "3 task" in out.prompt_text
def test_default_project_fallback(self):
out = self.agent.compute(_inp(tasks=[_task("No project task")]))
# Tasks without project_id fall back to a "Tasks" bucket
assert "Tasks" in out.prompt_text
def test_snapshot_keys(self):
out = self.agent.compute(_inp(tasks=[_task("T1", project_id="A")]))
public_keys = {k for k in out.signals_snapshot if not k.startswith("_")}
assert {"top_cluster_label", "top_task_count", "top_overdue_count", "cluster_count",
"strategy", "preferred_areas"} == public_keys
assert {"cluster_count", "clusters"} == public_keys
def test_snapshot_clusters_shape(self):
tasks = [_task("Buy milk", project_id="P1"), _task("Fix bug", project_id="P2")]
out = self.agent.compute(_inp(tasks=tasks))
clusters = out.signals_snapshot["clusters"]
assert isinstance(clusters, list)
assert all("label" in c and "task_count" in c and "tasks" in c for c in clusters)
# ── Registry ─────────────────────────────────────────────────────────────────

View File

@@ -627,86 +627,37 @@ class TestTimeOfDaySnippet:
assert {"quiet_start", "quiet_end", "peak_hours", "tz"}.issubset(keys)
# ── focus-area: preferred_areas wiring ───────────────────────────────────────
# ── focus-area: cluster summary output ───────────────────────────────────────
class TestFocusAreaPreferredAreas:
class TestFocusAreaOutput:
agent = FocusAreaAgent()
def _task(self, content: str, project_id: str, is_overdue: bool = False) -> dict:
return {"id": "t1", "content": content, "is_overdue": is_overdue,
def _task(self, content: str, project_id: str) -> dict:
return {"id": "t1", "content": content, "is_overdue": False,
"task_age_days": 2.0, "priority": 1, "project_id": project_id}
def test_preferred_area_wins_tie(self):
tasks = [
self._task("Work thing", "work"),
self._task("Home thing", "home"),
]
out = self.agent.compute(_inp(tasks=tasks, agent_prefs={"preferred_areas": ["work"]}))
assert "work" in out.prompt_text
assert "matches the user's stated focus preferences" in out.prompt_text
def test_no_preferred_areas_uses_congestion_score(self):
tasks = [
self._task("W1", "work"),
self._task("H1", "home"),
self._task("H2", "home"),
]
out = self.agent.compute(_inp(tasks=tasks))
# home has more tasks → wins without any preference
assert "home" in out.prompt_text
def test_snapshot_includes_preferred_areas(self):
tasks = [self._task("T", "work")]
out = self.agent.compute(_inp(tasks=tasks, agent_prefs={"preferred_areas": ["work"]}))
assert out.signals_snapshot["preferred_areas"] == ["work"]
def test_version_bumped(self):
def test_version(self):
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
assert FA_MANIFEST.version == "2.1.0"
assert FA_MANIFEST.version == "3.0.0"
def test_snapshot_uses_cluster_keys(self):
def test_all_clusters_in_output(self):
tasks = [self._task("Work thing", "work"), self._task("Home thing", "home")]
out = self.agent.compute(_inp(tasks=tasks))
assert "work" in out.prompt_text.lower()
assert "home" in out.prompt_text.lower()
def test_task_titles_in_output(self):
tasks = [self._task("Buy milk", "personal")]
out = self.agent.compute(_inp(tasks=tasks))
assert '"Buy milk"' in out.prompt_text
def test_snapshot_shape(self):
tasks = [self._task("T", "work")]
out = self.agent.compute(_inp(tasks=tasks))
assert "top_cluster_label" in out.signals_snapshot
assert "cluster_count" in out.signals_snapshot
assert "strategy" in out.signals_snapshot
public_keys = {k for k in out.signals_snapshot if not k.startswith("_")}
assert public_keys == {"cluster_count", "clusters"}
assert isinstance(out.signals_snapshot["clusters"], list)
# ── focus-area: preferred_areas inference from task_completions (#113) ────────
class TestFocusAreaPreferredAreasInference:
from ml.agents.focus_area import MANIFEST as _FA_MANIFEST
def _completion(self, project_id: str) -> TaskCompletion:
return _completion(project_id, lateness_days=0.0)
def test_cold_start_no_completions(self):
history = _history(completions=[])
def test_no_inferred_params(self):
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history)
assert result["preferred_areas"] == []
def test_top_two_projects_returned(self):
completions = (
[_completion("p1", 0)] * 8
+ [_completion("p2", 0)] * 5
+ [_completion("p3", 0)] * 2
)
history = _history(completions=completions)
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history)
assert result["preferred_areas"] == ["p1", "p2"]
def test_single_project_returns_one(self):
completions = [_completion("work", 0)] * 6
history = _history(completions=completions)
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history)
assert result["preferred_areas"] == ["work"]
def test_none_project_id_ignored(self):
completions = [_completion(None, 0)] * 5 + [_completion("real", 0)] * 3
history = _history(completions=completions)
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history)
assert result["preferred_areas"] == ["real"]
assert FA_MANIFEST.inferred_params == []