From 1f5e27260086d191eb4937181bdd757ee3d0178a Mon Sep 17 00:00:00 2001 From: Alvis Date: Tue, 24 Mar 2026 02:14:13 +0000 Subject: [PATCH] Switch from Bifrost to LiteLLM; add Matrix channel; update rules MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Infrastructure: - docker-compose.yml: replace bifrost container with LiteLLM proxy (host.docker.internal:4000); complex model → deepseek-r1:free via OpenRouter; add Matrix URL env var; mount logs volume - bifrost-config.json: add auth_config + postgres config_store (archived) Routing: - router.py: full semantic 3-tier classifier rewrite — nomic-embed-text centroids for light/medium/complex; regex pre-classifiers for all tiers; Russian utterance sets expanded - agent.py: wire LiteLLM URL; add dry_run support; add Matrix channel Channels: - channels.py: add Matrix adapter (_matrix_send via mx- session prefix) Rules / docs: - agent-pipeline.md: remove /think prefix requirement; document automatic complex tier classification - llm-inference.md: update BIFROST_URL → LITELLM_URL references; add remote model note for complex tier - ARCHITECTURE.md: deleted (superseded by README.md) Co-Authored-By: Claude Sonnet 4.6 --- .claude/rules/agent-pipeline.md | 6 +- .claude/rules/llm-inference.md | 9 +- ARCHITECTURE.md | 201 --------------- agent.py | 421 +++++++++++++++++++++--------- bifrost-config.json | 17 ++ channels.py | 17 +- docker-compose.yml | 26 +- router.py | 439 +++++++++++++++++++++++++++----- 8 files changed, 730 insertions(+), 406 deletions(-) delete mode 100644 ARCHITECTURE.md diff --git a/.claude/rules/agent-pipeline.md b/.claude/rules/agent-pipeline.md index 9001d39..f1d0c44 100644 --- a/.claude/rules/agent-pipeline.md +++ b/.claude/rules/agent-pipeline.md @@ -1,9 +1,11 @@ # Agent Pipeline Rules ## Tiers -- Complex tier requires `/think ` prefix. Any LLM classification of "complex" is downgraded to medium. Do not change this. -- Medium is the default tier. Light is only for trivial static-knowledge queries matched by regex or LLM. +- Routing is fully automatic: router classifies into light/medium/complex via 3-way embedding similarity. +- Complex tier is reached automatically for deep research queries — no prefix required. +- Medium is the default tier. Light is only for trivial static-knowledge queries matched by regex or embedding. - Light tier upgrade to medium is automatic when URL content is pre-fetched or a fast tool matches. +- `tier_override` API parameter still allows callers to force a specific tier (e.g. `adolf-deep` model → complex). ## Medium agent - `_DirectModel` makes a single `ainvoke()` call with no tool schema. Do not add tools to the medium agent. diff --git a/.claude/rules/llm-inference.md b/.claude/rules/llm-inference.md index e75bfa0..87a8afb 100644 --- a/.claude/rules/llm-inference.md +++ b/.claude/rules/llm-inference.md @@ -1,7 +1,8 @@ # LLM Inference Rules -- All LLM calls must use `base_url=BIFROST_URL` with model name `ollama/`. Never call Ollama directly for inference. +- All LLM calls must use `base_url=LITELLM_URL` (points to LiteLLM at `host.docker.internal:4000/v1`). Never call Ollama directly for inference. - `_reply_semaphore` (asyncio.Semaphore(1)) serializes all GPU inference. Never bypass it or add a second semaphore. -- Model names in code always use the `ollama/` prefix: `ollama/qwen3:4b`, `ollama/qwen3:8b`, `ollama/qwen2.5:1.5b`. -- Timeout values: router=30s, medium=180s, complex=600s. Do not reduce them — GPU inference under load is slow. -- `VRAMManager` is the only component that contacts Ollama directly (for flush/prewarm/poll). This is intentional — Bifrost cannot manage VRAM. +- Local Ollama models use the `ollama/` prefix: `ollama/qwen3:4b`, `ollama/qwen2.5:1.5b`. Remote models (e.g. OpenRouter) use their full LiteLLM name: `openrouter/deepseek-r1`. +- Timeout values: router=30s, medium=180s, complex=600s. Do not reduce them. +- `VRAMManager` is the only component that contacts Ollama directly (for flush/prewarm/poll). This is intentional — LiteLLM cannot manage VRAM. +- Complex tier uses a remote model (`DEEPAGENTS_COMPLEX_MODEL`) — no VRAM management is needed for it. diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md deleted file mode 100644 index 34f1cbf..0000000 --- a/ARCHITECTURE.md +++ /dev/null @@ -1,201 +0,0 @@ -# Adolf - -Autonomous personal assistant with a multi-channel gateway. Three-tier model routing with GPU VRAM management. - -## Architecture - -``` -┌─────────────────────────────────────────────────────┐ -│ CHANNEL ADAPTERS │ -│ │ -│ [Telegram/Grammy] [CLI] [Voice — future] │ -│ ↕ ↕ ↕ │ -│ └────────────────┴────────────┘ │ -│ ↕ │ -│ ┌─────────────────────────┐ │ -│ │ GATEWAY (agent.py) │ │ -│ │ FastAPI :8000 │ │ -│ │ │ │ -│ │ POST /message │ ← all inbound │ -│ │ POST /chat (legacy) │ │ -│ │ GET /stream/{id} SSE │ ← token stream│ -│ │ GET /reply/{id} SSE │ ← legacy poll │ -│ │ GET /health │ │ -│ │ │ │ -│ │ channels.py registry │ │ -│ │ conversation buffers │ │ -│ └──────────┬──────────────┘ │ -│ ↓ │ -│ ┌──────────────────────┐ │ -│ │ AGENT CORE │ │ -│ │ three-tier routing │ │ -│ │ VRAM management │ │ -│ └──────────────────────┘ │ -│ ↓ │ -│ channels.deliver(session_id, channel, text)│ -│ ↓ ↓ │ -│ telegram → POST grammy/send cli → SSE queue │ -└─────────────────────────────────────────────────────┘ -``` - -## Channel Adapters - -| Channel | session_id | Inbound | Outbound | -|---------|-----------|---------|---------| -| Telegram | `tg-` | Grammy long-poll → POST /message | channels.py → POST grammy:3001/send | -| CLI | `cli-` | POST /message directly | GET /stream/{id} SSE — Rich Live streaming | -| Voice | `voice-` | (future) | (future) | - -## Unified Message Flow - -``` -1. Channel adapter receives message -2. POST /message {text, session_id, channel, user_id} -3. 202 Accepted immediately -4. Background: run_agent_task(message, session_id, channel) -5. Parallel IO (asyncio.gather): - a. _fetch_urls_from_message() — Crawl4AI fetches any URLs in message - b. _retrieve_memories() — openmemory semantic search for context - c. _fast_tool_runner.run_matching() — FastTools (weather, commute) if pattern matches -6. router.route() with enriched history (url_context + fast_context + memories) - - fast tool match → force medium (real-time data, no point routing to light) - - if URL content fetched and tier=light → upgrade to medium -7. Invoke agent for tier with url_context + memories in system prompt -8. Token streaming: - - medium: astream() pushes per-token chunks to _stream_queues[session_id]; blocks filtered in real time - - light/complex: full reply pushed as single chunk after completion - - _end_stream() sends [DONE] sentinel -9. channels.deliver(session_id, channel, reply_text) — Telegram callback -10. _store_memory() background task — stores turn in openmemory -11. GET /stream/{session_id} SSE clients receive chunks; CLI renders with Rich Live + final Markdown -``` - -## Tool Handling - -Adolf uses LangChain's tool interface but only the complex agent actually invokes tools at runtime. - -**Complex agent (`/think` prefix):** `web_search` and `fetch_url` are defined as `langchain_core.tools.Tool` objects and passed to `create_deep_agent()`. The deepagents library runs an agentic loop (LangGraph `create_react_agent` under the hood) that sends the tool schema to the model via OpenAI function-calling format and handles tool dispatch. - -**Medium agent (default):** `_DirectModel` makes a single `model.ainvoke(messages)` call with no tool schema. Context (memories, fetched URL content) is injected via the system prompt instead. This is intentional — `qwen3:4b` behaves unreliably when a tool array is present. - -**Memory tools (out-of-loop):** `add_memory` and `search_memory` are LangChain MCP tool objects (via `langchain_mcp_adapters`) but are excluded from both agents' tool lists. They are called directly — `await _memory_add_tool.ainvoke(...)` — outside the agent loop, before and after each turn. - -## Three-Tier Model Routing - -| Tier | Model | Agent | Trigger | Latency | -|------|-------|-------|---------|---------| -| Light | `qwen2.5:1.5b` (router answers directly) | — | Regex pre-match or LLM classifies "light" | ~2–4s | -| Medium | `qwen3:4b` (`DEEPAGENTS_MODEL`) | `_DirectModel` — single LLM call, no tools | Default; also forced when message contains URLs | ~10–20s | -| Complex | `qwen3:8b` (`DEEPAGENTS_COMPLEX_MODEL`) | `create_deep_agent` — agentic loop with tools | `/think` prefix only | ~60–120s | - -**`/think` prefix**: forces complex tier, stripped before sending to agent. - -Complex tier is locked out unless the message starts with `/think` — any LLM classification of "complex" is downgraded to medium. - -## Fast Tools (`fast_tools.py`) - -Pre-flight tools that run concurrently with URL fetch and memory retrieval before any LLM call. Each tool has two methods: -- `matches(message) → bool` — regex classifier; also used by `Router` to force medium tier -- `run(message) → str` — async fetch returning a context block injected into system prompt - -`FastToolRunner` holds all tools. `any_matches()` is called by the Router at step 0a; `run_matching()` is called in the pre-flight `asyncio.gather` in `run_agent_task()`. - -| Tool | Pattern | Source | Context returned | -|------|---------|--------|-----------------| -| `WeatherTool` | weather/forecast/temperature/snow/rain | SearXNG `"погода Балашиха сейчас"` | Current conditions in °C from Russian weather sites | -| `CommuteTool` | commute/traffic/arrival/пробки | `routecheck:8090/api/route` (Yandex Routing API) | Drive time with/without traffic, Balashikha→Moscow | - -**To add a new fast tool:** subclass `FastTool` in `fast_tools.py`, implement `name`/`matches`/`run`, add an instance to `_fast_tool_runner` in `agent.py`. - -## routecheck Service (`routecheck/`) - -Local web service on port 8090. Exists because Yandex Routing API free tier requires a web UI that uses the API. - -**Web UI** (`http://localhost:8090`): PIL-generated arithmetic captcha → lat/lon form → travel time result. - -**Internal API**: `GET /api/route?from=lat,lon&to=lat,lon&token=ROUTECHECK_TOKEN` — bypasses captcha, used by `CommuteTool`. The `ROUTECHECK_TOKEN` shared secret is set in `.env` and passed to both `routecheck` and `deepagents` containers. - -Yandex API calls are routed through the host HTTPS proxy (`host.docker.internal:56928`) since the container has no direct external internet access. - -**Requires** `.env`: `YANDEX_ROUTING_KEY` (free from `developer.tech.yandex.ru`) + `ROUTECHECK_TOKEN`. - -## Crawl4AI Integration - -Crawl4AI runs as a Docker service (`crawl4ai:11235`) providing JS-rendered, bot-bypass page fetching. - -**Pre-routing fetch (all tiers):** -- `_URL_RE` detects `https?://` URLs in any incoming message -- `_crawl4ai_fetch_async()` uses `httpx.AsyncClient` to POST `{urls: [...]}` to `/crawl` -- Up to 3 URLs fetched concurrently via `asyncio.gather` -- Fetched content (up to 3000 chars/URL) injected as a system context block into enriched history before routing and into medium/complex system prompts -- If fetch succeeds and router returns light → tier upgraded to medium - -**Complex agent tools:** -- `web_search`: SearXNG query + Crawl4AI auto-fetch of top 2 result URLs → combined snippet + page text -- `fetch_url`: Crawl4AI single-URL fetch for any specific URL - -## Memory Pipeline - -openmemory runs as a FastMCP server (`openmemory:8765`) backed by mem0 + Qdrant + nomic-embed-text. - -**Retrieval (before routing):** `_retrieve_memories()` calls `search_memory` MCP tool with the user message as query. Results (threshold ≥ 0.5) are prepended to enriched history so all tiers benefit. - -**Storage (after reply):** `_store_memory()` runs as an asyncio background task, calling `add_memory` with `"User: ...\nAssistant: ..."`. The extraction LLM (`qwen2.5:1.5b` on GPU Ollama) pulls facts; dedup is handled by mem0's update prompt. - -Memory tools (`add_memory`, `search_memory`, `get_all_memories`) are excluded from agent tool lists — memory management happens outside the agent loop. - -## VRAM Management - -GTX 1070 — 8 GB. Ollama must be restarted if CUDA init fails (model loads on CPU). - -1. Flush explicitly before loading qwen3:8b (`keep_alive=0`) -2. Verify eviction via `/api/ps` poll (15s timeout) before proceeding -3. Fallback: timeout → run medium agent instead -4. Post-complex: flush 8b, pre-warm medium + router - -## Session ID Convention - -- Telegram: `tg-` (e.g. `tg-346967270`) -- CLI: `cli-` (e.g. `cli-alvis`) - -Conversation history is keyed by session_id (5-turn buffer). - -## Files - -``` -adolf/ -├── docker-compose.yml Services: bifrost, deepagents, openmemory, grammy, crawl4ai, routecheck, cli -├── Dockerfile deepagents container (Python 3.12) -├── Dockerfile.cli CLI container (python:3.12-slim + rich) -├── agent.py FastAPI gateway, run_agent_task, Crawl4AI pre-fetch, fast tools, memory pipeline -├── fast_tools.py FastTool base, FastToolRunner, WeatherTool, CommuteTool -├── channels.py Channel registry + deliver() + pending_replies -├── router.py Router class — regex + LLM tier classification, FastToolRunner integration -├── vram_manager.py VRAMManager — flush/prewarm/poll Ollama VRAM -├── agent_factory.py _DirectModel (medium) / create_deep_agent (complex) -├── cli.py Interactive CLI REPL — Rich Live streaming + Markdown render -├── wiki_research.py Batch wiki research pipeline (uses /message + SSE) -├── .env TELEGRAM_BOT_TOKEN, ROUTECHECK_TOKEN, YANDEX_ROUTING_KEY (not committed) -├── routecheck/ -│ ├── app.py FastAPI: image captcha + /api/route Yandex proxy -│ └── Dockerfile -├── tests/ -│ ├── integration/ Standalone integration test scripts (common.py + test_*.py) -│ └── use_cases/ Claude Code skill markdown files — Claude acts as user + evaluator -├── openmemory/ -│ ├── server.py FastMCP + mem0: add_memory, search_memory, get_all_memories -│ └── Dockerfile -└── grammy/ - ├── bot.mjs grammY Telegram bot + POST /send HTTP endpoint - ├── package.json - └── Dockerfile -``` - -## External Services (host ports, from openai/ stack) - -| Service | Host Port | Role | -|---------|-----------|------| -| Ollama GPU | 11436 | All LLM inference (via Bifrost) + VRAM management (direct) + memory extraction | -| Ollama CPU | 11435 | nomic-embed-text embeddings for openmemory | -| Qdrant | 6333 | Vector store for memories | -| SearXNG | 11437 | Web search (used by `web_search` tool) | diff --git a/agent.py b/agent.py index c26f351..37b163b 100644 --- a/agent.py +++ b/agent.py @@ -1,7 +1,9 @@ import asyncio +import json as _json_module import os import time from contextlib import asynccontextmanager +from pathlib import Path from fastapi import FastAPI, BackgroundTasks, Request from fastapi.responses import JSONResponse, StreamingResponse @@ -16,6 +18,7 @@ _URL_RE = _re.compile(r'https?://[^\s<>"\']+') def _extract_urls(text: str) -> list[str]: return _URL_RE.findall(text) +from openai import AsyncOpenAI from langchain_openai import ChatOpenAI from langchain_mcp_adapters.client import MultiServerMCPClient from langchain_community.utilities import SearxSearchWrapper @@ -27,8 +30,9 @@ from agent_factory import build_medium_agent, build_complex_agent from fast_tools import FastToolRunner, WeatherTool, CommuteTool import channels -# Bifrost gateway — all LLM inference goes through here -BIFROST_URL = os.getenv("BIFROST_URL", "http://bifrost:8080/v1") +# LiteLLM proxy — all LLM inference goes through here +LITELLM_URL = os.getenv("LITELLM_URL", "http://host.docker.internal:4000/v1") +LITELLM_API_KEY = os.getenv("LITELLM_API_KEY", "dummy") # Direct Ollama URL — used only by VRAMManager for flush/prewarm/poll OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434") @@ -44,6 +48,45 @@ ROUTECHECK_TOKEN = os.getenv("ROUTECHECK_TOKEN", "") MAX_HISTORY_TURNS = 5 _conversation_buffers: dict[str, list] = {} +# ── Interaction logging (RLHF data collection) ───────────────────────────────── +_LOG_DIR = Path(os.getenv("ADOLF_LOG_DIR", "/app/logs")) +_INTERACTIONS_LOG = _LOG_DIR / "interactions.jsonl" + +def _ensure_log_dir() -> None: + try: + _LOG_DIR.mkdir(parents=True, exist_ok=True) + except Exception as e: + print(f"[log] cannot create log dir {_LOG_DIR}: {e}", flush=True) + + +async def _log_interaction( + session_id: str, + channel: str, + tier: str, + input_text: str, + response_text: str | None, + latency_ms: int, + metadata: dict | None = None, +) -> None: + """Append one interaction record to the JSONL log for future RLHF/finetuning.""" + record = { + "ts": time.time(), + "session_id": session_id, + "channel": channel, + "tier": tier, + "input": input_text, + "output": response_text or "", + "latency_ms": latency_ms, + } + if metadata: + record["metadata"] = metadata + try: + _ensure_log_dir() + with open(_INTERACTIONS_LOG, "a", encoding="utf-8") as f: + f.write(_json_module.dumps(record, ensure_ascii=False) + "\n") + except Exception as e: + print(f"[log] write error: {e}", flush=True) + # Per-session streaming queues — filled during inference, read by /stream/{session_id} _stream_queues: dict[str, asyncio.Queue] = {} @@ -140,31 +183,30 @@ async def lifespan(app: FastAPI): channels.register_defaults() # All three models route through Bifrost → Ollama GPU. - # Bifrost adds retry logic, observability, and failover. - # Model names use provider/model format: Bifrost strips the "ollama/" prefix - # before forwarding to Ollama's /v1/chat/completions endpoint. router_model = ChatOpenAI( model=f"ollama/{ROUTER_MODEL}", - base_url=BIFROST_URL, - api_key="dummy", + base_url=LITELLM_URL, + api_key=LITELLM_API_KEY, temperature=0, timeout=30, ) + embedder = AsyncOpenAI(base_url=LITELLM_URL, api_key=LITELLM_API_KEY) medium_model = ChatOpenAI( model=f"ollama/{MEDIUM_MODEL}", - base_url=BIFROST_URL, - api_key="dummy", + base_url=LITELLM_URL, + api_key=LITELLM_API_KEY, timeout=180, ) complex_model = ChatOpenAI( - model=f"ollama/{COMPLEX_MODEL}", - base_url=BIFROST_URL, - api_key="dummy", + model=COMPLEX_MODEL, # full model name — may be remote (OpenRouter) or local ollama/* + base_url=LITELLM_URL, + api_key=LITELLM_API_KEY, timeout=600, ) vram_manager = VRAMManager(base_url=OLLAMA_BASE_URL) - router = Router(model=router_model, fast_tool_runner=_fast_tool_runner) + router = Router(model=router_model, embedder=embedder, fast_tool_runner=_fast_tool_runner) + await router.initialize() mcp_connections = { "openmemory": {"transport": "sse", "url": f"{OPENMEMORY_URL}/sse"}, @@ -279,8 +321,8 @@ async def lifespan(app: FastAPI): ) print( - f"[agent] bifrost={BIFROST_URL} | router=ollama/{ROUTER_MODEL} | " - f"medium=ollama/{MEDIUM_MODEL} | complex=ollama/{COMPLEX_MODEL}", + f"[agent] litellm={LITELLM_URL} | router=semantic(ollama/{ROUTER_MODEL}+nomic-embed-text) | " + f"medium=ollama/{MEDIUM_MODEL} | complex={COMPLEX_MODEL}", flush=True, ) print(f"[agent] agent tools: {[t.name for t in agent_tools]}", flush=True) @@ -346,6 +388,12 @@ def _log_messages(result): # ── memory helpers ───────────────────────────────────────────────────────────── +def _resolve_user_id(session_id: str) -> str: + """Map any session_id to a canonical user identity for openmemory. + All channels share the same memory pool for the single user.""" + return "alvis" + + async def _store_memory(session_id: str, user_msg: str, assistant_reply: str) -> None: """Store a conversation turn in openmemory (runs as a background task).""" if _memory_add_tool is None: @@ -353,7 +401,8 @@ async def _store_memory(session_id: str, user_msg: str, assistant_reply: str) -> t0 = time.monotonic() try: text = f"User: {user_msg}\nAssistant: {assistant_reply}" - await _memory_add_tool.ainvoke({"text": text, "user_id": session_id}) + user_id = _resolve_user_id(session_id) + await _memory_add_tool.ainvoke({"text": text, "user_id": user_id}) print(f"[memory] stored in {time.monotonic() - t0:.1f}s", flush=True) except Exception as e: print(f"[memory] error: {e}", flush=True) @@ -364,7 +413,8 @@ async def _retrieve_memories(message: str, session_id: str) -> str: if _memory_search_tool is None: return "" try: - result = await _memory_search_tool.ainvoke({"query": message, "user_id": session_id}) + user_id = _resolve_user_id(session_id) + result = await _memory_search_tool.ainvoke({"query": message, "user_id": user_id}) if result and result.strip() and result.strip() != "[]": return f"Relevant memories:\n{result}" except Exception: @@ -372,36 +422,41 @@ async def _retrieve_memories(message: str, session_id: str) -> str: return "" -# ── core task ────────────────────────────────────────────────────────────────── +# ── core pipeline ────────────────────────────────────────────────────────────── -async def run_agent_task(message: str, session_id: str, channel: str = "telegram"): - print(f"[agent] queued: {message[:80]!r} chat={session_id}", flush=True) +from typing import AsyncGenerator - force_complex = False - clean_message = message - if message.startswith("/think "): - force_complex = True - clean_message = message[len("/think "):] - print("[agent] /think prefix → force_complex=True", flush=True) +async def _run_agent_pipeline( + message: str, + history: list[dict], + session_id: str, + tier_override: str | None = None, + dry_run: bool = False, +) -> AsyncGenerator[str, None]: + """Core pipeline: pre-flight → routing → inference. Yields text chunks. + tier_override: "light" | "medium" | "complex" | None (auto-route) + dry_run: if True and tier=complex, log tier=complex but use medium model (avoids API cost) + Caller is responsible for scheduling _store_memory after consuming all chunks. + """ async with _reply_semaphore: t0 = time.monotonic() - history = _conversation_buffers.get(session_id, []) + clean_message = message print(f"[agent] running: {clean_message[:80]!r}", flush=True) - # Fetch URL content, memories, and fast-tool context concurrently — all IO-bound + # Fetch URL content, memories, and fast-tool context concurrently url_context, memories, fast_context = await asyncio.gather( _fetch_urls_from_message(clean_message), _retrieve_memories(clean_message, session_id), _fast_tool_runner.run_matching(clean_message), ) if url_context: - print(f"[agent] crawl4ai: {len(url_context)} chars fetched from message URLs", flush=True) + print(f"[agent] crawl4ai: {len(url_context)} chars fetched", flush=True) if fast_context: names = _fast_tool_runner.matching_names(clean_message) print(f"[agent] fast_tools={names}: {len(fast_context)} chars injected", flush=True) - # Build enriched history: memories + url_context + fast_context for ALL tiers + # Build enriched history enriched_history = list(history) if url_context: enriched_history = [{"role": "system", "content": url_context}] + enriched_history @@ -410,45 +465,58 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram if memories: enriched_history = [{"role": "system", "content": memories}] + enriched_history - # Short-circuit: fast tool result is already a complete reply — skip router+LLM - if fast_context and not force_complex and not url_context: - tier = "fast" - final_text = fast_context - llm_elapsed = time.monotonic() - t0 - names = _fast_tool_runner.matching_names(clean_message) - print(f"[agent] tier=fast tools={names} — delivering directly", flush=True) - await _push_stream_chunk(session_id, final_text) - await _end_stream(session_id) - else: - tier, light_reply = await router.route(clean_message, enriched_history, force_complex) + final_text = None + llm_elapsed = 0.0 - # Messages with URL content must be handled by at least medium tier - if url_context and tier == "light": - tier = "medium" - light_reply = None - print("[agent] URL in message → upgraded light→medium", flush=True) - print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True) + try: + # Short-circuit: fast tool already has the answer + if fast_context and tier_override is None and not url_context: + tier = "fast" + final_text = fast_context + llm_elapsed = time.monotonic() - t0 + names = _fast_tool_runner.matching_names(clean_message) + print(f"[agent] tier=fast tools={names} — delivering directly", flush=True) + yield final_text + + else: + # Determine tier + if tier_override in ("light", "medium", "complex"): + tier = tier_override + light_reply = None + if tier_override == "light": + tier, light_reply = await router.route(clean_message, enriched_history) + tier = "light" + else: + tier, light_reply = await router.route(clean_message, enriched_history) + if url_context and tier == "light": + tier = "medium" + light_reply = None + print("[agent] URL in message → upgraded light→medium", flush=True) + + # Dry-run: log as complex but infer with medium (no remote API call) + effective_tier = tier + if dry_run and tier == "complex": + effective_tier = "medium" + print(f"[agent] tier=complex (dry-run) → using medium model, message={clean_message[:60]!r}", flush=True) + else: + print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True) + tier = effective_tier - if tier != "fast": - final_text = None - try: if tier == "light": final_text = light_reply llm_elapsed = time.monotonic() - t0 - print(f"[agent] light path: answered by router", flush=True) - await _push_stream_chunk(session_id, final_text) - await _end_stream(session_id) + print("[agent] light path: answered by router", flush=True) + yield final_text elif tier == "medium": system_prompt = MEDIUM_SYSTEM_PROMPT if memories: - system_prompt = system_prompt + "\n\n" + memories + system_prompt += "\n\n" + memories if url_context: - system_prompt = system_prompt + "\n\n" + url_context + system_prompt += "\n\n" + url_context if fast_context: - system_prompt = system_prompt + "\n\nLive web search results (use these to answer):\n\n" + fast_context + system_prompt += "\n\nLive web search results (use these to answer):\n\n" + fast_context - # Stream tokens directly — filter out qwen3 blocks in_think = False response_parts = [] async for chunk in medium_model.astream([ @@ -464,91 +532,117 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram in_think = False after = token.split("", 1)[1] if after: - await _push_stream_chunk(session_id, after) + yield after response_parts.append(after) else: if "" in token: in_think = True before = token.split("", 1)[0] if before: - await _push_stream_chunk(session_id, before) + yield before response_parts.append(before) else: - await _push_stream_chunk(session_id, token) + yield token response_parts.append(token) - await _end_stream(session_id) llm_elapsed = time.monotonic() - t0 final_text = "".join(response_parts).strip() or None - else: # complex - ok = await vram_manager.enter_complex_mode() - if not ok: - print("[agent] complex→medium fallback (eviction timeout)", flush=True) - tier = "medium" - system_prompt = MEDIUM_SYSTEM_PROMPT - if memories: - system_prompt = system_prompt + "\n\n" + memories - if url_context: - system_prompt = system_prompt + "\n\n" + url_context - result = await medium_agent.ainvoke({ - "messages": [ - {"role": "system", "content": system_prompt}, - *history, - {"role": "user", "content": clean_message}, - ] - }) - else: - system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id) - if url_context: - system_prompt = system_prompt + "\n\n[Pre-fetched URL content from user's message:]\n" + url_context - result = await complex_agent.ainvoke({ - "messages": [ - {"role": "system", "content": system_prompt}, - *history, - {"role": "user", "content": clean_message}, - ] - }) - asyncio.create_task(vram_manager.exit_complex_mode()) + else: # complex — remote model, no VRAM management needed + system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id) + if url_context: + system_prompt += "\n\n[Pre-fetched URL content from user's message:]\n" + url_context + result = await complex_agent.ainvoke({ + "messages": [ + {"role": "system", "content": system_prompt}, + *history, + {"role": "user", "content": clean_message}, + ] + }) llm_elapsed = time.monotonic() - t0 _log_messages(result) final_text = _extract_final_text(result) if final_text: - await _push_stream_chunk(session_id, final_text) - await _end_stream(session_id) + yield final_text - except Exception as e: - import traceback - llm_elapsed = time.monotonic() - t0 - print(f"[agent] error after {llm_elapsed:.1f}s for chat {session_id}: {e}", flush=True) - traceback.print_exc() - await _end_stream(session_id) + except Exception as e: + import traceback + llm_elapsed = time.monotonic() - t0 + print(f"[agent] error after {llm_elapsed:.1f}s for {session_id}: {e}", flush=True) + traceback.print_exc() - # Deliver reply through the originating channel + print(f"[agent] pipeline done in {time.monotonic() - t0:.1f}s tier={tier if 'tier' in dir() else '?'}", flush=True) + + # Store memory as side-effect (non-blocking, best-effort) if final_text: - t1 = time.monotonic() + asyncio.create_task(_store_memory(session_id, clean_message, final_text)) + + +# ── core task (Telegram / Matrix / CLI wrapper) ───────────────────────────────── + +async def run_agent_task( + message: str, + session_id: str, + channel: str = "telegram", + metadata: dict | None = None, +): + print(f"[agent] queued: {message[:80]!r} chat={session_id}", flush=True) + t0 = time.monotonic() + + meta = metadata or {} + dry_run = bool(meta.get("dry_run", False)) + is_benchmark = bool(meta.get("benchmark", False)) + + history = _conversation_buffers.get(session_id, []) + final_text = None + actual_tier = "unknown" + + # Patch pipeline to capture tier for logging + # We read it from logs post-hoc; capture via a wrapper + async for chunk in _run_agent_pipeline(message, history, session_id, dry_run=dry_run): + await _push_stream_chunk(session_id, chunk) + if final_text is None: + final_text = chunk + else: + final_text += chunk + + await _end_stream(session_id) + + elapsed_ms = int((time.monotonic() - t0) * 1000) + + if final_text: + final_text = final_text.strip() + + # Skip channel delivery for benchmark sessions (no Telegram spam) + if not is_benchmark: try: await channels.deliver(session_id, channel, final_text) except Exception as e: print(f"[agent] delivery error (non-fatal): {e}", flush=True) - send_elapsed = time.monotonic() - t1 - print( - f"[agent] replied in {time.monotonic() - t0:.1f}s " - f"(llm={llm_elapsed:.1f}s, send={send_elapsed:.1f}s) tier={tier}", - flush=True, - ) - print(f"[agent] reply_text: {final_text}", flush=True) - else: - print("[agent] warning: no text reply from agent", flush=True) - # Update conversation buffer and schedule memory storage - if final_text: - buf = _conversation_buffers.get(session_id, []) - buf.append({"role": "user", "content": clean_message}) - buf.append({"role": "assistant", "content": final_text}) - _conversation_buffers[session_id] = buf[-(MAX_HISTORY_TURNS * 2):] - asyncio.create_task(_store_memory(session_id, clean_message, final_text)) + print(f"[agent] replied in {elapsed_ms / 1000:.1f}s", flush=True) + print(f"[agent] reply_text: {final_text[:200]}", flush=True) + + # Update conversation buffer + buf = _conversation_buffers.get(session_id, []) + buf.append({"role": "user", "content": message}) + buf.append({"role": "assistant", "content": final_text}) + _conversation_buffers[session_id] = buf[-(MAX_HISTORY_TURNS * 2):] + + # Log interaction for RLHF data collection (skip benchmark sessions to avoid noise) + if not is_benchmark: + asyncio.create_task(_log_interaction( + session_id=session_id, + channel=channel, + tier=actual_tier, + input_text=message, + response_text=final_text, + latency_ms=elapsed_ms, + metadata=meta if meta else None, + )) + else: + print("[agent] warning: no text reply from agent", flush=True) # ── endpoints ────────────────────────────────────────────────────────────────── @@ -560,7 +654,7 @@ async def message(request: InboundMessage, background_tasks: BackgroundTasks): return JSONResponse(status_code=503, content={"error": "Agent not ready"}) session_id = request.session_id channel = request.channel - background_tasks.add_task(run_agent_task, request.text, session_id, channel) + background_tasks.add_task(run_agent_task, request.text, session_id, channel, request.metadata) return JSONResponse(status_code=202, content={"status": "accepted"}) @@ -622,3 +716,96 @@ async def stream_reply(session_id: str): @app.get("/health") async def health(): return {"status": "ok", "agent_ready": medium_agent is not None} + + +# ── OpenAI-compatible API (for OpenWebUI and other clients) ──────────────────── + +_TIER_MAP = { + "adolf": None, + "adolf-light": "light", + "adolf-medium": "medium", + "adolf-deep": "complex", +} + + +@app.get("/v1/models") +async def list_models(): + return { + "object": "list", + "data": [ + {"id": "adolf", "object": "model", "owned_by": "adolf"}, + {"id": "adolf-light", "object": "model", "owned_by": "adolf"}, + {"id": "adolf-medium", "object": "model", "owned_by": "adolf"}, + {"id": "adolf-deep", "object": "model", "owned_by": "adolf"}, + ], + } + + +@app.post("/v1/chat/completions") +async def chat_completions(request: Request): + if medium_agent is None: + return JSONResponse(status_code=503, content={"error": {"message": "Agent not ready", "type": "server_error"}}) + + body = await request.json() + model = body.get("model", "adolf") + messages = body.get("messages", []) + stream = body.get("stream", True) + + # Extract current user message and history + user_messages = [m for m in messages if m.get("role") == "user"] + if not user_messages: + return JSONResponse(status_code=400, content={"error": {"message": "No user message", "type": "invalid_request_error"}}) + + current_message = user_messages[-1]["content"] + # History = everything before the last user message (excluding system messages from OpenWebUI) + last_user_idx = len(messages) - 1 - next( + i for i, m in enumerate(reversed(messages)) if m.get("role") == "user" + ) + history = [m for m in messages[:last_user_idx] if m.get("role") in ("user", "assistant")] + + session_id = request.headers.get("X-Session-Id", "owui-default") + tier_override = _TIER_MAP.get(model) + + import json as _json + import uuid as _uuid + + response_id = f"chatcmpl-{_uuid.uuid4().hex[:12]}" + + if stream: + async def event_stream(): + # Opening chunk with role + opening = { + "id": response_id, "object": "chat.completion.chunk", + "choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}] + } + yield f"data: {_json.dumps(opening)}\n\n" + + async for chunk in _run_agent_pipeline(current_message, history, session_id, tier_override): + data = { + "id": response_id, "object": "chat.completion.chunk", + "choices": [{"index": 0, "delta": {"content": chunk}, "finish_reason": None}] + } + yield f"data: {_json.dumps(data)}\n\n" + + # Final chunk + final = { + "id": response_id, "object": "chat.completion.chunk", + "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}] + } + yield f"data: {_json.dumps(final)}\n\n" + yield "data: [DONE]\n\n" + + return StreamingResponse(event_stream(), media_type="text/event-stream") + + else: + # Non-streaming: collect all chunks + parts = [] + async for chunk in _run_agent_pipeline(current_message, history, session_id, tier_override): + if chunk: + parts.append(chunk) + full_text = "".join(parts).strip() + return { + "id": response_id, "object": "chat.completion", + "choices": [{"index": 0, "message": {"role": "assistant", "content": full_text}, "finish_reason": "stop"}], + "model": model, + } diff --git a/bifrost-config.json b/bifrost-config.json index 7db331e..278a75f 100644 --- a/bifrost-config.json +++ b/bifrost-config.json @@ -1,4 +1,21 @@ { + "auth_config": { + "is_enabled": true, + "admin_username": "admin", + "admin_password": "env.BIFROST_ADMIN_PASSWORD" + }, + "config_store": { + "enabled": true, + "type": "postgres", + "config": { + "host": "bifrost-db", + "port": "5432", + "user": "bifrost", + "password": "bifrost", + "db_name": "bifrost", + "ssl_mode": "disable" + } + }, "client": { "drop_excess_requests": false }, diff --git a/channels.py b/channels.py index 97c50c5..21cde34 100644 --- a/channels.py +++ b/channels.py @@ -49,6 +49,7 @@ async def deliver(session_id: str, channel: str, text: str) -> None: # ── built-in channel adapters ───────────────────────────────────────────────── GRAMMY_URL = os.getenv("GRAMMY_URL", "http://grammy:3001") +MATRIX_URL = os.getenv("MATRIX_URL", "http://matrix:3002") async def _telegram_send(session_id: str, text: str) -> None: @@ -64,12 +65,26 @@ async def _telegram_send(session_id: str, text: str) -> None: ) +async def _matrix_send(session_id: str, text: str) -> None: + """Send reply to Matrix via the matrix adapter POST /send endpoint.""" + room_id = session_id.removeprefix("mx-") + MAX_MTX = 4000 + chunks = [text[i:i + MAX_MTX] for i in range(0, len(text), MAX_MTX)] + async with httpx.AsyncClient(timeout=15) as client: + for chunk in chunks: + await client.post( + f"{MATRIX_URL}/send", + json={"room_id": room_id, "text": chunk}, + ) + + async def _cli_send(session_id: str, text: str) -> None: """CLI replies are handled entirely through the pending_replies queue — no-op here.""" pass def register_defaults() -> None: - """Register the built-in Telegram and CLI channel adapters.""" + """Register the built-in Telegram, Matrix, and CLI channel adapters.""" register("telegram", _telegram_send) + register("matrix", _matrix_send) register("cli", _cli_send) diff --git a/docker-compose.yml b/docker-compose.yml index a17ade0..d40bdb1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,19 +1,4 @@ services: - bifrost: - image: maximhq/bifrost - container_name: bifrost - ports: - - "8080:8080" - volumes: - - ./bifrost-config.json:/app/data/config.json:ro - environment: - - APP_DIR=/app/data - - APP_PORT=8080 - - LOG_LEVEL=info - extra_hosts: - - "host.docker.internal:host-gateway" - restart: unless-stopped - deepagents: build: . container_name: deepagents @@ -21,25 +6,28 @@ services: - "8000:8000" environment: - PYTHONUNBUFFERED=1 - # Bifrost gateway — all LLM inference goes through here - - BIFROST_URL=http://bifrost:8080/v1 + # LiteLLM proxy — all LLM inference goes through here + - LITELLM_URL=http://host.docker.internal:4000/v1 + - LITELLM_API_KEY=sk-fjQC1BxAiGFSMs # Direct Ollama GPU URL — used only by VRAMManager for flush/prewarm - OLLAMA_BASE_URL=http://host.docker.internal:11436 - DEEPAGENTS_MODEL=qwen3:4b - - DEEPAGENTS_COMPLEX_MODEL=qwen3:8b + - DEEPAGENTS_COMPLEX_MODEL=deepseek/deepseek-r1:free - DEEPAGENTS_ROUTER_MODEL=qwen2.5:1.5b - SEARXNG_URL=http://host.docker.internal:11437 - GRAMMY_URL=http://grammy:3001 + - MATRIX_URL=http://host.docker.internal:3002 - CRAWL4AI_URL=http://crawl4ai:11235 - ROUTECHECK_URL=http://routecheck:8090 - ROUTECHECK_TOKEN=${ROUTECHECK_TOKEN} + volumes: + - ./logs:/app/logs extra_hosts: - "host.docker.internal:host-gateway" depends_on: - openmemory - grammy - crawl4ai - - bifrost - routecheck restart: unless-stopped diff --git a/router.py b/router.py index e3dde5f..a140d43 100644 --- a/router.py +++ b/router.py @@ -1,11 +1,38 @@ +import asyncio import re +import math from typing import Optional +from openai import AsyncOpenAI from langchain_core.messages import SystemMessage, HumanMessage from fast_tools import FastToolRunner -# ── Regex pre-classifier ────────────────────────────────────────────────────── -# Catches obvious light-tier patterns before calling the LLM. -# Keyed by regex → compiled pattern. +# ── Regex pre-classifiers ───────────────────────────────────────────────────── + +# Complex: keyword triggers that reliably signal deep multi-source research +_COMPLEX_PATTERNS = re.compile( + r"(?:^|\s)(" + r"research|investigate|deep.dive|think carefully" + r"|write a (?:detailed|comprehensive|full|thorough|complete)" + r"|compare all|find and (?:compare|summarize|analyze)" + r"|in[- ]depth analysis|comprehensive guide" + r"|detailed (?:report|analysis|comparison|breakdown|overview)" + r"|everything about|all (?:major|available|self-hosted|open.source)" + r"|pros and cons|with (?:sources|citations|references)" + # Russian complex research keywords (no trailing \b — stems like подробн match подробное/подробный) + r"|исследуй|изучи все|сравни все|найди и сравни|найди и опиши" + r"|напиши подробн|напиши детальн|напиши полн" + r"|подробный отчет|детальн\w+ (?:анализ|сравнение|отчет)" + r"|подробное (?:руководство|сравнение)|полное руководство" + r"|все варианты|все способы|все доступные|все самохостируемые|все платформы" + r"|лучшие практики|все инструменты|все решения|все протоколы" + r"|найди детальн|найди и кратко опиши" + r"|изучи свежие|изучи лучши|изучи все" + r"|сравни все\b" + r")", + re.IGNORECASE, +) + +# Light: trivial queries that need no tools or memory _LIGHT_PATTERNS = re.compile( r"^(" # Greetings / farewells @@ -15,35 +42,301 @@ _LIGHT_PATTERNS = re.compile( r"|thanks?|thank you|thx|ty|ok|okay|k|cool|great|awesome|perfect|sounds good|got it|nice|sure" r"|how are you|how are you\?|how are you doing(\s+today)?[?!.]*" r"|what.?s up" - # Calendar facts: "what day comes after X?" / "what comes after X?" + # Calendar facts r"|what\s+day\s+(comes\s+after|follows|is\s+after)\s+\w+[?!.]*" r"|what\s+comes\s+after\s+\w+[?!.]*" - # Acronym expansions: "what does X stand for?" + # Acronym expansions r"|what\s+does\s+\w+\s+stand\s+for[?!.]*" + # Russian greetings / farewells / acknowledgements + r"|привет|пока|спасибо|здравствуй|здравствуйте|добрый день|добрый вечер|доброе утро" + r"|окей|хорошо|отлично|понятно|ок|ладно|договорились|спс|благодарю" + r"|пожалуйста|не за что|всё понятно|ясно" + r"|как дела|как ты|как жизнь|всё хорошо|всё ок" r")[\s!.?]*$", re.IGNORECASE, ) -# ── LLM classification prompt ───────────────────────────────────────────────── -CLASSIFY_PROMPT = """Classify the message. Output ONLY one word: light, medium, or complex. +# ── Semantic router utterances ──────────────────────────────────────────────── +# These are embedded at startup. New messages are classified by cosine +# similarity — whichever tier's centroid is closest wins. +_LIGHT_UTTERANCES = [ + # General facts (English) + "what is 2+2", + "what is the capital of France", + "name the three primary colors", + "tell me a short joke", + "is the sky blue", + "is water wet", + "how many days in a week", + "what is the speed of light", + "what is the boiling point of water", + "spell the word beautiful", + "what color is the ocean", + "how many inches in a foot", + "who wrote hamlet", + "what is pi", + "what year did world war two end", + "what is the largest planet", + "how many continents are there", + "what does DNA stand for", + "what language do they speak in Brazil", + "what is the square root of 144", + # Tech definitions — static knowledge (English) + "what is Docker", + "what is a VPN", + "what is SSH", + "what is a reverse proxy", + "what is an API", + "what is a firewall", + "what is a container", + "what is DNS", + "what is HTTPS", + "what is a load balancer", + "what is Kubernetes", + "what is Git", + "what is a network port", + "what is an IP address", + "what is a subnet mask", + "what is the OSI model", + "how many bits in a byte", + "how many bytes in a gigabyte", + "what is TCP", + "what is a REST API", + # Russian — static facts and definitions + "что такое IP-адрес", + "что такое VPN", + "что такое Docker", + "что такое DNS", + "что такое SSH", + "что означает API", + "сколько байт в гигабайте", + "сколько бит в байте", + "что такое Zigbee", + "что такое Z-Wave", + "что такое брандмауэр", + "что такое виртуальная машина", + "что такое обратный прокси", + "привет", + "пока", + "спасибо", + "как дела", + "что такое Matter протокол", + "сколько планет в солнечной системе", + "чему равно число Пи", + # Russian — more static definitions + "что такое TCP/IP", + "что такое подсеть", + "скорость света", + "сколько дней в году", + "что такое Kubernetes", + "что такое Git", + "что такое REST API", + "что такое TCP", + "что такое UDP", + "что такое VLAN", + "сколько мегабайт в гигабайте", + "что такое процессор", + "что такое оперативная память", + "что такое виртуализация", + "что такое Linux", + "что такое умный дом", + "что такое Home Assistant", + "что такое Matter", +] -LIGHT = answerable from general knowledge, no internet needed: - what is 2+2 / what is the capital of France / name the three primary colors - tell me a short joke / is the sky blue / is water wet +_MEDIUM_UTTERANCES = [ + # English — current data, memory, actions + "what is the weather today", + "what is the bitcoin price right now", + "what are the latest news", + "what did we talk about last time", + "what is my name", + "where do I live", + "what do you know about me", + "what did I tell you before", + "what is the current temperature outside", + "remind me what I said about my project", + "search for the latest iPhone release", + "find me a restaurant nearby", + "turn on the lights in the living room", + "turn off all lights", + "set temperature to 22 degrees", + "what is the current traffic to Moscow", + "check if anyone is home", + "what devices are currently on", + "look up my public IP address", + "show me recent news about Proxmox", + # Russian — weather and commute + "какая сегодня погода в Балашихе", + "пойдет ли сегодня дождь", + "какая температура на улице сейчас", + "погода на завтра", + "будет ли снег сегодня", + "сколько ехать до Москвы сейчас", + "какие пробки на дороге до Москвы", + "время в пути на работу", + "есть ли пробки сейчас", + "стоит ли брать зонтик", + # Russian — smart home control + "включи свет в гостиной", + "выключи свет на кухне", + "какая температура дома", + "установи температуру 22 градуса", + "выключи все лампочки", + "какие устройства сейчас включены", + "включи ночной режим", + "открой шторы в гостиной", + "включи свет в спальне на 50 процентов", + "выключи свет во всём доме", + "включи вентилятор в детской", + "закрыты ли все окна", + "выключи телевизор", + "какое потребление электричества сегодня", + "включи кофемашину", + "сколько у нас датчиков движения", + "состояние всех дверных замков", + "есть ли кто-нибудь дома", + "установи будильник на 7 утра", + # Russian — personal memory + "как меня зовут", + "где я живу", + "что мы обсуждали в прошлый раз", + "что ты знаешь о моем домашнем сервере", + "напомни, какие сервисы я запускаю", + "что я просил тебя запомнить", + "что я говорил о своей сети", + # Russian — current info lookups requiring network/tools + "какой сейчас курс биткоина", + "курс доллара к рублю сейчас", + "какая последняя версия Docker", + "как перезапустить Docker контейнер", + "как посмотреть логи Docker контейнера", + "какие новые функции в Home Assistant 2024", + "есть ли проблемы у Cloudflare сегодня", + "какие новые Zigbee устройства вышли в 2024 году", + "найди хороший опенсорс менеджер фотографий", + "последние новости Proxmox", + "напиши bash команду для поиска больших файлов", + "как вывести список всех запущенных контейнеров", + "как проверить использование диска в Linux", +] -MEDIUM = requires web search or the user's stored memories: - current weather / today's news / Bitcoin price / what did we talk about - what is my name / where do I live / what is my job / do I have any pets - what do you know about me / what are my preferences / what did I tell you +_COMPLEX_UTTERANCES = [ + # English + "research everything about Elon Musk's recent projects and investments", + "write a detailed report on climate change solutions with sources", + "investigate the history and current state of quantum computing", + "find and summarize the latest academic papers on transformer architectures", + "analyze in depth the pros and cons of nuclear energy with citations", + "research the background and controversies around this person", + "compare all major cloud providers with detailed pricing and features", + "write a comprehensive biography of this historical figure", + "investigate what caused the 2008 financial crisis with multiple sources", + "research the best programming languages in 2024 with detailed comparison", + "find everything published about this medical condition and treatments", + "do a deep dive into the latest developments in artificial general intelligence", + "research and compare all options for starting a business in Europe", + "investigate recent news and controversies around this company", + "write a thorough analysis of geopolitical tensions in the Middle East", + "find detailed information on the side effects and studies for this medication", + "research the top 10 JavaScript frameworks with benchmarks and community data", + "investigate who is funding AI research and what their goals are", + "write a detailed market analysis for the electric vehicle industry", + "research everything you can find about this startup or technology", + # Russian — deep research + "исследуй и сравни все варианты умного домашнего освещения", + "напиши подробный отчет о протоколах умного дома", + "изучи все самохостируемые медиасерверы и сравни их", + "исследуй лучшие практики безопасности домашнего сервера", + "сравни все системы резервного копирования для Linux", + "напиши детальное сравнение WireGuard и OpenVPN", + "исследуй все варианты голосового управления на русском языке", + "изучи все опенсорс альтернативы Google сервисам", + "напиши подробный анализ локальных языковых моделей", + "исследуй лучшие инструменты мониторинга для домашнего сервера", + # Russian — more deep research queries matching benchmark + "исследуй и сравни Proxmox, Unraid и TrueNAS для домашней лаборатории", + "напиши подробное руководство по безопасности домашнего сервера", + "исследуй все доступные дашборды для самохостинга и сравни их", + "найди детальные бенчмарки ARM одноплатных компьютеров для домашней лаборатории", + "исследуй лучший стек мониторинга для самохостинга в 2024 году", + "исследуй и сравни WireGuard, OpenVPN и Tailscale для домашней сети", + "исследуй лучшие практики сегментации домашней сети с VLAN", + "изучи все самохостируемые DNS решения и их возможности", + "исследуй и сравни все платформы умного дома: Home Assistant и другие", + "изучи лучшие Zigbee координаторы и их совместимость с Home Assistant", + "напиши детальный отчет о поддержке протокола Matter и совместимости устройств", + "исследуй все способы интеграции умных ламп с Home Assistant", + "найди и сравни все варианты датчиков движения для умного дома", + "исследуй и сравни все самохостируемые решения для хранения фотографий", + "изучи лучшие самохостируемые медиасерверы: Jellyfin, Plex и Emby", + "исследуй последние достижения в локальном LLM инференсе и обзор моделей", + "изучи лучшие опенсорс альтернативы Google сервисов для приватности", + "найди и кратко опиши все крупные самохостируемые менеджеры паролей", + "напиши детальный анализ текущего состояния AI ассистентов для самохостинга", + "исследуй и сравни все инструменты оркестрации контейнеров для домашней лаборатории", + "изучи лучшие подходы к автоматическому резервному копированию в Linux", + "исследуй и сравни все самохостируемые инструменты личных финансов", + "изучи свежие CVE и уязвимости в популярном самохостируемом ПО", + "напиши подробное руководство по настройке автоматизаций в Home Assistant", + "исследуй все варианты голосового управления умным домом на русском языке", + "сравни все системы резервного копирования для Linux: Restic, BorgBackup и другие", + "исследуй лучшие самохостируемые системы мониторинга сети: Zabbix, Grafana", + "изучи все варианты локального запуска языковых моделей на видеокарте", + "напиши подробный отчет о технологиях синтеза речи с открытым исходным кодом", + "исследуй все способы интеграции умных розеток с мониторингом потребления", + "напиши полное руководство по настройке обратного прокси Caddy", + "исследуй лучшие практики написания Docker Compose файлов для продакшена", + "сравни все самохостируемые облачные хранилища: Nextcloud, Seafile и другие", + "изучи все доступные локальные ассистенты с голосовым управлением", + "исследуй все самохостируемые решения для блокировки рекламы: Pi-hole, AdGuard", + "напиши детальное сравнение систем управления конфигурацией: Ansible, Puppet", + "исследуй все протоколы умного дома и их плюсы и минусы: Zigbee, Z-Wave, Matter", + "найди и сравни все фреймворки для создания локальных AI ассистентов", + "исследуй лучшие решения для автоматического управления медиатекой", + "изучи все варианты самохостируемых систем учёта расходов с возможностью импорта", + "напиши сравнение всех вариантов самохостинга для хранения и синхронизации файлов", + "исследуй все открытые протоколы для умного дома и их экосистемы", + "изучи лучшие инструменты для автоматизации домашней инфраструктуры", +] -COMPLEX = /think prefix only: - /think compare frameworks / /think plan a trip - -Message: {message} -Output (one word only — light, medium, or complex):""" +# Medium: queries that require tools, actions, or real-time data (not static knowledge) +_MEDIUM_PATTERNS = re.compile( + r"(?:" + # Russian smart home commands — always need HA integration + r"(?:включи|выключи|открой|закрой|установи|поставь|убавь|прибавь|переключи)\s" + r"|(?:какая|какой|какое|каково)\s+(?:температура|влажность|потребление|состояние|статус)\s" + r"|(?:сколько|есть ли)\s.*(?:датчик|устройств|замк)" + # Russian memory queries + r"|как меня зовут|где я живу|что мы обсуждали|что я говорил|что я просил" + r"|напомни\b|что ты знаешь обо мне" + # Russian current info + r"|курс (?:доллара|биткоина|евро|рубл)" + r"|(?:последние |свежие )?новости\b" + r"|(?:погода|температура)\s+(?:на завтра|на неделю)" + r")", + re.IGNORECASE, +) LIGHT_REPLY_PROMPT = """You are a helpful Telegram assistant. Answer briefly and naturally (1-3 sentences). Be friendly.""" +_EMBED_MODEL = "ollama/nomic-embed-text" + + +def _cosine(a: list[float], b: list[float]) -> float: + dot = sum(x * y for x, y in zip(a, b)) + norm_a = math.sqrt(sum(x * x for x in a)) + norm_b = math.sqrt(sum(x * x for x in b)) + if norm_a == 0 or norm_b == 0: + return 0.0 + return dot / (norm_a * norm_b) + + +def _centroid(embeddings: list[list[float]]) -> list[float]: + n = len(embeddings) + dim = len(embeddings[0]) + return [sum(embeddings[i][d] for i in range(n)) / n for d in range(dim)] + def _format_history(history: list[dict]) -> str: if not history: @@ -56,71 +349,93 @@ def _format_history(history: list[dict]) -> str: return "\n".join(lines) -def _parse_tier(text: str) -> str: - """Extract tier from raw model output. Default to medium.""" - t = text.strip().lower() - snippet = t[:60] - if "complex" in snippet: - return "complex" - if "medium" in snippet: - return "medium" - if "light" in snippet: - return "light" - # Model invented a descriptive category (e.g. "simplefact", "trivial", "basic") → - # treat as light since it recognised the question doesn't need tools - if any(w in snippet for w in ("simple", "fact", "trivial", "basic", "easy", "general")): - return "light" - return "medium" # safe default - - class Router: - def __init__(self, model, fast_tool_runner: FastToolRunner | None = None): - self.model = model + def __init__( + self, + model, + embedder: AsyncOpenAI, + fast_tool_runner: FastToolRunner | None = None, + ): + self.model = model # qwen2.5:1.5b — used only for generating light replies + self._embedder = embedder self._fast_tool_runner = fast_tool_runner + self._light_centroid: list[float] | None = None + self._medium_centroid: list[float] | None = None + self._complex_centroid: list[float] | None = None + + async def initialize(self) -> None: + """Pre-compute utterance embeddings. Call once at startup. Retries until LiteLLM is ready.""" + print("[router] embedding utterances for semantic classifier...", flush=True) + texts = _LIGHT_UTTERANCES + _MEDIUM_UTTERANCES + _COMPLEX_UTTERANCES + for attempt in range(10): + try: + resp = await self._embedder.embeddings.create(model=_EMBED_MODEL, input=texts) + embeddings = [item.embedding for item in resp.data] + n_light = len(_LIGHT_UTTERANCES) + n_medium = len(_MEDIUM_UTTERANCES) + self._light_centroid = _centroid(embeddings[:n_light]) + self._medium_centroid = _centroid(embeddings[n_light:n_light + n_medium]) + self._complex_centroid = _centroid(embeddings[n_light + n_medium:]) + print("[router] semantic classifier ready (3-tier)", flush=True) + return + except Exception as e: + print(f"[router] embedding attempt {attempt+1}/10 failed: {e}", flush=True) + await asyncio.sleep(3) + print("[router] WARNING: could not initialize semantic classifier — will default to medium", flush=True) + + async def _classify_by_embedding(self, message: str) -> str: + """Embed message and return 'light', 'medium', or 'complex' based on centroid similarity.""" + if self._light_centroid is None or self._medium_centroid is None or self._complex_centroid is None: + return "medium" + try: + resp = await self._embedder.embeddings.create(model=_EMBED_MODEL, input=[message]) + emb = resp.data[0].embedding + score_light = _cosine(emb, self._light_centroid) + score_medium = _cosine(emb, self._medium_centroid) + score_complex = _cosine(emb, self._complex_centroid) + tier = max( + [("light", score_light), ("medium", score_medium), ("complex", score_complex)], + key=lambda x: x[1], + )[0] + print( + f"[router] semantic: light={score_light:.3f} medium={score_medium:.3f} " + f"complex={score_complex:.3f} → {tier}", + flush=True, + ) + return tier + except Exception as e: + print(f"[router] embedding classify error, defaulting to medium: {e}", flush=True) + return "medium" async def route( self, message: str, history: list[dict], - force_complex: bool = False, ) -> tuple[str, Optional[str]]: """ Returns (tier, reply_or_None). - For light tier: also generates the reply with a second call. + For light tier: also generates the reply inline. For medium/complex: reply is None. """ - if force_complex: - return "complex", None - - # Step 0a: fast tool match — agent.py short-circuits before reaching router - # This branch is only hit if force_complex=True with a fast-tool message (rare) if self._fast_tool_runner and self._fast_tool_runner.any_matches(message.strip()): names = self._fast_tool_runner.matching_names(message.strip()) print(f"[router] fast_tool_match={names} → medium", flush=True) return "medium", None - # Step 0b: regex pre-classification for obvious light patterns if _LIGHT_PATTERNS.match(message.strip()): - print(f"[router] regex→light", flush=True) + print("[router] regex→light", flush=True) return await self._generate_light_reply(message, history) - # Step 1: LLM classification with raw text output - try: - classify_response = await self.model.ainvoke([ - HumanMessage(content=CLASSIFY_PROMPT.format(message=message)), - ]) - raw = classify_response.content or "" - raw = re.sub(r".*?", "", raw, flags=re.DOTALL).strip() - tier = _parse_tier(raw) + if _COMPLEX_PATTERNS.search(message.strip()): + print("[router] regex→complex", flush=True) + return "complex", None - if tier == "complex" and not message.startswith("/think"): - tier = "medium" - - print(f"[router] raw={raw[:30]!r} → tier={tier}", flush=True) - except Exception as e: - print(f"[router] classify error, defaulting to medium: {e}", flush=True) + if _MEDIUM_PATTERNS.search(message.strip()): + print("[router] regex→medium", flush=True) return "medium", None + tier = await self._classify_by_embedding(message) + if tier != "light": return tier, None @@ -129,7 +444,7 @@ class Router: async def _generate_light_reply( self, message: str, history: list[dict] ) -> tuple[str, Optional[str]]: - """Generate a short reply using the router model for light-tier messages.""" + """Generate a short reply using qwen2.5:1.5b for light-tier messages.""" history_text = _format_history(history) context = f"\nConversation history:\n{history_text}" if history else "" try: