Compare commits

..

10 Commits

Author SHA1 Message Date
ac1226c367 feat(integrations): migrate google-health from Fit REST to Google Health API v4
Google Fit REST API was closed to new sign-ups on 2024-05-01 and shuts down
end of 2026, surfacing as "Access blocked: this app's request is invalid"
when starting the OAuth flow.

- Swap the 10 fitness.* OAuth scopes for the 3 googlehealth.*.readonly
  scopes (activity_and_fitness, health_metrics_and_measurements, sleep).
- Replace fitness/v1 dataset:aggregate + sessions calls with
  health.googleapis.com/v4/users/me/dataTypes/{steps,total-calories,
  heart-rate,sleep}/dataPoints, filtered to today's window.
- Read the v4 DataPoint union defensively (the per-type schema is sparsely
  documented) and log the first raw sample at debug so we can refine field
  paths after the first real OAuth.
- Output Signal contract is unchanged — agents and downstream consumers
  see the same steps/activity/heart_rate/sleep signals.

Cloud Console still needs: enable Google Health API, add the 3 scopes to
the consent screen, add test user (all googlehealth scopes are Restricted).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 05:42:05 +00:00
2159d4cbd1 fix(infra): unblock docker builds for stars agent and web
- Dockerfile.ml: install build-essential so pyswisseph (stars agent) compiles
- Dockerfile.web: copy root package.json + pnpm-workspace.yaml + pnpm-lock.yaml into builder stage so pnpm --filter resolves the workspace
- CLAUDE.md: record both gotchas alongside the existing Docker rebuild notes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 04:46:20 +00:00
522454ab61 feat(agents): stars agent — astrological transits via pyswisseph (#121)
Computes natal chart (Sun/Moon/Mercury/Venus/Mars/Jupiter/Saturn) from
birth_date and finds active transits (conjunction/sextile/square/trine/
opposition) between today's sky and the user's natal positions. Top 3
most-exact transits are passed to the orchestrator as interpretive themes
to colour the tip — grounded and actionable, not predictive.

Birth date sourced from agent_prefs (populated by a connected Google
data source); requires data:google-health consent. Agent self-silences
when birth_date is absent. pyswisseph added to ml/serving/requirements.txt.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 10:59:10 +00:00
be8c006a4d feat(agents): tarot agent — daily three-card draw (situation/action/outcome) (#120)
Draws 3 Major Arcana cards from a daily seed (user_id + date) so the
reading is stable within a day and unique per user. Card meanings and
action hints are precomputed in the agent; the orchestrator receives a
structured prompt snippet and is instructed to weave the themes into a
grounded, practical tip without explaining the cards.

No inferred params, no external data — requires only data:core consent.
TTL 6 h (refreshes at most twice daily).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 10:52:55 +00:00
8474468614 feat(integrations): add Google Health card to connect page (#119)
The OAuth backend (signal source, /connect and /callback routes, token
refresh, consent grant) was already complete. This adds the missing UI:
a Google Health card in /connect with Connect/Disconnect actions, and
broadens the "See my tip →" CTA to appear when any integration is
connected (not only Todoist).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 10:28:14 +00:00
ad43a8f06a fix(recommender): serve fallback tips to users with no integrations (#117)
The integration-token gate returned 422 for users with no connected
sources, blocking them from any tip. Users with no integrations now go
through the full orchestrator pipeline; if it fails (or returns nothing
because agent outputs are also empty), randomFallbackTip() fires and
serves a generic advice tip instead of an error.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 09:54:54 +00:00
56fda0d737 chore(scheduler): skip agents whose data sources aren't granted (#128)
Check getEligibleAgentIds per user in runCycle before calling
computeAndStore — agents without consented data sources, silenced by
active context, or disabled via preference are skipped rather than
computed unconditionally. Eligibility check failure skips the whole
user (fail-closed). Skipped count added to cycle-complete log line.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 15:45:08 +00:00
b1bd3d465f docs(readme): replace inline issue checklists with Gitea milestone links
Roadmap phase sections now show shipped summaries only; open work lives
in Gitea milestones. Eliminates duplicate source-of-truth between README
and issue tracker.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 15:34:45 +00:00
8fd08379d7 chore(m2): close out remaining loose ends (#80, #86, #90)
- Add `ai` compose profile — Ollama + LiteLLM containers for local dev
  when Agap shared services are unavailable; use with LITELLM_URL /
  OLLAMA_URL env vars pointing ml-serving at localhost
- Mark #90 done (LLM schema validation + fallback shipped in 85a332b)
- Mark #80 superseded by ADR-0013 (multi-agent orchestrator is the pipeline)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 15:31:25 +00:00
85a332b22b feat(recommender): LLM schema validation + hardcoded fallback tips on AI failure (#90)
Python (ml/serving):
- Validate tip item after JSON parse: non-empty content, valid kind
- Retry on schema failure with a targeted clarification prompt, same 2× retry budget
- JSON parse failures keep the existing retry suffix

TypeScript (recommender):
- Add TipSource 'fallback' to shared-types
- FALLBACK_TIPS: 12 general-purpose life tips (hardcoded, no DB read)
- fetchOrchestratorTip returns {ok} discriminated union instead of null
- On !res.ok or fetch error: serve a random fallback tip with rationale 'AI service issues'
- Update tests: 204 path removed; both failure cases now expect source='fallback'

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 15:21:03 +00:00
19 changed files with 962 additions and 348 deletions

View File

@@ -71,6 +71,8 @@ docs/ architecture notes, ADRs, API specs
- **Never run two `docker compose up --build` at once** — both grab the same `--mount=type=cache,id=pnpm` and deadlock on the API's `pnpm --prod deploy` step. Symptom: build sits silent for hours on `[api builder 8/8]`. Before starting any build, check `ps aux | grep "docker compose"` and kill any prior `up --build` (`kill -9 <pid>` — the wrapper bash and the docker compose binary are separate PIDs; kill the docker compose one).
- **Don't add `--offline` to `pnpm --prod deploy`** — pnpm's metadata cache (`/root/.cache/pnpm/`) is not in the `/pnpm/store` cache mount, so `--offline` fails with `ERR_PNPM_NO_OFFLINE_META` for transitive devDeps (e.g. vite via vitest). Leave the deploy step network-on; it works.
- **All TS Dockerfiles need `python3 make g++`** in the base stage — `better-sqlite3` rebuilds natively on install. Missing from `Dockerfile.admin` historically caused `gyp ERR! find Python` failures.
- **`Dockerfile.ml` needs `build-essential`** (not just `gcc`) — `pyswisseph` (stars agent) compiles C from source and fails with `fatal error: math.h: No such file or directory` if only `gcc` is installed; it needs `libc-dev` too, easiest via `build-essential`.
- **`Dockerfile.web` builder stage needs root `package.json` + `pnpm-workspace.yaml` + `pnpm-lock.yaml`** copied in. Without them, `pnpm --filter @oo/shared-types build` fails with `[ERR_PNPM_NO_PKG_MANIFEST] No package.json found in /app`. The deps stage has them but the builder is a fresh layer; selective copies must include them.
- **A clean build of `--profile core` takes ~3 min total** when the buildx cache is warm. If it's been silent for >10 min, check for the parallel-build deadlock above before assuming "still going".
- Run Python agent tests: `python3 -m pytest ml/agents/tests/ -x -q` (tests add repo root to `sys.path` themselves).
- Run Python feature tests: `python3 -m pytest ml/features/ -x -q`

160
README.md
View File

@@ -121,173 +121,31 @@ All model calls route through **LiteLLM** at `llm.alogins.net` (or `LITELLM_URL`
## Roadmap
Issues and open work are tracked in [Gitea milestones](http://localhost:3000/alvis/oO/milestones). Pick an issue, check its milestone (= phase), read the service's `README.md`, ship.
### Phase 0 — Walking skeleton *(M0)* ✓ shipped
Goal: a single user signs in with Google, connects Todoist, and sees one random Todoist task on a black page. Deletion works.
- [x] Monorepo scaffold, docker-compose dev env
- [x] `auth` — Google OAuth2/PKCE via openid-client v6; session cookie; Next.js middleware guard
- [x] `integrations/todoist` — OAuth2 flow, token stored in DB, disconnect supported
- [x] `recommender` with `RandomPolicy`; stable `POST /recommend` contract; 30s task cache
- [x] `apps/web` — sign-in, connect, tip pages; PWA manifest + icons
- [x] Feedback: `done / snooze / dismiss`; reward inferred from dwell-time (`inferReward`); marks task complete in Todoist
- [x] Deploy modular monolith to Agap VM via Caddy at `o.alogins.net`
- [x] ToS + Privacy Policy pages (`/legal/terms`, `/legal/privacy`); implicit consent on sign-in
- [x] Account deletion: revokes tokens, purges data, soft-deletes profile; button on /connect
- [x] Metrics baseline: `tip_views` table (tip served) + `tip_feedback` (reactions) — activation + reaction rate queryable
Single user signs in with Google, connects Todoist, sees one random task on a black page. Deletion works. Auth, integrations, recommender stub, PWA, feedback loop, ToS/privacy, metrics baseline.
### Phase 1 — Real signal + in-the-moment delivery *(M1)* ✓ shipped
Goal: tips are picked, not drawn from a hat — and they arrive at the right moment on the web.
- [x] Event bus scaffold: typed in-process EventEmitter with 500-event ring buffer; subjects match future NATS JetStream — swap is mechanical
- [x] Todoist sync emits `signals.task.synced`; tip served/feedback emit `signals.tip.*`
- [x] Features extracted per task: `is_overdue`, `task_age_days`, `priority`; context: `hour_of_day`, `day_of_week`
- [x] **ε-greedy v1** (d=7, ε=0.10, day-of-week sin/cos features); per-user state persisted to disk
- [x] **ε-greedy v2** (d=12, profile features: completion rate, dismiss rate, dwell, preferred hour, tip volume) in shadow; promoted to active policy (ADR-0012)
- [x] `RemotePolicy` in recommender: calls ml/serving, falls back to RandomPolicy on timeout/error; logs explainability to `tip_scores`
- [x] Feedback loop: dwell-time inferred reward (`inferReward`) → online model update; `done` in 15 s2 min = +1.0 (magic zone)
- [x] Offline simulation framework (`ml/experiments/sim`): rule/LLM/claude-code judges, two-policy comparison, results persisted to `sim_runs` + `sim_events`
- [x] **Web Push** (VAPID): SW, subscribe/unsubscribe API, "notify me" button on tip page
- [x] Shadow-policy registry: run N shadow policies per request, log picks without serving them (#56)
- [x] NATS JetStream bridge — durable `signals.>` and `feedback.>` streams; in-process bus stays the source of truth, every publish bridges out (#21, shipped)
- [x] Per-user profile features (completion rate, dismiss rate, dwell, preferred hour, tip volume) — event-driven, JIT invalidation (#81)
- [ ] Quiet-hours + dedupe for push delivery
- [ ] Delayed rewards: tasks completed directly in Todoist (requires webhook from Todoist)
- [ ] Apple OAuth (deferred to M3)
#### M1 add-on — Admin & ML Ops Console *(fully shipped)*
oO is ML-heavy. Without a cockpit, every model change ships blind. This console is the team's single pane for users, signals, features, models, experiments, and tip outcomes — with the ability to *act* on them (revoke a token, replay an event, promote a model, reset a bandit).
**Framework pick — `apps/admin` on Next.js 15 + Tremor + shadcn/ui.** Analytics-first UI for an analytics-first product, stays on our existing TS/React/Tailwind stack, reuses `packages/shared-types`, `sdk-js`, and the Auth.js session. Specialized ML tooling (MLflow) runs as a **separate external service** linked from the admin shell; Grafana panels are embedded.
| Layer | Tool | Why |
|-------|------|-----|
| App shell | **Next.js 15** (new `apps/admin`) | Same stack as `apps/web`; reuses auth, types, SDK |
| Dashboards / charts | **[Tremor](https://tremor.so)** | Analytics-first React + Tailwind — KPI cards, time-series, categorical, heatmaps |
| CRUD primitives | **[shadcn/ui](https://ui.shadcn.com)** | Copy-paste Radix components; forms, dialogs, command palette |
| Heavy grids | **[TanStack Table v8](https://tanstack.com/table)** | Sortable / paginated / virtualized tables (events, users, tips) |
| Extra charts | **[Recharts](https://recharts.org)** / **[visx](https://airbnb.io/visx)** | Fallbacks where Tremor falls short (e.g. force graphs, Sankey) |
| Model registry / experiments | **[MLflow](https://mlflow.org)** *(external — `o.alogins.net/mlflow`)* | Experiment tracking, artifact browser, model registry; own basic-auth |
| Infra metrics | **[Grafana](https://grafana.com)** *(embedded panels)* | One ops source of truth |
| Ad-hoc analysis | **[Marimo](https://marimo.io)** reactive notebooks | Python-native for the ML side; launch-out link |
| AuthZ | `profile.role='admin'` + Next.js middleware | Reuses existing session; no new auth surface |
**Rejected alternatives (so we don't re-litigate):**
- *Retool / AppSmith* — low-code speed, but admin logic leaves our repo; weak analytics affordances for an analytics product
- *Streamlit / Gradio / Dash* — Python-first; thin RBAC and routing; splits our frontend stack in two
- *React-admin / Refine.dev* — strong CRUD scaffolding, but analytics/ML views feel bolted on; we'd rebuild Tremor-style dashboards ourselves
- *Superset / Metabase as the admin surface* — excellent for BI, poor for operational **writes** (revoke, replay, promote). Plan: **adopt Superset in M4** for BI alongside batch pipelines; ship a read-only SQL widget inside admin for now
**Build sequence:**
1. [x] **ADR-0006** — record the framework choice + "embed, don't rebuild" rule for MLflow/Grafana
2. [x] **Scaffold**`apps/admin` with Next.js 15, Tailwind, Tremor; deploy behind Caddy at `admin.o.alogins.net`
3. [x] **RBAC**`role` column on `users`; admin-only Next.js middleware; seed first admin via `ADMIN_SEED_EMAIL` env; `admin_actions` audit-log table
4. [x] **Overview dashboard** — DAU/WAU KPI cards, tips served, reaction breakdown, activation funnel
5. [x] **User explorer** — list + detail page: identity, consents, integrations, last tip, reward history; revoke-integration + reset-bandit + rebuild-profile actions
6. [x] **Event stream viewer** — live tail of `signals.*` with filters by subject/user/time; same UI when the bus swaps to NATS
7. [x] **Features page** — features sent to `ml/serving` per scoring call; per-user profile features with freshness; diff across time
8. [x] **Tips page** — tips served, scored, feedback reactions with policy/model breakdown
9. [x] **Reward analytics** — reaction distribution over time; per-policy / per-model / per-prompt-version compare; slice by `hour_of_day`, `priority`, cohort
10. [x] **Data quality widget** — missing-feature rate, stale-token rate, daily completeness heatmap; per-feature freshness SLA status
11. [x] **Ops actions** — revoke token (Users page), rebuild profile, reset bandit, enable/disable shadow policies; every action audit-logged
12. [x] **Health rollup**`/admin/health` surfaces api, ml/serving, SQLite, event-bus, MLflow; auto-refreshes every 15s
13. [x] **Read-only SQL runner** — SELECT-only runner against SQLite + saved queries (sunsets to Superset in M4)
14. [x] **Offline simulation runner** — launch `ml/experiments/sim` from admin UI; track sim runs, judge, policy comparison
15. [x] **Token-based admin auth**`POST /api/auth/token` for Playwright/CI; `ADMIN_TOKEN` env var (#105)
16. [x] **Docs pages** — admin documentation and runbooks inline
Tips are picked, not drawn from a hat. Event bus, Todoist sync, task features, ε-greedy policy (v1 + v2), web push, NATS JetStream bridge, shadow-policy registry, offline sim framework, per-user profile features, admin + ML ops console (`apps/admin`).
### Phase 2 — AI tips + multi-source signals *(M2)* ✓ shipped
Goal: tips are AI-generated from user context, not just raw Todoist tasks. Multiple signal sources feed a generalized pipeline. Research-intensive milestone.
**Architectural shift (mid-M2):** the bandit-ranks-LLM-candidates design from earlier in M2 was replaced with a multi-agent pipeline (ADR-0013): pre-compute agents emit prompt snippets, an orchestrator LLM produces the tip directly. ADR-0014 layers a unified Profile + agent registry + auto-inference framework on top so the system generalizes cleanly to N agents.
**Multi-agent recommendation (ADR-0013, shipped):**
- [x] `agent_outputs` table + per-agent TTL caching
- [x] Five initial agents: `overdue-task`, `momentum`, `time-of-day`, `recent-patterns`, `focus-area`
- [x] Agent pre-compute scheduler
- [x] Orchestrator cutover — recommender calls `ml/serving` with snippet list, no bandit scoring
- [x] Bandit endpoints + shadow policy machinery removed
**Unified Profile + agent registry (ADR-0014, shipped):**
- [x] Unified Profile model: prefs, contexts, consents + manifest plumbing + orchestrator cutover (#30)
- [x] Shared context-inference framework (#111)
- [x] Per-agent auto-inference: `time-of-day` (#112), `focus-area` (#113), `momentum` (#114), `overdue-task` (#115), `recent-patterns` (#116)
**AI infrastructure (unblock everything else):**
- [ ] `ai` compose profile — Ollama + LiteLLM for local dev; env vars `OLLAMA_URL` / `LITELLM_URL` (#86)
- [x] AI gateway — wire `ml/serving` to LiteLLM; model aliases `tip-generator` + `embedder` (#87)
**AI tip generation pipeline:**
- [x] Context assembler — user signals + feature store → structured prompt context (`ml/features/context.py`); skeleton implemented
- [x] Tip generator endpoint — `POST /generate` in `ml/serving`; LLM → N typed `TipCandidate` objects (#79)
- [x] `TipCandidate` shared schema — `{content, kind, source, model, prompt_version, confidence}`; update recommender pipeline (#89)
- [ ] LLM output validation + retry — JSON schema gate, clarification retry (2×), fallback to task-based (#90)
- [x] Prompt versioning — `prompt_version` + `model` columns in `tip_scores`; content-hash invalidation (#91)
- [x] LLM tip quality dashboard — reaction breakdown by model / prompt_version in `/admin/reward-analytics` (#92)
**Evaluation & model selection:**
- [x] Model benchmark — compare qwen2.5:7b / llama3.2:3b / gemma3:4b via offline sim + LLM judge (#93)
- [x] LLM prompt research — persona design, context injection strategies, few-shot examples (#84, #95)
**Pipeline architecture:**
- [x] Signal source abstraction — `SignalSource` interface for Todoist + extensible design (#78)
- [ ] Generalized recommendation pipeline — candidate → rank → render stages (#80)
- [x] Feature registry + user profile builder — centralized features, persistent profiles, event-driven invalidation (#81)
- [ ] Tip kind system — task, advice, insight, reminder with kind-aware UI + rewards (#82)
**Policy research:**
- [ ] Next-gen policies — Thompson sampling, neural bandits, hybrid transfer learning (#83)
**Integrations & infra (carried from M1):**
- [ ] Apple OAuth (#7)
- [x] NATS JetStream replacing in-process bus (#21) — adapter ships in `services/api/src/events/nats.ts`; in-proc bus is the producer, JetStream is the durable mirror
- [x] Todoist sync via events (#22) — background scheduler in `services/api/src/signals/scheduler.ts` emits `signals.task.synced` every `TODOIST_SYNC_INTERVAL_MS`; on-demand fetch remains as freshness fallback
- [x] Event schema registry + protobuf CI gate (#54) — buf lint/breaking checks on every PR
- [x] Per-user freshness SLAs for features (#61) — context-feature (JIT) vs profile-feature (batched) spec in ADR-0011; `invalidated_by` mirrored into `ProfileFeature`; CONTEXT_FEATURES in ml/features/context.py
- [x] Embedding-based task clustering — `nomic-embed-text` for semantic dedup + focus-area features (#97)
- [x] Observability (#18) — structured logs via pino, W3C trace IDs, Sentry hooks, trace correlation end-to-end
- [ ] CI skeleton (#3), E2E tests (#20)
**Bugs & UX (fix before new features):**
- [x] TipFeedback type mismatch (#73)
- [x] Todoist token refresh (#74) — OAuth token auto-refresh on 401
- [x] Reward fire-and-forget (#75) — retry logic + logging
- [x] Data retention purge (#76) — daily purge of 30-day-old tip_scores/tip_feedback
- [x] Port mismatch (#77) — fixed in docker-compose + env var config
- [x] UX refinements (#100102) — "done/snooze/dismiss" feedback only, config page UI, settings gear button
Tips are AI-generated from user context. Multi-agent pipeline (ADR-0013): five pre-compute agents (`overdue-task`, `momentum`, `time-of-day`, `recent-patterns`, `focus-area`) emit prompt snippets; orchestrator LLM produces one tip. Unified Profile + agent registry + auto-inference framework (ADR-0014). LLM output validation + fallback. LiteLLM gateway, model benchmarking, prompt research, MLflow tracing.
### Phase 3 — Native mobile *(M3)*
- [ ] iOS app (SwiftUI) with APNs push
- [ ] Android app (Compose) with FCM push
- [ ] `notifier` gains APNs + FCM channels, per-device rate limits
- [ ] Migrate auth from Auth.js to dedicated OIDC provider (trigger from ADR-0004)
- [ ] Consolidate MLflow behind shared OIDC (SSO for all internal services)
- [ ] Decide-and-deliver scheduler: per-user "is this tip worth interrupting now?" threshold
iOS (SwiftUI + APNs) and Android (Compose + FCM). `notifier` service gains APNs + FCM channels. Auth migrated from Auth.js to dedicated OIDC provider. Decide-and-deliver scheduler. See [M3 milestone](http://localhost:3000/alvis/oO/milestone/3).
### Phase 4 — MLOps at scale *(M4)*
- [x] MLflow deployed as external service (`mlops` compose profile); own auth; health check integrated
- [ ] Write first retraining pipeline + first MLflow experiment logging from `ml/serving` + JetStream consumers (#98)
- [ ] Feature-to-prompt pipeline — nightly batch job materializes context for LLM; cuts inline latency (#94)
- [ ] Prompt optimization loop — sim A/B → MLflow experiment → human-approved promotion (#95)
- [ ] LLM fine-tuning — tip reactions as training signal; LoRA on base model; MLflow tracks runs (#96)
- [ ] Embedding-based task clustering — `nomic-embed-text` for dedup + user pattern features (#97)
- [ ] Modular-monolith packaging + import-boundary lint (#47)
- [ ] Consolidate MLflow auth into shared OIDC provider (tracked as M3 issue #85)
- [ ] Shadow → A/B → launch pipeline as first-class in MLflow
- [ ] Online experiments framework: deterministic assignment + bandit policies alongside fixed-split A/B
- [ ] Cross-user collaborative features (opt-in only); cohort slicing; fairness checks
- [ ] Drift monitoring (feature + prediction + reward drift); model cards per LLM version
Retraining pipeline, feature-to-prompt batch jobs, prompt optimization loop, LLM fine-tuning on reaction signals, modular-monolith import-boundary lint, online experiments framework, drift monitoring. See [M4 milestone](http://localhost:3000/alvis/oO/milestone/4).
### Phase 5 — Production hardening *(M5)*
- [ ] Audit logging, rotation of provider tokens + internal signing keys
- [ ] **k3s** on existing VM, then k8s + HPA once multi-node justified (no cliff)
- [ ] Multi-region failover, Postgres PITR, event-bus mirroring
- [ ] Public integration SDK; sandbox tenancy for third-party connectors
- [ ] Billing + subscription tiers
Audit logging, key rotation, k3s → k8s, multi-region, public integration SDK, billing. See [M5 milestone](http://localhost:3000/alvis/oO/milestone/5).
---
## Contributing
This repo is split into independent modules; most tickets belong to exactly one. Pick an issue, check its milestone (= phase), read the service's `README.md`, ship.
This repo is split into independent modules; most tickets belong to exactly one. Pick an issue from [Gitea](http://localhost:3000/alvis/oO/issues), read the service's `README.md`, ship.
Conventions and per-service guidance live in [`CLAUDE.md`](CLAUDE.md).

View File

@@ -51,6 +51,8 @@ function ConnectPageInner() {
}
const todoistConnected = isConnected('todoist');
const googleHealthConnected = isConnected('google-health');
const anyConnected = todoistConnected || googleHealthConnected;
return (
<main style={{ minHeight: '100vh', padding: '4rem 2rem', maxWidth: '480px', margin: '0 auto' }}>
@@ -85,7 +87,6 @@ function ConnectPageInner() {
marginBottom: '1rem',
}}>
<div style={{ display: 'flex', alignItems: 'center', gap: '0.875rem' }}>
{/* Todoist logomark */}
<svg width="28" height="28" viewBox="0 0 24 24" fill="none" aria-label="Todoist">
<rect width="24" height="24" rx="6" fill="#DB4035"/>
<path d="M6 8.5L11 13l7-7" stroke="#fff" strokeWidth="2" strokeLinecap="round" strokeLinejoin="round"/>
@@ -130,7 +131,65 @@ function ConnectPageInner() {
)}
</div>
{todoistConnected && (
{/* Google Health card */}
<div style={{
border: '1px solid rgba(255,255,255,0.1)',
borderRadius: '0.75rem',
padding: '1.25rem 1.5rem',
display: 'flex',
alignItems: 'center',
justifyContent: 'space-between',
marginBottom: '1rem',
}}>
<div style={{ display: 'flex', alignItems: 'center', gap: '0.875rem' }}>
<svg width="28" height="28" viewBox="0 0 24 24" fill="none" aria-label="Google Health">
<rect width="24" height="24" rx="6" fill="#EA4335"/>
<path d="M12 6.5c0-1.1.9-2 2-2s2 .9 2 2-.9 2-2 2-2-.9-2-2z" fill="#fff"/>
<path d="M8 10.5c0-1.1.9-2 2-2s2 .9 2 2-.9 2-2 2-2-.9-2-2z" fill="#fff" opacity=".7"/>
<path d="M12 14.5c0 2.2-1.8 4-4 4s-4-1.8-4-4 1.8-4 4-4 4 1.8 4 4z" fill="#fff" opacity=".4"/>
<path d="M13 13.5c.5-1 1.5-1.7 2.5-1.7 1.7 0 3 1.3 3 3s-1.3 3-3 3c-1 0-1.9-.5-2.5-1.3" stroke="#fff" strokeWidth="1.5" strokeLinecap="round" fill="none"/>
</svg>
<div>
<div style={{ fontWeight: 500, fontSize: '0.9rem' }}>Google Health</div>
<div style={{ color: 'var(--gray)', fontSize: '0.75rem', marginTop: '0.1rem' }}>
{googleHealthConnected ? 'Connected' : 'Steps, sleep & activity'}
</div>
</div>
</div>
{googleHealthConnected ? (
<button
onClick={() => handleDisconnect('google-health')}
disabled={disconnecting === 'google-health'}
style={{
background: 'transparent',
border: '1px solid rgba(255,255,255,0.15)',
color: 'var(--gray)',
borderRadius: '0.375rem',
padding: '0.375rem 0.875rem',
fontSize: '0.8rem',
}}
>
{disconnecting === 'google-health' ? '…' : 'Disconnect'}
</button>
) : (
<a
href="/api/integrations/google-health/connect?redirectTo=/connect"
style={{
background: 'var(--white)',
color: 'var(--black)',
borderRadius: '0.375rem',
padding: '0.375rem 0.875rem',
fontSize: '0.8rem',
fontWeight: 500,
}}
>
Connect
</a>
)}
</div>
{anyConnected && (
<div style={{ marginTop: '3rem' }}>
<a
href="/tip"

View File

@@ -1,5 +1,8 @@
FROM python:3.12-slim
WORKDIR /app/ml/serving
RUN apt-get update \
&& apt-get install -y --no-install-recommends build-essential \
&& rm -rf /var/lib/apt/lists/*
COPY ml/serving/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY ml/ /app/ml/

View File

@@ -13,6 +13,7 @@ WORKDIR /app
COPY --from=deps /app/node_modules ./node_modules
COPY --from=deps /app/packages/shared-types/node_modules ./packages/shared-types/node_modules
COPY --from=deps /app/apps/web/node_modules ./apps/web/node_modules
COPY package.json pnpm-workspace.yaml pnpm-lock.yaml ./
COPY tsconfig.base.json ./
COPY packages/shared-types ./packages/shared-types
COPY apps/web ./apps/web

View File

@@ -82,6 +82,46 @@ services:
timeout: 5s
retries: 5
# ── ai profile — Ollama + LiteLLM for local dev ──────────────────────────
# Start: docker compose --profile ai up
# Use when the Agap shared Ollama/LiteLLM services are not available locally.
# Set LITELLM_URL=http://localhost:4000 and OLLAMA_URL=http://localhost:11434
# in .env.local to point ml-serving at these containers instead of Agap.
ollama:
image: ollama/ollama:latest
profiles: [ai]
volumes:
- ollama-models:/root/.ollama
ports:
- "127.0.0.1:11434:11434"
healthcheck:
test: ["CMD", "curl", "-sf", "http://localhost:11434/api/tags"]
interval: 15s
timeout: 5s
retries: 10
litellm:
image: ghcr.io/berriai/litellm:main-latest
profiles: [ai]
environment:
LITELLM_MASTER_KEY: ${LITELLM_MASTER_KEY:-sk-local-dev}
command: >
--model ollama/qwen2.5:1.5b
--model ollama/nomic-embed-text
--api_base http://ollama:11434
--port 4000
ports:
- "127.0.0.1:4000:4000"
depends_on:
ollama:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-sf", "http://localhost:4000/health"]
interval: 10s
timeout: 5s
retries: 5
# ── mlops profile — MLflow ────────────────────────────────────────────────
# Start: docker compose --profile mlops up
# MLflow UI: http://localhost:5000 or https://o.alogins.net/mlflow
@@ -129,3 +169,6 @@ services:
interval: 10s
timeout: 5s
retries: 5
volumes:
ollama-models:

View File

@@ -17,6 +17,8 @@ from .time_of_day import TimeOfDayAgent, MANIFEST as TIME_OF_DAY_MANIFEST
from .recent_patterns import RecentPatternsAgent, MANIFEST as RECENT_PATTERNS_MANIFEST
from .focus_area import FocusAreaAgent, MANIFEST as FOCUS_AREA_MANIFEST
from .health_vitals import HealthVitalsAgent, MANIFEST as HEALTH_VITALS_MANIFEST
from .tarot import TarotAgent, MANIFEST as TAROT_MANIFEST
from .stars import StarsAgent, MANIFEST as STARS_MANIFEST
_REGISTERED: list[tuple[BaseAgent, AgentManifest]] = [
(OverdueTaskAgent(), OVERDUE_TASK_MANIFEST),
@@ -25,6 +27,8 @@ _REGISTERED: list[tuple[BaseAgent, AgentManifest]] = [
(RecentPatternsAgent(), RECENT_PATTERNS_MANIFEST),
(FocusAreaAgent(), FOCUS_AREA_MANIFEST),
(HealthVitalsAgent(), HEALTH_VITALS_MANIFEST),
(TarotAgent(), TAROT_MANIFEST),
(StarsAgent(), STARS_MANIFEST),
]
# Sanity check — agent_id and manifest.id must agree, otherwise the registry

233
ml/agents/stars.py Normal file
View File

@@ -0,0 +1,233 @@
"""Stars agent — astrological transit predictions via pyswisseph.
Requires birth_date in agent_prefs (ISO 8601 date string, e.g. '1990-06-15').
Populated from a connected data source (Google profile / Google Health).
If birth_date is absent the agent returns a no-data snippet and the
eligibility filter will silence it once the consent / pref check catches up.
Computes today's Sun, Moon, Mercury, Venus, Mars, Jupiter, Saturn positions
and finds notable transits (conjunctions, oppositions, squares, trines, sextiles)
between today's sky and the user's natal chart. Passes a concise prediction
+ interpretation to the orchestrator.
"""
from __future__ import annotations
import math
from datetime import date, datetime, timezone
from typing import ClassVar
from .base import BaseAgent, AgentInput, AgentOutput
from .manifest import AgentManifest, InferredParam
try:
import swisseph as swe # type: ignore
_SWE_AVAILABLE = True
except ImportError: # pragma: no cover — present in container, absent in dev
_SWE_AVAILABLE = False
# ---------------------------------------------------------------------------
# Planet catalogue
# ---------------------------------------------------------------------------
_PLANETS: list[tuple[int, str]] = []
if _SWE_AVAILABLE:
_PLANETS = [
(swe.SUN, "Sun"),
(swe.MOON, "Moon"),
(swe.MERCURY, "Mercury"),
(swe.VENUS, "Venus"),
(swe.MARS, "Mars"),
(swe.JUPITER, "Jupiter"),
(swe.SATURN, "Saturn"),
]
# Aspect definitions: (angle, orb, name, nature)
_ASPECTS: list[tuple[float, float, str, str]] = [
(0.0, 8.0, "conjunction", "intensifying"),
(60.0, 6.0, "sextile", "harmonious"),
(90.0, 7.0, "square", "challenging"),
(120.0, 8.0, "trine", "flowing"),
(180.0, 8.0, "opposition", "tension"),
]
_ZODIAC = [
"Aries", "Taurus", "Gemini", "Cancer", "Leo", "Virgo",
"Libra", "Scorpio", "Sagittarius", "Capricorn", "Aquarius", "Pisces",
]
# Interpretive keywords per planet for transit readings
_PLANET_THEMES: dict[str, str] = {
"Sun": "identity, vitality, core purpose",
"Moon": "emotions, intuition, comfort needs",
"Mercury": "communication, thinking, decisions",
"Venus": "relationships, values, pleasure",
"Mars": "energy, drive, conflict",
"Jupiter": "growth, opportunity, expansion",
"Saturn": "discipline, responsibility, long-term structure",
}
def _zodiac_sign(lon: float) -> str:
return _ZODIAC[int(lon / 30) % 12]
def _jd_from_date(d: date) -> float:
"""Julian Day Number for noon UTC on the given date."""
assert _SWE_AVAILABLE
return swe.julday(d.year, d.month, d.day, 12.0)
def _planet_positions(jd: float) -> dict[str, float]:
assert _SWE_AVAILABLE
positions: dict[str, float] = {}
for pid, name in _PLANETS:
result, _ = swe.calc_ut(jd, pid)
positions[name] = result[0] # ecliptic longitude
return positions
def _angular_diff(a: float, b: float) -> float:
"""Smallest angle between two ecliptic longitudes (0180)."""
diff = abs(a - b) % 360
return diff if diff <= 180 else 360 - diff
def _find_transits(natal: dict[str, float], today: dict[str, float]) -> list[dict]:
"""Return list of active transits between today's sky and natal chart."""
transits: list[dict] = []
for t_name, t_lon in today.items():
for n_name, n_lon in natal.items():
diff = _angular_diff(t_lon, n_lon)
for angle, orb, aspect_name, nature in _ASPECTS:
if abs(diff - angle) <= orb:
transits.append({
"transit_planet": t_name,
"natal_planet": n_name,
"aspect": aspect_name,
"nature": nature,
"orb": round(abs(diff - angle), 2),
})
# Sort by tightness of orb
transits.sort(key=lambda x: x["orb"])
return transits
def _format_transit(t: dict) -> str:
tp, np, asp, nat = t["transit_planet"], t["natal_planet"], t["aspect"], t["nature"]
tp_theme = _PLANET_THEMES.get(tp, "")
np_theme = _PLANET_THEMES.get(np, "")
return (
f"Transiting {tp} ({tp_theme}) {asp} natal {np} ({np_theme}) "
f"— a {nat} influence"
)
# ---------------------------------------------------------------------------
# Manifest
# ---------------------------------------------------------------------------
MANIFEST = AgentManifest(
id="stars",
version="1.0.0",
description="Astrological transit predictions based on the user's birth date and today's planetary positions.",
pref_schema={
"type": "object",
"additionalProperties": False,
"properties": {
"birth_date": {
"type": "string",
"pattern": r"^\d{4}-\d{2}-\d{2}$",
"description": "ISO 8601 birth date (YYYY-MM-DD). Populated from connected data source.",
},
},
},
context_schema=["profile.birth_date"],
# Requires a connected Google source that supplies birth date.
# data:google-health is the current carrier; when Google profile is a
# separate consent key, add it here.
required_consents=["data:core", "data:google-health"],
output_contract={"type": "snippet", "format": "free_text"},
ttl_sec=3_600 * 6, # planetary positions change slowly — 6 h is fine
silenced_in_contexts=[],
inferred_params=[
InferredParam(
key="birth_date",
ttl_sec=365 * 86_400, # effectively permanent once known
cold_start_default=None,
min_history=999_999, # never inferred from events — sourced externally
infer=None,
),
],
)
class StarsAgent(BaseAgent):
"""Produces astrological transit predictions for the user's birth chart."""
agent_id: ClassVar[str] = MANIFEST.id
ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec
version: ClassVar[str] = MANIFEST.version
def compute(self, inp: AgentInput) -> AgentOutput:
birth_date_str: str | None = inp.agent_prefs.get("birth_date")
if not birth_date_str:
prompt = (
"Birth date is not available — astrological reading skipped. "
"(Always write the tip in English.)"
)
return self._make_output(inp, prompt, {"no_birth_date": True})
if not _SWE_AVAILABLE:
prompt = (
"Astrological library unavailable — reading skipped. "
"(Always write the tip in English.)"
)
return self._make_output(inp, prompt, {"swe_unavailable": True})
try:
birth_date = date.fromisoformat(birth_date_str)
except ValueError:
prompt = "Birth date format invalid — astrological reading skipped."
return self._make_output(inp, prompt, {"invalid_birth_date": birth_date_str})
today_date = inp.now.date()
natal_jd = _jd_from_date(birth_date)
today_jd = _jd_from_date(today_date)
natal_pos = _planet_positions(natal_jd)
today_pos = _planet_positions(today_jd)
transits = _find_transits(natal_pos, today_pos)
top = transits[:3] # most exact transits only
today_sun_sign = _zodiac_sign(today_pos["Sun"])
natal_sun_sign = _zodiac_sign(natal_pos["Sun"])
natal_moon_sign = _zodiac_sign(natal_pos["Moon"])
snapshot = {
"birth_date": birth_date_str,
"today": today_date.isoformat(),
"natal_sun": natal_sun_sign,
"natal_moon": natal_moon_sign,
"today_sun": today_sun_sign,
"active_transits": transits[:5],
}
if not top:
prompt = (
f"Natal chart: Sun in {natal_sun_sign}, Moon in {natal_moon_sign}. "
f"Today's Sun is in {today_sun_sign}. "
"No exact transits today — a quiet, stable day energetically. "
"(Always write the tip in English.)"
)
else:
transit_lines = "; ".join(_format_transit(t) for t in top)
prompt = (
f"Natal chart: Sun in {natal_sun_sign}, Moon in {natal_moon_sign}. "
f"Today's Sun is in {today_sun_sign}. "
f"Active transits: {transit_lines}. "
"Use these planetary themes to colour the tip — "
"keep it grounded and actionable, not predictive or fatalistic. "
"(Always write the tip in English.)"
)
return self._make_output(inp, prompt, snapshot)

110
ml/agents/tarot.py Normal file
View File

@@ -0,0 +1,110 @@
"""TAROT agent — three-card draw (situation / action / outcome).
Draws cards deterministically from a daily seed so the reading stays
stable for the day (same cards whether the agent runs at 08:00 or 14:00).
Card meanings are precomputed here and passed as a structured snippet to
the orchestrator, which weaves them into a grounded, actionable tip.
"""
from __future__ import annotations
import hashlib
from typing import ClassVar
from .base import BaseAgent, AgentInput, AgentOutput
from .manifest import AgentManifest
# ---------------------------------------------------------------------------
# Card definitions — Major Arcana only (22 cards, indices 021)
# Each entry: (name, upright_meaning, action_hint)
# ---------------------------------------------------------------------------
_CARDS: list[tuple[str, str, str]] = [
("The Fool", "new beginnings, spontaneity, a leap of faith", "start something without overthinking"),
("The Magician", "skill, willpower, resourcefulness", "use what you already have"),
("The High Priestess","intuition, inner knowing, patience", "listen to what you already sense is true"),
("The Empress", "abundance, creativity, nurturing", "invest energy in something generative"),
("The Emperor", "structure, authority, discipline", "set a boundary or impose order"),
("The Hierophant", "tradition, guidance, shared values", "seek or offer mentorship"),
("The Lovers", "alignment, choice, commitment", "make a decision you have been avoiding"),
("The Chariot", "determination, focus, forward motion", "push through the resistance"),
("Strength", "inner courage, patience, gentle persistence", "stay the course with compassion"),
("The Hermit", "solitude, reflection, inner guidance", "step back and think before acting"),
("Wheel of Fortune", "cycles, turning points, inevitable change", "acknowledge what is shifting around you"),
("Justice", "fairness, truth, cause and effect", "audit a recent decision for its real consequences"),
("The Hanged Man", "pause, surrender, new perspective", "release your grip on the outcome"),
("Death", "endings, transformation, release", "let go of what no longer serves you"),
("Temperance", "balance, moderation, patience", "blend two competing demands"),
("The Devil", "attachment, habit, shadow patterns", "name a loop you are stuck in"),
("The Tower", "sudden disruption, revelation, necessary collapse", "accept the thing that already broke"),
("The Star", "hope, renewal, calm after the storm", "trust that recovery is already underway"),
("The Moon", "uncertainty, illusion, the unconscious", "sit with ambiguity rather than forcing clarity"),
("The Sun", "clarity, vitality, success", "act from your most energised self"),
("Judgement", "reflection, reckoning, a call to rise", "respond to a long-deferred summons"),
("The World", "completion, integration, a cycle closing", "acknowledge what you have finished"),
]
_POSITIONS = ("situation", "action", "outcome")
def _daily_draw(user_id: str, date_str: str) -> list[int]:
"""Return three distinct card indices seeded by (user_id, date)."""
seed = hashlib.sha256(f"{user_id}:{date_str}".encode()).digest()
indices: list[int] = []
offset = 0
while len(indices) < 3:
val = int.from_bytes(seed[offset:offset + 2], "big") % len(_CARDS)
if val not in indices:
indices.append(val)
offset = (offset + 2) % (len(seed) - 1)
return indices
MANIFEST = AgentManifest(
id="tarot",
version="1.0.0",
description="Daily three-card draw (situation/action/outcome) that frames the tip as a symbolic reflection.",
pref_schema={
"type": "object",
"additionalProperties": False,
"properties": {
"enabled": {
"type": "boolean",
"default": True,
"description": "Set false to disable the tarot agent for this user.",
},
},
},
context_schema=[],
required_consents=["data:core"],
output_contract={"type": "snippet", "format": "free_text"},
ttl_sec=3_600 * 6, # stable for 6 h; refreshes mid-day at most twice
silenced_in_contexts=[],
inferred_params=[],
)
class TarotAgent(BaseAgent):
"""Produces a three-card reading as a prompt snippet."""
agent_id: ClassVar[str] = MANIFEST.id
ttl_seconds: ClassVar[int] = MANIFEST.ttl_sec
version: ClassVar[str] = MANIFEST.version
def compute(self, inp: AgentInput) -> AgentOutput:
date_str = inp.now.strftime("%Y-%m-%d")
indices = _daily_draw(inp.user_id, date_str)
reading: list[dict] = []
parts: list[str] = [f"Today's tarot reading ({date_str}):"]
for pos, idx in zip(_POSITIONS, indices):
name, meaning, hint = _CARDS[idx]
reading.append({"position": pos, "card": name, "meaning": meaning, "hint": hint})
parts.append(f" {pos.capitalize()}{name}: {meaning}. Hint: {hint}.")
parts.append(
"Weave these symbolic themes lightly into the tip — "
"ground them in practical, specific action. "
"Do not explain the cards; let their meaning shape the advice."
)
prompt = "\n".join(parts)
snapshot = {"date": date_str, "reading": reading}
return self._make_output(inp, prompt, snapshot)

View File

@@ -13,6 +13,8 @@ from ml.agents.momentum import MomentumAgent
from ml.agents.time_of_day import TimeOfDayAgent
from ml.agents.recent_patterns import RecentPatternsAgent
from ml.agents.focus_area import FocusAreaAgent
from ml.agents.tarot import TarotAgent, _daily_draw, _CARDS, _POSITIONS
from ml.agents.stars import StarsAgent, _SWE_AVAILABLE
from ml.agents.registry import get_agent, all_agents
_NOW = datetime(2026, 5, 1, 9, 0, 0, tzinfo=timezone.utc) # Thursday 09:00 UTC
@@ -250,13 +252,102 @@ class TestFocusAreaAgent:
assert all("label" in c and "task_count" in c and "tasks" in c for c in clusters)
# ── TarotAgent ────────────────────────────────────────────────────────────────
class TestTarotAgent:
agent = TarotAgent()
def test_basic_output(self):
out = self.agent.compute(_inp())
_check_output(out, self.agent)
assert "situation" in out.prompt_text.lower()
assert "action" in out.prompt_text.lower()
assert "outcome" in out.prompt_text.lower()
assert out.signals_snapshot["date"] == "2026-05-01"
assert len(out.signals_snapshot["reading"]) == 3
def test_three_distinct_cards(self):
out = self.agent.compute(_inp())
cards = [r["card"] for r in out.signals_snapshot["reading"]]
assert len(set(cards)) == 3
def test_positions_labelled(self):
out = self.agent.compute(_inp())
positions = [r["position"] for r in out.signals_snapshot["reading"]]
assert positions == list(_POSITIONS)
def test_daily_stability(self):
out1 = self.agent.compute(_inp(now=datetime(2026, 5, 1, 8, 0, 0, tzinfo=timezone.utc)))
out2 = self.agent.compute(_inp(now=datetime(2026, 5, 1, 20, 0, 0, tzinfo=timezone.utc)))
assert out1.signals_snapshot["reading"] == out2.signals_snapshot["reading"]
def test_different_days_different_draw(self):
out1 = self.agent.compute(_inp(now=datetime(2026, 5, 1, 9, 0, 0, tzinfo=timezone.utc)))
out2 = self.agent.compute(_inp(now=datetime(2026, 5, 2, 9, 0, 0, tzinfo=timezone.utc)))
assert out1.signals_snapshot["reading"] != out2.signals_snapshot["reading"]
def test_different_users_different_draw(self):
out1 = self.agent.compute(_inp(user_id="user-A"))
out2 = self.agent.compute(_inp(user_id="user-B"))
assert out1.signals_snapshot["reading"] != out2.signals_snapshot["reading"]
def test_daily_draw_returns_valid_indices(self):
indices = _daily_draw("u1", "2026-05-01")
assert len(indices) == 3
assert len(set(indices)) == 3
assert all(0 <= i < len(_CARDS) for i in indices)
# ── StarsAgent ────────────────────────────────────────────────────────────────
class TestStarsAgent:
agent = StarsAgent()
def test_no_birth_date(self):
out = self.agent.compute(_inp())
_check_output(out, self.agent)
assert out.signals_snapshot.get("no_birth_date") is True
assert "birth date" in out.prompt_text.lower()
@pytest.mark.skipif(not _SWE_AVAILABLE, reason="pyswisseph not installed")
def test_invalid_birth_date(self):
out = self.agent.compute(_inp(agent_prefs={"birth_date": "not-a-date"}))
_check_output(out, self.agent)
assert out.signals_snapshot.get("invalid_birth_date") == "not-a-date"
@pytest.mark.skipif(not _SWE_AVAILABLE, reason="pyswisseph not installed")
def test_with_birth_date(self):
out = self.agent.compute(_inp(agent_prefs={"birth_date": "1990-06-15"}))
_check_output(out, self.agent)
assert "natal" in out.prompt_text.lower()
assert out.signals_snapshot["birth_date"] == "1990-06-15"
assert "natal_sun" in out.signals_snapshot
assert "natal_moon" in out.signals_snapshot
@pytest.mark.skipif(not _SWE_AVAILABLE, reason="pyswisseph not installed")
def test_transit_snapshot_structure(self):
out = self.agent.compute(_inp(agent_prefs={"birth_date": "1985-03-21"}))
snap = out.signals_snapshot
assert "active_transits" in snap
for t in snap["active_transits"]:
assert {"transit_planet", "natal_planet", "aspect", "nature", "orb"} <= t.keys()
def test_swe_unavailable_path(self, monkeypatch):
import ml.agents.stars as stars_mod
monkeypatch.setattr(stars_mod, "_SWE_AVAILABLE", False)
agent = StarsAgent()
out = agent.compute(_inp(agent_prefs={"birth_date": "1990-06-15"}))
_check_output(out, agent)
assert out.signals_snapshot.get("swe_unavailable") is True
# ── Registry ─────────────────────────────────────────────────────────────────
class TestRegistry:
def test_all_agents_present(self):
agents = all_agents()
ids = {a.agent_id for a in agents}
assert ids == {"overdue-task", "momentum", "time-of-day", "recent-patterns", "focus-area", "health-vitals"}
assert ids == {"overdue-task", "momentum", "time-of-day", "recent-patterns", "focus-area", "health-vitals", "tarot", "stars"}
def test_get_agent(self):
a = get_agent("momentum")

View File

@@ -293,6 +293,25 @@ _RETRY_SUFFIX_OBJ = (
"Reply ONLY with the JSON object — no prose, no markdown fences."
)
_RETRY_SUFFIX_SCHEMA = (
"\n\nYour previous response parsed as JSON but was missing required fields. "
'Reply ONLY with a JSON object containing "content" (non-empty string) and "kind" '
'(one of: advice, task, insight, reminder) — no prose, no markdown fences.'
)
_VALID_KINDS = {"advice", "task", "insight", "reminder"}
def _validate_tip_item(item: dict) -> str | None:
"""Return an error string if item fails schema, else None."""
content = item.get("content", "")
if not isinstance(content, str) or not content.strip():
return "missing or empty 'content' field"
kind = item.get("kind", "")
if kind and kind not in _VALID_KINDS:
return f"invalid kind '{kind}', must be one of {_VALID_KINDS}"
return None
@app.post("/agents/{agent_id}/compute", response_model=AgentComputeResponse)
async def compute_agent(agent_id: str, req: AgentComputeRequest) -> AgentComputeResponse:
@@ -504,11 +523,15 @@ async def recommend(req: RecommendRequest) -> RecommendResponse:
text = text[4:]
parsed = json.loads(text)
item: dict = parsed[0] if isinstance(parsed, list) else parsed
schema_err = _validate_tip_item(item)
if schema_err:
raise ValueError(schema_err)
break
except (json.JSONDecodeError, ValueError, IndexError) as exc:
last_parse_error = str(exc)
messages.append({"role": "assistant", "content": last_raw})
messages.append({"role": "user", "content": _RETRY_SUFFIX_OBJ})
is_schema_err = not isinstance(exc, json.JSONDecodeError)
messages.append({"role": "user", "content": _RETRY_SUFFIX_SCHEMA if is_schema_err else _RETRY_SUFFIX_OBJ})
else:
_end_span(llm_span, status="ERROR")
_end_span(root, status="ERROR")

View File

@@ -8,3 +8,4 @@ nats-py>=2.9.0
structlog>=24.1.0
sentry-sdk>=2.0.0
mlflow-skinny>=3.1.0
pyswisseph>=2.10.3.2

View File

@@ -2,7 +2,7 @@
export type TipKind = 'task' | 'advice' | 'insight' | 'reminder';
/** Where the tip content originated */
export type TipSource = 'todoist' | 'llm' | 'advice';
export type TipSource = 'todoist' | 'llm' | 'advice' | 'fallback';
/** A single recommendation surfaced to the user */
export interface Tip {

View File

@@ -83,16 +83,17 @@ describe('POST /recommend integration', () => {
clearSignalCache?.();
});
it('returns 204 when Todoist is empty and orchestrator fails', async () => {
it('returns fallback tip when orchestrator fails', async () => {
globalThis.fetch = vi.fn().mockImplementation((url: string) => {
if (String(url).includes('todoist.com')) {
return Promise.resolve({ ok: true, status: 200, json: async () => ({ results: [] }) } as any);
}
// /recommend fails → orchestrator returns null, random fallback also empty → 204
return Promise.resolve({ ok: false, status: 503 } as any);
});
const { status } = await post(`${baseUrl}/api/recommend`);
expect(status).toBe(204);
const { status, body } = await post(`${baseUrl}/api/recommend`);
expect(status).toBe(200);
expect(body.tip.source).toBe('fallback');
expect(body.tip.rationale).toBe('AI service issues');
});
it('serves orchestrator tip and writes correct tip_scores columns', async () => {
@@ -132,7 +133,7 @@ describe('POST /recommend integration', () => {
expect(row.tipKind).toBe('advice');
});
it('falls back to random signal tip when orchestrator fails', async () => {
it('falls back to hardcoded tip when orchestrator fails', async () => {
globalThis.fetch = vi.fn().mockImplementation((url: string) => {
if (String(url).includes('todoist.com')) {
return Promise.resolve({
@@ -142,19 +143,14 @@ describe('POST /recommend integration', () => {
}),
} as any);
}
// /recommend fails → falls back to random signal candidate
return Promise.resolve({ ok: false, status: 502 } as any);
});
const { status, body } = await post(`${baseUrl}/api/recommend`);
expect(status).toBe(200);
expect(body.tip.source).toBe('todoist');
const rows = await testDb.select().from(tipScores);
const row = rows[rows.length - 1];
expect(row.policy).toBe('random');
expect(row.promptVersion).toBeNull();
expect(row.llmModel).toBeNull();
expect(body.tip.source).toBe('fallback');
expect(body.tip.rationale).toBe('AI service issues');
expect(body.tip.kind).toBe('advice');
});
it('eligibility filter: only passes consented agent outputs to ml/serving', async () => {

View File

@@ -17,17 +17,9 @@ const GOOGLE_TOKEN_URL = 'https://oauth2.googleapis.com/token';
const GOOGLE_REVOKE_URL = 'https://oauth2.googleapis.com/revoke';
const GOOGLE_HEALTH_SCOPES = [
'https://www.googleapis.com/auth/fitness.activity.read',
'https://www.googleapis.com/auth/fitness.body.read',
'https://www.googleapis.com/auth/fitness.sleep.read',
'https://www.googleapis.com/auth/fitness.heart_rate.read',
'https://www.googleapis.com/auth/fitness.nutrition.read',
'https://www.googleapis.com/auth/fitness.location.read',
'https://www.googleapis.com/auth/fitness.blood_glucose.read',
'https://www.googleapis.com/auth/fitness.blood_pressure.read',
'https://www.googleapis.com/auth/fitness.body_temperature.read',
'https://www.googleapis.com/auth/fitness.oxygen_saturation.read',
'https://www.googleapis.com/auth/fitness.reproductive_health.read',
'https://www.googleapis.com/auth/googlehealth.activity_and_fitness.readonly',
'https://www.googleapis.com/auth/googlehealth.health_metrics_and_measurements.readonly',
'https://www.googleapis.com/auth/googlehealth.sleep.readonly',
].join(' ');
// In-memory CSRF state store

View File

@@ -2,7 +2,7 @@ import { type Router as ExpressRouter, Router, Response } from 'express';
import { nanoid } from 'nanoid';
import { logger } from '../logger.js';
import { db } from '../db/index.js';
import { integrationTokens, tipFeedback, tipViews, tipScores, userPreferences } from '../db/schema.js';
import { tipFeedback, tipViews, tipScores, userPreferences } from '../db/schema.js';
import { eq, and, desc } from 'drizzle-orm';
import { requireAuth, AuthenticatedRequest } from '../middleware/session.js';
import { config } from '../config.js';
@@ -17,6 +17,36 @@ import { getEligibleAgentIds } from '../profile/eligibility.js';
const router: ExpressRouter = Router();
// ---------------------------------------------------------------------------
// Fallback tips — shown when the AI service is unavailable
// ---------------------------------------------------------------------------
const FALLBACK_TIPS = [
"Take a moment to stretch and breathe — your body and mind will thank you.",
"Write down one thing you're grateful for today.",
"Drink a glass of water. Small acts of self-care add up.",
"Reach out to someone you haven't spoken to in a while.",
"Close a tab you've been meaning to close for days.",
"Step outside for five minutes, even briefly.",
"Put your phone down for the next 30 minutes and see how it feels.",
"Do the smallest possible version of a task you've been avoiding.",
"Tidy one small area — a clear space helps a clear mind.",
"Pause and ask: what would make today feel like a win?",
"Rest is productive. Give yourself permission to recharge.",
"You don't have to do everything today. Pick one thing and do it well.",
];
function randomFallbackTip(): import('@oo/shared-types').Tip {
const content = FALLBACK_TIPS[Math.floor(Math.random() * FALLBACK_TIPS.length)];
return {
id: `fallback:${nanoid()}`,
content,
source: 'fallback',
kind: 'advice',
rationale: 'AI service issues',
createdAt: new Date().toISOString(),
};
}
// ---------------------------------------------------------------------------
// Signal aggregator — register sources here as new integrations are added
// ---------------------------------------------------------------------------
@@ -46,6 +76,8 @@ async function loadOrchestratorPref<T>(userId: string, key: string): Promise<T |
try { return JSON.parse(rows[0].valueJson) as T; } catch { return undefined; }
}
type OrchestratorOutcome = { ok: true; result: OrchestratorResult } | { ok: false };
async function fetchOrchestratorTip(
userId: string,
signals: Signal[],
@@ -53,7 +85,7 @@ async function fetchOrchestratorTip(
dayOfWeek: number,
traceparent?: string,
recentTip?: string,
): Promise<OrchestratorResult | null> {
): Promise<OrchestratorOutcome> {
const [allAgentRows, eligibleIds, scienceDestiny] = await Promise.all([
getActiveAgentOutputs(userId),
getEligibleAgentIds(userId),
@@ -77,13 +109,15 @@ async function fetchOrchestratorTip(
body: JSON.stringify({ user_id: userId, agent_outputs: agentOutputs, tasks, hour_of_day: hour, day_of_week: dayOfWeek, science_destiny: scienceDestiny ?? 50, recent_tip: recentTip ?? null }),
signal: AbortSignal.timeout(15_000),
});
if (!res.ok) return null;
if (!res.ok) return { ok: false };
const data = (await res.json()) as {
tip: { id: string; content: string; rationale?: string };
model?: string;
};
const now = new Date().toISOString();
return {
ok: true,
result: {
tip: {
id: `llm:${data.tip.id}`,
content: data.tip.content,
@@ -94,9 +128,10 @@ async function fetchOrchestratorTip(
},
model: data.model ?? null,
agentIds: agentOutputs.map((a) => a.agent_id),
},
};
} catch {
return null;
return { ok: false };
}
}
@@ -109,28 +144,18 @@ router.post('/recommend', requireAuth, async (req: AuthenticatedRequest, res: Re
const dayOfWeek = new Date().getDay();
const { recent_tip: recentTip } = req.body as { recent_tip?: string };
const anyToken = await db
.select({ id: integrationTokens.id })
.from(integrationTokens)
.where(eq(integrationTokens.userId, req.userId!))
.limit(1);
if (!anyToken.length) {
res.status(422).json({ error: 'No integrations connected' });
return;
}
const signals = await aggregator.fetchAll(req.userId!);
const t0 = Date.now();
const orchestrated = await fetchOrchestratorTip(req.userId!, signals, hour, dayOfWeek, req.traceparent, recentTip);
const outcome = await fetchOrchestratorTip(req.userId!, signals, hour, dayOfWeek, req.traceparent, recentTip);
const latencyMs = Date.now() - t0;
if (!orchestrated) {
res.status(204).end();
if (!outcome.ok) {
res.json({ tip: randomFallbackTip() });
return;
}
const orchestrated = outcome.result;
const tip = orchestrated.tip;
const policy = 'orchestrator';
const servedAt = new Date().toISOString();

View File

@@ -0,0 +1,166 @@
/**
* Tests for the agent pre-compute scheduler (signals/agent-scheduler.ts).
*
* Key behaviour under test: runCycle calls getEligibleAgentIds per user and
* skips computeAndStore for agents the user hasn't consented to.
*/
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
vi.mock('../../logger.js', () => ({
logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), fatal: vi.fn() },
}));
import { logger } from '../../logger.js';
// ── active-user query: db.selectDistinct(...).from(...).where(...) ──────────
let activeUsers: { userId: string }[] = [];
const userWhereMock = vi.fn(async () => activeUsers);
const userFromMock = vi.fn(() => ({ where: userWhereMock }));
const selectDistinctMock = vi.fn(() => ({ from: userFromMock }));
// ── purge: db.delete(...).where(...) ────────────────────────────────────────
const deleteWhereMock = vi.fn(async () => ({}));
const deleteMock = vi.fn(() => ({ where: deleteWhereMock }));
vi.mock('../../db/index.js', () => ({
db: { selectDistinct: selectDistinctMock, delete: deleteMock },
}));
vi.mock('../../db/schema.js', () => ({
agentOutputs: { expiresAt: 'expires_at' },
tipViews: { userId: 'user_id', servedAt: 'served_at' },
}));
vi.mock('drizzle-orm', () => ({
gt: vi.fn(),
lt: vi.fn(),
and: vi.fn(),
eq: vi.fn(),
isNull: vi.fn(),
}));
vi.mock('../../config.js', () => ({ config: { ML_SERVING_URL: 'http://ml' } }));
// ── computeAndStore — tracks which (user, agent) pairs were computed ────────
const computeAndStoreMock = vi.fn(async () => {});
vi.mock('../../routes/agent-outputs.js', () => ({
computeAndStore: computeAndStoreMock,
}));
// ── eligibility — replaceable per test ─────────────────────────────────────
let eligibleIds: Set<string> = new Set();
const getEligibleAgentIdsMock = vi.fn(async (_userId: string) => eligibleIds);
vi.mock('../../profile/eligibility.js', () => ({
getEligibleAgentIds: getEligibleAgentIdsMock,
}));
// ml-serving /health — return a fixed agent list
global.fetch = vi.fn(async () => ({
ok: true,
json: async () => ({ agents: ['overdue-task', 'momentum', 'time-of-day'] }),
})) as unknown as typeof fetch;
beforeEach(() => {
activeUsers = [];
eligibleIds = new Set();
computeAndStoreMock.mockClear();
getEligibleAgentIdsMock.mockClear();
userWhereMock.mockClear();
deleteWhereMock.mockClear();
vi.clearAllMocks();
vi.useFakeTimers();
// restore default mocks after clearAllMocks
userWhereMock.mockImplementation(async () => activeUsers);
getEligibleAgentIdsMock.mockImplementation(async () => eligibleIds);
computeAndStoreMock.mockResolvedValue(undefined);
deleteWhereMock.mockResolvedValue({});
global.fetch = vi.fn(async () => ({
ok: true,
json: async () => ({ agents: ['overdue-task', 'momentum', 'time-of-day'] }),
})) as unknown as typeof fetch;
});
afterEach(() => {
vi.useRealTimers();
});
describe('startAgentPrecomputeScheduler', () => {
it('skips computeAndStore for agents not in the eligibility set', async () => {
activeUsers = [{ userId: 'alice' }];
eligibleIds = new Set(['momentum']); // only momentum consented
const { startAgentPrecomputeScheduler } = await import('../agent-scheduler.js');
startAgentPrecomputeScheduler(60_000);
await vi.advanceTimersByTimeAsync(16_000);
await Promise.resolve();
const computed = computeAndStoreMock.mock.calls.map((c) => c[1]);
expect(computed).toEqual(['momentum']);
expect(computed).not.toContain('overdue-task');
expect(computed).not.toContain('time-of-day');
});
it('skips all agents when eligibility set is empty', async () => {
activeUsers = [{ userId: 'bob' }];
eligibleIds = new Set(); // no consents
const { startAgentPrecomputeScheduler } = await import('../agent-scheduler.js');
startAgentPrecomputeScheduler(60_000);
await vi.advanceTimersByTimeAsync(16_000);
await Promise.resolve();
expect(computeAndStoreMock).not.toHaveBeenCalled();
expect(logger.info).toHaveBeenCalledWith(
expect.objectContaining({ skipped: 3, ok: 0 }),
'agent-scheduler: cycle complete',
);
});
it('computes all agents when all are eligible', async () => {
activeUsers = [{ userId: 'carol' }];
eligibleIds = new Set(['overdue-task', 'momentum', 'time-of-day']);
const { startAgentPrecomputeScheduler } = await import('../agent-scheduler.js');
startAgentPrecomputeScheduler(60_000);
await vi.advanceTimersByTimeAsync(16_000);
await Promise.resolve();
expect(computeAndStoreMock).toHaveBeenCalledTimes(3);
expect(logger.info).toHaveBeenCalledWith(
expect.objectContaining({ ok: 3, skipped: 0 }),
'agent-scheduler: cycle complete',
);
});
it('skips entire user when eligibility check throws', async () => {
activeUsers = [{ userId: 'dave' }];
getEligibleAgentIdsMock.mockRejectedValueOnce(new Error('db timeout'));
const { startAgentPrecomputeScheduler } = await import('../agent-scheduler.js');
startAgentPrecomputeScheduler(60_000);
await vi.advanceTimersByTimeAsync(16_000);
await Promise.resolve();
expect(computeAndStoreMock).not.toHaveBeenCalled();
expect(logger.error).toHaveBeenCalledWith(
expect.objectContaining({ err: expect.anything(), userId: 'dave' }),
'agent-scheduler: eligibility check failed, skipping user',
);
});
it('checks eligibility independently per user', async () => {
activeUsers = [{ userId: 'u1' }, { userId: 'u2' }];
getEligibleAgentIdsMock.mockImplementation(async (userId: string) =>
userId === 'u1' ? new Set(['momentum']) : new Set(['overdue-task', 'time-of-day']),
);
const { startAgentPrecomputeScheduler } = await import('../agent-scheduler.js');
startAgentPrecomputeScheduler(60_000);
await vi.advanceTimersByTimeAsync(16_000);
await Promise.resolve();
const u1Calls = computeAndStoreMock.mock.calls.filter((c) => c[0] === 'u1').map((c) => c[1]);
const u2Calls = computeAndStoreMock.mock.calls.filter((c) => c[0] === 'u2').map((c) => c[1]);
expect(u1Calls).toEqual(['momentum']);
expect(u2Calls.sort()).toEqual(['overdue-task', 'time-of-day']);
});
});

View File

@@ -15,6 +15,7 @@ import { gt, lt } from 'drizzle-orm';
import { logger } from '../logger.js';
import { config } from '../config.js';
import { computeAndStore } from '../routes/agent-outputs.js';
import { getEligibleAgentIds } from '../profile/eligibility.js';
const FALLBACK_AGENT_IDS = [
'overdue-task',
@@ -67,8 +68,22 @@ async function runCycle(agentIds: string[]): Promise<void> {
let ok = 0;
let failed = 0;
let skipped = 0;
for (const userId of userIds) {
let eligible: Set<string>;
try {
eligible = await getEligibleAgentIds(userId);
} catch (err: any) {
logger.error({ err, userId }, 'agent-scheduler: eligibility check failed, skipping user');
skipped += agentIds.length;
continue;
}
for (const agentId of agentIds) {
if (!eligible.has(agentId)) {
skipped++;
continue;
}
try {
await computeAndStore(userId, agentId);
ok++;
@@ -86,7 +101,7 @@ async function runCycle(agentIds: string[]): Promise<void> {
}
logger.info(
{ ok, failed, users: userIds.length, agents: agentIds.length },
{ ok, failed, skipped, users: userIds.length, agents: agentIds.length },
'agent-scheduler: cycle complete',
);
}

View File

@@ -7,33 +7,20 @@ import { config } from '../config.js';
import { logger } from '../logger.js';
const CACHE_TTL_MS = 5 * 60_000;
const FIT_AGGREGATE_URL = 'https://www.googleapis.com/fitness/v1/users/me/dataset:aggregate';
const FIT_SESSIONS_URL = 'https://www.googleapis.com/fitness/v1/users/me/sessions';
const HEALTH_API_BASE = 'https://health.googleapis.com/v4/users/me/dataTypes';
const GOOGLE_TOKEN_URL = 'https://oauth2.googleapis.com/token';
const STEP_DAILY_GOAL = 7_000;
const SLEEP_GOAL_HOURS = 7;
interface FitBucket {
dataset: Array<{
dataSourceId: string;
point: Array<{ value: Array<{ intVal?: number; fpVal?: number }> }>;
}>;
// v4 DataPoint shape is a union keyed by data type; we read defensively.
interface DataPoint {
[key: string]: unknown;
}
interface FitAggregateResponse {
bucket?: FitBucket[];
}
interface FitSession {
name: string;
startTimeMillis: string;
endTimeMillis: string;
activityType: number;
}
interface FitSessionsResponse {
session?: FitSession[];
interface DataPointsResponse {
dataPoints?: DataPoint[];
nextPageToken?: string;
}
async function refreshGoogleToken(
@@ -66,81 +53,62 @@ async function refreshGoogleToken(
return data.access_token;
}
function todayMidnightMs(): number {
function todayMidnightIso(): string {
const d = new Date();
d.setHours(0, 0, 0, 0);
return d.getTime();
return d.toISOString();
}
function yesterdayIso(): string {
return new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();
}
async function fetchAggregates(
async function fetchDataPoints(
token: string,
startMs: number,
endMs: number,
): Promise<FitAggregateResponse> {
const res = await fetch(FIT_AGGREGATE_URL, {
method: 'POST',
headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
body: JSON.stringify({
aggregateBy: [
{ dataTypeName: 'com.google.step_count.delta' },
{ dataTypeName: 'com.google.calories.expended' },
{ dataTypeName: 'com.google.active_minutes' },
{ dataTypeName: 'com.google.heart_rate.bpm' },
],
bucketByTime: { durationMillis: endMs - startMs },
startTimeMillis: String(startMs),
endTimeMillis: String(endMs),
}),
});
if (!res.ok) throw new Error(`Fit aggregate: ${res.status}`);
return res.json() as Promise<FitAggregateResponse>;
}
async function fetchSleepSessions(token: string): Promise<FitSessionsResponse> {
const url = new URL(FIT_SESSIONS_URL);
url.searchParams.set('activityType', '72');
url.searchParams.set('startTime', yesterdayIso());
url.searchParams.set('endTime', new Date().toISOString());
dataType: string,
filter: string,
): Promise<DataPoint[]> {
const url = new URL(`${HEALTH_API_BASE}/${dataType}/dataPoints`);
url.searchParams.set('filter', filter);
const res = await fetch(url.toString(), {
headers: { Authorization: `Bearer ${token}` },
});
if (!res.ok) throw new Error(`Fit sessions: ${res.status}`);
return res.json() as Promise<FitSessionsResponse>;
if (!res.ok) throw new Error(`health ${dataType}: ${res.status}`);
const data = (await res.json()) as DataPointsResponse;
return data.dataPoints ?? [];
}
function extractMetric(
bucket: FitBucket,
dataTypeName: string,
valueKey: 'intVal' | 'fpVal',
): number {
for (const ds of bucket.dataset) {
if (!ds.dataSourceId.includes(dataTypeName.replace('com.google.', '').replace('.', '_'))) continue;
for (const pt of ds.point) {
const v = pt.value[0];
if (v) return valueKey === 'intVal' ? (v.intVal ?? 0) : (v.fpVal ?? 0);
// Defensive numeric reader — probes likely field names in a v4 DataPoint payload.
function readNumber(point: DataPoint, paths: string[][]): number {
for (const path of paths) {
let cur: unknown = point;
for (const key of path) {
if (cur && typeof cur === 'object' && key in (cur as object)) {
cur = (cur as Record<string, unknown>)[key];
} else {
cur = undefined;
break;
}
}
if (typeof cur === 'number') return cur;
}
return 0;
}
function extractAnyMetric(
bucket: FitBucket,
typeSuffix: string,
valueKey: 'intVal' | 'fpVal',
): number {
for (const ds of bucket.dataset) {
if (!ds.dataSourceId.includes(typeSuffix)) continue;
const pt = ds.point[0];
if (pt?.value[0]) {
const v = pt.value[0];
return valueKey === 'intVal' ? (v.intVal ?? 0) : (v.fpVal ?? 0);
function readString(point: DataPoint, paths: string[][]): string | undefined {
for (const path of paths) {
let cur: unknown = point;
for (const key of path) {
if (cur && typeof cur === 'object' && key in (cur as object)) {
cur = (cur as Record<string, unknown>)[key];
} else {
cur = undefined;
break;
}
}
return 0;
if (typeof cur === 'string') return cur;
}
return undefined;
}
export class GoogleHealthSignalSource implements SignalSource {
@@ -187,21 +155,35 @@ export class GoogleHealthSignalSource implements SignalSource {
}
try {
const startMs = todayMidnightMs();
const endMs = Date.now();
const dayStartIso = todayMidnightIso();
const dayEndIso = new Date().toISOString();
const yIso = yesterdayIso();
const [aggData, sleepData] = await Promise.all([
fetchAggregates(token, startMs, endMs),
fetchSleepSessions(token),
const stepsFilter = `steps.interval.start_time >= "${dayStartIso}" AND steps.interval.start_time < "${dayEndIso}"`;
const caloriesFilter = `total_calories.interval.start_time >= "${dayStartIso}" AND total_calories.interval.start_time < "${dayEndIso}"`;
const hrFilter = `heart_rate.sample_time.physical_time >= "${dayStartIso}" AND heart_rate.sample_time.physical_time < "${dayEndIso}"`;
const sleepFilter = `sleep.interval.start_time >= "${yIso}" AND sleep.interval.start_time < "${dayEndIso}"`;
const [stepsPts, caloriesPts, hrPts, sleepPts] = await Promise.all([
fetchDataPoints(token, 'steps', stepsFilter),
fetchDataPoints(token, 'total-calories', caloriesFilter),
fetchDataPoints(token, 'heart-rate', hrFilter),
fetchDataPoints(token, 'sleep', sleepFilter),
]);
const bucket = aggData.bucket?.[0];
// One-time peek at raw shape so we can refine field paths after first real OAuth.
logger.debug(
{ userId, samples: { stepsPts: stepsPts.slice(0, 1), caloriesPts: caloriesPts.slice(0, 1), hrPts: hrPts.slice(0, 1), sleepPts: sleepPts.slice(0, 1) } },
'google-health: v4 dataPoints sample',
);
const signals: Signal[] = [];
const now = new Date().toISOString();
if (bucket) {
// Steps
const steps = extractAnyMetric(bucket, 'step_count', 'intVal');
const steps = stepsPts.reduce(
(sum, p) => sum + readNumber(p, [['steps', 'count'], ['count']]),
0,
);
const stepGoalPct = Math.round((steps / STEP_DAILY_GOAL) * 100);
signals.push({
id: `google-health:steps`,
@@ -218,26 +200,31 @@ export class GoogleHealthSignalSource implements SignalSource {
timestamp: now,
});
// Calories + active minutes
const calories = Math.round(extractAnyMetric(bucket, 'calories', 'fpVal'));
const activeMinutes = extractAnyMetric(bucket, 'active_minutes', 'intVal');
const calories = Math.round(
caloriesPts.reduce(
(sum, p) =>
sum + readNumber(p, [['totalCalories', 'kilocalories'], ['kilocalories'], ['energy', 'kilocalories']]),
0,
),
);
signals.push({
id: `google-health:activity`,
source: 'google-health',
kind: 'health',
content: `${activeMinutes} active minutes, ${calories} calories burned today`,
content: `${calories} calories burned today`,
metadata: { dataType: 'activity' },
features: {
active_minutes: activeMinutes,
calories_burned: calories,
sedentary: activeMinutes < 20,
},
timestamp: now,
});
// Heart rate
const bpm = Math.round(extractAnyMetric(bucket, 'heart_rate', 'fpVal'));
if (bpm > 0) {
if (hrPts.length > 0) {
const hrValues = hrPts
.map((p) => readNumber(p, [['heartRate', 'beatsPerMinute'], ['beatsPerMinute']]))
.filter((v) => v > 0);
if (hrValues.length > 0) {
const bpm = Math.round(hrValues.reduce((a, b) => a + b, 0) / hrValues.length);
signals.push({
id: `google-health:heart_rate`,
source: 'google-health',
@@ -250,13 +237,17 @@ export class GoogleHealthSignalSource implements SignalSource {
}
}
// Sleep — find the most recent sleep session
if (sleepData.session?.length) {
const sorted = [...sleepData.session].sort(
(a, b) => Number(b.endTimeMillis) - Number(a.endTimeMillis),
);
const last = sorted[0]!;
const durationMs = Number(last.endTimeMillis) - Number(last.startTimeMillis);
if (sleepPts.length > 0) {
const sleepSessions = sleepPts
.map((p) => ({
start: readString(p, [['sleep', 'interval', 'startTime'], ['interval', 'startTime'], ['startTime']]),
end: readString(p, [['sleep', 'interval', 'endTime'], ['interval', 'endTime'], ['endTime']]),
}))
.filter((s): s is { start: string; end: string } => !!s.start && !!s.end)
.sort((a, b) => Date.parse(b.end) - Date.parse(a.end));
const last = sleepSessions[0];
if (last) {
const durationMs = Date.parse(last.end) - Date.parse(last.start);
const sleepHours = Math.round((durationMs / 3_600_000) * 10) / 10;
const belowGoal = sleepHours < SLEEP_GOAL_HOURS;
signals.push({
@@ -264,7 +255,7 @@ export class GoogleHealthSignalSource implements SignalSource {
source: 'google-health',
kind: 'health',
content: `${sleepHours}h sleep last night (${belowGoal ? 'below' : 'meets'} ${SLEEP_GOAL_HOURS}h goal)`,
metadata: { dataType: 'sleep', sessionName: last.name },
metadata: { dataType: 'sleep' },
features: {
sleep_hours: sleepHours,
sleep_goal_hours: SLEEP_GOAL_HOURS,
@@ -274,6 +265,7 @@ export class GoogleHealthSignalSource implements SignalSource {
timestamp: now,
});
}
}
this.cache.set(userId, { signals, fetchedAt: Date.now() });
bus.publish('signals.task.synced', {