"""Semantic task clustering via nomic-embed-text (issue #97, #129). Public API: cluster_tasks(tasks) -> list[Cluster] Each task dict must have a "content" key. Tasks without content are placed in a fallback "other" bucket. If the embedding service is unreachable, falls back to grouping by project_id so compute() always returns something useful. Pipeline (ported from taskpile experiments/clustering_eval, prompt v1): 1. Expand each raw title via LiteLLM `tip-generator` (qwen2.5:1.5b) into a 3-sentence description. Cached in-memory by content hash within a compute cycle so duplicate titles cost one LLM call. 2. Prefix the expanded text with "clustering: " (nomic-embed-text task prefix). 3. Batch-embed via LiteLLM `embedder` (nomic-embed-text). Falls back to embedding raw titles when LLM expansion fails, and to project-based grouping when embeddings are unavailable. """ from __future__ import annotations import hashlib import logging import math import os from dataclasses import dataclass, field import httpx log = logging.getLogger(__name__) # Cosine similarity threshold for merging tasks into the same cluster. _SIM_THRESHOLD = 0.72 # Never produce more than this many clusters regardless of task count. _MAX_CLUSTERS = 6 _EMBED_TIMEOUT = 15.0 _ENRICH_TIMEOUT = 30.0 _ENRICH_PROMPT_V1 = ( "You are helping categorize a personal task. " "Write exactly 3 sentences in English describing what the task likely involves, " "what context or skills it needs, and why it might matter. " "Be concise and specific. Do not use bullet points or numbering.\n" "Task: {title}\n" "Description:" ) @dataclass class Cluster: label: str # representative task content (shortest, most central) tasks: list[dict] = field(default_factory=list) @property def task_count(self) -> int: return len(self.tasks) @property def overdue_count(self) -> int: return sum(1 for t in self.tasks if t.get("is_overdue")) # --------------------------------------------------------------------------- # LLM enrichment # --------------------------------------------------------------------------- def _content_hash(text: str) -> str: return hashlib.md5(text.encode()).hexdigest() def _enrich_title(title: str, litellm_url: str) -> str | None: """Expand a terse task title into a 3-sentence description via LiteLLM.""" try: with httpx.Client(trust_env=False, timeout=_ENRICH_TIMEOUT) as c: r = c.post( f"{litellm_url}/chat/completions", json={ "model": "tip-generator", "messages": [{"role": "user", "content": _ENRICH_PROMPT_V1.format(title=title)}], "max_tokens": 120, "temperature": 0.3, }, ) r.raise_for_status() return r.json()["choices"][0]["message"]["content"].strip() except Exception as exc: log.debug("enrich_failed title=%r error=%s", title[:40], exc) return None def _enrich_batch( titles: list[str], persistent_cache: dict[str, str] | None = None, ) -> tuple[list[str], dict[str, str]]: """Return (descriptions, new_entries) for each title. Checks persistent_cache (pre-fetched from DB) first, then falls back to calling LiteLLM. new_entries contains only hashes generated this call — the caller should persist these to the DB. """ litellm_url = os.getenv("LITELLM_URL") if not litellm_url: log.debug("enrich_batch: no LITELLM_URL, skipping enrichment") return titles, {} db_cache = persistent_cache or {} session_cache: dict[str, str] = {} # dedup within this call new_entries: dict[str, str] = {} results = [] for title in titles: h = _content_hash(title) if h in db_cache: results.append(db_cache[h]) elif h in session_cache: results.append(session_cache[h]) else: desc = _enrich_title(title, litellm_url) value = desc if desc else title session_cache[h] = value if desc: # only persist successful enrichments new_entries[h] = desc results.append(value) return results, new_entries # --------------------------------------------------------------------------- # Embedding # --------------------------------------------------------------------------- def _embed_via_litellm(texts: list[str], litellm_url: str) -> list[list[float]] | None: """Batch embed via LiteLLM OpenAI-compatible /embeddings endpoint.""" try: with httpx.Client(trust_env=False, timeout=_EMBED_TIMEOUT) as c: r = c.post( f"{litellm_url}/embeddings", json={"model": "embedder", "input": texts}, ) r.raise_for_status() data = r.json().get("data", []) ordered = sorted(data, key=lambda x: x["index"]) return [item["embedding"] for item in ordered] except Exception as exc: log.debug("litellm_embed_failed error=%s", exc) return None def _embed_via_ollama(texts: list[str], ollama_url: str) -> list[list[float]] | None: """Batch embed via Ollama /api/embed endpoint.""" try: results = [] with httpx.Client(trust_env=False, timeout=_EMBED_TIMEOUT) as c: for text in texts: r = c.post( f"{ollama_url}/api/embed", json={"model": "nomic-embed-text", "input": text}, ) r.raise_for_status() body = r.json() # /api/embed returns {"embeddings": [[...]]} embeddings = body.get("embeddings") if not embeddings: return None results.append(embeddings[0]) return results except Exception as exc: log.debug("ollama_embed_failed error=%s", exc) return None def _embed_batch(texts: list[str]) -> list[list[float]] | None: """Embed a list of texts, preferring LiteLLM over direct Ollama.""" litellm_url = os.getenv("LITELLM_URL") if litellm_url: vecs = _embed_via_litellm(texts, litellm_url) if vecs is not None: return vecs log.info("cluster: litellm embed failed, trying ollama fallback") ollama_url = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434") return _embed_via_ollama(texts, ollama_url) # --------------------------------------------------------------------------- # Clustering # --------------------------------------------------------------------------- def _cosine(a: list[float], b: list[float]) -> float: dot = sum(x * y for x, y in zip(a, b)) na = math.sqrt(sum(x * x for x in a)) nb = math.sqrt(sum(x * x for x in b)) if na == 0 or nb == 0: return 0.0 return dot / (na * nb) def _greedy_cluster(items: list[tuple[dict, list[float]]]) -> list[Cluster]: """Single-pass greedy clustering: each item joins the first existing cluster whose centroid is above _SIM_THRESHOLD, else starts a new one.""" clusters: list[tuple[list[float], Cluster]] = [] # (centroid, cluster) for task, vec in items: best_idx = -1 best_sim = _SIM_THRESHOLD - 1e-9 for i, (centroid, _) in enumerate(clusters): sim = _cosine(centroid, vec) if sim > best_sim: best_sim = sim best_idx = i if best_idx >= 0 and len(clusters) < _MAX_CLUSTERS: centroid, cluster = clusters[best_idx] cluster.tasks.append(task) # Update centroid as running mean. n = len(cluster.tasks) new_centroid = [(c * (n - 1) + v) / n for c, v in zip(centroid, vec)] clusters[best_idx] = (new_centroid, cluster) elif len(clusters) < _MAX_CLUSTERS: label = task.get("content", "Tasks")[:60] cluster = Cluster(label=label, tasks=[task]) clusters.append((vec, cluster)) else: # Overflow: append to closest cluster even below threshold. best_i = max(range(len(clusters)), key=lambda i: _cosine(clusters[i][0], vec)) clusters[best_i][1].tasks.append(task) return [c for _, c in clusters] def _fallback_by_project(tasks: list[dict]) -> list[Cluster]: """Group by project_id when embeddings are unavailable.""" buckets: dict[str, Cluster] = {} for task in tasks: pid = task.get("project_id") or task.get("project") or "default" if pid not in buckets: label = pid if pid != "default" else "Tasks" buckets[pid] = Cluster(label=label) buckets[pid].tasks.append(task) return list(buckets.values()) def cluster_tasks( tasks: list[dict], ollama_url: str | None = None, # kept for test compatibility; env vars take precedence enrichment_cache: dict[str, str] | None = None, ) -> tuple[list[Cluster], dict[str, str]]: """Cluster tasks by semantic similarity. Returns (clusters, new_enrichments). new_enrichments contains LLM-generated descriptions produced this call that were not in the persistent cache — the caller should persist these. Falls back to project-based grouping if the embedding service is unavailable or tasks have no content. """ if not tasks: return [], {} # Separate tasks with usable content from those without. with_content = [(t, t.get("content", "").strip()) for t in tasks] embeddable = [(t, c) for t, c in with_content if c] no_content = [t for t, c in with_content if not c] if not embeddable: return _fallback_by_project(tasks), {} task_objs = [t for t, _ in embeddable] raw_titles = [c for _, c in embeddable] # Step 1: LLM-enrich titles → richer semantic signal before embedding. descriptions, new_enrichments = _enrich_batch(raw_titles, persistent_cache=enrichment_cache) # Attach enriched description to each task dict so consumers (e.g. focus-area) # can show the expanded text instead of the terse raw title. for task, desc in zip(task_objs, descriptions): task["enriched_description"] = desc # Step 2: Prefix with nomic-embed-text task prefix, then batch-embed. prefixed = [f"clustering: {d}" for d in descriptions] vecs = _embed_batch(prefixed) if vecs is None or len(vecs) != len(prefixed): log.info("cluster_tasks: embedding unavailable, falling back to project grouping") return _fallback_by_project(tasks), new_enrichments embedded = list(zip(task_objs, vecs)) clusters = _greedy_cluster(embedded) if no_content: clusters.append(Cluster(label="Other tasks", tasks=no_content)) return clusters, new_enrichments