feat(clustering): 1h TTL + skip recompute when tasks unchanged

focus-area now recomputes at most once per hour, and only if the task list
actually changed since the last compute.

- focus-area TTL: 43200s → 3600s; version bumped to 2.1.0
- computeAndStore hashes sorted task contents (MD5) and checks the stored
  _task_hash in the existing snapshot; skips the ml-serving call when the
  hash matches and the output isn't expired
- ml-serving injects _task_hash into the snapshot so the next cycle can compare

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-12 14:45:15 +00:00
parent 9ddeea6cac
commit d12f11d29d
4 changed files with 38 additions and 5 deletions

View File

@@ -20,7 +20,7 @@ def _infer_preferred_areas(history: UserHistory) -> list[str]:
MANIFEST = AgentManifest( MANIFEST = AgentManifest(
id="focus-area", id="focus-area",
version="2.0.0", # semantic clustering via nomic-embed-text (#97, #113) version="2.1.0", # 1h TTL + task-change detection (#129)
description="Identifies the most congested semantic focus area in the user's task list.", description="Identifies the most congested semantic focus area in the user's task list.",
pref_schema={ pref_schema={
"type": "object", "type": "object",
@@ -37,7 +37,7 @@ MANIFEST = AgentManifest(
context_schema=["todoist.tasks"], context_schema=["todoist.tasks"],
required_consents=["data:core", "data:todoist"], required_consents=["data:core", "data:todoist"],
output_contract={"type": "snippet", "format": "free_text"}, output_contract={"type": "snippet", "format": "free_text"},
ttl_sec=43_200, ttl_sec=3_600,
inferred_params=[ inferred_params=[
InferredParam( InferredParam(
key="preferred_areas", key="preferred_areas",
@@ -54,7 +54,7 @@ class FocusAreaAgent(BaseAgent):
"""Identifies the most congested semantic focus area in the user's task list.""" """Identifies the most congested semantic focus area in the user's task list."""
agent_id: ClassVar[str] = MANIFEST.id agent_id: ClassVar[str] = MANIFEST.id
ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec
version: ClassVar[str] = MANIFEST.version version: ClassVar[str] = MANIFEST.version # 2.1.0
def compute(self, inp: AgentInput) -> AgentOutput: def compute(self, inp: AgentInput) -> AgentOutput:
preferred: list[str] = inp.agent_prefs.get("preferred_areas", []) preferred: list[str] = inp.agent_prefs.get("preferred_areas", [])

View File

@@ -662,7 +662,7 @@ class TestFocusAreaPreferredAreas:
def test_version_bumped(self): def test_version_bumped(self):
from ml.agents.focus_area import MANIFEST as FA_MANIFEST from ml.agents.focus_area import MANIFEST as FA_MANIFEST
assert FA_MANIFEST.version == "2.0.0" assert FA_MANIFEST.version == "2.1.0"
def test_snapshot_uses_cluster_keys(self): def test_snapshot_uses_cluster_keys(self):
tasks = [self._task("T", "work")] tasks = [self._task("T", "work")]

View File

@@ -199,6 +199,9 @@ class AgentComputeRequest(BaseModel):
# Pre-fetched enrichment cache: {content_hash -> description}. Avoids re-calling # Pre-fetched enrichment cache: {content_hash -> description}. Avoids re-calling
# LiteLLM for task titles already expanded in a prior compute cycle. # LiteLLM for task titles already expanded in a prior compute cycle.
enrichment_cache: dict[str, str] = {} enrichment_cache: dict[str, str] = {}
# MD5 of sorted task contents; stored in snapshot so the next cycle can skip
# recompute when the task list hasn't changed.
task_hash: Optional[str] = None
class AgentComputeResponse(BaseModel): class AgentComputeResponse(BaseModel):
@@ -327,6 +330,8 @@ async def compute_agent(agent_id: str, req: AgentComputeRequest) -> AgentCompute
log.error("agent_compute_failed", agent_id=agent_id, user_id=req.user_id, error=str(exc)) log.error("agent_compute_failed", agent_id=agent_id, user_id=req.user_id, error=str(exc))
raise HTTPException(status_code=500, detail=f"Agent compute failed: {exc}") raise HTTPException(status_code=500, detail=f"Agent compute failed: {exc}")
if req.task_hash:
output.signals_snapshot["_task_hash"] = req.task_hash
new_enrichments: dict[str, str] = output.signals_snapshot.pop("_new_enrichments", {}) new_enrichments: dict[str, str] = output.signals_snapshot.pop("_new_enrichments", {})
log.info("agent_computed", agent_id=agent_id, user_id=req.user_id, expires_at=output.expires_at) log.info("agent_computed", agent_id=agent_id, user_id=req.user_id, expires_at=output.expires_at)

View File

@@ -155,6 +155,31 @@ async function persistInferredPrefs(
} }
} }
function taskListHash(tasks: { content?: string }[]): string {
const sorted = tasks
.map((t) => t.content?.trim() ?? '')
.filter(Boolean)
.sort()
.join('\n');
return crypto.createHash('md5').update(sorted).digest('hex');
}
async function isUpToDate(userId: string, agentId: string, currentHash: string): Promise<boolean> {
const now = new Date().toISOString();
const rows = await db
.select({ signalsSnapshot: agentOutputs.signalsSnapshot, expiresAt: agentOutputs.expiresAt })
.from(agentOutputs)
.where(and(eq(agentOutputs.userId, userId), eq(agentOutputs.agentId, agentId)))
.limit(1);
if (!rows.length) return false;
const row = rows[0];
if (row.expiresAt <= now) return false;
try {
const snapshot = JSON.parse(row.signalsSnapshot ?? '{}') as { _task_hash?: string };
return snapshot._task_hash === currentHash;
} catch { return false; }
}
export async function computeAndStore(userId: string, agentId: string): Promise<void> { export async function computeAndStore(userId: string, agentId: string): Promise<void> {
let tasks: object[] = []; let tasks: object[] = [];
try { try {
@@ -176,6 +201,9 @@ export async function computeAndStore(userId: string, agentId: string): Promise<
// No integration or fetch error — agents that need tasks will report "no tasks" // No integration or fetch error — agents that need tasks will report "no tasks"
} }
const currentTaskHash = taskListHash(tasks as { content?: string }[]);
if (await isUpToDate(userId, agentId, currentTaskHash)) return;
let profile: Profile = {}; let profile: Profile = {};
try { try {
profile = await getProfile(userId); profile = await getProfile(userId);
@@ -202,7 +230,7 @@ export async function computeAndStore(userId: string, agentId: string): Promise<
const mlResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/compute`, { const mlResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/compute`, {
method: 'POST', method: 'POST',
headers: { 'Content-Type': 'application/json' }, headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ user_id: userId, tasks, profile, feedback_history: feedbackHistory, agent_prefs: agentPrefs, enrichment_cache: enrichmentCache }), body: JSON.stringify({ user_id: userId, tasks, profile, feedback_history: feedbackHistory, agent_prefs: agentPrefs, enrichment_cache: enrichmentCache, task_hash: currentTaskHash }),
signal: AbortSignal.timeout(60_000), signal: AbortSignal.timeout(60_000),
}); });