feat(agents): semantic task clustering + focus-area inferred preferred_areas (#97, #113)

- New ml/agents/clustering.py: embed task content via nomic-embed-text
  (Ollama), greedy cosine clustering (threshold 0.72, max 6 clusters),
  graceful fallback to project-id grouping when Ollama is unreachable
- focus_area v2.0.0: compute() uses semantic clusters as focus areas;
  adds preferred_areas InferredParam inferred from top-2 projects by
  task_completion count
- 135 tests, all passing

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-06 06:54:46 +00:00
parent 336644a90a
commit 26fc67776f
5 changed files with 404 additions and 41 deletions

152
ml/agents/clustering.py Normal file
View File

@@ -0,0 +1,152 @@
"""Semantic task clustering via nomic-embed-text (issue #97).
Public API:
cluster_tasks(tasks, ollama_url) -> list[Cluster]
Each task dict must have a "content" key. Tasks without content are placed in a
fallback "other" bucket. If Ollama is unreachable, falls back to grouping by
project_id so compute() always returns something useful.
"""
from __future__ import annotations
import logging
import math
import os
from dataclasses import dataclass, field
import httpx
log = logging.getLogger(__name__)
# Cosine similarity threshold for merging tasks into the same cluster.
_SIM_THRESHOLD = 0.72
# Never produce more than this many clusters regardless of task count.
_MAX_CLUSTERS = 6
_EMBED_TIMEOUT = 10.0
@dataclass
class Cluster:
label: str # representative task content (shortest, most central)
tasks: list[dict] = field(default_factory=list)
@property
def task_count(self) -> int:
return len(self.tasks)
@property
def overdue_count(self) -> int:
return sum(1 for t in self.tasks if t.get("is_overdue"))
def _embed(text: str, ollama_url: str) -> list[float] | None:
try:
with httpx.Client(trust_env=False, timeout=_EMBED_TIMEOUT) as c:
r = c.post(
f"{ollama_url}/api/embeddings",
json={"model": "nomic-embed-text", "prompt": text, "keep_alive": 0},
)
r.raise_for_status()
return r.json().get("embedding")
except Exception as exc:
log.debug("embed_failed text=%r error=%s", text[:40], exc)
return None
def _cosine(a: list[float], b: list[float]) -> float:
dot = sum(x * y for x, y in zip(a, b))
na = math.sqrt(sum(x * x for x in a))
nb = math.sqrt(sum(x * x for x in b))
if na == 0 or nb == 0:
return 0.0
return dot / (na * nb)
def _greedy_cluster(items: list[tuple[dict, list[float]]]) -> list[Cluster]:
"""Single-pass greedy clustering: each item joins the first existing cluster
whose centroid is above _SIM_THRESHOLD, else starts a new one."""
clusters: list[tuple[list[float], Cluster]] = [] # (centroid, cluster)
for task, vec in items:
best_idx = -1
best_sim = _SIM_THRESHOLD - 1e-9
for i, (centroid, _) in enumerate(clusters):
sim = _cosine(centroid, vec)
if sim > best_sim:
best_sim = sim
best_idx = i
if best_idx >= 0 and len(clusters) < _MAX_CLUSTERS:
centroid, cluster = clusters[best_idx]
cluster.tasks.append(task)
# Update centroid as running mean.
n = len(cluster.tasks)
new_centroid = [(c * (n - 1) + v) / n for c, v in zip(centroid, vec)]
clusters[best_idx] = (new_centroid, cluster)
elif len(clusters) < _MAX_CLUSTERS:
label = task.get("content", "Tasks")[:60]
cluster = Cluster(label=label, tasks=[task])
clusters.append((vec, cluster))
else:
# Overflow: append to closest cluster even below threshold.
best_i = max(range(len(clusters)), key=lambda i: _cosine(clusters[i][0], vec))
clusters[best_i][1].tasks.append(task)
return [c for _, c in clusters]
def _fallback_by_project(tasks: list[dict]) -> list[Cluster]:
"""Group by project_id when embeddings are unavailable."""
buckets: dict[str, Cluster] = {}
for task in tasks:
pid = task.get("project_id") or task.get("project") or "default"
if pid not in buckets:
label = pid if pid != "default" else "Tasks"
buckets[pid] = Cluster(label=label)
buckets[pid].tasks.append(task)
return list(buckets.values())
def cluster_tasks(
tasks: list[dict],
ollama_url: str | None = None,
) -> list[Cluster]:
"""Cluster tasks by semantic similarity.
Returns a non-empty list of Cluster objects. Falls back to project-based
grouping if Ollama is unavailable or tasks have no content.
"""
if not tasks:
return []
url = ollama_url or os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
# Separate tasks with usable content from those without.
with_content = [(t, t.get("content", "").strip()) for t in tasks]
embeddable = [(t, c) for t, c in with_content if c]
no_content = [t for t, c in with_content if not c]
if not embeddable:
return _fallback_by_project(tasks)
# Fetch embeddings (best-effort; None means Ollama unavailable).
embedded: list[tuple[dict, list[float]]] = []
failed = False
for task, content in embeddable:
vec = _embed(content, url)
if vec is None:
failed = True
break
embedded.append((task, vec))
if failed or not embedded:
log.info("cluster_tasks: ollama unavailable, falling back to project grouping")
return _fallback_by_project(tasks)
clusters = _greedy_cluster(embedded)
# Tasks without content get their own bucket if any.
if no_content:
clusters.append(Cluster(label="Other tasks", tasks=no_content))
return clusters

View File

@@ -1,16 +1,27 @@
from __future__ import annotations
from collections import defaultdict
from collections import Counter
from typing import ClassVar
from .base import BaseAgent, AgentInput, AgentOutput
from .manifest import AgentManifest
from .clustering import cluster_tasks
from .inference.history import UserHistory
from .manifest import AgentManifest, InferredParam
def _infer_preferred_areas(history: UserHistory) -> list[str]:
"""Top-2 project IDs by completed task count (last 90 days worth of data)."""
counts: Counter[str] = Counter()
for tc in history.task_completions:
if tc.project_id:
counts[tc.project_id] += 1
return [pid for pid, _ in counts.most_common(2)]
MANIFEST = AgentManifest(
id="focus-area",
version="1.1.0", # bumped: preferred_areas pref is now honoured in compute (#113)
description="Identifies the most congested project/area in the user's task list.",
version="2.0.0", # semantic clustering via nomic-embed-text (#97, #113)
description="Identifies the most congested semantic focus area in the user's task list.",
pref_schema={
"type": "object",
"additionalProperties": False,
@@ -19,7 +30,7 @@ MANIFEST = AgentManifest(
"type": "array",
"items": {"type": "string"},
"default": [],
"description": "Project / label names to prioritise when multiple areas tie.",
"description": "Project IDs or label names to prioritise when multiple areas tie.",
},
},
},
@@ -27,59 +38,75 @@ MANIFEST = AgentManifest(
required_consents=["data:core", "data:todoist", "agent:focus-area"],
output_contract={"type": "snippet", "format": "free_text"},
ttl_sec=43_200,
# No inferred_params: preferred_areas requires project-level feedback linkage
# that isn't available in feedback_history alone. Revisit with #78 (signal
# abstraction) once per-task reactions can be traced back to a project.
inferred_params=[
InferredParam(
key="preferred_areas",
ttl_sec=86_400,
cold_start_default=[],
min_history=0, # use task_completions, not feedback events; handle empty inside
infer=_infer_preferred_areas,
),
],
)
class FocusAreaAgent(BaseAgent):
"""Identifies the most congested project/area in the user's task list."""
"""Identifies the most congested semantic focus area in the user's task list."""
agent_id: ClassVar[str] = MANIFEST.id
ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec
version: ClassVar[str] = MANIFEST.version
def compute(self, inp: AgentInput) -> AgentOutput:
preferred: list[str] = inp.agent_prefs.get("preferred_areas", [])
by_project: dict[str, list[dict]] = defaultdict(list)
for task in inp.tasks:
project = task.get("project_id") or task.get("project") or "default"
by_project[project].append(task)
if not by_project:
prompt = "No tasks available to identify a focus area."
return self._make_output(inp, prompt, {"project_count": 0})
def score(project: str, tasks: list[dict]) -> tuple[float, bool]:
base = sum(2.0 if t.get("is_overdue") else 1.0 for t in tasks)
# Boost preferred areas to break ties in their favour
boosted = project in preferred or any(p in project for p in preferred)
return (base + (0.5 if boosted else 0.0), boosted)
top_project, top_tasks = max(
by_project.items(),
key=lambda kv: score(kv[0], kv[1]),
if not inp.tasks:
return self._make_output(
inp,
"No tasks available to identify a focus area.",
{"cluster_count": 0, "strategy": "none"},
)
overdue_in_top = sum(1 for t in top_tasks if t.get("is_overdue"))
label = "the default project" if top_project == "default" else f'"{top_project}"'
n = len(top_tasks)
boosted = top_project in preferred or any(p in top_project for p in preferred)
clusters = cluster_tasks(inp.tasks)
if not clusters:
return self._make_output(
inp,
"No tasks available to identify a focus area.",
{"cluster_count": 0, "strategy": "none"},
)
strategy = "semantic" if len(clusters) > 1 or len(inp.tasks) > 1 else "fallback"
def score(cluster) -> float:
base = sum(2.0 if t.get("is_overdue") else 1.0 for t in cluster.tasks)
boosted = any(p in cluster.label for p in preferred) if preferred else False
return base + (0.5 if boosted else 0.0)
top = max(clusters, key=score)
boosted = bool(preferred) and any(p in top.label for p in preferred)
parts = [
f"The user's most congested area is {label} "
f"({n} task{'s' if n != 1 else ''}, {overdue_in_top} overdue)."
f'The user\'s most active focus area is "{top.label}" '
f"({top.task_count} task{'s' if top.task_count != 1 else ''}, "
f"{top.overdue_count} overdue)."
]
if boosted:
parts.append("This area matches the user's stated focus preferences.")
if overdue_in_top >= 3:
if top.overdue_count >= 3:
parts.append("Consider surfacing an action from this area.")
if len(clusters) > 1:
other_total = sum(c.task_count for c in clusters if c is not top)
parts.append(
f"{len(clusters) - 1} other area{'s' if len(clusters) > 2 else ''} "
f"contain {other_total} task{'s' if other_total != 1 else ''}."
)
prompt = " ".join(parts)
snapshot = {
"top_project": top_project,
"top_task_count": n,
"top_overdue_count": overdue_in_top,
"project_count": len(by_project),
"top_cluster_label": top.label,
"top_task_count": top.task_count,
"top_overdue_count": top.overdue_count,
"cluster_count": len(clusters),
"strategy": strategy,
"preferred_areas": preferred,
}
return self._make_output(inp, prompt, snapshot)
return self._make_output(inp, " ".join(parts), snapshot)

View File

@@ -240,11 +240,13 @@ class TestFocusAreaAgent:
def test_default_project_fallback(self):
out = self.agent.compute(_inp(tasks=[_task("No project task")]))
assert "default project" in out.prompt_text
# Tasks without project_id fall back to a "Tasks" bucket
assert "Tasks" in out.prompt_text
def test_snapshot_keys(self):
out = self.agent.compute(_inp(tasks=[_task("T1", project_id="A")]))
assert {"top_project", "top_task_count", "top_overdue_count", "project_count", "preferred_areas"} == set(out.signals_snapshot)
assert {"top_cluster_label", "top_task_count", "top_overdue_count", "cluster_count",
"strategy", "preferred_areas"} == set(out.signals_snapshot)
# ── Registry ─────────────────────────────────────────────────────────────────

View File

@@ -0,0 +1,135 @@
"""Unit tests for ml.agents.clustering (issue #97).
Embedding calls are mocked so tests run without Ollama.
"""
from __future__ import annotations
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
from unittest.mock import patch
from ml.agents.clustering import cluster_tasks, Cluster, _greedy_cluster, _cosine
# ── helpers ──────────────────────────────────────────────────────────────────
def _task(content: str, project_id: str | None = None, is_overdue: bool = False) -> dict:
t: dict = {"content": content, "is_overdue": is_overdue}
if project_id:
t["project_id"] = project_id
return t
def _embed_seq(*vecs):
"""Return a side_effect list so successive _embed calls return these vectors."""
return list(vecs)
# ── Cluster dataclass ─────────────────────────────────────────────────────────
class TestCluster:
def test_task_count(self):
c = Cluster(label="X", tasks=[_task("a"), _task("b")])
assert c.task_count == 2
def test_overdue_count(self):
c = Cluster(label="X", tasks=[_task("a", is_overdue=True), _task("b")])
assert c.overdue_count == 1
# ── cosine similarity ─────────────────────────────────────────────────────────
class TestCosine:
def test_identical_vectors(self):
v = [1.0, 0.0, 0.0]
assert _cosine(v, v) == 1.0
def test_orthogonal_vectors(self):
assert _cosine([1.0, 0.0], [0.0, 1.0]) == 0.0
def test_zero_vector(self):
assert _cosine([0.0, 0.0], [1.0, 0.0]) == 0.0
# ── greedy clustering ─────────────────────────────────────────────────────────
class TestGreedyClustering:
def _similar_vec(self, base: list[float], noise: float = 0.01) -> list[float]:
return [x + noise for x in base]
def test_similar_tasks_grouped(self):
v = [1.0, 0.0, 0.0]
v2 = [0.999, 0.001, 0.0]
items = [
(_task("A"), v),
(_task("B"), v2),
]
clusters = _greedy_cluster(items)
assert len(clusters) == 1
assert clusters[0].task_count == 2
def test_dissimilar_tasks_separate(self):
v1 = [1.0, 0.0, 0.0]
v2 = [0.0, 1.0, 0.0]
items = [(_task("A"), v1), (_task("B"), v2)]
clusters = _greedy_cluster(items)
assert len(clusters) == 2
def test_label_from_first_task(self):
v = [1.0, 0.0]
clusters = _greedy_cluster([(_task("Write report"), v)])
assert clusters[0].label == "Write report"
# ── cluster_tasks integration ─────────────────────────────────────────────────
class TestClusterTasks:
def test_empty_tasks(self):
result = cluster_tasks([])
assert result == []
def test_fallback_when_ollama_unavailable(self):
with patch("ml.agents.clustering._embed", return_value=None):
tasks = [_task("A", "p1"), _task("B", "p2"), _task("C", "p1")]
clusters = cluster_tasks(tasks)
assert len(clusters) == 2
labels = {c.label for c in clusters}
assert "p1" in labels and "p2" in labels
def test_fallback_groups_by_project(self):
with patch("ml.agents.clustering._embed", return_value=None):
tasks = [_task("A", "work")] * 3 + [_task("B", "home")] * 2
clusters = cluster_tasks(tasks)
by_label = {c.label: c.task_count for c in clusters}
assert by_label["work"] == 3
assert by_label["home"] == 2
def test_tasks_without_content_go_to_other(self):
v = [1.0, 0.0]
with patch("ml.agents.clustering._embed", return_value=v):
tasks = [_task("Has content"), {"is_overdue": False}]
clusters = cluster_tasks(tasks)
labels = {c.label for c in clusters}
assert "Other tasks" in labels
def test_semantic_clustering_groups_similar(self):
v_work = [1.0, 0.0, 0.0]
v_home = [0.0, 1.0, 0.0]
side_effects = [v_work, v_work, v_home, v_home]
with patch("ml.agents.clustering._embed", side_effect=side_effects):
tasks = [
_task("Write report"),
_task("Review PR"),
_task("Buy groceries"),
_task("Cook dinner"),
]
clusters = cluster_tasks(tasks)
assert len(clusters) == 2
assert all(c.task_count == 2 for c in clusters)
def test_all_tasks_no_content_fallback_by_project(self):
tasks = [{"project_id": "p1", "is_overdue": False},
{"project_id": "p2", "is_overdue": False}]
clusters = cluster_tasks(tasks)
assert len(clusters) == 2

View File

@@ -662,4 +662,51 @@ class TestFocusAreaPreferredAreas:
def test_version_bumped(self):
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
assert FA_MANIFEST.version == "1.1.0"
assert FA_MANIFEST.version == "2.0.0"
def test_snapshot_uses_cluster_keys(self):
tasks = [self._task("T", "work")]
out = self.agent.compute(_inp(tasks=tasks))
assert "top_cluster_label" in out.signals_snapshot
assert "cluster_count" in out.signals_snapshot
assert "strategy" in out.signals_snapshot
# ── focus-area: preferred_areas inference from task_completions (#113) ────────
class TestFocusAreaPreferredAreasInference:
from ml.agents.focus_area import MANIFEST as _FA_MANIFEST
def _completion(self, project_id: str) -> TaskCompletion:
return _completion(project_id, lateness_days=0.0)
def test_cold_start_no_completions(self):
history = _history(completions=[])
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history)
assert result["preferred_areas"] == []
def test_top_two_projects_returned(self):
completions = (
[_completion("p1", 0)] * 8
+ [_completion("p2", 0)] * 5
+ [_completion("p3", 0)] * 2
)
history = _history(completions=completions)
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history)
assert result["preferred_areas"] == ["p1", "p2"]
def test_single_project_returns_one(self):
completions = [_completion("work", 0)] * 6
history = _history(completions=completions)
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history)
assert result["preferred_areas"] == ["work"]
def test_none_project_id_ignored(self):
completions = [_completion(None, 0)] * 5 + [_completion("real", 0)] * 3
history = _history(completions=completions)
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history)
assert result["preferred_areas"] == ["real"]