"""Semantic task clustering via nomic-embed-text (issue #97). Public API: cluster_tasks(tasks, ollama_url) -> list[Cluster] Each task dict must have a "content" key. Tasks without content are placed in a fallback "other" bucket. If Ollama is unreachable, falls back to grouping by project_id so compute() always returns something useful. """ from __future__ import annotations import logging import math import os from dataclasses import dataclass, field import httpx log = logging.getLogger(__name__) # Cosine similarity threshold for merging tasks into the same cluster. _SIM_THRESHOLD = 0.72 # Never produce more than this many clusters regardless of task count. _MAX_CLUSTERS = 6 _EMBED_TIMEOUT = 10.0 @dataclass class Cluster: label: str # representative task content (shortest, most central) tasks: list[dict] = field(default_factory=list) @property def task_count(self) -> int: return len(self.tasks) @property def overdue_count(self) -> int: return sum(1 for t in self.tasks if t.get("is_overdue")) def _embed(text: str, ollama_url: str) -> list[float] | None: try: with httpx.Client(trust_env=False, timeout=_EMBED_TIMEOUT) as c: r = c.post( f"{ollama_url}/api/embeddings", json={"model": "nomic-embed-text", "prompt": text, "keep_alive": 0}, ) r.raise_for_status() return r.json().get("embedding") except Exception as exc: log.debug("embed_failed text=%r error=%s", text[:40], exc) return None def _cosine(a: list[float], b: list[float]) -> float: dot = sum(x * y for x, y in zip(a, b)) na = math.sqrt(sum(x * x for x in a)) nb = math.sqrt(sum(x * x for x in b)) if na == 0 or nb == 0: return 0.0 return dot / (na * nb) def _greedy_cluster(items: list[tuple[dict, list[float]]]) -> list[Cluster]: """Single-pass greedy clustering: each item joins the first existing cluster whose centroid is above _SIM_THRESHOLD, else starts a new one.""" clusters: list[tuple[list[float], Cluster]] = [] # (centroid, cluster) for task, vec in items: best_idx = -1 best_sim = _SIM_THRESHOLD - 1e-9 for i, (centroid, _) in enumerate(clusters): sim = _cosine(centroid, vec) if sim > best_sim: best_sim = sim best_idx = i if best_idx >= 0 and len(clusters) < _MAX_CLUSTERS: centroid, cluster = clusters[best_idx] cluster.tasks.append(task) # Update centroid as running mean. n = len(cluster.tasks) new_centroid = [(c * (n - 1) + v) / n for c, v in zip(centroid, vec)] clusters[best_idx] = (new_centroid, cluster) elif len(clusters) < _MAX_CLUSTERS: label = task.get("content", "Tasks")[:60] cluster = Cluster(label=label, tasks=[task]) clusters.append((vec, cluster)) else: # Overflow: append to closest cluster even below threshold. best_i = max(range(len(clusters)), key=lambda i: _cosine(clusters[i][0], vec)) clusters[best_i][1].tasks.append(task) return [c for _, c in clusters] def _fallback_by_project(tasks: list[dict]) -> list[Cluster]: """Group by project_id when embeddings are unavailable.""" buckets: dict[str, Cluster] = {} for task in tasks: pid = task.get("project_id") or task.get("project") or "default" if pid not in buckets: label = pid if pid != "default" else "Tasks" buckets[pid] = Cluster(label=label) buckets[pid].tasks.append(task) return list(buckets.values()) def cluster_tasks( tasks: list[dict], ollama_url: str | None = None, ) -> list[Cluster]: """Cluster tasks by semantic similarity. Returns a non-empty list of Cluster objects. Falls back to project-based grouping if Ollama is unavailable or tasks have no content. """ if not tasks: return [] url = ollama_url or os.getenv("OLLAMA_URL", "http://host.docker.internal:11434") # Separate tasks with usable content from those without. with_content = [(t, t.get("content", "").strip()) for t in tasks] embeddable = [(t, c) for t, c in with_content if c] no_content = [t for t, c in with_content if not c] if not embeddable: return _fallback_by_project(tasks) # Fetch embeddings (best-effort; None means Ollama unavailable). embedded: list[tuple[dict, list[float]]] = [] failed = False for task, content in embeddable: vec = _embed(content, url) if vec is None: failed = True break embedded.append((task, vec)) if failed or not embedded: log.info("cluster_tasks: ollama unavailable, falling back to project grouping") return _fallback_by_project(tasks) clusters = _greedy_cluster(embedded) # Tasks without content get their own bucket if any. if no_content: clusters.append(Cluster(label="Other tasks", tasks=no_content)) return clusters