from __future__ import annotations from collections import Counter from typing import ClassVar from .base import BaseAgent, AgentInput, AgentOutput from .clustering import cluster_tasks from .inference.history import UserHistory from .manifest import AgentManifest, InferredParam def _infer_preferred_areas(history: UserHistory) -> list[str]: """Top-2 project IDs by completed task count (last 90 days worth of data).""" counts: Counter[str] = Counter() for tc in history.task_completions: if tc.project_id: counts[tc.project_id] += 1 return [pid for pid, _ in counts.most_common(2)] MANIFEST = AgentManifest( id="focus-area", version="2.1.0", # 1h TTL + task-change detection (#129) description="Identifies the most congested semantic focus area in the user's task list.", pref_schema={ "type": "object", "additionalProperties": False, "properties": { "preferred_areas": { "type": "array", "items": {"type": "string"}, "default": [], "description": "Project IDs or label names to prioritise when multiple areas tie.", }, }, }, context_schema=["todoist.tasks"], required_consents=["data:core", "data:todoist"], output_contract={"type": "snippet", "format": "free_text"}, ttl_sec=86_400, inferred_params=[ InferredParam( key="preferred_areas", ttl_sec=86_400, cold_start_default=[], min_history=0, # use task_completions, not feedback events; handle empty inside infer=_infer_preferred_areas, ), ], ) class FocusAreaAgent(BaseAgent): """Identifies the most congested semantic focus area in the user's task list.""" agent_id: ClassVar[str] = MANIFEST.id ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec version: ClassVar[str] = MANIFEST.version # 2.1.0 def compute(self, inp: AgentInput) -> AgentOutput: preferred: list[str] = inp.agent_prefs.get("preferred_areas", []) if not inp.tasks: return self._make_output( inp, "No tasks available to identify a focus area.", {"cluster_count": 0, "strategy": "none"}, ) clusters, new_enrichments = cluster_tasks(inp.tasks, enrichment_cache=inp.enrichment_cache) if not clusters: return self._make_output( inp, "No tasks available to identify a focus area.", {"cluster_count": 0, "strategy": "none"}, ) strategy = "semantic" if len(clusters) > 1 or len(inp.tasks) > 1 else "fallback" def score(cluster) -> float: base = sum(2.0 if t.get("is_overdue") else 1.0 for t in cluster.tasks) boosted = any(p in cluster.label for p in preferred) if preferred else False return base + (0.5 if boosted else 0.0) top = max(clusters, key=score) boosted = bool(preferred) and any(p in top.label for p in preferred) parts = [ f'The user\'s most active focus area is "{top.label}" ' f"({top.task_count} task{'s' if top.task_count != 1 else ''}, " f"{top.overdue_count} overdue). " f"(Note: task titles may be in any language — always write the tip in English.)" ] if boosted: parts.append("This area matches the user's stated focus preferences.") if top.overdue_count >= 3: parts.append("Consider surfacing an action from this area.") if len(clusters) > 1: other_total = sum(c.task_count for c in clusters if c is not top) parts.append( f"{len(clusters) - 1} other area{'s' if len(clusters) > 2 else ''} " f"contain {other_total} task{'s' if other_total != 1 else ''}." ) snapshot = { "top_cluster_label": top.label, "top_task_count": top.task_count, "top_overdue_count": top.overdue_count, "cluster_count": len(clusters), "strategy": strategy, "preferred_areas": preferred, # Consumed by compute_agent endpoint; stripped before storing the snapshot. "_new_enrichments": new_enrichments, } return self._make_output(inp, " ".join(parts), snapshot)