From 26fc67776fe17f89d8d7a67dc110474a3cada2f9 Mon Sep 17 00:00:00 2001 From: alvis Date: Wed, 6 May 2026 06:54:46 +0000 Subject: [PATCH] feat(agents): semantic task clustering + focus-area inferred preferred_areas (#97, #113) - New ml/agents/clustering.py: embed task content via nomic-embed-text (Ollama), greedy cosine clustering (threshold 0.72, max 6 clusters), graceful fallback to project-id grouping when Ollama is unreachable - focus_area v2.0.0: compute() uses semantic clusters as focus areas; adds preferred_areas InferredParam inferred from top-2 projects by task_completion count - 135 tests, all passing Co-Authored-By: Claude Sonnet 4.6 --- ml/agents/clustering.py | 152 ++++++++++++++++++++ ml/agents/focus_area.py | 103 ++++++++----- ml/agents/tests/test_agents.py | 6 +- ml/agents/tests/test_clustering.py | 135 +++++++++++++++++ ml/agents/tests/test_per_agent_inference.py | 49 ++++++- 5 files changed, 404 insertions(+), 41 deletions(-) create mode 100644 ml/agents/clustering.py create mode 100644 ml/agents/tests/test_clustering.py diff --git a/ml/agents/clustering.py b/ml/agents/clustering.py new file mode 100644 index 0000000..b3a2a29 --- /dev/null +++ b/ml/agents/clustering.py @@ -0,0 +1,152 @@ +"""Semantic task clustering via nomic-embed-text (issue #97). + +Public API: + cluster_tasks(tasks, ollama_url) -> list[Cluster] + +Each task dict must have a "content" key. Tasks without content are placed in a +fallback "other" bucket. If Ollama is unreachable, falls back to grouping by +project_id so compute() always returns something useful. +""" +from __future__ import annotations + +import logging +import math +import os +from dataclasses import dataclass, field + +import httpx + +log = logging.getLogger(__name__) + +# Cosine similarity threshold for merging tasks into the same cluster. +_SIM_THRESHOLD = 0.72 +# Never produce more than this many clusters regardless of task count. +_MAX_CLUSTERS = 6 +_EMBED_TIMEOUT = 10.0 + + +@dataclass +class Cluster: + label: str # representative task content (shortest, most central) + tasks: list[dict] = field(default_factory=list) + + @property + def task_count(self) -> int: + return len(self.tasks) + + @property + def overdue_count(self) -> int: + return sum(1 for t in self.tasks if t.get("is_overdue")) + + +def _embed(text: str, ollama_url: str) -> list[float] | None: + try: + with httpx.Client(trust_env=False, timeout=_EMBED_TIMEOUT) as c: + r = c.post( + f"{ollama_url}/api/embeddings", + json={"model": "nomic-embed-text", "prompt": text, "keep_alive": 0}, + ) + r.raise_for_status() + return r.json().get("embedding") + except Exception as exc: + log.debug("embed_failed text=%r error=%s", text[:40], exc) + return None + + +def _cosine(a: list[float], b: list[float]) -> float: + dot = sum(x * y for x, y in zip(a, b)) + na = math.sqrt(sum(x * x for x in a)) + nb = math.sqrt(sum(x * x for x in b)) + if na == 0 or nb == 0: + return 0.0 + return dot / (na * nb) + + +def _greedy_cluster(items: list[tuple[dict, list[float]]]) -> list[Cluster]: + """Single-pass greedy clustering: each item joins the first existing cluster + whose centroid is above _SIM_THRESHOLD, else starts a new one.""" + clusters: list[tuple[list[float], Cluster]] = [] # (centroid, cluster) + + for task, vec in items: + best_idx = -1 + best_sim = _SIM_THRESHOLD - 1e-9 + for i, (centroid, _) in enumerate(clusters): + sim = _cosine(centroid, vec) + if sim > best_sim: + best_sim = sim + best_idx = i + + if best_idx >= 0 and len(clusters) < _MAX_CLUSTERS: + centroid, cluster = clusters[best_idx] + cluster.tasks.append(task) + # Update centroid as running mean. + n = len(cluster.tasks) + new_centroid = [(c * (n - 1) + v) / n for c, v in zip(centroid, vec)] + clusters[best_idx] = (new_centroid, cluster) + elif len(clusters) < _MAX_CLUSTERS: + label = task.get("content", "Tasks")[:60] + cluster = Cluster(label=label, tasks=[task]) + clusters.append((vec, cluster)) + else: + # Overflow: append to closest cluster even below threshold. + best_i = max(range(len(clusters)), key=lambda i: _cosine(clusters[i][0], vec)) + clusters[best_i][1].tasks.append(task) + + return [c for _, c in clusters] + + +def _fallback_by_project(tasks: list[dict]) -> list[Cluster]: + """Group by project_id when embeddings are unavailable.""" + buckets: dict[str, Cluster] = {} + for task in tasks: + pid = task.get("project_id") or task.get("project") or "default" + if pid not in buckets: + label = pid if pid != "default" else "Tasks" + buckets[pid] = Cluster(label=label) + buckets[pid].tasks.append(task) + return list(buckets.values()) + + +def cluster_tasks( + tasks: list[dict], + ollama_url: str | None = None, +) -> list[Cluster]: + """Cluster tasks by semantic similarity. + + Returns a non-empty list of Cluster objects. Falls back to project-based + grouping if Ollama is unavailable or tasks have no content. + """ + if not tasks: + return [] + + url = ollama_url or os.getenv("OLLAMA_URL", "http://host.docker.internal:11434") + + # Separate tasks with usable content from those without. + with_content = [(t, t.get("content", "").strip()) for t in tasks] + embeddable = [(t, c) for t, c in with_content if c] + no_content = [t for t, c in with_content if not c] + + if not embeddable: + return _fallback_by_project(tasks) + + # Fetch embeddings (best-effort; None means Ollama unavailable). + embedded: list[tuple[dict, list[float]]] = [] + failed = False + for task, content in embeddable: + vec = _embed(content, url) + if vec is None: + failed = True + break + embedded.append((task, vec)) + + if failed or not embedded: + log.info("cluster_tasks: ollama unavailable, falling back to project grouping") + return _fallback_by_project(tasks) + + clusters = _greedy_cluster(embedded) + + # Tasks without content get their own bucket if any. + if no_content: + clusters.append(Cluster(label="Other tasks", tasks=no_content)) + + return clusters diff --git a/ml/agents/focus_area.py b/ml/agents/focus_area.py index 2423051..159d481 100644 --- a/ml/agents/focus_area.py +++ b/ml/agents/focus_area.py @@ -1,16 +1,27 @@ from __future__ import annotations -from collections import defaultdict +from collections import Counter from typing import ClassVar from .base import BaseAgent, AgentInput, AgentOutput -from .manifest import AgentManifest +from .clustering import cluster_tasks +from .inference.history import UserHistory +from .manifest import AgentManifest, InferredParam + + +def _infer_preferred_areas(history: UserHistory) -> list[str]: + """Top-2 project IDs by completed task count (last 90 days worth of data).""" + counts: Counter[str] = Counter() + for tc in history.task_completions: + if tc.project_id: + counts[tc.project_id] += 1 + return [pid for pid, _ in counts.most_common(2)] MANIFEST = AgentManifest( id="focus-area", - version="1.1.0", # bumped: preferred_areas pref is now honoured in compute (#113) - description="Identifies the most congested project/area in the user's task list.", + version="2.0.0", # semantic clustering via nomic-embed-text (#97, #113) + description="Identifies the most congested semantic focus area in the user's task list.", pref_schema={ "type": "object", "additionalProperties": False, @@ -19,7 +30,7 @@ MANIFEST = AgentManifest( "type": "array", "items": {"type": "string"}, "default": [], - "description": "Project / label names to prioritise when multiple areas tie.", + "description": "Project IDs or label names to prioritise when multiple areas tie.", }, }, }, @@ -27,59 +38,75 @@ MANIFEST = AgentManifest( required_consents=["data:core", "data:todoist", "agent:focus-area"], output_contract={"type": "snippet", "format": "free_text"}, ttl_sec=43_200, - # No inferred_params: preferred_areas requires project-level feedback linkage - # that isn't available in feedback_history alone. Revisit with #78 (signal - # abstraction) once per-task reactions can be traced back to a project. + inferred_params=[ + InferredParam( + key="preferred_areas", + ttl_sec=86_400, + cold_start_default=[], + min_history=0, # use task_completions, not feedback events; handle empty inside + infer=_infer_preferred_areas, + ), + ], ) class FocusAreaAgent(BaseAgent): - """Identifies the most congested project/area in the user's task list.""" + """Identifies the most congested semantic focus area in the user's task list.""" agent_id: ClassVar[str] = MANIFEST.id ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec version: ClassVar[str] = MANIFEST.version def compute(self, inp: AgentInput) -> AgentOutput: preferred: list[str] = inp.agent_prefs.get("preferred_areas", []) - by_project: dict[str, list[dict]] = defaultdict(list) - for task in inp.tasks: - project = task.get("project_id") or task.get("project") or "default" - by_project[project].append(task) - if not by_project: - prompt = "No tasks available to identify a focus area." - return self._make_output(inp, prompt, {"project_count": 0}) + if not inp.tasks: + return self._make_output( + inp, + "No tasks available to identify a focus area.", + {"cluster_count": 0, "strategy": "none"}, + ) - def score(project: str, tasks: list[dict]) -> tuple[float, bool]: - base = sum(2.0 if t.get("is_overdue") else 1.0 for t in tasks) - # Boost preferred areas to break ties in their favour - boosted = project in preferred or any(p in project for p in preferred) - return (base + (0.5 if boosted else 0.0), boosted) + clusters = cluster_tasks(inp.tasks) - top_project, top_tasks = max( - by_project.items(), - key=lambda kv: score(kv[0], kv[1]), - ) - overdue_in_top = sum(1 for t in top_tasks if t.get("is_overdue")) - label = "the default project" if top_project == "default" else f'"{top_project}"' - n = len(top_tasks) - boosted = top_project in preferred or any(p in top_project for p in preferred) + if not clusters: + return self._make_output( + inp, + "No tasks available to identify a focus area.", + {"cluster_count": 0, "strategy": "none"}, + ) + + strategy = "semantic" if len(clusters) > 1 or len(inp.tasks) > 1 else "fallback" + + def score(cluster) -> float: + base = sum(2.0 if t.get("is_overdue") else 1.0 for t in cluster.tasks) + boosted = any(p in cluster.label for p in preferred) if preferred else False + return base + (0.5 if boosted else 0.0) + + top = max(clusters, key=score) + boosted = bool(preferred) and any(p in top.label for p in preferred) parts = [ - f"The user's most congested area is {label} " - f"({n} task{'s' if n != 1 else ''}, {overdue_in_top} overdue)." + f'The user\'s most active focus area is "{top.label}" ' + f"({top.task_count} task{'s' if top.task_count != 1 else ''}, " + f"{top.overdue_count} overdue)." ] if boosted: parts.append("This area matches the user's stated focus preferences.") - if overdue_in_top >= 3: + if top.overdue_count >= 3: parts.append("Consider surfacing an action from this area.") + if len(clusters) > 1: + other_total = sum(c.task_count for c in clusters if c is not top) + parts.append( + f"{len(clusters) - 1} other area{'s' if len(clusters) > 2 else ''} " + f"contain {other_total} task{'s' if other_total != 1 else ''}." + ) - prompt = " ".join(parts) snapshot = { - "top_project": top_project, - "top_task_count": n, - "top_overdue_count": overdue_in_top, - "project_count": len(by_project), + "top_cluster_label": top.label, + "top_task_count": top.task_count, + "top_overdue_count": top.overdue_count, + "cluster_count": len(clusters), + "strategy": strategy, "preferred_areas": preferred, } - return self._make_output(inp, prompt, snapshot) + return self._make_output(inp, " ".join(parts), snapshot) diff --git a/ml/agents/tests/test_agents.py b/ml/agents/tests/test_agents.py index 509c0f8..b7044d6 100644 --- a/ml/agents/tests/test_agents.py +++ b/ml/agents/tests/test_agents.py @@ -240,11 +240,13 @@ class TestFocusAreaAgent: def test_default_project_fallback(self): out = self.agent.compute(_inp(tasks=[_task("No project task")])) - assert "default project" in out.prompt_text + # Tasks without project_id fall back to a "Tasks" bucket + assert "Tasks" in out.prompt_text def test_snapshot_keys(self): out = self.agent.compute(_inp(tasks=[_task("T1", project_id="A")])) - assert {"top_project", "top_task_count", "top_overdue_count", "project_count", "preferred_areas"} == set(out.signals_snapshot) + assert {"top_cluster_label", "top_task_count", "top_overdue_count", "cluster_count", + "strategy", "preferred_areas"} == set(out.signals_snapshot) # ── Registry ───────────────────────────────────────────────────────────────── diff --git a/ml/agents/tests/test_clustering.py b/ml/agents/tests/test_clustering.py new file mode 100644 index 0000000..85e2943 --- /dev/null +++ b/ml/agents/tests/test_clustering.py @@ -0,0 +1,135 @@ +"""Unit tests for ml.agents.clustering (issue #97). + +Embedding calls are mocked so tests run without Ollama. +""" +from __future__ import annotations + +import sys, os +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..")) + +from unittest.mock import patch + +from ml.agents.clustering import cluster_tasks, Cluster, _greedy_cluster, _cosine + + +# ── helpers ────────────────────────────────────────────────────────────────── + +def _task(content: str, project_id: str | None = None, is_overdue: bool = False) -> dict: + t: dict = {"content": content, "is_overdue": is_overdue} + if project_id: + t["project_id"] = project_id + return t + + +def _embed_seq(*vecs): + """Return a side_effect list so successive _embed calls return these vectors.""" + return list(vecs) + + +# ── Cluster dataclass ───────────────────────────────────────────────────────── + +class TestCluster: + def test_task_count(self): + c = Cluster(label="X", tasks=[_task("a"), _task("b")]) + assert c.task_count == 2 + + def test_overdue_count(self): + c = Cluster(label="X", tasks=[_task("a", is_overdue=True), _task("b")]) + assert c.overdue_count == 1 + + +# ── cosine similarity ───────────────────────────────────────────────────────── + +class TestCosine: + def test_identical_vectors(self): + v = [1.0, 0.0, 0.0] + assert _cosine(v, v) == 1.0 + + def test_orthogonal_vectors(self): + assert _cosine([1.0, 0.0], [0.0, 1.0]) == 0.0 + + def test_zero_vector(self): + assert _cosine([0.0, 0.0], [1.0, 0.0]) == 0.0 + + +# ── greedy clustering ───────────────────────────────────────────────────────── + +class TestGreedyClustering: + def _similar_vec(self, base: list[float], noise: float = 0.01) -> list[float]: + return [x + noise for x in base] + + def test_similar_tasks_grouped(self): + v = [1.0, 0.0, 0.0] + v2 = [0.999, 0.001, 0.0] + items = [ + (_task("A"), v), + (_task("B"), v2), + ] + clusters = _greedy_cluster(items) + assert len(clusters) == 1 + assert clusters[0].task_count == 2 + + def test_dissimilar_tasks_separate(self): + v1 = [1.0, 0.0, 0.0] + v2 = [0.0, 1.0, 0.0] + items = [(_task("A"), v1), (_task("B"), v2)] + clusters = _greedy_cluster(items) + assert len(clusters) == 2 + + def test_label_from_first_task(self): + v = [1.0, 0.0] + clusters = _greedy_cluster([(_task("Write report"), v)]) + assert clusters[0].label == "Write report" + + +# ── cluster_tasks integration ───────────────────────────────────────────────── + +class TestClusterTasks: + def test_empty_tasks(self): + result = cluster_tasks([]) + assert result == [] + + def test_fallback_when_ollama_unavailable(self): + with patch("ml.agents.clustering._embed", return_value=None): + tasks = [_task("A", "p1"), _task("B", "p2"), _task("C", "p1")] + clusters = cluster_tasks(tasks) + assert len(clusters) == 2 + labels = {c.label for c in clusters} + assert "p1" in labels and "p2" in labels + + def test_fallback_groups_by_project(self): + with patch("ml.agents.clustering._embed", return_value=None): + tasks = [_task("A", "work")] * 3 + [_task("B", "home")] * 2 + clusters = cluster_tasks(tasks) + by_label = {c.label: c.task_count for c in clusters} + assert by_label["work"] == 3 + assert by_label["home"] == 2 + + def test_tasks_without_content_go_to_other(self): + v = [1.0, 0.0] + with patch("ml.agents.clustering._embed", return_value=v): + tasks = [_task("Has content"), {"is_overdue": False}] + clusters = cluster_tasks(tasks) + labels = {c.label for c in clusters} + assert "Other tasks" in labels + + def test_semantic_clustering_groups_similar(self): + v_work = [1.0, 0.0, 0.0] + v_home = [0.0, 1.0, 0.0] + side_effects = [v_work, v_work, v_home, v_home] + with patch("ml.agents.clustering._embed", side_effect=side_effects): + tasks = [ + _task("Write report"), + _task("Review PR"), + _task("Buy groceries"), + _task("Cook dinner"), + ] + clusters = cluster_tasks(tasks) + assert len(clusters) == 2 + assert all(c.task_count == 2 for c in clusters) + + def test_all_tasks_no_content_fallback_by_project(self): + tasks = [{"project_id": "p1", "is_overdue": False}, + {"project_id": "p2", "is_overdue": False}] + clusters = cluster_tasks(tasks) + assert len(clusters) == 2 diff --git a/ml/agents/tests/test_per_agent_inference.py b/ml/agents/tests/test_per_agent_inference.py index 66643a9..72c67ec 100644 --- a/ml/agents/tests/test_per_agent_inference.py +++ b/ml/agents/tests/test_per_agent_inference.py @@ -662,4 +662,51 @@ class TestFocusAreaPreferredAreas: def test_version_bumped(self): from ml.agents.focus_area import MANIFEST as FA_MANIFEST - assert FA_MANIFEST.version == "1.1.0" + assert FA_MANIFEST.version == "2.0.0" + + def test_snapshot_uses_cluster_keys(self): + tasks = [self._task("T", "work")] + out = self.agent.compute(_inp(tasks=tasks)) + assert "top_cluster_label" in out.signals_snapshot + assert "cluster_count" in out.signals_snapshot + assert "strategy" in out.signals_snapshot + + +# ── focus-area: preferred_areas inference from task_completions (#113) ──────── + +class TestFocusAreaPreferredAreasInference: + from ml.agents.focus_area import MANIFEST as _FA_MANIFEST + + def _completion(self, project_id: str) -> TaskCompletion: + return _completion(project_id, lateness_days=0.0) + + def test_cold_start_no_completions(self): + history = _history(completions=[]) + from ml.agents.focus_area import MANIFEST as FA_MANIFEST + result = run_inference(FA_MANIFEST, history) + assert result["preferred_areas"] == [] + + def test_top_two_projects_returned(self): + completions = ( + [_completion("p1", 0)] * 8 + + [_completion("p2", 0)] * 5 + + [_completion("p3", 0)] * 2 + ) + history = _history(completions=completions) + from ml.agents.focus_area import MANIFEST as FA_MANIFEST + result = run_inference(FA_MANIFEST, history) + assert result["preferred_areas"] == ["p1", "p2"] + + def test_single_project_returns_one(self): + completions = [_completion("work", 0)] * 6 + history = _history(completions=completions) + from ml.agents.focus_area import MANIFEST as FA_MANIFEST + result = run_inference(FA_MANIFEST, history) + assert result["preferred_areas"] == ["work"] + + def test_none_project_id_ignored(self): + completions = [_completion(None, 0)] * 5 + [_completion("real", 0)] * 3 + history = _history(completions=completions) + from ml.agents.focus_area import MANIFEST as FA_MANIFEST + result = run_inference(FA_MANIFEST, history) + assert result["preferred_areas"] == ["real"]