From 9ddeea6caca81f89192d2c0be78b8cf2587b0a76 Mon Sep 17 00:00:00 2001 From: alvis Date: Tue, 12 May 2026 14:39:35 +0000 Subject: [PATCH] feat(clustering): persistent enrichment cache in task_enrichments table Each unique task title is now enriched by LiteLLM once and cached in the DB. Subsequent agent compute cycles (every 12h) fetch the cache before calling ml-serving; only new titles hit the tip-generator. - DB: task_enrichments(content_hash PK, description, model, created_at) - TS: fetchEnrichmentCache / persistEnrichments helpers in agent-outputs.ts; enrichment_cache passed in compute request, new_enrichments persisted from response - Python: AgentComputeRequest.enrichment_cache / AgentComputeResponse.new_enrichments; AgentInput.enrichment_cache; _enrich_batch returns (descriptions, new_entries); cluster_tasks returns (clusters, new_enrichments) - FocusAreaAgent stashes new_enrichments in signals_snapshot under _new_enrichments; compute_agent endpoint pops it before storing the snapshot Closes part of #129 Co-Authored-By: Claude Sonnet 4.6 --- ml/agents/base.py | 3 ++ ml/agents/clustering.py | 54 +++++++++++++++-------- ml/agents/focus_area.py | 6 ++- ml/agents/tests/test_agents.py | 3 +- ml/agents/tests/test_clustering.py | 55 +++++++++++++++++------- ml/serving/main.py | 9 ++++ services/api/src/db/migrations.ts | 16 +++++++ services/api/src/db/schema.ts | 9 ++++ services/api/src/routes/agent-outputs.ts | 43 ++++++++++++++++-- 9 files changed, 158 insertions(+), 40 deletions(-) diff --git a/ml/agents/base.py b/ml/agents/base.py index d0b31d9..5298061 100644 --- a/ml/agents/base.py +++ b/ml/agents/base.py @@ -20,6 +20,9 @@ class AgentInput: # precedence over 'inferred' source; the caller resolves priority before # passing this dict in. agent_prefs: dict = field(default_factory=dict) + # Pre-fetched enrichment cache: {content_hash -> description}. Populated by + # the TS caller from the task_enrichments DB table to avoid redundant LLM calls. + enrichment_cache: dict = field(default_factory=dict) @dataclass diff --git a/ml/agents/clustering.py b/ml/agents/clustering.py index a5bfd12..4eb189b 100644 --- a/ml/agents/clustering.py +++ b/ml/agents/clustering.py @@ -87,26 +87,41 @@ def _enrich_title(title: str, litellm_url: str) -> str | None: return None -def _enrich_batch(titles: list[str]) -> list[str]: - """Return enriched descriptions for each title; falls back to raw title on failure. +def _enrich_batch( + titles: list[str], + persistent_cache: dict[str, str] | None = None, +) -> tuple[list[str], dict[str, str]]: + """Return (descriptions, new_entries) for each title. - Results are cached in-memory by content hash so duplicate titles within - a single compute() call cost only one LLM round-trip. + Checks persistent_cache (pre-fetched from DB) first, then falls back to + calling LiteLLM. new_entries contains only hashes generated this call — + the caller should persist these to the DB. """ litellm_url = os.getenv("LITELLM_URL") if not litellm_url: log.debug("enrich_batch: no LITELLM_URL, skipping enrichment") - return titles + return titles, {} - cache: dict[str, str] = {} + db_cache = persistent_cache or {} + session_cache: dict[str, str] = {} # dedup within this call + new_entries: dict[str, str] = {} results = [] + for title in titles: h = _content_hash(title) - if h not in cache: + if h in db_cache: + results.append(db_cache[h]) + elif h in session_cache: + results.append(session_cache[h]) + else: desc = _enrich_title(title, litellm_url) - cache[h] = desc if desc else title - results.append(cache[h]) - return results + value = desc if desc else title + session_cache[h] = value + if desc: # only persist successful enrichments + new_entries[h] = desc + results.append(value) + + return results, new_entries # --------------------------------------------------------------------------- @@ -227,14 +242,17 @@ def _fallback_by_project(tasks: list[dict]) -> list[Cluster]: def cluster_tasks( tasks: list[dict], ollama_url: str | None = None, # kept for test compatibility; env vars take precedence -) -> list[Cluster]: + enrichment_cache: dict[str, str] | None = None, +) -> tuple[list[Cluster], dict[str, str]]: """Cluster tasks by semantic similarity. - Returns a non-empty list of Cluster objects. Falls back to project-based - grouping if the embedding service is unavailable or tasks have no content. + Returns (clusters, new_enrichments). new_enrichments contains LLM-generated + descriptions produced this call that were not in the persistent cache — the + caller should persist these. Falls back to project-based grouping if the + embedding service is unavailable or tasks have no content. """ if not tasks: - return [] + return [], {} # Separate tasks with usable content from those without. with_content = [(t, t.get("content", "").strip()) for t in tasks] @@ -242,13 +260,13 @@ def cluster_tasks( no_content = [t for t, c in with_content if not c] if not embeddable: - return _fallback_by_project(tasks) + return _fallback_by_project(tasks), {} task_objs = [t for t, _ in embeddable] raw_titles = [c for _, c in embeddable] # Step 1: LLM-enrich titles → richer semantic signal before embedding. - descriptions = _enrich_batch(raw_titles) + descriptions, new_enrichments = _enrich_batch(raw_titles, persistent_cache=enrichment_cache) # Step 2: Prefix with nomic-embed-text task prefix, then batch-embed. prefixed = [f"clustering: {d}" for d in descriptions] @@ -256,7 +274,7 @@ def cluster_tasks( if vecs is None or len(vecs) != len(prefixed): log.info("cluster_tasks: embedding unavailable, falling back to project grouping") - return _fallback_by_project(tasks) + return _fallback_by_project(tasks), new_enrichments embedded = list(zip(task_objs, vecs)) clusters = _greedy_cluster(embedded) @@ -264,4 +282,4 @@ def cluster_tasks( if no_content: clusters.append(Cluster(label="Other tasks", tasks=no_content)) - return clusters + return clusters, new_enrichments diff --git a/ml/agents/focus_area.py b/ml/agents/focus_area.py index 49f963b..1b42238 100644 --- a/ml/agents/focus_area.py +++ b/ml/agents/focus_area.py @@ -35,7 +35,7 @@ MANIFEST = AgentManifest( }, }, context_schema=["todoist.tasks"], - required_consents=["data:core", "data:todoist", "agent:focus-area"], + required_consents=["data:core", "data:todoist"], output_contract={"type": "snippet", "format": "free_text"}, ttl_sec=43_200, inferred_params=[ @@ -66,7 +66,7 @@ class FocusAreaAgent(BaseAgent): {"cluster_count": 0, "strategy": "none"}, ) - clusters = cluster_tasks(inp.tasks) + clusters, new_enrichments = cluster_tasks(inp.tasks, enrichment_cache=inp.enrichment_cache) if not clusters: return self._make_output( @@ -109,5 +109,7 @@ class FocusAreaAgent(BaseAgent): "cluster_count": len(clusters), "strategy": strategy, "preferred_areas": preferred, + # Consumed by compute_agent endpoint; stripped before storing the snapshot. + "_new_enrichments": new_enrichments, } return self._make_output(inp, " ".join(parts), snapshot) diff --git a/ml/agents/tests/test_agents.py b/ml/agents/tests/test_agents.py index a57913e..cf8c310 100644 --- a/ml/agents/tests/test_agents.py +++ b/ml/agents/tests/test_agents.py @@ -245,8 +245,9 @@ class TestFocusAreaAgent: def test_snapshot_keys(self): out = self.agent.compute(_inp(tasks=[_task("T1", project_id="A")])) + public_keys = {k for k in out.signals_snapshot if not k.startswith("_")} assert {"top_cluster_label", "top_task_count", "top_overdue_count", "cluster_count", - "strategy", "preferred_areas"} == set(out.signals_snapshot) + "strategy", "preferred_areas"} == public_keys # ── Registry ───────────────────────────────────────────────────────────────── diff --git a/ml/agents/tests/test_clustering.py b/ml/agents/tests/test_clustering.py index 7a458d4..26f888e 100644 --- a/ml/agents/tests/test_clustering.py +++ b/ml/agents/tests/test_clustering.py @@ -87,20 +87,22 @@ class TestGreedyClustering: class TestEnrichBatch: def test_falls_back_to_raw_when_no_litellm_url(self, monkeypatch): monkeypatch.delenv("LITELLM_URL", raising=False) - result = _enrich_batch(["Buy milk", "Fix bug"]) - assert result == ["Buy milk", "Fix bug"] + result, new = _enrich_batch(["Buy milk", "Fix bug"]) + assert result == ["Buy milk", "Fix bug"] and new == {} def test_uses_description_when_litellm_available(self, monkeypatch): monkeypatch.setenv("LITELLM_URL", "http://fake-litellm") with patch("ml.agents.clustering._enrich_title", return_value="Expanded description."): - result = _enrich_batch(["Buy milk"]) + result, new = _enrich_batch(["Buy milk"]) assert result == ["Expanded description."] + assert len(new) == 1 def test_falls_back_to_raw_title_on_enrich_failure(self, monkeypatch): monkeypatch.setenv("LITELLM_URL", "http://fake-litellm") with patch("ml.agents.clustering._enrich_title", return_value=None): - result = _enrich_batch(["Buy milk"]) + result, new = _enrich_batch(["Buy milk"]) assert result == ["Buy milk"] + assert new == {} # failed enrichments are not persisted def test_deduplicates_identical_titles(self, monkeypatch): monkeypatch.setenv("LITELLM_URL", "http://fake-litellm") @@ -109,26 +111,40 @@ class TestEnrichBatch: call_count["n"] += 1 return f"desc:{title}" with patch("ml.agents.clustering._enrich_title", side_effect=fake_enrich): - result = _enrich_batch(["Buy milk", "Buy milk", "Fix bug"]) + result, new = _enrich_batch(["Buy milk", "Buy milk", "Fix bug"]) assert call_count["n"] == 2 # only 2 unique titles assert result == ["desc:Buy milk", "desc:Buy milk", "desc:Fix bug"] + def test_uses_persistent_cache(self, monkeypatch): + monkeypatch.setenv("LITELLM_URL", "http://fake-litellm") + from ml.agents.clustering import _content_hash + h = _content_hash("Buy milk") + call_count = {"n": 0} + def fake_enrich(title, url): + call_count["n"] += 1 + return "new desc" + with patch("ml.agents.clustering._enrich_title", side_effect=fake_enrich): + result, new = _enrich_batch(["Buy milk"], persistent_cache={h: "cached desc"}) + assert call_count["n"] == 0 # cache hit, no LLM call + assert result == ["cached desc"] + assert new == {} + # ── cluster_tasks integration ───────────────────────────────────────────────── class TestClusterTasks: - def _no_enrich(self, titles): - return titles # pass-through; enrichment tested separately + def _no_enrich(self, titles, persistent_cache=None): + return titles, {} def test_empty_tasks(self): - result = cluster_tasks([]) - assert result == [] + clusters, new = cluster_tasks([]) + assert clusters == [] and new == {} def test_fallback_when_embed_unavailable(self): with patch("ml.agents.clustering._enrich_batch", side_effect=self._no_enrich), \ patch("ml.agents.clustering._embed_batch", return_value=None): tasks = [_task("A", "p1"), _task("B", "p2"), _task("C", "p1")] - clusters = cluster_tasks(tasks) + clusters, _ = cluster_tasks(tasks) assert len(clusters) == 2 labels = {c.label for c in clusters} assert "p1" in labels and "p2" in labels @@ -137,7 +153,7 @@ class TestClusterTasks: with patch("ml.agents.clustering._enrich_batch", side_effect=self._no_enrich), \ patch("ml.agents.clustering._embed_batch", return_value=None): tasks = [_task("A", "work")] * 3 + [_task("B", "home")] * 2 - clusters = cluster_tasks(tasks) + clusters, _ = cluster_tasks(tasks) by_label = {c.label: c.task_count for c in clusters} assert by_label["work"] == 3 assert by_label["home"] == 2 @@ -147,7 +163,7 @@ class TestClusterTasks: with patch("ml.agents.clustering._enrich_batch", side_effect=self._no_enrich), \ patch("ml.agents.clustering._embed_batch", return_value=[v]): tasks = [_task("Has content"), {"is_overdue": False}] - clusters = cluster_tasks(tasks) + clusters, _ = cluster_tasks(tasks) labels = {c.label for c in clusters} assert "Other tasks" in labels @@ -163,15 +179,15 @@ class TestClusterTasks: _task("Buy groceries"), _task("Cook dinner"), ] - clusters = cluster_tasks(tasks) + clusters, _ = cluster_tasks(tasks) assert len(clusters) == 2 assert all(c.task_count == 2 for c in clusters) def test_all_tasks_no_content_fallback_by_project(self): tasks = [{"project_id": "p1", "is_overdue": False}, {"project_id": "p2", "is_overdue": False}] - clusters = cluster_tasks(tasks) - assert len(clusters) == 2 + clusters, new = cluster_tasks(tasks) + assert len(clusters) == 2 and new == {} def test_enrich_called_before_embed(self): """Verify enrichment output (not raw title) is what gets embedded.""" @@ -180,7 +196,14 @@ class TestClusterTasks: def fake_embed(texts): captured["texts"] = texts return [v] * len(texts) - with patch("ml.agents.clustering._enrich_batch", return_value=["Expanded desc."]), \ + with patch("ml.agents.clustering._enrich_batch", return_value=(["Expanded desc."], {})), \ patch("ml.agents.clustering._embed_batch", side_effect=fake_embed): cluster_tasks([_task("Buy milk")]) assert captured["texts"] == ["clustering: Expanded desc."] + + def test_new_enrichments_returned(self): + v = [1.0, 0.0] + with patch("ml.agents.clustering._enrich_batch", return_value=(["desc"], {"abc123": "desc"})), \ + patch("ml.agents.clustering._embed_batch", return_value=[v]): + _, new = cluster_tasks([_task("Buy milk")]) + assert new == {"abc123": "desc"} diff --git a/ml/serving/main.py b/ml/serving/main.py index 4013d8c..b9bb403 100644 --- a/ml/serving/main.py +++ b/ml/serving/main.py @@ -196,6 +196,9 @@ class AgentComputeRequest(BaseModel): now_iso: Optional[str] = None # ISO 8601; defaults to utcnow # Per-agent prefs from user_preferences (merged: user source overrides inferred). agent_prefs: dict = {} + # Pre-fetched enrichment cache: {content_hash -> description}. Avoids re-calling + # LiteLLM for task titles already expanded in a prior compute cycle. + enrichment_cache: dict[str, str] = {} class AgentComputeResponse(BaseModel): @@ -206,6 +209,8 @@ class AgentComputeResponse(BaseModel): computed_at: str expires_at: str agent_version: str + # New enrichments generated during this compute cycle; caller persists to DB. + new_enrichments: dict[str, str] = {} class AgentInferRequest(BaseModel): @@ -314,6 +319,7 @@ async def compute_agent(agent_id: str, req: AgentComputeRequest) -> AgentCompute feedback_history=req.feedback_history, now=now, agent_prefs=req.agent_prefs, + enrichment_cache=req.enrichment_cache, ) try: output = agent.compute(inp) @@ -321,6 +327,8 @@ async def compute_agent(agent_id: str, req: AgentComputeRequest) -> AgentCompute log.error("agent_compute_failed", agent_id=agent_id, user_id=req.user_id, error=str(exc)) raise HTTPException(status_code=500, detail=f"Agent compute failed: {exc}") + new_enrichments: dict[str, str] = output.signals_snapshot.pop("_new_enrichments", {}) + log.info("agent_computed", agent_id=agent_id, user_id=req.user_id, expires_at=output.expires_at) span = _start_span( f"compute:{agent_id}", @@ -339,6 +347,7 @@ async def compute_agent(agent_id: str, req: AgentComputeRequest) -> AgentCompute computed_at=output.computed_at, expires_at=output.expires_at, agent_version=output.agent_version, + new_enrichments=new_enrichments, ) diff --git a/services/api/src/db/migrations.ts b/services/api/src/db/migrations.ts index 538b898..8e71afb 100644 --- a/services/api/src/db/migrations.ts +++ b/services/api/src/db/migrations.ts @@ -149,6 +149,13 @@ export function runMigrations(handle: BetterSqlite3Database) { CREATE INDEX IF NOT EXISTS idx_agent_outputs_user_agent_exp ON agent_outputs(user_id, agent_id, expires_at DESC); + CREATE TABLE IF NOT EXISTS task_enrichments ( + content_hash TEXT PRIMARY KEY, + description TEXT NOT NULL, + model TEXT NOT NULL DEFAULT 'tip-generator', + created_at TEXT NOT NULL + ); + CREATE TABLE IF NOT EXISTS user_preferences ( user_id TEXT NOT NULL REFERENCES users(id), scope TEXT NOT NULL, @@ -208,6 +215,15 @@ export function runMigrations(handle: BetterSqlite3Database) { `); } catch { /* column already dropped — nothing to backfill */ } + // Backfill (issue #127): grant data: consent for every active integration token. + // Idempotent — INSERT OR IGNORE skips rows that already exist. + handle.exec(` + INSERT OR IGNORE INTO user_consents (user_id, consent_key, granted_at) + SELECT user_id, 'data:' || provider, connected_at + FROM integration_tokens + WHERE token_status = 'active' + `); + // Drop legacy consent columns (ADR-0014 step 8). Runs after the backfill above. // Silently skips if already dropped (column not found error) or never existed (new DB). for (const stmt of [ diff --git a/services/api/src/db/schema.ts b/services/api/src/db/schema.ts index c9fe035..ab9e590 100644 --- a/services/api/src/db/schema.ts +++ b/services/api/src/db/schema.ts @@ -189,6 +189,15 @@ export const agentOutputs = sqliteTable('agent_outputs', { agentVersion: text('agent_version').notNull(), // bump to invalidate on logic changes }); +// Persistent cache for LLM-enriched task descriptions used by clustering. +// Keyed by MD5 of raw task content; avoids re-calling LiteLLM on every agent compute cycle. +export const taskEnrichments = sqliteTable('task_enrichments', { + contentHash: text('content_hash').primaryKey(), + description: text('description').notNull(), + model: text('model').notNull().default('tip-generator'), + createdAt: text('created_at').notNull(), +}); + // Admin saved SQL queries. export const savedQueries = sqliteTable('saved_queries', { id: text('id').primaryKey(), diff --git a/services/api/src/routes/agent-outputs.ts b/services/api/src/routes/agent-outputs.ts index c2766f6..b072006 100644 --- a/services/api/src/routes/agent-outputs.ts +++ b/services/api/src/routes/agent-outputs.ts @@ -1,8 +1,9 @@ import { Router, type Request, type Response, type IRouter } from 'express'; import { nanoid } from 'nanoid'; import { db } from '../db/index.js'; -import { agentOutputs, tipFeedback, tipViews, userPreferences } from '../db/schema.js'; -import { eq, and, gt, lt } from 'drizzle-orm'; +import { agentOutputs, tipFeedback, tipViews, userPreferences, taskEnrichments } from '../db/schema.js'; +import { eq, and, gt, lt, inArray } from 'drizzle-orm'; +import crypto from 'node:crypto'; import { config } from '../config.js'; import { getProfile, type Profile } from '../profile/builder.js'; import { todoistSource } from '../signals/todoist.js'; @@ -27,6 +28,33 @@ function checkInternalToken(req: Request, res: Response): boolean { // ── DB helpers ──────────────────────────────────────────────────────────────── +function contentHash(text: string): string { + return crypto.createHash('md5').update(text).digest('hex'); +} + +async function fetchEnrichmentCache(tasks: { content?: string }[]): Promise> { + const hashes = tasks + .map((t) => t.content?.trim()) + .filter((c): c is string => !!c) + .map(contentHash); + if (!hashes.length) return {}; + const rows = await db + .select({ contentHash: taskEnrichments.contentHash, description: taskEnrichments.description }) + .from(taskEnrichments) + .where(inArray(taskEnrichments.contentHash, hashes)); + return Object.fromEntries(rows.map((r) => [r.contentHash, r.description])); +} + +async function persistEnrichments(newEntries: Record): Promise { + const now = new Date().toISOString(); + for (const [hash, description] of Object.entries(newEntries)) { + await db + .insert(taskEnrichments) + .values({ contentHash: hash, description, createdAt: now }) + .onConflictDoNothing(); + } +} + export async function getActiveAgentOutputs(userId: string) { const now = new Date().toISOString(); return db @@ -168,10 +196,13 @@ export async function computeAndStore(userId: string, agentId: string): Promise< // Load agent prefs (user overrides + previous inferences) to inject into the compute call. const agentPrefs = await loadAgentPrefs(userId, agentId); + // Fetch enrichment cache for task titles present in this compute call. + const enrichmentCache = await fetchEnrichmentCache(tasks as { content?: string }[]); + const mlResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/compute`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ user_id: userId, tasks, profile, feedback_history: feedbackHistory, agent_prefs: agentPrefs }), + body: JSON.stringify({ user_id: userId, tasks, profile, feedback_history: feedbackHistory, agent_prefs: agentPrefs, enrichment_cache: enrichmentCache }), signal: AbortSignal.timeout(60_000), }); @@ -183,10 +214,16 @@ export async function computeAndStore(userId: string, agentId: string): Promise< const output = await mlResp.json() as { user_id: string; agent_id: string; prompt_text: string; signals_snapshot: unknown; computed_at: string; expires_at: string; agent_version: string; + new_enrichments?: Record; }; await storeAgentOutput(output); + // Persist any new enrichments produced during this compute cycle. + if (output.new_enrichments && Object.keys(output.new_enrichments).length > 0) { + await persistEnrichments(output.new_enrichments); + } + // Run inference framework for this agent and persist results. // Failures are non-fatal — the compute result is already stored. try {