Compare commits

14 Commits

Author SHA1 Message Date
772bb6e194 feat(consents): auto-grant data:<provider> on connect; remove agent: consents (ADR-0015)
- integrations.ts: grant data:<provider> on OAuth callback, revoke on disconnect
- Backfill migration: INSERT OR IGNORE data:<provider> for all active tokens
- Agent manifests: drop agent:<id> from required_consents (momentum, time-of-day,
  overdue-task, recent-patterns, health-vitals) — per-agent control is a preference
- eligibility.ts: update comment to reflect data:-only consent model
- test_manifest.py: assert no agent: consents remain in any manifest
- migrations.test.ts: backfill idempotency tests for issue #127
- Dockerfile.api: drop --offline flag (fixes ERR_PNPM_NO_OFFLINE_META)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 15:09:58 +00:00
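
The idempotent backfill described in this commit could look roughly like the sketch below. The table and column names (`user_consents(user_id, consent, revoked_at)`, `integration_tokens(user_id, provider, revoked_at)`) and the reading of "active" as `revoked_at IS NULL` are assumptions for illustration, not the actual migration.

```python
import sqlite3


def backfill_data_consents(conn: sqlite3.Connection) -> None:
    """Grant data:<provider> for every active token; safe to run repeatedly."""
    conn.execute(
        """
        INSERT OR IGNORE INTO user_consents (user_id, consent)
        SELECT user_id, 'data:' || provider
        FROM integration_tokens
        WHERE revoked_at IS NULL  -- assumption: this is what "active" means
        """
    )


def demo() -> tuple[int, int]:
    """Run the backfill twice on a hypothetical schema and return both row counts."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE integration_tokens (user_id TEXT, provider TEXT, revoked_at TEXT);
        CREATE TABLE user_consents (
            user_id TEXT, consent TEXT, revoked_at TEXT,
            PRIMARY KEY (user_id, consent)  -- the key is what makes OR IGNORE idempotent
        );
        INSERT INTO integration_tokens VALUES ('u1', 'todoist', NULL);
        INSERT INTO integration_tokens VALUES ('u1', 'google-health', NULL);
        INSERT INTO integration_tokens VALUES ('u2', 'todoist', '2026-05-01');
        """
    )
    count = "SELECT COUNT(*) FROM user_consents"
    backfill_data_consents(conn)
    after_first = conn.execute(count).fetchone()[0]
    backfill_data_consents(conn)  # second run must not add rows
    after_second = conn.execute(count).fetchone()[0]
    return after_first, after_second
```

Idempotency here depends on a uniqueness constraint over `(user_id, consent)`; without one, `INSERT OR IGNORE` would insert duplicates on every run.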
34925310cf docs: update focus-area manifest description and CLAUDE.md
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 15:00:06 +00:00
f66f337779 feat(focus-area): use enriched descriptions in cluster output
cluster_tasks now attaches enriched_description to each task dict.
focus-area reads enriched_description (falling back to raw content) when
building the area summary, so the orchestrator sees the expanded 3-sentence
descriptions instead of terse raw titles.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 14:58:31 +00:00
f6b89fc849 refactor(focus-area): output all clusters as context; remove scoring and preferred_areas
The agent no longer picks a winner — it summarises every cluster so the
orchestrator can decide what's relevant. Scoring by overdue count overlapped
with the overdue-task agent. preferred_areas (project-ID based, broken label
matching) removed entirely.

Output format: numbered list of areas with task titles included.
Snapshot: {cluster_count, clusters: [{label, task_count, tasks}]}.
Version bumped to 3.0.0; inferred_params cleared.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 14:57:04 +00:00
12c956b588 fix(clustering): drop TTL check from isUpToDate; task hash is the only signal
If tasks haven't changed, the output is valid forever. If they changed,
always recompute regardless of age. TTL on focus-area restored to 24h —
it only controls recommender eligibility, not recompute frequency.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 14:46:43 +00:00
d12f11d29d feat(clustering): 1h TTL + skip recompute when tasks unchanged
focus-area now recomputes at most once per hour, and only if the task list
actually changed since the last compute.

- focus-area TTL: 43200s → 3600s; version bumped to 2.1.0
- computeAndStore hashes sorted task contents (MD5) and checks the stored
  _task_hash in the existing snapshot; skips the ml-serving call when the
  hash matches and the output isn't expired
- ml-serving injects _task_hash into the snapshot so the next cycle can compare

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 14:45:15 +00:00
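
The hash gate introduced here, as later simplified by 12c956b588 (task hash as the only signal), might be sketched like this. The real `computeAndStore` is TypeScript; the function names and the JSON serialisation step below are assumptions, and only the MD5-over-sorted-contents idea and the `_task_hash` snapshot key come from the commit messages.

```python
from __future__ import annotations

import hashlib
import json


def task_list_hash(tasks: list[dict]) -> str:
    """MD5 over the sorted task contents, so task ordering does not change the hash."""
    contents = sorted(t.get("content", "").strip() for t in tasks)
    # json.dumps gives an unambiguous serialisation of the list of strings.
    return hashlib.md5(json.dumps(contents).encode()).hexdigest()


def is_up_to_date(tasks: list[dict], snapshot: dict | None) -> bool:
    """Hash-only rule: a stored snapshot stays valid until the task list changes."""
    if snapshot is None:
        return False
    return snapshot.get("_task_hash") == task_list_hash(tasks)
```

In d12f11d29d the check also required the stored output to be unexpired; that extra condition is omitted here because 12c956b588 removed it.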
9ddeea6cac feat(clustering): persistent enrichment cache in task_enrichments table
Each unique task title is now enriched by LiteLLM once and cached in the DB.
Subsequent agent compute cycles (every 12h) fetch the cache before calling
ml-serving; only new titles hit the tip-generator.

- DB: task_enrichments(content_hash PK, description, model, created_at)
- TS: fetchEnrichmentCache / persistEnrichments helpers in agent-outputs.ts;
  enrichment_cache passed in compute request, new_enrichments persisted from response
- Python: AgentComputeRequest.enrichment_cache / AgentComputeResponse.new_enrichments;
  AgentInput.enrichment_cache; _enrich_batch returns (descriptions, new_entries);
  cluster_tasks returns (clusters, new_enrichments)
- FocusAreaAgent stashes new_enrichments in signals_snapshot under _new_enrichments;
  compute_agent endpoint pops it before storing the snapshot

Closes part of #129

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 14:39:35 +00:00
08d08ad7b0 feat(clustering): LLM-enrichment before embedding (port from taskpile #129)
Ported from taskpile experiments/clustering_eval (prompt v1, qwen2.5:1.5b).
The experiment showed ARI 0.22→0.77 and AUROC 0.76→0.91 on synthetic tasks
when embedding LLM-expanded descriptions instead of raw titles.

- Expand each task title via LiteLLM tip-generator before embedding
- Prefix with "clustering: " (nomic-embed-text task instruction prefix)
- Cache expansions in-memory by content hash within a compute cycle
- Falls back to raw title if enrichment fails; no change to fallback behaviour

Fixes #129

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 14:20:48 +00:00
1ca2351488 fix(clustering): route embeddings through LiteLLM instead of Ollama directly
The old code called Ollama's /api/embeddings one task at a time, which caused
silent fallback to project-based grouping when host.docker.internal:11434 was
unreachable from the ml-serving container.

- Switch to LiteLLM /embeddings (model alias "embedder") as primary path
- Batch all task contents in one request instead of N serial calls
- Fall back to Ollama /api/embed (updated to current API) when LITELLM_URL is absent
- Update tests to mock _embed_batch instead of the removed _embed

Fixes #123

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 13:42:53 +00:00
4e9210fcef fix(web): wrap loadTip in arrow fn to satisfy MouseEventHandler type
2026-05-12 13:34:46 +00:00
59c493323f fix(recommender): remove Todoist fallback on orchestrator failure; add snooze exclusion
When fetchOrchestratorTip returned null (LiteLLM timeout, bad JSON, etc.)
the recommender silently fell back to randomPolicy, serving a raw Todoist
task with no rationale — explaining both reported symptoms.

- Remove randomPolicy/signalToCandidate; return 204 when orchestrator fails
  so the UI shows "All clear" instead of a confusing Todoist task
- Pass recent_tip through the stack (frontend → POST /recommend →
  fetchOrchestratorTip → ml/serving RecommendRequest → build_orchestrator_messages)
  so after snooze the LLM is instructed not to repeat the snoozed content

Fixes #122

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 13:28:32 +00:00
d4b40e2590 docs: document MLflow trace API, span inspection, and no-agent diagnosis
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-11 11:23:13 +00:00
a0a069c525 fix(admin): break redirect loop on /forbidden for non-admin users
The middleware was redirecting non-admins to /forbidden but /forbidden
wasn't excluded from the matcher, so the middleware ran again on that
page, saw a non-admin, and redirected again — infinite loop. Added
/forbidden to the pass-through list alongside /login.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-11 11:12:16 +00:00
d1f28666b0 feat(integrations): add Google Health (Fit) integration with full permissions
OAuth2 flow with all 11 Google Fitness scopes (activity, body, sleep,
heart rate, nutrition, location, blood glucose/pressure/temperature,
oxygen saturation, reproductive health). Stores access + refresh tokens;
auto-refreshes on expiry.

GoogleHealthSignalSource fetches steps, sleep sessions, active minutes,
calories, and heart rate from the Fit aggregate + sessions APIs. Signals
flow into both the tip orchestrator and the health-vitals pre-compute
agent, which generates prompt snippets about step progress, sleep
deficit, sedentary time, and elevated heart rate.

Signal.kind extended with 'health'; IntegrationProvider extended with
'google-health'. Agent compute signal mapping enriched to include source,
kind, and all features so health-vitals can filter its own signals.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-11 11:12:11 +00:00
31 changed files with 1253 additions and 290 deletions

View File

@@ -92,7 +92,7 @@ oO generates tips through a multi-agent pipeline (ADR-0013): pre-compute agents
| Alias | Model | Used by | | Alias | Model | Used by |
|-------|-------|---------| |-------|-------|---------|
| `tip-generator` | qwen2.5:1.5b (default) | `ml/serving` tip generation | | `tip-generator` | qwen2.5:1.5b (default) | `ml/serving` tip generation |
| `embedder` | nomic-embed-text | task clustering, dedup | | `embedder` | nomic-embed-text | task clustering (after LLM enrichment), dedup |
| `judge` | claude-haiku-4-5 (cloud, eval only) | offline sim | | `judge` | claude-haiku-4-5 (cloud, eval only) | offline sim |
Env vars: `LITELLM_URL` (prod `https://llm.alogins.net`), `OLLAMA_URL` (Agap host, `http://host.docker.internal:11434` from containers). Env vars: `LITELLM_URL` (prod `https://llm.alogins.net`), `OLLAMA_URL` (Agap host, `http://host.docker.internal:11434` from containers).
@@ -101,17 +101,83 @@ Ollama and LiteLLM are **shared Agap services**, not oO services — they live i
All `httpx` calls in `ml/` must use `trust_env=False` to bypass the system proxy — same rule as `bw` and curl. Pattern: `httpx.Client(trust_env=False, timeout=N)`. All `httpx` calls in `ml/` must use `trust_env=False` to bypass the system proxy — same rule as `bw` and curl. Pattern: `httpx.Client(trust_env=False, timeout=N)`.
MLflow container-to-container calls: always pass `host_header="localhost"` to `MLflowClient` — MLflow's `--allowed-hosts` rejects `Host: mlflow` (the container DNS name) with 403. Auth credential is `MLFLOW_ADMIN_PASSWORD`. MLflow REST API lives at the origin root (`/api/2.0/mlflow`), not under the `/mlflow` UI prefix. MLflow container-to-container calls: always pass `host_header="localhost"` to `MLflowClient` — MLflow's `--allowed-hosts` rejects `Host: mlflow` (the container DNS name) with 403. Auth credential is `MLFLOW_ADMIN_PASSWORD`. MLflow REST API lives at the origin root, not under the `/mlflow` UI prefix.
### MLflow API versions — runs vs traces
MLflow uses **two API versions** — use the right one or you'll get 405:
| What | API prefix | Example |
|------|-----------|---------|
| Runs, experiments, metrics | `/api/2.0/mlflow/` | `runs/search`, `experiments/list` |
| Traces (LLM observability) | `/api/3.0/mlflow/traces/` | `traces/{trace_id}` |
**Experiment IDs:** `3` = oO/serving. Artifacts stored as run tags prefixed `artifact:<path>`.
### Querying from the host shell
Always strip the proxy and pass `Host: localhost` (no port — `localhost:5000` fails the DNS-rebinding check).
MLflow from the host shell — query with curl, no script needed:
```bash ```bash
# Search recent runs (experiment 3)
env -u HTTPS_PROXY -u HTTP_PROXY -u ALL_PROXY -u https_proxy -u http_proxy -u all_proxy \ env -u HTTPS_PROXY -u HTTP_PROXY -u ALL_PROXY -u https_proxy -u http_proxy -u all_proxy \
curl -s -H "Host: localhost" -u "admin:${MLFLOW_ADMIN_PASSWORD}" \ curl -s -H "Host: localhost" -u "admin:${MLFLOW_ADMIN_PASSWORD}" \
-X POST http://localhost:5000/api/2.0/mlflow/runs/search \ -X POST http://localhost:5000/api/2.0/mlflow/runs/search \
-H "Content-Type: application/json" \ -H "Content-Type: application/json" \
-d '{"experiment_ids":["3"],"max_results":1,"order_by":["start_time DESC"]}' -d '{"experiment_ids":["3"],"max_results":5,"order_by":["start_time DESC"]}'
# Get a trace by ID (note: /api/3.0/, not /api/2.0/)
env -u HTTPS_PROXY -u HTTP_PROXY -u ALL_PROXY -u https_proxy -u http_proxy -u all_proxy \
curl -s -H "Host: localhost" -u "admin:${MLFLOW_ADMIN_PASSWORD}" \
http://localhost:5000/api/3.0/mlflow/traces/tr-<trace_id> | python3 -m json.tool
```
The trace response includes `trace_metadata.mlflow.traceInputs/Outputs`, `trace_metadata.mlflow.trace.sizeStats` (num_spans), and `tags.mlflow.traceName`.
### Getting spans (Python client from inside the container)
The REST API has **no endpoint for spans**: `/api/3.0/mlflow/traces/{id}/spans` returns 404. Use the Python client inside `oo-ml-serving-1`:
```bash
docker exec oo-ml-serving-1 python3 -c "
import mlflow, json, os
mlflow.set_tracking_uri('http://mlflow:5000')
os.environ['MLFLOW_TRACKING_USERNAME'] = 'admin'
os.environ['MLFLOW_TRACKING_PASSWORD'] = os.environ.get('MLFLOW_ADMIN_PASSWORD', '')
client = mlflow.tracking.MlflowClient()
trace = client.get_trace('tr-<trace_id>')
for span in trace.data.spans:
    print(span.name, '| parent:', span.parent_id, '| status:', span.status)
    print(' inputs:', json.dumps(span.inputs)[:200])
    print(' outputs:', json.dumps(span.outputs)[:200])
    print(' attrs:', span.attributes)
"
```
### Span structure for a tip generation trace
A healthy `recommend` trace has 3 spans:
| Span | Type | Parent | Key attributes |
|------|------|--------|---------------|
| `recommend` | CHAIN | (root) | `agent_count`, `latency_ms`; inputs include `agent_ids` list |
| `build_context` | TOOL | recommend | `agent_count`, `task_count`, `science_destiny` |
| `llm_orchestrator` | LLM | recommend | `prompt_tokens`, `completion_tokens`, `model`, `attempts` |
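
A quick way to check a trace against the table above is sketched below. It works on simplified `(name, parent_name)` pairs rather than real MLflow span objects (which expose `parent_id`, not a parent name), so the input shape and the helper name `check_recommend_trace` are assumptions.

```python
from __future__ import annotations

# Expected span name -> parent name, taken from the table; None marks the root.
EXPECTED_PARENTS: dict[str, str | None] = {
    "recommend": None,
    "build_context": "recommend",
    "llm_orchestrator": "recommend",
}


def check_recommend_trace(spans: list[tuple[str, str | None]]) -> list[str]:
    """Return human-readable problems; an empty list means the shape looks healthy."""
    problems: list[str] = []
    seen = dict(spans)
    for name, parent in EXPECTED_PARENTS.items():
        if name not in seen:
            problems.append(f"missing span: {name}")
        elif seen[name] != parent:
            problems.append(f"{name}: parent is {seen[name]!r}, expected {parent!r}")
    for name in seen:
        if name not in EXPECTED_PARENTS:
            problems.append(f"unexpected span: {name}")
    return problems
```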
### Diagnosing "no agents in trace"
If the trace shows `agent_ids: []` and `agent_count: 0` in the root span, and the orchestrator prompt says *"No pre-computed agent context available"*, it means the recommender found zero eligible snippets at request time. Causes:
1. **Agent compute hasn't run** — no `agent_outputs` rows for this user yet
2. **Snippets expired** — TTL elapsed since last compute
3. **Eligibility filter dropped all agents** — none passed the manifest-driven check
Diagnose with:
```bash
docker exec oo-api-1 psql "$DATABASE_URL" -c \
"SELECT agent_id, computed_at, expires_at FROM agent_outputs WHERE user_id='<uid>' ORDER BY computed_at DESC LIMIT 10;"
``` ```
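
The rows returned by that query can be mapped onto the three causes with a small helper. This is a sketch under assumptions: rows are `(agent_id, computed_at, expires_at)` tuples with timezone-aware datetimes, and the function name is hypothetical.

```python
from __future__ import annotations

from datetime import datetime


def diagnose_no_agents(rows: list[tuple[str, datetime, datetime]], now: datetime) -> str:
    """Classify why the recommender saw zero eligible snippets."""
    if not rows:
        # Cause 1: no agent_outputs rows for this user yet.
        return "agent compute has not run"
    if all(expires_at <= now for _, _, expires_at in rows):
        # Cause 2: every snippet is past its TTL.
        return "all snippets expired"
    # Cause 3: fresh rows exist, so the eligibility filter must have dropped them.
    return "fresh snippets exist; check the eligibility filter"
```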
`Host: localhost` required (no port) — `localhost:5000` fails the DNS-rebinding check. Experiment IDs: `3`=oO/serving. Artifacts stored as run tags prefixed `artifact:<path>`.
**Multi-agent tip generation pipeline (ADR-0013):** **Multi-agent tip generation pipeline (ADR-0013):**
1. Pre-compute agents (`ml/agents/<id>/`) run on a schedule, each emitting a snippet into `agent_outputs` with a per-agent TTL 1. Pre-compute agents (`ml/agents/<id>/`) run on a schedule, each emitting a snippet into `agent_outputs` with a per-agent TTL
@@ -131,7 +197,7 @@ Recent completions:
- ADR-0012 — ε-greedy v2 (D=12) — 2026-04-26 (now superseded by ADR-0013) - ADR-0012 — ε-greedy v2 (D=12) — 2026-04-26 (now superseded by ADR-0013)
- ADR-0014 complete: unified Profile schema + backfill, manifest plumbing, `/api/profile` read-through, registry-driven eligibility filter, inference framework + per-agent inference, legacy consent column drop — 2026-05-05 - ADR-0014 complete: unified Profile schema + backfill, manifest plumbing, `/api/profile` read-through, registry-driven eligibility filter, inference framework + per-agent inference, legacy consent column drop — 2026-05-05
- Rich per-agent inference for all four active agents (#112, #114, #115, #116) — 2026-05-06: quiet/peak hours (time-of-day), z-score baseline (momentum), p50 lateness + project realness (overdue-task), adaptive lookback + weekly/daily cycles (recent-patterns) - Rich per-agent inference for all four active agents (#112, #114, #115, #116) — 2026-05-06: quiet/peak hours (time-of-day), z-score baseline (momentum), p50 lateness + project realness (overdue-task), adaptive lookback + weekly/daily cycles (recent-patterns)
- Semantic task clustering via nomic-embed-text + focus-area preferred_areas inference (#97, #113) — 2026-05-06: `ml/agents/clustering.py`, focus-area v2.0.0 - Semantic task clustering via nomic-embed-text + LLM enrichment (#97, #113, #129) — 2026-05-12: `ml/agents/clustering.py`; titles expanded via `tip-generator` before embedding; persistent cache in `task_enrichments` table; recompute gated on task-list hash change; focus-area v3.0.0 outputs all clusters with enriched descriptions
- Per-user feature freshness SLAs (#61) — 2026-05-06: `invalidated_by` mirrored into `ProfileFeature`; drift-detection test added - Per-user feature freshness SLAs (#61) — 2026-05-06: `invalidated_by` mirrored into `ProfileFeature`; drift-detection test added
- MLflow tracing added to `ml/serving` for all agent calls — 2026-05-06: `ml/serving/mlflow_client.py`; activated by `MLFLOW_TRACKING_URI=http://mlflow:5000` (default in compose `full` profile); requires `--profile mlops` for the MLflow container. Issue #118 (M4) tracks removal from production critical path. - MLflow tracing added to `ml/serving` for all agent calls — 2026-05-06: `ml/serving/mlflow_client.py`; activated by `MLFLOW_TRACKING_URI=http://mlflow:5000` (default in compose `full` profile); requires `--profile mlops` for the MLflow container. Issue #118 (M4) tracks removal from production critical path.
@@ -157,7 +223,7 @@ Lives in `ml/agents/inference/`. `run_inference(manifest, history)` evaluates al
- `infer()` error → emit `cold_start_default` (never crashes) - `infer()` error → emit `cold_start_default` (never crashes)
- Results written to `user_preferences` with `source='inferred'`; keys with `source='user'` are never overwritten - Results written to `user_preferences` with `source='inferred'`; keys with `source='user'` are never overwritten
All five agents are at v1.2.0. Per-agent inferred params (all live in `ml/agents/<name>.py`): Per-agent inferred params (all live in `ml/agents/<name>.py`):
| Agent | Inferred params | Notes | | Agent | Inferred params | Notes |
|-------|----------------|-------| |-------|----------------|-------|
@@ -165,7 +231,7 @@ All five agents are at v1.2.0. Per-agent inferred params (all live in `ml/agents
| `momentum` | `engagement_trend`, `baseline_completions_per_day`, `stdev` | Baseline = 28d rolling mean done/day; snippet uses z-score language | | `momentum` | `engagement_trend`, `baseline_completions_per_day`, `stdev` | Baseline = 28d rolling mean done/day; snippet uses z-score language |
| `overdue-task` | `lateness_tolerance_days`, `project_realness` | Tolerance = p50 lateness from TaskCompletion history; realness = project median vs global median | | `overdue-task` | `lateness_tolerance_days`, `project_realness` | Tolerance = p50 lateness from TaskCompletion history; realness = project median vs global median |
| `recent-patterns` | `lookback_days`, `weekly_cycle`, `daily_cycle` | Lookback sized to ≥30 done events; cycles use peak-to-mean ratio; snippet hints when strength > 0.5 | | `recent-patterns` | `lookback_days`, `weekly_cycle`, `daily_cycle` | Lookback sized to ≥30 done events; cycles use peak-to-mean ratio; snippet hints when strength > 0.5 |
| `focus-area` | `preferred_areas` | Top-2 project IDs by task completion count; semantic clustering via `ml/agents/clustering.py` in compute() | | `focus-area` | *(none)* | No inferred params. Clusters tasks via LLM-enriched embeddings and outputs all areas with expanded descriptions. Recomputes only when task list changes (hash-gated). |
`UserHistory` carries both `events: list[FeedbackEvent]` and `task_completions: list[TaskCompletion]`. `AgentInferRequest` (ml/serving) accepts `task_completions: list[dict]` alongside `feedback_history`. `UserHistory` carries both `events: list[FeedbackEvent]` and `task_completions: list[TaskCompletion]`. `AgentInferRequest` (ml/serving) accepts `task_completions: list[dict]` alongside `feedback_history`.

View File

@@ -4,8 +4,8 @@ import type { NextRequest } from 'next/server';
export async function middleware(req: NextRequest) { export async function middleware(req: NextRequest) {
const { pathname } = req.nextUrl; const { pathname } = req.nextUrl;
// Pass through the login page and API calls // Pass through the login page, forbidden page, and API calls
if (pathname.startsWith('/login') || pathname.startsWith('/api/')) { if (pathname.startsWith('/login') || pathname.startsWith('/forbidden') || pathname.startsWith('/api/')) {
return NextResponse.next(); return NextResponse.next();
} }

View File

@@ -40,11 +40,11 @@ export default function TipPage() {
} }
}, [state]); }, [state]);
const loadTip = useCallback(async () => { const loadTip = useCallback(async (recentTip?: string) => {
setVisible(false); setVisible(false);
setState('loading'); setState('loading');
try { try {
const rec = await getRecommendation(); const rec = await getRecommendation(recentTip);
if (!rec) { if (!rec) {
setState('empty'); setState('empty');
return; return;
@@ -62,10 +62,11 @@ export default function TipPage() {
const react = async (action: 'done' | 'dismiss' | 'snooze') => { const react = async (action: 'done' | 'dismiss' | 'snooze') => {
if (!tip) return; if (!tip) return;
const snoozedContent = action === 'snooze' ? tip.content : undefined;
setVisible(false); setVisible(false);
setState('done'); setState('done');
await sendFeedback(tip.id, { action }); await sendFeedback(tip.id, { action });
setTimeout(() => loadTip(), 700); setTimeout(() => loadTip(snoozedContent), 700);
}; };
const onPointerDown = () => { const onPointerDown = () => {
@@ -170,7 +171,7 @@ export default function TipPage() {
All clear. All clear.
</p> </p>
<button <button
onClick={loadTip} onClick={() => loadTip()}
style={{ style={{
marginTop: '2rem', marginTop: '2rem',
background: 'transparent', background: 'transparent',

View File

@@ -23,9 +23,12 @@ export async function getSession() {
return apiFetch<{ user: { id: string; email: string; name?: string; image?: string } | null }>('/auth/session'); return apiFetch<{ user: { id: string; email: string; name?: string; image?: string } | null }>('/auth/session');
} }
export async function getRecommendation(): Promise<RecommendResponse | null> { export async function getRecommendation(recentTip?: string): Promise<RecommendResponse | null> {
try { try {
return await apiFetch<RecommendResponse>('/recommend', { method: 'POST' }); return await apiFetch<RecommendResponse>('/recommend', {
method: 'POST',
body: JSON.stringify(recentTip ? { recent_tip: recentTip } : {}),
});
} catch (e: any) { } catch (e: any) {
if (e.status === 204 || e.status === 422) return null; if (e.status === 204 || e.status === 422) return null;
throw e; throw e;

View File

@@ -0,0 +1,44 @@
# ADR-0015 — Data-source consents only; drop per-agent consent gate
**Date:** 2026-05-11
**Status:** Accepted
**Supersedes:** ADR-0014 §3 (consent model)
## Context
ADR-0014 introduced `required_consents` on agent manifests. In practice two
unrelated concepts were mixed into that field:
- `data:<source>` — which data source the agent reads.
- `agent:<id>` — whether the user opted into this specific agent.
No UI ever granted `agent:<id>` consents, so the eligibility filter at
`services/api/src/profile/eligibility.ts` dropped every agent for every real
user. The symptom was confirmed by MLflow trace
`tr-591449ea8a72af8e81b6a585234a86ab`: user `ODGp4Gkr7JWemMsqcMLMn` had five
fresh `agent_outputs` rows but the orchestrator received `agent_ids: []`.
## Decision
Collapse to a single consent dimension: **data source**.
1. `required_consents` entries must all start with `data:`. Agent manifests no
longer list `agent:<id>` entries.
2. Connecting a data source via the OAuth flow automatically grants
`data:<provider>` in `user_consents`. Disconnecting sets `revoked_at`.
3. `data:core` continues to be auto-granted on signup.
4. Per-agent control becomes a **preference** (`user_preferences[scope='agent:<id>', key='enabled']`), not a consent. The eligibility filter already honours this — the only change is removing the `agent:*` consent check that was always failing.
5. Eligibility rule (final): an agent is eligible iff every `data:*` it
declares is granted and not revoked, no active context is in
`silenced_in_contexts`, and the `enabled` preference is not `false`.
## Consequences
- Agents that only require `data:core` (time-of-day, momentum, recent-patterns)
become eligible immediately after signup.
- Agents requiring `data:todoist` or `data:google-health` become eligible as
soon as the user connects the integration — no extra consent step.
- A backfill migration grants `data:<provider>` for every existing active
`integration_tokens` row, unblocking users who connected before this change.
- `ml/agents/tests/test_manifest.py` asserts all `required_consents` start
with `data:`, preventing regression.
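
The final eligibility rule (point 5 of the Decision) can be sketched as follows. The real filter is TypeScript in `services/api/src/profile/eligibility.ts`; this Python version is only an illustration, and the `Manifest` class, the `is_eligible` function and its parameter shapes are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Manifest:
    agent_id: str
    required_consents: list[str]
    silenced_in_contexts: list[str] = field(default_factory=list)


def is_eligible(
    manifest: Manifest,
    granted_consents: set[str],  # consents granted and not revoked
    active_contexts: set[str],
    enabled: bool | None,        # None means the preference was never set
) -> bool:
    """Eligible iff all data:* consents are held, no active context is silenced,
    and the 'enabled' preference is not explicitly false."""
    if any(not c.startswith("data:") for c in manifest.required_consents):
        raise ValueError(f"{manifest.agent_id}: non-data consent in manifest")
    if not set(manifest.required_consents) <= granted_consents:
        return False
    if active_contexts & set(manifest.silenced_in_contexts):
        return False
    return enabled is not False
```

Treating an unset preference as enabled matches the wording "the `enabled` preference is not `false`"; an opt-in default would need the last line changed.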

View File

@@ -16,7 +16,7 @@ COPY pnpm-lock.yaml ./
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm fetch RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm fetch
COPY . . COPY . .
RUN --mount=type=cache,id=pnpm,target=/pnpm/store \ RUN --mount=type=cache,id=pnpm,target=/pnpm/store \
pnpm install --frozen-lockfile --offline \ pnpm install --frozen-lockfile \
--filter @oo/api... --filter @oo/shared-types --filter @oo/api... --filter @oo/shared-types
RUN pnpm --filter @oo/shared-types build RUN pnpm --filter @oo/shared-types build
RUN pnpm --filter @oo/api build RUN pnpm --filter @oo/api build

View File

@@ -20,6 +20,9 @@ class AgentInput:
# precedence over 'inferred' source; the caller resolves priority before # precedence over 'inferred' source; the caller resolves priority before
# passing this dict in. # passing this dict in.
agent_prefs: dict = field(default_factory=dict) agent_prefs: dict = field(default_factory=dict)
# Pre-fetched enrichment cache: {content_hash -> description}. Populated by
# the TS caller from the task_enrichments DB table to avoid redundant LLM calls.
enrichment_cache: dict = field(default_factory=dict)
@dataclass @dataclass

View File

@@ -1,14 +1,24 @@
"""Semantic task clustering via nomic-embed-text (issue #97). """Semantic task clustering via nomic-embed-text (issue #97, #129).
Public API: Public API:
cluster_tasks(tasks, ollama_url) -> list[Cluster] cluster_tasks(tasks) -> list[Cluster]
Each task dict must have a "content" key. Tasks without content are placed in a Each task dict must have a "content" key. Tasks without content are placed in a
fallback "other" bucket. If Ollama is unreachable, falls back to grouping by fallback "other" bucket. If the embedding service is unreachable, falls back to
project_id so compute() always returns something useful. grouping by project_id so compute() always returns something useful.
Pipeline (ported from taskpile experiments/clustering_eval, prompt v1):
1. Expand each raw title via LiteLLM `tip-generator` (qwen2.5:1.5b) into a
3-sentence description. Cached in-memory by content hash within a compute
cycle so duplicate titles cost one LLM call.
2. Prefix the expanded text with "clustering: " (nomic-embed-text task prefix).
3. Batch-embed via LiteLLM `embedder` (nomic-embed-text).
Falls back to embedding raw titles when LLM expansion fails, and to
project-based grouping when embeddings are unavailable.
""" """
from __future__ import annotations from __future__ import annotations
import hashlib
import logging import logging
import math import math
import os import os
@@ -22,7 +32,17 @@ log = logging.getLogger(__name__)
_SIM_THRESHOLD = 0.72 _SIM_THRESHOLD = 0.72
# Never produce more than this many clusters regardless of task count. # Never produce more than this many clusters regardless of task count.
_MAX_CLUSTERS = 6 _MAX_CLUSTERS = 6
_EMBED_TIMEOUT = 10.0 _EMBED_TIMEOUT = 15.0
_ENRICH_TIMEOUT = 30.0
_ENRICH_PROMPT_V1 = (
"You are helping categorize a personal task. "
"Write exactly 3 sentences in English describing what the task likely involves, "
"what context or skills it needs, and why it might matter. "
"Be concise and specific. Do not use bullet points or numbering.\n"
"Task: {title}\n"
"Description:"
)
@dataclass @dataclass
@@ -39,20 +59,132 @@ class Cluster:
return sum(1 for t in self.tasks if t.get("is_overdue")) return sum(1 for t in self.tasks if t.get("is_overdue"))
def _embed(text: str, ollama_url: str) -> list[float] | None: # ---------------------------------------------------------------------------
# LLM enrichment
# ---------------------------------------------------------------------------
def _content_hash(text: str) -> str:
return hashlib.md5(text.encode()).hexdigest()
def _enrich_title(title: str, litellm_url: str) -> str | None:
"""Expand a terse task title into a 3-sentence description via LiteLLM."""
try:
with httpx.Client(trust_env=False, timeout=_ENRICH_TIMEOUT) as c:
r = c.post(
f"{litellm_url}/chat/completions",
json={
"model": "tip-generator",
"messages": [{"role": "user", "content": _ENRICH_PROMPT_V1.format(title=title)}],
"max_tokens": 120,
"temperature": 0.3,
},
)
r.raise_for_status()
return r.json()["choices"][0]["message"]["content"].strip()
except Exception as exc:
log.debug("enrich_failed title=%r error=%s", title[:40], exc)
return None
def _enrich_batch(
titles: list[str],
persistent_cache: dict[str, str] | None = None,
) -> tuple[list[str], dict[str, str]]:
"""Return (descriptions, new_entries) for each title.
Checks persistent_cache (pre-fetched from DB) first, then falls back to
calling LiteLLM. new_entries contains only hashes generated this call —
the caller should persist these to the DB.
"""
litellm_url = os.getenv("LITELLM_URL")
if not litellm_url:
log.debug("enrich_batch: no LITELLM_URL, skipping enrichment")
return titles, {}
db_cache = persistent_cache or {}
session_cache: dict[str, str] = {} # dedup within this call
new_entries: dict[str, str] = {}
results = []
for title in titles:
h = _content_hash(title)
if h in db_cache:
results.append(db_cache[h])
elif h in session_cache:
results.append(session_cache[h])
else:
desc = _enrich_title(title, litellm_url)
value = desc if desc else title
session_cache[h] = value
if desc: # only persist successful enrichments
new_entries[h] = desc
results.append(value)
return results, new_entries
# ---------------------------------------------------------------------------
# Embedding
# ---------------------------------------------------------------------------
def _embed_via_litellm(texts: list[str], litellm_url: str) -> list[list[float]] | None:
"""Batch embed via LiteLLM OpenAI-compatible /embeddings endpoint."""
try: try:
with httpx.Client(trust_env=False, timeout=_EMBED_TIMEOUT) as c: with httpx.Client(trust_env=False, timeout=_EMBED_TIMEOUT) as c:
r = c.post( r = c.post(
f"{ollama_url}/api/embeddings", f"{litellm_url}/embeddings",
json={"model": "nomic-embed-text", "prompt": text, "keep_alive": 0}, json={"model": "embedder", "input": texts},
) )
r.raise_for_status() r.raise_for_status()
return r.json().get("embedding") data = r.json().get("data", [])
ordered = sorted(data, key=lambda x: x["index"])
return [item["embedding"] for item in ordered]
except Exception as exc: except Exception as exc:
log.debug("embed_failed text=%r error=%s", text[:40], exc) log.debug("litellm_embed_failed error=%s", exc)
return None return None
def _embed_via_ollama(texts: list[str], ollama_url: str) -> list[list[float]] | None:
"""Embed via Ollama /api/embed, one request per text (not a single batched call)."""
try:
results = []
with httpx.Client(trust_env=False, timeout=_EMBED_TIMEOUT) as c:
for text in texts:
r = c.post(
f"{ollama_url}/api/embed",
json={"model": "nomic-embed-text", "input": text},
)
r.raise_for_status()
body = r.json()
# /api/embed returns {"embeddings": [[...]]}
embeddings = body.get("embeddings")
if not embeddings:
return None
results.append(embeddings[0])
return results
except Exception as exc:
log.debug("ollama_embed_failed error=%s", exc)
return None
def _embed_batch(texts: list[str]) -> list[list[float]] | None:
"""Embed a list of texts, preferring LiteLLM over direct Ollama."""
litellm_url = os.getenv("LITELLM_URL")
if litellm_url:
vecs = _embed_via_litellm(texts, litellm_url)
if vecs is not None:
return vecs
log.info("cluster: litellm embed failed, trying ollama fallback")
ollama_url = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
return _embed_via_ollama(texts, ollama_url)
# ---------------------------------------------------------------------------
# Clustering
# ---------------------------------------------------------------------------
def _cosine(a: list[float], b: list[float]) -> float: def _cosine(a: list[float], b: list[float]) -> float:
dot = sum(x * y for x, y in zip(a, b)) dot = sum(x * y for x, y in zip(a, b))
na = math.sqrt(sum(x * x for x in a)) na = math.sqrt(sum(x * x for x in a))
@@ -109,17 +241,18 @@ def _fallback_by_project(tasks: list[dict]) -> list[Cluster]:
def cluster_tasks( def cluster_tasks(
tasks: list[dict], tasks: list[dict],
ollama_url: str | None = None, ollama_url: str | None = None, # kept for test compatibility; env vars take precedence
) -> list[Cluster]: enrichment_cache: dict[str, str] | None = None,
) -> tuple[list[Cluster], dict[str, str]]:
"""Cluster tasks by semantic similarity. """Cluster tasks by semantic similarity.
Returns a non-empty list of Cluster objects. Falls back to project-based Returns (clusters, new_enrichments). new_enrichments contains LLM-generated
grouping if Ollama is unavailable or tasks have no content. descriptions produced this call that were not in the persistent cache — the
caller should persist these. Falls back to project-based grouping if the
embedding service is unavailable or tasks have no content.
""" """
if not tasks: if not tasks:
return [] return [], {}
url = ollama_url or os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
# Separate tasks with usable content from those without. # Separate tasks with usable content from those without.
with_content = [(t, t.get("content", "").strip()) for t in tasks] with_content = [(t, t.get("content", "").strip()) for t in tasks]
@@ -127,26 +260,31 @@ def cluster_tasks(
no_content = [t for t, c in with_content if not c] no_content = [t for t, c in with_content if not c]
if not embeddable: if not embeddable:
return _fallback_by_project(tasks) return _fallback_by_project(tasks), {}
# Fetch embeddings (best-effort; None means Ollama unavailable). task_objs = [t for t, _ in embeddable]
embedded: list[tuple[dict, list[float]]] = [] raw_titles = [c for _, c in embeddable]
failed = False
for task, content in embeddable:
vec = _embed(content, url)
if vec is None:
failed = True
break
embedded.append((task, vec))
if failed or not embedded: # Step 1: LLM-enrich titles → richer semantic signal before embedding.
log.info("cluster_tasks: ollama unavailable, falling back to project grouping") descriptions, new_enrichments = _enrich_batch(raw_titles, persistent_cache=enrichment_cache)
return _fallback_by_project(tasks)
# Attach enriched description to each task dict so consumers (e.g. focus-area)
# can show the expanded text instead of the terse raw title.
for task, desc in zip(task_objs, descriptions):
task["enriched_description"] = desc
# Step 2: Prefix with nomic-embed-text task prefix, then batch-embed.
prefixed = [f"clustering: {d}" for d in descriptions]
vecs = _embed_batch(prefixed)
if vecs is None or len(vecs) != len(prefixed):
log.info("cluster_tasks: embedding unavailable, falling back to project grouping")
return _fallback_by_project(tasks), new_enrichments
embedded = list(zip(task_objs, vecs))
clusters = _greedy_cluster(embedded) clusters = _greedy_cluster(embedded)
# Tasks without content get their own bucket if any.
if no_content: if no_content:
clusters.append(Cluster(label="Other tasks", tasks=no_content)) clusters.append(Cluster(label="Other tasks", tasks=no_content))
return clusters return clusters, new_enrichments
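
The LiteLLM branch in this hunk sorts the response items by `index` before pulling out the vectors, so the result lines up with the input order. A minimal sketch of that parsing step, assuming an OpenAI-style body shaped like `{"data": [{"index": ..., "embedding": [...]}]}`; `parse_embedding_response` is a hypothetical helper name used only for illustration and is not part of this change:

```python
from __future__ import annotations


def parse_embedding_response(body: dict) -> list[list[float]]:
    """Return embeddings in input order (hypothetical helper, illustration only)."""
    data = body.get("data", [])
    # Sort on the per-item "index" field so the output order matches the
    # order of the texts that were sent, even if items arrive shuffled.
    ordered = sorted(data, key=lambda item: item["index"])
    return [item["embedding"] for item in ordered]


if __name__ == "__main__":
    shuffled = {
        "data": [
            {"index": 1, "embedding": [0.0, 1.0]},
            {"index": 0, "embedding": [1.0, 0.0]},
        ]
    }
    print(parse_embedding_response(shuffled))
```

An empty or missing `data` list yields an empty result here; `cluster_tasks` catches that case through its `len(vecs) != len(prefixed)` check and falls back to project grouping.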

@@ -1,113 +1,70 @@
from __future__ import annotations from __future__ import annotations
from collections import Counter
from typing import ClassVar from typing import ClassVar
from .base import BaseAgent, AgentInput, AgentOutput from .base import BaseAgent, AgentInput, AgentOutput
from .clustering import cluster_tasks from .clustering import cluster_tasks
from .inference.history import UserHistory from .manifest import AgentManifest
from .manifest import AgentManifest, InferredParam
def _infer_preferred_areas(history: UserHistory) -> list[str]:
"""Top-2 project IDs by completed task count (last 90 days worth of data)."""
counts: Counter[str] = Counter()
for tc in history.task_completions:
if tc.project_id:
counts[tc.project_id] += 1
return [pid for pid, _ in counts.most_common(2)]
MANIFEST = AgentManifest( MANIFEST = AgentManifest(
id="focus-area", id="focus-area",
version="2.0.0", # semantic clustering via nomic-embed-text (#97, #113) version="3.0.0", # output all clusters as context; no scoring (#129)
description="Identifies the most congested semantic focus area in the user's task list.", description="Clusters tasks semantically, enriches titles via LLM, and outputs a full area summary with expanded descriptions for the orchestrator.",
pref_schema={ pref_schema={"type": "object", "additionalProperties": False, "properties": {}},
"type": "object",
"additionalProperties": False,
"properties": {
"preferred_areas": {
"type": "array",
"items": {"type": "string"},
"default": [],
"description": "Project IDs or label names to prioritise when multiple areas tie.",
},
},
},
context_schema=["todoist.tasks"], context_schema=["todoist.tasks"],
required_consents=["data:core", "data:todoist", "agent:focus-area"], required_consents=["data:core", "data:todoist"],
output_contract={"type": "snippet", "format": "free_text"}, output_contract={"type": "snippet", "format": "free_text"},
ttl_sec=43_200,
inferred_params=[
InferredParam(
key="preferred_areas",
ttl_sec=86_400, ttl_sec=86_400,
cold_start_default=[], inferred_params=[],
min_history=0, # use task_completions, not feedback events; handle empty inside
infer=_infer_preferred_areas,
),
],
) )
class FocusAreaAgent(BaseAgent): class FocusAreaAgent(BaseAgent):
"""Identifies the most congested semantic focus area in the user's task list.""" """Clusters tasks and outputs a full area summary for the orchestrator."""
agent_id: ClassVar[str] = MANIFEST.id agent_id: ClassVar[str] = MANIFEST.id
ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec
version: ClassVar[str] = MANIFEST.version version: ClassVar[str] = MANIFEST.version # 3.0.0
def compute(self, inp: AgentInput) -> AgentOutput: def compute(self, inp: AgentInput) -> AgentOutput:
preferred: list[str] = inp.agent_prefs.get("preferred_areas", [])
if not inp.tasks: if not inp.tasks:
return self._make_output( return self._make_output(
inp, inp,
"No tasks available to identify a focus area.", "No tasks available to identify focus areas.",
{"cluster_count": 0, "strategy": "none"}, {"cluster_count": 0},
) )
clusters = cluster_tasks(inp.tasks) clusters, new_enrichments = cluster_tasks(inp.tasks, enrichment_cache=inp.enrichment_cache)
if not clusters: if not clusters:
return self._make_output( return self._make_output(
inp, inp,
"No tasks available to identify a focus area.", "No tasks available to identify focus areas.",
{"cluster_count": 0, "strategy": "none"}, {"cluster_count": 0},
) )
strategy = "semantic" if len(clusters) > 1 or len(inp.tasks) > 1 else "fallback" lines = [f"The user's tasks are grouped into {len(clusters)} area(s):"]
for i, cluster in enumerate(clusters, 1):
def score(cluster) -> float: descs = [
base = sum(2.0 if t.get("is_overdue") else 1.0 for t in cluster.tasks) t.get("enriched_description") or t.get("content", "")
boosted = any(p in cluster.label for p in preferred) if preferred else False for t in cluster.tasks
return base + (0.5 if boosted else 0.0) if t.get("content")
top = max(clusters, key=score)
boosted = bool(preferred) and any(p in top.label for p in preferred)
parts = [
f'The user\'s most active focus area is "{top.label}" '
f"({top.task_count} task{'s' if top.task_count != 1 else ''}, "
f"{top.overdue_count} overdue). "
f"(Note: task titles may be in any language — always write the tip in English.)"
] ]
if boosted: descs = [d.strip() for d in descs if d.strip()]
parts.append("This area matches the user's stated focus preferences.") descs_str = "; ".join(f'"{d}"' for d in descs[:8])
if top.overdue_count >= 3: if len(descs) > 8:
parts.append("Consider surfacing an action from this area.") descs_str += f" (and {len(descs) - 8} more)"
if len(clusters) > 1: lines.append(f"{i}. {cluster.label} ({cluster.task_count} task(s)): {descs_str}")
other_total = sum(c.task_count for c in clusters if c is not top)
parts.append( lines.append("(Task titles may be in any language — always write the tip in English.)")
f"{len(clusters) - 1} other area{'s' if len(clusters) > 2 else ''} "
f"contain {other_total} task{'s' if other_total != 1 else ''}."
)
snapshot = { snapshot = {
"top_cluster_label": top.label,
"top_task_count": top.task_count,
"top_overdue_count": top.overdue_count,
"cluster_count": len(clusters), "cluster_count": len(clusters),
"strategy": strategy, "clusters": [
"preferred_areas": preferred, {"label": c.label, "task_count": c.task_count,
"tasks": [t.get("content", "") for t in c.tasks]}
for c in clusters
],
"_new_enrichments": new_enrichments,
} }
return self._make_output(inp, " ".join(parts), snapshot) return self._make_output(inp, "\n".join(lines), snapshot)
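
The per-area line built in `compute` quotes up to eight descriptions and summarises the remainder as a count. A small sketch of that truncation logic in isolation; `format_area_line` and its `limit` parameter are hypothetical names, and the separator between label and task count is an assumption rather than the exact string used by the agent:

```python
from __future__ import annotations


def format_area_line(
    position: int,
    label: str,
    task_count: int,
    descs: list[str],
    limit: int = 8,
) -> str:
    """Format one numbered area line (hypothetical helper, illustration only)."""
    # Drop empty or whitespace-only descriptions before quoting.
    cleaned = [d.strip() for d in descs if d.strip()]
    shown = "; ".join(f'"{d}"' for d in cleaned[:limit])
    if len(cleaned) > limit:
        # Summarise the overflow instead of listing every task.
        shown += f" (and {len(cleaned) - limit} more)"
    return f"{position}. {label} ({task_count} task(s)): {shown}"


if __name__ == "__main__":
    print(format_area_line(1, "Work", 2, ["Buy milk", " ", "Fix bug"]))
```

Capping the quoted descriptions keeps the snippet short for the orchestrator while the task count still reflects the full cluster size.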

ml/agents/health_vitals.py Normal file
@@ -0,0 +1,134 @@
from __future__ import annotations
from typing import ClassVar
from .base import BaseAgent, AgentInput, AgentOutput
from .manifest import AgentManifest, InferredParam
from .inference.history import UserHistory
def _infer_step_goal(history: UserHistory) -> int:
"""Return the personal step-goal baseline (currently a static 7,000 placeholder)."""
# No median is computed yet: task_completions carries no step data, and
# step history arrives via agent_prefs.step_history when available.
return 7_000
MANIFEST = AgentManifest(
id="health-vitals",
version="1.0.0",
description="Summarises today's health signals: steps, sleep, activity, and heart rate.",
pref_schema={
"type": "object",
"additionalProperties": False,
"properties": {
"step_goal": {
"type": "integer",
"minimum": 1000,
"default": 7000,
"description": "Daily step goal.",
},
"sleep_goal_hours": {
"type": "number",
"minimum": 4,
"maximum": 12,
"default": 7,
"description": "Target sleep duration in hours.",
},
},
},
context_schema=["google-health.steps", "google-health.sleep", "google-health.activity", "google-health.heart_rate"],
required_consents=["data:core", "data:google-health"],
output_contract={"type": "snippet", "format": "free_text"},
ttl_sec=1800, # refresh every 30 min — health data changes during the day
silenced_in_contexts=[],
inferred_params=[
InferredParam(
key="step_goal",
ttl_sec=7 * 86_400,
cold_start_default=7000,
min_history=0,
infer=lambda h: 7000, # static default; override via user pref
),
],
)
class HealthVitalsAgent(BaseAgent):
"""Summarises today's health signals into an orchestrator prompt snippet."""
agent_id: ClassVar[str] = MANIFEST.id
ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec
version: ClassVar[str] = MANIFEST.version
def compute(self, inp: AgentInput) -> AgentOutput:
step_goal = int(inp.agent_prefs.get("step_goal", 7000))
sleep_goal = float(inp.agent_prefs.get("sleep_goal_hours", 7.0))
health = [t for t in inp.tasks if t.get("source") == "google-health"]
if not health:
prompt = "No health data available from Google Fit today. (Always write the tip in English.)"
return self._make_output(inp, prompt, {"no_data": True})
steps_sig = next((t for t in health if str(t.get("id", "")).endswith(":steps")), None)
sleep_sig = next((t for t in health if str(t.get("id", "")).endswith(":sleep")), None)
activity_sig = next((t for t in health if str(t.get("id", "")).endswith(":activity")), None)
hr_sig = next((t for t in health if str(t.get("id", "")).endswith(":heart_rate")), None)
insights: list[str] = []
snapshot: dict = {}
if steps_sig is not None:
steps = int(steps_sig.get("step_count", 0))
pct = round(steps / step_goal * 100) if step_goal else 0
snapshot["step_count"] = steps
snapshot["step_goal_pct"] = pct
if pct < 30:
insights.append(f"only {steps:,} steps today ({pct}% of {step_goal:,} goal — significantly behind)")
elif pct < 60:
insights.append(f"{steps:,} steps today ({pct}% of {step_goal:,} goal)")
elif pct >= 100:
insights.append(f"{steps:,} steps today (daily goal reached!)")
else:
insights.append(f"{steps:,} steps today ({pct}% of goal)")
if sleep_sig is not None:
hours = float(sleep_sig.get("sleep_hours", 0))
deficit = max(0.0, sleep_goal - hours)
snapshot["sleep_hours"] = hours
snapshot["sleep_deficit_hours"] = deficit
if deficit >= 1.5:
insights.append(f"only {hours:.1f}h sleep last night ({deficit:.1f}h below the {sleep_goal:.0f}h goal)")
elif deficit > 0:
insights.append(f"{hours:.1f}h sleep last night (slightly below {sleep_goal:.0f}h goal)")
else:
insights.append(f"{hours:.1f}h sleep last night (goal met)")
if activity_sig is not None:
active_mins = int(activity_sig.get("active_minutes", 0))
calories = int(activity_sig.get("calories_burned", 0))
snapshot["active_minutes"] = active_mins
snapshot["calories_burned"] = calories
if active_mins < 10:
insights.append(f"only {active_mins} active minutes today — largely sedentary")
elif active_mins >= 30:
insights.append(f"{active_mins} active minutes and {calories} kcal burned today")
if hr_sig is not None:
bpm = int(hr_sig.get("resting_bpm", 0))
snapshot["resting_bpm"] = bpm
if bpm > 90:
insights.append(f"elevated resting heart rate: {bpm} bpm")
elif bpm > 0:
insights.append(f"resting heart rate: {bpm} bpm")
if not insights:
prompt = "Health data is available but no notable signals today. (Always write the tip in English.)"
else:
body = "; ".join(insights)
prompt = f"Health snapshot: {body}. (Always write the tip in English.)"
return self._make_output(inp, prompt, snapshot)
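
The step branch above turns the raw count into a percentage of the goal and then picks a message by threshold (below 30%, below 60%, at or above 100%, otherwise in between). A sketch of that banding as a pure function, with the same thresholds; `step_band` and the band names are hypothetical labels introduced here for illustration:

```python
from __future__ import annotations


def step_band(steps: int, step_goal: int) -> tuple[int, str]:
    """Return (percent_of_goal, band) using the thresholds from compute()."""
    # Guard against a zero goal, as the agent does, by reporting 0%.
    pct = round(steps / step_goal * 100) if step_goal else 0
    if pct < 30:
        band = "far_behind"   # hypothetical band name
    elif pct < 60:
        band = "behind"       # hypothetical band name
    elif pct >= 100:
        band = "reached"      # hypothetical band name
    else:
        band = "on_track"     # hypothetical band name
    return pct, band


if __name__ == "__main__":
    for steps in (2_000, 3_500, 5_000, 7_000):
        print(steps, step_band(steps, 7_000))
```

Keeping the banding separate from the message strings would make the thresholds easy to unit-test without asserting on prose.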

@@ -121,7 +121,7 @@ MANIFEST = AgentManifest(
}, },
}, },
context_schema=["profile.features"], context_schema=["profile.features"],
required_consents=["data:core", "agent:momentum"], required_consents=["data:core"],
output_contract={"type": "snippet", "format": "free_text"}, output_contract={"type": "snippet", "format": "free_text"},
ttl_sec=21_600, ttl_sec=21_600,
inferred_params=[ inferred_params=[

@@ -70,7 +70,7 @@ MANIFEST = AgentManifest(
}, },
}, },
context_schema=["todoist.tasks"], context_schema=["todoist.tasks"],
required_consents=["data:core", "data:todoist", "agent:overdue-task"], required_consents=["data:core", "data:todoist"],
output_contract={"type": "snippet", "format": "free_text"}, output_contract={"type": "snippet", "format": "free_text"},
ttl_sec=3600, ttl_sec=3600,
silenced_in_contexts=["vacation"], silenced_in_contexts=["vacation"],

@@ -131,7 +131,7 @@ MANIFEST = AgentManifest(
}, },
}, },
context_schema=["tip_feedback", "profile.features"], context_schema=["tip_feedback", "profile.features"],
required_consents=["data:core", "agent:recent-patterns"], required_consents=["data:core"],
output_contract={"type": "snippet", "format": "free_text"}, output_contract={"type": "snippet", "format": "free_text"},
ttl_sec=86_400, ttl_sec=86_400,
inferred_params=[ inferred_params=[

@@ -16,6 +16,7 @@ from .momentum import MomentumAgent, MANIFEST as MOMENTUM_MANIFEST
from .time_of_day import TimeOfDayAgent, MANIFEST as TIME_OF_DAY_MANIFEST from .time_of_day import TimeOfDayAgent, MANIFEST as TIME_OF_DAY_MANIFEST
from .recent_patterns import RecentPatternsAgent, MANIFEST as RECENT_PATTERNS_MANIFEST from .recent_patterns import RecentPatternsAgent, MANIFEST as RECENT_PATTERNS_MANIFEST
from .focus_area import FocusAreaAgent, MANIFEST as FOCUS_AREA_MANIFEST from .focus_area import FocusAreaAgent, MANIFEST as FOCUS_AREA_MANIFEST
from .health_vitals import HealthVitalsAgent, MANIFEST as HEALTH_VITALS_MANIFEST
_REGISTERED: list[tuple[BaseAgent, AgentManifest]] = [ _REGISTERED: list[tuple[BaseAgent, AgentManifest]] = [
(OverdueTaskAgent(), OVERDUE_TASK_MANIFEST), (OverdueTaskAgent(), OVERDUE_TASK_MANIFEST),
@@ -23,6 +24,7 @@ _REGISTERED: list[tuple[BaseAgent, AgentManifest]] = [
(TimeOfDayAgent(), TIME_OF_DAY_MANIFEST), (TimeOfDayAgent(), TIME_OF_DAY_MANIFEST),
(RecentPatternsAgent(), RECENT_PATTERNS_MANIFEST), (RecentPatternsAgent(), RECENT_PATTERNS_MANIFEST),
(FocusAreaAgent(), FOCUS_AREA_MANIFEST), (FocusAreaAgent(), FOCUS_AREA_MANIFEST),
(HealthVitalsAgent(), HEALTH_VITALS_MANIFEST),
] ]
# Sanity check — agent_id and manifest.id must agree, otherwise the registry # Sanity check — agent_id and manifest.id must agree, otherwise the registry

@@ -213,40 +213,41 @@ class TestFocusAreaAgent:
out = self.agent.compute(_inp()) out = self.agent.compute(_inp())
assert "no tasks" in out.prompt_text.lower() assert "no tasks" in out.prompt_text.lower()
def test_single_project(self): def test_lists_all_clusters(self):
tasks = [_task(f"T{i}", project_id="Work") for i in range(3)]
out = self.agent.compute(_inp(tasks=tasks))
assert '"Work"' in out.prompt_text
assert "3 tasks" in out.prompt_text
def test_most_congested_wins(self):
tasks = ( tasks = (
[_task(f"W{i}", project_id="Work") for i in range(5)] [_task(f"W{i}", project_id="Work") for i in range(3)]
+ [_task(f"H{i}", project_id="Home") for i in range(2)] + [_task(f"H{i}", project_id="Home") for i in range(2)]
) )
out = self.agent.compute(_inp(tasks=tasks)) out = self.agent.compute(_inp(tasks=tasks))
assert '"Work"' in out.prompt_text assert "Work" in out.prompt_text
assert "Home" in out.prompt_text
def test_overdue_weighting(self): def test_includes_task_titles(self):
# Home has 2 tasks (1 overdue), Work has 3 non-overdue tasks tasks = [_task("Buy milk", project_id="Personal"), _task("Write report", project_id="Personal")]
# Home score = 2+1 = 3; Work score = 3 — Home should win due to overdue weight
tasks = (
[_task("Home1", project_id="Home", is_overdue=True),
_task("Home2", project_id="Home")]
+ [_task(f"W{i}", project_id="Work") for i in range(3)]
)
out = self.agent.compute(_inp(tasks=tasks)) out = self.agent.compute(_inp(tasks=tasks))
assert '"Work"' not in out.prompt_text or '"Home"' in out.prompt_text assert '"Buy milk"' in out.prompt_text
assert '"Write report"' in out.prompt_text
def test_task_count_in_output(self):
tasks = [_task(f"T{i}", project_id="Work") for i in range(3)]
out = self.agent.compute(_inp(tasks=tasks))
assert "3 task" in out.prompt_text
def test_default_project_fallback(self): def test_default_project_fallback(self):
out = self.agent.compute(_inp(tasks=[_task("No project task")])) out = self.agent.compute(_inp(tasks=[_task("No project task")]))
# Tasks without project_id fall back to a "Tasks" bucket
assert "Tasks" in out.prompt_text assert "Tasks" in out.prompt_text
def test_snapshot_keys(self): def test_snapshot_keys(self):
out = self.agent.compute(_inp(tasks=[_task("T1", project_id="A")])) out = self.agent.compute(_inp(tasks=[_task("T1", project_id="A")]))
assert {"top_cluster_label", "top_task_count", "top_overdue_count", "cluster_count", public_keys = {k for k in out.signals_snapshot if not k.startswith("_")}
"strategy", "preferred_areas"} == set(out.signals_snapshot) assert {"cluster_count", "clusters"} == public_keys
def test_snapshot_clusters_shape(self):
tasks = [_task("Buy milk", project_id="P1"), _task("Fix bug", project_id="P2")]
out = self.agent.compute(_inp(tasks=tasks))
clusters = out.signals_snapshot["clusters"]
assert isinstance(clusters, list)
assert all("label" in c and "task_count" in c and "tasks" in c for c in clusters)
# ── Registry ───────────────────────────────────────────────────────────────── # ── Registry ─────────────────────────────────────────────────────────────────
@@ -255,7 +256,7 @@ class TestRegistry:
def test_all_agents_present(self): def test_all_agents_present(self):
agents = all_agents() agents = all_agents()
ids = {a.agent_id for a in agents} ids = {a.agent_id for a in agents}
assert ids == {"overdue-task", "momentum", "time-of-day", "recent-patterns", "focus-area"} assert ids == {"overdue-task", "momentum", "time-of-day", "recent-patterns", "focus-area", "health-vitals"}
def test_get_agent(self): def test_get_agent(self):
a = get_agent("momentum") a = get_agent("momentum")

@@ -1,6 +1,6 @@
"""Unit tests for ml.agents.clustering (issue #97). """Unit tests for ml.agents.clustering (issue #97, #129).
Embedding calls are mocked so tests run without Ollama. LLM and embedding calls are mocked so tests run without Ollama or LiteLLM.
""" """
from __future__ import annotations from __future__ import annotations
@@ -9,7 +9,7 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
from unittest.mock import patch from unittest.mock import patch
from ml.agents.clustering import cluster_tasks, Cluster, _greedy_cluster, _cosine from ml.agents.clustering import cluster_tasks, Cluster, _greedy_cluster, _cosine, _embed_batch, _enrich_batch
# ── helpers ────────────────────────────────────────────────────────────────── # ── helpers ──────────────────────────────────────────────────────────────────
@@ -82,54 +82,128 @@ class TestGreedyClustering:
assert clusters[0].label == "Write report" assert clusters[0].label == "Write report"
# ── enrichment ───────────────────────────────────────────────────────────────
class TestEnrichBatch:
def test_falls_back_to_raw_when_no_litellm_url(self, monkeypatch):
monkeypatch.delenv("LITELLM_URL", raising=False)
result, new = _enrich_batch(["Buy milk", "Fix bug"])
assert result == ["Buy milk", "Fix bug"] and new == {}
def test_uses_description_when_litellm_available(self, monkeypatch):
monkeypatch.setenv("LITELLM_URL", "http://fake-litellm")
with patch("ml.agents.clustering._enrich_title", return_value="Expanded description."):
result, new = _enrich_batch(["Buy milk"])
assert result == ["Expanded description."]
assert len(new) == 1
def test_falls_back_to_raw_title_on_enrich_failure(self, monkeypatch):
monkeypatch.setenv("LITELLM_URL", "http://fake-litellm")
with patch("ml.agents.clustering._enrich_title", return_value=None):
result, new = _enrich_batch(["Buy milk"])
assert result == ["Buy milk"]
assert new == {} # failed enrichments are not persisted
def test_deduplicates_identical_titles(self, monkeypatch):
monkeypatch.setenv("LITELLM_URL", "http://fake-litellm")
call_count = {"n": 0}
def fake_enrich(title, url):
call_count["n"] += 1
return f"desc:{title}"
with patch("ml.agents.clustering._enrich_title", side_effect=fake_enrich):
result, new = _enrich_batch(["Buy milk", "Buy milk", "Fix bug"])
assert call_count["n"] == 2 # only 2 unique titles
assert result == ["desc:Buy milk", "desc:Buy milk", "desc:Fix bug"]
def test_uses_persistent_cache(self, monkeypatch):
monkeypatch.setenv("LITELLM_URL", "http://fake-litellm")
from ml.agents.clustering import _content_hash
h = _content_hash("Buy milk")
call_count = {"n": 0}
def fake_enrich(title, url):
call_count["n"] += 1
return "new desc"
with patch("ml.agents.clustering._enrich_title", side_effect=fake_enrich):
result, new = _enrich_batch(["Buy milk"], persistent_cache={h: "cached desc"})
assert call_count["n"] == 0 # cache hit, no LLM call
assert result == ["cached desc"]
assert new == {}
# ── cluster_tasks integration ───────────────────────────────────────────────── # ── cluster_tasks integration ─────────────────────────────────────────────────
class TestClusterTasks: class TestClusterTasks:
def test_empty_tasks(self): def _no_enrich(self, titles, persistent_cache=None):
result = cluster_tasks([]) return titles, {}
assert result == []
def test_fallback_when_ollama_unavailable(self): def test_empty_tasks(self):
with patch("ml.agents.clustering._embed", return_value=None): clusters, new = cluster_tasks([])
assert clusters == [] and new == {}
def test_fallback_when_embed_unavailable(self):
with patch("ml.agents.clustering._enrich_batch", side_effect=self._no_enrich), \
patch("ml.agents.clustering._embed_batch", return_value=None):
tasks = [_task("A", "p1"), _task("B", "p2"), _task("C", "p1")] tasks = [_task("A", "p1"), _task("B", "p2"), _task("C", "p1")]
clusters = cluster_tasks(tasks) clusters, _ = cluster_tasks(tasks)
assert len(clusters) == 2 assert len(clusters) == 2
labels = {c.label for c in clusters} labels = {c.label for c in clusters}
assert "p1" in labels and "p2" in labels assert "p1" in labels and "p2" in labels
def test_fallback_groups_by_project(self): def test_fallback_groups_by_project(self):
with patch("ml.agents.clustering._embed", return_value=None): with patch("ml.agents.clustering._enrich_batch", side_effect=self._no_enrich), \
patch("ml.agents.clustering._embed_batch", return_value=None):
tasks = [_task("A", "work")] * 3 + [_task("B", "home")] * 2 tasks = [_task("A", "work")] * 3 + [_task("B", "home")] * 2
clusters = cluster_tasks(tasks) clusters, _ = cluster_tasks(tasks)
by_label = {c.label: c.task_count for c in clusters} by_label = {c.label: c.task_count for c in clusters}
assert by_label["work"] == 3 assert by_label["work"] == 3
assert by_label["home"] == 2 assert by_label["home"] == 2
def test_tasks_without_content_go_to_other(self): def test_tasks_without_content_go_to_other(self):
v = [1.0, 0.0] v = [1.0, 0.0]
with patch("ml.agents.clustering._embed", return_value=v): with patch("ml.agents.clustering._enrich_batch", side_effect=self._no_enrich), \
patch("ml.agents.clustering._embed_batch", return_value=[v]):
tasks = [_task("Has content"), {"is_overdue": False}] tasks = [_task("Has content"), {"is_overdue": False}]
clusters = cluster_tasks(tasks) clusters, _ = cluster_tasks(tasks)
labels = {c.label for c in clusters} labels = {c.label for c in clusters}
assert "Other tasks" in labels assert "Other tasks" in labels
def test_semantic_clustering_groups_similar(self): def test_semantic_clustering_groups_similar(self):
v_work = [1.0, 0.0, 0.0] v_work = [1.0, 0.0, 0.0]
v_home = [0.0, 1.0, 0.0] v_home = [0.0, 1.0, 0.0]
side_effects = [v_work, v_work, v_home, v_home] batch_result = [v_work, v_work, v_home, v_home]
with patch("ml.agents.clustering._embed", side_effect=side_effects): with patch("ml.agents.clustering._enrich_batch", side_effect=self._no_enrich), \
patch("ml.agents.clustering._embed_batch", return_value=batch_result):
tasks = [ tasks = [
_task("Write report"), _task("Write report"),
_task("Review PR"), _task("Review PR"),
_task("Buy groceries"), _task("Buy groceries"),
_task("Cook dinner"), _task("Cook dinner"),
] ]
clusters = cluster_tasks(tasks) clusters, _ = cluster_tasks(tasks)
assert len(clusters) == 2 assert len(clusters) == 2
assert all(c.task_count == 2 for c in clusters) assert all(c.task_count == 2 for c in clusters)
def test_all_tasks_no_content_fallback_by_project(self): def test_all_tasks_no_content_fallback_by_project(self):
tasks = [{"project_id": "p1", "is_overdue": False}, tasks = [{"project_id": "p1", "is_overdue": False},
{"project_id": "p2", "is_overdue": False}] {"project_id": "p2", "is_overdue": False}]
clusters = cluster_tasks(tasks) clusters, new = cluster_tasks(tasks)
assert len(clusters) == 2 assert len(clusters) == 2 and new == {}
def test_enrich_called_before_embed(self):
"""Verify enrichment output (not raw title) is what gets embedded."""
v = [1.0, 0.0]
captured = {}
def fake_embed(texts):
captured["texts"] = texts
return [v] * len(texts)
with patch("ml.agents.clustering._enrich_batch", return_value=(["Expanded desc."], {})), \
patch("ml.agents.clustering._embed_batch", side_effect=fake_embed):
cluster_tasks([_task("Buy milk")])
assert captured["texts"] == ["clustering: Expanded desc."]
def test_new_enrichments_returned(self):
v = [1.0, 0.0]
with patch("ml.agents.clustering._enrich_batch", return_value=(["desc"], {"abc123": "desc"})), \
patch("ml.agents.clustering._embed_batch", return_value=[v]):
_, new = cluster_tasks([_task("Buy milk")])
assert new == {"abc123": "desc"}
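
The `TestEnrichBatch` cases pin down four behaviours: cached descriptions are reused without an LLM call, identical titles are enriched once, a failed enrichment falls back to the raw title, and only successful new descriptions are returned for persistence. A sketch of a function that satisfies those behaviours, under loud assumptions: `enrich_batch_sketch` and `content_hash` are hypothetical stand-ins, the SHA-256 hash is assumed (the real `_content_hash` is not shown in this diff), and the `enrich` callable replaces the real LiteLLM call:

```python
from __future__ import annotations

import hashlib
from typing import Callable, Optional


def content_hash(title: str) -> str:
    """Assumed hashing scheme; the real _content_hash may differ."""
    return hashlib.sha256(title.encode("utf-8")).hexdigest()


def enrich_batch_sketch(
    titles: list[str],
    enrich: Callable[[str], Optional[str]],
    persistent_cache: Optional[dict[str, str]] = None,
) -> tuple[list[str], dict[str, str]]:
    """Return (descriptions, new_enrichments) in the same order as titles."""
    cache = persistent_cache or {}
    new: dict[str, str] = {}   # successful enrichments produced this call
    failed: set[str] = set()   # hashes whose enrichment returned None
    out: list[str] = []
    for title in titles:
        key = content_hash(title)
        if key in cache:
            out.append(cache[key])       # persistent cache hit, no LLM call
        elif key in new:
            out.append(new[key])         # duplicate title within this batch
        elif key in failed:
            out.append(title)            # do not retry a failed title
        else:
            desc = enrich(title)
            if desc is None:
                failed.add(key)
                out.append(title)        # fall back to the raw title
            else:
                new[key] = desc
                out.append(desc)
    return out, new


if __name__ == "__main__":
    descs, new = enrich_batch_sketch(["Buy milk", "Buy milk"], lambda t: f"desc:{t}")
    print(descs, len(new))
```

Keying both caches on a content hash rather than a task ID means a renamed task is enriched again, while identical titles across tasks share one description.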

@@ -45,6 +45,7 @@ def test_manifest_required_fields(agent_id: str):
assert isinstance(m.pref_schema, dict) and m.pref_schema.get("type") == "object" assert isinstance(m.pref_schema, dict) and m.pref_schema.get("type") == "object"
assert isinstance(m.required_consents, list) and m.required_consents assert isinstance(m.required_consents, list) and m.required_consents
assert "data:core" in m.required_consents, "every agent should require data:core" assert "data:core" in m.required_consents, "every agent should require data:core"
assert all(c.startswith("data:") for c in m.required_consents), "only data: consents allowed; agent: consents have been removed"
assert m.ttl_sec == get_agent(agent_id).ttl_seconds, "ttl divergence" assert m.ttl_sec == get_agent(agent_id).ttl_seconds, "ttl divergence"

@@ -627,86 +627,37 @@ class TestTimeOfDaySnippet:
assert {"quiet_start", "quiet_end", "peak_hours", "tz"}.issubset(keys) assert {"quiet_start", "quiet_end", "peak_hours", "tz"}.issubset(keys)
# ── focus-area: preferred_areas wiring ─────────────────────────────────────── # ── focus-area: cluster summary output ───────────────────────────────────────
class TestFocusAreaPreferredAreas: class TestFocusAreaOutput:
agent = FocusAreaAgent() agent = FocusAreaAgent()
def _task(self, content: str, project_id: str, is_overdue: bool = False) -> dict: def _task(self, content: str, project_id: str) -> dict:
return {"id": "t1", "content": content, "is_overdue": is_overdue, return {"id": "t1", "content": content, "is_overdue": False,
"task_age_days": 2.0, "priority": 1, "project_id": project_id} "task_age_days": 2.0, "priority": 1, "project_id": project_id}
def test_preferred_area_wins_tie(self): def test_version(self):
tasks = [
self._task("Work thing", "work"),
self._task("Home thing", "home"),
]
out = self.agent.compute(_inp(tasks=tasks, agent_prefs={"preferred_areas": ["work"]}))
assert "work" in out.prompt_text
assert "matches the user's stated focus preferences" in out.prompt_text
def test_no_preferred_areas_uses_congestion_score(self):
tasks = [
self._task("W1", "work"),
self._task("H1", "home"),
self._task("H2", "home"),
]
out = self.agent.compute(_inp(tasks=tasks))
# home has more tasks → wins without any preference
assert "home" in out.prompt_text
def test_snapshot_includes_preferred_areas(self):
tasks = [self._task("T", "work")]
out = self.agent.compute(_inp(tasks=tasks, agent_prefs={"preferred_areas": ["work"]}))
assert out.signals_snapshot["preferred_areas"] == ["work"]
def test_version_bumped(self):
from ml.agents.focus_area import MANIFEST as FA_MANIFEST from ml.agents.focus_area import MANIFEST as FA_MANIFEST
assert FA_MANIFEST.version == "2.0.0" assert FA_MANIFEST.version == "3.0.0"
def test_snapshot_uses_cluster_keys(self): def test_all_clusters_in_output(self):
tasks = [self._task("Work thing", "work"), self._task("Home thing", "home")]
out = self.agent.compute(_inp(tasks=tasks))
assert "work" in out.prompt_text.lower()
assert "home" in out.prompt_text.lower()
def test_task_titles_in_output(self):
tasks = [self._task("Buy milk", "personal")]
out = self.agent.compute(_inp(tasks=tasks))
assert '"Buy milk"' in out.prompt_text
def test_snapshot_shape(self):
tasks = [self._task("T", "work")] tasks = [self._task("T", "work")]
out = self.agent.compute(_inp(tasks=tasks)) out = self.agent.compute(_inp(tasks=tasks))
assert "top_cluster_label" in out.signals_snapshot public_keys = {k for k in out.signals_snapshot if not k.startswith("_")}
assert "cluster_count" in out.signals_snapshot assert public_keys == {"cluster_count", "clusters"}
assert "strategy" in out.signals_snapshot assert isinstance(out.signals_snapshot["clusters"], list)
def test_no_inferred_params(self):
# ── focus-area: preferred_areas inference from task_completions (#113) ────────
class TestFocusAreaPreferredAreasInference:
from ml.agents.focus_area import MANIFEST as _FA_MANIFEST
def _completion(self, project_id: str) -> TaskCompletion:
return _completion(project_id, lateness_days=0.0)
def test_cold_start_no_completions(self):
history = _history(completions=[])
from ml.agents.focus_area import MANIFEST as FA_MANIFEST from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history) assert FA_MANIFEST.inferred_params == []
assert result["preferred_areas"] == []
def test_top_two_projects_returned(self):
completions = (
[_completion("p1", 0)] * 8
+ [_completion("p2", 0)] * 5
+ [_completion("p3", 0)] * 2
)
history = _history(completions=completions)
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history)
assert result["preferred_areas"] == ["p1", "p2"]
def test_single_project_returns_one(self):
completions = [_completion("work", 0)] * 6
history = _history(completions=completions)
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history)
assert result["preferred_areas"] == ["work"]
def test_none_project_id_ignored(self):
completions = [_completion(None, 0)] * 5 + [_completion("real", 0)] * 3
history = _history(completions=completions)
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
result = run_inference(FA_MANIFEST, history)
assert result["preferred_areas"] == ["real"]
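The new snapshot tests imply a shape of `{cluster_count, clusters}` plus optional underscore-prefixed private keys. A minimal sketch of that shape follows, assuming clusters arrive as a label-to-titles mapping. `build_snapshot` and `public_keys` are hypothetical helpers for illustration, not functions from `ml.agents.focus_area`.

```python
def build_snapshot(clusters: dict) -> dict:
    """Summarise every cluster; no winner is picked and nothing is scored."""
    return {
        "cluster_count": len(clusters),
        "clusters": [
            {"label": label, "task_count": len(titles), "tasks": list(titles)}
            for label, titles in clusters.items()
        ],
    }


def public_keys(snapshot: dict) -> set:
    """Keys starting with "_" are treated as private transport fields."""
    return {key for key in snapshot if not key.startswith("_")}


# Hypothetical input: two clusters with one task each.
snapshot = build_snapshot({"work": ["Work thing"], "home": ["Home thing"]})
snapshot["_task_hash"] = "abc123"  # private key, ignored by public_keys()
```

Filtering on the `_` prefix is what lets the test assert an exact public key set while the serving layer still attaches bookkeeping fields.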


@@ -126,7 +126,7 @@ MANIFEST = AgentManifest(
}, },
}, },
context_schema=["profile.features"], context_schema=["profile.features"],
-required_consents=["data:core", "agent:time-of-day"],
+required_consents=["data:core"],
output_contract={"type": "snippet", "format": "free_text"}, output_contract={"type": "snippet", "format": "free_text"},
ttl_sec=900, ttl_sec=900,
inferred_params=[ inferred_params=[


@@ -196,6 +196,12 @@ class AgentComputeRequest(BaseModel):
now_iso: Optional[str] = None # ISO 8601; defaults to utcnow now_iso: Optional[str] = None # ISO 8601; defaults to utcnow
# Per-agent prefs from user_preferences (merged: user source overrides inferred). # Per-agent prefs from user_preferences (merged: user source overrides inferred).
agent_prefs: dict = {} agent_prefs: dict = {}
# Pre-fetched enrichment cache: {content_hash -> description}. Avoids re-calling
# LiteLLM for task titles already expanded in a prior compute cycle.
enrichment_cache: dict[str, str] = {}
# MD5 of sorted task contents; stored in snapshot so the next cycle can skip
# recompute when the task list hasn't changed.
task_hash: Optional[str] = None
class AgentComputeResponse(BaseModel): class AgentComputeResponse(BaseModel):
@@ -206,6 +212,8 @@ class AgentComputeResponse(BaseModel):
computed_at: str computed_at: str
expires_at: str expires_at: str
agent_version: str agent_version: str
# New enrichments generated during this compute cycle; caller persists to DB.
new_enrichments: dict[str, str] = {}
class AgentInferRequest(BaseModel): class AgentInferRequest(BaseModel):
@@ -233,6 +241,7 @@ class RecommendRequest(BaseModel):
hour_of_day: int = 12 hour_of_day: int = 12
day_of_week: int = 0 day_of_week: int = 0
science_destiny: int = 50 # 0=science (data-driven), 100=destiny (intuitive) science_destiny: int = 50 # 0=science (data-driven), 100=destiny (intuitive)
recent_tip: Optional[str] = None # content of last snoozed tip; LLM avoids repeating it
class TipResult(BaseModel): class TipResult(BaseModel):
@@ -313,6 +322,7 @@ async def compute_agent(agent_id: str, req: AgentComputeRequest) -> AgentCompute
feedback_history=req.feedback_history, feedback_history=req.feedback_history,
now=now, now=now,
agent_prefs=req.agent_prefs, agent_prefs=req.agent_prefs,
enrichment_cache=req.enrichment_cache,
) )
try: try:
output = agent.compute(inp) output = agent.compute(inp)
@@ -320,6 +330,10 @@ async def compute_agent(agent_id: str, req: AgentComputeRequest) -> AgentCompute
log.error("agent_compute_failed", agent_id=agent_id, user_id=req.user_id, error=str(exc)) log.error("agent_compute_failed", agent_id=agent_id, user_id=req.user_id, error=str(exc))
raise HTTPException(status_code=500, detail=f"Agent compute failed: {exc}") raise HTTPException(status_code=500, detail=f"Agent compute failed: {exc}")
if req.task_hash:
output.signals_snapshot["_task_hash"] = req.task_hash
new_enrichments: dict[str, str] = output.signals_snapshot.pop("_new_enrichments", {})
log.info("agent_computed", agent_id=agent_id, user_id=req.user_id, expires_at=output.expires_at) log.info("agent_computed", agent_id=agent_id, user_id=req.user_id, expires_at=output.expires_at)
span = _start_span( span = _start_span(
f"compute:{agent_id}", f"compute:{agent_id}",
@@ -338,6 +352,7 @@ async def compute_agent(agent_id: str, req: AgentComputeRequest) -> AgentCompute
computed_at=output.computed_at, computed_at=output.computed_at,
expires_at=output.expires_at, expires_at=output.expires_at,
agent_version=output.agent_version, agent_version=output.agent_version,
new_enrichments=new_enrichments,
) )
@@ -430,6 +445,7 @@ async def recommend(req: RecommendRequest) -> RecommendResponse:
hour_of_day=req.hour_of_day, hour_of_day=req.hour_of_day,
day_of_week=req.day_of_week, day_of_week=req.day_of_week,
science_destiny=req.science_destiny, science_destiny=req.science_destiny,
recent_tip=req.recent_tip,
) )
_end_span(ctx_span, outputs={"message_count": len(messages)}) _end_span(ctx_span, outputs={"message_count": len(messages)})
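The compute endpoint uses `signals_snapshot` as a transport for two private fields: it stashes `_task_hash` for the next cycle and pops `_new_enrichments` so the caller can persist them. A minimal sketch of that split, assuming plain dicts; `split_snapshot` is a hypothetical name, and unlike the endpoint it copies the dict instead of mutating it in place.

```python
def split_snapshot(snapshot: dict, task_hash=None):
    """Return (snapshot_to_store, new_enrichments).

    The stored snapshot keeps `_task_hash` so a later cycle can compare it;
    `_new_enrichments` is removed because it is returned separately.
    """
    stored = dict(snapshot)  # shallow copy: the caller's dict stays untouched
    if task_hash:
        stored["_task_hash"] = task_hash
    new_enrichments = stored.pop("_new_enrichments", {})
    return stored, new_enrichments


# Hypothetical agent output carrying one freshly generated enrichment.
stored, fresh = split_snapshot(
    {"cluster_count": 1, "_new_enrichments": {"h1": "Expanded description"}},
    task_hash="abc123",
)
```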


@@ -161,16 +161,21 @@ def build_orchestrator_messages(
hour_of_day: int, hour_of_day: int,
day_of_week: int, day_of_week: int,
science_destiny: int = 50, science_destiny: int = 50,
recent_tip: str | None = None,
) -> list[dict]: ) -> list[dict]:
"""Build the [system, user] message list for the orchestrator LLM call. """Build the [system, user] message list for the orchestrator LLM call.
agent_outputs: list of {agent_id, prompt_text} dicts. agent_outputs: list of {agent_id, prompt_text} dicts.
Falls back to raw task summary when agent_outputs is empty. Falls back to raw task summary when agent_outputs is empty.
recent_tip: content of a tip the user just snoozed — generate something different.
""" """
style_hint = _science_destiny_instruction(science_destiny) style_hint = _science_destiny_instruction(science_destiny)
system = _SYS_V4_ORCHESTRATOR + (f"\n\n{style_hint}" if style_hint else "") system = _SYS_V4_ORCHESTRATOR + (f"\n\n{style_hint}" if style_hint else "")
lines = [f"Current time: {hour_of_day:02d}:00, day_of_week={day_of_week}", ""] lines = [f"Current time: {hour_of_day:02d}:00, day_of_week={day_of_week}", ""]
if recent_tip:
lines.append(f"The user snoozed this tip (do NOT repeat it or anything similar): \"{recent_tip}\"")
lines.append("")
if agent_outputs: if agent_outputs:
lines.append("Context from analysis agents:") lines.append("Context from analysis agents:")
for s in agent_outputs: for s in agent_outputs:
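The `recent_tip` branch interpolates the snoozed tip between escaped double quotes. A small sketch of the same step follows, with one deliberate change: it quotes the tip with `json.dumps` so embedded quotes or newlines cannot break the sentence. That substitution is an assumption for illustration, not what the diff does, and `build_user_lines` is a hypothetical helper.

```python
import json


def build_user_lines(hour_of_day: int, day_of_week: int, recent_tip=None) -> list:
    """Build the leading lines of the user message for the orchestrator call."""
    lines = [f"Current time: {hour_of_day:02d}:00, day_of_week={day_of_week}", ""]
    if recent_tip:
        # json.dumps wraps the text in double quotes and escapes any inside it.
        quoted = json.dumps(recent_tip, ensure_ascii=False)
        lines.append(
            f"The user snoozed this tip (do NOT repeat it or anything similar): {quoted}"
        )
        lines.append("")
    return lines
```

With no `recent_tip` the output is unchanged from the pre-commit behaviour, so callers that never send the field see the same prompt.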


@@ -1,4 +1,4 @@
-export type IntegrationProvider = 'todoist';
+export type IntegrationProvider = 'todoist' | 'google-health';
export type IntegrationStatus = 'connected' | 'disconnected' | 'error'; export type IntegrationStatus = 'connected' | 'disconnected' | 'error';
export interface Integration { export interface Integration {


@@ -2,7 +2,7 @@
export interface Signal { export interface Signal {
id: string; id: string;
source: string; // e.g. 'todoist', 'google-calendar', 'manual' source: string; // e.g. 'todoist', 'google-calendar', 'manual'
-kind: 'task' | 'event' | 'habit' | 'insight';
+kind: 'task' | 'event' | 'habit' | 'insight' | 'health';
content: string; content: string;
metadata: Record<string, unknown>; // source-specific raw fields metadata: Record<string, unknown>; // source-specific raw fields
features: Record<string, number | boolean>; // bandit-ready numeric/boolean features features: Record<string, number | boolean>; // bandit-ready numeric/boolean features


@@ -85,3 +85,45 @@ describe('runMigrations — idempotency', () => {
}); });
}); });
describe('runMigrations — issue #127 backfill', () => {
it('grants data:<provider> consent for existing active integration tokens', () => {
const sqlite = freshDb();
runMigrations(sqlite);
// Seed a user + active Todoist token (simulates pre-#127 state)
sqlite.exec(`
INSERT INTO users (id, email, role, created_at) VALUES ('u2', 'u2@test.com', 'user', '2026-01-01T00:00:00Z');
INSERT INTO user_consents (user_id, consent_key, granted_at) VALUES ('u2', 'data:core', '2026-01-01T00:00:00Z');
INSERT INTO integration_tokens (id, user_id, provider, access_token, token_status, connected_at)
VALUES ('tok1', 'u2', 'todoist', 'secret', 'active', '2026-01-02T00:00:00Z');
`);
// Re-run migrations — the backfill should insert data:todoist
runMigrations(sqlite);
const rows = sqlite
.prepare(`SELECT consent_key FROM user_consents WHERE user_id = 'u2' ORDER BY consent_key`)
.all() as { consent_key: string }[];
expect(rows.map((r) => r.consent_key)).toEqual(['data:core', 'data:todoist']);
});
it('is idempotent — running twice does not duplicate consent rows', () => {
const sqlite = freshDb();
runMigrations(sqlite);
sqlite.exec(`
INSERT INTO users (id, email, role, created_at) VALUES ('u3', 'u3@test.com', 'user', '2026-01-01T00:00:00Z');
INSERT INTO integration_tokens (id, user_id, provider, access_token, token_status, connected_at)
VALUES ('tok2', 'u3', 'todoist', 'secret', 'active', '2026-01-02T00:00:00Z');
`);
runMigrations(sqlite);
runMigrations(sqlite);
const count = (sqlite
.prepare(`SELECT COUNT(*) as n FROM user_consents WHERE user_id = 'u3' AND consent_key = 'data:todoist'`)
.get() as { n: number }).n;
expect(count).toBe(1);
});
});
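The idempotency these tests check comes from `INSERT OR IGNORE` meeting a uniqueness constraint on the consent rows. The sketch below reproduces that with Python's standard `sqlite3` module. The trimmed table definitions, the composite primary key on `(user_id, consent_key)`, and the `'revoked'` status value are assumptions for illustration; the real schema is not shown in this diff.

```python
import sqlite3

BACKFILL_SQL = """
INSERT OR IGNORE INTO user_consents (user_id, consent_key, granted_at)
SELECT user_id, 'data:' || provider, connected_at
FROM integration_tokens
WHERE token_status = 'active'
"""


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE user_consents (
            user_id TEXT NOT NULL,
            consent_key TEXT NOT NULL,
            granted_at TEXT NOT NULL,
            revoked_at TEXT,
            PRIMARY KEY (user_id, consent_key)  -- assumed; makes OR IGNORE effective
        );
        CREATE TABLE integration_tokens (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            provider TEXT NOT NULL,
            token_status TEXT NOT NULL,
            connected_at TEXT NOT NULL
        );
        """
    )
    return conn


def backfill(conn: sqlite3.Connection) -> None:
    conn.execute(BACKFILL_SQL)
    conn.commit()


conn = make_db()
conn.executemany(
    "INSERT INTO integration_tokens (id, user_id, provider, token_status, connected_at)"
    " VALUES (?, ?, ?, ?, ?)",
    [
        ("tok1", "u2", "todoist", "active", "2026-01-02T00:00:00Z"),
        ("tok2", "u2", "google-health", "revoked", "2026-01-03T00:00:00Z"),
    ],
)
backfill(conn)
backfill(conn)  # second run inserts nothing new
consent_keys = [
    row[0]
    for row in conn.execute("SELECT consent_key FROM user_consents ORDER BY consent_key")
]
```

Without a unique key on the pair, `OR IGNORE` would have nothing to conflict with and each run would append duplicates, so the constraint is the part worth verifying in the real schema.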


@@ -149,6 +149,13 @@ export function runMigrations(handle: BetterSqlite3Database) {
CREATE INDEX IF NOT EXISTS idx_agent_outputs_user_agent_exp CREATE INDEX IF NOT EXISTS idx_agent_outputs_user_agent_exp
ON agent_outputs(user_id, agent_id, expires_at DESC); ON agent_outputs(user_id, agent_id, expires_at DESC);
CREATE TABLE IF NOT EXISTS task_enrichments (
content_hash TEXT PRIMARY KEY,
description TEXT NOT NULL,
model TEXT NOT NULL DEFAULT 'tip-generator',
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS user_preferences ( CREATE TABLE IF NOT EXISTS user_preferences (
user_id TEXT NOT NULL REFERENCES users(id), user_id TEXT NOT NULL REFERENCES users(id),
scope TEXT NOT NULL, scope TEXT NOT NULL,
@@ -208,6 +215,15 @@ export function runMigrations(handle: BetterSqlite3Database) {
`); `);
} catch { /* column already dropped — nothing to backfill */ } } catch { /* column already dropped — nothing to backfill */ }
// Backfill (issue #127): grant data:<provider> consent for every active integration token.
// Idempotent — INSERT OR IGNORE skips rows that already exist.
handle.exec(`
INSERT OR IGNORE INTO user_consents (user_id, consent_key, granted_at)
SELECT user_id, 'data:' || provider, connected_at
FROM integration_tokens
WHERE token_status = 'active'
`);
// Drop legacy consent columns (ADR-0014 step 8). Runs after the backfill above. // Drop legacy consent columns (ADR-0014 step 8). Runs after the backfill above.
// Silently skips if already dropped (column not found error) or never existed (new DB). // Silently skips if already dropped (column not found error) or never existed (new DB).
for (const stmt of [ for (const stmt of [


@@ -189,6 +189,15 @@ export const agentOutputs = sqliteTable('agent_outputs', {
agentVersion: text('agent_version').notNull(), // bump to invalidate on logic changes agentVersion: text('agent_version').notNull(), // bump to invalidate on logic changes
}); });
// Persistent cache for LLM-enriched task descriptions used by clustering.
// Keyed by MD5 of raw task content; avoids re-calling LiteLLM on every agent compute cycle.
export const taskEnrichments = sqliteTable('task_enrichments', {
contentHash: text('content_hash').primaryKey(),
description: text('description').notNull(),
model: text('model').notNull().default('tip-generator'),
createdAt: text('created_at').notNull(),
});
// Admin saved SQL queries. // Admin saved SQL queries.
export const savedQueries = sqliteTable('saved_queries', { export const savedQueries = sqliteTable('saved_queries', {
id: text('id').primaryKey(), id: text('id').primaryKey(),


@@ -1,8 +1,10 @@
/** /**
-* Registry-driven agent eligibility filter (ADR-0014 step 5).
+* Registry-driven agent eligibility filter (ADR-0014 step 5, updated by ADR-0015).
* *
* Rules (all must pass for an agent to be eligible): * Rules (all must pass for an agent to be eligible):
-* 1. All required_consents are granted and not revoked.
+* 1. Every data:<source> in required_consents is granted and not revoked.
+* Consent is granted automatically when the user connects that data source.
+* agent:<id> consents no longer exist — per-agent control is a preference (rule 3).
* 2. No silenced_in_contexts entry matches an active context. * 2. No silenced_in_contexts entry matches an active context.
* 3. user_preferences[scope='agent:<id>', key='enabled'] is not false. * 3. user_preferences[scope='agent:<id>', key='enabled'] is not false.
* *
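The three rules in this comment can be read as a single predicate. The sketch below is one way to express it, assuming consents are a set of granted, non-revoked keys, contexts match by string equality, and preferences are keyed by `(scope, key)`. `Manifest` and `is_eligible` are hypothetical names, not the contents of `eligibility.ts`.

```python
from dataclasses import dataclass, field


@dataclass
class Manifest:
    agent_id: str
    required_consents: list
    silenced_in_contexts: list = field(default_factory=list)


def is_eligible(manifest: Manifest, granted: set, active_contexts: set, prefs: dict) -> bool:
    # Rule 1: every required data:<source> consent is currently granted.
    if not all(key in granted for key in manifest.required_consents):
        return False
    # Rule 2: no silenced context is active (equality match is an assumption).
    if any(ctx in active_contexts for ctx in manifest.silenced_in_contexts):
        return False
    # Rule 3: enabled unless the preference is explicitly False.
    return prefs.get((f"agent:{manifest.agent_id}", "enabled")) is not False


time_of_day = Manifest("time-of-day", ["data:core"])
health = Manifest("health-vitals", ["data:core", "data:google-health"], ["focus"])
```

Because rule 3 only rejects an explicit `False`, an agent with no stored preference stays eligible, which matches the "is not false" wording.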


@@ -1,17 +1,19 @@
import { Router, type Request, type Response, type IRouter } from 'express'; import { Router, type Request, type Response, type IRouter } from 'express';
import { nanoid } from 'nanoid'; import { nanoid } from 'nanoid';
import { db } from '../db/index.js'; import { db } from '../db/index.js';
-import { agentOutputs, tipFeedback, tipViews, userPreferences } from '../db/schema.js';
-import { eq, and, gt, lt } from 'drizzle-orm';
+import { agentOutputs, tipFeedback, tipViews, userPreferences, taskEnrichments } from '../db/schema.js';
+import { eq, and, gt, lt, inArray } from 'drizzle-orm';
+import crypto from 'node:crypto';
import { config } from '../config.js'; import { config } from '../config.js';
import { getProfile, type Profile } from '../profile/builder.js'; import { getProfile, type Profile } from '../profile/builder.js';
import { todoistSource } from '../signals/todoist.js'; import { todoistSource } from '../signals/todoist.js';
import { googleHealthSource } from '../signals/google-health.js';
import { SignalAggregator } from '../signals/aggregator.js'; import { SignalAggregator } from '../signals/aggregator.js';
const router: IRouter = Router(); const router: IRouter = Router();
// Separate aggregator instance — avoids circular dep with recommender.ts. // Separate aggregator instance — avoids circular dep with recommender.ts.
-const _agentAggregator = new SignalAggregator().register(todoistSource);
+const _agentAggregator = new SignalAggregator().register(todoistSource).register(googleHealthSource);
// ── Internal auth helper ────────────────────────────────────────────────────── // ── Internal auth helper ──────────────────────────────────────────────────────
@@ -26,6 +28,33 @@ function checkInternalToken(req: Request, res: Response): boolean {
// ── DB helpers ──────────────────────────────────────────────────────────────── // ── DB helpers ────────────────────────────────────────────────────────────────
function contentHash(text: string): string {
return crypto.createHash('md5').update(text).digest('hex');
}
async function fetchEnrichmentCache(tasks: { content?: string }[]): Promise<Record<string, string>> {
const hashes = tasks
.map((t) => t.content?.trim())
.filter((c): c is string => !!c)
.map(contentHash);
if (!hashes.length) return {};
const rows = await db
.select({ contentHash: taskEnrichments.contentHash, description: taskEnrichments.description })
.from(taskEnrichments)
.where(inArray(taskEnrichments.contentHash, hashes));
return Object.fromEntries(rows.map((r) => [r.contentHash, r.description]));
}
async function persistEnrichments(newEntries: Record<string, string>): Promise<void> {
const now = new Date().toISOString();
for (const [hash, description] of Object.entries(newEntries)) {
await db
.insert(taskEnrichments)
.values({ contentHash: hash, description, createdAt: now })
.onConflictDoNothing();
}
}
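For the cache to hit, the serving side has to derive the same key the API derives here: an MD5 hex digest of the trimmed task content. A sketch of the consuming side in Python follows, assuming UTF-8 encoding (the Node default for string input to `update`). `enrich_tasks` and the `expand` callback are hypothetical stand-ins for the LLM call, not names from the repository.

```python
import hashlib


def content_hash(text: str) -> str:
    """MD5 hex digest of the UTF-8 bytes, mirroring contentHash() above."""
    return hashlib.md5(text.encode("utf-8")).hexdigest()


def enrich_tasks(contents, cache: dict, expand):
    """Return (descriptions, new_entries).

    descriptions maps trimmed content to its description; new_entries holds
    only the hash -> description pairs that were generated in this call.
    """
    descriptions = {}
    new_entries = {}
    for raw in contents:
        text = (raw or "").strip()
        if not text:
            continue
        key = content_hash(text)
        if key in cache:
            descriptions[text] = cache[key]
        elif key in new_entries:
            descriptions[text] = new_entries[key]  # duplicate title in this batch
        else:
            new_entries[key] = expand(text)
            descriptions[text] = new_entries[key]
    return descriptions, new_entries
```

Returning `new_entries` separately keeps the function free of database access, which fits the split in the diff where the caller persists new enrichments.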
export async function getActiveAgentOutputs(userId: string) { export async function getActiveAgentOutputs(userId: string) {
const now = new Date().toISOString(); const now = new Date().toISOString();
return db return db
@@ -126,22 +155,52 @@ async function persistInferredPrefs(
} }
} }
function taskListHash(tasks: { content?: string }[]): string {
const sorted = tasks
.map((t) => t.content?.trim() ?? '')
.filter(Boolean)
.sort()
.join('\n');
return crypto.createHash('md5').update(sorted).digest('hex');
}
async function isUpToDate(userId: string, agentId: string, currentHash: string): Promise<boolean> {
const rows = await db
.select({ signalsSnapshot: agentOutputs.signalsSnapshot })
.from(agentOutputs)
.where(and(eq(agentOutputs.userId, userId), eq(agentOutputs.agentId, agentId)))
.limit(1);
if (!rows.length) return false;
try {
const snapshot = JSON.parse(rows[0].signalsSnapshot ?? '{}') as { _task_hash?: string };
return snapshot._task_hash === currentHash;
} catch { return false; }
}
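The skip check above reduces to two steps: hash the sorted, trimmed task contents, then compare against the `_task_hash` stored in the previous snapshot. A Python sketch of the same logic follows, assuming tasks are dicts with an optional `content` field; the function names are hypothetical. One caveat: JavaScript's default `sort()` orders by UTF-16 code units and Python's `sorted` by code points, so the two can disagree for characters outside the Basic Multilingual Plane.

```python
import hashlib
import json


def task_list_hash(tasks) -> str:
    """MD5 over the sorted, trimmed, non-empty task contents joined by newlines."""
    contents = sorted(
        text for text in ((task.get("content") or "").strip() for task in tasks) if text
    )
    return hashlib.md5("\n".join(contents).encode("utf-8")).hexdigest()


def is_up_to_date(stored_snapshot_json, current_hash: str) -> bool:
    """True when the stored snapshot carries the same `_task_hash`."""
    if not stored_snapshot_json:
        return False
    try:
        snapshot = json.loads(stored_snapshot_json)
    except ValueError:
        return False
    return isinstance(snapshot, dict) and snapshot.get("_task_hash") == current_hash
```

Since only `content` feeds the hash, a change in priority or overdue state alone leaves the hash unchanged and the recompute is skipped.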
export async function computeAndStore(userId: string, agentId: string): Promise<void> { export async function computeAndStore(userId: string, agentId: string): Promise<void> {
let tasks: object[] = []; let tasks: object[] = [];
try { try {
const signals = await _agentAggregator.fetchAll(userId); const signals = await _agentAggregator.fetchAll(userId);
tasks = signals.map((s) => ({ tasks = signals.map((s) => ({
id: s.id, id: s.id,
source: s.source,
kind: s.kind,
content: s.content, content: s.content,
// Task-specific fields (default to harmless values for non-task signals)
priority: (s.features.priority as number) ?? 1, priority: (s.features.priority as number) ?? 1,
is_overdue: Boolean(s.features.is_overdue), is_overdue: Boolean(s.features.is_overdue),
task_age_days: (s.features.task_age_days as number) ?? 0, task_age_days: (s.features.task_age_days as number) ?? 0,
project_id: (s.metadata as Record<string, unknown>).project_id ?? null, project_id: (s.metadata as Record<string, unknown>).project_id ?? null,
// All features spread so source-specific agents (e.g. health-vitals) can read them
...s.features,
})); }));
} catch { } catch {
// No integration or fetch error — agents that need tasks will report "no tasks" // No integration or fetch error — agents that need tasks will report "no tasks"
} }
const currentTaskHash = taskListHash(tasks as { content?: string }[]);
if (await isUpToDate(userId, agentId, currentTaskHash)) return;
let profile: Profile = {}; let profile: Profile = {};
try { try {
profile = await getProfile(userId); profile = await getProfile(userId);
@@ -162,10 +221,13 @@ export async function computeAndStore(userId: string, agentId: string): Promise<
// Load agent prefs (user overrides + previous inferences) to inject into the compute call. // Load agent prefs (user overrides + previous inferences) to inject into the compute call.
const agentPrefs = await loadAgentPrefs(userId, agentId); const agentPrefs = await loadAgentPrefs(userId, agentId);
// Fetch enrichment cache for task titles present in this compute call.
const enrichmentCache = await fetchEnrichmentCache(tasks as { content?: string }[]);
const mlResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/compute`, { const mlResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/compute`, {
method: 'POST', method: 'POST',
headers: { 'Content-Type': 'application/json' }, headers: { 'Content-Type': 'application/json' },
-body: JSON.stringify({ user_id: userId, tasks, profile, feedback_history: feedbackHistory, agent_prefs: agentPrefs }),
+body: JSON.stringify({ user_id: userId, tasks, profile, feedback_history: feedbackHistory, agent_prefs: agentPrefs, enrichment_cache: enrichmentCache, task_hash: currentTaskHash }),
signal: AbortSignal.timeout(60_000), signal: AbortSignal.timeout(60_000),
}); });
@@ -177,10 +239,16 @@ export async function computeAndStore(userId: string, agentId: string): Promise<
const output = await mlResp.json() as { const output = await mlResp.json() as {
user_id: string; agent_id: string; prompt_text: string; user_id: string; agent_id: string; prompt_text: string;
signals_snapshot: unknown; computed_at: string; expires_at: string; agent_version: string; signals_snapshot: unknown; computed_at: string; expires_at: string; agent_version: string;
new_enrichments?: Record<string, string>;
}; };
await storeAgentOutput(output); await storeAgentOutput(output);
// Persist any new enrichments produced during this compute cycle.
if (output.new_enrichments && Object.keys(output.new_enrichments).length > 0) {
await persistEnrichments(output.new_enrichments);
}
// Run inference framework for this agent and persist results. // Run inference framework for this agent and persist results.
// Failures are non-fatal — the compute result is already stored. // Failures are non-fatal — the compute result is already stored.
try { try {


@@ -1,7 +1,7 @@
import { type Router as ExpressRouter, Router, Request, Response } from 'express'; import { type Router as ExpressRouter, Router, Request, Response } from 'express';
import { nanoid } from 'nanoid'; import { nanoid } from 'nanoid';
import { db } from '../db/index.js'; import { db } from '../db/index.js';
-import { integrationTokens } from '../db/schema.js';
+import { integrationTokens, userConsents } from '../db/schema.js';
import { eq, and } from 'drizzle-orm'; import { eq, and } from 'drizzle-orm';
import { config } from '../config.js'; import { config } from '../config.js';
import { requireAuth, AuthenticatedRequest } from '../middleware/session.js'; import { requireAuth, AuthenticatedRequest } from '../middleware/session.js';
@@ -12,9 +12,49 @@ const TODOIST_OAUTH_URL = 'https://todoist.com/oauth/authorize';
const TODOIST_TOKEN_URL = 'https://todoist.com/oauth/access_token'; const TODOIST_TOKEN_URL = 'https://todoist.com/oauth/access_token';
const TODOIST_SCOPES = 'data:read_write'; const TODOIST_SCOPES = 'data:read_write';
const GOOGLE_AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth';
const GOOGLE_TOKEN_URL = 'https://oauth2.googleapis.com/token';
const GOOGLE_REVOKE_URL = 'https://oauth2.googleapis.com/revoke';
const GOOGLE_HEALTH_SCOPES = [
'https://www.googleapis.com/auth/fitness.activity.read',
'https://www.googleapis.com/auth/fitness.body.read',
'https://www.googleapis.com/auth/fitness.sleep.read',
'https://www.googleapis.com/auth/fitness.heart_rate.read',
'https://www.googleapis.com/auth/fitness.nutrition.read',
'https://www.googleapis.com/auth/fitness.location.read',
'https://www.googleapis.com/auth/fitness.blood_glucose.read',
'https://www.googleapis.com/auth/fitness.blood_pressure.read',
'https://www.googleapis.com/auth/fitness.body_temperature.read',
'https://www.googleapis.com/auth/fitness.oxygen_saturation.read',
'https://www.googleapis.com/auth/fitness.reproductive_health.read',
].join(' ');
// In-memory CSRF state store // In-memory CSRF state store
const pendingStates = new Map<string, { userId: string; redirectTo: string }>(); const pendingStates = new Map<string, { userId: string; redirectTo: string }>();
async function grantDataSourceConsent(userId: string, provider: string): Promise<void> {
const consentKey = `data:${provider}`;
const now = new Date().toISOString();
await db.insert(userConsents)
.values({ userId, consentKey, grantedAt: now, revokedAt: null })
.onConflictDoUpdate({
target: [userConsents.userId, userConsents.consentKey],
set: { grantedAt: now, revokedAt: null },
});
}
async function revokeDataSourceConsent(userId: string, provider: string): Promise<void> {
const consentKey = `data:${provider}`;
const now = new Date().toISOString();
await db.insert(userConsents)
.values({ userId, consentKey, grantedAt: now, revokedAt: now })
.onConflictDoUpdate({
target: [userConsents.userId, userConsents.consentKey],
set: { revokedAt: now },
});
}
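Both helpers are upserts on `(user_id, consent_key)`: granting resets `granted_at` and clears `revoked_at`, while revoking sets only `revoked_at`. The sketch below shows the equivalent SQL through Python's `sqlite3`, assuming a composite primary key on that pair and SQLite 3.24 or newer for `ON CONFLICT ... DO UPDATE`. The table definition and the function names are illustrative.

```python
import sqlite3
from datetime import datetime, timezone


def make_consent_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE user_consents (
            user_id TEXT NOT NULL,
            consent_key TEXT NOT NULL,
            granted_at TEXT NOT NULL,
            revoked_at TEXT,
            PRIMARY KEY (user_id, consent_key)  -- assumed conflict target
        )
        """
    )
    return conn


def _now() -> str:
    return datetime.now(timezone.utc).isoformat()


def grant(conn, user_id: str, provider: str, now=None) -> None:
    conn.execute(
        """
        INSERT INTO user_consents (user_id, consent_key, granted_at, revoked_at)
        VALUES (?, ?, ?, NULL)
        ON CONFLICT(user_id, consent_key)
        DO UPDATE SET granted_at = excluded.granted_at, revoked_at = NULL
        """,
        (user_id, f"data:{provider}", now or _now()),
    )


def revoke(conn, user_id: str, provider: str, now=None) -> None:
    stamp = now or _now()
    conn.execute(
        """
        INSERT INTO user_consents (user_id, consent_key, granted_at, revoked_at)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(user_id, consent_key)
        DO UPDATE SET revoked_at = excluded.revoked_at
        """,
        (user_id, f"data:{provider}", stamp, stamp),
    )
```

Keeping the row and stamping `revoked_at` preserves the original grant time, so reconnecting a provider is a plain update of the same row.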
/** GET /api/integrations — list connected integrations */ /** GET /api/integrations — list connected integrations */
router.get('/', requireAuth, async (req: AuthenticatedRequest, res: Response) => { router.get('/', requireAuth, async (req: AuthenticatedRequest, res: Response) => {
const tokens = await db const tokens = await db
@@ -100,10 +140,102 @@ router.get('/todoist/callback', async (req: Request, res: Response) => {
tokenStatus: 'active', tokenStatus: 'active',
connectedAt: now, connectedAt: now,
}); });
await grantDataSourceConsent(pending.userId, 'todoist');
res.redirect(`${config.WEB_BASE_URL}${pending.redirectTo}?connected=todoist`); res.redirect(`${config.WEB_BASE_URL}${pending.redirectTo}?connected=todoist`);
}); });
/** GET /api/integrations/google-health/connect — start Google Fit OAuth */
router.get('/google-health/connect', requireAuth, (req: AuthenticatedRequest, res: Response) => {
const state = nanoid();
pendingStates.set(state, {
userId: req.userId!,
redirectTo: (req.query.redirectTo as string) ?? '/connect',
});
setTimeout(() => pendingStates.delete(state), 10 * 60 * 1000);
const url = new URL(GOOGLE_AUTH_URL);
url.searchParams.set('client_id', config.GOOGLE_CLIENT_ID);
url.searchParams.set('redirect_uri', `${config.API_BASE_URL}/api/integrations/google-health/callback`);
url.searchParams.set('response_type', 'code');
url.searchParams.set('scope', GOOGLE_HEALTH_SCOPES);
url.searchParams.set('state', state);
url.searchParams.set('access_type', 'offline');
url.searchParams.set('prompt', 'consent');
res.redirect(url.toString());
});
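The connect route does two things: it records a one-time `state` tied to the user, then redirects to the authorization URL with that state attached. A sketch of both halves follows, using only the Python standard library. It swaps the timer-based cleanup for an expiry timestamp checked on use, which is an assumption for illustration; the client id and callback URL are placeholder values.

```python
import secrets
import time
from urllib.parse import parse_qs, urlencode, urlparse

GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
STATE_TTL_SEC = 10 * 60

pending_states = {}  # state -> (user_id, redirect_to, expires_at)


def start_connect(user_id, client_id, callback_url, scopes, redirect_to="/connect", now=None):
    """Register a CSRF state and return the authorization URL to redirect to."""
    now = time.time() if now is None else now
    state = secrets.token_urlsafe(16)
    pending_states[state] = (user_id, redirect_to, now + STATE_TTL_SEC)
    query = urlencode(
        {
            "client_id": client_id,
            "redirect_uri": callback_url,
            "response_type": "code",
            "scope": " ".join(scopes),
            "state": state,
            "access_type": "offline",  # ask for a refresh token
            "prompt": "consent",
        }
    )
    return f"{GOOGLE_AUTH_URL}?{query}"


def consume_state(state, now=None):
    """Return (user_id, redirect_to) once; None if unknown, reused, or expired."""
    now = time.time() if now is None else now
    entry = pending_states.pop(state, None)
    if entry is None or entry[2] < now:
        return None
    return entry[0], entry[1]


# Placeholder values, not real credentials or endpoints.
url = start_connect(
    "u1",
    "example-client-id",
    "https://api.example.test/api/integrations/google-health/callback",
    ["https://www.googleapis.com/auth/fitness.activity.read"],
    now=1000.0,
)
params = parse_qs(urlparse(url).query)
```

Popping the state on first use is what makes a replayed callback fail, mirroring the `pendingStates.delete(state)` call in the callback route.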
/** GET /api/integrations/google-health/callback — Google returns here */
router.get('/google-health/callback', async (req: Request, res: Response) => {
const state = req.query.state as string;
const code = req.query.code as string;
const error = req.query.error as string | undefined;
if (error) {
res.status(400).json({ error: `Google denied access: ${error}` });
return;
}
const pending = pendingStates.get(state);
if (!pending) {
res.status(400).json({ error: 'Invalid or expired state' });
return;
}
pendingStates.delete(state);
const body = new URLSearchParams({
client_id: config.GOOGLE_CLIENT_ID,
client_secret: config.GOOGLE_CLIENT_SECRET,
code,
grant_type: 'authorization_code',
redirect_uri: `${config.API_BASE_URL}/api/integrations/google-health/callback`,
});
const tokenRes = await fetch(GOOGLE_TOKEN_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded', Accept: 'application/json' },
body: body.toString(),
});
if (!tokenRes.ok) {
const detail = await tokenRes.text().catch(() => '');
res.status(502).json({ error: `Failed to exchange Google token: ${detail}` });
return;
}
const tokenData = (await tokenRes.json()) as {
access_token: string;
refresh_token?: string;
expires_in: number;
};
const now = new Date();
const expiresAt = new Date(now.getTime() + tokenData.expires_in * 1000).toISOString();
await db
.delete(integrationTokens)
.where(
and(
eq(integrationTokens.userId, pending.userId),
eq(integrationTokens.provider, 'google-health'),
),
);
await db.insert(integrationTokens).values({
id: nanoid(),
userId: pending.userId,
provider: 'google-health',
accessToken: tokenData.access_token,
refreshToken: tokenData.refresh_token ?? null,
expiresAt,
tokenStatus: 'active',
connectedAt: now.toISOString(),
});
await grantDataSourceConsent(pending.userId, 'google-health');
res.redirect(`${config.WEB_BASE_URL}${pending.redirectTo}?connected=google-health`);
});
/** DELETE /api/integrations/:provider — revoke token */ /** DELETE /api/integrations/:provider — revoke token */
router.delete('/:provider', requireAuth, async (req: AuthenticatedRequest, res: Response) => { router.delete('/:provider', requireAuth, async (req: AuthenticatedRequest, res: Response) => {
const provider = String(req.params.provider); const provider = String(req.params.provider);
@@ -120,13 +252,18 @@ router.delete('/:provider', requireAuth, async (req: AuthenticatedRequest, res:
.limit(1); .limit(1);
if (token?.provider === 'todoist') { if (token?.provider === 'todoist') {
// Best-effort revocation
await fetch('https://api.todoist.com/sync/v9/access_tokens/revoke', { await fetch('https://api.todoist.com/sync/v9/access_tokens/revoke', {
method: 'POST', method: 'POST',
headers: { Authorization: `Bearer ${token.accessToken}` }, headers: { Authorization: `Bearer ${token.accessToken}` },
}).catch(() => {}); }).catch(() => {});
} }
if (token?.provider === 'google-health') {
await fetch(`${GOOGLE_REVOKE_URL}?token=${token.accessToken}`, { method: 'POST' }).catch(() => {});
}
await revokeDataSourceConsent(req.userId!, provider);
await db await db
.delete(integrationTokens) .delete(integrationTokens)
.where( .where(


@@ -7,9 +7,10 @@ import { eq, and, desc } from 'drizzle-orm';
import { requireAuth, AuthenticatedRequest } from '../middleware/session.js'; import { requireAuth, AuthenticatedRequest } from '../middleware/session.js';
import { config } from '../config.js'; import { config } from '../config.js';
import { bus } from '../events/bus.js'; import { bus } from '../events/bus.js';
-import type { TipCandidate, Signal } from '@oo/shared-types';
+import type { Tip, Signal } from '@oo/shared-types';
import { todoistSource, dueAgeDays } from '../signals/todoist.js'; import { todoistSource, dueAgeDays } from '../signals/todoist.js';
export { dueAgeDays }; export { dueAgeDays };
import { googleHealthSource } from '../signals/google-health.js';
import { SignalAggregator } from '../signals/aggregator.js'; import { SignalAggregator } from '../signals/aggregator.js';
import { getActiveAgentOutputs } from './agent-outputs.js'; import { getActiveAgentOutputs } from './agent-outputs.js';
import { getEligibleAgentIds } from '../profile/eligibility.js'; import { getEligibleAgentIds } from '../profile/eligibility.js';
@@ -19,35 +20,18 @@ const router: ExpressRouter = Router();
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Signal aggregator — register sources here as new integrations are added // Signal aggregator — register sources here as new integrations are added
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
-export const aggregator = new SignalAggregator().register(todoistSource);
-export const _clearSignalCacheForTests = () => todoistSource.clearCache();
+export const aggregator = new SignalAggregator().register(todoistSource).register(googleHealthSource);
+export const _clearSignalCacheForTests = () => {
+todoistSource.clearCache();
+googleHealthSource.clearCache();
+};
-// ---------------------------------------------------------------------------
-// Signal → TipCandidate conversion
-// ---------------------------------------------------------------------------
-function signalToCandidate(signal: Signal): TipCandidate {
-return {
-id: signal.id,
-content: signal.content,
-source: signal.source as TipCandidate['source'],
-kind: signal.kind as TipCandidate['kind'],
-sourceId: (signal.metadata.todoistId as string | undefined) ?? undefined,
-createdAt: signal.timestamp,
-features: signal.features,
-};
-}
-function randomPolicy(candidates: TipCandidate[]): TipCandidate | null {
-if (!candidates.length) return null;
-return candidates[Math.floor(Math.random() * candidates.length)];
-}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Orchestrator: fetch agent snippets + call ml/serving /recommend // Orchestrator: fetch agent snippets + call ml/serving /recommend
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
interface OrchestratorResult { interface OrchestratorResult {
-tip: TipCandidate;
+tip: Tip;
model: string | null; model: string | null;
agentIds: string[]; agentIds: string[];
} }
@@ -68,6 +52,7 @@ async function fetchOrchestratorTip(
hour: number, hour: number,
dayOfWeek: number, dayOfWeek: number,
traceparent?: string, traceparent?: string,
recentTip?: string,
): Promise<OrchestratorResult | null> { ): Promise<OrchestratorResult | null> {
const [allAgentRows, eligibleIds, scienceDestiny] = await Promise.all([ const [allAgentRows, eligibleIds, scienceDestiny] = await Promise.all([
getActiveAgentOutputs(userId), getActiveAgentOutputs(userId),
@@ -89,7 +74,7 @@ async function fetchOrchestratorTip(
const res = await fetch(`${config.ML_SERVING_URL}/recommend`, { const res = await fetch(`${config.ML_SERVING_URL}/recommend`, {
method: 'POST', method: 'POST',
headers: { 'Content-Type': 'application/json', ...(traceparent ? { traceparent } : {}) }, headers: { 'Content-Type': 'application/json', ...(traceparent ? { traceparent } : {}) },
-body: JSON.stringify({ user_id: userId, agent_outputs: agentOutputs, tasks, hour_of_day: hour, day_of_week: dayOfWeek, science_destiny: scienceDestiny ?? 50 }),
+body: JSON.stringify({ user_id: userId, agent_outputs: agentOutputs, tasks, hour_of_day: hour, day_of_week: dayOfWeek, science_destiny: scienceDestiny ?? 50, recent_tip: recentTip ?? null }),
signal: AbortSignal.timeout(15_000), signal: AbortSignal.timeout(15_000),
}); });
if (!res.ok) return null; if (!res.ok) return null;
@@ -106,7 +91,6 @@ async function fetchOrchestratorTip(
       kind: 'advice' as const,
       rationale: data.tip.rationale,
       createdAt: now,
-      features: { is_overdue: false, task_age_days: 0, priority: 1 },
     },
     model: data.model ?? null,
     agentIds: agentOutputs.map((a) => a.agent_id),
@@ -123,6 +107,7 @@ async function fetchOrchestratorTip(
 router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Response) => {
   const hour = new Date().getHours();
   const dayOfWeek = new Date().getDay();
+  const { recent_tip: recentTip } = req.body as { recent_tip?: string };
   const anyToken = await db
     .select({ id: integrationTokens.id })
@@ -138,16 +123,16 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re
   const signals = await aggregator.fetchAll(req.userId!);
   const t0 = Date.now();
-  const orchestrated = await fetchOrchestratorTip(req.userId!, signals, hour, dayOfWeek, req.traceparent);
+  const orchestrated = await fetchOrchestratorTip(req.userId!, signals, hour, dayOfWeek, req.traceparent, recentTip);
   const latencyMs = Date.now() - t0;
-  const tip = orchestrated?.tip ?? randomPolicy(signals.map(signalToCandidate));
-  if (!tip) {
+  if (!orchestrated) {
     res.status(204).end();
     return;
   }
-  const policy = orchestrated ? 'orchestrator' : 'random';
+  const tip = orchestrated.tip;
+  const policy = 'orchestrator';
   const servedAt = new Date().toISOString();
   await db.insert(tipViews).values({ id: nanoid(), userId: req.userId!, tipId: tip.id, servedAt });
@@ -158,16 +143,12 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re
     tipId: tip.id,
     policy,
     mlScore: null,
-    featuresJson: JSON.stringify(
-      orchestrated
-        ? { agent_ids: orchestrated.agentIds, hour_of_day: hour, day_of_week: dayOfWeek }
-        : { ...tip.features, hour_of_day: hour, day_of_week: dayOfWeek },
-    ),
-    candidateCount: orchestrated ? 1 : signals.length,
+    featuresJson: JSON.stringify({ agent_ids: orchestrated.agentIds, hour_of_day: hour, day_of_week: dayOfWeek }),
+    candidateCount: 1,
     latencyMs,
     servedAt,
-    promptVersion: orchestrated ? 'v4-orchestrator' : null,
-    llmModel: orchestrated ? orchestrated.model : null,
+    promptVersion: 'v4-orchestrator',
+    llmModel: orchestrated.model,
     tipKind: tip.kind ?? null,
   });

@@ -0,0 +1,312 @@
import type { Signal, SignalSource } from '@oo/shared-types';
import { db } from '../db/index.js';
import { integrationTokens } from '../db/schema.js';
import { eq, and } from 'drizzle-orm';
import { bus } from '../events/bus.js';
import { config } from '../config.js';
import { logger } from '../logger.js';
const CACHE_TTL_MS = 5 * 60_000;
const FIT_AGGREGATE_URL = 'https://www.googleapis.com/fitness/v1/users/me/dataset:aggregate';
const FIT_SESSIONS_URL = 'https://www.googleapis.com/fitness/v1/users/me/sessions';
const GOOGLE_TOKEN_URL = 'https://oauth2.googleapis.com/token';
const STEP_DAILY_GOAL = 7_000;
const SLEEP_GOAL_HOURS = 7;
interface FitBucket {
  dataset: Array<{
    dataSourceId: string;
    point: Array<{ value: Array<{ intVal?: number; fpVal?: number }> }>;
  }>;
}
interface FitAggregateResponse {
  bucket?: FitBucket[];
}
interface FitSession {
  name: string;
  startTimeMillis: string;
  endTimeMillis: string;
  activityType: number;
}
interface FitSessionsResponse {
  session?: FitSession[];
}
async function refreshGoogleToken(
  userId: string,
  refreshToken: string,
): Promise<string | null> {
  const body = new URLSearchParams({
    client_id: config.GOOGLE_CLIENT_ID,
    client_secret: config.GOOGLE_CLIENT_SECRET,
    refresh_token: refreshToken,
    grant_type: 'refresh_token',
  });
  const res = await fetch(GOOGLE_TOKEN_URL, {
    method: 'POST',
    headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
    body: body.toString(),
  });
  if (!res.ok) return null;
  const data = (await res.json()) as { access_token: string; expires_in: number };
  const expiresAt = new Date(Date.now() + data.expires_in * 1000).toISOString();
  await db
    .update(integrationTokens)
    .set({ accessToken: data.access_token, expiresAt, tokenStatus: 'active' })
    .where(and(eq(integrationTokens.userId, userId), eq(integrationTokens.provider, 'google-health')));
  return data.access_token;
}
function todayMidnightMs(): number {
  const d = new Date();
  d.setHours(0, 0, 0, 0);
  return d.getTime();
}
function yesterdayIso(): string {
  return new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();
}
async function fetchAggregates(
  token: string,
  startMs: number,
  endMs: number,
): Promise<FitAggregateResponse> {
  const res = await fetch(FIT_AGGREGATE_URL, {
    method: 'POST',
    headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
    body: JSON.stringify({
      aggregateBy: [
        { dataTypeName: 'com.google.step_count.delta' },
        { dataTypeName: 'com.google.calories.expended' },
        { dataTypeName: 'com.google.active_minutes' },
        { dataTypeName: 'com.google.heart_rate.bpm' },
      ],
      bucketByTime: { durationMillis: endMs - startMs },
      startTimeMillis: String(startMs),
      endTimeMillis: String(endMs),
    }),
  });
  if (!res.ok) throw new Error(`Fit aggregate: ${res.status}`);
  return res.json() as Promise<FitAggregateResponse>;
}
async function fetchSleepSessions(token: string): Promise<FitSessionsResponse> {
  const url = new URL(FIT_SESSIONS_URL);
  // activityType 72 is the Google Fit activity code for sleep
  url.searchParams.set('activityType', '72');
  // Look back 24 hours so last night's session is included
  url.searchParams.set('startTime', yesterdayIso());
  url.searchParams.set('endTime', new Date().toISOString());
  const res = await fetch(url.toString(), {
    headers: { Authorization: `Bearer ${token}` },
  });
  if (!res.ok) throw new Error(`Fit sessions: ${res.status}`);
  return res.json() as Promise<FitSessionsResponse>;
}
function extractMetric(
  bucket: FitBucket,
  dataTypeName: string,
  valueKey: 'intVal' | 'fpVal',
): number {
  for (const ds of bucket.dataset) {
    if (!ds.dataSourceId.includes(dataTypeName.replace('com.google.', '').replace('.', '_'))) continue;
    for (const pt of ds.point) {
      const v = pt.value[0];
      if (v) return valueKey === 'intVal' ? (v.intVal ?? 0) : (v.fpVal ?? 0);
    }
  }
  return 0;
}
function extractAnyMetric(
  bucket: FitBucket,
  typeSuffix: string,
  valueKey: 'intVal' | 'fpVal',
): number {
  for (const ds of bucket.dataset) {
    if (!ds.dataSourceId.includes(typeSuffix)) continue;
    const pt = ds.point[0];
    if (pt?.value[0]) {
      const v = pt.value[0];
      return valueKey === 'intVal' ? (v.intVal ?? 0) : (v.fpVal ?? 0);
    }
  }
  return 0;
}
export class GoogleHealthSignalSource implements SignalSource {
  readonly id = 'google-health';
  private cache = new Map<string, { signals: Signal[]; fetchedAt: number }>();
  clearCache(userId?: string): void {
    if (userId) this.cache.delete(userId);
    else this.cache.clear();
  }
  async fetchSignals(userId: string): Promise<Signal[]> {
    const entry = this.cache.get(userId);
    if (entry && Date.now() - entry.fetchedAt < CACHE_TTL_MS) return entry.signals;
    const [row] = await db
      .select()
      .from(integrationTokens)
      .where(and(eq(integrationTokens.userId, userId), eq(integrationTokens.provider, 'google-health')))
      .limit(1);
    if (!row) return [];
    let token = row.accessToken;
    const isExpired = row.expiresAt && new Date(row.expiresAt).getTime() - Date.now() < 5 * 60_000;
    if (isExpired && row.refreshToken) {
      const refreshed = await refreshGoogleToken(userId, row.refreshToken);
      if (!refreshed) {
        logger.warn({ userId }, 'google-health: refresh failed');
        await db
          .update(integrationTokens)
          .set({ tokenStatus: 'needs_reconnect' })
          .where(and(eq(integrationTokens.userId, userId), eq(integrationTokens.provider, 'google-health')));
        bus.publish('signals.integration.token_expired', {
          userId,
          provider: 'google-health',
          detectedAt: new Date().toISOString(),
        });
        return entry?.signals ?? [];
      }
      token = refreshed;
    }
    try {
      const startMs = todayMidnightMs();
      const endMs = Date.now();
      const [aggData, sleepData] = await Promise.all([
        fetchAggregates(token, startMs, endMs),
        fetchSleepSessions(token),
      ]);
      const bucket = aggData.bucket?.[0];
      const signals: Signal[] = [];
      const now = new Date().toISOString();
      if (bucket) {
        // Steps
        const steps = extractAnyMetric(bucket, 'step_count', 'intVal');
        const stepGoalPct = Math.round((steps / STEP_DAILY_GOAL) * 100);
        signals.push({
          id: `google-health:steps`,
          source: 'google-health',
          kind: 'health',
          content: `${steps.toLocaleString()} steps today (${stepGoalPct}% of ${STEP_DAILY_GOAL.toLocaleString()} goal)`,
          metadata: { dataType: 'steps' },
          features: {
            step_count: steps,
            step_goal_pct: stepGoalPct,
            step_goal: STEP_DAILY_GOAL,
            below_step_goal: steps < STEP_DAILY_GOAL,
          },
          timestamp: now,
        });
        // Calories + active minutes
        const calories = Math.round(extractAnyMetric(bucket, 'calories', 'fpVal'));
        const activeMinutes = extractAnyMetric(bucket, 'active_minutes', 'intVal');
        signals.push({
          id: `google-health:activity`,
          source: 'google-health',
          kind: 'health',
          content: `${activeMinutes} active minutes, ${calories} calories burned today`,
          metadata: { dataType: 'activity' },
          features: {
            active_minutes: activeMinutes,
            calories_burned: calories,
            sedentary: activeMinutes < 20,
          },
          timestamp: now,
        });
        // Heart rate
        // NOTE: the first value of the aggregated heart-rate dataset is expected to be the
        // average over the window (midnight to now), so "resting" is an approximation here.
        const bpm = Math.round(extractAnyMetric(bucket, 'heart_rate', 'fpVal'));
        if (bpm > 0) {
          signals.push({
            id: `google-health:heart_rate`,
            source: 'google-health',
            kind: 'health',
            content: `Resting heart rate: ${bpm} bpm`,
            metadata: { dataType: 'heart_rate' },
            features: { resting_bpm: bpm, elevated_hr: bpm > 90 },
            timestamp: now,
          });
        }
      }
      // Sleep — find the most recent sleep session
      if (sleepData.session?.length) {
        const sorted = [...sleepData.session].sort(
          (a, b) => Number(b.endTimeMillis) - Number(a.endTimeMillis),
        );
        const last = sorted[0]!;
        const durationMs = Number(last.endTimeMillis) - Number(last.startTimeMillis);
        const sleepHours = Math.round((durationMs / 3_600_000) * 10) / 10;
        const belowGoal = sleepHours < SLEEP_GOAL_HOURS;
        signals.push({
          id: `google-health:sleep`,
          source: 'google-health',
          kind: 'health',
          content: `${sleepHours}h sleep last night (${belowGoal ? 'below' : 'meets'} ${SLEEP_GOAL_HOURS}h goal)`,
          metadata: { dataType: 'sleep', sessionName: last.name },
          features: {
            sleep_hours: sleepHours,
            sleep_goal_hours: SLEEP_GOAL_HOURS,
            sleep_deficit_hours: Math.max(0, SLEEP_GOAL_HOURS - sleepHours),
            below_sleep_goal: belowGoal,
          },
          timestamp: now,
        });
      }
      this.cache.set(userId, { signals, fetchedAt: Date.now() });
      bus.publish('signals.task.synced', {
        userId,
        source: 'google-health',
        count: signals.length,
        syncedAt: now,
      });
      return signals;
    } catch (err: unknown) {
      const status = (err as { message?: string }).message;
      if (status?.includes('401')) {
        logger.warn({ userId }, 'google-health: token expired (401)');
        // Try a refresh first; if there is no refresh token or the refresh fails,
        // flag the integration so the user is asked to reconnect.
        const refreshed = row.refreshToken
          ? await refreshGoogleToken(userId, row.refreshToken)
          : null;
        if (!refreshed) {
          await db
            .update(integrationTokens)
            .set({ tokenStatus: 'needs_reconnect' })
            .where(and(eq(integrationTokens.userId, userId), eq(integrationTokens.provider, 'google-health')));
          bus.publish('signals.integration.token_expired', {
            userId,
            provider: 'google-health',
            detectedAt: new Date().toISOString(),
          });
        }
      } else {
        logger.error({ userId, err }, 'google-health: fetch failed');
      }
      // Serve the last cached signals (possibly stale) rather than failing the request
      return entry?.signals ?? [];
    }
  }
}
export const googleHealthSource = new GoogleHealthSignalSource();
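
The substring lookup that `extractAnyMetric` performs on a bucket can be exercised in isolation. The following is a minimal sketch, not part of the commit: the `sampleBucket` object, its `dataSourceId` strings, and all numbers are hypothetical and hand-written for illustration, and the function body is a condensed copy of the one in the file. It shows that the first dataset whose id contains the suffix supplies the value, and that a dataset with no points falls through to `0`.

```typescript
// Shapes mirror the FitBucket interface from the file above.
interface FitValue { intVal?: number; fpVal?: number }
interface FitBucket {
  dataset: Array<{ dataSourceId: string; point: Array<{ value: FitValue[] }> }>;
}

// Condensed copy of extractAnyMetric: match on a substring of dataSourceId,
// read the first value of the first point, default to 0 when nothing is found.
function extractAnyMetric(
  bucket: FitBucket,
  typeSuffix: string,
  valueKey: 'intVal' | 'fpVal',
): number {
  for (const ds of bucket.dataset) {
    if (!ds.dataSourceId.includes(typeSuffix)) continue;
    const v = ds.point[0]?.value[0];
    if (v) return valueKey === 'intVal' ? (v.intVal ?? 0) : (v.fpVal ?? 0);
  }
  return 0;
}

// Hypothetical bucket; ids and numbers are made up, not captured from the API.
const sampleBucket: FitBucket = {
  dataset: [
    { dataSourceId: 'derived:com.google.step_count.delta:example:aggregated', point: [{ value: [{ intVal: 4200 }] }] },
    { dataSourceId: 'derived:com.google.calories.expended:example:aggregated', point: [{ value: [{ fpVal: 1830.6 }] }] },
    // No points recorded: the lookup falls through and yields 0.
    { dataSourceId: 'derived:com.google.heart_rate.summary:example:aggregated', point: [] },
  ],
};

const STEP_DAILY_GOAL = 7000; // same goal as the constant in the file

const steps = extractAnyMetric(sampleBucket, 'step_count', 'intVal');
const calories = Math.round(extractAnyMetric(sampleBucket, 'calories', 'fpVal'));
const bpm = extractAnyMetric(sampleBucket, 'heart_rate', 'fpVal');
const stepGoalPct = Math.round((steps / STEP_DAILY_GOAL) * 100);

console.log({ steps, calories, bpm, stepGoalPct });
```

Because a missing metric and a genuine zero both come back as `0`, the heart-rate branch in `fetchSignals` guards on `bpm > 0` before emitting a signal; the steps and activity signals are emitted either way.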