feat(clustering): persistent enrichment cache in task_enrichments table
Each unique task title is now enriched by LiteLLM once and cached in the DB.
Subsequent agent compute cycles (every 12h) fetch the cache before calling
ml-serving; only new titles hit the tip-generator.

- DB: task_enrichments(content_hash PK, description, model, created_at)
- TS: fetchEnrichmentCache / persistEnrichments helpers in agent-outputs.ts;
  enrichment_cache passed in compute request, new_enrichments persisted from response
- Python: AgentComputeRequest.enrichment_cache / AgentComputeResponse.new_enrichments;
  AgentInput.enrichment_cache; _enrich_batch returns (descriptions, new_entries);
  cluster_tasks returns (clusters, new_enrichments)
- FocusAreaAgent stashes new_enrichments in signals_snapshot under _new_enrichments;
  compute_agent endpoint pops it before storing the snapshot

Closes part of #129

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -149,6 +149,13 @@ export function runMigrations(handle: BetterSqlite3Database) {
|
||||
CREATE INDEX IF NOT EXISTS idx_agent_outputs_user_agent_exp
|
||||
ON agent_outputs(user_id, agent_id, expires_at DESC);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS task_enrichments (
|
||||
content_hash TEXT PRIMARY KEY,
|
||||
description TEXT NOT NULL,
|
||||
model TEXT NOT NULL DEFAULT 'tip-generator',
|
||||
created_at TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS user_preferences (
|
||||
user_id TEXT NOT NULL REFERENCES users(id),
|
||||
scope TEXT NOT NULL,
|
||||
@@ -208,6 +215,15 @@ export function runMigrations(handle: BetterSqlite3Database) {
|
||||
`);
|
||||
} catch { /* column already dropped — nothing to backfill */ }
|
||||
|
||||
// Backfill (issue #127): grant data:<provider> consent for every active integration token.
|
||||
// Idempotent — INSERT OR IGNORE skips rows that already exist.
|
||||
handle.exec(`
|
||||
INSERT OR IGNORE INTO user_consents (user_id, consent_key, granted_at)
|
||||
SELECT user_id, 'data:' || provider, connected_at
|
||||
FROM integration_tokens
|
||||
WHERE token_status = 'active'
|
||||
`);
|
||||
|
||||
// Drop legacy consent columns (ADR-0014 step 8). Runs after the backfill above.
|
||||
// Silently skips if already dropped (column not found error) or never existed (new DB).
|
||||
for (const stmt of [
|
||||
|
||||
@@ -189,6 +189,15 @@ export const agentOutputs = sqliteTable('agent_outputs', {
|
||||
agentVersion: text('agent_version').notNull(), // bump to invalidate on logic changes
|
||||
});
|
||||
|
||||
/**
 * Persistent cache of LLM-enriched task descriptions used by clustering.
 *
 * Rows are keyed by an MD5 hash of the task content, so a title that has
 * already been enriched does not need to be sent to LiteLLM again on the
 * next agent compute cycle.
 */
export const taskEnrichments = sqliteTable('task_enrichments', {
  // MD5 hex digest of the task content; primary key of the cache.
  contentHash: text('content_hash').primaryKey(),
  // Enriched description text stored for that content hash.
  description: text('description').notNull(),
  // Name of the model that produced the description; defaults to 'tip-generator'.
  model: text('model').notNull().default('tip-generator'),
  // Creation time stored as text (presumably an ISO-8601 string — confirm against writers).
  createdAt: text('created_at').notNull(),
});
|
||||
|
||||
// Admin saved SQL queries.
|
||||
export const savedQueries = sqliteTable('saved_queries', {
|
||||
id: text('id').primaryKey(),
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
import { Router, type Request, type Response, type IRouter } from 'express';
|
||||
import { nanoid } from 'nanoid';
|
||||
import { db } from '../db/index.js';
|
||||
import { agentOutputs, tipFeedback, tipViews, userPreferences } from '../db/schema.js';
|
||||
import { eq, and, gt, lt } from 'drizzle-orm';
|
||||
import { agentOutputs, tipFeedback, tipViews, userPreferences, taskEnrichments } from '../db/schema.js';
|
||||
import { eq, and, gt, lt, inArray } from 'drizzle-orm';
|
||||
import crypto from 'node:crypto';
|
||||
import { config } from '../config.js';
|
||||
import { getProfile, type Profile } from '../profile/builder.js';
|
||||
import { todoistSource } from '../signals/todoist.js';
|
||||
@@ -27,6 +28,33 @@ function checkInternalToken(req: Request, res: Response): boolean {
|
||||
|
||||
// ── DB helpers ────────────────────────────────────────────────────────────────
|
||||
|
||||
/**
 * Computes the lowercase hexadecimal MD5 digest of `text`.
 *
 * @param text - String to fingerprint; it is hashed as UTF-8.
 * @returns 32-character hex digest.
 */
function contentHash(text: string): string {
  const hasher = crypto.createHash('md5');
  // 'utf8' is Node's default input encoding for strings; spelled out for clarity.
  hasher.update(text, 'utf8');
  return hasher.digest('hex');
}
|
||||
|
||||
/**
 * Looks up cached enrichment descriptions for the given tasks.
 *
 * Each task's `content` is trimmed and hashed with `contentHash`; tasks whose
 * content is missing or blank are skipped. Hashes are de-duplicated before the
 * query so repeated titles do not inflate the `IN (...)` parameter list.
 *
 * @param tasks - Task-like objects; only the optional `content` field is read.
 * @returns Map from content hash to cached description for every hash found
 *   in `task_enrichments`. Empty when no task has usable content.
 */
async function fetchEnrichmentCache(tasks: { content?: string }[]): Promise<Record<string, string>> {
  const uniqueHashes = new Set<string>();
  for (const task of tasks) {
    const content = task.content?.trim();
    if (content) uniqueHashes.add(contentHash(content));
  }

  // Nothing to look up: skip the query rather than issuing an empty IN list.
  if (uniqueHashes.size === 0) return {};

  const rows = await db
    .select({ contentHash: taskEnrichments.contentHash, description: taskEnrichments.description })
    .from(taskEnrichments)
    .where(inArray(taskEnrichments.contentHash, Array.from(uniqueHashes)));

  return Object.fromEntries(rows.map((r) => [r.contentHash, r.description]));
}
|
||||
|
||||
/**
 * Stores newly produced enrichments in `task_enrichments`.
 *
 * Entries are written with multi-row INSERTs instead of one statement per
 * entry. Rows whose hash already exists are left untouched
 * (`onConflictDoNothing`), so repeated calls with the same entries are safe.
 *
 * @param newEntries - Map from content hash to enriched description.
 */
async function persistEnrichments(newEntries: Record<string, string>): Promise<void> {
  const createdAt = new Date().toISOString();
  const rows = Object.entries(newEntries).map(([hash, description]) => ({
    contentHash: hash,
    description,
    createdAt,
  }));

  // Each row binds 3 parameters; 250 rows per statement stays below SQLite's
  // historical 999 bound-variable limit.
  const rowsPerInsert = 250;

  // With no entries the loop body never runs, so no empty values([]) call is made.
  for (let start = 0; start < rows.length; start += rowsPerInsert) {
    await db
      .insert(taskEnrichments)
      .values(rows.slice(start, start + rowsPerInsert))
      .onConflictDoNothing();
  }
}
|
||||
|
||||
export async function getActiveAgentOutputs(userId: string) {
|
||||
const now = new Date().toISOString();
|
||||
return db
|
||||
@@ -168,10 +196,13 @@ export async function computeAndStore(userId: string, agentId: string): Promise<
|
||||
// Load agent prefs (user overrides + previous inferences) to inject into the compute call.
|
||||
const agentPrefs = await loadAgentPrefs(userId, agentId);
|
||||
|
||||
// Fetch enrichment cache for task titles present in this compute call.
|
||||
const enrichmentCache = await fetchEnrichmentCache(tasks as { content?: string }[]);
|
||||
|
||||
const mlResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/compute`, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ user_id: userId, tasks, profile, feedback_history: feedbackHistory, agent_prefs: agentPrefs }),
|
||||
body: JSON.stringify({ user_id: userId, tasks, profile, feedback_history: feedbackHistory, agent_prefs: agentPrefs, enrichment_cache: enrichmentCache }),
|
||||
signal: AbortSignal.timeout(60_000),
|
||||
});
|
||||
|
||||
@@ -183,10 +214,16 @@ export async function computeAndStore(userId: string, agentId: string): Promise<
|
||||
const output = await mlResp.json() as {
|
||||
user_id: string; agent_id: string; prompt_text: string;
|
||||
signals_snapshot: unknown; computed_at: string; expires_at: string; agent_version: string;
|
||||
new_enrichments?: Record<string, string>;
|
||||
};
|
||||
|
||||
await storeAgentOutput(output);
|
||||
|
||||
// Persist any new enrichments produced during this compute cycle.
|
||||
if (output.new_enrichments && Object.keys(output.new_enrichments).length > 0) {
|
||||
await persistEnrichments(output.new_enrichments);
|
||||
}
|
||||
|
||||
// Run inference framework for this agent and persist results.
|
||||
// Failures are non-fatal — the compute result is already stored.
|
||||
try {
|
||||
|
||||
Reference in New Issue
Block a user