Files
oO/ml/agents/base.py
alvis 9ddeea6cac feat(clustering): persistent enrichment cache in task_enrichments table
Each unique task title is now enriched by LiteLLM once and cached in the DB.
Subsequent agent compute cycles (every 12h) fetch the cache before calling
ml-serving; only new titles hit the tip-generator.

- DB: task_enrichments(content_hash PK, description, model, created_at)
- TS: fetchEnrichmentCache / persistEnrichments helpers in agent-outputs.ts;
  enrichment_cache passed in compute request, new_enrichments persisted from response
- Python: AgentComputeRequest.enrichment_cache / AgentComputeResponse.new_enrichments;
  AgentInput.enrichment_cache; _enrich_batch returns (descriptions, new_entries);
  cluster_tasks returns (clusters, new_enrichments)
- FocusAreaAgent stashes new_enrichments in signals_snapshot under _new_enrichments;
  compute_agent endpoint pops it before storing the snapshot

Closes part of #129

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 14:39:35 +00:00

62 lines
2.6 KiB
Python

"""Base class and shared data structures for all recommendation sub-agents."""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import ClassVar
@dataclass
class AgentInput:
"""Everything an agent may need to produce its prompt snippet."""
user_id: str
tasks: list[dict] # task signal dicts (content, priority, is_overdue, …)
profile: dict[str, float | None] # profile feature values keyed by feature name
feedback_history: list[dict] = field(default_factory=list) # [{action, dwell_ms, created_at}, …]
now: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
# Per-agent inferred/user prefs loaded from user_preferences (ADR-0014 §3).
# Keys match the agent's pref_schema + inferred_params. 'user' source takes
# precedence over 'inferred' source; the caller resolves priority before
# passing this dict in.
agent_prefs: dict = field(default_factory=dict)
# Pre-fetched enrichment cache: {content_hash -> description}. Populated by
# the TS caller from the task_enrichments DB table to avoid redundant LLM calls.
enrichment_cache: dict = field(default_factory=dict)
@dataclass
class AgentOutput:
"""Result produced by an agent; persisted to agent_outputs table."""
user_id: str
agent_id: str
prompt_text: str # snippet passed to the orchestrator
signals_snapshot: dict # inputs consumed (for explainability / debugging)
computed_at: str # ISO 8601
expires_at: str # ISO 8601
agent_version: str
class BaseAgent(ABC):
agent_id: ClassVar[str]
ttl_seconds: ClassVar[int]
version: ClassVar[str]
@abstractmethod
def compute(self, inp: AgentInput) -> AgentOutput:
"""Analyse inp and return a prompt snippet describing what was found."""
...
def _make_output(self, inp: AgentInput, prompt_text: str, snapshot: dict) -> AgentOutput:
computed_at = inp.now.astimezone(timezone.utc).isoformat()
expires_at = (inp.now.astimezone(timezone.utc) + timedelta(seconds=self.ttl_seconds)).isoformat()
return AgentOutput(
user_id=inp.user_id,
agent_id=self.agent_id,
prompt_text=prompt_text,
signals_snapshot=snapshot,
computed_at=computed_at,
expires_at=expires_at,
agent_version=self.version,
)