diff --git a/CLAUDE.md b/CLAUDE.md index c1fc018..d479ca0 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -106,15 +106,35 @@ Recent completions: - Model benchmarking for tip generation (#93, #95) - Admin UX refinements: feedback consolidation, settings placement (#100–102) - ADR-0012 — ε-greedy v2 (D=12) — 2026-04-26 (now superseded by ADR-0013) +- ADR-0014 steps 1–6: unified Profile schema + backfill, manifest plumbing, `/api/profile` read-through, registry-driven eligibility filter, inference framework + time-of-day migration — 2026-05-05 Active work (M2): -- ADR-0014 (proposed) — unified Profile model + agent registry + inference framework -- Unified Profile model: prefs, contexts, consents, registry plumbing, orchestrator cutover (#30) -- Shared context-inference framework for agents (#111) -- Per-agent auto-inference: time-of-day (#112), focus-area (#113), momentum (#114), overdue-task (#115), recent-patterns (#116) +- ADR-0014 step 7 — per-agent inference: focus-area (#113), momentum (#114), overdue-task (#115), recent-patterns (#116) +- ADR-0014 step 8 — drop `users.consentGiven` column - Signal abstraction for multi-source support (#78) - Per-user feature freshness SLAs (#61, ADR-0011 phase B) +## ADR-0014 endpoint map (as of step 6) + +| Endpoint | Purpose | +|----------|---------| +| `GET /api/profile` | Read-through: user globals + prefs (by scope) + consents + contexts | +| `PATCH /api/profile/prefs/:scope` | Upsert user_preferences rows (source='user') | +| `PATCH /api/profile/consents` | Grant / revoke consent keys | +| `PATCH /api/profile/contexts` | Create / activate / deactivate named contexts | +| `GET /api/agents/registry` | Manifest list (proxy to ml/serving; 60 s cache) | +| `POST /api/agents/:agentId/compute` | Internal: run agent compute for (user, agent) | +| `POST /agents/{agent_id}/infer` *(ml/serving)* | Run inference framework → `{inferred_prefs}` | + +## Inference framework (ADR-0014 §3) + +Lives in `ml/agents/inference/`. `run_inference(manifest, history)` evaluates all `InferredParam` entries in the manifest and returns `{key: value}`. Rules: +- Below `min_history` → emit `cold_start_default` +- `infer()` error → emit `cold_start_default` (never crashes) +- Results written to `user_preferences` with `source='inferred'`; keys with `source='user'` are never overwritten + +Time-of-day agent (`1.1.0`) is the proof agent (#112): infers `preferred_hour` (mode done-hour) and reads `quiet_start`/`quiet_end` from prefs. + ## What NOT to do - Don't copy Todoist's data into our DB. Store the OAuth token + computed features/derivatives we need, fetch raw on demand. diff --git a/ml/agents/base.py b/ml/agents/base.py index 8d526f2..d0b31d9 100644 --- a/ml/agents/base.py +++ b/ml/agents/base.py @@ -15,6 +15,11 @@ class AgentInput: profile: dict[str, float | None] # profile feature values keyed by feature name feedback_history: list[dict] = field(default_factory=list) # [{action, dwell_ms, created_at}, …] now: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + # Per-agent inferred/user prefs loaded from user_preferences (ADR-0014 §3). + # Keys match the agent's pref_schema + inferred_params. 'user' source takes + # precedence over 'inferred' source; the caller resolves priority before + # passing this dict in. + agent_prefs: dict = field(default_factory=dict) @dataclass diff --git a/ml/agents/inference/__init__.py b/ml/agents/inference/__init__.py new file mode 100644 index 0000000..a02cf2a --- /dev/null +++ b/ml/agents/inference/__init__.py @@ -0,0 +1,9 @@ +"""Shared context-inference framework (ADR-0014 §3, issue #111). + +Each agent's manifest declares InferredParams; this package owns the +scheduling contract, history data model, and write path to user_preferences. +""" +from .framework import run_inference +from .history import FeedbackEvent, UserHistory + +__all__ = ["run_inference", "FeedbackEvent", "UserHistory"] diff --git a/ml/agents/inference/framework.py b/ml/agents/inference/framework.py new file mode 100644 index 0000000..021e6e3 --- /dev/null +++ b/ml/agents/inference/framework.py @@ -0,0 +1,59 @@ +"""run_inference — core of the context-inference framework (ADR-0014 §3). + +Contract: + run_inference(manifest, history) → dict[key, value] + +Semantics: + - For each InferredParam in manifest.inferred_params: + - If len(history.events) < param.min_history → emit cold_start_default. + - Otherwise → call param.infer(history) and emit the result. + - Returns {key: value} ready for the caller to persist to user_preferences + with source='inferred'. + - User overrides (source='user') are handled by the caller's upsert logic; + this function has no DB access. +""" +from __future__ import annotations + +import logging +import time +from typing import Any + +from ..manifest import AgentManifest +from .history import UserHistory + +log = logging.getLogger(__name__) + + +def run_inference(manifest: AgentManifest, history: UserHistory) -> dict[str, Any]: + """Evaluate all InferredParams for an agent and return {key: inferred_value}.""" + result: dict[str, Any] = {} + n = len(history.events) + + for param in manifest.inferred_params: + t0 = time.monotonic() + if param.infer is None: + result[param.key] = param.cold_start_default + continue + if n < param.min_history: + value = param.cold_start_default + source = "cold_start" + else: + try: + value = param.infer(history) + source = "inferred" + except Exception as exc: + log.warning( + "inference_error agent=%s param=%s error=%s — using cold_start_default", + manifest.id, param.key, exc, + ) + value = param.cold_start_default + source = "error_fallback" + + latency_ms = round((time.monotonic() - t0) * 1000, 1) + log.info( + "inference_param agent=%s param=%s source=%s value=%r history_len=%d latency_ms=%s", + manifest.id, param.key, source, value, n, latency_ms, + ) + result[param.key] = value + + return result diff --git a/ml/agents/inference/history.py b/ml/agents/inference/history.py new file mode 100644 index 0000000..69a3f87 --- /dev/null +++ b/ml/agents/inference/history.py @@ -0,0 +1,29 @@ +"""UserHistory — normalised view of a user's feedback events for inference.""" +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime, timezone + + +@dataclass +class FeedbackEvent: + action: str # 'done' | 'dismiss' | 'snooze' | 'helpful' | 'not_helpful' + dwell_ms: int | None + created_at: str # ISO 8601 + + @property + def hour(self) -> int: + """Hour of day (0-23) when the feedback was recorded.""" + try: + dt = datetime.fromisoformat(self.created_at.replace("Z", "+00:00")) + except ValueError: + return 12 + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.hour + + +@dataclass +class UserHistory: + user_id: str + events: list[FeedbackEvent] = field(default_factory=list) diff --git a/ml/agents/tests/test_agents.py b/ml/agents/tests/test_agents.py index 0334489..8e853fc 100644 --- a/ml/agents/tests/test_agents.py +++ b/ml/agents/tests/test_agents.py @@ -153,7 +153,7 @@ class TestTimeOfDayAgent: def test_snapshot_keys(self): out = self.agent.compute(_inp()) - assert {"hour", "day_of_week", "preferred_hour"} == set(out.signals_snapshot) + assert {"hour", "day_of_week", "preferred_hour", "quiet_start", "quiet_end"} == set(out.signals_snapshot) # ── RecentPatternsAgent ─────────────────────────────────────────────────────── diff --git a/ml/agents/tests/test_inference.py b/ml/agents/tests/test_inference.py new file mode 100644 index 0000000..320f707 --- /dev/null +++ b/ml/agents/tests/test_inference.py @@ -0,0 +1,120 @@ +"""Tests for the inference framework and time-of-day #112 proof.""" +from __future__ import annotations + +import sys, os +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..")) + +import pytest +from datetime import datetime, timezone + +from ml.agents.inference.history import FeedbackEvent, UserHistory +from ml.agents.inference.framework import run_inference +from ml.agents.time_of_day import TimeOfDayAgent, MANIFEST as TOD_MANIFEST, MANIFEST +from ml.agents.base import AgentInput + + +_NOW = datetime(2026, 5, 1, 14, 0, 0, tzinfo=timezone.utc) # Thursday 14:00 + + +def _inp(**kwargs) -> AgentInput: + defaults = dict(user_id="u1", tasks=[], profile={}, now=_NOW, agent_prefs={}) + defaults.update(kwargs) + return AgentInput(**defaults) + + +def _event(action: str, hour: int) -> FeedbackEvent: + ts = f"2026-05-01T{hour:02d}:00:00+00:00" + return FeedbackEvent(action=action, dwell_ms=60_000 if action == "done" else 500, created_at=ts) + + +class TestRunInference: + def test_cold_start_when_below_min_history(self): + history = UserHistory(user_id="u1", events=[_event("done", 9)] * 5) # only 5 < 10 + result = run_inference(TOD_MANIFEST, history) + assert result["preferred_hour"] is None # cold_start_default + + def test_infers_preferred_hour_as_mode(self): + # 7 events at 09:00, 3 at 17:00 → preferred_hour should be 9 + events = [_event("done", 9)] * 7 + [_event("done", 17)] * 3 + history = UserHistory(user_id="u1", events=events) + result = run_inference(TOD_MANIFEST, history) + assert result["preferred_hour"] == 9 + + def test_infers_preferred_hour_from_majority_hour(self): + events = [_event("done", 20)] * 6 + [_event("done", 8)] * 4 + history = UserHistory(user_id="u1", events=events) + result = run_inference(TOD_MANIFEST, history) + assert result["preferred_hour"] == 20 + + def test_no_inferred_params_returns_empty(self): + from ml.agents.manifest import AgentManifest + bare = AgentManifest( + id="bare", version="1.0.0", description="", pref_schema={}, + context_schema=[], required_consents=[], output_contract={}, ttl_sec=300, + ) + history = UserHistory(user_id="u1", events=[_event("done", 9)] * 20) + result = run_inference(bare, history) + assert result == {} + + def test_cold_start_fallback_on_infer_error(self): + """infer() raising should fall back to cold_start_default, not crash.""" + from ml.agents.manifest import InferredParam, AgentManifest + + def _bad_infer(h): + raise RuntimeError("oops") + + m = AgentManifest( + id="boom", version="1.0.0", description="", pref_schema={}, + context_schema=[], required_consents=[], output_contract={}, ttl_sec=300, + inferred_params=[InferredParam(key="x", ttl_sec=60, cold_start_default=42, min_history=1, infer=_bad_infer)], + ) + history = UserHistory(user_id="u1", events=[_event("done", 9)] * 5) + result = run_inference(m, history) + assert result["x"] == 42 + + +class TestTimeOfDayAgentWithInference: + agent = TimeOfDayAgent() + + def test_uses_preferred_hour_from_agent_prefs(self): + inp = _inp(agent_prefs={"preferred_hour": 9}, now=datetime(2026, 5, 1, 9, 0, 0, tzinfo=timezone.utc)) + out = self.agent.compute(inp) + assert "peak productivity hour" in out.prompt_text.lower() or "peak" in out.prompt_text + + def test_quiet_window_noon_suppressed(self): + inp = _inp( + agent_prefs={"quiet_start": "22:00", "quiet_end": "07:00"}, + now=datetime(2026, 5, 1, 23, 0, 0, tzinfo=timezone.utc), + ) + out = self.agent.compute(inp) + assert "quiet window" in out.prompt_text + + def test_quiet_window_not_in_window(self): + inp = _inp( + agent_prefs={"quiet_start": "22:00", "quiet_end": "07:00"}, + now=datetime(2026, 5, 1, 14, 0, 0, tzinfo=timezone.utc), + ) + out = self.agent.compute(inp) + assert "quiet window" not in out.prompt_text + + def test_agent_prefs_override_profile(self): + # agent_prefs.preferred_hour wins over profile.preferred_hour + inp = _inp( + profile={"preferred_hour": 8}, + agent_prefs={"preferred_hour": 14}, + now=datetime(2026, 5, 1, 14, 0, 0, tzinfo=timezone.utc), + ) + out = self.agent.compute(inp) + assert "peak productivity hour (14:00)" in out.prompt_text + + def test_no_prefs_falls_back_to_profile(self): + inp = _inp(profile={"preferred_hour": 10}, now=datetime(2026, 5, 1, 10, 0, 0, tzinfo=timezone.utc)) + out = self.agent.compute(inp) + assert "peak" in out.prompt_text + + def test_version_bumped(self): + assert MANIFEST.version == "1.1.0" + + def test_manifest_has_preferred_hour_param(self): + keys = {p.key for p in MANIFEST.inferred_params} + assert "preferred_hour" in keys diff --git a/ml/agents/time_of_day.py b/ml/agents/time_of_day.py index a48104f..cdb69fa 100644 --- a/ml/agents/time_of_day.py +++ b/ml/agents/time_of_day.py @@ -1,14 +1,26 @@ from __future__ import annotations + +from collections import Counter from typing import ClassVar + from .base import BaseAgent, AgentInput, AgentOutput -from .manifest import AgentManifest +from .inference.history import UserHistory +from .manifest import AgentManifest, InferredParam _DOW_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] +def _infer_preferred_hour(history: UserHistory) -> int: + """Mode hour of day across all 'done' feedback events; falls back to 9.""" + done_hours = [e.hour for e in history.events if e.action == "done"] + if not done_hours: + return 9 + return Counter(done_hours).most_common(1)[0][0] + + MANIFEST = AgentManifest( id="time-of-day", - version="1.0.0", + version="1.1.0", # bumped: inferred_params added (ADR-0014 §3, #112) description="Frames the current moment relative to the user's productive peak and quiet hours.", pref_schema={ "type": "object", @@ -30,6 +42,15 @@ MANIFEST = AgentManifest( required_consents=["data:core", "agent:time-of-day"], output_contract={"type": "snippet", "format": "free_text"}, ttl_sec=900, + inferred_params=[ + InferredParam( + key="preferred_hour", + ttl_sec=3_600, # recompute hourly + cold_start_default=None, + min_history=10, # need at least 10 feedback events to be meaningful + infer=_infer_preferred_hour, + ), + ], ) @@ -42,31 +63,63 @@ class TimeOfDayAgent(BaseAgent): def compute(self, inp: AgentInput) -> AgentOutput: hour = inp.now.hour dow = inp.now.weekday() # 0=Monday … 6=Sunday - preferred = inp.profile.get("preferred_hour") is_weekend = dow >= 5 + # agent_prefs (inferred or user-set) take precedence over ML profile features. + preferred_raw = inp.agent_prefs.get("preferred_hour", inp.profile.get("preferred_hour")) + preferred = int(preferred_raw) if preferred_raw is not None else None + + quiet_start: str | None = inp.agent_prefs.get("quiet_start") + quiet_end: str | None = inp.agent_prefs.get("quiet_end") + in_quiet = self._in_quiet_window(hour, quiet_start, quiet_end) + parts = [f"It is {hour:02d}:00 on {_DOW_NAMES[dow]} ({self._label(hour)})."] if is_weekend: parts.append("Weekend context — prefer personal or reflective tips over work tasks.") + if in_quiet: + parts.append( + f"User is in their quiet window ({quiet_start}–{quiet_end}) — " + "avoid urgent or demanding tips." + ) + if preferred is not None: - ph = int(preferred) - delta = min(abs(hour - ph), 24 - abs(hour - ph)) # circular distance + delta = min(abs(hour - preferred), 24 - abs(hour - preferred)) if delta == 0: parts.append( - f"This is the user's peak productivity hour ({ph:02d}:00) — " - f"a high-impact tip is appropriate." + f"This is the user's peak productivity hour ({preferred:02d}:00) — " + "a high-impact tip is appropriate." ) elif delta <= 2: - parts.append(f"Approaching the user's peak productivity window ({ph:02d}:00).") + parts.append(f"Approaching the user's peak productivity window ({preferred:02d}:00).") else: parts.append("No preferred-hour data yet.") prompt = " ".join(parts) - snapshot = {"hour": hour, "day_of_week": dow, "preferred_hour": preferred} + snapshot = { + "hour": hour, + "day_of_week": dow, + "preferred_hour": preferred, + "quiet_start": quiet_start, + "quiet_end": quiet_end, + } return self._make_output(inp, prompt, snapshot) + @staticmethod + def _in_quiet_window(hour: int, start: str | None, end: str | None) -> bool: + if not start or not end: + return False + try: + sh = int(start.split(":")[0]) + eh = int(end.split(":")[0]) + except (ValueError, IndexError): + return False + if sh <= eh: + return sh <= hour < eh + # wraps midnight e.g. 22:00–07:00 + return hour >= sh or hour < eh + @staticmethod def _label(hour: int) -> str: if 5 <= hour < 12: diff --git a/ml/serving/main.py b/ml/serving/main.py index ebd6afe..fb8a40d 100644 --- a/ml/serving/main.py +++ b/ml/serving/main.py @@ -3,6 +3,7 @@ oO ML Serving — multi-agent orchestrator (ADR-0013). Contract: POST /agents/{agent_id}/compute run a sub-agent, return prompt snippet + POST /agents/{agent_id}/infer run inference framework for a user, return inferred prefs POST /recommend orchestrate agent snippets → one tip via LiteLLM POST /generate LLM tip candidates (legacy; kept for bench/eval) GET /health { ok, agents: [...] } @@ -38,7 +39,8 @@ if _repo_root not in sys.path: sys.path.insert(0, _repo_root) from ml.agents.base import AgentInput # noqa: E402 -from ml.agents.registry import get_agent, all_agents, all_manifests # noqa: E402 +from ml.agents.registry import get_agent, all_agents, all_manifests, get_manifest # noqa: E402 +from ml.agents.inference import run_inference, FeedbackEvent, UserHistory # noqa: E402 logging_config.configure() @@ -123,6 +125,8 @@ class AgentComputeRequest(BaseModel): profile: dict[str, Optional[float]] = {} feedback_history: list[dict] = [] now_iso: Optional[str] = None # ISO 8601; defaults to utcnow + # Per-agent prefs from user_preferences (merged: user source overrides inferred). + agent_prefs: dict = {} class AgentComputeResponse(BaseModel): @@ -135,6 +139,18 @@ class AgentComputeResponse(BaseModel): agent_version: str +class AgentInferRequest(BaseModel): + user_id: str + feedback_history: list[dict] = [] # [{action, dwell_ms, created_at}, …] + + +class AgentInferResponse(BaseModel): + user_id: str + agent_id: str + # {key: inferred_value} — caller persists to user_preferences with source='inferred' + inferred_prefs: dict + + class AgentOutputSnippet(BaseModel): agent_id: str prompt_text: str @@ -225,6 +241,7 @@ async def compute_agent(agent_id: str, req: AgentComputeRequest) -> AgentCompute profile=req.profile, feedback_history=req.feedback_history, now=now, + agent_prefs=req.agent_prefs, ) try: output = agent.compute(inp) @@ -244,6 +261,46 @@ async def compute_agent(agent_id: str, req: AgentComputeRequest) -> AgentCompute ) +@app.post("/agents/{agent_id}/infer", response_model=AgentInferResponse) +async def infer_agent(agent_id: str, req: AgentInferRequest) -> AgentInferResponse: + """Run the inference framework for one agent and return inferred preference values. + + The caller (TS agent-outputs.ts) persists results to user_preferences + with source='inferred', skipping keys where source='user' already exists. + """ + try: + manifest = get_manifest(agent_id) + except KeyError: + raise HTTPException(status_code=404, detail=f"Unknown agent: {agent_id!r}") + + if not manifest.inferred_params: + return AgentInferResponse(user_id=req.user_id, agent_id=agent_id, inferred_prefs={}) + + events = [ + FeedbackEvent( + action=e.get("action", ""), + dwell_ms=e.get("dwell_ms"), + created_at=e.get("created_at", ""), + ) + for e in req.feedback_history + ] + history = UserHistory(user_id=req.user_id, events=events) + + t0 = __import__("time").monotonic() + inferred = run_inference(manifest, history) + latency_ms = round((__import__("time").monotonic() - t0) * 1000, 1) + + log.info( + "inference_run", + agent_id=agent_id, + user_id=req.user_id, + n_params=len(inferred), + history_len=len(events), + latency_ms=latency_ms, + ) + return AgentInferResponse(user_id=req.user_id, agent_id=agent_id, inferred_prefs=inferred) + + @app.post("/recommend", response_model=RecommendResponse) async def recommend(req: RecommendRequest) -> RecommendResponse: """Orchestrator: combine pre-computed agent outputs into one tip via LLM. diff --git a/ml/serving/tests/test_infer_endpoint.py b/ml/serving/tests/test_infer_endpoint.py new file mode 100644 index 0000000..eba8215 --- /dev/null +++ b/ml/serving/tests/test_infer_endpoint.py @@ -0,0 +1,52 @@ +"""POST /agents/{agent_id}/infer — inference framework endpoint.""" +import pytest +from httpx import AsyncClient, ASGITransport + +from main import app + + +@pytest.mark.anyio +async def test_infer_time_of_day_cold_start(): + """Fewer than min_history events → cold_start_default for preferred_hour.""" + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as client: + resp = await client.post("/agents/time-of-day/infer", json={ + "user_id": "u1", + "feedback_history": [ + {"action": "done", "dwell_ms": 60000, "created_at": "2026-05-01T09:00:00+00:00"}, + ] * 5, # 5 < min_history=10 + }) + assert resp.status_code == 200 + body = resp.json() + assert body["agent_id"] == "time-of-day" + assert body["inferred_prefs"]["preferred_hour"] is None + + +@pytest.mark.anyio +async def test_infer_time_of_day_enough_history(): + """10+ events → preferred_hour is inferred as the mode done-hour.""" + events = [{"action": "done", "dwell_ms": 60000, "created_at": "2026-05-01T09:00:00+00:00"}] * 10 + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as client: + resp = await client.post("/agents/time-of-day/infer", json={"user_id": "u1", "feedback_history": events}) + assert resp.status_code == 200 + body = resp.json() + assert body["inferred_prefs"]["preferred_hour"] == 9 + + +@pytest.mark.anyio +async def test_infer_agent_with_no_inferred_params(): + """Agents with no inferred_params return an empty dict.""" + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as client: + resp = await client.post("/agents/overdue-task/infer", json={"user_id": "u1", "feedback_history": []}) + assert resp.status_code == 200 + assert resp.json()["inferred_prefs"] == {} + + +@pytest.mark.anyio +async def test_infer_unknown_agent_404(): + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as client: + resp = await client.post("/agents/ghost/infer", json={"user_id": "u1", "feedback_history": []}) + assert resp.status_code == 404 diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 33d2565..bfd0f29 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -19,6 +19,7 @@ import { adminRouter, adminInternalRouter } from './routes/admin.js'; import benchRouter from './routes/bench.js'; import agentOutputsRouter from './routes/agent-outputs.js'; import agentRegistryRouter from './routes/agent-registry.js'; +import profileRouter from './routes/profile.js'; import { mkdir } from 'fs/promises'; import { dirname } from 'path'; import { requireAuth } from './middleware/session.js'; @@ -74,6 +75,7 @@ app.use('/api/bench', requireAuth as any, requireAdmin as any, benchRouter); // agent-registry mounts first so /registry beats agent-outputs' /:userId pattern. app.use('/api/agents', agentRegistryRouter); app.use('/api/agents', agentOutputsRouter); +app.use('/api/profile', profileRouter); app.use('/api/ml', requireAuth as any, requireAdmin as any, async (req: Request, res: Response) => { const mlUrl = config.ML_SERVING_URL; diff --git a/services/api/src/profile/__tests__/eligibility.test.ts b/services/api/src/profile/__tests__/eligibility.test.ts new file mode 100644 index 0000000..67f9960 --- /dev/null +++ b/services/api/src/profile/__tests__/eligibility.test.ts @@ -0,0 +1,130 @@ +/** + * Unit tests for getEligibleAgentIds (ADR-0014 step 5). + * DB is mocked via in-memory SQLite; fetchRegistry is mocked per scenario. + */ +import { describe, it, expect, vi, beforeAll, beforeEach } from 'vitest'; +import { makeTestDb } from '../../test/db.js'; +import { users, userConsents, userPreferences, userContexts } from '../../db/schema.js'; + +const testDb = makeTestDb(); +vi.mock('../../db/index.js', () => ({ db: testDb, rawSqlite: testDb.rawSqlite })); + +// Registry mock — overridden per test. +const mockFetchRegistry = vi.fn(); +vi.mock('../../routes/agent-registry.js', () => ({ + fetchRegistry: (...args: unknown[]) => mockFetchRegistry(...args), + _resetRegistryCache: vi.fn(), +})); + +const { getEligibleAgentIds } = await import('../eligibility.js'); + +const NOW = new Date().toISOString(); +const MANIFEST_DEFAULTS = { + version: '1.0.0', + description: '', + pref_schema: {}, + context_schema: [], + output_contract: {}, + ttl_sec: 300, +}; + +const AGENT_A = { ...MANIFEST_DEFAULTS, id: 'agent-a', required_consents: ['data:core'], silenced_in_contexts: [] }; +const AGENT_B = { ...MANIFEST_DEFAULTS, id: 'agent-b', required_consents: ['data:core', 'data:todoist'], silenced_in_contexts: [] }; +const AGENT_C = { ...MANIFEST_DEFAULTS, id: 'agent-c', required_consents: ['data:core'], silenced_in_contexts: ['vacation'] }; + +beforeAll(async () => { + await testDb.insert(users).values({ + id: 'u1', email: 'u@test.com', name: null, image: null, role: 'user', + consentGiven: false, createdAt: NOW, + }); +}); + +beforeEach(() => { + mockFetchRegistry.mockReset(); +}); + +describe('getEligibleAgentIds', () => { + it('returns empty set when registry is unavailable', async () => { + mockFetchRegistry.mockRejectedValue(new Error('network')); + const ids = await getEligibleAgentIds('u1'); + expect(ids.size).toBe(0); + }); + + it('excludes agents whose required consents are not granted', async () => { + mockFetchRegistry.mockResolvedValue({ agents: [AGENT_A, AGENT_B] }); + // only data:core granted + await testDb.insert(userConsents).values({ userId: 'u1', consentKey: 'data:core', grantedAt: NOW, revokedAt: null }); + + const ids = await getEligibleAgentIds('u1'); + expect(ids.has('agent-a')).toBe(true); + expect(ids.has('agent-b')).toBe(false); + }); + + it('excludes agents when a required consent is revoked', async () => { + mockFetchRegistry.mockResolvedValue({ agents: [AGENT_B] }); + // grant then revoke data:todoist + await testDb.insert(userConsents).values([ + { userId: 'u1', consentKey: 'data:todoist', grantedAt: NOW, revokedAt: NOW }, + ]).onConflictDoUpdate({ + target: [userConsents.userId, userConsents.consentKey], + set: { revokedAt: NOW }, + }); + + const ids = await getEligibleAgentIds('u1'); + expect(ids.has('agent-b')).toBe(false); + }); + + it('respects legacy consentGiven bit as data:core', async () => { + mockFetchRegistry.mockResolvedValue({ agents: [AGENT_A] }); + // no consent rows, but legacy bit set + await testDb.update(users).set({ consentGiven: true }); + + const ids = await getEligibleAgentIds('u1'); + expect(ids.has('agent-a')).toBe(true); + + await testDb.update(users).set({ consentGiven: false }); + }); + + it('silences agents whose silenced_in_contexts intersects active contexts', async () => { + mockFetchRegistry.mockResolvedValue({ agents: [AGENT_A, AGENT_C] }); + // ensure data:core granted + await testDb.insert(userConsents).values({ userId: 'u1', consentKey: 'data:core', grantedAt: NOW, revokedAt: null }) + .onConflictDoUpdate({ target: [userConsents.userId, userConsents.consentKey], set: { revokedAt: null } }); + // activate vacation context + await testDb.insert(userContexts).values({ userId: 'u1', name: 'vacation', active: true, scheduleJson: null, createdAt: NOW }); + + const ids = await getEligibleAgentIds('u1'); + expect(ids.has('agent-a')).toBe(true); + expect(ids.has('agent-c')).toBe(false); + }); + + it('excludes agents explicitly disabled via user_preferences', async () => { + mockFetchRegistry.mockResolvedValue({ agents: [AGENT_A] }); + await testDb.insert(userConsents).values({ userId: 'u1', consentKey: 'data:core', grantedAt: NOW, revokedAt: null }) + .onConflictDoUpdate({ target: [userConsents.userId, userConsents.consentKey], set: { revokedAt: null } }); + await testDb.insert(userPreferences).values({ + userId: 'u1', scope: 'agent:agent-a', key: 'enabled', valueJson: 'false', source: 'user', updatedAt: NOW, + }).onConflictDoUpdate({ + target: [userPreferences.userId, userPreferences.scope, userPreferences.key], + set: { valueJson: 'false' }, + }); + + const ids = await getEligibleAgentIds('u1'); + expect(ids.has('agent-a')).toBe(false); + }); + + it('includes agents when enabled pref is true (or absent)', async () => { + mockFetchRegistry.mockResolvedValue({ agents: [AGENT_A] }); + await testDb.insert(userConsents).values({ userId: 'u1', consentKey: 'data:core', grantedAt: NOW, revokedAt: null }) + .onConflictDoUpdate({ target: [userConsents.userId, userConsents.consentKey], set: { revokedAt: null } }); + await testDb.insert(userPreferences).values({ + userId: 'u1', scope: 'agent:agent-a', key: 'enabled', valueJson: 'true', source: 'user', updatedAt: NOW, + }).onConflictDoUpdate({ + target: [userPreferences.userId, userPreferences.scope, userPreferences.key], + set: { valueJson: 'true' }, + }); + + const ids = await getEligibleAgentIds('u1'); + expect(ids.has('agent-a')).toBe(true); + }); +}); diff --git a/services/api/src/profile/eligibility.ts b/services/api/src/profile/eligibility.ts new file mode 100644 index 0000000..7f7bd4c --- /dev/null +++ b/services/api/src/profile/eligibility.ts @@ -0,0 +1,88 @@ +/** + * Registry-driven agent eligibility filter (ADR-0014 step 5). + * + * Rules (all must pass for an agent to be eligible): + * 1. All required_consents are granted and not revoked. + * 2. No silenced_in_contexts entry matches an active context. + * 3. user_preferences[scope='agent:', key='enabled'] is not false. + * + * Fail-closed: if the registry is unavailable, returns an empty set so the + * orchestrator falls back to the random policy rather than proceeding without + * consent checks. + */ +import { db } from '../db/index.js'; +import { users, userConsents, userPreferences, userContexts } from '../db/schema.js'; +import { eq, and, isNull } from 'drizzle-orm'; +import { fetchRegistry } from '../routes/agent-registry.js'; + +export interface AgentManifestWire { + id: string; + required_consents: string[]; + silenced_in_contexts: string[]; + [key: string]: unknown; +} + +interface RegistryPayload { + agents: AgentManifestWire[]; +} + +export async function getEligibleAgentIds(userId: string): Promise> { + let registry: RegistryPayload; + try { + registry = (await fetchRegistry()) as RegistryPayload; + } catch { + return new Set(); + } + + const [consentRows, prefRows, contextRows, userRow] = await Promise.all([ + db + .select({ consentKey: userConsents.consentKey }) + .from(userConsents) + .where(and(eq(userConsents.userId, userId), isNull(userConsents.revokedAt))), + db + .select({ scope: userPreferences.scope, key: userPreferences.key, valueJson: userPreferences.valueJson }) + .from(userPreferences) + .where(eq(userPreferences.userId, userId)), + db + .select({ name: userContexts.name, active: userContexts.active }) + .from(userContexts) + .where(and(eq(userContexts.userId, userId), eq(userContexts.active, true))), + db + .select({ consentGiven: users.consentGiven }) + .from(users) + .where(eq(users.id, userId)) + .limit(1), + ]); + + // Active consents (granted + not revoked) + const activeConsents = new Set(consentRows.map((r) => r.consentKey)); + // Legacy fallback: consentGiven bit counts as data:core + if (!activeConsents.has('data:core') && userRow[0]?.consentGiven) { + activeConsents.add('data:core'); + } + + // Active context names + const activeContextNames = new Set(contextRows.map((r) => r.name)); + + // Per-agent enabled flag from user_preferences + const agentEnabled: Record = {}; + for (const p of prefRows) { + if (!p.scope.startsWith('agent:')) continue; + if (p.key !== 'enabled') continue; + try { + agentEnabled[p.scope] = JSON.parse(p.valueJson) as boolean; + } catch { + // ignore malformed + } + } + + const eligible = new Set(); + for (const manifest of registry.agents) { + if (!manifest.required_consents.every((c) => activeConsents.has(c))) continue; + if (manifest.silenced_in_contexts.some((ctx) => activeContextNames.has(ctx))) continue; + const enabledPref = agentEnabled[`agent:${manifest.id}`]; + if (enabledPref === false) continue; + eligible.add(manifest.id); + } + return eligible; +} diff --git a/services/api/src/routes/__tests__/profile.test.ts b/services/api/src/routes/__tests__/profile.test.ts new file mode 100644 index 0000000..58dff8a --- /dev/null +++ b/services/api/src/routes/__tests__/profile.test.ts @@ -0,0 +1,201 @@ +/** + * Integration tests for GET/PATCH /api/profile (ADR-0014 step 4). + * Real in-memory SQLite; auth middleware mocked so requests arrive as 'user-1'. + */ +import { describe, it, expect, vi, beforeAll, afterAll } from 'vitest'; +import express from 'express'; +import * as http from 'http'; +import { makeTestDb } from '../../test/db.js'; +import { users, userPreferences, userConsents, userContexts } from '../../db/schema.js'; + +const testDb = makeTestDb(); + +vi.mock('../../db/index.js', () => ({ db: testDb, rawSqlite: testDb.rawSqlite })); + +vi.mock('../../middleware/session.js', () => ({ + sessionMiddleware: (_req: express.Request, _res: express.Response, next: express.NextFunction) => + next(), + requireAuth: (req: express.Request, _res: express.Response, next: express.NextFunction) => { + (req as any).userId = 'user-1'; + next(); + }, +})); + +function call( + server: http.Server, + method: string, + path: string, + body?: unknown, +): Promise<{ status: number; body: unknown }> { + return new Promise((resolve, reject) => { + const { port } = server.address() as { port: number }; + const req = http.request( + { method, hostname: '127.0.0.1', port, path, headers: { 'Content-Type': 'application/json' } }, + (res) => { + let data = ''; + res.on('data', (c) => (data += c)); + res.on('end', () => { + try { resolve({ status: res.statusCode!, body: JSON.parse(data) }); } + catch { resolve({ status: res.statusCode!, body: data }); } + }); + }, + ); + req.on('error', reject); + if (body !== undefined) req.write(JSON.stringify(body)); + req.end(); + }); +} + +function startServer(app: express.Application): Promise<{ server: http.Server; call: (method: string, path: string, body?: unknown) => ReturnType }> { + return new Promise((resolve) => { + const server = http.createServer(app); + server.listen(0, () => + resolve({ server, call: (m, p, b) => call(server, m, p, b) }), + ); + }); +} + +const profileRouter = (await import('../profile.js')).default; +const app = express(); +app.use(express.json()); +app.use('/api/profile', profileRouter); + +const { server, call: c } = await startServer(app); +afterAll(() => server.close()); + +const NOW = new Date().toISOString(); + +beforeAll(async () => { + await testDb.insert(users).values({ + id: 'user-1', + email: 'a@example.com', + name: 'Alice', + image: null, + role: 'user', + consentGiven: false, + tone: 'direct', + tipKindsJson: JSON.stringify(['task', 'advice']), + createdAt: NOW, + }); +}); + +describe('GET /api/profile', () => { + it('returns user globals with empty prefs/consents/contexts', async () => { + const res = await c('GET', '/api/profile'); + expect(res.status).toBe(200); + const body = res.body as any; + expect(body.user).toMatchObject({ id: 'user-1', tone: 'direct', tipKinds: ['task', 'advice'] }); + expect(body.prefs).toEqual({}); + expect(body.consents).toEqual({}); + expect(body.contexts).toEqual([]); + }); + + it('surfaces legacy consentGiven as data:core when no consent row exists', async () => { + await testDb.update(users).set({ consentGiven: true, consentAt: NOW }); + const res = await c('GET', '/api/profile'); + expect((res.body as any).consents['data:core']).toMatchObject({ revokedAt: null }); + await testDb.update(users).set({ consentGiven: false }); + }); + + it('includes prefs grouped by scope', async () => { + await testDb.insert(userPreferences).values([ + { userId: 'user-1', scope: 'orchestrator', key: 'quietHours', valueJson: '"22:00-07:00"', source: 'user', updatedAt: NOW }, + { userId: 'user-1', scope: 'agent:focus-area', key: 'areas', valueJson: '["work","health"]', source: 'inferred', updatedAt: NOW }, + ]); + const res = await c('GET', '/api/profile'); + const body = res.body as any; + expect(body.prefs['orchestrator']).toMatchObject({ quietHours: '22:00-07:00' }); + expect(body.prefs['agent:focus-area']).toMatchObject({ areas: ['work', 'health'] }); + }); + + it('includes consents', async () => { + await testDb.insert(userConsents).values([ + { userId: 'user-1', consentKey: 'data:core', grantedAt: NOW, revokedAt: null }, + { userId: 'user-1', consentKey: 'data:todoist', grantedAt: NOW, revokedAt: NOW }, + ]); + const body = (await c('GET', '/api/profile')).body as any; + expect(body.consents['data:core'].revokedAt).toBeNull(); + expect(body.consents['data:todoist'].revokedAt).toBe(NOW); + }); + + it('includes contexts', async () => { + await testDb.insert(userContexts).values({ + userId: 'user-1', name: 'work', active: true, scheduleJson: null, createdAt: NOW, + }); + const body = (await c('GET', '/api/profile')).body as any; + expect(body.contexts).toContainEqual(expect.objectContaining({ name: 'work', active: true })); + }); +}); + +describe('PATCH /api/profile/prefs/:scope', () => { + it('upserts preference keys with source=user', async () => { + const res = await c('PATCH', '/api/profile/prefs/orchestrator', { tone: 'gentle' }); + expect(res.status).toBe(200); + expect(res.body).toEqual({ ok: true }); + + const body = (await c('GET', '/api/profile')).body as any; + expect(body.prefs['orchestrator']['tone']).toBe('gentle'); + }); + + it('overwrites an inferred value with user source', async () => { + await testDb.insert(userPreferences).values({ + userId: 'user-1', scope: 'agent:momentum', key: 'enabled', valueJson: 'false', + source: 'inferred', updatedAt: NOW, + }).onConflictDoUpdate({ + target: [userPreferences.userId, userPreferences.scope, userPreferences.key], + set: { valueJson: 'false', source: 'inferred', updatedAt: NOW }, + }); + + await c('PATCH', '/api/profile/prefs/agent:momentum', { enabled: true }); + const body = (await c('GET', '/api/profile')).body as any; + expect(body.prefs['agent:momentum']['enabled']).toBe(true); + }); + + it('returns 400 for non-object body', async () => { + const res = await c('PATCH', '/api/profile/prefs/orchestrator', [1, 2]); + expect(res.status).toBe(400); + }); +}); + +describe('PATCH /api/profile/consents', () => { + it('grants a new consent key', async () => { + const res = await c('PATCH', '/api/profile/consents', { grant: ['data:calendar'] }); + expect(res.status).toBe(200); + const body = (await c('GET', '/api/profile')).body as any; + expect(body.consents['data:calendar'].revokedAt).toBeNull(); + }); + + it('revokes an existing active consent', async () => { + await c('PATCH', '/api/profile/consents', { grant: ['agent:overdue-task'] }); + await c('PATCH', '/api/profile/consents', { revoke: ['agent:overdue-task'] }); + const body = (await c('GET', '/api/profile')).body as any; + expect(body.consents['agent:overdue-task'].revokedAt).not.toBeNull(); + }); + + it('returns 400 when grant is not an array', async () => { + const res = await c('PATCH', '/api/profile/consents', { grant: 'data:core' }); + expect(res.status).toBe(400); + }); +}); + +describe('PATCH /api/profile/contexts', () => { + it('creates a new context', async () => { + const res = await c('PATCH', '/api/profile/contexts', { name: 'vacation', active: false }); + expect(res.status).toBe(200); + const body = (await c('GET', '/api/profile')).body as any; + expect(body.contexts).toContainEqual(expect.objectContaining({ name: 'vacation', active: false })); + }); + + it('toggles active on existing context', async () => { + await c('PATCH', '/api/profile/contexts', { name: 'home', active: false }); + await c('PATCH', '/api/profile/contexts', { name: 'home', active: true }); + const body = (await c('GET', '/api/profile')).body as any; + const ctx = (body.contexts as any[]).find((x) => x.name === 'home'); + expect(ctx?.active).toBe(true); + }); + + it('returns 400 when name is missing', async () => { + const res = await c('PATCH', '/api/profile/contexts', { active: true }); + expect(res.status).toBe(400); + }); +}); diff --git a/services/api/src/routes/__tests__/recommender.test.ts b/services/api/src/routes/__tests__/recommender.test.ts index b2fee13..84eed95 100644 --- a/services/api/src/routes/__tests__/recommender.test.ts +++ b/services/api/src/routes/__tests__/recommender.test.ts @@ -13,7 +13,8 @@ import { describe, it, expect, vi, beforeAll, afterEach } from 'vitest'; import express from 'express'; import * as http from 'http'; import { makeTestDb } from '../../test/db.js'; -import { users, integrationTokens, tipScores } from '../../db/schema.js'; +import { users, integrationTokens, tipScores, agentOutputs, userConsents } from '../../db/schema.js'; +import { nanoid } from 'nanoid'; const testDb = makeTestDb(); @@ -155,4 +156,77 @@ describe('POST /recommend integration', () => { expect(row.promptVersion).toBeNull(); expect(row.llmModel).toBeNull(); }); + + it('eligibility filter: only passes consented agent outputs to ml/serving', async () => { + const NOW = new Date().toISOString(); + const FUTURE = new Date(Date.now() + 60_000).toISOString(); + + // Grant data:core only — not data:todoist + await testDb.insert(userConsents).values([ + { userId: 'user-1', consentKey: 'data:core', grantedAt: NOW, revokedAt: null }, + ]).onConflictDoUpdate({ + target: [userConsents.userId, userConsents.consentKey], + set: { revokedAt: null }, + }); + + // Two agent outputs: time-of-day (needs data:core only) and overdue-task (needs data:todoist too) + await testDb.insert(agentOutputs).values([ + { + id: nanoid(), userId: 'user-1', agentId: 'time-of-day', + promptText: 'It is morning.', + computedAt: NOW, expiresAt: FUTURE, agentVersion: '1.0.0', + }, + { + id: nanoid(), userId: 'user-1', agentId: 'overdue-task', + promptText: 'You have overdue tasks.', + computedAt: NOW, expiresAt: FUTURE, agentVersion: '1.0.0', + }, + ]); + + // Manifest: time-of-day requires ['data:core'], overdue-task requires ['data:core','data:todoist'] + const registry = { + agents: [ + { id: 'time-of-day', required_consents: ['data:core'], silenced_in_contexts: [], version: '1.0.0', description: '', pref_schema: {}, context_schema: [], output_contract: {}, ttl_sec: 300, inferred_params: [] }, + { id: 'overdue-task', required_consents: ['data:core', 'data:todoist'], silenced_in_contexts: [], version: '1.0.0', description: '', pref_schema: {}, context_schema: [], output_contract: {}, ttl_sec: 300, inferred_params: [] }, + ], + }; + + let capturedAgentOutputs: { agent_id: string }[] = []; + globalThis.fetch = vi.fn().mockImplementation((url: string) => { + const u = String(url); + if (u.includes('todoist.com')) { + return Promise.resolve({ ok: true, status: 200, json: async () => ({ results: [] }) } as any); + } + if (u.includes('/agents/registry')) { + return Promise.resolve({ ok: true, status: 200, json: async () => registry } as any); + } + if (u.includes('/recommend')) { + return Promise.resolve({ + ok: true, status: 200, + json: async (req?: Request) => { + // The body has already been sent; capture via the mock call args instead + return { tip: { id: 'tip-x', content: 'Stay focused.' }, model: 'tip-generator' }; + }, + } as any); + } + return Promise.resolve({ ok: false, status: 500 } as any); + }); + + // Intercept the /recommend body to inspect what agent_outputs were sent + const origFetch = globalThis.fetch as ReturnType; + const wrappedFetch = vi.fn().mockImplementation(async (url: string, init?: RequestInit) => { + if (String(url).includes('/recommend') && init?.body) { + const body = JSON.parse(init.body as string); + capturedAgentOutputs = body.agent_outputs ?? []; + } + return origFetch(url, init); + }); + globalThis.fetch = wrappedFetch; + + const { status } = await post(`${baseUrl}/api/recommend`); + expect(status).toBe(200); + + // Only time-of-day should have been passed; overdue-task is blocked (missing data:todoist) + expect(capturedAgentOutputs.map((a) => a.agent_id)).toEqual(['time-of-day']); + }); }); diff --git a/services/api/src/routes/agent-outputs.ts b/services/api/src/routes/agent-outputs.ts index 5dff5ce..273e694 100644 --- a/services/api/src/routes/agent-outputs.ts +++ b/services/api/src/routes/agent-outputs.ts @@ -1,7 +1,7 @@ import { Router, type Request, type Response, type IRouter } from 'express'; import { nanoid } from 'nanoid'; import { db } from '../db/index.js'; -import { agentOutputs, tipFeedback, tipViews } from '../db/schema.js'; +import { agentOutputs, tipFeedback, tipViews, userPreferences } from '../db/schema.js'; import { eq, and, gt, lt } from 'drizzle-orm'; import { config } from '../config.js'; import { getProfile, type Profile } from '../profile/builder.js'; @@ -78,6 +78,54 @@ router.get('/active-users', async (req: Request, res: Response) => { // ── Core compute logic (used by route + scheduler) ─────────────────────────── +/** Load agent prefs for a user from user_preferences, merging user+inferred. + * User source wins: if both exist, the 'user' row is returned. */ +async function loadAgentPrefs(userId: string, agentId: string): Promise> { + const scope = `agent:${agentId}`; + const rows = await db + .select({ key: userPreferences.key, valueJson: userPreferences.valueJson, source: userPreferences.source }) + .from(userPreferences) + .where(and(eq(userPreferences.userId, userId), eq(userPreferences.scope, scope))); + + // Build merged dict: 'user' source takes precedence over 'inferred' + const merged: Record = {}; + for (const row of rows) { + try { + const value = JSON.parse(row.valueJson); + const existing = merged[row.key]; + if (!existing || row.source === 'user') { + merged[row.key] = { value, source: row.source }; + } + } catch { + // skip malformed + } + } + return Object.fromEntries(Object.entries(merged).map(([k, v]) => [k, v.value])); +} + +/** Persist inferred prefs to user_preferences, skipping keys the user has explicitly set. */ +async function persistInferredPrefs( + userId: string, + agentId: string, + inferredPrefs: Record, +): Promise { + if (!Object.keys(inferredPrefs).length) return; + const scope = `agent:${agentId}`; + const now = new Date().toISOString(); + for (const [key, value] of Object.entries(inferredPrefs)) { + const valueJson = JSON.stringify(value); + await db + .insert(userPreferences) + .values({ userId, scope, key, valueJson, source: 'inferred', updatedAt: now }) + .onConflictDoUpdate({ + target: [userPreferences.userId, userPreferences.scope, userPreferences.key], + set: { valueJson, updatedAt: now }, + // Only overwrite rows already marked inferred; user overrides are untouched. + setWhere: eq(userPreferences.source, 'inferred'), + }); + } +} + export async function computeAndStore(userId: string, agentId: string): Promise { let tasks: object[] = []; try { @@ -111,10 +159,13 @@ export async function computeAndStore(userId: string, agentId: string): Promise< created_at: f.createdAt, })); + // Load agent prefs (user overrides + previous inferences) to inject into the compute call. + const agentPrefs = await loadAgentPrefs(userId, agentId); + const mlResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/compute`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ user_id: userId, tasks, profile, feedback_history: feedbackHistory }), + body: JSON.stringify({ user_id: userId, tasks, profile, feedback_history: feedbackHistory, agent_prefs: agentPrefs }), signal: AbortSignal.timeout(15_000), }); @@ -129,6 +180,23 @@ export async function computeAndStore(userId: string, agentId: string): Promise< }; await storeAgentOutput(output); + + // Run inference framework for this agent and persist results. + // Failures are non-fatal — the compute result is already stored. + try { + const inferResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/infer`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ user_id: userId, feedback_history: feedbackHistory }), + signal: AbortSignal.timeout(10_000), + }); + if (inferResp.ok) { + const inferResult = await inferResp.json() as { inferred_prefs: Record }; + await persistInferredPrefs(userId, agentId, inferResult.inferred_prefs); + } + } catch { + // inference failure is non-fatal + } } // ── POST /api/agents/:agentId/compute ───────────────────────────────────────── diff --git a/services/api/src/routes/agent-registry.ts b/services/api/src/routes/agent-registry.ts index a6ae86e..e7c92d6 100644 --- a/services/api/src/routes/agent-registry.ts +++ b/services/api/src/routes/agent-registry.ts @@ -13,7 +13,7 @@ export function _resetRegistryCache() { _cache = null; } -async function fetchRegistry(): Promise { +export async function fetchRegistry(): Promise { if (_cache && Date.now() - _cache.fetchedAt < CACHE_TTL_MS) return _cache.payload; const upstream = await fetch(`${config.ML_SERVING_URL}/agents/registry`, { signal: AbortSignal.timeout(5000), diff --git a/services/api/src/routes/profile.ts b/services/api/src/routes/profile.ts new file mode 100644 index 0000000..912658b --- /dev/null +++ b/services/api/src/routes/profile.ts @@ -0,0 +1,202 @@ +/** + * GET /api/profile — read-through: user globals + prefs + contexts + consents + * PATCH /api/profile/prefs/:scope — upsert user_preferences rows (source='user') + * PATCH /api/profile/consents — grant or revoke consent keys + * PATCH /api/profile/contexts — activate/deactivate or create user contexts + * + * ADR-0014 step 4. + */ +import { Router, type Response, type IRouter } from 'express'; +import { db } from '../db/index.js'; +import { + users, + userPreferences, + userConsents, + userContexts, +} from '../db/schema.js'; +import { eq, and, isNull } from 'drizzle-orm'; +import { requireAuth, type AuthenticatedRequest } from '../middleware/session.js'; + +const router: IRouter = Router(); + +// ── GET /api/profile ───────────────────────────────────────────────────────── + +router.get('/', requireAuth as any, async (req: AuthenticatedRequest, res: Response) => { + const userId = req.userId!; + + const [user] = await db.select().from(users).where(eq(users.id, userId)).limit(1); + if (!user || user.deletedAt) { + res.status(404).json({ error: 'User not found' }); + return; + } + + const [prefs, consents, contexts] = await Promise.all([ + db.select().from(userPreferences).where(eq(userPreferences.userId, userId)), + db.select().from(userConsents).where(eq(userConsents.userId, userId)), + db.select().from(userContexts).where(eq(userContexts.userId, userId)), + ]); + + // Group prefs by scope: { 'orchestrator': { key: value_json, … }, 'agent:foo': { … } } + const prefsByScope: Record> = {}; + for (const p of prefs) { + if (!prefsByScope[p.scope]) prefsByScope[p.scope] = {}; + try { + prefsByScope[p.scope][p.key] = JSON.parse(p.valueJson); + } catch { + prefsByScope[p.scope][p.key] = p.valueJson; + } + } + + // Consents: include both active and revoked (callers can filter on revokedAt) + // Also fold in the legacy consentGiven bit if no user_consents row exists yet. + const consentMap: Record = {}; + for (const c of consents) { + consentMap[c.consentKey] = { grantedAt: c.grantedAt, revokedAt: c.revokedAt ?? null }; + } + // Legacy fallback: if data:core is missing and the old bit is set, synthesise it. + if (!consentMap['data:core'] && user.consentGiven) { + consentMap['data:core'] = { grantedAt: user.consentAt ?? user.createdAt, revokedAt: null }; + } + + res.json({ + user: { + id: user.id, + email: user.email, + name: user.name, + image: user.image, + tone: user.tone ?? null, + tipKinds: user.tipKindsJson ? JSON.parse(user.tipKindsJson) : null, + }, + prefs: prefsByScope, + consents: consentMap, + contexts: contexts.map((c) => ({ + name: c.name, + active: c.active, + schedule: c.scheduleJson ? JSON.parse(c.scheduleJson) : null, + createdAt: c.createdAt, + })), + }); +}); + +// ── PATCH /api/profile/prefs/:scope ────────────────────────────────────────── +// Body: { [key]: value } — each key is upserted as source='user'. + +router.patch('/prefs/:scope', requireAuth as any, async (req: AuthenticatedRequest, res: Response) => { + const userId = req.userId!; + const { scope } = req.params; + const body = req.body as Record; + + if (!scope || typeof scope !== 'string') { + res.status(400).json({ error: 'scope is required' }); + return; + } + if (!body || typeof body !== 'object' || Array.isArray(body)) { + res.status(400).json({ error: 'body must be a JSON object' }); + return; + } + + const now = new Date().toISOString(); + for (const [key, value] of Object.entries(body)) { + const valueJson = JSON.stringify(value); + await db + .insert(userPreferences) + .values({ userId, scope, key, valueJson, source: 'user', updatedAt: now }) + .onConflictDoUpdate({ + target: [userPreferences.userId, userPreferences.scope, userPreferences.key], + set: { valueJson, source: 'user', updatedAt: now }, + }); + } + + res.json({ ok: true }); +}); + +// ── PATCH /api/profile/consents ─────────────────────────────────────────────── +// Body: { grant?: string[], revoke?: string[] } + +router.patch('/consents', requireAuth as any, async (req: AuthenticatedRequest, res: Response) => { + const userId = req.userId!; + const { grant = [], revoke = [] } = req.body as { grant?: string[]; revoke?: string[] }; + + if (!Array.isArray(grant) || !Array.isArray(revoke)) { + res.status(400).json({ error: 'grant and revoke must be arrays' }); + return; + } + + const now = new Date().toISOString(); + + for (const key of grant) { + await db + .insert(userConsents) + .values({ userId, consentKey: key, grantedAt: now, revokedAt: null }) + .onConflictDoUpdate({ + target: [userConsents.userId, userConsents.consentKey], + set: { grantedAt: now, revokedAt: null }, + }); + } + + for (const key of revoke) { + await db + .update(userConsents) + .set({ revokedAt: now }) + .where( + and( + eq(userConsents.userId, userId), + eq(userConsents.consentKey, key), + isNull(userConsents.revokedAt), + ), + ); + } + + res.json({ ok: true }); +}); + +// ── PATCH /api/profile/contexts ─────────────────────────────────────────────── +// Body: { name: string, active?: boolean, schedule?: object|null } +// Creates the row if it doesn't exist; toggles active / updates schedule. + +router.patch('/contexts', requireAuth as any, async (req: AuthenticatedRequest, res: Response) => { + const userId = req.userId!; + const { name, active, schedule } = req.body as { + name?: string; + active?: boolean; + schedule?: unknown; + }; + + if (!name || typeof name !== 'string') { + res.status(400).json({ error: 'name is required' }); + return; + } + + const now = new Date().toISOString(); + const scheduleJson = schedule !== undefined ? JSON.stringify(schedule) : undefined; + + const existing = await db + .select() + .from(userContexts) + .where(and(eq(userContexts.userId, userId), eq(userContexts.name, name))) + .limit(1); + + if (existing.length === 0) { + await db.insert(userContexts).values({ + userId, + name, + active: active ?? false, + scheduleJson: scheduleJson ?? null, + createdAt: now, + }); + } else { + const set: Partial = {}; + if (active !== undefined) set.active = active; + if (scheduleJson !== undefined) set.scheduleJson = scheduleJson; + if (Object.keys(set).length > 0) { + await db + .update(userContexts) + .set(set) + .where(and(eq(userContexts.userId, userId), eq(userContexts.name, name))); + } + } + + res.json({ ok: true }); +}); + +export default router; diff --git a/services/api/src/routes/recommender.ts b/services/api/src/routes/recommender.ts index bc2dd7f..3dd0f39 100644 --- a/services/api/src/routes/recommender.ts +++ b/services/api/src/routes/recommender.ts @@ -12,6 +12,7 @@ import { todoistSource, dueAgeDays } from '../signals/todoist.js'; export { dueAgeDays }; import { SignalAggregator } from '../signals/aggregator.js'; import { getActiveAgentOutputs } from './agent-outputs.js'; +import { getEligibleAgentIds } from '../profile/eligibility.js'; const router: ExpressRouter = Router(); @@ -58,11 +59,13 @@ async function fetchOrchestratorTip( dayOfWeek: number, traceparent?: string, ): Promise { - const agentRows = await getActiveAgentOutputs(userId); - const agentOutputs = agentRows.map((r) => ({ - agent_id: r.agentId, - prompt_text: r.promptText, - })); + const [allAgentRows, eligibleIds] = await Promise.all([ + getActiveAgentOutputs(userId), + getEligibleAgentIds(userId), + ]); + const agentOutputs = allAgentRows + .filter((r) => eligibleIds.has(r.agentId)) + .map((r) => ({ agent_id: r.agentId, prompt_text: r.promptText })); const tasks = signals.slice(0, 10).map((s) => ({ content: s.content,