From d12f11d29d2f83e0e3118b34f638e0408414ca7e Mon Sep 17 00:00:00 2001 From: alvis Date: Tue, 12 May 2026 14:45:15 +0000 Subject: [PATCH] feat(clustering): 1h TTL + skip recompute when tasks unchanged MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit focus-area now recomputes at most once per hour, and only if the task list actually changed since the last compute. - focus-area TTL: 43200s → 3600s; version bumped to 2.1.0 - computeAndStore hashes sorted task contents (MD5) and checks the stored _task_hash in the existing snapshot; skips the ml-serving call when the hash matches and the output isn't expired - ml-serving injects _task_hash into the snapshot so the next cycle can compare Co-Authored-By: Claude Sonnet 4.6 --- ml/agents/focus_area.py | 6 ++--- ml/agents/tests/test_per_agent_inference.py | 2 +- ml/serving/main.py | 5 ++++ services/api/src/routes/agent-outputs.ts | 30 ++++++++++++++++++++- 4 files changed, 38 insertions(+), 5 deletions(-) diff --git a/ml/agents/focus_area.py b/ml/agents/focus_area.py index 1b42238..4ee192b 100644 --- a/ml/agents/focus_area.py +++ b/ml/agents/focus_area.py @@ -20,7 +20,7 @@ def _infer_preferred_areas(history: UserHistory) -> list[str]: MANIFEST = AgentManifest( id="focus-area", - version="2.0.0", # semantic clustering via nomic-embed-text (#97, #113) + version="2.1.0", # 1h TTL + task-change detection (#129) description="Identifies the most congested semantic focus area in the user's task list.", pref_schema={ "type": "object", @@ -37,7 +37,7 @@ MANIFEST = AgentManifest( context_schema=["todoist.tasks"], required_consents=["data:core", "data:todoist"], output_contract={"type": "snippet", "format": "free_text"}, - ttl_sec=43_200, + ttl_sec=3_600, inferred_params=[ InferredParam( key="preferred_areas", @@ -54,7 +54,7 @@ class FocusAreaAgent(BaseAgent): """Identifies the most congested semantic focus area in the user's task list.""" agent_id: ClassVar[str] = MANIFEST.id ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec - version: ClassVar[str] = MANIFEST.version + version: ClassVar[str] = MANIFEST.version # 2.1.0 def compute(self, inp: AgentInput) -> AgentOutput: preferred: list[str] = inp.agent_prefs.get("preferred_areas", []) diff --git a/ml/agents/tests/test_per_agent_inference.py b/ml/agents/tests/test_per_agent_inference.py index 72c67ec..2505449 100644 --- a/ml/agents/tests/test_per_agent_inference.py +++ b/ml/agents/tests/test_per_agent_inference.py @@ -662,7 +662,7 @@ class TestFocusAreaPreferredAreas: def test_version_bumped(self): from ml.agents.focus_area import MANIFEST as FA_MANIFEST - assert FA_MANIFEST.version == "2.0.0" + assert FA_MANIFEST.version == "2.1.0" def test_snapshot_uses_cluster_keys(self): tasks = [self._task("T", "work")] diff --git a/ml/serving/main.py b/ml/serving/main.py index b9bb403..7799cfc 100644 --- a/ml/serving/main.py +++ b/ml/serving/main.py @@ -199,6 +199,9 @@ class AgentComputeRequest(BaseModel): # Pre-fetched enrichment cache: {content_hash -> description}. Avoids re-calling # LiteLLM for task titles already expanded in a prior compute cycle. enrichment_cache: dict[str, str] = {} + # MD5 of sorted task contents; stored in snapshot so the next cycle can skip + # recompute when the task list hasn't changed. + task_hash: Optional[str] = None class AgentComputeResponse(BaseModel): @@ -327,6 +330,8 @@ async def compute_agent(agent_id: str, req: AgentComputeRequest) -> AgentCompute log.error("agent_compute_failed", agent_id=agent_id, user_id=req.user_id, error=str(exc)) raise HTTPException(status_code=500, detail=f"Agent compute failed: {exc}") + if req.task_hash: + output.signals_snapshot["_task_hash"] = req.task_hash new_enrichments: dict[str, str] = output.signals_snapshot.pop("_new_enrichments", {}) log.info("agent_computed", agent_id=agent_id, user_id=req.user_id, expires_at=output.expires_at) diff --git a/services/api/src/routes/agent-outputs.ts b/services/api/src/routes/agent-outputs.ts index b072006..fe49e15 100644 --- a/services/api/src/routes/agent-outputs.ts +++ b/services/api/src/routes/agent-outputs.ts @@ -155,6 +155,31 @@ async function persistInferredPrefs( } } +function taskListHash(tasks: { content?: string }[]): string { + const sorted = tasks + .map((t) => t.content?.trim() ?? '') + .filter(Boolean) + .sort() + .join('\n'); + return crypto.createHash('md5').update(sorted).digest('hex'); +} + +async function isUpToDate(userId: string, agentId: string, currentHash: string): Promise { + const now = new Date().toISOString(); + const rows = await db + .select({ signalsSnapshot: agentOutputs.signalsSnapshot, expiresAt: agentOutputs.expiresAt }) + .from(agentOutputs) + .where(and(eq(agentOutputs.userId, userId), eq(agentOutputs.agentId, agentId))) + .limit(1); + if (!rows.length) return false; + const row = rows[0]; + if (row.expiresAt <= now) return false; + try { + const snapshot = JSON.parse(row.signalsSnapshot ?? '{}') as { _task_hash?: string }; + return snapshot._task_hash === currentHash; + } catch { return false; } +} + export async function computeAndStore(userId: string, agentId: string): Promise { let tasks: object[] = []; try { @@ -176,6 +201,9 @@ export async function computeAndStore(userId: string, agentId: string): Promise< // No integration or fetch error — agents that need tasks will report "no tasks" } + const currentTaskHash = taskListHash(tasks as { content?: string }[]); + if (await isUpToDate(userId, agentId, currentTaskHash)) return; + let profile: Profile = {}; try { profile = await getProfile(userId); @@ -202,7 +230,7 @@ export async function computeAndStore(userId: string, agentId: string): Promise< const mlResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/compute`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ user_id: userId, tasks, profile, feedback_history: feedbackHistory, agent_prefs: agentPrefs, enrichment_cache: enrichmentCache }), + body: JSON.stringify({ user_id: userId, tasks, profile, feedback_history: feedbackHistory, agent_prefs: agentPrefs, enrichment_cache: enrichmentCache, task_hash: currentTaskHash }), signal: AbortSignal.timeout(60_000), });