diff --git a/CLAUDE.md b/CLAUDE.md index 800dbee..4fd7d50 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -67,6 +67,11 @@ docs/ architecture notes, ADRs, API specs - No secrets in repo. Local dev via `.env.local` (gitignored), prod via the server's secret store (Vaultwarden now; k8s secrets later). - Compose profiles: `core` (api + web + admin), `full` (adds ml-serving + nats), `mlops` (adds MLflow), `ai` (adds Ollama + LiteLLM). Mix as needed. Always pass `--profile ` to `build`/`up` — without a profile, no services are selected and builds silently do nothing. - Docker rebuild: use `--force-recreate` on `up` when only env vars changed (no image rebuild needed); new env vars in `.env.local` are not picked up by a running container until it is recreated. +- Docker rebuild gotchas: + - **Never run two `docker compose up --build` at once** — both grab the same `--mount=type=cache,id=pnpm` and deadlock on the API's `pnpm --prod deploy` step. Symptom: build sits silent for hours on `[api builder 8/8]`. Before starting any build, check `ps aux | grep "docker compose"` and kill any prior `up --build` (`kill -9 ` — the wrapper bash and the docker compose binary are separate PIDs; kill the docker compose one). + - **Don't add `--offline` to `pnpm --prod deploy`** — pnpm's metadata cache (`/root/.cache/pnpm/`) is not in the `/pnpm/store` cache mount, so `--offline` fails with `ERR_PNPM_NO_OFFLINE_META` for transitive devDeps (e.g. vite via vitest). Leave the deploy step network-on; it works. + - **All TS Dockerfiles need `python3 make g++`** in the base stage — `better-sqlite3` rebuilds natively on install. Missing from `Dockerfile.admin` historically caused `gyp ERR! find Python` failures. + - **A clean build of `--profile core` takes ~3 min total** when the buildx cache is warm. If it's been silent for >10 min, check for the parallel-build deadlock above before assuming "still going". - Run Python agent tests: `python3 -m pytest ml/agents/tests/ -x -q` (tests add repo root to `sys.path` themselves). - Run Python feature tests: `python3 -m pytest ml/features/ -x -q` - `ml/features/` files are Python mirrors of TS registries — TS is source of truth. Tests parse `registry.ts` with regex to detect drift; follow the same pattern whenever a new field is added to `ProfileFeature`. @@ -98,6 +103,16 @@ All `httpx` calls in `ml/` must use `trust_env=False` to bypass the system proxy MLflow container-to-container calls: always pass `host_header="localhost"` to `MLflowClient` — MLflow's `--allowed-hosts` rejects `Host: mlflow` (the container DNS name) with 403. Auth credential is `MLFLOW_ADMIN_PASSWORD`. MLflow REST API lives at the origin root (`/api/2.0/mlflow`), not under the `/mlflow` UI prefix. +MLflow from the host shell — query with curl, no script needed: +```bash +env -u HTTPS_PROXY -u HTTP_PROXY -u ALL_PROXY -u https_proxy -u http_proxy -u all_proxy \ + curl -s -H "Host: localhost" -u "admin:${MLFLOW_ADMIN_PASSWORD}" \ + -X POST http://localhost:5000/api/2.0/mlflow/runs/search \ + -H "Content-Type: application/json" \ + -d '{"experiment_ids":["3"],"max_results":1,"order_by":["start_time DESC"]}' +``` +`Host: localhost` required (no port) — `localhost:5000` fails the DNS-rebinding check. Experiment IDs: `3`=oO/serving. Artifacts stored as run tags prefixed `artifact:`. + **Multi-agent tip generation pipeline (ADR-0013):** 1. Pre-compute agents (`ml/agents//`) run on a schedule, each emitting a snippet into `agent_outputs` with a per-agent TTL 2. On request, `recommender` (TS) loads the eligible agent set (registry-driven, ADR-0014) and pulls the freshest non-expired snippets diff --git a/apps/web/src/app/config/page.tsx b/apps/web/src/app/config/page.tsx index fa147ee..03491f0 100644 --- a/apps/web/src/app/config/page.tsx +++ b/apps/web/src/app/config/page.tsx @@ -1,12 +1,27 @@ 'use client'; import { useEffect, useState, useCallback } from 'react'; -import { getVapidPublicKey, subscribePush } from '@/lib/api'; +import { getVapidPublicKey, subscribePush, getOrchestatorPrefs, updateOrchestratorPref } from '@/lib/api'; type PushState = 'idle' | 'subscribed' | 'denied'; export default function ConfigPage() { const [pushState, setPushState] = useState('idle'); + const [scienceDestiny, setScienceDestiny] = useState(50); + const [prefSaving, setPrefSaving] = useState(false); + + useEffect(() => { + getOrchestatorPrefs().then((prefs) => { + if (typeof prefs.science_destiny === 'number') setScienceDestiny(prefs.science_destiny); + }).catch(() => {}); + }, []); + + const handleScienceDestinyChange = useCallback(async (value: number) => { + setScienceDestiny(value); + setPrefSaving(true); + try { await updateOrchestratorPref('science_destiny', value); } + finally { setPrefSaving(false); } + }, []); useEffect(() => { if (typeof Notification !== 'undefined') { @@ -87,6 +102,41 @@ export default function ConfigPage() { + {/* Tip style */} +
+

+ Tip style +

+
+
+ Science + + {prefSaving ? 'saving…' : scienceDestiny === 50 ? 'balanced' : scienceDestiny < 50 ? 'data-driven' : 'intuitive'} + + Destiny +
+ handleScienceDestinyChange(Number(e.target.value))} + style={{ width: '100%', accentColor: 'var(--white)', cursor: 'pointer' }} + /> +
+ {scienceDestiny < 30 + ? 'Tips lean on patterns and data' + : scienceDestiny > 70 + ? 'Tips lean on intuition and meaning' + : 'Tips balance logic and intuition'} +
+
+
+ {/* Integrations */}

diff --git a/apps/web/src/app/tip/page.tsx b/apps/web/src/app/tip/page.tsx index 8f82390..3c71555 100644 --- a/apps/web/src/app/tip/page.tsx +++ b/apps/web/src/app/tip/page.tsx @@ -29,6 +29,7 @@ export default function TipPage() { const [visible, setVisible] = useState(false); const holdTimer = useRef | null>(null); const [pressed, setPressed] = useState(false); + const [showReasoning, setShowReasoning] = useState(false); useEffect(() => { if (state === 'loading' || state === 'done') { @@ -49,6 +50,7 @@ export default function TipPage() { return; } setTip(rec.tip); + setShowReasoning(false); setState('tip'); } catch (err: any) { console.error('[tip] loadTip error', err?.status, err?.message); @@ -235,6 +237,81 @@ export default function TipPage() { )} + {/* Reasoning overlay */} + {showReasoning && tip?.rationale && ( +
{ e.stopPropagation(); setShowReasoning(false); }} + style={{ + position: 'fixed', + inset: 0, + display: 'flex', + alignItems: 'flex-end', + justifyContent: 'center', + zIndex: 20, + padding: '0 0 5rem', + }} + > +
e.stopPropagation()} + style={{ + background: 'rgba(20,20,20,0.96)', + border: '1px solid rgba(255,255,255,0.08)', + borderRadius: '0.875rem', + padding: '1.25rem 1.5rem', + maxWidth: '360px', + width: 'calc(100% - 3rem)', + }} + > +

+ Why this tip +

+

+ {tip.rationale} +

+
+
+ )} + + {/* ? button — bottom left, shows reasoning */} + {(state === 'tip' || state === 'actions') && tip?.rationale && ( + + )} + {/* Settings gear — bottom right */} > { + const data = await apiFetch<{ prefs: Record> }>('/profile'); + return data.prefs?.orchestrator ?? {}; +} + +export async function updateOrchestratorPref(key: string, value: unknown) { + return apiFetch<{ ok: boolean }>('/profile/prefs/orchestrator', { + method: 'PATCH', + body: JSON.stringify({ [key]: value }), + }); +} diff --git a/infra/docker/Dockerfile.admin b/infra/docker/Dockerfile.admin index f1b099a..f890571 100644 --- a/infra/docker/Dockerfile.admin +++ b/infra/docker/Dockerfile.admin @@ -1,7 +1,8 @@ # syntax=docker/dockerfile:1.7 FROM node:22-slim AS base -RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates \ +RUN apt-get update && apt-get install -y --no-install-recommends \ + python3 make g++ ca-certificates \ && rm -rf /var/lib/apt/lists/* \ && npm install -g pnpm ENV CI=true \ diff --git a/infra/docker/docker-compose.yml b/infra/docker/docker-compose.yml index 629bfa1..c452a95 100644 --- a/infra/docker/docker-compose.yml +++ b/infra/docker/docker-compose.yml @@ -112,11 +112,13 @@ services: command: > mlflow server --backend-store-uri sqlite:////mlflow/mlflow.db - --default-artifact-root /mlflow/artifacts + --artifacts-destination /mlflow/artifacts + --serve-artifacts + --default-artifact-root mlflow-artifacts:/ --host 0.0.0.0 --port 5000 --static-prefix /mlflow - --allowed-hosts o.alogins.net,localhost + --allowed-hosts o.alogins.net,localhost,localhost:5000,mlflow,mlflow:5000 --cors-allowed-origins https://o.alogins.net volumes: - /mnt/ssd/dbs/oo/mlflow:/mlflow diff --git a/ml/serving/main.py b/ml/serving/main.py index a48ed86..4e68ad1 100644 --- a/ml/serving/main.py +++ b/ml/serving/main.py @@ -28,9 +28,11 @@ from fastapi import FastAPI, HTTPException, Request from pydantic import BaseModel from starlette.middleware.base import BaseHTTPMiddleware +import mlflow +from mlflow.entities import SpanType + import logging_config import nats_consumer -from mlflow_client import MLflowClient from prompts import get_prompt, build_orchestrator_messages # Make ml.agents importable regardless of working directory. @@ -83,36 +85,69 @@ LITELLM_MASTER_KEY = os.getenv("LITELLM_MASTER_KEY", "sk-oo-dev") STATE_DIR = Path(os.getenv("STATE_DIR", "/tmp/oo-serving-state")) # ── MLflow tracing (optional) ─────────────────────────────────────────────── -# Set MLFLOW_TRACKING_URI to enable. All calls are fire-and-forget; any error -# is logged at WARNING and never propagates to the caller. +# Set MLFLOW_TRACKING_URI to enable. Spans are fire-and-forget; errors are +# logged at WARNING and never propagate to the caller. +# MLflow --allowed-hosts must include "mlflow" (the container DNS name) so the +# SDK can reach the server from inside other containers. _MLFLOW_URI = os.getenv("MLFLOW_TRACKING_URI", "") -_mlflow: MLflowClient | None = ( - MLflowClient( - tracking_uri=_MLFLOW_URI, - username=os.getenv("MLFLOW_TRACKING_USERNAME", "admin"), - password=os.getenv("MLFLOW_TRACKING_PASSWORD") or os.getenv("MLFLOW_ADMIN_PASSWORD", "password"), - host_header="localhost", - ) - if _MLFLOW_URI else None -) _MLFLOW_EXP = "oO/serving" +_mlflow_exp_id: str | None = None + +if _MLFLOW_URI: + try: + mlflow.set_tracking_uri(_MLFLOW_URI) + _mlflow_exp_id = mlflow.set_experiment(_MLFLOW_EXP).experiment_id + except Exception as _exc: + log.warning("mlflow_init_failed", error=str(_exc)) -def _mlflow_run(run_name: str, params: dict, metrics: dict, tags: dict) -> None: - """Create a finished MLflow run. Silently no-ops if MLflow is not configured.""" - if _mlflow is None: +class _NoOpSpan: + """Returned when MLflow is disabled or span creation fails.""" + def set_inputs(self, *a, **k): pass + def set_outputs(self, *a, **k): pass + def set_attribute(self, *a, **k): pass + def set_attributes(self, *a, **k): pass + def end(self, *a, **k): pass + + +_NOOP = _NoOpSpan() + + +def _start_span(name: str, span_type: str, *, parent=_NOOP, inputs=None): + """Start an MLflow span. Returns _NOOP on failure or when tracing is off. + + experiment_id is only passed for root spans (no parent) — passing it to + child spans causes the SDK to fail with '_Span has no attribute _span'. + """ + if _mlflow_exp_id is None: + return _NOOP + try: + kw: dict = {"span_type": span_type} + if isinstance(parent, _NoOpSpan): + kw["experiment_id"] = _mlflow_exp_id # root span only + else: + kw["parent_span"] = parent + if inputs is not None: + kw["inputs"] = inputs + return mlflow.start_span_no_context(name, **kw) + except Exception as exc: # noqa: BLE001 + log.warning("mlflow_span_start_failed", name=name, error=str(exc)) + return _NOOP + + +def _end_span(span, *, status: str = "OK", outputs=None, attributes: dict | None = None) -> None: + """End a span safely, ignoring _NoOpSpan and swallowing exceptions.""" + if isinstance(span, _NoOpSpan): return try: - exp_id = _mlflow.get_or_create_experiment(_MLFLOW_EXP) - run_id = _mlflow.create_run(exp_id, run_name, tags={"source": "ml-serving"}) - _mlflow.log_params(run_id, {k: str(v)[:250] for k, v in params.items()}) - _mlflow.log_metrics(run_id, metrics) - for k, v in tags.items(): - _mlflow.log_text(run_id, str(v), k) - _mlflow.end_run(run_id) + if attributes: + span.set_attributes(attributes) + span.end(status=status, outputs=outputs) except Exception as exc: # noqa: BLE001 - log.warning("mlflow_log_failed", error=str(exc)) + log.warning("mlflow_span_end_failed", error=str(exc)) + + STATE_DIR.mkdir(parents=True, exist_ok=True) @@ -197,6 +232,7 @@ class RecommendRequest(BaseModel): tasks: list[dict] = [] hour_of_day: int = 12 day_of_week: int = 0 + science_destiny: int = 50 # 0=science (data-driven), 100=destiny (intuitive) class TipResult(BaseModel): @@ -285,12 +321,15 @@ async def compute_agent(agent_id: str, req: AgentComputeRequest) -> AgentCompute raise HTTPException(status_code=500, detail=f"Agent compute failed: {exc}") log.info("agent_computed", agent_id=agent_id, user_id=req.user_id, expires_at=output.expires_at) - _mlflow_run( - run_name=f"compute/{agent_id}", - params={"agent_id": agent_id, "user_id": req.user_id, "agent_version": output.agent_version}, - metrics={"task_count": len(req.tasks), "feedback_count": len(req.feedback_history)}, - tags={"prompt_text": output.prompt_text, "signals_snapshot": json.dumps(output.signals_snapshot)}, + span = _start_span( + f"compute:{agent_id}", + SpanType.AGENT, + inputs={"user_id": req.user_id, "agent_id": agent_id, + "task_count": len(req.tasks), "feedback_count": len(req.feedback_history)}, ) + _end_span(span, + outputs={"prompt_text": output.prompt_text, "signals_snapshot": output.signals_snapshot}, + attributes={"agent_version": output.agent_version, "expires_at": output.expires_at}) return AgentComputeResponse( user_id=output.user_id, agent_id=output.agent_id, @@ -347,12 +386,15 @@ async def infer_agent(agent_id: str, req: AgentInferRequest) -> AgentInferRespon history_len=len(events), latency_ms=latency_ms, ) - _mlflow_run( - run_name=f"infer/{agent_id}", - params={"agent_id": agent_id, "user_id": req.user_id}, - metrics={"latency_ms": latency_ms, "history_len": len(events), "n_params": len(inferred)}, - tags={"inferred_prefs": json.dumps(inferred)}, + span = _start_span( + f"infer:{agent_id}", + SpanType.CHAIN, + inputs={"user_id": req.user_id, "agent_id": agent_id, + "history_len": len(events), "completion_count": len(completions)}, ) + _end_span(span, + outputs={"inferred_prefs": inferred}, + attributes={"latency_ms": str(latency_ms), "n_params": str(len(inferred))}) return AgentInferResponse(user_id=req.user_id, agent_id=agent_id, inferred_prefs=inferred) @@ -364,99 +406,132 @@ async def recommend(req: RecommendRequest) -> RecommendResponse: the fresh rows from agent_outputs table (fetched by the TypeScript recommender before calling this endpoint). Falls back to raw task context if empty. """ - t0_recommend = time.monotonic() - messages = build_orchestrator_messages( - agent_outputs=[s.model_dump() for s in req.agent_outputs], - tasks=req.tasks, - hour_of_day=req.hour_of_day, - day_of_week=req.day_of_week, - ) - headers = {"Authorization": f"Bearer {LITELLM_MASTER_KEY}"} - last_raw = "" - last_parse_error = "" - total_usage: dict = {"prompt_tokens": 0, "completion_tokens": 0} - model_used = "tip-generator" + t0 = time.monotonic() - async with httpx.AsyncClient(timeout=30.0) as client: - for _attempt in range(1 + _MAX_GENERATE_RETRIES): - payload = {"model": "tip-generator", "messages": messages, "temperature": 0.7} - try: - resp = await client.post( - f"{LITELLM_URL}/chat/completions", json=payload, headers=headers - ) - resp.raise_for_status() - except httpx.HTTPStatusError as e: - raise HTTPException(status_code=502, detail=f"LiteLLM error: {e.response.text}") - except httpx.RequestError as e: - raise HTTPException(status_code=503, detail=f"LiteLLM unreachable: {e}") + # ── root span ────────────────────────────────────────────────────────── + root = _start_span("recommend", SpanType.CHAIN, inputs={ + "user_id": req.user_id, + "agent_ids": [s.agent_id for s in req.agent_outputs], + "hour_of_day": req.hour_of_day, + "day_of_week": req.day_of_week, + "science_destiny": req.science_destiny, + }) - data = resp.json() - usage = data.get("usage", {}) - total_usage["prompt_tokens"] += usage.get("prompt_tokens", 0) - total_usage["completion_tokens"] += usage.get("completion_tokens", 0) - model_used = data.get("model", "tip-generator") - last_raw = data["choices"][0]["message"]["content"] - - try: - text = last_raw.strip() - if text.startswith("```"): - parts = text.split("```") - text = parts[1] if len(parts) > 1 else text - if text.startswith("json"): - text = text[4:] - parsed = json.loads(text) - item: dict = parsed[0] if isinstance(parsed, list) else parsed - break - except (json.JSONDecodeError, ValueError, IndexError) as exc: - last_parse_error = str(exc) - messages.append({"role": "assistant", "content": last_raw}) - messages.append({"role": "user", "content": _RETRY_SUFFIX_OBJ}) - else: - raise HTTPException( - status_code=502, - detail=f"LLM returned invalid JSON after {_MAX_GENERATE_RETRIES} retries: " - f"{last_parse_error}\n{last_raw[:200]}", - ) - - tip = TipResult( - id=item.get("id", f"tip-{req.user_id[:8]}"), - content=item.get("content", ""), - rationale=item.get("rationale"), - ) - latency_ms_recommend = round((time.monotonic() - t0_recommend) * 1000, 1) - log.info( - "recommend_served", - user_id=req.user_id, - agent_count=len(req.agent_outputs), - tip_id=tip.id, - ) - _mlflow_run( - run_name="recommend", - params={ - "user_id": req.user_id, - "agent_ids": ",".join(s.agent_id for s in req.agent_outputs), - "model": model_used, - "hour_of_day": req.hour_of_day, - "day_of_week": req.day_of_week, - }, - metrics={ - "prompt_tokens": total_usage["prompt_tokens"], - "completion_tokens": total_usage["completion_tokens"], + try: + # ── build_context span ───────────────────────────────────────────── + ctx_span = _start_span("build_context", SpanType.TOOL, parent=root, inputs={ "agent_count": len(req.agent_outputs), - "latency_ms": latency_ms_recommend, - }, - tags={ - "prompt_messages": json.dumps(messages), - "tip_content": tip.content, - "tip_rationale": tip.rationale or "", - }, - ) - return RecommendResponse( - tip=tip, - model=model_used, - prompt_tokens=total_usage["prompt_tokens"], - completion_tokens=total_usage["completion_tokens"], - ) + "task_count": len(req.tasks), + "science_destiny": req.science_destiny, + }) + messages = build_orchestrator_messages( + agent_outputs=[s.model_dump() for s in req.agent_outputs], + tasks=req.tasks, + hour_of_day=req.hour_of_day, + day_of_week=req.day_of_week, + science_destiny=req.science_destiny, + ) + _end_span(ctx_span, outputs={"message_count": len(messages)}) + + # ── one span per pre-computed agent snippet ──────────────────────── + for snippet in req.agent_outputs: + a_span = _start_span( + f"agent:{snippet.agent_id}", SpanType.AGENT, parent=root, + inputs={"agent_id": snippet.agent_id}, + ) + _end_span(a_span, outputs={"prompt_text": snippet.prompt_text}) + + # ── LLM orchestrator span (wraps retry loop) ─────────────────────── + llm_span = _start_span("llm_orchestrator", SpanType.LLM, parent=root, inputs={ + "messages": messages, + "model": "tip-generator", + "temperature": 0.7, + }) + + headers = {"Authorization": f"Bearer {LITELLM_MASTER_KEY}"} + last_raw = "" + last_parse_error = "" + total_usage: dict = {"prompt_tokens": 0, "completion_tokens": 0} + model_used = "tip-generator" + _attempt = 0 + + async with httpx.AsyncClient(timeout=30.0) as client: + for _attempt in range(1 + _MAX_GENERATE_RETRIES): + payload = {"model": "tip-generator", "messages": messages, "temperature": 0.7} + try: + resp = await client.post( + f"{LITELLM_URL}/chat/completions", json=payload, headers=headers + ) + resp.raise_for_status() + except httpx.HTTPStatusError as e: + _end_span(llm_span, status="ERROR") + _end_span(root, status="ERROR") + raise HTTPException(status_code=502, detail=f"LiteLLM error: {e.response.text}") + except httpx.RequestError as e: + _end_span(llm_span, status="ERROR") + _end_span(root, status="ERROR") + raise HTTPException(status_code=503, detail=f"LiteLLM unreachable: {e}") + + data = resp.json() + usage = data.get("usage", {}) + total_usage["prompt_tokens"] += usage.get("prompt_tokens", 0) + total_usage["completion_tokens"] += usage.get("completion_tokens", 0) + model_used = data.get("model", "tip-generator") + last_raw = data["choices"][0]["message"]["content"] + + try: + text = last_raw.strip() + if text.startswith("```"): + parts = text.split("```") + text = parts[1] if len(parts) > 1 else text + if text.startswith("json"): + text = text[4:] + parsed = json.loads(text) + item: dict = parsed[0] if isinstance(parsed, list) else parsed + break + except (json.JSONDecodeError, ValueError, IndexError) as exc: + last_parse_error = str(exc) + messages.append({"role": "assistant", "content": last_raw}) + messages.append({"role": "user", "content": _RETRY_SUFFIX_OBJ}) + else: + _end_span(llm_span, status="ERROR") + _end_span(root, status="ERROR") + raise HTTPException( + status_code=502, + detail=f"LLM returned invalid JSON after {_MAX_GENERATE_RETRIES} retries: " + f"{last_parse_error}\n{last_raw[:200]}", + ) + + tip = TipResult( + id=item.get("id", f"tip-{req.user_id[:8]}"), + content=item.get("content", ""), + rationale=item.get("rationale"), + ) + _end_span(llm_span, outputs={"content": tip.content, "rationale": tip.rationale or ""}, + attributes={ + "prompt_tokens": str(total_usage["prompt_tokens"]), + "completion_tokens": str(total_usage["completion_tokens"]), + "model": model_used, + "attempts": str(_attempt + 1), + }) + + latency_ms = round((time.monotonic() - t0) * 1000, 1) + log.info("recommend_served", user_id=req.user_id, agent_count=len(req.agent_outputs), tip_id=tip.id) + _end_span(root, outputs={"tip_id": tip.id, "content": tip.content, "rationale": tip.rationale or ""}, + attributes={"latency_ms": str(latency_ms), "agent_count": str(len(req.agent_outputs))}) + + return RecommendResponse( + tip=tip, + model=model_used, + prompt_tokens=total_usage["prompt_tokens"], + completion_tokens=total_usage["completion_tokens"], + ) + + except HTTPException: + raise + except Exception: + _end_span(root, status="ERROR") + raise _MAX_GENERATE_RETRIES = 2 diff --git a/ml/serving/prompts.py b/ml/serving/prompts.py index 2c78aab..3342c04 100644 --- a/ml/serving/prompts.py +++ b/ml/serving/prompts.py @@ -124,17 +124,52 @@ _SYS_V4_ORCHESTRATOR = ( ) +def _science_destiny_instruction(science_destiny: int) -> str: + """Translate 0-100 slider into a prompt instruction. + + 0 = pure science: prioritise patterns, data, measurable progress. + 100 = pure destiny: prioritise meaning, intuition, deeper purpose. + 50 = balanced (no extra instruction injected). + """ + if science_destiny <= 20: + return ( + "The user strongly prefers data-driven advice. " + "Ground every tip in observable patterns, streaks, or measurable progress. " + "Avoid abstract or motivational language." + ) + if science_destiny <= 40: + return ( + "The user leans toward evidence-based guidance. " + "Anchor tips in patterns and metrics where possible." + ) + if science_destiny >= 80: + return ( + "The user strongly believes in intuition and meaning. " + "Frame tips around purpose, values, and deeper intention rather than metrics." + ) + if science_destiny >= 60: + return ( + "The user leans toward intuitive, meaning-driven advice. " + "Weave in purpose and intention alongside practicality." + ) + return "" # balanced — no extra instruction + + def build_orchestrator_messages( agent_outputs: list[dict], tasks: list[dict], hour_of_day: int, day_of_week: int, + science_destiny: int = 50, ) -> list[dict]: """Build the [system, user] message list for the orchestrator LLM call. agent_outputs: list of {agent_id, prompt_text} dicts. Falls back to raw task summary when agent_outputs is empty. """ + style_hint = _science_destiny_instruction(science_destiny) + system = _SYS_V4_ORCHESTRATOR + (f"\n\n{style_hint}" if style_hint else "") + lines = [f"Current time: {hour_of_day:02d}:00, day_of_week={day_of_week}", ""] if agent_outputs: lines.append("Context from analysis agents:") @@ -150,7 +185,7 @@ def build_orchestrator_messages( lines.append(f" - {t.get('content', '?')}") lines.append("\nGenerate one tip as a JSON object. Write the tip content in English only.") return [ - {"role": "system", "content": _SYS_V4_ORCHESTRATOR}, + {"role": "system", "content": system}, {"role": "user", "content": "\n".join(lines)}, ] diff --git a/ml/serving/requirements.txt b/ml/serving/requirements.txt index 4d142cc..00253e8 100644 --- a/ml/serving/requirements.txt +++ b/ml/serving/requirements.txt @@ -7,3 +7,4 @@ anthropic>=0.40.0 nats-py>=2.9.0 structlog>=24.1.0 sentry-sdk>=2.0.0 +mlflow-skinny>=3.1.0 diff --git a/services/api/src/profile/__tests__/eligibility.test.ts b/services/api/src/profile/__tests__/eligibility.test.ts index 34fd74a..246553a 100644 --- a/services/api/src/profile/__tests__/eligibility.test.ts +++ b/services/api/src/profile/__tests__/eligibility.test.ts @@ -35,7 +35,7 @@ const AGENT_C = { ...MANIFEST_DEFAULTS, id: 'agent-c', required_consents: ['data beforeAll(async () => { await testDb.insert(users).values({ id: 'u1', email: 'u@test.com', name: null, image: null, role: 'user', - consentGiven: false, createdAt: NOW, + createdAt: NOW, }); }); diff --git a/services/api/src/routes/__tests__/recommender.test.ts b/services/api/src/routes/__tests__/recommender.test.ts index 27d44e2..e3923fc 100644 --- a/services/api/src/routes/__tests__/recommender.test.ts +++ b/services/api/src/routes/__tests__/recommender.test.ts @@ -213,7 +213,7 @@ describe('POST /recommend integration', () => { }); // Intercept the /recommend body to inspect what agent_outputs were sent - const origFetch = globalThis.fetch as ReturnType; + const origFetch = globalThis.fetch as unknown as (url: string, init?: RequestInit) => Promise; const wrappedFetch = vi.fn().mockImplementation(async (url: string, init?: RequestInit) => { if (String(url).includes('/recommend') && init?.body) { const body = JSON.parse(init.body as string); diff --git a/services/api/src/routes/agent-outputs.ts b/services/api/src/routes/agent-outputs.ts index 273e694..3f90578 100644 --- a/services/api/src/routes/agent-outputs.ts +++ b/services/api/src/routes/agent-outputs.ts @@ -166,7 +166,7 @@ export async function computeAndStore(userId: string, agentId: string): Promise< method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ user_id: userId, tasks, profile, feedback_history: feedbackHistory, agent_prefs: agentPrefs }), - signal: AbortSignal.timeout(15_000), + signal: AbortSignal.timeout(60_000), }); if (!mlResp.ok) { diff --git a/services/api/src/routes/recommender.ts b/services/api/src/routes/recommender.ts index 3dd0f39..e5c6c1f 100644 --- a/services/api/src/routes/recommender.ts +++ b/services/api/src/routes/recommender.ts @@ -2,7 +2,7 @@ import { type Router as ExpressRouter, Router, Response } from 'express'; import { nanoid } from 'nanoid'; import { logger } from '../logger.js'; import { db } from '../db/index.js'; -import { integrationTokens, tipFeedback, tipViews, tipScores } from '../db/schema.js'; +import { integrationTokens, tipFeedback, tipViews, tipScores, userPreferences } from '../db/schema.js'; import { eq, and, desc } from 'drizzle-orm'; import { requireAuth, AuthenticatedRequest } from '../middleware/session.js'; import { config } from '../config.js'; @@ -52,6 +52,16 @@ interface OrchestratorResult { agentIds: string[]; } +async function loadOrchestratorPref(userId: string, key: string): Promise { + const rows = await db + .select({ valueJson: userPreferences.valueJson }) + .from(userPreferences) + .where(and(eq(userPreferences.userId, userId), eq(userPreferences.scope, 'orchestrator'), eq(userPreferences.key, key))) + .limit(1); + if (!rows.length) return undefined; + try { return JSON.parse(rows[0].valueJson) as T; } catch { return undefined; } +} + async function fetchOrchestratorTip( userId: string, signals: Signal[], @@ -59,9 +69,10 @@ async function fetchOrchestratorTip( dayOfWeek: number, traceparent?: string, ): Promise { - const [allAgentRows, eligibleIds] = await Promise.all([ + const [allAgentRows, eligibleIds, scienceDestiny] = await Promise.all([ getActiveAgentOutputs(userId), getEligibleAgentIds(userId), + loadOrchestratorPref(userId, 'science_destiny'), ]); const agentOutputs = allAgentRows .filter((r) => eligibleIds.has(r.agentId)) @@ -78,7 +89,7 @@ async function fetchOrchestratorTip( const res = await fetch(`${config.ML_SERVING_URL}/recommend`, { method: 'POST', headers: { 'Content-Type': 'application/json', ...(traceparent ? { traceparent } : {}) }, - body: JSON.stringify({ user_id: userId, agent_outputs: agentOutputs, tasks, hour_of_day: hour, day_of_week: dayOfWeek }), + body: JSON.stringify({ user_id: userId, agent_outputs: agentOutputs, tasks, hour_of_day: hour, day_of_week: dayOfWeek, science_destiny: scienceDestiny ?? 50 }), signal: AbortSignal.timeout(15_000), }); if (!res.ok) return null; diff --git a/services/api/src/signals/agent-scheduler.ts b/services/api/src/signals/agent-scheduler.ts index 36c35a8..42c6db3 100644 --- a/services/api/src/signals/agent-scheduler.ts +++ b/services/api/src/signals/agent-scheduler.ts @@ -68,14 +68,13 @@ async function runCycle(agentIds: string[]): Promise { let failed = 0; for (const userId of userIds) { - const results = await Promise.allSettled( - agentIds.map((agentId) => computeAndStore(userId, agentId)), - ); - for (const r of results) { - if (r.status === 'fulfilled') ok++; - else { + for (const agentId of agentIds) { + try { + await computeAndStore(userId, agentId); + ok++; + } catch (err: any) { failed++; - logger.error({ err: r.reason, userId }, 'agent-scheduler: compute error'); + logger.error({ err, userId, agentId }, 'agent-scheduler: compute error'); } } }