Compare commits
14 Commits
161e654027
...
772bb6e194
| Author | SHA1 | Date | |
|---|---|---|---|
| 772bb6e194 | |||
| 34925310cf | |||
| f66f337779 | |||
| f6b89fc849 | |||
| 12c956b588 | |||
| d12f11d29d | |||
| 9ddeea6cac | |||
| 08d08ad7b0 | |||
| 1ca2351488 | |||
| 4e9210fcef | |||
| 59c493323f | |||
| d4b40e2590 | |||
| a0a069c525 | |||
| d1f28666b0 |
82
CLAUDE.md
82
CLAUDE.md
@@ -92,7 +92,7 @@ oO generates tips through a multi-agent pipeline (ADR-0013): pre-compute agents
|
|||||||
| Alias | Model | Used by |
|
| Alias | Model | Used by |
|
||||||
|-------|-------|---------|
|
|-------|-------|---------|
|
||||||
| `tip-generator` | qwen2.5:1.5b (default) | `ml/serving` tip generation |
|
| `tip-generator` | qwen2.5:1.5b (default) | `ml/serving` tip generation |
|
||||||
| `embedder` | nomic-embed-text | task clustering, dedup |
|
| `embedder` | nomic-embed-text | task clustering (after LLM enrichment), dedup |
|
||||||
| `judge` | claude-haiku-4-5 (cloud, eval only) | offline sim |
|
| `judge` | claude-haiku-4-5 (cloud, eval only) | offline sim |
|
||||||
|
|
||||||
Env vars: `LITELLM_URL` (prod `https://llm.alogins.net`), `OLLAMA_URL` (Agap host, `http://host.docker.internal:11434` from containers).
|
Env vars: `LITELLM_URL` (prod `https://llm.alogins.net`), `OLLAMA_URL` (Agap host, `http://host.docker.internal:11434` from containers).
|
||||||
@@ -101,17 +101,83 @@ Ollama and LiteLLM are **shared Agap services**, not oO services — they live i
|
|||||||
|
|
||||||
All `httpx` calls in `ml/` must use `trust_env=False` to bypass the system proxy — same rule as `bw` and curl. Pattern: `httpx.Client(trust_env=False, timeout=N)`.
|
All `httpx` calls in `ml/` must use `trust_env=False` to bypass the system proxy — same rule as `bw` and curl. Pattern: `httpx.Client(trust_env=False, timeout=N)`.
|
||||||
|
|
||||||
MLflow container-to-container calls: always pass `host_header="localhost"` to `MLflowClient` — MLflow's `--allowed-hosts` rejects `Host: mlflow` (the container DNS name) with 403. Auth credential is `MLFLOW_ADMIN_PASSWORD`. MLflow REST API lives at the origin root (`/api/2.0/mlflow`), not under the `/mlflow` UI prefix.
|
MLflow container-to-container calls: always pass `host_header="localhost"` to `MLflowClient` — MLflow's `--allowed-hosts` rejects `Host: mlflow` (the container DNS name) with 403. Auth credential is `MLFLOW_ADMIN_PASSWORD`. MLflow REST API lives at the origin root, not under the `/mlflow` UI prefix.
|
||||||
|
|
||||||
|
### MLflow API versions — runs vs traces
|
||||||
|
|
||||||
|
MLflow uses **two API versions** — use the right one or you'll get 405:
|
||||||
|
|
||||||
|
| What | API prefix | Example |
|
||||||
|
|------|-----------|---------|
|
||||||
|
| Runs, experiments, metrics | `/api/2.0/mlflow/` | `runs/search`, `experiments/list` |
|
||||||
|
| Traces (LLM observability) | `/api/3.0/mlflow/traces/` | `traces/{trace_id}` |
|
||||||
|
|
||||||
|
**Experiment IDs:** `3` = oO/serving. Artifacts stored as run tags prefixed `artifact:<path>`.
|
||||||
|
|
||||||
|
### Querying from the host shell
|
||||||
|
|
||||||
|
Always strip the proxy and pass `Host: localhost` (no port — `localhost:5000` fails the DNS-rebinding check).
|
||||||
|
|
||||||
MLflow from the host shell — query with curl, no script needed:
|
|
||||||
```bash
|
```bash
|
||||||
|
# Search recent runs (experiment 3)
|
||||||
env -u HTTPS_PROXY -u HTTP_PROXY -u ALL_PROXY -u https_proxy -u http_proxy -u all_proxy \
|
env -u HTTPS_PROXY -u HTTP_PROXY -u ALL_PROXY -u https_proxy -u http_proxy -u all_proxy \
|
||||||
curl -s -H "Host: localhost" -u "admin:${MLFLOW_ADMIN_PASSWORD}" \
|
curl -s -H "Host: localhost" -u "admin:${MLFLOW_ADMIN_PASSWORD}" \
|
||||||
-X POST http://localhost:5000/api/2.0/mlflow/runs/search \
|
-X POST http://localhost:5000/api/2.0/mlflow/runs/search \
|
||||||
-H "Content-Type: application/json" \
|
-H "Content-Type: application/json" \
|
||||||
-d '{"experiment_ids":["3"],"max_results":1,"order_by":["start_time DESC"]}'
|
-d '{"experiment_ids":["3"],"max_results":5,"order_by":["start_time DESC"]}'
|
||||||
|
|
||||||
|
# Get a trace by ID (note: /api/3.0/, not /api/2.0/)
|
||||||
|
env -u HTTPS_PROXY -u HTTP_PROXY -u ALL_PROXY -u https_proxy -u http_proxy -u all_proxy \
|
||||||
|
curl -s -H "Host: localhost" -u "admin:${MLFLOW_ADMIN_PASSWORD}" \
|
||||||
|
http://localhost:5000/api/3.0/mlflow/traces/tr-<trace_id> | python3 -m json.tool
|
||||||
|
```
|
||||||
|
|
||||||
|
The trace response includes `trace_metadata.mlflow.traceInputs/Outputs`, `trace_metadata.mlflow.trace.sizeStats` (num_spans), and `tags.mlflow.traceName`.
|
||||||
|
|
||||||
|
### Getting spans (Python client from inside the container)
|
||||||
|
|
||||||
|
The REST API has **no endpoint for spans** — `/api/3.0/mlflow/traces/{id}/spans` returns 404. Use the Python client inside `oo-ml-serving-1`:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker exec oo-ml-serving-1 python3 -c "
|
||||||
|
import mlflow, json, os
|
||||||
|
mlflow.set_tracking_uri('http://mlflow:5000')
|
||||||
|
os.environ['MLFLOW_TRACKING_USERNAME'] = 'admin'
|
||||||
|
os.environ['MLFLOW_TRACKING_PASSWORD'] = os.environ.get('MLFLOW_ADMIN_PASSWORD', '')
|
||||||
|
|
||||||
|
client = mlflow.tracking.MlflowClient()
|
||||||
|
trace = client.get_trace('tr-<trace_id>')
|
||||||
|
for span in trace.data.spans:
|
||||||
|
print(span.name, '| parent:', span.parent_id, '| status:', span.status)
|
||||||
|
print(' inputs:', json.dumps(span.inputs)[:200])
|
||||||
|
print(' outputs:', json.dumps(span.outputs)[:200])
|
||||||
|
print(' attrs:', span.attributes)
|
||||||
|
"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Span structure for a tip generation trace
|
||||||
|
|
||||||
|
A healthy `recommend` trace has 3 spans:
|
||||||
|
|
||||||
|
| Span | Type | Parent | Key attributes |
|
||||||
|
|------|------|--------|---------------|
|
||||||
|
| `recommend` | CHAIN | (root) | `agent_count`, `latency_ms`; inputs include `agent_ids` list |
|
||||||
|
| `build_context` | TOOL | recommend | `agent_count`, `task_count`, `science_destiny` |
|
||||||
|
| `llm_orchestrator` | LLM | recommend | `prompt_tokens`, `completion_tokens`, `model`, `attempts` |
|
||||||
|
|
||||||
|
### Diagnosing "no agents in trace"
|
||||||
|
|
||||||
|
If the trace shows `agent_ids: []` and `agent_count: 0` in the root span, and the orchestrator prompt says *"No pre-computed agent context available"*, it means the recommender found zero eligible snippets at request time. Causes:
|
||||||
|
|
||||||
|
1. **Agent compute hasn't run** — no `agent_outputs` rows for this user yet
|
||||||
|
2. **Snippets expired** — TTL elapsed since last compute
|
||||||
|
3. **Eligibility filter dropped all agents** — none passed the manifest-driven check
|
||||||
|
|
||||||
|
Diagnose with:
|
||||||
|
```bash
|
||||||
|
docker exec oo-api-1 psql "$DATABASE_URL" -c \
|
||||||
|
"SELECT agent_id, computed_at, expires_at FROM agent_outputs WHERE user_id='<uid>' ORDER BY computed_at DESC LIMIT 10;"
|
||||||
```
|
```
|
||||||
`Host: localhost` required (no port) — `localhost:5000` fails the DNS-rebinding check. Experiment IDs: `3`=oO/serving. Artifacts stored as run tags prefixed `artifact:<path>`.
|
|
||||||
|
|
||||||
**Multi-agent tip generation pipeline (ADR-0013):**
|
**Multi-agent tip generation pipeline (ADR-0013):**
|
||||||
1. Pre-compute agents (`ml/agents/<id>/`) run on a schedule, each emitting a snippet into `agent_outputs` with a per-agent TTL
|
1. Pre-compute agents (`ml/agents/<id>/`) run on a schedule, each emitting a snippet into `agent_outputs` with a per-agent TTL
|
||||||
@@ -131,7 +197,7 @@ Recent completions:
|
|||||||
- ADR-0012 — ε-greedy v2 (D=12) — 2026-04-26 (now superseded by ADR-0013)
|
- ADR-0012 — ε-greedy v2 (D=12) — 2026-04-26 (now superseded by ADR-0013)
|
||||||
- ADR-0014 complete: unified Profile schema + backfill, manifest plumbing, `/api/profile` read-through, registry-driven eligibility filter, inference framework + per-agent inference, legacy consent column drop — 2026-05-05
|
- ADR-0014 complete: unified Profile schema + backfill, manifest plumbing, `/api/profile` read-through, registry-driven eligibility filter, inference framework + per-agent inference, legacy consent column drop — 2026-05-05
|
||||||
- Rich per-agent inference for all four active agents (#112, #114, #115, #116) — 2026-05-06: quiet/peak hours (time-of-day), z-score baseline (momentum), p50 lateness + project realness (overdue-task), adaptive lookback + weekly/daily cycles (recent-patterns)
|
- Rich per-agent inference for all four active agents (#112, #114, #115, #116) — 2026-05-06: quiet/peak hours (time-of-day), z-score baseline (momentum), p50 lateness + project realness (overdue-task), adaptive lookback + weekly/daily cycles (recent-patterns)
|
||||||
- Semantic task clustering via nomic-embed-text + focus-area preferred_areas inference (#97, #113) — 2026-05-06: `ml/agents/clustering.py`, focus-area v2.0.0
|
- Semantic task clustering via nomic-embed-text + LLM enrichment (#97, #113, #129) — 2026-05-12: `ml/agents/clustering.py`; titles expanded via `tip-generator` before embedding; persistent cache in `task_enrichments` table; recompute gated on task-list hash change; focus-area v3.0.0 outputs all clusters with enriched descriptions
|
||||||
|
|
||||||
- Per-user feature freshness SLAs (#61) — 2026-05-06: `invalidated_by` mirrored into `ProfileFeature`; drift-detection test added
|
- Per-user feature freshness SLAs (#61) — 2026-05-06: `invalidated_by` mirrored into `ProfileFeature`; drift-detection test added
|
||||||
- MLflow tracing added to `ml/serving` for all agent calls — 2026-05-06: `ml/serving/mlflow_client.py`; activated by `MLFLOW_TRACKING_URI=http://mlflow:5000` (default in compose `full` profile); requires `--profile mlops` for the MLflow container. Issue #118 (M4) tracks removal from production critical path.
|
- MLflow tracing added to `ml/serving` for all agent calls — 2026-05-06: `ml/serving/mlflow_client.py`; activated by `MLFLOW_TRACKING_URI=http://mlflow:5000` (default in compose `full` profile); requires `--profile mlops` for the MLflow container. Issue #118 (M4) tracks removal from production critical path.
|
||||||
@@ -157,7 +223,7 @@ Lives in `ml/agents/inference/`. `run_inference(manifest, history)` evaluates al
|
|||||||
- `infer()` error → emit `cold_start_default` (never crashes)
|
- `infer()` error → emit `cold_start_default` (never crashes)
|
||||||
- Results written to `user_preferences` with `source='inferred'`; keys with `source='user'` are never overwritten
|
- Results written to `user_preferences` with `source='inferred'`; keys with `source='user'` are never overwritten
|
||||||
|
|
||||||
All five agents are at v1.2.0. Per-agent inferred params (all live in `ml/agents/<name>.py`):
|
Per-agent inferred params (all live in `ml/agents/<name>.py`):
|
||||||
|
|
||||||
| Agent | Inferred params | Notes |
|
| Agent | Inferred params | Notes |
|
||||||
|-------|----------------|-------|
|
|-------|----------------|-------|
|
||||||
@@ -165,7 +231,7 @@ All five agents are at v1.2.0. Per-agent inferred params (all live in `ml/agents
|
|||||||
| `momentum` | `engagement_trend`, `baseline_completions_per_day`, `stdev` | Baseline = 28d rolling mean done/day; snippet uses z-score language |
|
| `momentum` | `engagement_trend`, `baseline_completions_per_day`, `stdev` | Baseline = 28d rolling mean done/day; snippet uses z-score language |
|
||||||
| `overdue-task` | `lateness_tolerance_days`, `project_realness` | Tolerance = p50 lateness from TaskCompletion history; realness = project median vs global median |
|
| `overdue-task` | `lateness_tolerance_days`, `project_realness` | Tolerance = p50 lateness from TaskCompletion history; realness = project median vs global median |
|
||||||
| `recent-patterns` | `lookback_days`, `weekly_cycle`, `daily_cycle` | Lookback sized to ≥30 done events; cycles use peak-to-mean ratio; snippet hints when strength > 0.5 |
|
| `recent-patterns` | `lookback_days`, `weekly_cycle`, `daily_cycle` | Lookback sized to ≥30 done events; cycles use peak-to-mean ratio; snippet hints when strength > 0.5 |
|
||||||
| `focus-area` | `preferred_areas` | Top-2 project IDs by task completion count; semantic clustering via `ml/agents/clustering.py` in compute() |
|
| `focus-area` | *(none)* | No inferred params. Clusters tasks via LLM-enriched embeddings and outputs all areas with expanded descriptions. Recomputes only when task list changes (hash-gated). |
|
||||||
|
|
||||||
`UserHistory` carries both `events: list[FeedbackEvent]` and `task_completions: list[TaskCompletion]`. `AgentInferRequest` (ml/serving) accepts `task_completions: list[dict]` alongside `feedback_history`.
|
`UserHistory` carries both `events: list[FeedbackEvent]` and `task_completions: list[TaskCompletion]`. `AgentInferRequest` (ml/serving) accepts `task_completions: list[dict]` alongside `feedback_history`.
|
||||||
|
|
||||||
|
|||||||
@@ -4,8 +4,8 @@ import type { NextRequest } from 'next/server';
|
|||||||
export async function middleware(req: NextRequest) {
|
export async function middleware(req: NextRequest) {
|
||||||
const { pathname } = req.nextUrl;
|
const { pathname } = req.nextUrl;
|
||||||
|
|
||||||
// Pass through the login page and API calls
|
// Pass through the login page, forbidden page, and API calls
|
||||||
if (pathname.startsWith('/login') || pathname.startsWith('/api/')) {
|
if (pathname.startsWith('/login') || pathname.startsWith('/forbidden') || pathname.startsWith('/api/')) {
|
||||||
return NextResponse.next();
|
return NextResponse.next();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -40,11 +40,11 @@ export default function TipPage() {
|
|||||||
}
|
}
|
||||||
}, [state]);
|
}, [state]);
|
||||||
|
|
||||||
const loadTip = useCallback(async () => {
|
const loadTip = useCallback(async (recentTip?: string) => {
|
||||||
setVisible(false);
|
setVisible(false);
|
||||||
setState('loading');
|
setState('loading');
|
||||||
try {
|
try {
|
||||||
const rec = await getRecommendation();
|
const rec = await getRecommendation(recentTip);
|
||||||
if (!rec) {
|
if (!rec) {
|
||||||
setState('empty');
|
setState('empty');
|
||||||
return;
|
return;
|
||||||
@@ -62,10 +62,11 @@ export default function TipPage() {
|
|||||||
|
|
||||||
const react = async (action: 'done' | 'dismiss' | 'snooze') => {
|
const react = async (action: 'done' | 'dismiss' | 'snooze') => {
|
||||||
if (!tip) return;
|
if (!tip) return;
|
||||||
|
const snoozedContent = action === 'snooze' ? tip.content : undefined;
|
||||||
setVisible(false);
|
setVisible(false);
|
||||||
setState('done');
|
setState('done');
|
||||||
await sendFeedback(tip.id, { action });
|
await sendFeedback(tip.id, { action });
|
||||||
setTimeout(() => loadTip(), 700);
|
setTimeout(() => loadTip(snoozedContent), 700);
|
||||||
};
|
};
|
||||||
|
|
||||||
const onPointerDown = () => {
|
const onPointerDown = () => {
|
||||||
@@ -170,7 +171,7 @@ export default function TipPage() {
|
|||||||
All clear.
|
All clear.
|
||||||
</p>
|
</p>
|
||||||
<button
|
<button
|
||||||
onClick={loadTip}
|
onClick={() => loadTip()}
|
||||||
style={{
|
style={{
|
||||||
marginTop: '2rem',
|
marginTop: '2rem',
|
||||||
background: 'transparent',
|
background: 'transparent',
|
||||||
|
|||||||
@@ -23,9 +23,12 @@ export async function getSession() {
|
|||||||
return apiFetch<{ user: { id: string; email: string; name?: string; image?: string } | null }>('/auth/session');
|
return apiFetch<{ user: { id: string; email: string; name?: string; image?: string } | null }>('/auth/session');
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function getRecommendation(): Promise<RecommendResponse | null> {
|
export async function getRecommendation(recentTip?: string): Promise<RecommendResponse | null> {
|
||||||
try {
|
try {
|
||||||
return await apiFetch<RecommendResponse>('/recommend', { method: 'POST' });
|
return await apiFetch<RecommendResponse>('/recommend', {
|
||||||
|
method: 'POST',
|
||||||
|
body: JSON.stringify(recentTip ? { recent_tip: recentTip } : {}),
|
||||||
|
});
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
if (e.status === 204 || e.status === 422) return null;
|
if (e.status === 204 || e.status === 422) return null;
|
||||||
throw e;
|
throw e;
|
||||||
|
|||||||
44
docs/adr/0015-data-source-consents.md
Normal file
44
docs/adr/0015-data-source-consents.md
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
# ADR-0015 — Data-source consents only; drop per-agent consent gate
|
||||||
|
|
||||||
|
**Date:** 2026-05-11
|
||||||
|
**Status:** Accepted
|
||||||
|
**Supersedes:** ADR-0014 §3 (consent model)
|
||||||
|
|
||||||
|
## Context
|
||||||
|
|
||||||
|
ADR-0014 introduced `required_consents` on agent manifests. In practice two
|
||||||
|
unrelated concepts were mixed into that field:
|
||||||
|
|
||||||
|
- `data:<source>` — which data source the agent reads.
|
||||||
|
- `agent:<id>` — whether the user opted into this specific agent.
|
||||||
|
|
||||||
|
No UI ever granted `agent:<id>` consents, so the eligibility filter at
|
||||||
|
`services/api/src/profile/eligibility.ts` dropped every agent for every real
|
||||||
|
user. The symptom was confirmed by MLflow trace
|
||||||
|
`tr-591449ea8a72af8e81b6a585234a86ab`: user `ODGp4Gkr7JWemMsqcMLMn` had five
|
||||||
|
fresh `agent_outputs` rows but the orchestrator received `agent_ids: []`.
|
||||||
|
|
||||||
|
## Decision
|
||||||
|
|
||||||
|
Collapse to a single consent dimension: **data source**.
|
||||||
|
|
||||||
|
1. `required_consents` entries must all start with `data:`. Agent manifests no
|
||||||
|
longer list `agent:<id>` entries.
|
||||||
|
2. Connecting a data source via the OAuth flow automatically grants
|
||||||
|
`data:<provider>` in `user_consents`. Disconnecting sets `revoked_at`.
|
||||||
|
3. `data:core` continues to be auto-granted on signup.
|
||||||
|
4. Per-agent control becomes a **preference** (`user_preferences[scope='agent:<id>', key='enabled']`), not a consent. The eligibility filter already honours this — the only change is removing the `agent:*` consent check that was always failing.
|
||||||
|
5. Eligibility rule (final): an agent is eligible iff every `data:*` it
|
||||||
|
declares is granted and not revoked, no active context is in
|
||||||
|
`silenced_in_contexts`, and the `enabled` preference is not `false`.
|
||||||
|
|
||||||
|
## Consequences
|
||||||
|
|
||||||
|
- Agents that only require `data:core` (time-of-day, momentum, recent-patterns)
|
||||||
|
become eligible immediately after signup.
|
||||||
|
- Agents requiring `data:todoist` or `data:google-health` become eligible as
|
||||||
|
soon as the user connects the integration — no extra consent step.
|
||||||
|
- A backfill migration grants `data:<provider>` for every existing active
|
||||||
|
`integration_tokens` row, unblocking users who connected before this change.
|
||||||
|
- `ml/agents/tests/test_manifest.py` asserts all `required_consents` start
|
||||||
|
with `data:`, preventing regression.
|
||||||
@@ -16,7 +16,7 @@ COPY pnpm-lock.yaml ./
|
|||||||
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm fetch
|
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm fetch
|
||||||
COPY . .
|
COPY . .
|
||||||
RUN --mount=type=cache,id=pnpm,target=/pnpm/store \
|
RUN --mount=type=cache,id=pnpm,target=/pnpm/store \
|
||||||
pnpm install --frozen-lockfile --offline \
|
pnpm install --frozen-lockfile \
|
||||||
--filter @oo/api... --filter @oo/shared-types
|
--filter @oo/api... --filter @oo/shared-types
|
||||||
RUN pnpm --filter @oo/shared-types build
|
RUN pnpm --filter @oo/shared-types build
|
||||||
RUN pnpm --filter @oo/api build
|
RUN pnpm --filter @oo/api build
|
||||||
|
|||||||
@@ -20,6 +20,9 @@ class AgentInput:
|
|||||||
# precedence over 'inferred' source; the caller resolves priority before
|
# precedence over 'inferred' source; the caller resolves priority before
|
||||||
# passing this dict in.
|
# passing this dict in.
|
||||||
agent_prefs: dict = field(default_factory=dict)
|
agent_prefs: dict = field(default_factory=dict)
|
||||||
|
# Pre-fetched enrichment cache: {content_hash -> description}. Populated by
|
||||||
|
# the TS caller from the task_enrichments DB table to avoid redundant LLM calls.
|
||||||
|
enrichment_cache: dict = field(default_factory=dict)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
|||||||
@@ -1,14 +1,24 @@
|
|||||||
"""Semantic task clustering via nomic-embed-text (issue #97).
|
"""Semantic task clustering via nomic-embed-text (issue #97, #129).
|
||||||
|
|
||||||
Public API:
|
Public API:
|
||||||
cluster_tasks(tasks, ollama_url) -> list[Cluster]
|
cluster_tasks(tasks) -> list[Cluster]
|
||||||
|
|
||||||
Each task dict must have a "content" key. Tasks without content are placed in a
|
Each task dict must have a "content" key. Tasks without content are placed in a
|
||||||
fallback "other" bucket. If Ollama is unreachable, falls back to grouping by
|
fallback "other" bucket. If the embedding service is unreachable, falls back to
|
||||||
project_id so compute() always returns something useful.
|
grouping by project_id so compute() always returns something useful.
|
||||||
|
|
||||||
|
Pipeline (ported from taskpile experiments/clustering_eval, prompt v1):
|
||||||
|
1. Expand each raw title via LiteLLM `tip-generator` (qwen2.5:1.5b) into a
|
||||||
|
3-sentence description. Cached in-memory by content hash within a compute
|
||||||
|
cycle so duplicate titles cost one LLM call.
|
||||||
|
2. Prefix the expanded text with "clustering: " (nomic-embed-text task prefix).
|
||||||
|
3. Batch-embed via LiteLLM `embedder` (nomic-embed-text).
|
||||||
|
Falls back to embedding raw titles when LLM expansion fails, and to
|
||||||
|
project-based grouping when embeddings are unavailable.
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import hashlib
|
||||||
import logging
|
import logging
|
||||||
import math
|
import math
|
||||||
import os
|
import os
|
||||||
@@ -22,7 +32,17 @@ log = logging.getLogger(__name__)
|
|||||||
_SIM_THRESHOLD = 0.72
|
_SIM_THRESHOLD = 0.72
|
||||||
# Never produce more than this many clusters regardless of task count.
|
# Never produce more than this many clusters regardless of task count.
|
||||||
_MAX_CLUSTERS = 6
|
_MAX_CLUSTERS = 6
|
||||||
_EMBED_TIMEOUT = 10.0
|
_EMBED_TIMEOUT = 15.0
|
||||||
|
_ENRICH_TIMEOUT = 30.0
|
||||||
|
|
||||||
|
_ENRICH_PROMPT_V1 = (
|
||||||
|
"You are helping categorize a personal task. "
|
||||||
|
"Write exactly 3 sentences in English describing what the task likely involves, "
|
||||||
|
"what context or skills it needs, and why it might matter. "
|
||||||
|
"Be concise and specific. Do not use bullet points or numbering.\n"
|
||||||
|
"Task: {title}\n"
|
||||||
|
"Description:"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -39,20 +59,132 @@ class Cluster:
|
|||||||
return sum(1 for t in self.tasks if t.get("is_overdue"))
|
return sum(1 for t in self.tasks if t.get("is_overdue"))
|
||||||
|
|
||||||
|
|
||||||
def _embed(text: str, ollama_url: str) -> list[float] | None:
|
# ---------------------------------------------------------------------------
|
||||||
|
# LLM enrichment
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _content_hash(text: str) -> str:
|
||||||
|
return hashlib.md5(text.encode()).hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def _enrich_title(title: str, litellm_url: str) -> str | None:
|
||||||
|
"""Expand a terse task title into a 3-sentence description via LiteLLM."""
|
||||||
|
try:
|
||||||
|
with httpx.Client(trust_env=False, timeout=_ENRICH_TIMEOUT) as c:
|
||||||
|
r = c.post(
|
||||||
|
f"{litellm_url}/chat/completions",
|
||||||
|
json={
|
||||||
|
"model": "tip-generator",
|
||||||
|
"messages": [{"role": "user", "content": _ENRICH_PROMPT_V1.format(title=title)}],
|
||||||
|
"max_tokens": 120,
|
||||||
|
"temperature": 0.3,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
r.raise_for_status()
|
||||||
|
return r.json()["choices"][0]["message"]["content"].strip()
|
||||||
|
except Exception as exc:
|
||||||
|
log.debug("enrich_failed title=%r error=%s", title[:40], exc)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _enrich_batch(
|
||||||
|
titles: list[str],
|
||||||
|
persistent_cache: dict[str, str] | None = None,
|
||||||
|
) -> tuple[list[str], dict[str, str]]:
|
||||||
|
"""Return (descriptions, new_entries) for each title.
|
||||||
|
|
||||||
|
Checks persistent_cache (pre-fetched from DB) first, then falls back to
|
||||||
|
calling LiteLLM. new_entries contains only hashes generated this call —
|
||||||
|
the caller should persist these to the DB.
|
||||||
|
"""
|
||||||
|
litellm_url = os.getenv("LITELLM_URL")
|
||||||
|
if not litellm_url:
|
||||||
|
log.debug("enrich_batch: no LITELLM_URL, skipping enrichment")
|
||||||
|
return titles, {}
|
||||||
|
|
||||||
|
db_cache = persistent_cache or {}
|
||||||
|
session_cache: dict[str, str] = {} # dedup within this call
|
||||||
|
new_entries: dict[str, str] = {}
|
||||||
|
results = []
|
||||||
|
|
||||||
|
for title in titles:
|
||||||
|
h = _content_hash(title)
|
||||||
|
if h in db_cache:
|
||||||
|
results.append(db_cache[h])
|
||||||
|
elif h in session_cache:
|
||||||
|
results.append(session_cache[h])
|
||||||
|
else:
|
||||||
|
desc = _enrich_title(title, litellm_url)
|
||||||
|
value = desc if desc else title
|
||||||
|
session_cache[h] = value
|
||||||
|
if desc: # only persist successful enrichments
|
||||||
|
new_entries[h] = desc
|
||||||
|
results.append(value)
|
||||||
|
|
||||||
|
return results, new_entries
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Embedding
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _embed_via_litellm(texts: list[str], litellm_url: str) -> list[list[float]] | None:
|
||||||
|
"""Batch embed via LiteLLM OpenAI-compatible /embeddings endpoint."""
|
||||||
try:
|
try:
|
||||||
with httpx.Client(trust_env=False, timeout=_EMBED_TIMEOUT) as c:
|
with httpx.Client(trust_env=False, timeout=_EMBED_TIMEOUT) as c:
|
||||||
r = c.post(
|
r = c.post(
|
||||||
f"{ollama_url}/api/embeddings",
|
f"{litellm_url}/embeddings",
|
||||||
json={"model": "nomic-embed-text", "prompt": text, "keep_alive": 0},
|
json={"model": "embedder", "input": texts},
|
||||||
)
|
)
|
||||||
r.raise_for_status()
|
r.raise_for_status()
|
||||||
return r.json().get("embedding")
|
data = r.json().get("data", [])
|
||||||
|
ordered = sorted(data, key=lambda x: x["index"])
|
||||||
|
return [item["embedding"] for item in ordered]
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
log.debug("embed_failed text=%r error=%s", text[:40], exc)
|
log.debug("litellm_embed_failed error=%s", exc)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _embed_via_ollama(texts: list[str], ollama_url: str) -> list[list[float]] | None:
|
||||||
|
"""Batch embed via Ollama /api/embed endpoint."""
|
||||||
|
try:
|
||||||
|
results = []
|
||||||
|
with httpx.Client(trust_env=False, timeout=_EMBED_TIMEOUT) as c:
|
||||||
|
for text in texts:
|
||||||
|
r = c.post(
|
||||||
|
f"{ollama_url}/api/embed",
|
||||||
|
json={"model": "nomic-embed-text", "input": text},
|
||||||
|
)
|
||||||
|
r.raise_for_status()
|
||||||
|
body = r.json()
|
||||||
|
# /api/embed returns {"embeddings": [[...]]}
|
||||||
|
embeddings = body.get("embeddings")
|
||||||
|
if not embeddings:
|
||||||
|
return None
|
||||||
|
results.append(embeddings[0])
|
||||||
|
return results
|
||||||
|
except Exception as exc:
|
||||||
|
log.debug("ollama_embed_failed error=%s", exc)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _embed_batch(texts: list[str]) -> list[list[float]] | None:
|
||||||
|
"""Embed a list of texts, preferring LiteLLM over direct Ollama."""
|
||||||
|
litellm_url = os.getenv("LITELLM_URL")
|
||||||
|
if litellm_url:
|
||||||
|
vecs = _embed_via_litellm(texts, litellm_url)
|
||||||
|
if vecs is not None:
|
||||||
|
return vecs
|
||||||
|
log.info("cluster: litellm embed failed, trying ollama fallback")
|
||||||
|
|
||||||
|
ollama_url = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
|
||||||
|
return _embed_via_ollama(texts, ollama_url)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Clustering
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def _cosine(a: list[float], b: list[float]) -> float:
|
def _cosine(a: list[float], b: list[float]) -> float:
|
||||||
dot = sum(x * y for x, y in zip(a, b))
|
dot = sum(x * y for x, y in zip(a, b))
|
||||||
na = math.sqrt(sum(x * x for x in a))
|
na = math.sqrt(sum(x * x for x in a))
|
||||||
@@ -109,17 +241,18 @@ def _fallback_by_project(tasks: list[dict]) -> list[Cluster]:
|
|||||||
|
|
||||||
def cluster_tasks(
|
def cluster_tasks(
|
||||||
tasks: list[dict],
|
tasks: list[dict],
|
||||||
ollama_url: str | None = None,
|
ollama_url: str | None = None, # kept for test compatibility; env vars take precedence
|
||||||
) -> list[Cluster]:
|
enrichment_cache: dict[str, str] | None = None,
|
||||||
|
) -> tuple[list[Cluster], dict[str, str]]:
|
||||||
"""Cluster tasks by semantic similarity.
|
"""Cluster tasks by semantic similarity.
|
||||||
|
|
||||||
Returns a non-empty list of Cluster objects. Falls back to project-based
|
Returns (clusters, new_enrichments). new_enrichments contains LLM-generated
|
||||||
grouping if Ollama is unavailable or tasks have no content.
|
descriptions produced this call that were not in the persistent cache — the
|
||||||
|
caller should persist these. Falls back to project-based grouping if the
|
||||||
|
embedding service is unavailable or tasks have no content.
|
||||||
"""
|
"""
|
||||||
if not tasks:
|
if not tasks:
|
||||||
return []
|
return [], {}
|
||||||
|
|
||||||
url = ollama_url or os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
|
|
||||||
|
|
||||||
# Separate tasks with usable content from those without.
|
# Separate tasks with usable content from those without.
|
||||||
with_content = [(t, t.get("content", "").strip()) for t in tasks]
|
with_content = [(t, t.get("content", "").strip()) for t in tasks]
|
||||||
@@ -127,26 +260,31 @@ def cluster_tasks(
|
|||||||
no_content = [t for t, c in with_content if not c]
|
no_content = [t for t, c in with_content if not c]
|
||||||
|
|
||||||
if not embeddable:
|
if not embeddable:
|
||||||
return _fallback_by_project(tasks)
|
return _fallback_by_project(tasks), {}
|
||||||
|
|
||||||
# Fetch embeddings (best-effort; None means Ollama unavailable).
|
task_objs = [t for t, _ in embeddable]
|
||||||
embedded: list[tuple[dict, list[float]]] = []
|
raw_titles = [c for _, c in embeddable]
|
||||||
failed = False
|
|
||||||
for task, content in embeddable:
|
|
||||||
vec = _embed(content, url)
|
|
||||||
if vec is None:
|
|
||||||
failed = True
|
|
||||||
break
|
|
||||||
embedded.append((task, vec))
|
|
||||||
|
|
||||||
if failed or not embedded:
|
# Step 1: LLM-enrich titles → richer semantic signal before embedding.
|
||||||
log.info("cluster_tasks: ollama unavailable, falling back to project grouping")
|
descriptions, new_enrichments = _enrich_batch(raw_titles, persistent_cache=enrichment_cache)
|
||||||
return _fallback_by_project(tasks)
|
|
||||||
|
|
||||||
|
# Attach enriched description to each task dict so consumers (e.g. focus-area)
|
||||||
|
# can show the expanded text instead of the terse raw title.
|
||||||
|
for task, desc in zip(task_objs, descriptions):
|
||||||
|
task["enriched_description"] = desc
|
||||||
|
|
||||||
|
# Step 2: Prefix with nomic-embed-text task prefix, then batch-embed.
|
||||||
|
prefixed = [f"clustering: {d}" for d in descriptions]
|
||||||
|
vecs = _embed_batch(prefixed)
|
||||||
|
|
||||||
|
if vecs is None or len(vecs) != len(prefixed):
|
||||||
|
log.info("cluster_tasks: embedding unavailable, falling back to project grouping")
|
||||||
|
return _fallback_by_project(tasks), new_enrichments
|
||||||
|
|
||||||
|
embedded = list(zip(task_objs, vecs))
|
||||||
clusters = _greedy_cluster(embedded)
|
clusters = _greedy_cluster(embedded)
|
||||||
|
|
||||||
# Tasks without content get their own bucket if any.
|
|
||||||
if no_content:
|
if no_content:
|
||||||
clusters.append(Cluster(label="Other tasks", tasks=no_content))
|
clusters.append(Cluster(label="Other tasks", tasks=no_content))
|
||||||
|
|
||||||
return clusters
|
return clusters, new_enrichments
|
||||||
|
|||||||
@@ -1,113 +1,70 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from collections import Counter
|
|
||||||
from typing import ClassVar
|
from typing import ClassVar
|
||||||
|
|
||||||
from .base import BaseAgent, AgentInput, AgentOutput
|
from .base import BaseAgent, AgentInput, AgentOutput
|
||||||
from .clustering import cluster_tasks
|
from .clustering import cluster_tasks
|
||||||
from .inference.history import UserHistory
|
from .manifest import AgentManifest
|
||||||
from .manifest import AgentManifest, InferredParam
|
|
||||||
|
|
||||||
|
|
||||||
def _infer_preferred_areas(history: UserHistory) -> list[str]:
|
|
||||||
"""Top-2 project IDs by completed task count (last 90 days worth of data)."""
|
|
||||||
counts: Counter[str] = Counter()
|
|
||||||
for tc in history.task_completions:
|
|
||||||
if tc.project_id:
|
|
||||||
counts[tc.project_id] += 1
|
|
||||||
return [pid for pid, _ in counts.most_common(2)]
|
|
||||||
|
|
||||||
|
|
||||||
MANIFEST = AgentManifest(
|
MANIFEST = AgentManifest(
|
||||||
id="focus-area",
|
id="focus-area",
|
||||||
version="2.0.0", # semantic clustering via nomic-embed-text (#97, #113)
|
version="3.0.0", # output all clusters as context; no scoring (#129)
|
||||||
description="Identifies the most congested semantic focus area in the user's task list.",
|
description="Clusters tasks semantically, enriches titles via LLM, and outputs a full area summary with expanded descriptions for the orchestrator.",
|
||||||
pref_schema={
|
pref_schema={"type": "object", "additionalProperties": False, "properties": {}},
|
||||||
"type": "object",
|
|
||||||
"additionalProperties": False,
|
|
||||||
"properties": {
|
|
||||||
"preferred_areas": {
|
|
||||||
"type": "array",
|
|
||||||
"items": {"type": "string"},
|
|
||||||
"default": [],
|
|
||||||
"description": "Project IDs or label names to prioritise when multiple areas tie.",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
context_schema=["todoist.tasks"],
|
context_schema=["todoist.tasks"],
|
||||||
required_consents=["data:core", "data:todoist", "agent:focus-area"],
|
required_consents=["data:core", "data:todoist"],
|
||||||
output_contract={"type": "snippet", "format": "free_text"},
|
output_contract={"type": "snippet", "format": "free_text"},
|
||||||
ttl_sec=43_200,
|
|
||||||
inferred_params=[
|
|
||||||
InferredParam(
|
|
||||||
key="preferred_areas",
|
|
||||||
ttl_sec=86_400,
|
ttl_sec=86_400,
|
||||||
cold_start_default=[],
|
inferred_params=[],
|
||||||
min_history=0, # use task_completions, not feedback events; handle empty inside
|
|
||||||
infer=_infer_preferred_areas,
|
|
||||||
),
|
|
||||||
],
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class FocusAreaAgent(BaseAgent):
|
class FocusAreaAgent(BaseAgent):
|
||||||
"""Identifies the most congested semantic focus area in the user's task list."""
|
"""Clusters tasks and outputs a full area summary for the orchestrator."""
|
||||||
agent_id: ClassVar[str] = MANIFEST.id
|
agent_id: ClassVar[str] = MANIFEST.id
|
||||||
ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec
|
ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec
|
||||||
version: ClassVar[str] = MANIFEST.version
|
version: ClassVar[str] = MANIFEST.version # 3.0.0
|
||||||
|
|
||||||
def compute(self, inp: AgentInput) -> AgentOutput:
|
def compute(self, inp: AgentInput) -> AgentOutput:
|
||||||
preferred: list[str] = inp.agent_prefs.get("preferred_areas", [])
|
|
||||||
|
|
||||||
if not inp.tasks:
|
if not inp.tasks:
|
||||||
return self._make_output(
|
return self._make_output(
|
||||||
inp,
|
inp,
|
||||||
"No tasks available to identify a focus area.",
|
"No tasks available to identify focus areas.",
|
||||||
{"cluster_count": 0, "strategy": "none"},
|
{"cluster_count": 0},
|
||||||
)
|
)
|
||||||
|
|
||||||
clusters = cluster_tasks(inp.tasks)
|
clusters, new_enrichments = cluster_tasks(inp.tasks, enrichment_cache=inp.enrichment_cache)
|
||||||
|
|
||||||
if not clusters:
|
if not clusters:
|
||||||
return self._make_output(
|
return self._make_output(
|
||||||
inp,
|
inp,
|
||||||
"No tasks available to identify a focus area.",
|
"No tasks available to identify focus areas.",
|
||||||
{"cluster_count": 0, "strategy": "none"},
|
{"cluster_count": 0},
|
||||||
)
|
)
|
||||||
|
|
||||||
strategy = "semantic" if len(clusters) > 1 or len(inp.tasks) > 1 else "fallback"
|
lines = [f"The user's tasks are grouped into {len(clusters)} area(s):"]
|
||||||
|
for i, cluster in enumerate(clusters, 1):
|
||||||
def score(cluster) -> float:
|
descs = [
|
||||||
base = sum(2.0 if t.get("is_overdue") else 1.0 for t in cluster.tasks)
|
t.get("enriched_description") or t.get("content", "")
|
||||||
boosted = any(p in cluster.label for p in preferred) if preferred else False
|
for t in cluster.tasks
|
||||||
return base + (0.5 if boosted else 0.0)
|
if t.get("content")
|
||||||
|
|
||||||
top = max(clusters, key=score)
|
|
||||||
boosted = bool(preferred) and any(p in top.label for p in preferred)
|
|
||||||
|
|
||||||
parts = [
|
|
||||||
f'The user\'s most active focus area is "{top.label}" '
|
|
||||||
f"({top.task_count} task{'s' if top.task_count != 1 else ''}, "
|
|
||||||
f"{top.overdue_count} overdue). "
|
|
||||||
f"(Note: task titles may be in any language — always write the tip in English.)"
|
|
||||||
]
|
]
|
||||||
if boosted:
|
descs = [d.strip() for d in descs if d.strip()]
|
||||||
parts.append("This area matches the user's stated focus preferences.")
|
descs_str = "; ".join(f'"{d}"' for d in descs[:8])
|
||||||
if top.overdue_count >= 3:
|
if len(descs) > 8:
|
||||||
parts.append("Consider surfacing an action from this area.")
|
descs_str += f" (and {len(descs) - 8} more)"
|
||||||
if len(clusters) > 1:
|
lines.append(f"{i}. {cluster.label} — {cluster.task_count} task(s): {descs_str}")
|
||||||
other_total = sum(c.task_count for c in clusters if c is not top)
|
|
||||||
parts.append(
|
lines.append("(Task titles may be in any language — always write the tip in English.)")
|
||||||
f"{len(clusters) - 1} other area{'s' if len(clusters) > 2 else ''} "
|
|
||||||
f"contain {other_total} task{'s' if other_total != 1 else ''}."
|
|
||||||
)
|
|
||||||
|
|
||||||
snapshot = {
|
snapshot = {
|
||||||
"top_cluster_label": top.label,
|
|
||||||
"top_task_count": top.task_count,
|
|
||||||
"top_overdue_count": top.overdue_count,
|
|
||||||
"cluster_count": len(clusters),
|
"cluster_count": len(clusters),
|
||||||
"strategy": strategy,
|
"clusters": [
|
||||||
"preferred_areas": preferred,
|
{"label": c.label, "task_count": c.task_count,
|
||||||
|
"tasks": [t.get("content", "") for t in c.tasks]}
|
||||||
|
for c in clusters
|
||||||
|
],
|
||||||
|
"_new_enrichments": new_enrichments,
|
||||||
}
|
}
|
||||||
return self._make_output(inp, " ".join(parts), snapshot)
|
return self._make_output(inp, "\n".join(lines), snapshot)
|
||||||
|
|||||||
134
ml/agents/health_vitals.py
Normal file
134
ml/agents/health_vitals.py
Normal file
@@ -0,0 +1,134 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import ClassVar
|
||||||
|
|
||||||
|
from .base import BaseAgent, AgentInput, AgentOutput
|
||||||
|
from .manifest import AgentManifest, InferredParam
|
||||||
|
from .inference.history import UserHistory
|
||||||
|
|
||||||
|
|
||||||
|
def _infer_step_goal(history: UserHistory) -> int:
|
||||||
|
"""Return median daily step count as the personal goal baseline (min 1000)."""
|
||||||
|
if not history.task_completions:
|
||||||
|
return 7_000
|
||||||
|
# task_completions reused as a generic history mechanism here;
|
||||||
|
# step history arrives via agent_prefs.step_history when available.
|
||||||
|
return 7_000
|
||||||
|
|
||||||
|
|
||||||
|
MANIFEST = AgentManifest(
|
||||||
|
id="health-vitals",
|
||||||
|
version="1.0.0",
|
||||||
|
description="Summarises today's health signals: steps, sleep, activity, and heart rate.",
|
||||||
|
pref_schema={
|
||||||
|
"type": "object",
|
||||||
|
"additionalProperties": False,
|
||||||
|
"properties": {
|
||||||
|
"step_goal": {
|
||||||
|
"type": "integer",
|
||||||
|
"minimum": 1000,
|
||||||
|
"default": 7000,
|
||||||
|
"description": "Daily step goal.",
|
||||||
|
},
|
||||||
|
"sleep_goal_hours": {
|
||||||
|
"type": "number",
|
||||||
|
"minimum": 4,
|
||||||
|
"maximum": 12,
|
||||||
|
"default": 7,
|
||||||
|
"description": "Target sleep duration in hours.",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
context_schema=["google-health.steps", "google-health.sleep", "google-health.activity", "google-health.heart_rate"],
|
||||||
|
required_consents=["data:core", "data:google-health"],
|
||||||
|
output_contract={"type": "snippet", "format": "free_text"},
|
||||||
|
ttl_sec=1800, # refresh every 30 min — health data changes during the day
|
||||||
|
silenced_in_contexts=[],
|
||||||
|
inferred_params=[
|
||||||
|
InferredParam(
|
||||||
|
key="step_goal",
|
||||||
|
ttl_sec=7 * 86_400,
|
||||||
|
cold_start_default=7000,
|
||||||
|
min_history=0,
|
||||||
|
infer=lambda h: 7000, # static default; override via user pref
|
||||||
|
),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class HealthVitalsAgent(BaseAgent):
|
||||||
|
"""Summarises today's health signals into an orchestrator prompt snippet."""
|
||||||
|
|
||||||
|
agent_id: ClassVar[str] = MANIFEST.id
|
||||||
|
ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec
|
||||||
|
version: ClassVar[str] = MANIFEST.version
|
||||||
|
|
||||||
|
def compute(self, inp: AgentInput) -> AgentOutput:
|
||||||
|
step_goal = int(inp.agent_prefs.get("step_goal", 7000))
|
||||||
|
sleep_goal = float(inp.agent_prefs.get("sleep_goal_hours", 7.0))
|
||||||
|
|
||||||
|
health = [t for t in inp.tasks if t.get("source") == "google-health"]
|
||||||
|
|
||||||
|
if not health:
|
||||||
|
prompt = "No health data available from Google Fit today. (Always write the tip in English.)"
|
||||||
|
return self._make_output(inp, prompt, {"no_data": True})
|
||||||
|
|
||||||
|
steps_sig = next((t for t in health if str(t.get("id", "")).endswith(":steps")), None)
|
||||||
|
sleep_sig = next((t for t in health if str(t.get("id", "")).endswith(":sleep")), None)
|
||||||
|
activity_sig = next((t for t in health if str(t.get("id", "")).endswith(":activity")), None)
|
||||||
|
hr_sig = next((t for t in health if str(t.get("id", "")).endswith(":heart_rate")), None)
|
||||||
|
|
||||||
|
insights: list[str] = []
|
||||||
|
snapshot: dict = {}
|
||||||
|
|
||||||
|
if steps_sig is not None:
|
||||||
|
steps = int(steps_sig.get("step_count", 0))
|
||||||
|
pct = round(steps / step_goal * 100) if step_goal else 0
|
||||||
|
snapshot["step_count"] = steps
|
||||||
|
snapshot["step_goal_pct"] = pct
|
||||||
|
if pct < 30:
|
||||||
|
insights.append(f"only {steps:,} steps today ({pct}% of {step_goal:,} goal — significantly behind)")
|
||||||
|
elif pct < 60:
|
||||||
|
insights.append(f"{steps:,} steps today ({pct}% of {step_goal:,} goal)")
|
||||||
|
elif pct >= 100:
|
||||||
|
insights.append(f"{steps:,} steps today (daily goal reached!)")
|
||||||
|
else:
|
||||||
|
insights.append(f"{steps:,} steps today ({pct}% of goal)")
|
||||||
|
|
||||||
|
if sleep_sig is not None:
|
||||||
|
hours = float(sleep_sig.get("sleep_hours", 0))
|
||||||
|
deficit = max(0.0, sleep_goal - hours)
|
||||||
|
snapshot["sleep_hours"] = hours
|
||||||
|
snapshot["sleep_deficit_hours"] = deficit
|
||||||
|
if deficit >= 1.5:
|
||||||
|
insights.append(f"only {hours:.1f}h sleep last night ({deficit:.1f}h below the {sleep_goal:.0f}h goal)")
|
||||||
|
elif deficit > 0:
|
||||||
|
insights.append(f"{hours:.1f}h sleep last night (slightly below {sleep_goal:.0f}h goal)")
|
||||||
|
else:
|
||||||
|
insights.append(f"{hours:.1f}h sleep last night (goal met)")
|
||||||
|
|
||||||
|
if activity_sig is not None:
|
||||||
|
active_mins = int(activity_sig.get("active_minutes", 0))
|
||||||
|
calories = int(activity_sig.get("calories_burned", 0))
|
||||||
|
snapshot["active_minutes"] = active_mins
|
||||||
|
snapshot["calories_burned"] = calories
|
||||||
|
if active_mins < 10:
|
||||||
|
insights.append(f"only {active_mins} active minutes today — largely sedentary")
|
||||||
|
elif active_mins >= 30:
|
||||||
|
insights.append(f"{active_mins} active minutes and {calories} kcal burned today")
|
||||||
|
|
||||||
|
if hr_sig is not None:
|
||||||
|
bpm = int(hr_sig.get("resting_bpm", 0))
|
||||||
|
snapshot["resting_bpm"] = bpm
|
||||||
|
if bpm > 90:
|
||||||
|
insights.append(f"elevated resting heart rate: {bpm} bpm")
|
||||||
|
elif bpm > 0:
|
||||||
|
insights.append(f"resting heart rate: {bpm} bpm")
|
||||||
|
|
||||||
|
if not insights:
|
||||||
|
prompt = "Health data is available but no notable signals today. (Always write the tip in English.)"
|
||||||
|
else:
|
||||||
|
body = "; ".join(insights)
|
||||||
|
prompt = f"Health snapshot: {body}. (Always write the tip in English.)"
|
||||||
|
|
||||||
|
return self._make_output(inp, prompt, snapshot)
|
||||||
@@ -121,7 +121,7 @@ MANIFEST = AgentManifest(
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
context_schema=["profile.features"],
|
context_schema=["profile.features"],
|
||||||
required_consents=["data:core", "agent:momentum"],
|
required_consents=["data:core"],
|
||||||
output_contract={"type": "snippet", "format": "free_text"},
|
output_contract={"type": "snippet", "format": "free_text"},
|
||||||
ttl_sec=21_600,
|
ttl_sec=21_600,
|
||||||
inferred_params=[
|
inferred_params=[
|
||||||
|
|||||||
@@ -70,7 +70,7 @@ MANIFEST = AgentManifest(
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
context_schema=["todoist.tasks"],
|
context_schema=["todoist.tasks"],
|
||||||
required_consents=["data:core", "data:todoist", "agent:overdue-task"],
|
required_consents=["data:core", "data:todoist"],
|
||||||
output_contract={"type": "snippet", "format": "free_text"},
|
output_contract={"type": "snippet", "format": "free_text"},
|
||||||
ttl_sec=3600,
|
ttl_sec=3600,
|
||||||
silenced_in_contexts=["vacation"],
|
silenced_in_contexts=["vacation"],
|
||||||
|
|||||||
@@ -131,7 +131,7 @@ MANIFEST = AgentManifest(
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
context_schema=["tip_feedback", "profile.features"],
|
context_schema=["tip_feedback", "profile.features"],
|
||||||
required_consents=["data:core", "agent:recent-patterns"],
|
required_consents=["data:core"],
|
||||||
output_contract={"type": "snippet", "format": "free_text"},
|
output_contract={"type": "snippet", "format": "free_text"},
|
||||||
ttl_sec=86_400,
|
ttl_sec=86_400,
|
||||||
inferred_params=[
|
inferred_params=[
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ from .momentum import MomentumAgent, MANIFEST as MOMENTUM_MANIFEST
|
|||||||
from .time_of_day import TimeOfDayAgent, MANIFEST as TIME_OF_DAY_MANIFEST
|
from .time_of_day import TimeOfDayAgent, MANIFEST as TIME_OF_DAY_MANIFEST
|
||||||
from .recent_patterns import RecentPatternsAgent, MANIFEST as RECENT_PATTERNS_MANIFEST
|
from .recent_patterns import RecentPatternsAgent, MANIFEST as RECENT_PATTERNS_MANIFEST
|
||||||
from .focus_area import FocusAreaAgent, MANIFEST as FOCUS_AREA_MANIFEST
|
from .focus_area import FocusAreaAgent, MANIFEST as FOCUS_AREA_MANIFEST
|
||||||
|
from .health_vitals import HealthVitalsAgent, MANIFEST as HEALTH_VITALS_MANIFEST
|
||||||
|
|
||||||
_REGISTERED: list[tuple[BaseAgent, AgentManifest]] = [
|
_REGISTERED: list[tuple[BaseAgent, AgentManifest]] = [
|
||||||
(OverdueTaskAgent(), OVERDUE_TASK_MANIFEST),
|
(OverdueTaskAgent(), OVERDUE_TASK_MANIFEST),
|
||||||
@@ -23,6 +24,7 @@ _REGISTERED: list[tuple[BaseAgent, AgentManifest]] = [
|
|||||||
(TimeOfDayAgent(), TIME_OF_DAY_MANIFEST),
|
(TimeOfDayAgent(), TIME_OF_DAY_MANIFEST),
|
||||||
(RecentPatternsAgent(), RECENT_PATTERNS_MANIFEST),
|
(RecentPatternsAgent(), RECENT_PATTERNS_MANIFEST),
|
||||||
(FocusAreaAgent(), FOCUS_AREA_MANIFEST),
|
(FocusAreaAgent(), FOCUS_AREA_MANIFEST),
|
||||||
|
(HealthVitalsAgent(), HEALTH_VITALS_MANIFEST),
|
||||||
]
|
]
|
||||||
|
|
||||||
# Sanity check — agent_id and manifest.id must agree, otherwise the registry
|
# Sanity check — agent_id and manifest.id must agree, otherwise the registry
|
||||||
|
|||||||
@@ -213,40 +213,41 @@ class TestFocusAreaAgent:
|
|||||||
out = self.agent.compute(_inp())
|
out = self.agent.compute(_inp())
|
||||||
assert "no tasks" in out.prompt_text.lower()
|
assert "no tasks" in out.prompt_text.lower()
|
||||||
|
|
||||||
def test_single_project(self):
|
def test_lists_all_clusters(self):
|
||||||
tasks = [_task(f"T{i}", project_id="Work") for i in range(3)]
|
|
||||||
out = self.agent.compute(_inp(tasks=tasks))
|
|
||||||
assert '"Work"' in out.prompt_text
|
|
||||||
assert "3 tasks" in out.prompt_text
|
|
||||||
|
|
||||||
def test_most_congested_wins(self):
|
|
||||||
tasks = (
|
tasks = (
|
||||||
[_task(f"W{i}", project_id="Work") for i in range(5)]
|
[_task(f"W{i}", project_id="Work") for i in range(3)]
|
||||||
+ [_task(f"H{i}", project_id="Home") for i in range(2)]
|
+ [_task(f"H{i}", project_id="Home") for i in range(2)]
|
||||||
)
|
)
|
||||||
out = self.agent.compute(_inp(tasks=tasks))
|
out = self.agent.compute(_inp(tasks=tasks))
|
||||||
assert '"Work"' in out.prompt_text
|
assert "Work" in out.prompt_text
|
||||||
|
assert "Home" in out.prompt_text
|
||||||
|
|
||||||
def test_overdue_weighting(self):
|
def test_includes_task_titles(self):
|
||||||
# Home has 2 tasks (1 overdue), Work has 3 non-overdue tasks
|
tasks = [_task("Buy milk", project_id="Personal"), _task("Write report", project_id="Personal")]
|
||||||
# Home score = 2+1 = 3; Work score = 3 — Home should win due to overdue weight
|
|
||||||
tasks = (
|
|
||||||
[_task("Home1", project_id="Home", is_overdue=True),
|
|
||||||
_task("Home2", project_id="Home")]
|
|
||||||
+ [_task(f"W{i}", project_id="Work") for i in range(3)]
|
|
||||||
)
|
|
||||||
out = self.agent.compute(_inp(tasks=tasks))
|
out = self.agent.compute(_inp(tasks=tasks))
|
||||||
assert '"Work"' not in out.prompt_text or '"Home"' in out.prompt_text
|
assert '"Buy milk"' in out.prompt_text
|
||||||
|
assert '"Write report"' in out.prompt_text
|
||||||
|
|
||||||
|
def test_task_count_in_output(self):
|
||||||
|
tasks = [_task(f"T{i}", project_id="Work") for i in range(3)]
|
||||||
|
out = self.agent.compute(_inp(tasks=tasks))
|
||||||
|
assert "3 task" in out.prompt_text
|
||||||
|
|
||||||
def test_default_project_fallback(self):
|
def test_default_project_fallback(self):
|
||||||
out = self.agent.compute(_inp(tasks=[_task("No project task")]))
|
out = self.agent.compute(_inp(tasks=[_task("No project task")]))
|
||||||
# Tasks without project_id fall back to a "Tasks" bucket
|
|
||||||
assert "Tasks" in out.prompt_text
|
assert "Tasks" in out.prompt_text
|
||||||
|
|
||||||
def test_snapshot_keys(self):
|
def test_snapshot_keys(self):
|
||||||
out = self.agent.compute(_inp(tasks=[_task("T1", project_id="A")]))
|
out = self.agent.compute(_inp(tasks=[_task("T1", project_id="A")]))
|
||||||
assert {"top_cluster_label", "top_task_count", "top_overdue_count", "cluster_count",
|
public_keys = {k for k in out.signals_snapshot if not k.startswith("_")}
|
||||||
"strategy", "preferred_areas"} == set(out.signals_snapshot)
|
assert {"cluster_count", "clusters"} == public_keys
|
||||||
|
|
||||||
|
def test_snapshot_clusters_shape(self):
|
||||||
|
tasks = [_task("Buy milk", project_id="P1"), _task("Fix bug", project_id="P2")]
|
||||||
|
out = self.agent.compute(_inp(tasks=tasks))
|
||||||
|
clusters = out.signals_snapshot["clusters"]
|
||||||
|
assert isinstance(clusters, list)
|
||||||
|
assert all("label" in c and "task_count" in c and "tasks" in c for c in clusters)
|
||||||
|
|
||||||
|
|
||||||
# ── Registry ─────────────────────────────────────────────────────────────────
|
# ── Registry ─────────────────────────────────────────────────────────────────
|
||||||
@@ -255,7 +256,7 @@ class TestRegistry:
|
|||||||
def test_all_agents_present(self):
|
def test_all_agents_present(self):
|
||||||
agents = all_agents()
|
agents = all_agents()
|
||||||
ids = {a.agent_id for a in agents}
|
ids = {a.agent_id for a in agents}
|
||||||
assert ids == {"overdue-task", "momentum", "time-of-day", "recent-patterns", "focus-area"}
|
assert ids == {"overdue-task", "momentum", "time-of-day", "recent-patterns", "focus-area", "health-vitals"}
|
||||||
|
|
||||||
def test_get_agent(self):
|
def test_get_agent(self):
|
||||||
a = get_agent("momentum")
|
a = get_agent("momentum")
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
"""Unit tests for ml.agents.clustering (issue #97).
|
"""Unit tests for ml.agents.clustering (issue #97, #129).
|
||||||
|
|
||||||
Embedding calls are mocked so tests run without Ollama.
|
LLM and embedding calls are mocked so tests run without Ollama or LiteLLM.
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
@@ -9,7 +9,7 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
|
|||||||
|
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
from ml.agents.clustering import cluster_tasks, Cluster, _greedy_cluster, _cosine
|
from ml.agents.clustering import cluster_tasks, Cluster, _greedy_cluster, _cosine, _embed_batch, _enrich_batch
|
||||||
|
|
||||||
|
|
||||||
# ── helpers ──────────────────────────────────────────────────────────────────
|
# ── helpers ──────────────────────────────────────────────────────────────────
|
||||||
@@ -82,54 +82,128 @@ class TestGreedyClustering:
|
|||||||
assert clusters[0].label == "Write report"
|
assert clusters[0].label == "Write report"
|
||||||
|
|
||||||
|
|
||||||
|
# ── enrichment ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestEnrichBatch:
|
||||||
|
def test_falls_back_to_raw_when_no_litellm_url(self, monkeypatch):
|
||||||
|
monkeypatch.delenv("LITELLM_URL", raising=False)
|
||||||
|
result, new = _enrich_batch(["Buy milk", "Fix bug"])
|
||||||
|
assert result == ["Buy milk", "Fix bug"] and new == {}
|
||||||
|
|
||||||
|
def test_uses_description_when_litellm_available(self, monkeypatch):
|
||||||
|
monkeypatch.setenv("LITELLM_URL", "http://fake-litellm")
|
||||||
|
with patch("ml.agents.clustering._enrich_title", return_value="Expanded description."):
|
||||||
|
result, new = _enrich_batch(["Buy milk"])
|
||||||
|
assert result == ["Expanded description."]
|
||||||
|
assert len(new) == 1
|
||||||
|
|
||||||
|
def test_falls_back_to_raw_title_on_enrich_failure(self, monkeypatch):
|
||||||
|
monkeypatch.setenv("LITELLM_URL", "http://fake-litellm")
|
||||||
|
with patch("ml.agents.clustering._enrich_title", return_value=None):
|
||||||
|
result, new = _enrich_batch(["Buy milk"])
|
||||||
|
assert result == ["Buy milk"]
|
||||||
|
assert new == {} # failed enrichments are not persisted
|
||||||
|
|
||||||
|
def test_deduplicates_identical_titles(self, monkeypatch):
|
||||||
|
monkeypatch.setenv("LITELLM_URL", "http://fake-litellm")
|
||||||
|
call_count = {"n": 0}
|
||||||
|
def fake_enrich(title, url):
|
||||||
|
call_count["n"] += 1
|
||||||
|
return f"desc:{title}"
|
||||||
|
with patch("ml.agents.clustering._enrich_title", side_effect=fake_enrich):
|
||||||
|
result, new = _enrich_batch(["Buy milk", "Buy milk", "Fix bug"])
|
||||||
|
assert call_count["n"] == 2 # only 2 unique titles
|
||||||
|
assert result == ["desc:Buy milk", "desc:Buy milk", "desc:Fix bug"]
|
||||||
|
|
||||||
|
def test_uses_persistent_cache(self, monkeypatch):
|
||||||
|
monkeypatch.setenv("LITELLM_URL", "http://fake-litellm")
|
||||||
|
from ml.agents.clustering import _content_hash
|
||||||
|
h = _content_hash("Buy milk")
|
||||||
|
call_count = {"n": 0}
|
||||||
|
def fake_enrich(title, url):
|
||||||
|
call_count["n"] += 1
|
||||||
|
return "new desc"
|
||||||
|
with patch("ml.agents.clustering._enrich_title", side_effect=fake_enrich):
|
||||||
|
result, new = _enrich_batch(["Buy milk"], persistent_cache={h: "cached desc"})
|
||||||
|
assert call_count["n"] == 0 # cache hit, no LLM call
|
||||||
|
assert result == ["cached desc"]
|
||||||
|
assert new == {}
|
||||||
|
|
||||||
|
|
||||||
# ── cluster_tasks integration ─────────────────────────────────────────────────
|
# ── cluster_tasks integration ─────────────────────────────────────────────────
|
||||||
|
|
||||||
class TestClusterTasks:
|
class TestClusterTasks:
|
||||||
def test_empty_tasks(self):
|
def _no_enrich(self, titles, persistent_cache=None):
|
||||||
result = cluster_tasks([])
|
return titles, {}
|
||||||
assert result == []
|
|
||||||
|
|
||||||
def test_fallback_when_ollama_unavailable(self):
|
def test_empty_tasks(self):
|
||||||
with patch("ml.agents.clustering._embed", return_value=None):
|
clusters, new = cluster_tasks([])
|
||||||
|
assert clusters == [] and new == {}
|
||||||
|
|
||||||
|
def test_fallback_when_embed_unavailable(self):
|
||||||
|
with patch("ml.agents.clustering._enrich_batch", side_effect=self._no_enrich), \
|
||||||
|
patch("ml.agents.clustering._embed_batch", return_value=None):
|
||||||
tasks = [_task("A", "p1"), _task("B", "p2"), _task("C", "p1")]
|
tasks = [_task("A", "p1"), _task("B", "p2"), _task("C", "p1")]
|
||||||
clusters = cluster_tasks(tasks)
|
clusters, _ = cluster_tasks(tasks)
|
||||||
assert len(clusters) == 2
|
assert len(clusters) == 2
|
||||||
labels = {c.label for c in clusters}
|
labels = {c.label for c in clusters}
|
||||||
assert "p1" in labels and "p2" in labels
|
assert "p1" in labels and "p2" in labels
|
||||||
|
|
||||||
def test_fallback_groups_by_project(self):
|
def test_fallback_groups_by_project(self):
|
||||||
with patch("ml.agents.clustering._embed", return_value=None):
|
with patch("ml.agents.clustering._enrich_batch", side_effect=self._no_enrich), \
|
||||||
|
patch("ml.agents.clustering._embed_batch", return_value=None):
|
||||||
tasks = [_task("A", "work")] * 3 + [_task("B", "home")] * 2
|
tasks = [_task("A", "work")] * 3 + [_task("B", "home")] * 2
|
||||||
clusters = cluster_tasks(tasks)
|
clusters, _ = cluster_tasks(tasks)
|
||||||
by_label = {c.label: c.task_count for c in clusters}
|
by_label = {c.label: c.task_count for c in clusters}
|
||||||
assert by_label["work"] == 3
|
assert by_label["work"] == 3
|
||||||
assert by_label["home"] == 2
|
assert by_label["home"] == 2
|
||||||
|
|
||||||
def test_tasks_without_content_go_to_other(self):
|
def test_tasks_without_content_go_to_other(self):
|
||||||
v = [1.0, 0.0]
|
v = [1.0, 0.0]
|
||||||
with patch("ml.agents.clustering._embed", return_value=v):
|
with patch("ml.agents.clustering._enrich_batch", side_effect=self._no_enrich), \
|
||||||
|
patch("ml.agents.clustering._embed_batch", return_value=[v]):
|
||||||
tasks = [_task("Has content"), {"is_overdue": False}]
|
tasks = [_task("Has content"), {"is_overdue": False}]
|
||||||
clusters = cluster_tasks(tasks)
|
clusters, _ = cluster_tasks(tasks)
|
||||||
labels = {c.label for c in clusters}
|
labels = {c.label for c in clusters}
|
||||||
assert "Other tasks" in labels
|
assert "Other tasks" in labels
|
||||||
|
|
||||||
def test_semantic_clustering_groups_similar(self):
|
def test_semantic_clustering_groups_similar(self):
|
||||||
v_work = [1.0, 0.0, 0.0]
|
v_work = [1.0, 0.0, 0.0]
|
||||||
v_home = [0.0, 1.0, 0.0]
|
v_home = [0.0, 1.0, 0.0]
|
||||||
side_effects = [v_work, v_work, v_home, v_home]
|
batch_result = [v_work, v_work, v_home, v_home]
|
||||||
with patch("ml.agents.clustering._embed", side_effect=side_effects):
|
with patch("ml.agents.clustering._enrich_batch", side_effect=self._no_enrich), \
|
||||||
|
patch("ml.agents.clustering._embed_batch", return_value=batch_result):
|
||||||
tasks = [
|
tasks = [
|
||||||
_task("Write report"),
|
_task("Write report"),
|
||||||
_task("Review PR"),
|
_task("Review PR"),
|
||||||
_task("Buy groceries"),
|
_task("Buy groceries"),
|
||||||
_task("Cook dinner"),
|
_task("Cook dinner"),
|
||||||
]
|
]
|
||||||
clusters = cluster_tasks(tasks)
|
clusters, _ = cluster_tasks(tasks)
|
||||||
assert len(clusters) == 2
|
assert len(clusters) == 2
|
||||||
assert all(c.task_count == 2 for c in clusters)
|
assert all(c.task_count == 2 for c in clusters)
|
||||||
|
|
||||||
def test_all_tasks_no_content_fallback_by_project(self):
|
def test_all_tasks_no_content_fallback_by_project(self):
|
||||||
tasks = [{"project_id": "p1", "is_overdue": False},
|
tasks = [{"project_id": "p1", "is_overdue": False},
|
||||||
{"project_id": "p2", "is_overdue": False}]
|
{"project_id": "p2", "is_overdue": False}]
|
||||||
clusters = cluster_tasks(tasks)
|
clusters, new = cluster_tasks(tasks)
|
||||||
assert len(clusters) == 2
|
assert len(clusters) == 2 and new == {}
|
||||||
|
|
||||||
|
def test_enrich_called_before_embed(self):
|
||||||
|
"""Verify enrichment output (not raw title) is what gets embedded."""
|
||||||
|
v = [1.0, 0.0]
|
||||||
|
captured = {}
|
||||||
|
def fake_embed(texts):
|
||||||
|
captured["texts"] = texts
|
||||||
|
return [v] * len(texts)
|
||||||
|
with patch("ml.agents.clustering._enrich_batch", return_value=(["Expanded desc."], {})), \
|
||||||
|
patch("ml.agents.clustering._embed_batch", side_effect=fake_embed):
|
||||||
|
cluster_tasks([_task("Buy milk")])
|
||||||
|
assert captured["texts"] == ["clustering: Expanded desc."]
|
||||||
|
|
||||||
|
def test_new_enrichments_returned(self):
|
||||||
|
v = [1.0, 0.0]
|
||||||
|
with patch("ml.agents.clustering._enrich_batch", return_value=(["desc"], {"abc123": "desc"})), \
|
||||||
|
patch("ml.agents.clustering._embed_batch", return_value=[v]):
|
||||||
|
_, new = cluster_tasks([_task("Buy milk")])
|
||||||
|
assert new == {"abc123": "desc"}
|
||||||
|
|||||||
@@ -45,6 +45,7 @@ def test_manifest_required_fields(agent_id: str):
|
|||||||
assert isinstance(m.pref_schema, dict) and m.pref_schema.get("type") == "object"
|
assert isinstance(m.pref_schema, dict) and m.pref_schema.get("type") == "object"
|
||||||
assert isinstance(m.required_consents, list) and m.required_consents
|
assert isinstance(m.required_consents, list) and m.required_consents
|
||||||
assert "data:core" in m.required_consents, "every agent should require data:core"
|
assert "data:core" in m.required_consents, "every agent should require data:core"
|
||||||
|
assert all(c.startswith("data:") for c in m.required_consents), "only data: consents allowed; agent: consents have been removed"
|
||||||
assert m.ttl_sec == get_agent(agent_id).ttl_seconds, "ttl divergence"
|
assert m.ttl_sec == get_agent(agent_id).ttl_seconds, "ttl divergence"
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -627,86 +627,37 @@ class TestTimeOfDaySnippet:
|
|||||||
assert {"quiet_start", "quiet_end", "peak_hours", "tz"}.issubset(keys)
|
assert {"quiet_start", "quiet_end", "peak_hours", "tz"}.issubset(keys)
|
||||||
|
|
||||||
|
|
||||||
# ── focus-area: preferred_areas wiring ───────────────────────────────────────
|
# ── focus-area: cluster summary output ───────────────────────────────────────
|
||||||
|
|
||||||
class TestFocusAreaPreferredAreas:
|
class TestFocusAreaOutput:
|
||||||
agent = FocusAreaAgent()
|
agent = FocusAreaAgent()
|
||||||
|
|
||||||
def _task(self, content: str, project_id: str, is_overdue: bool = False) -> dict:
|
def _task(self, content: str, project_id: str) -> dict:
|
||||||
return {"id": "t1", "content": content, "is_overdue": is_overdue,
|
return {"id": "t1", "content": content, "is_overdue": False,
|
||||||
"task_age_days": 2.0, "priority": 1, "project_id": project_id}
|
"task_age_days": 2.0, "priority": 1, "project_id": project_id}
|
||||||
|
|
||||||
def test_preferred_area_wins_tie(self):
|
def test_version(self):
|
||||||
tasks = [
|
|
||||||
self._task("Work thing", "work"),
|
|
||||||
self._task("Home thing", "home"),
|
|
||||||
]
|
|
||||||
out = self.agent.compute(_inp(tasks=tasks, agent_prefs={"preferred_areas": ["work"]}))
|
|
||||||
assert "work" in out.prompt_text
|
|
||||||
assert "matches the user's stated focus preferences" in out.prompt_text
|
|
||||||
|
|
||||||
def test_no_preferred_areas_uses_congestion_score(self):
|
|
||||||
tasks = [
|
|
||||||
self._task("W1", "work"),
|
|
||||||
self._task("H1", "home"),
|
|
||||||
self._task("H2", "home"),
|
|
||||||
]
|
|
||||||
out = self.agent.compute(_inp(tasks=tasks))
|
|
||||||
# home has more tasks → wins without any preference
|
|
||||||
assert "home" in out.prompt_text
|
|
||||||
|
|
||||||
def test_snapshot_includes_preferred_areas(self):
|
|
||||||
tasks = [self._task("T", "work")]
|
|
||||||
out = self.agent.compute(_inp(tasks=tasks, agent_prefs={"preferred_areas": ["work"]}))
|
|
||||||
assert out.signals_snapshot["preferred_areas"] == ["work"]
|
|
||||||
|
|
||||||
def test_version_bumped(self):
|
|
||||||
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
|
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
|
||||||
assert FA_MANIFEST.version == "2.0.0"
|
assert FA_MANIFEST.version == "3.0.0"
|
||||||
|
|
||||||
def test_snapshot_uses_cluster_keys(self):
|
def test_all_clusters_in_output(self):
|
||||||
|
tasks = [self._task("Work thing", "work"), self._task("Home thing", "home")]
|
||||||
|
out = self.agent.compute(_inp(tasks=tasks))
|
||||||
|
assert "work" in out.prompt_text.lower()
|
||||||
|
assert "home" in out.prompt_text.lower()
|
||||||
|
|
||||||
|
def test_task_titles_in_output(self):
|
||||||
|
tasks = [self._task("Buy milk", "personal")]
|
||||||
|
out = self.agent.compute(_inp(tasks=tasks))
|
||||||
|
assert '"Buy milk"' in out.prompt_text
|
||||||
|
|
||||||
|
def test_snapshot_shape(self):
|
||||||
tasks = [self._task("T", "work")]
|
tasks = [self._task("T", "work")]
|
||||||
out = self.agent.compute(_inp(tasks=tasks))
|
out = self.agent.compute(_inp(tasks=tasks))
|
||||||
assert "top_cluster_label" in out.signals_snapshot
|
public_keys = {k for k in out.signals_snapshot if not k.startswith("_")}
|
||||||
assert "cluster_count" in out.signals_snapshot
|
assert public_keys == {"cluster_count", "clusters"}
|
||||||
assert "strategy" in out.signals_snapshot
|
assert isinstance(out.signals_snapshot["clusters"], list)
|
||||||
|
|
||||||
|
def test_no_inferred_params(self):
|
||||||
# ── focus-area: preferred_areas inference from task_completions (#113) ────────
|
|
||||||
|
|
||||||
class TestFocusAreaPreferredAreasInference:
|
|
||||||
from ml.agents.focus_area import MANIFEST as _FA_MANIFEST
|
|
||||||
|
|
||||||
def _completion(self, project_id: str) -> TaskCompletion:
|
|
||||||
return _completion(project_id, lateness_days=0.0)
|
|
||||||
|
|
||||||
def test_cold_start_no_completions(self):
|
|
||||||
history = _history(completions=[])
|
|
||||||
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
|
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
|
||||||
result = run_inference(FA_MANIFEST, history)
|
assert FA_MANIFEST.inferred_params == []
|
||||||
assert result["preferred_areas"] == []
|
|
||||||
|
|
||||||
def test_top_two_projects_returned(self):
|
|
||||||
completions = (
|
|
||||||
[_completion("p1", 0)] * 8
|
|
||||||
+ [_completion("p2", 0)] * 5
|
|
||||||
+ [_completion("p3", 0)] * 2
|
|
||||||
)
|
|
||||||
history = _history(completions=completions)
|
|
||||||
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
|
|
||||||
result = run_inference(FA_MANIFEST, history)
|
|
||||||
assert result["preferred_areas"] == ["p1", "p2"]
|
|
||||||
|
|
||||||
def test_single_project_returns_one(self):
|
|
||||||
completions = [_completion("work", 0)] * 6
|
|
||||||
history = _history(completions=completions)
|
|
||||||
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
|
|
||||||
result = run_inference(FA_MANIFEST, history)
|
|
||||||
assert result["preferred_areas"] == ["work"]
|
|
||||||
|
|
||||||
def test_none_project_id_ignored(self):
|
|
||||||
completions = [_completion(None, 0)] * 5 + [_completion("real", 0)] * 3
|
|
||||||
history = _history(completions=completions)
|
|
||||||
from ml.agents.focus_area import MANIFEST as FA_MANIFEST
|
|
||||||
result = run_inference(FA_MANIFEST, history)
|
|
||||||
assert result["preferred_areas"] == ["real"]
|
|
||||||
|
|||||||
@@ -126,7 +126,7 @@ MANIFEST = AgentManifest(
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
context_schema=["profile.features"],
|
context_schema=["profile.features"],
|
||||||
required_consents=["data:core", "agent:time-of-day"],
|
required_consents=["data:core"],
|
||||||
output_contract={"type": "snippet", "format": "free_text"},
|
output_contract={"type": "snippet", "format": "free_text"},
|
||||||
ttl_sec=900,
|
ttl_sec=900,
|
||||||
inferred_params=[
|
inferred_params=[
|
||||||
|
|||||||
@@ -196,6 +196,12 @@ class AgentComputeRequest(BaseModel):
|
|||||||
now_iso: Optional[str] = None # ISO 8601; defaults to utcnow
|
now_iso: Optional[str] = None # ISO 8601; defaults to utcnow
|
||||||
# Per-agent prefs from user_preferences (merged: user source overrides inferred).
|
# Per-agent prefs from user_preferences (merged: user source overrides inferred).
|
||||||
agent_prefs: dict = {}
|
agent_prefs: dict = {}
|
||||||
|
# Pre-fetched enrichment cache: {content_hash -> description}. Avoids re-calling
|
||||||
|
# LiteLLM for task titles already expanded in a prior compute cycle.
|
||||||
|
enrichment_cache: dict[str, str] = {}
|
||||||
|
# MD5 of sorted task contents; stored in snapshot so the next cycle can skip
|
||||||
|
# recompute when the task list hasn't changed.
|
||||||
|
task_hash: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
class AgentComputeResponse(BaseModel):
|
class AgentComputeResponse(BaseModel):
|
||||||
@@ -206,6 +212,8 @@ class AgentComputeResponse(BaseModel):
|
|||||||
computed_at: str
|
computed_at: str
|
||||||
expires_at: str
|
expires_at: str
|
||||||
agent_version: str
|
agent_version: str
|
||||||
|
# New enrichments generated during this compute cycle; caller persists to DB.
|
||||||
|
new_enrichments: dict[str, str] = {}
|
||||||
|
|
||||||
|
|
||||||
class AgentInferRequest(BaseModel):
|
class AgentInferRequest(BaseModel):
|
||||||
@@ -233,6 +241,7 @@ class RecommendRequest(BaseModel):
|
|||||||
hour_of_day: int = 12
|
hour_of_day: int = 12
|
||||||
day_of_week: int = 0
|
day_of_week: int = 0
|
||||||
science_destiny: int = 50 # 0=science (data-driven), 100=destiny (intuitive)
|
science_destiny: int = 50 # 0=science (data-driven), 100=destiny (intuitive)
|
||||||
|
recent_tip: Optional[str] = None # content of last snoozed tip; LLM avoids repeating it
|
||||||
|
|
||||||
|
|
||||||
class TipResult(BaseModel):
|
class TipResult(BaseModel):
|
||||||
@@ -313,6 +322,7 @@ async def compute_agent(agent_id: str, req: AgentComputeRequest) -> AgentCompute
|
|||||||
feedback_history=req.feedback_history,
|
feedback_history=req.feedback_history,
|
||||||
now=now,
|
now=now,
|
||||||
agent_prefs=req.agent_prefs,
|
agent_prefs=req.agent_prefs,
|
||||||
|
enrichment_cache=req.enrichment_cache,
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
output = agent.compute(inp)
|
output = agent.compute(inp)
|
||||||
@@ -320,6 +330,10 @@ async def compute_agent(agent_id: str, req: AgentComputeRequest) -> AgentCompute
|
|||||||
log.error("agent_compute_failed", agent_id=agent_id, user_id=req.user_id, error=str(exc))
|
log.error("agent_compute_failed", agent_id=agent_id, user_id=req.user_id, error=str(exc))
|
||||||
raise HTTPException(status_code=500, detail=f"Agent compute failed: {exc}")
|
raise HTTPException(status_code=500, detail=f"Agent compute failed: {exc}")
|
||||||
|
|
||||||
|
if req.task_hash:
|
||||||
|
output.signals_snapshot["_task_hash"] = req.task_hash
|
||||||
|
new_enrichments: dict[str, str] = output.signals_snapshot.pop("_new_enrichments", {})
|
||||||
|
|
||||||
log.info("agent_computed", agent_id=agent_id, user_id=req.user_id, expires_at=output.expires_at)
|
log.info("agent_computed", agent_id=agent_id, user_id=req.user_id, expires_at=output.expires_at)
|
||||||
span = _start_span(
|
span = _start_span(
|
||||||
f"compute:{agent_id}",
|
f"compute:{agent_id}",
|
||||||
@@ -338,6 +352,7 @@ async def compute_agent(agent_id: str, req: AgentComputeRequest) -> AgentCompute
|
|||||||
computed_at=output.computed_at,
|
computed_at=output.computed_at,
|
||||||
expires_at=output.expires_at,
|
expires_at=output.expires_at,
|
||||||
agent_version=output.agent_version,
|
agent_version=output.agent_version,
|
||||||
|
new_enrichments=new_enrichments,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -430,6 +445,7 @@ async def recommend(req: RecommendRequest) -> RecommendResponse:
|
|||||||
hour_of_day=req.hour_of_day,
|
hour_of_day=req.hour_of_day,
|
||||||
day_of_week=req.day_of_week,
|
day_of_week=req.day_of_week,
|
||||||
science_destiny=req.science_destiny,
|
science_destiny=req.science_destiny,
|
||||||
|
recent_tip=req.recent_tip,
|
||||||
)
|
)
|
||||||
_end_span(ctx_span, outputs={"message_count": len(messages)})
|
_end_span(ctx_span, outputs={"message_count": len(messages)})
|
||||||
|
|
||||||
|
|||||||
@@ -161,16 +161,21 @@ def build_orchestrator_messages(
|
|||||||
hour_of_day: int,
|
hour_of_day: int,
|
||||||
day_of_week: int,
|
day_of_week: int,
|
||||||
science_destiny: int = 50,
|
science_destiny: int = 50,
|
||||||
|
recent_tip: str | None = None,
|
||||||
) -> list[dict]:
|
) -> list[dict]:
|
||||||
"""Build the [system, user] message list for the orchestrator LLM call.
|
"""Build the [system, user] message list for the orchestrator LLM call.
|
||||||
|
|
||||||
agent_outputs: list of {agent_id, prompt_text} dicts.
|
agent_outputs: list of {agent_id, prompt_text} dicts.
|
||||||
Falls back to raw task summary when agent_outputs is empty.
|
Falls back to raw task summary when agent_outputs is empty.
|
||||||
|
recent_tip: content of a tip the user just snoozed — generate something different.
|
||||||
"""
|
"""
|
||||||
style_hint = _science_destiny_instruction(science_destiny)
|
style_hint = _science_destiny_instruction(science_destiny)
|
||||||
system = _SYS_V4_ORCHESTRATOR + (f"\n\n{style_hint}" if style_hint else "")
|
system = _SYS_V4_ORCHESTRATOR + (f"\n\n{style_hint}" if style_hint else "")
|
||||||
|
|
||||||
lines = [f"Current time: {hour_of_day:02d}:00, day_of_week={day_of_week}", ""]
|
lines = [f"Current time: {hour_of_day:02d}:00, day_of_week={day_of_week}", ""]
|
||||||
|
if recent_tip:
|
||||||
|
lines.append(f"The user snoozed this tip (do NOT repeat it or anything similar): \"{recent_tip}\"")
|
||||||
|
lines.append("")
|
||||||
if agent_outputs:
|
if agent_outputs:
|
||||||
lines.append("Context from analysis agents:")
|
lines.append("Context from analysis agents:")
|
||||||
for s in agent_outputs:
|
for s in agent_outputs:
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
export type IntegrationProvider = 'todoist';
|
export type IntegrationProvider = 'todoist' | 'google-health';
|
||||||
export type IntegrationStatus = 'connected' | 'disconnected' | 'error';
|
export type IntegrationStatus = 'connected' | 'disconnected' | 'error';
|
||||||
|
|
||||||
export interface Integration {
|
export interface Integration {
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
export interface Signal {
|
export interface Signal {
|
||||||
id: string;
|
id: string;
|
||||||
source: string; // e.g. 'todoist', 'google-calendar', 'manual'
|
source: string; // e.g. 'todoist', 'google-calendar', 'manual'
|
||||||
kind: 'task' | 'event' | 'habit' | 'insight';
|
kind: 'task' | 'event' | 'habit' | 'insight' | 'health';
|
||||||
content: string;
|
content: string;
|
||||||
metadata: Record<string, unknown>; // source-specific raw fields
|
metadata: Record<string, unknown>; // source-specific raw fields
|
||||||
features: Record<string, number | boolean>; // bandit-ready numeric/boolean features
|
features: Record<string, number | boolean>; // bandit-ready numeric/boolean features
|
||||||
|
|||||||
@@ -85,3 +85,45 @@ describe('runMigrations — idempotency', () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('runMigrations — issue #127 backfill', () => {
|
||||||
|
it('grants data:<provider> consent for existing active integration tokens', () => {
|
||||||
|
const sqlite = freshDb();
|
||||||
|
runMigrations(sqlite);
|
||||||
|
|
||||||
|
// Seed a user + active Todoist token (simulates pre-#127 state)
|
||||||
|
sqlite.exec(`
|
||||||
|
INSERT INTO users (id, email, role, created_at) VALUES ('u2', 'u2@test.com', 'user', '2026-01-01T00:00:00Z');
|
||||||
|
INSERT INTO user_consents (user_id, consent_key, granted_at) VALUES ('u2', 'data:core', '2026-01-01T00:00:00Z');
|
||||||
|
INSERT INTO integration_tokens (id, user_id, provider, access_token, token_status, connected_at)
|
||||||
|
VALUES ('tok1', 'u2', 'todoist', 'secret', 'active', '2026-01-02T00:00:00Z');
|
||||||
|
`);
|
||||||
|
|
||||||
|
// Re-run migrations — the backfill should insert data:todoist
|
||||||
|
runMigrations(sqlite);
|
||||||
|
|
||||||
|
const rows = sqlite
|
||||||
|
.prepare(`SELECT consent_key FROM user_consents WHERE user_id = 'u2' ORDER BY consent_key`)
|
||||||
|
.all() as { consent_key: string }[];
|
||||||
|
expect(rows.map((r) => r.consent_key)).toEqual(['data:core', 'data:todoist']);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('is idempotent — running twice does not duplicate consent rows', () => {
|
||||||
|
const sqlite = freshDb();
|
||||||
|
runMigrations(sqlite);
|
||||||
|
|
||||||
|
sqlite.exec(`
|
||||||
|
INSERT INTO users (id, email, role, created_at) VALUES ('u3', 'u3@test.com', 'user', '2026-01-01T00:00:00Z');
|
||||||
|
INSERT INTO integration_tokens (id, user_id, provider, access_token, token_status, connected_at)
|
||||||
|
VALUES ('tok2', 'u3', 'todoist', 'secret', 'active', '2026-01-02T00:00:00Z');
|
||||||
|
`);
|
||||||
|
|
||||||
|
runMigrations(sqlite);
|
||||||
|
runMigrations(sqlite);
|
||||||
|
|
||||||
|
const count = (sqlite
|
||||||
|
.prepare(`SELECT COUNT(*) as n FROM user_consents WHERE user_id = 'u3' AND consent_key = 'data:todoist'`)
|
||||||
|
.get() as { n: number }).n;
|
||||||
|
expect(count).toBe(1);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -149,6 +149,13 @@ export function runMigrations(handle: BetterSqlite3Database) {
|
|||||||
CREATE INDEX IF NOT EXISTS idx_agent_outputs_user_agent_exp
|
CREATE INDEX IF NOT EXISTS idx_agent_outputs_user_agent_exp
|
||||||
ON agent_outputs(user_id, agent_id, expires_at DESC);
|
ON agent_outputs(user_id, agent_id, expires_at DESC);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS task_enrichments (
|
||||||
|
content_hash TEXT PRIMARY KEY,
|
||||||
|
description TEXT NOT NULL,
|
||||||
|
model TEXT NOT NULL DEFAULT 'tip-generator',
|
||||||
|
created_at TEXT NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS user_preferences (
|
CREATE TABLE IF NOT EXISTS user_preferences (
|
||||||
user_id TEXT NOT NULL REFERENCES users(id),
|
user_id TEXT NOT NULL REFERENCES users(id),
|
||||||
scope TEXT NOT NULL,
|
scope TEXT NOT NULL,
|
||||||
@@ -208,6 +215,15 @@ export function runMigrations(handle: BetterSqlite3Database) {
|
|||||||
`);
|
`);
|
||||||
} catch { /* column already dropped — nothing to backfill */ }
|
} catch { /* column already dropped — nothing to backfill */ }
|
||||||
|
|
||||||
|
// Backfill (issue #127): grant data:<provider> consent for every active integration token.
|
||||||
|
// Idempotent — INSERT OR IGNORE skips rows that already exist.
|
||||||
|
handle.exec(`
|
||||||
|
INSERT OR IGNORE INTO user_consents (user_id, consent_key, granted_at)
|
||||||
|
SELECT user_id, 'data:' || provider, connected_at
|
||||||
|
FROM integration_tokens
|
||||||
|
WHERE token_status = 'active'
|
||||||
|
`);
|
||||||
|
|
||||||
// Drop legacy consent columns (ADR-0014 step 8). Runs after the backfill above.
|
// Drop legacy consent columns (ADR-0014 step 8). Runs after the backfill above.
|
||||||
// Silently skips if already dropped (column not found error) or never existed (new DB).
|
// Silently skips if already dropped (column not found error) or never existed (new DB).
|
||||||
for (const stmt of [
|
for (const stmt of [
|
||||||
|
|||||||
@@ -189,6 +189,15 @@ export const agentOutputs = sqliteTable('agent_outputs', {
|
|||||||
agentVersion: text('agent_version').notNull(), // bump to invalidate on logic changes
|
agentVersion: text('agent_version').notNull(), // bump to invalidate on logic changes
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Persistent cache for LLM-enriched task descriptions used by clustering.
|
||||||
|
// Keyed by MD5 of raw task content; avoids re-calling LiteLLM on every agent compute cycle.
|
||||||
|
export const taskEnrichments = sqliteTable('task_enrichments', {
|
||||||
|
contentHash: text('content_hash').primaryKey(),
|
||||||
|
description: text('description').notNull(),
|
||||||
|
model: text('model').notNull().default('tip-generator'),
|
||||||
|
createdAt: text('created_at').notNull(),
|
||||||
|
});
|
||||||
|
|
||||||
// Admin saved SQL queries.
|
// Admin saved SQL queries.
|
||||||
export const savedQueries = sqliteTable('saved_queries', {
|
export const savedQueries = sqliteTable('saved_queries', {
|
||||||
id: text('id').primaryKey(),
|
id: text('id').primaryKey(),
|
||||||
|
|||||||
@@ -1,8 +1,10 @@
|
|||||||
/**
|
/**
|
||||||
* Registry-driven agent eligibility filter (ADR-0014 step 5).
|
* Registry-driven agent eligibility filter (ADR-0014 step 5, updated by ADR-0015).
|
||||||
*
|
*
|
||||||
* Rules (all must pass for an agent to be eligible):
|
* Rules (all must pass for an agent to be eligible):
|
||||||
* 1. All required_consents are granted and not revoked.
|
* 1. Every data:<source> in required_consents is granted and not revoked.
|
||||||
|
* Consent is granted automatically when the user connects that data source.
|
||||||
|
* agent:<id> consents no longer exist — per-agent control is a preference (rule 3).
|
||||||
* 2. No silenced_in_contexts entry matches an active context.
|
* 2. No silenced_in_contexts entry matches an active context.
|
||||||
* 3. user_preferences[scope='agent:<id>', key='enabled'] is not false.
|
* 3. user_preferences[scope='agent:<id>', key='enabled'] is not false.
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -1,17 +1,19 @@
|
|||||||
import { Router, type Request, type Response, type IRouter } from 'express';
|
import { Router, type Request, type Response, type IRouter } from 'express';
|
||||||
import { nanoid } from 'nanoid';
|
import { nanoid } from 'nanoid';
|
||||||
import { db } from '../db/index.js';
|
import { db } from '../db/index.js';
|
||||||
import { agentOutputs, tipFeedback, tipViews, userPreferences } from '../db/schema.js';
|
import { agentOutputs, tipFeedback, tipViews, userPreferences, taskEnrichments } from '../db/schema.js';
|
||||||
import { eq, and, gt, lt } from 'drizzle-orm';
|
import { eq, and, gt, lt, inArray } from 'drizzle-orm';
|
||||||
|
import crypto from 'node:crypto';
|
||||||
import { config } from '../config.js';
|
import { config } from '../config.js';
|
||||||
import { getProfile, type Profile } from '../profile/builder.js';
|
import { getProfile, type Profile } from '../profile/builder.js';
|
||||||
import { todoistSource } from '../signals/todoist.js';
|
import { todoistSource } from '../signals/todoist.js';
|
||||||
|
import { googleHealthSource } from '../signals/google-health.js';
|
||||||
import { SignalAggregator } from '../signals/aggregator.js';
|
import { SignalAggregator } from '../signals/aggregator.js';
|
||||||
|
|
||||||
const router: IRouter = Router();
|
const router: IRouter = Router();
|
||||||
|
|
||||||
// Separate aggregator instance — avoids circular dep with recommender.ts.
|
// Separate aggregator instance — avoids circular dep with recommender.ts.
|
||||||
const _agentAggregator = new SignalAggregator().register(todoistSource);
|
const _agentAggregator = new SignalAggregator().register(todoistSource).register(googleHealthSource);
|
||||||
|
|
||||||
// ── Internal auth helper ──────────────────────────────────────────────────────
|
// ── Internal auth helper ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
@@ -26,6 +28,33 @@ function checkInternalToken(req: Request, res: Response): boolean {
|
|||||||
|
|
||||||
// ── DB helpers ────────────────────────────────────────────────────────────────
|
// ── DB helpers ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
function contentHash(text: string): string {
|
||||||
|
return crypto.createHash('md5').update(text).digest('hex');
|
||||||
|
}
|
||||||
|
|
||||||
|
async function fetchEnrichmentCache(tasks: { content?: string }[]): Promise<Record<string, string>> {
|
||||||
|
const hashes = tasks
|
||||||
|
.map((t) => t.content?.trim())
|
||||||
|
.filter((c): c is string => !!c)
|
||||||
|
.map(contentHash);
|
||||||
|
if (!hashes.length) return {};
|
||||||
|
const rows = await db
|
||||||
|
.select({ contentHash: taskEnrichments.contentHash, description: taskEnrichments.description })
|
||||||
|
.from(taskEnrichments)
|
||||||
|
.where(inArray(taskEnrichments.contentHash, hashes));
|
||||||
|
return Object.fromEntries(rows.map((r) => [r.contentHash, r.description]));
|
||||||
|
}
|
||||||
|
|
||||||
|
async function persistEnrichments(newEntries: Record<string, string>): Promise<void> {
|
||||||
|
const now = new Date().toISOString();
|
||||||
|
for (const [hash, description] of Object.entries(newEntries)) {
|
||||||
|
await db
|
||||||
|
.insert(taskEnrichments)
|
||||||
|
.values({ contentHash: hash, description, createdAt: now })
|
||||||
|
.onConflictDoNothing();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export async function getActiveAgentOutputs(userId: string) {
|
export async function getActiveAgentOutputs(userId: string) {
|
||||||
const now = new Date().toISOString();
|
const now = new Date().toISOString();
|
||||||
return db
|
return db
|
||||||
@@ -126,22 +155,52 @@ async function persistInferredPrefs(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function taskListHash(tasks: { content?: string }[]): string {
|
||||||
|
const sorted = tasks
|
||||||
|
.map((t) => t.content?.trim() ?? '')
|
||||||
|
.filter(Boolean)
|
||||||
|
.sort()
|
||||||
|
.join('\n');
|
||||||
|
return crypto.createHash('md5').update(sorted).digest('hex');
|
||||||
|
}
|
||||||
|
|
||||||
|
async function isUpToDate(userId: string, agentId: string, currentHash: string): Promise<boolean> {
|
||||||
|
const rows = await db
|
||||||
|
.select({ signalsSnapshot: agentOutputs.signalsSnapshot })
|
||||||
|
.from(agentOutputs)
|
||||||
|
.where(and(eq(agentOutputs.userId, userId), eq(agentOutputs.agentId, agentId)))
|
||||||
|
.limit(1);
|
||||||
|
if (!rows.length) return false;
|
||||||
|
try {
|
||||||
|
const snapshot = JSON.parse(rows[0].signalsSnapshot ?? '{}') as { _task_hash?: string };
|
||||||
|
return snapshot._task_hash === currentHash;
|
||||||
|
} catch { return false; }
|
||||||
|
}
|
||||||
|
|
||||||
export async function computeAndStore(userId: string, agentId: string): Promise<void> {
|
export async function computeAndStore(userId: string, agentId: string): Promise<void> {
|
||||||
let tasks: object[] = [];
|
let tasks: object[] = [];
|
||||||
try {
|
try {
|
||||||
const signals = await _agentAggregator.fetchAll(userId);
|
const signals = await _agentAggregator.fetchAll(userId);
|
||||||
tasks = signals.map((s) => ({
|
tasks = signals.map((s) => ({
|
||||||
id: s.id,
|
id: s.id,
|
||||||
|
source: s.source,
|
||||||
|
kind: s.kind,
|
||||||
content: s.content,
|
content: s.content,
|
||||||
|
// Task-specific fields (default to harmless values for non-task signals)
|
||||||
priority: (s.features.priority as number) ?? 1,
|
priority: (s.features.priority as number) ?? 1,
|
||||||
is_overdue: Boolean(s.features.is_overdue),
|
is_overdue: Boolean(s.features.is_overdue),
|
||||||
task_age_days: (s.features.task_age_days as number) ?? 0,
|
task_age_days: (s.features.task_age_days as number) ?? 0,
|
||||||
project_id: (s.metadata as Record<string, unknown>).project_id ?? null,
|
project_id: (s.metadata as Record<string, unknown>).project_id ?? null,
|
||||||
|
// All features spread so source-specific agents (e.g. health-vitals) can read them
|
||||||
|
...s.features,
|
||||||
}));
|
}));
|
||||||
} catch {
|
} catch {
|
||||||
// No integration or fetch error — agents that need tasks will report "no tasks"
|
// No integration or fetch error — agents that need tasks will report "no tasks"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const currentTaskHash = taskListHash(tasks as { content?: string }[]);
|
||||||
|
if (await isUpToDate(userId, agentId, currentTaskHash)) return;
|
||||||
|
|
||||||
let profile: Profile = {};
|
let profile: Profile = {};
|
||||||
try {
|
try {
|
||||||
profile = await getProfile(userId);
|
profile = await getProfile(userId);
|
||||||
@@ -162,10 +221,13 @@ export async function computeAndStore(userId: string, agentId: string): Promise<
|
|||||||
// Load agent prefs (user overrides + previous inferences) to inject into the compute call.
|
// Load agent prefs (user overrides + previous inferences) to inject into the compute call.
|
||||||
const agentPrefs = await loadAgentPrefs(userId, agentId);
|
const agentPrefs = await loadAgentPrefs(userId, agentId);
|
||||||
|
|
||||||
|
// Fetch enrichment cache for task titles present in this compute call.
|
||||||
|
const enrichmentCache = await fetchEnrichmentCache(tasks as { content?: string }[]);
|
||||||
|
|
||||||
const mlResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/compute`, {
|
const mlResp = await fetch(`${config.ML_SERVING_URL}/agents/${agentId}/compute`, {
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
headers: { 'Content-Type': 'application/json' },
|
headers: { 'Content-Type': 'application/json' },
|
||||||
body: JSON.stringify({ user_id: userId, tasks, profile, feedback_history: feedbackHistory, agent_prefs: agentPrefs }),
|
body: JSON.stringify({ user_id: userId, tasks, profile, feedback_history: feedbackHistory, agent_prefs: agentPrefs, enrichment_cache: enrichmentCache, task_hash: currentTaskHash }),
|
||||||
signal: AbortSignal.timeout(60_000),
|
signal: AbortSignal.timeout(60_000),
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -177,10 +239,16 @@ export async function computeAndStore(userId: string, agentId: string): Promise<
|
|||||||
const output = await mlResp.json() as {
|
const output = await mlResp.json() as {
|
||||||
user_id: string; agent_id: string; prompt_text: string;
|
user_id: string; agent_id: string; prompt_text: string;
|
||||||
signals_snapshot: unknown; computed_at: string; expires_at: string; agent_version: string;
|
signals_snapshot: unknown; computed_at: string; expires_at: string; agent_version: string;
|
||||||
|
new_enrichments?: Record<string, string>;
|
||||||
};
|
};
|
||||||
|
|
||||||
await storeAgentOutput(output);
|
await storeAgentOutput(output);
|
||||||
|
|
||||||
|
// Persist any new enrichments produced during this compute cycle.
|
||||||
|
if (output.new_enrichments && Object.keys(output.new_enrichments).length > 0) {
|
||||||
|
await persistEnrichments(output.new_enrichments);
|
||||||
|
}
|
||||||
|
|
||||||
// Run inference framework for this agent and persist results.
|
// Run inference framework for this agent and persist results.
|
||||||
// Failures are non-fatal — the compute result is already stored.
|
// Failures are non-fatal — the compute result is already stored.
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import { type Router as ExpressRouter, Router, Request, Response } from 'express';
|
import { type Router as ExpressRouter, Router, Request, Response } from 'express';
|
||||||
import { nanoid } from 'nanoid';
|
import { nanoid } from 'nanoid';
|
||||||
import { db } from '../db/index.js';
|
import { db } from '../db/index.js';
|
||||||
import { integrationTokens } from '../db/schema.js';
|
import { integrationTokens, userConsents } from '../db/schema.js';
|
||||||
import { eq, and } from 'drizzle-orm';
|
import { eq, and } from 'drizzle-orm';
|
||||||
import { config } from '../config.js';
|
import { config } from '../config.js';
|
||||||
import { requireAuth, AuthenticatedRequest } from '../middleware/session.js';
|
import { requireAuth, AuthenticatedRequest } from '../middleware/session.js';
|
||||||
@@ -12,9 +12,49 @@ const TODOIST_OAUTH_URL = 'https://todoist.com/oauth/authorize';
|
|||||||
const TODOIST_TOKEN_URL = 'https://todoist.com/oauth/access_token';
|
const TODOIST_TOKEN_URL = 'https://todoist.com/oauth/access_token';
|
||||||
const TODOIST_SCOPES = 'data:read_write';
|
const TODOIST_SCOPES = 'data:read_write';
|
||||||
|
|
||||||
|
const GOOGLE_AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth';
|
||||||
|
const GOOGLE_TOKEN_URL = 'https://oauth2.googleapis.com/token';
|
||||||
|
const GOOGLE_REVOKE_URL = 'https://oauth2.googleapis.com/revoke';
|
||||||
|
|
||||||
|
const GOOGLE_HEALTH_SCOPES = [
|
||||||
|
'https://www.googleapis.com/auth/fitness.activity.read',
|
||||||
|
'https://www.googleapis.com/auth/fitness.body.read',
|
||||||
|
'https://www.googleapis.com/auth/fitness.sleep.read',
|
||||||
|
'https://www.googleapis.com/auth/fitness.heart_rate.read',
|
||||||
|
'https://www.googleapis.com/auth/fitness.nutrition.read',
|
||||||
|
'https://www.googleapis.com/auth/fitness.location.read',
|
||||||
|
'https://www.googleapis.com/auth/fitness.blood_glucose.read',
|
||||||
|
'https://www.googleapis.com/auth/fitness.blood_pressure.read',
|
||||||
|
'https://www.googleapis.com/auth/fitness.body_temperature.read',
|
||||||
|
'https://www.googleapis.com/auth/fitness.oxygen_saturation.read',
|
||||||
|
'https://www.googleapis.com/auth/fitness.reproductive_health.read',
|
||||||
|
].join(' ');
|
||||||
|
|
||||||
// In-memory CSRF state store
|
// In-memory CSRF state store
|
||||||
const pendingStates = new Map<string, { userId: string; redirectTo: string }>();
|
const pendingStates = new Map<string, { userId: string; redirectTo: string }>();
|
||||||
|
|
||||||
|
async function grantDataSourceConsent(userId: string, provider: string): Promise<void> {
|
||||||
|
const consentKey = `data:${provider}`;
|
||||||
|
const now = new Date().toISOString();
|
||||||
|
await db.insert(userConsents)
|
||||||
|
.values({ userId, consentKey, grantedAt: now, revokedAt: null })
|
||||||
|
.onConflictDoUpdate({
|
||||||
|
target: [userConsents.userId, userConsents.consentKey],
|
||||||
|
set: { grantedAt: now, revokedAt: null },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async function revokeDataSourceConsent(userId: string, provider: string): Promise<void> {
|
||||||
|
const consentKey = `data:${provider}`;
|
||||||
|
const now = new Date().toISOString();
|
||||||
|
await db.insert(userConsents)
|
||||||
|
.values({ userId, consentKey, grantedAt: now, revokedAt: now })
|
||||||
|
.onConflictDoUpdate({
|
||||||
|
target: [userConsents.userId, userConsents.consentKey],
|
||||||
|
set: { revokedAt: now },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/** GET /api/integrations — list connected integrations */
|
/** GET /api/integrations — list connected integrations */
|
||||||
router.get('/', requireAuth, async (req: AuthenticatedRequest, res: Response) => {
|
router.get('/', requireAuth, async (req: AuthenticatedRequest, res: Response) => {
|
||||||
const tokens = await db
|
const tokens = await db
|
||||||
@@ -100,10 +140,102 @@ router.get('/todoist/callback', async (req: Request, res: Response) => {
|
|||||||
tokenStatus: 'active',
|
tokenStatus: 'active',
|
||||||
connectedAt: now,
|
connectedAt: now,
|
||||||
});
|
});
|
||||||
|
await grantDataSourceConsent(pending.userId, 'todoist');
|
||||||
|
|
||||||
res.redirect(`${config.WEB_BASE_URL}${pending.redirectTo}?connected=todoist`);
|
res.redirect(`${config.WEB_BASE_URL}${pending.redirectTo}?connected=todoist`);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
/** GET /api/integrations/google-health/connect — start Google Fit OAuth */
|
||||||
|
router.get('/google-health/connect', requireAuth, (req: AuthenticatedRequest, res: Response) => {
|
||||||
|
const state = nanoid();
|
||||||
|
pendingStates.set(state, {
|
||||||
|
userId: req.userId!,
|
||||||
|
redirectTo: (req.query.redirectTo as string) ?? '/connect',
|
||||||
|
});
|
||||||
|
setTimeout(() => pendingStates.delete(state), 10 * 60 * 1000);
|
||||||
|
|
||||||
|
const url = new URL(GOOGLE_AUTH_URL);
|
||||||
|
url.searchParams.set('client_id', config.GOOGLE_CLIENT_ID);
|
||||||
|
url.searchParams.set('redirect_uri', `${config.API_BASE_URL}/api/integrations/google-health/callback`);
|
||||||
|
url.searchParams.set('response_type', 'code');
|
||||||
|
url.searchParams.set('scope', GOOGLE_HEALTH_SCOPES);
|
||||||
|
url.searchParams.set('state', state);
|
||||||
|
url.searchParams.set('access_type', 'offline');
|
||||||
|
url.searchParams.set('prompt', 'consent');
|
||||||
|
|
||||||
|
res.redirect(url.toString());
|
||||||
|
});
|
||||||
|
|
||||||
|
/** GET /api/integrations/google-health/callback — Google returns here */
|
||||||
|
router.get('/google-health/callback', async (req: Request, res: Response) => {
|
||||||
|
const state = req.query.state as string;
|
||||||
|
const code = req.query.code as string;
|
||||||
|
const error = req.query.error as string | undefined;
|
||||||
|
|
||||||
|
if (error) {
|
||||||
|
res.status(400).json({ error: `Google denied access: ${error}` });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const pending = pendingStates.get(state);
|
||||||
|
if (!pending) {
|
||||||
|
res.status(400).json({ error: 'Invalid or expired state' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
pendingStates.delete(state);
|
||||||
|
|
||||||
|
const body = new URLSearchParams({
|
||||||
|
client_id: config.GOOGLE_CLIENT_ID,
|
||||||
|
client_secret: config.GOOGLE_CLIENT_SECRET,
|
||||||
|
code,
|
||||||
|
grant_type: 'authorization_code',
|
||||||
|
redirect_uri: `${config.API_BASE_URL}/api/integrations/google-health/callback`,
|
||||||
|
});
|
||||||
|
|
||||||
|
const tokenRes = await fetch(GOOGLE_TOKEN_URL, {
|
||||||
|
method: 'POST',
|
||||||
|
headers: { 'Content-Type': 'application/x-www-form-urlencoded', Accept: 'application/json' },
|
||||||
|
body: body.toString(),
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!tokenRes.ok) {
|
||||||
|
const detail = await tokenRes.text().catch(() => '');
|
||||||
|
res.status(502).json({ error: `Failed to exchange Google token: ${detail}` });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const tokenData = (await tokenRes.json()) as {
|
||||||
|
access_token: string;
|
||||||
|
refresh_token?: string;
|
||||||
|
expires_in: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
const now = new Date();
|
||||||
|
const expiresAt = new Date(now.getTime() + tokenData.expires_in * 1000).toISOString();
|
||||||
|
|
||||||
|
await db
|
||||||
|
.delete(integrationTokens)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(integrationTokens.userId, pending.userId),
|
||||||
|
eq(integrationTokens.provider, 'google-health'),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
await db.insert(integrationTokens).values({
|
||||||
|
id: nanoid(),
|
||||||
|
userId: pending.userId,
|
||||||
|
provider: 'google-health',
|
||||||
|
accessToken: tokenData.access_token,
|
||||||
|
refreshToken: tokenData.refresh_token ?? null,
|
||||||
|
expiresAt,
|
||||||
|
tokenStatus: 'active',
|
||||||
|
connectedAt: now.toISOString(),
|
||||||
|
});
|
||||||
|
await grantDataSourceConsent(pending.userId, 'google-health');
|
||||||
|
|
||||||
|
res.redirect(`${config.WEB_BASE_URL}${pending.redirectTo}?connected=google-health`);
|
||||||
|
});
|
||||||
|
|
||||||
/** DELETE /api/integrations/:provider — revoke token */
|
/** DELETE /api/integrations/:provider — revoke token */
|
||||||
router.delete('/:provider', requireAuth, async (req: AuthenticatedRequest, res: Response) => {
|
router.delete('/:provider', requireAuth, async (req: AuthenticatedRequest, res: Response) => {
|
||||||
const provider = String(req.params.provider);
|
const provider = String(req.params.provider);
|
||||||
@@ -120,13 +252,18 @@ router.delete('/:provider', requireAuth, async (req: AuthenticatedRequest, res:
|
|||||||
.limit(1);
|
.limit(1);
|
||||||
|
|
||||||
if (token?.provider === 'todoist') {
|
if (token?.provider === 'todoist') {
|
||||||
// Best-effort revocation
|
|
||||||
await fetch('https://api.todoist.com/sync/v9/access_tokens/revoke', {
|
await fetch('https://api.todoist.com/sync/v9/access_tokens/revoke', {
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
headers: { Authorization: `Bearer ${token.accessToken}` },
|
headers: { Authorization: `Bearer ${token.accessToken}` },
|
||||||
}).catch(() => {});
|
}).catch(() => {});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (token?.provider === 'google-health') {
|
||||||
|
await fetch(`${GOOGLE_REVOKE_URL}?token=${token.accessToken}`, { method: 'POST' }).catch(() => {});
|
||||||
|
}
|
||||||
|
|
||||||
|
await revokeDataSourceConsent(req.userId!, provider);
|
||||||
|
|
||||||
await db
|
await db
|
||||||
.delete(integrationTokens)
|
.delete(integrationTokens)
|
||||||
.where(
|
.where(
|
||||||
|
|||||||
@@ -7,9 +7,10 @@ import { eq, and, desc } from 'drizzle-orm';
|
|||||||
import { requireAuth, AuthenticatedRequest } from '../middleware/session.js';
|
import { requireAuth, AuthenticatedRequest } from '../middleware/session.js';
|
||||||
import { config } from '../config.js';
|
import { config } from '../config.js';
|
||||||
import { bus } from '../events/bus.js';
|
import { bus } from '../events/bus.js';
|
||||||
import type { TipCandidate, Signal } from '@oo/shared-types';
|
import type { Tip, Signal } from '@oo/shared-types';
|
||||||
import { todoistSource, dueAgeDays } from '../signals/todoist.js';
|
import { todoistSource, dueAgeDays } from '../signals/todoist.js';
|
||||||
export { dueAgeDays };
|
export { dueAgeDays };
|
||||||
|
import { googleHealthSource } from '../signals/google-health.js';
|
||||||
import { SignalAggregator } from '../signals/aggregator.js';
|
import { SignalAggregator } from '../signals/aggregator.js';
|
||||||
import { getActiveAgentOutputs } from './agent-outputs.js';
|
import { getActiveAgentOutputs } from './agent-outputs.js';
|
||||||
import { getEligibleAgentIds } from '../profile/eligibility.js';
|
import { getEligibleAgentIds } from '../profile/eligibility.js';
|
||||||
@@ -19,35 +20,18 @@ const router: ExpressRouter = Router();
|
|||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// Signal aggregator — register sources here as new integrations are added
|
// Signal aggregator — register sources here as new integrations are added
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
export const aggregator = new SignalAggregator().register(todoistSource);
|
export const aggregator = new SignalAggregator().register(todoistSource).register(googleHealthSource);
|
||||||
export const _clearSignalCacheForTests = () => todoistSource.clearCache();
|
export const _clearSignalCacheForTests = () => {
|
||||||
|
todoistSource.clearCache();
|
||||||
// ---------------------------------------------------------------------------
|
googleHealthSource.clearCache();
|
||||||
// Signal → TipCandidate conversion
|
|
||||||
// ---------------------------------------------------------------------------
|
|
||||||
function signalToCandidate(signal: Signal): TipCandidate {
|
|
||||||
return {
|
|
||||||
id: signal.id,
|
|
||||||
content: signal.content,
|
|
||||||
source: signal.source as TipCandidate['source'],
|
|
||||||
kind: signal.kind as TipCandidate['kind'],
|
|
||||||
sourceId: (signal.metadata.todoistId as string | undefined) ?? undefined,
|
|
||||||
createdAt: signal.timestamp,
|
|
||||||
features: signal.features,
|
|
||||||
};
|
};
|
||||||
}
|
|
||||||
|
|
||||||
function randomPolicy(candidates: TipCandidate[]): TipCandidate | null {
|
|
||||||
if (!candidates.length) return null;
|
|
||||||
return candidates[Math.floor(Math.random() * candidates.length)];
|
|
||||||
}
|
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// Orchestrator: fetch agent snippets + call ml/serving /recommend
|
// Orchestrator: fetch agent snippets + call ml/serving /recommend
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
interface OrchestratorResult {
|
interface OrchestratorResult {
|
||||||
tip: TipCandidate;
|
tip: Tip;
|
||||||
model: string | null;
|
model: string | null;
|
||||||
agentIds: string[];
|
agentIds: string[];
|
||||||
}
|
}
|
||||||
@@ -68,6 +52,7 @@ async function fetchOrchestratorTip(
|
|||||||
hour: number,
|
hour: number,
|
||||||
dayOfWeek: number,
|
dayOfWeek: number,
|
||||||
traceparent?: string,
|
traceparent?: string,
|
||||||
|
recentTip?: string,
|
||||||
): Promise<OrchestratorResult | null> {
|
): Promise<OrchestratorResult | null> {
|
||||||
const [allAgentRows, eligibleIds, scienceDestiny] = await Promise.all([
|
const [allAgentRows, eligibleIds, scienceDestiny] = await Promise.all([
|
||||||
getActiveAgentOutputs(userId),
|
getActiveAgentOutputs(userId),
|
||||||
@@ -89,7 +74,7 @@ async function fetchOrchestratorTip(
|
|||||||
const res = await fetch(`${config.ML_SERVING_URL}/recommend`, {
|
const res = await fetch(`${config.ML_SERVING_URL}/recommend`, {
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
headers: { 'Content-Type': 'application/json', ...(traceparent ? { traceparent } : {}) },
|
headers: { 'Content-Type': 'application/json', ...(traceparent ? { traceparent } : {}) },
|
||||||
body: JSON.stringify({ user_id: userId, agent_outputs: agentOutputs, tasks, hour_of_day: hour, day_of_week: dayOfWeek, science_destiny: scienceDestiny ?? 50 }),
|
body: JSON.stringify({ user_id: userId, agent_outputs: agentOutputs, tasks, hour_of_day: hour, day_of_week: dayOfWeek, science_destiny: scienceDestiny ?? 50, recent_tip: recentTip ?? null }),
|
||||||
signal: AbortSignal.timeout(15_000),
|
signal: AbortSignal.timeout(15_000),
|
||||||
});
|
});
|
||||||
if (!res.ok) return null;
|
if (!res.ok) return null;
|
||||||
@@ -106,7 +91,6 @@ async function fetchOrchestratorTip(
|
|||||||
kind: 'advice' as const,
|
kind: 'advice' as const,
|
||||||
rationale: data.tip.rationale,
|
rationale: data.tip.rationale,
|
||||||
createdAt: now,
|
createdAt: now,
|
||||||
features: { is_overdue: false, task_age_days: 0, priority: 1 },
|
|
||||||
},
|
},
|
||||||
model: data.model ?? null,
|
model: data.model ?? null,
|
||||||
agentIds: agentOutputs.map((a) => a.agent_id),
|
agentIds: agentOutputs.map((a) => a.agent_id),
|
||||||
@@ -123,6 +107,7 @@ async function fetchOrchestratorTip(
|
|||||||
router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Response) => {
|
router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Response) => {
|
||||||
const hour = new Date().getHours();
|
const hour = new Date().getHours();
|
||||||
const dayOfWeek = new Date().getDay();
|
const dayOfWeek = new Date().getDay();
|
||||||
|
const { recent_tip: recentTip } = req.body as { recent_tip?: string };
|
||||||
|
|
||||||
const anyToken = await db
|
const anyToken = await db
|
||||||
.select({ id: integrationTokens.id })
|
.select({ id: integrationTokens.id })
|
||||||
@@ -138,16 +123,16 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re
|
|||||||
const signals = await aggregator.fetchAll(req.userId!);
|
const signals = await aggregator.fetchAll(req.userId!);
|
||||||
|
|
||||||
const t0 = Date.now();
|
const t0 = Date.now();
|
||||||
const orchestrated = await fetchOrchestratorTip(req.userId!, signals, hour, dayOfWeek, req.traceparent);
|
const orchestrated = await fetchOrchestratorTip(req.userId!, signals, hour, dayOfWeek, req.traceparent, recentTip);
|
||||||
const latencyMs = Date.now() - t0;
|
const latencyMs = Date.now() - t0;
|
||||||
|
|
||||||
const tip = orchestrated?.tip ?? randomPolicy(signals.map(signalToCandidate));
|
if (!orchestrated) {
|
||||||
if (!tip) {
|
|
||||||
res.status(204).end();
|
res.status(204).end();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const policy = orchestrated ? 'orchestrator' : 'random';
|
const tip = orchestrated.tip;
|
||||||
|
const policy = 'orchestrator';
|
||||||
const servedAt = new Date().toISOString();
|
const servedAt = new Date().toISOString();
|
||||||
|
|
||||||
await db.insert(tipViews).values({ id: nanoid(), userId: req.userId!, tipId: tip.id, servedAt });
|
await db.insert(tipViews).values({ id: nanoid(), userId: req.userId!, tipId: tip.id, servedAt });
|
||||||
@@ -158,16 +143,12 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re
|
|||||||
tipId: tip.id,
|
tipId: tip.id,
|
||||||
policy,
|
policy,
|
||||||
mlScore: null,
|
mlScore: null,
|
||||||
featuresJson: JSON.stringify(
|
featuresJson: JSON.stringify({ agent_ids: orchestrated.agentIds, hour_of_day: hour, day_of_week: dayOfWeek }),
|
||||||
orchestrated
|
candidateCount: 1,
|
||||||
? { agent_ids: orchestrated.agentIds, hour_of_day: hour, day_of_week: dayOfWeek }
|
|
||||||
: { ...tip.features, hour_of_day: hour, day_of_week: dayOfWeek },
|
|
||||||
),
|
|
||||||
candidateCount: orchestrated ? 1 : signals.length,
|
|
||||||
latencyMs,
|
latencyMs,
|
||||||
servedAt,
|
servedAt,
|
||||||
promptVersion: orchestrated ? 'v4-orchestrator' : null,
|
promptVersion: 'v4-orchestrator',
|
||||||
llmModel: orchestrated ? orchestrated.model : null,
|
llmModel: orchestrated.model,
|
||||||
tipKind: tip.kind ?? null,
|
tipKind: tip.kind ?? null,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
312
services/api/src/signals/google-health.ts
Normal file
312
services/api/src/signals/google-health.ts
Normal file
@@ -0,0 +1,312 @@
|
|||||||
|
import type { Signal, SignalSource } from '@oo/shared-types';
|
||||||
|
import { db } from '../db/index.js';
|
||||||
|
import { integrationTokens } from '../db/schema.js';
|
||||||
|
import { eq, and } from 'drizzle-orm';
|
||||||
|
import { bus } from '../events/bus.js';
|
||||||
|
import { config } from '../config.js';
|
||||||
|
import { logger } from '../logger.js';
|
||||||
|
|
||||||
|
const CACHE_TTL_MS = 5 * 60_000;
|
||||||
|
const FIT_AGGREGATE_URL = 'https://www.googleapis.com/fitness/v1/users/me/dataset:aggregate';
|
||||||
|
const FIT_SESSIONS_URL = 'https://www.googleapis.com/fitness/v1/users/me/sessions';
|
||||||
|
const GOOGLE_TOKEN_URL = 'https://oauth2.googleapis.com/token';
|
||||||
|
|
||||||
|
const STEP_DAILY_GOAL = 7_000;
|
||||||
|
const SLEEP_GOAL_HOURS = 7;
|
||||||
|
|
||||||
|
interface FitBucket {
|
||||||
|
dataset: Array<{
|
||||||
|
dataSourceId: string;
|
||||||
|
point: Array<{ value: Array<{ intVal?: number; fpVal?: number }> }>;
|
||||||
|
}>;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface FitAggregateResponse {
|
||||||
|
bucket?: FitBucket[];
|
||||||
|
}
|
||||||
|
|
||||||
|
interface FitSession {
|
||||||
|
name: string;
|
||||||
|
startTimeMillis: string;
|
||||||
|
endTimeMillis: string;
|
||||||
|
activityType: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface FitSessionsResponse {
|
||||||
|
session?: FitSession[];
|
||||||
|
}
|
||||||
|
|
||||||
|
async function refreshGoogleToken(
|
||||||
|
userId: string,
|
||||||
|
refreshToken: string,
|
||||||
|
): Promise<string | null> {
|
||||||
|
const body = new URLSearchParams({
|
||||||
|
client_id: config.GOOGLE_CLIENT_ID,
|
||||||
|
client_secret: config.GOOGLE_CLIENT_SECRET,
|
||||||
|
refresh_token: refreshToken,
|
||||||
|
grant_type: 'refresh_token',
|
||||||
|
});
|
||||||
|
|
||||||
|
const res = await fetch(GOOGLE_TOKEN_URL, {
|
||||||
|
method: 'POST',
|
||||||
|
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
|
||||||
|
body: body.toString(),
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!res.ok) return null;
|
||||||
|
|
||||||
|
const data = (await res.json()) as { access_token: string; expires_in: number };
|
||||||
|
const expiresAt = new Date(Date.now() + data.expires_in * 1000).toISOString();
|
||||||
|
|
||||||
|
await db
|
||||||
|
.update(integrationTokens)
|
||||||
|
.set({ accessToken: data.access_token, expiresAt, tokenStatus: 'active' })
|
||||||
|
.where(and(eq(integrationTokens.userId, userId), eq(integrationTokens.provider, 'google-health')));
|
||||||
|
|
||||||
|
return data.access_token;
|
||||||
|
}
|
||||||
|
|
||||||
|
function todayMidnightMs(): number {
|
||||||
|
const d = new Date();
|
||||||
|
d.setHours(0, 0, 0, 0);
|
||||||
|
return d.getTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
function yesterdayIso(): string {
|
||||||
|
return new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();
|
||||||
|
}
|
||||||
|
|
||||||
|
async function fetchAggregates(
|
||||||
|
token: string,
|
||||||
|
startMs: number,
|
||||||
|
endMs: number,
|
||||||
|
): Promise<FitAggregateResponse> {
|
||||||
|
const res = await fetch(FIT_AGGREGATE_URL, {
|
||||||
|
method: 'POST',
|
||||||
|
headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
|
||||||
|
body: JSON.stringify({
|
||||||
|
aggregateBy: [
|
||||||
|
{ dataTypeName: 'com.google.step_count.delta' },
|
||||||
|
{ dataTypeName: 'com.google.calories.expended' },
|
||||||
|
{ dataTypeName: 'com.google.active_minutes' },
|
||||||
|
{ dataTypeName: 'com.google.heart_rate.bpm' },
|
||||||
|
],
|
||||||
|
bucketByTime: { durationMillis: endMs - startMs },
|
||||||
|
startTimeMillis: String(startMs),
|
||||||
|
endTimeMillis: String(endMs),
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
if (!res.ok) throw new Error(`Fit aggregate: ${res.status}`);
|
||||||
|
return res.json() as Promise<FitAggregateResponse>;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function fetchSleepSessions(token: string): Promise<FitSessionsResponse> {
|
||||||
|
const url = new URL(FIT_SESSIONS_URL);
|
||||||
|
url.searchParams.set('activityType', '72');
|
||||||
|
url.searchParams.set('startTime', yesterdayIso());
|
||||||
|
url.searchParams.set('endTime', new Date().toISOString());
|
||||||
|
const res = await fetch(url.toString(), {
|
||||||
|
headers: { Authorization: `Bearer ${token}` },
|
||||||
|
});
|
||||||
|
if (!res.ok) throw new Error(`Fit sessions: ${res.status}`);
|
||||||
|
return res.json() as Promise<FitSessionsResponse>;
|
||||||
|
}
|
||||||
|
|
||||||
|
function extractMetric(
|
||||||
|
bucket: FitBucket,
|
||||||
|
dataTypeName: string,
|
||||||
|
valueKey: 'intVal' | 'fpVal',
|
||||||
|
): number {
|
||||||
|
for (const ds of bucket.dataset) {
|
||||||
|
if (!ds.dataSourceId.includes(dataTypeName.replace('com.google.', '').replace('.', '_'))) continue;
|
||||||
|
for (const pt of ds.point) {
|
||||||
|
const v = pt.value[0];
|
||||||
|
if (v) return valueKey === 'intVal' ? (v.intVal ?? 0) : (v.fpVal ?? 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
function extractAnyMetric(
|
||||||
|
bucket: FitBucket,
|
||||||
|
typeSuffix: string,
|
||||||
|
valueKey: 'intVal' | 'fpVal',
|
||||||
|
): number {
|
||||||
|
for (const ds of bucket.dataset) {
|
||||||
|
if (!ds.dataSourceId.includes(typeSuffix)) continue;
|
||||||
|
const pt = ds.point[0];
|
||||||
|
if (pt?.value[0]) {
|
||||||
|
const v = pt.value[0];
|
||||||
|
return valueKey === 'intVal' ? (v.intVal ?? 0) : (v.fpVal ?? 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
export class GoogleHealthSignalSource implements SignalSource {
|
||||||
|
readonly id = 'google-health';
|
||||||
|
|
||||||
|
private cache = new Map<string, { signals: Signal[]; fetchedAt: number }>();
|
||||||
|
|
||||||
|
clearCache(userId?: string): void {
|
||||||
|
if (userId) this.cache.delete(userId);
|
||||||
|
else this.cache.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
async fetchSignals(userId: string): Promise<Signal[]> {
|
||||||
|
const entry = this.cache.get(userId);
|
||||||
|
if (entry && Date.now() - entry.fetchedAt < CACHE_TTL_MS) return entry.signals;
|
||||||
|
|
||||||
|
const [row] = await db
|
||||||
|
.select()
|
||||||
|
.from(integrationTokens)
|
||||||
|
.where(and(eq(integrationTokens.userId, userId), eq(integrationTokens.provider, 'google-health')))
|
||||||
|
.limit(1);
|
||||||
|
|
||||||
|
if (!row) return [];
|
||||||
|
|
||||||
|
let token = row.accessToken;
|
||||||
|
const isExpired = row.expiresAt && new Date(row.expiresAt).getTime() - Date.now() < 5 * 60_000;
|
||||||
|
|
||||||
|
if (isExpired && row.refreshToken) {
|
||||||
|
const refreshed = await refreshGoogleToken(userId, row.refreshToken);
|
||||||
|
if (!refreshed) {
|
||||||
|
logger.warn({ userId }, 'google-health: refresh failed');
|
||||||
|
await db
|
||||||
|
.update(integrationTokens)
|
||||||
|
.set({ tokenStatus: 'needs_reconnect' })
|
||||||
|
.where(and(eq(integrationTokens.userId, userId), eq(integrationTokens.provider, 'google-health')));
|
||||||
|
bus.publish('signals.integration.token_expired', {
|
||||||
|
userId,
|
||||||
|
provider: 'google-health',
|
||||||
|
detectedAt: new Date().toISOString(),
|
||||||
|
});
|
||||||
|
return entry?.signals ?? [];
|
||||||
|
}
|
||||||
|
token = refreshed;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const startMs = todayMidnightMs();
|
||||||
|
const endMs = Date.now();
|
||||||
|
|
||||||
|
const [aggData, sleepData] = await Promise.all([
|
||||||
|
fetchAggregates(token, startMs, endMs),
|
||||||
|
fetchSleepSessions(token),
|
||||||
|
]);
|
||||||
|
|
||||||
|
const bucket = aggData.bucket?.[0];
|
||||||
|
const signals: Signal[] = [];
|
||||||
|
const now = new Date().toISOString();
|
||||||
|
|
||||||
|
if (bucket) {
|
||||||
|
// Steps
|
||||||
|
const steps = extractAnyMetric(bucket, 'step_count', 'intVal');
|
||||||
|
const stepGoalPct = Math.round((steps / STEP_DAILY_GOAL) * 100);
|
||||||
|
signals.push({
|
||||||
|
id: `google-health:steps`,
|
||||||
|
source: 'google-health',
|
||||||
|
kind: 'health',
|
||||||
|
content: `${steps.toLocaleString()} steps today (${stepGoalPct}% of ${STEP_DAILY_GOAL.toLocaleString()} goal)`,
|
||||||
|
metadata: { dataType: 'steps' },
|
||||||
|
features: {
|
||||||
|
step_count: steps,
|
||||||
|
step_goal_pct: stepGoalPct,
|
||||||
|
step_goal: STEP_DAILY_GOAL,
|
||||||
|
below_step_goal: steps < STEP_DAILY_GOAL,
|
||||||
|
},
|
||||||
|
timestamp: now,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Calories + active minutes
|
||||||
|
const calories = Math.round(extractAnyMetric(bucket, 'calories', 'fpVal'));
|
||||||
|
const activeMinutes = extractAnyMetric(bucket, 'active_minutes', 'intVal');
|
||||||
|
signals.push({
|
||||||
|
id: `google-health:activity`,
|
||||||
|
source: 'google-health',
|
||||||
|
kind: 'health',
|
||||||
|
content: `${activeMinutes} active minutes, ${calories} calories burned today`,
|
||||||
|
metadata: { dataType: 'activity' },
|
||||||
|
features: {
|
||||||
|
active_minutes: activeMinutes,
|
||||||
|
calories_burned: calories,
|
||||||
|
sedentary: activeMinutes < 20,
|
||||||
|
},
|
||||||
|
timestamp: now,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Heart rate
|
||||||
|
const bpm = Math.round(extractAnyMetric(bucket, 'heart_rate', 'fpVal'));
|
||||||
|
if (bpm > 0) {
|
||||||
|
signals.push({
|
||||||
|
id: `google-health:heart_rate`,
|
||||||
|
source: 'google-health',
|
||||||
|
kind: 'health',
|
||||||
|
content: `Resting heart rate: ${bpm} bpm`,
|
||||||
|
metadata: { dataType: 'heart_rate' },
|
||||||
|
features: { resting_bpm: bpm, elevated_hr: bpm > 90 },
|
||||||
|
timestamp: now,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sleep — find the most recent sleep session
|
||||||
|
if (sleepData.session?.length) {
|
||||||
|
const sorted = [...sleepData.session].sort(
|
||||||
|
(a, b) => Number(b.endTimeMillis) - Number(a.endTimeMillis),
|
||||||
|
);
|
||||||
|
const last = sorted[0]!;
|
||||||
|
const durationMs = Number(last.endTimeMillis) - Number(last.startTimeMillis);
|
||||||
|
const sleepHours = Math.round((durationMs / 3_600_000) * 10) / 10;
|
||||||
|
const belowGoal = sleepHours < SLEEP_GOAL_HOURS;
|
||||||
|
signals.push({
|
||||||
|
id: `google-health:sleep`,
|
||||||
|
source: 'google-health',
|
||||||
|
kind: 'health',
|
||||||
|
content: `${sleepHours}h sleep last night (${belowGoal ? 'below' : 'meets'} ${SLEEP_GOAL_HOURS}h goal)`,
|
||||||
|
metadata: { dataType: 'sleep', sessionName: last.name },
|
||||||
|
features: {
|
||||||
|
sleep_hours: sleepHours,
|
||||||
|
sleep_goal_hours: SLEEP_GOAL_HOURS,
|
||||||
|
sleep_deficit_hours: Math.max(0, SLEEP_GOAL_HOURS - sleepHours),
|
||||||
|
below_sleep_goal: belowGoal,
|
||||||
|
},
|
||||||
|
timestamp: now,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
this.cache.set(userId, { signals, fetchedAt: Date.now() });
|
||||||
|
bus.publish('signals.task.synced', {
|
||||||
|
userId,
|
||||||
|
source: 'google-health',
|
||||||
|
count: signals.length,
|
||||||
|
syncedAt: now,
|
||||||
|
});
|
||||||
|
|
||||||
|
return signals;
|
||||||
|
} catch (err: unknown) {
|
||||||
|
const status = (err as { message?: string }).message;
|
||||||
|
if (status?.includes('401')) {
|
||||||
|
logger.warn({ userId }, 'google-health: token expired (401)');
|
||||||
|
if (row.refreshToken) {
|
||||||
|
await refreshGoogleToken(userId, row.refreshToken);
|
||||||
|
} else {
|
||||||
|
await db
|
||||||
|
.update(integrationTokens)
|
||||||
|
.set({ tokenStatus: 'needs_reconnect' })
|
||||||
|
.where(and(eq(integrationTokens.userId, userId), eq(integrationTokens.provider, 'google-health')));
|
||||||
|
bus.publish('signals.integration.token_expired', {
|
||||||
|
userId,
|
||||||
|
provider: 'google-health',
|
||||||
|
detectedAt: new Date().toISOString(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.error({ userId, err }, 'google-health: fetch failed');
|
||||||
|
}
|
||||||
|
return entry?.signals ?? [];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const googleHealthSource = new GoogleHealthSignalSource();
|
||||||
Reference in New Issue
Block a user