4 Commits

Author SHA1 Message Date
8ef4897869 Fix tier logging: capture actual_tier, fix parse_run_block regex, remove reply_text truncation
- Add tier_capture param to _run_agent_pipeline; append tier after determination
- Capture actual_tier in run_agent_task from tier_capture list
- Log tier in replied-in line: [agent] replied in Xs tier=Y
- Remove reply_text[:200] truncation (was breaking benchmark keyword matching)
- Update parse_run_block regex to match new log format; llm/send fields now None

Fixes #1, #3, #4

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 02:41:59 +00:00
Alvis
1f5e272600 Switch from Bifrost to LiteLLM; add Matrix channel; update rules
Infrastructure:
- docker-compose.yml: replace bifrost container with LiteLLM proxy
  (host.docker.internal:4000); complex model → deepseek-r1:free via
  OpenRouter; add Matrix URL env var; mount logs volume
- bifrost-config.json: add auth_config + postgres config_store (archived)

Routing:
- router.py: full semantic 3-tier classifier rewrite — nomic-embed-text
  centroids for light/medium/complex; regex pre-classifiers for all tiers;
  Russian utterance sets expanded
- agent.py: wire LiteLLM URL; add dry_run support; add Matrix channel

Channels:
- channels.py: add Matrix adapter (_matrix_send via mx- session prefix)

Rules / docs:
- agent-pipeline.md: remove /think prefix requirement; document automatic
  complex tier classification
- llm-inference.md: update BIFROST_URL → LITELLM_URL references; add
  remote model note for complex tier
- ARCHITECTURE.md: deleted (superseded by README.md)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 02:14:13 +00:00
Alvis
54cb940279 Update docs: add benchmarks/ section, fix complex tier description
- CLAUDE.md: add benchmark commands (run_benchmark.py flags, dry-run,
  categories, voice benchmark)
- README.md: add benchmarks/ to Files tree; fix incorrect claim that
  complex tier requires /think prefix — it is auto-classified via regex
  and embedding similarity; fix "Complex agent (/think prefix)" heading

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 02:13:14 +00:00
Alvis
bd951f943f Move benchmark scripts into benchmarks/ subdir
- benchmarks/run_benchmark.py (was run_benchmark.py)
- benchmarks/run_voice_benchmark.py (was run_voice_benchmark.py)
- Scripts use Path(__file__).parent so paths resolve correctly in subdir
- .gitignore updated: ignore benchmarks/benchmark.json,
  results_latest.json, voice_results*.json, voice_audio/

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 02:02:46 +00:00
13 changed files with 772 additions and 220 deletions

View File

@@ -1,9 +1,11 @@
# Agent Pipeline Rules # Agent Pipeline Rules
## Tiers ## Tiers
- Complex tier requires `/think ` prefix. Any LLM classification of "complex" is downgraded to medium. Do not change this. - Routing is fully automatic: router classifies into light/medium/complex via 3-way embedding similarity.
- Medium is the default tier. Light is only for trivial static-knowledge queries matched by regex or LLM. - Complex tier is reached automatically for deep research queries — no prefix required.
- Medium is the default tier. Light is only for trivial static-knowledge queries matched by regex or embedding.
- Light tier upgrade to medium is automatic when URL content is pre-fetched or a fast tool matches. - Light tier upgrade to medium is automatic when URL content is pre-fetched or a fast tool matches.
- `tier_override` API parameter still allows callers to force a specific tier (e.g. `adolf-deep` model → complex).
## Medium agent ## Medium agent
- `_DirectModel` makes a single `ainvoke()` call with no tool schema. Do not add tools to the medium agent. - `_DirectModel` makes a single `ainvoke()` call with no tool schema. Do not add tools to the medium agent.

View File

@@ -1,7 +1,8 @@
# LLM Inference Rules # LLM Inference Rules
- All LLM calls must use `base_url=BIFROST_URL` with model name `ollama/<model>`. Never call Ollama directly for inference. - All LLM calls must use `base_url=LITELLM_URL` (points to LiteLLM at `host.docker.internal:4000/v1`). Never call Ollama directly for inference.
- `_reply_semaphore` (asyncio.Semaphore(1)) serializes all GPU inference. Never bypass it or add a second semaphore. - `_reply_semaphore` (asyncio.Semaphore(1)) serializes all GPU inference. Never bypass it or add a second semaphore.
- Model names in code always use the `ollama/` prefix: `ollama/qwen3:4b`, `ollama/qwen3:8b`, `ollama/qwen2.5:1.5b`. - Local Ollama models use the `ollama/` prefix: `ollama/qwen3:4b`, `ollama/qwen2.5:1.5b`. Remote models (e.g. OpenRouter) use their full LiteLLM name: `openrouter/deepseek-r1`.
- Timeout values: router=30s, medium=180s, complex=600s. Do not reduce them — GPU inference under load is slow. - Timeout values: router=30s, medium=180s, complex=600s. Do not reduce them.
- `VRAMManager` is the only component that contacts Ollama directly (for flush/prewarm/poll). This is intentional — Bifrost cannot manage VRAM. - `VRAMManager` is the only component that contacts Ollama directly (for flush/prewarm/poll). This is intentional — LiteLLM cannot manage VRAM.
- Complex tier uses a remote model (`DEEPAGENTS_COMPLEX_MODEL`) — no VRAM management is needed for it.

6
.gitignore vendored
View File

@@ -2,5 +2,7 @@ __pycache__/
*.pyc *.pyc
logs/*.jsonl logs/*.jsonl
adolf_tuning_data/voice_audio/ adolf_tuning_data/voice_audio/
benchmark.json benchmarks/benchmark.json
results_latest.json benchmarks/results_latest.json
benchmarks/voice_results*.json
benchmarks/voice_audio/

View File

@@ -18,8 +18,24 @@ python3 test_routing.py [--easy-only|--medium-only|--hard-only]
# Use case tests — read the .md file and follow its steps as Claude Code # Use case tests — read the .md file and follow its steps as Claude Code
# example: read tests/use_cases/weather_now.md and execute it # example: read tests/use_cases/weather_now.md and execute it
# Routing benchmark — measures tier classification accuracy across 120 queries
# Run from benchmarks/ — Adolf must be running. DO NOT run during active use (holds GPU).
cd benchmarks
python3 run_benchmark.py # full run (120 queries)
python3 run_benchmark.py --tier light # light tier only (30 queries)
python3 run_benchmark.py --tier medium # medium tier only (50 queries)
python3 run_benchmark.py --tier complex --dry-run # complex tier, medium model (no API cost)
python3 run_benchmark.py --category smart_home_control
python3 run_benchmark.py --ids 1,2,3
python3 run_benchmark.py --list-categories
# Voice benchmark
python3 run_voice_benchmark.py
# benchmark.json (dataset) and results_latest.json are gitignored — not committed
``` ```
## Architecture ## Architecture
@ARCHITECTURE.md @README.md

View File

@@ -74,7 +74,7 @@ Autonomous personal assistant with a multi-channel gateway. Three-tier model rou
Adolf uses LangChain's tool interface but only the complex agent actually invokes tools at runtime. Adolf uses LangChain's tool interface but only the complex agent actually invokes tools at runtime.
**Complex agent (`/think` prefix):** `web_search` and `fetch_url` are defined as `langchain_core.tools.Tool` objects and passed to `create_deep_agent()`. The deepagents library runs an agentic loop (LangGraph `create_react_agent` under the hood) that sends the tool schema to the model via OpenAI function-calling format and handles tool dispatch. **Complex agent:** `web_search` and `fetch_url` are defined as `langchain_core.tools.Tool` objects and passed to `create_deep_agent()`. The deepagents library runs an agentic loop (LangGraph `create_react_agent` under the hood) that sends the tool schema to the model via OpenAI function-calling format and handles tool dispatch.
**Medium agent (default):** `_DirectModel` makes a single `model.ainvoke(messages)` call with no tool schema. Context (memories, fetched URL content) is injected via the system prompt instead. This is intentional — `qwen3:4b` behaves unreliably when a tool array is present. **Medium agent (default):** `_DirectModel` makes a single `model.ainvoke(messages)` call with no tool schema. Context (memories, fetched URL content) is injected via the system prompt instead. This is intentional — `qwen3:4b` behaves unreliably when a tool array is present.
@@ -84,13 +84,13 @@ Adolf uses LangChain's tool interface but only the complex agent actually invoke
| Tier | Model | Agent | Trigger | Latency | | Tier | Model | Agent | Trigger | Latency |
|------|-------|-------|---------|---------| |------|-------|-------|---------|---------|
| Light | `qwen2.5:1.5b` (router answers directly) | — | Regex pre-match or LLM classifies "light" | ~24s | | Light | `qwen2.5:1.5b` (router answers directly) | — | Regex pre-match or 3-way embedding classifies "light" | ~24s |
| Medium | `qwen3:4b` (`DEEPAGENTS_MODEL`) | `_DirectModel` — single LLM call, no tools | Default; also forced when message contains URLs | ~1020s | | Medium | `qwen3:4b` (`DEEPAGENTS_MODEL`) | `_DirectModel` — single LLM call, no tools | Default; also forced when message contains URLs | ~1020s |
| Complex | `qwen3:8b` (`DEEPAGENTS_COMPLEX_MODEL`) | `create_deep_agent` — agentic loop with tools | `/think` prefix only | ~60120s | | Complex | `deepseek/deepseek-r1:free` via LiteLLM (`DEEPAGENTS_COMPLEX_MODEL`) | `create_deep_agent` — agentic loop with tools | Auto-classified by embedding similarity | ~3090s |
**`/think` prefix**: forces complex tier, stripped before sending to agent. Routing is fully automatic via 3-way cosine similarity over pre-embedded utterance centroids (light / medium / complex). No prefix required. Use `adolf-deep` model name to force complex tier via API.
Complex tier is locked out unless the message starts with `/think` — any LLM classification of "complex" is downgraded to medium. Complex tier is reached automatically for deep research queries — `исследуй`, `изучи все`, `напиши подробный`, etc. — via regex pre-classifier and embedding similarity. No prefix required. Use `adolf-deep` model name to force it via API.
## Fast Tools (`fast_tools.py`) ## Fast Tools (`fast_tools.py`)
@@ -164,7 +164,7 @@ Conversation history is keyed by session_id (5-turn buffer).
``` ```
adolf/ adolf/
├── docker-compose.yml Services: bifrost, deepagents, openmemory, grammy, crawl4ai, routecheck, cli ├── docker-compose.yml Services: deepagents, openmemory, grammy, crawl4ai, routecheck, cli
├── Dockerfile deepagents container (Python 3.12) ├── Dockerfile deepagents container (Python 3.12)
├── Dockerfile.cli CLI container (python:3.12-slim + rich) ├── Dockerfile.cli CLI container (python:3.12-slim + rich)
├── agent.py FastAPI gateway, run_agent_task, Crawl4AI pre-fetch, fast tools, memory pipeline ├── agent.py FastAPI gateway, run_agent_task, Crawl4AI pre-fetch, fast tools, memory pipeline
@@ -175,6 +175,11 @@ adolf/
├── agent_factory.py _DirectModel (medium) / create_deep_agent (complex) ├── agent_factory.py _DirectModel (medium) / create_deep_agent (complex)
├── cli.py Interactive CLI REPL — Rich Live streaming + Markdown render ├── cli.py Interactive CLI REPL — Rich Live streaming + Markdown render
├── wiki_research.py Batch wiki research pipeline (uses /message + SSE) ├── wiki_research.py Batch wiki research pipeline (uses /message + SSE)
├── benchmarks/
│ ├── run_benchmark.py Routing accuracy benchmark — 120 queries across 3 tiers
│ ├── run_voice_benchmark.py Voice path benchmark
│ ├── benchmark.json Query dataset (gitignored)
│ └── results_latest.json Last run results (gitignored)
├── .env TELEGRAM_BOT_TOKEN, ROUTECHECK_TOKEN, YANDEX_ROUTING_KEY (not committed) ├── .env TELEGRAM_BOT_TOKEN, ROUTECHECK_TOKEN, YANDEX_ROUTING_KEY (not committed)
├── routecheck/ ├── routecheck/
│ ├── app.py FastAPI: image captcha + /api/route Yandex proxy │ ├── app.py FastAPI: image captcha + /api/route Yandex proxy
@@ -195,7 +200,9 @@ adolf/
| Service | Host Port | Role | | Service | Host Port | Role |
|---------|-----------|------| |---------|-----------|------|
| Ollama GPU | 11436 | All LLM inference (via Bifrost) + VRAM management (direct) + memory extraction | | LiteLLM | 4000 | LLM proxy — all inference goes through here (`LITELLM_URL` env var) |
| Ollama GPU | 11436 | GPU inference backend + VRAM management (direct) + memory extraction |
| Ollama CPU | 11435 | nomic-embed-text embeddings for openmemory | | Ollama CPU | 11435 | nomic-embed-text embeddings for openmemory |
| Langfuse | 3200 | LLM observability — traces all requests via LiteLLM callbacks |
| Qdrant | 6333 | Vector store for memories | | Qdrant | 6333 | Vector store for memories |
| SearXNG | 11437 | Web search (used by `web_search` tool) | | SearXNG | 11437 | Web search (used by `web_search` tool) |

424
agent.py
View File

@@ -1,7 +1,9 @@
import asyncio import asyncio
import json as _json_module
import os import os
import time import time
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI, BackgroundTasks, Request from fastapi import FastAPI, BackgroundTasks, Request
from fastapi.responses import JSONResponse, StreamingResponse from fastapi.responses import JSONResponse, StreamingResponse
@@ -16,6 +18,7 @@ _URL_RE = _re.compile(r'https?://[^\s<>"\']+')
def _extract_urls(text: str) -> list[str]: def _extract_urls(text: str) -> list[str]:
return _URL_RE.findall(text) return _URL_RE.findall(text)
from openai import AsyncOpenAI
from langchain_openai import ChatOpenAI from langchain_openai import ChatOpenAI
from langchain_mcp_adapters.client import MultiServerMCPClient from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_community.utilities import SearxSearchWrapper from langchain_community.utilities import SearxSearchWrapper
@@ -27,8 +30,9 @@ from agent_factory import build_medium_agent, build_complex_agent
from fast_tools import FastToolRunner, WeatherTool, CommuteTool from fast_tools import FastToolRunner, WeatherTool, CommuteTool
import channels import channels
# Bifrost gateway — all LLM inference goes through here # LiteLLM proxy — all LLM inference goes through here
BIFROST_URL = os.getenv("BIFROST_URL", "http://bifrost:8080/v1") LITELLM_URL = os.getenv("LITELLM_URL", "http://host.docker.internal:4000/v1")
LITELLM_API_KEY = os.getenv("LITELLM_API_KEY", "dummy")
# Direct Ollama URL — used only by VRAMManager for flush/prewarm/poll # Direct Ollama URL — used only by VRAMManager for flush/prewarm/poll
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434") OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
@@ -44,6 +48,45 @@ ROUTECHECK_TOKEN = os.getenv("ROUTECHECK_TOKEN", "")
MAX_HISTORY_TURNS = 5 MAX_HISTORY_TURNS = 5
_conversation_buffers: dict[str, list] = {} _conversation_buffers: dict[str, list] = {}
# ── Interaction logging (RLHF data collection) ─────────────────────────────────
_LOG_DIR = Path(os.getenv("ADOLF_LOG_DIR", "/app/logs"))
_INTERACTIONS_LOG = _LOG_DIR / "interactions.jsonl"
def _ensure_log_dir() -> None:
try:
_LOG_DIR.mkdir(parents=True, exist_ok=True)
except Exception as e:
print(f"[log] cannot create log dir {_LOG_DIR}: {e}", flush=True)
async def _log_interaction(
session_id: str,
channel: str,
tier: str,
input_text: str,
response_text: str | None,
latency_ms: int,
metadata: dict | None = None,
) -> None:
"""Append one interaction record to the JSONL log for future RLHF/finetuning."""
record = {
"ts": time.time(),
"session_id": session_id,
"channel": channel,
"tier": tier,
"input": input_text,
"output": response_text or "",
"latency_ms": latency_ms,
}
if metadata:
record["metadata"] = metadata
try:
_ensure_log_dir()
with open(_INTERACTIONS_LOG, "a", encoding="utf-8") as f:
f.write(_json_module.dumps(record, ensure_ascii=False) + "\n")
except Exception as e:
print(f"[log] write error: {e}", flush=True)
# Per-session streaming queues — filled during inference, read by /stream/{session_id} # Per-session streaming queues — filled during inference, read by /stream/{session_id}
_stream_queues: dict[str, asyncio.Queue] = {} _stream_queues: dict[str, asyncio.Queue] = {}
@@ -140,31 +183,30 @@ async def lifespan(app: FastAPI):
channels.register_defaults() channels.register_defaults()
# All three models route through Bifrost → Ollama GPU. # All three models route through Bifrost → Ollama GPU.
# Bifrost adds retry logic, observability, and failover.
# Model names use provider/model format: Bifrost strips the "ollama/" prefix
# before forwarding to Ollama's /v1/chat/completions endpoint.
router_model = ChatOpenAI( router_model = ChatOpenAI(
model=f"ollama/{ROUTER_MODEL}", model=f"ollama/{ROUTER_MODEL}",
base_url=BIFROST_URL, base_url=LITELLM_URL,
api_key="dummy", api_key=LITELLM_API_KEY,
temperature=0, temperature=0,
timeout=30, timeout=30,
) )
embedder = AsyncOpenAI(base_url=LITELLM_URL, api_key=LITELLM_API_KEY)
medium_model = ChatOpenAI( medium_model = ChatOpenAI(
model=f"ollama/{MEDIUM_MODEL}", model=f"ollama/{MEDIUM_MODEL}",
base_url=BIFROST_URL, base_url=LITELLM_URL,
api_key="dummy", api_key=LITELLM_API_KEY,
timeout=180, timeout=180,
) )
complex_model = ChatOpenAI( complex_model = ChatOpenAI(
model=f"ollama/{COMPLEX_MODEL}", model=COMPLEX_MODEL, # full model name — may be remote (OpenRouter) or local ollama/*
base_url=BIFROST_URL, base_url=LITELLM_URL,
api_key="dummy", api_key=LITELLM_API_KEY,
timeout=600, timeout=600,
) )
vram_manager = VRAMManager(base_url=OLLAMA_BASE_URL) vram_manager = VRAMManager(base_url=OLLAMA_BASE_URL)
router = Router(model=router_model, fast_tool_runner=_fast_tool_runner) router = Router(model=router_model, embedder=embedder, fast_tool_runner=_fast_tool_runner)
await router.initialize()
mcp_connections = { mcp_connections = {
"openmemory": {"transport": "sse", "url": f"{OPENMEMORY_URL}/sse"}, "openmemory": {"transport": "sse", "url": f"{OPENMEMORY_URL}/sse"},
@@ -279,8 +321,8 @@ async def lifespan(app: FastAPI):
) )
print( print(
f"[agent] bifrost={BIFROST_URL} | router=ollama/{ROUTER_MODEL} | " f"[agent] litellm={LITELLM_URL} | router=semantic(ollama/{ROUTER_MODEL}+nomic-embed-text) | "
f"medium=ollama/{MEDIUM_MODEL} | complex=ollama/{COMPLEX_MODEL}", f"medium=ollama/{MEDIUM_MODEL} | complex={COMPLEX_MODEL}",
flush=True, flush=True,
) )
print(f"[agent] agent tools: {[t.name for t in agent_tools]}", flush=True) print(f"[agent] agent tools: {[t.name for t in agent_tools]}", flush=True)
@@ -346,6 +388,12 @@ def _log_messages(result):
# ── memory helpers ───────────────────────────────────────────────────────────── # ── memory helpers ─────────────────────────────────────────────────────────────
def _resolve_user_id(session_id: str) -> str:
"""Map any session_id to a canonical user identity for openmemory.
All channels share the same memory pool for the single user."""
return "alvis"
async def _store_memory(session_id: str, user_msg: str, assistant_reply: str) -> None: async def _store_memory(session_id: str, user_msg: str, assistant_reply: str) -> None:
"""Store a conversation turn in openmemory (runs as a background task).""" """Store a conversation turn in openmemory (runs as a background task)."""
if _memory_add_tool is None: if _memory_add_tool is None:
@@ -353,7 +401,8 @@ async def _store_memory(session_id: str, user_msg: str, assistant_reply: str) ->
t0 = time.monotonic() t0 = time.monotonic()
try: try:
text = f"User: {user_msg}\nAssistant: {assistant_reply}" text = f"User: {user_msg}\nAssistant: {assistant_reply}"
await _memory_add_tool.ainvoke({"text": text, "user_id": session_id}) user_id = _resolve_user_id(session_id)
await _memory_add_tool.ainvoke({"text": text, "user_id": user_id})
print(f"[memory] stored in {time.monotonic() - t0:.1f}s", flush=True) print(f"[memory] stored in {time.monotonic() - t0:.1f}s", flush=True)
except Exception as e: except Exception as e:
print(f"[memory] error: {e}", flush=True) print(f"[memory] error: {e}", flush=True)
@@ -364,7 +413,8 @@ async def _retrieve_memories(message: str, session_id: str) -> str:
if _memory_search_tool is None: if _memory_search_tool is None:
return "" return ""
try: try:
result = await _memory_search_tool.ainvoke({"query": message, "user_id": session_id}) user_id = _resolve_user_id(session_id)
result = await _memory_search_tool.ainvoke({"query": message, "user_id": user_id})
if result and result.strip() and result.strip() != "[]": if result and result.strip() and result.strip() != "[]":
return f"Relevant memories:\n{result}" return f"Relevant memories:\n{result}"
except Exception: except Exception:
@@ -372,36 +422,42 @@ async def _retrieve_memories(message: str, session_id: str) -> str:
return "" return ""
# ── core task ────────────────────────────────────────────────────────────────── # ── core pipeline ──────────────────────────────────────────────────────────────
async def run_agent_task(message: str, session_id: str, channel: str = "telegram"): from typing import AsyncGenerator
print(f"[agent] queued: {message[:80]!r} chat={session_id}", flush=True)
force_complex = False async def _run_agent_pipeline(
clean_message = message message: str,
if message.startswith("/think "): history: list[dict],
force_complex = True session_id: str,
clean_message = message[len("/think "):] tier_override: str | None = None,
print("[agent] /think prefix → force_complex=True", flush=True) dry_run: bool = False,
tier_capture: list | None = None,
) -> AsyncGenerator[str, None]:
"""Core pipeline: pre-flight → routing → inference. Yields text chunks.
tier_override: "light" | "medium" | "complex" | None (auto-route)
dry_run: if True and tier=complex, log tier=complex but use medium model (avoids API cost)
Caller is responsible for scheduling _store_memory after consuming all chunks.
"""
async with _reply_semaphore: async with _reply_semaphore:
t0 = time.monotonic() t0 = time.monotonic()
history = _conversation_buffers.get(session_id, []) clean_message = message
print(f"[agent] running: {clean_message[:80]!r}", flush=True) print(f"[agent] running: {clean_message[:80]!r}", flush=True)
# Fetch URL content, memories, and fast-tool context concurrently — all IO-bound # Fetch URL content, memories, and fast-tool context concurrently
url_context, memories, fast_context = await asyncio.gather( url_context, memories, fast_context = await asyncio.gather(
_fetch_urls_from_message(clean_message), _fetch_urls_from_message(clean_message),
_retrieve_memories(clean_message, session_id), _retrieve_memories(clean_message, session_id),
_fast_tool_runner.run_matching(clean_message), _fast_tool_runner.run_matching(clean_message),
) )
if url_context: if url_context:
print(f"[agent] crawl4ai: {len(url_context)} chars fetched from message URLs", flush=True) print(f"[agent] crawl4ai: {len(url_context)} chars fetched", flush=True)
if fast_context: if fast_context:
names = _fast_tool_runner.matching_names(clean_message) names = _fast_tool_runner.matching_names(clean_message)
print(f"[agent] fast_tools={names}: {len(fast_context)} chars injected", flush=True) print(f"[agent] fast_tools={names}: {len(fast_context)} chars injected", flush=True)
# Build enriched history: memories + url_context + fast_context for ALL tiers # Build enriched history
enriched_history = list(history) enriched_history = list(history)
if url_context: if url_context:
enriched_history = [{"role": "system", "content": url_context}] + enriched_history enriched_history = [{"role": "system", "content": url_context}] + enriched_history
@@ -410,45 +466,60 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
if memories: if memories:
enriched_history = [{"role": "system", "content": memories}] + enriched_history enriched_history = [{"role": "system", "content": memories}] + enriched_history
# Short-circuit: fast tool result is already a complete reply — skip router+LLM final_text = None
if fast_context and not force_complex and not url_context: llm_elapsed = 0.0
tier = "fast"
final_text = fast_context
llm_elapsed = time.monotonic() - t0
names = _fast_tool_runner.matching_names(clean_message)
print(f"[agent] tier=fast tools={names} — delivering directly", flush=True)
await _push_stream_chunk(session_id, final_text)
await _end_stream(session_id)
else:
tier, light_reply = await router.route(clean_message, enriched_history, force_complex)
# Messages with URL content must be handled by at least medium tier try:
if url_context and tier == "light": # Short-circuit: fast tool already has the answer
tier = "medium" if fast_context and tier_override is None and not url_context:
light_reply = None tier = "fast"
print("[agent] URL in message → upgraded light→medium", flush=True) final_text = fast_context
print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True) llm_elapsed = time.monotonic() - t0
names = _fast_tool_runner.matching_names(clean_message)
print(f"[agent] tier=fast tools={names} — delivering directly", flush=True)
yield final_text
else:
# Determine tier
if tier_override in ("light", "medium", "complex"):
tier = tier_override
light_reply = None
if tier_override == "light":
tier, light_reply = await router.route(clean_message, enriched_history)
tier = "light"
else:
tier, light_reply = await router.route(clean_message, enriched_history)
if url_context and tier == "light":
tier = "medium"
light_reply = None
print("[agent] URL in message → upgraded light→medium", flush=True)
# Dry-run: log as complex but infer with medium (no remote API call)
effective_tier = tier
if dry_run and tier == "complex":
effective_tier = "medium"
print(f"[agent] tier=complex (dry-run) → using medium model, message={clean_message[:60]!r}", flush=True)
else:
print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True)
tier = effective_tier
if tier_capture is not None:
tier_capture.append(tier)
if tier != "fast":
final_text = None
try:
if tier == "light": if tier == "light":
final_text = light_reply final_text = light_reply
llm_elapsed = time.monotonic() - t0 llm_elapsed = time.monotonic() - t0
print(f"[agent] light path: answered by router", flush=True) print("[agent] light path: answered by router", flush=True)
await _push_stream_chunk(session_id, final_text) yield final_text
await _end_stream(session_id)
elif tier == "medium": elif tier == "medium":
system_prompt = MEDIUM_SYSTEM_PROMPT system_prompt = MEDIUM_SYSTEM_PROMPT
if memories: if memories:
system_prompt = system_prompt + "\n\n" + memories system_prompt += "\n\n" + memories
if url_context: if url_context:
system_prompt = system_prompt + "\n\n" + url_context system_prompt += "\n\n" + url_context
if fast_context: if fast_context:
system_prompt = system_prompt + "\n\nLive web search results (use these to answer):\n\n" + fast_context system_prompt += "\n\nLive web search results (use these to answer):\n\n" + fast_context
# Stream tokens directly — filter out qwen3 <think> blocks
in_think = False in_think = False
response_parts = [] response_parts = []
async for chunk in medium_model.astream([ async for chunk in medium_model.astream([
@@ -464,91 +535,117 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
in_think = False in_think = False
after = token.split("</think>", 1)[1] after = token.split("</think>", 1)[1]
if after: if after:
await _push_stream_chunk(session_id, after) yield after
response_parts.append(after) response_parts.append(after)
else: else:
if "<think>" in token: if "<think>" in token:
in_think = True in_think = True
before = token.split("<think>", 1)[0] before = token.split("<think>", 1)[0]
if before: if before:
await _push_stream_chunk(session_id, before) yield before
response_parts.append(before) response_parts.append(before)
else: else:
await _push_stream_chunk(session_id, token) yield token
response_parts.append(token) response_parts.append(token)
await _end_stream(session_id)
llm_elapsed = time.monotonic() - t0 llm_elapsed = time.monotonic() - t0
final_text = "".join(response_parts).strip() or None final_text = "".join(response_parts).strip() or None
else: # complex else: # complex — remote model, no VRAM management needed
ok = await vram_manager.enter_complex_mode() system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id)
if not ok: if url_context:
print("[agent] complex→medium fallback (eviction timeout)", flush=True) system_prompt += "\n\n[Pre-fetched URL content from user's message:]\n" + url_context
tier = "medium" result = await complex_agent.ainvoke({
system_prompt = MEDIUM_SYSTEM_PROMPT "messages": [
if memories: {"role": "system", "content": system_prompt},
system_prompt = system_prompt + "\n\n" + memories *history,
if url_context: {"role": "user", "content": clean_message},
system_prompt = system_prompt + "\n\n" + url_context ]
result = await medium_agent.ainvoke({ })
"messages": [
{"role": "system", "content": system_prompt},
*history,
{"role": "user", "content": clean_message},
]
})
else:
system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id)
if url_context:
system_prompt = system_prompt + "\n\n[Pre-fetched URL content from user's message:]\n" + url_context
result = await complex_agent.ainvoke({
"messages": [
{"role": "system", "content": system_prompt},
*history,
{"role": "user", "content": clean_message},
]
})
asyncio.create_task(vram_manager.exit_complex_mode())
llm_elapsed = time.monotonic() - t0 llm_elapsed = time.monotonic() - t0
_log_messages(result) _log_messages(result)
final_text = _extract_final_text(result) final_text = _extract_final_text(result)
if final_text: if final_text:
await _push_stream_chunk(session_id, final_text) yield final_text
await _end_stream(session_id)
except Exception as e: except Exception as e:
import traceback import traceback
llm_elapsed = time.monotonic() - t0 llm_elapsed = time.monotonic() - t0
print(f"[agent] error after {llm_elapsed:.1f}s for chat {session_id}: {e}", flush=True) print(f"[agent] error after {llm_elapsed:.1f}s for {session_id}: {e}", flush=True)
traceback.print_exc() traceback.print_exc()
await _end_stream(session_id)
# Deliver reply through the originating channel print(f"[agent] pipeline done in {time.monotonic() - t0:.1f}s tier={tier if 'tier' in dir() else '?'}", flush=True)
# Store memory as side-effect (non-blocking, best-effort)
if final_text: if final_text:
t1 = time.monotonic() asyncio.create_task(_store_memory(session_id, clean_message, final_text))
# ── core task (Telegram / Matrix / CLI wrapper) ─────────────────────────────────
async def run_agent_task(
message: str,
session_id: str,
channel: str = "telegram",
metadata: dict | None = None,
):
print(f"[agent] queued: {message[:80]!r} chat={session_id}", flush=True)
t0 = time.monotonic()
meta = metadata or {}
dry_run = bool(meta.get("dry_run", False))
is_benchmark = bool(meta.get("benchmark", False))
history = _conversation_buffers.get(session_id, [])
final_text = None
actual_tier = "unknown"
tier_capture: list = []
async for chunk in _run_agent_pipeline(message, history, session_id, dry_run=dry_run, tier_capture=tier_capture):
await _push_stream_chunk(session_id, chunk)
if final_text is None:
final_text = chunk
else:
final_text += chunk
await _end_stream(session_id)
actual_tier = tier_capture[0] if tier_capture else "unknown"
elapsed_ms = int((time.monotonic() - t0) * 1000)
if final_text:
final_text = final_text.strip()
# Skip channel delivery for benchmark sessions (no Telegram spam)
if not is_benchmark:
try: try:
await channels.deliver(session_id, channel, final_text) await channels.deliver(session_id, channel, final_text)
except Exception as e: except Exception as e:
print(f"[agent] delivery error (non-fatal): {e}", flush=True) print(f"[agent] delivery error (non-fatal): {e}", flush=True)
send_elapsed = time.monotonic() - t1
print(
f"[agent] replied in {time.monotonic() - t0:.1f}s "
f"(llm={llm_elapsed:.1f}s, send={send_elapsed:.1f}s) tier={tier}",
flush=True,
)
print(f"[agent] reply_text: {final_text}", flush=True)
else:
print("[agent] warning: no text reply from agent", flush=True)
# Update conversation buffer and schedule memory storage print(f"[agent] replied in {elapsed_ms / 1000:.1f}s tier={actual_tier}", flush=True)
if final_text: print(f"[agent] reply_text: {final_text}", flush=True)
buf = _conversation_buffers.get(session_id, [])
buf.append({"role": "user", "content": clean_message}) # Update conversation buffer
buf.append({"role": "assistant", "content": final_text}) buf = _conversation_buffers.get(session_id, [])
_conversation_buffers[session_id] = buf[-(MAX_HISTORY_TURNS * 2):] buf.append({"role": "user", "content": message})
asyncio.create_task(_store_memory(session_id, clean_message, final_text)) buf.append({"role": "assistant", "content": final_text})
_conversation_buffers[session_id] = buf[-(MAX_HISTORY_TURNS * 2):]
# Log interaction for RLHF data collection (skip benchmark sessions to avoid noise)
if not is_benchmark:
asyncio.create_task(_log_interaction(
session_id=session_id,
channel=channel,
tier=actual_tier,
input_text=message,
response_text=final_text,
latency_ms=elapsed_ms,
metadata=meta if meta else None,
))
else:
print("[agent] warning: no text reply from agent", flush=True)
# ── endpoints ────────────────────────────────────────────────────────────────── # ── endpoints ──────────────────────────────────────────────────────────────────
@@ -560,7 +657,7 @@ async def message(request: InboundMessage, background_tasks: BackgroundTasks):
return JSONResponse(status_code=503, content={"error": "Agent not ready"}) return JSONResponse(status_code=503, content={"error": "Agent not ready"})
session_id = request.session_id session_id = request.session_id
channel = request.channel channel = request.channel
background_tasks.add_task(run_agent_task, request.text, session_id, channel) background_tasks.add_task(run_agent_task, request.text, session_id, channel, request.metadata)
return JSONResponse(status_code=202, content={"status": "accepted"}) return JSONResponse(status_code=202, content={"status": "accepted"})
@@ -622,3 +719,96 @@ async def stream_reply(session_id: str):
@app.get("/health") @app.get("/health")
async def health(): async def health():
return {"status": "ok", "agent_ready": medium_agent is not None} return {"status": "ok", "agent_ready": medium_agent is not None}
# ── OpenAI-compatible API (for OpenWebUI and other clients) ────────────────────
_TIER_MAP = {
"adolf": None,
"adolf-light": "light",
"adolf-medium": "medium",
"adolf-deep": "complex",
}
@app.get("/v1/models")
async def list_models():
return {
"object": "list",
"data": [
{"id": "adolf", "object": "model", "owned_by": "adolf"},
{"id": "adolf-light", "object": "model", "owned_by": "adolf"},
{"id": "adolf-medium", "object": "model", "owned_by": "adolf"},
{"id": "adolf-deep", "object": "model", "owned_by": "adolf"},
],
}
@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
if medium_agent is None:
return JSONResponse(status_code=503, content={"error": {"message": "Agent not ready", "type": "server_error"}})
body = await request.json()
model = body.get("model", "adolf")
messages = body.get("messages", [])
stream = body.get("stream", True)
# Extract current user message and history
user_messages = [m for m in messages if m.get("role") == "user"]
if not user_messages:
return JSONResponse(status_code=400, content={"error": {"message": "No user message", "type": "invalid_request_error"}})
current_message = user_messages[-1]["content"]
# History = everything before the last user message (excluding system messages from OpenWebUI)
last_user_idx = len(messages) - 1 - next(
i for i, m in enumerate(reversed(messages)) if m.get("role") == "user"
)
history = [m for m in messages[:last_user_idx] if m.get("role") in ("user", "assistant")]
session_id = request.headers.get("X-Session-Id", "owui-default")
tier_override = _TIER_MAP.get(model)
import json as _json
import uuid as _uuid
response_id = f"chatcmpl-{_uuid.uuid4().hex[:12]}"
if stream:
async def event_stream():
# Opening chunk with role
opening = {
"id": response_id, "object": "chat.completion.chunk",
"choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}]
}
yield f"data: {_json.dumps(opening)}\n\n"
async for chunk in _run_agent_pipeline(current_message, history, session_id, tier_override):
data = {
"id": response_id, "object": "chat.completion.chunk",
"choices": [{"index": 0, "delta": {"content": chunk}, "finish_reason": None}]
}
yield f"data: {_json.dumps(data)}\n\n"
# Final chunk
final = {
"id": response_id, "object": "chat.completion.chunk",
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
}
yield f"data: {_json.dumps(final)}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(event_stream(), media_type="text/event-stream")
else:
# Non-streaming: collect all chunks
parts = []
async for chunk in _run_agent_pipeline(current_message, history, session_id, tier_override):
if chunk:
parts.append(chunk)
full_text = "".join(parts).strip()
return {
"id": response_id, "object": "chat.completion",
"choices": [{"index": 0, "message": {"role": "assistant", "content": full_text}, "finish_reason": "stop"}],
"model": model,
}

View File

@@ -1,4 +1,21 @@
{ {
"auth_config": {
"is_enabled": true,
"admin_username": "admin",
"admin_password": "env.BIFROST_ADMIN_PASSWORD"
},
"config_store": {
"enabled": true,
"type": "postgres",
"config": {
"host": "bifrost-db",
"port": "5432",
"user": "bifrost",
"password": "bifrost",
"db_name": "bifrost",
"ssl_mode": "disable"
}
},
"client": { "client": {
"drop_excess_requests": false "drop_excess_requests": false
}, },

View File

@@ -49,6 +49,7 @@ async def deliver(session_id: str, channel: str, text: str) -> None:
# ── built-in channel adapters ───────────────────────────────────────────────── # ── built-in channel adapters ─────────────────────────────────────────────────
GRAMMY_URL = os.getenv("GRAMMY_URL", "http://grammy:3001") GRAMMY_URL = os.getenv("GRAMMY_URL", "http://grammy:3001")
MATRIX_URL = os.getenv("MATRIX_URL", "http://matrix:3002")
async def _telegram_send(session_id: str, text: str) -> None: async def _telegram_send(session_id: str, text: str) -> None:
@@ -64,12 +65,26 @@ async def _telegram_send(session_id: str, text: str) -> None:
) )
async def _matrix_send(session_id: str, text: str) -> None:
"""Send reply to Matrix via the matrix adapter POST /send endpoint."""
room_id = session_id.removeprefix("mx-")
MAX_MTX = 4000
chunks = [text[i:i + MAX_MTX] for i in range(0, len(text), MAX_MTX)]
async with httpx.AsyncClient(timeout=15) as client:
for chunk in chunks:
await client.post(
f"{MATRIX_URL}/send",
json={"room_id": room_id, "text": chunk},
)
async def _cli_send(session_id: str, text: str) -> None: async def _cli_send(session_id: str, text: str) -> None:
"""CLI replies are handled entirely through the pending_replies queue — no-op here.""" """CLI replies are handled entirely through the pending_replies queue — no-op here."""
pass pass
def register_defaults() -> None: def register_defaults() -> None:
"""Register the built-in Telegram and CLI channel adapters.""" """Register the built-in Telegram, Matrix, and CLI channel adapters."""
register("telegram", _telegram_send) register("telegram", _telegram_send)
register("matrix", _matrix_send)
register("cli", _cli_send) register("cli", _cli_send)

View File

@@ -1,19 +1,4 @@
services: services:
bifrost:
image: maximhq/bifrost
container_name: bifrost
ports:
- "8080:8080"
volumes:
- ./bifrost-config.json:/app/data/config.json:ro
environment:
- APP_DIR=/app/data
- APP_PORT=8080
- LOG_LEVEL=info
extra_hosts:
- "host.docker.internal:host-gateway"
restart: unless-stopped
deepagents: deepagents:
build: . build: .
container_name: deepagents container_name: deepagents
@@ -21,25 +6,28 @@ services:
- "8000:8000" - "8000:8000"
environment: environment:
- PYTHONUNBUFFERED=1 - PYTHONUNBUFFERED=1
# Bifrost gateway — all LLM inference goes through here # LiteLLM proxy — all LLM inference goes through here
- BIFROST_URL=http://bifrost:8080/v1 - LITELLM_URL=http://host.docker.internal:4000/v1
- LITELLM_API_KEY=sk-fjQC1BxAiGFSMs
# Direct Ollama GPU URL — used only by VRAMManager for flush/prewarm # Direct Ollama GPU URL — used only by VRAMManager for flush/prewarm
- OLLAMA_BASE_URL=http://host.docker.internal:11436 - OLLAMA_BASE_URL=http://host.docker.internal:11436
- DEEPAGENTS_MODEL=qwen3:4b - DEEPAGENTS_MODEL=qwen3:4b
- DEEPAGENTS_COMPLEX_MODEL=qwen3:8b - DEEPAGENTS_COMPLEX_MODEL=deepseek/deepseek-r1:free
- DEEPAGENTS_ROUTER_MODEL=qwen2.5:1.5b - DEEPAGENTS_ROUTER_MODEL=qwen2.5:1.5b
- SEARXNG_URL=http://host.docker.internal:11437 - SEARXNG_URL=http://host.docker.internal:11437
- GRAMMY_URL=http://grammy:3001 - GRAMMY_URL=http://grammy:3001
- MATRIX_URL=http://host.docker.internal:3002
- CRAWL4AI_URL=http://crawl4ai:11235 - CRAWL4AI_URL=http://crawl4ai:11235
- ROUTECHECK_URL=http://routecheck:8090 - ROUTECHECK_URL=http://routecheck:8090
- ROUTECHECK_TOKEN=${ROUTECHECK_TOKEN} - ROUTECHECK_TOKEN=${ROUTECHECK_TOKEN}
volumes:
- ./logs:/app/logs
extra_hosts: extra_hosts:
- "host.docker.internal:host-gateway" - "host.docker.internal:host-gateway"
depends_on: depends_on:
- openmemory - openmemory
- grammy - grammy
- crawl4ai - crawl4ai
- bifrost
- routecheck - routecheck
restart: unless-stopped restart: unless-stopped

439
router.py
View File

@@ -1,11 +1,38 @@
import asyncio
import re import re
import math
from typing import Optional from typing import Optional
from openai import AsyncOpenAI
from langchain_core.messages import SystemMessage, HumanMessage from langchain_core.messages import SystemMessage, HumanMessage
from fast_tools import FastToolRunner from fast_tools import FastToolRunner
# ── Regex pre-classifier ───────────────────────────────────────────────────── # ── Regex pre-classifiers ─────────────────────────────────────────────────────
# Catches obvious light-tier patterns before calling the LLM.
# Keyed by regex → compiled pattern. # Complex: keyword triggers that reliably signal deep multi-source research
_COMPLEX_PATTERNS = re.compile(
r"(?:^|\s)("
r"research|investigate|deep.dive|think carefully"
r"|write a (?:detailed|comprehensive|full|thorough|complete)"
r"|compare all|find and (?:compare|summarize|analyze)"
r"|in[- ]depth analysis|comprehensive guide"
r"|detailed (?:report|analysis|comparison|breakdown|overview)"
r"|everything about|all (?:major|available|self-hosted|open.source)"
r"|pros and cons|with (?:sources|citations|references)"
# Russian complex research keywords (no trailing \b — stems like подробн match подробное/подробный)
r"|исследуй|изучи все|сравни все|найди и сравни|найди и опиши"
r"|напиши подробн|напиши детальн|напиши полн"
r"|подробный отчет|детальн\w+ (?:анализ|сравнение|отчет)"
r"|подробное (?:руководство|сравнение)|полное руководство"
r"|все варианты|все способы|все доступные|все самохостируемые|все платформы"
r"|лучшие практики|все инструменты|все решения|все протоколы"
r"|найди детальн|найди и кратко опиши"
r"|изучи свежие|изучи лучши|изучи все"
r"|сравни все\b"
r")",
re.IGNORECASE,
)
# Light: trivial queries that need no tools or memory
_LIGHT_PATTERNS = re.compile( _LIGHT_PATTERNS = re.compile(
r"^(" r"^("
# Greetings / farewells # Greetings / farewells
@@ -15,35 +42,301 @@ _LIGHT_PATTERNS = re.compile(
r"|thanks?|thank you|thx|ty|ok|okay|k|cool|great|awesome|perfect|sounds good|got it|nice|sure" r"|thanks?|thank you|thx|ty|ok|okay|k|cool|great|awesome|perfect|sounds good|got it|nice|sure"
r"|how are you|how are you\?|how are you doing(\s+today)?[?!.]*" r"|how are you|how are you\?|how are you doing(\s+today)?[?!.]*"
r"|what.?s up" r"|what.?s up"
# Calendar facts: "what day comes after X?" / "what comes after X?" # Calendar facts
r"|what\s+day\s+(comes\s+after|follows|is\s+after)\s+\w+[?!.]*" r"|what\s+day\s+(comes\s+after|follows|is\s+after)\s+\w+[?!.]*"
r"|what\s+comes\s+after\s+\w+[?!.]*" r"|what\s+comes\s+after\s+\w+[?!.]*"
# Acronym expansions: "what does X stand for?" # Acronym expansions
r"|what\s+does\s+\w+\s+stand\s+for[?!.]*" r"|what\s+does\s+\w+\s+stand\s+for[?!.]*"
# Russian greetings / farewells / acknowledgements
r"|привет|пока|спасибо|здравствуй|здравствуйте|добрый день|добрый вечер|доброе утро"
r"|окей|хорошо|отлично|понятно|ок|ладно|договорились|спс|благодарю"
r"|пожалуйста|не за что|всё понятно|ясно"
r"|как дела|как ты|как жизнь|всё хорошо|всё ок"
r")[\s!.?]*$", r")[\s!.?]*$",
re.IGNORECASE, re.IGNORECASE,
) )
# ── LLM classification prompt ───────────────────────────────────────────────── # ── Semantic router utterances ────────────────────────────────────────────────
CLASSIFY_PROMPT = """Classify the message. Output ONLY one word: light, medium, or complex. # These are embedded at startup. New messages are classified by cosine
# similarity — whichever tier's centroid is closest wins.
_LIGHT_UTTERANCES = [
# General facts (English)
"what is 2+2",
"what is the capital of France",
"name the three primary colors",
"tell me a short joke",
"is the sky blue",
"is water wet",
"how many days in a week",
"what is the speed of light",
"what is the boiling point of water",
"spell the word beautiful",
"what color is the ocean",
"how many inches in a foot",
"who wrote hamlet",
"what is pi",
"what year did world war two end",
"what is the largest planet",
"how many continents are there",
"what does DNA stand for",
"what language do they speak in Brazil",
"what is the square root of 144",
# Tech definitions — static knowledge (English)
"what is Docker",
"what is a VPN",
"what is SSH",
"what is a reverse proxy",
"what is an API",
"what is a firewall",
"what is a container",
"what is DNS",
"what is HTTPS",
"what is a load balancer",
"what is Kubernetes",
"what is Git",
"what is a network port",
"what is an IP address",
"what is a subnet mask",
"what is the OSI model",
"how many bits in a byte",
"how many bytes in a gigabyte",
"what is TCP",
"what is a REST API",
# Russian — static facts and definitions
"что такое IP-адрес",
"что такое VPN",
"что такое Docker",
"что такое DNS",
"что такое SSH",
"что означает API",
"сколько байт в гигабайте",
"сколько бит в байте",
"что такое Zigbee",
"что такое Z-Wave",
"что такое брандмауэр",
"что такое виртуальная машина",
"что такое обратный прокси",
"привет",
"пока",
"спасибо",
"как дела",
"что такое Matter протокол",
"сколько планет в солнечной системе",
"чему равно число Пи",
# Russian — more static definitions
"что такое TCP/IP",
"что такое подсеть",
"скорость света",
"сколько дней в году",
"что такое Kubernetes",
"что такое Git",
"что такое REST API",
"что такое TCP",
"что такое UDP",
"что такое VLAN",
"сколько мегабайт в гигабайте",
"что такое процессор",
"что такое оперативная память",
"что такое виртуализация",
"что такое Linux",
"что такое умный дом",
"что такое Home Assistant",
"что такое Matter",
]
LIGHT = answerable from general knowledge, no internet needed: _MEDIUM_UTTERANCES = [
what is 2+2 / what is the capital of France / name the three primary colors # English — current data, memory, actions
tell me a short joke / is the sky blue / is water wet "what is the weather today",
"what is the bitcoin price right now",
"what are the latest news",
"what did we talk about last time",
"what is my name",
"where do I live",
"what do you know about me",
"what did I tell you before",
"what is the current temperature outside",
"remind me what I said about my project",
"search for the latest iPhone release",
"find me a restaurant nearby",
"turn on the lights in the living room",
"turn off all lights",
"set temperature to 22 degrees",
"what is the current traffic to Moscow",
"check if anyone is home",
"what devices are currently on",
"look up my public IP address",
"show me recent news about Proxmox",
# Russian — weather and commute
"какая сегодня погода в Балашихе",
"пойдет ли сегодня дождь",
"какая температура на улице сейчас",
"погода на завтра",
"будет ли снег сегодня",
"сколько ехать до Москвы сейчас",
"какие пробки на дороге до Москвы",
"время в пути на работу",
"есть ли пробки сейчас",
"стоит ли брать зонтик",
# Russian — smart home control
"включи свет в гостиной",
"выключи свет на кухне",
"какая температура дома",
"установи температуру 22 градуса",
"выключи все лампочки",
"какие устройства сейчас включены",
"включи ночной режим",
"открой шторы в гостиной",
"включи свет в спальне на 50 процентов",
"выключи свет во всём доме",
"включи вентилятор в детской",
"закрыты ли все окна",
"выключи телевизор",
"какое потребление электричества сегодня",
"включи кофемашину",
"сколько у нас датчиков движения",
"состояние всех дверных замков",
"есть ли кто-нибудь дома",
"установи будильник на 7 утра",
# Russian — personal memory
"как меня зовут",
"где я живу",
"что мы обсуждали в прошлый раз",
"что ты знаешь о моем домашнем сервере",
"напомни, какие сервисы я запускаю",
"что я просил тебя запомнить",
"что я говорил о своей сети",
# Russian — current info lookups requiring network/tools
"какой сейчас курс биткоина",
"курс доллара к рублю сейчас",
"какая последняя версия Docker",
"как перезапустить Docker контейнер",
"как посмотреть логи Docker контейнера",
"какие новые функции в Home Assistant 2024",
"есть ли проблемы у Cloudflare сегодня",
"какие новые Zigbee устройства вышли в 2024 году",
"найди хороший опенсорс менеджер фотографий",
"последние новости Proxmox",
"напиши bash команду для поиска больших файлов",
"как вывести список всех запущенных контейнеров",
"как проверить использование диска в Linux",
]
MEDIUM = requires web search or the user's stored memories: _COMPLEX_UTTERANCES = [
current weather / today's news / Bitcoin price / what did we talk about # English
what is my name / where do I live / what is my job / do I have any pets "research everything about Elon Musk's recent projects and investments",
what do you know about me / what are my preferences / what did I tell you "write a detailed report on climate change solutions with sources",
"investigate the history and current state of quantum computing",
"find and summarize the latest academic papers on transformer architectures",
"analyze in depth the pros and cons of nuclear energy with citations",
"research the background and controversies around this person",
"compare all major cloud providers with detailed pricing and features",
"write a comprehensive biography of this historical figure",
"investigate what caused the 2008 financial crisis with multiple sources",
"research the best programming languages in 2024 with detailed comparison",
"find everything published about this medical condition and treatments",
"do a deep dive into the latest developments in artificial general intelligence",
"research and compare all options for starting a business in Europe",
"investigate recent news and controversies around this company",
"write a thorough analysis of geopolitical tensions in the Middle East",
"find detailed information on the side effects and studies for this medication",
"research the top 10 JavaScript frameworks with benchmarks and community data",
"investigate who is funding AI research and what their goals are",
"write a detailed market analysis for the electric vehicle industry",
"research everything you can find about this startup or technology",
# Russian — deep research
"исследуй и сравни все варианты умного домашнего освещения",
"напиши подробный отчет о протоколах умного дома",
"изучи все самохостируемые медиасерверы и сравни их",
"исследуй лучшие практики безопасности домашнего сервера",
"сравни все системы резервного копирования для Linux",
"напиши детальное сравнение WireGuard и OpenVPN",
"исследуй все варианты голосового управления на русском языке",
"изучи все опенсорс альтернативы Google сервисам",
"напиши подробный анализ локальных языковых моделей",
"исследуй лучшие инструменты мониторинга для домашнего сервера",
# Russian — more deep research queries matching benchmark
"исследуй и сравни Proxmox, Unraid и TrueNAS для домашней лаборатории",
"напиши подробное руководство по безопасности домашнего сервера",
"исследуй все доступные дашборды для самохостинга и сравни их",
"найди детальные бенчмарки ARM одноплатных компьютеров для домашней лаборатории",
"исследуй лучший стек мониторинга для самохостинга в 2024 году",
"исследуй и сравни WireGuard, OpenVPN и Tailscale для домашней сети",
"исследуй лучшие практики сегментации домашней сети с VLAN",
"изучи все самохостируемые DNS решения и их возможности",
"исследуй и сравни все платформы умного дома: Home Assistant и другие",
"изучи лучшие Zigbee координаторы и их совместимость с Home Assistant",
"напиши детальный отчет о поддержке протокола Matter и совместимости устройств",
"исследуй все способы интеграции умных ламп с Home Assistant",
"найди и сравни все варианты датчиков движения для умного дома",
"исследуй и сравни все самохостируемые решения для хранения фотографий",
"изучи лучшие самохостируемые медиасерверы: Jellyfin, Plex и Emby",
"исследуй последние достижения в локальном LLM инференсе и обзор моделей",
"изучи лучшие опенсорс альтернативы Google сервисов для приватности",
"найди и кратко опиши все крупные самохостируемые менеджеры паролей",
"напиши детальный анализ текущего состояния AI ассистентов для самохостинга",
"исследуй и сравни все инструменты оркестрации контейнеров для домашней лаборатории",
"изучи лучшие подходы к автоматическому резервному копированию в Linux",
"исследуй и сравни все самохостируемые инструменты личных финансов",
"изучи свежие CVE и уязвимости в популярном самохостируемом ПО",
"напиши подробное руководство по настройке автоматизаций в Home Assistant",
"исследуй все варианты голосового управления умным домом на русском языке",
"сравни все системы резервного копирования для Linux: Restic, BorgBackup и другие",
"исследуй лучшие самохостируемые системы мониторинга сети: Zabbix, Grafana",
"изучи все варианты локального запуска языковых моделей на видеокарте",
"напиши подробный отчет о технологиях синтеза речи с открытым исходным кодом",
"исследуй все способы интеграции умных розеток с мониторингом потребления",
"напиши полное руководство по настройке обратного прокси Caddy",
"исследуй лучшие практики написания Docker Compose файлов для продакшена",
"сравни все самохостируемые облачные хранилища: Nextcloud, Seafile и другие",
"изучи все доступные локальные ассистенты с голосовым управлением",
"исследуй все самохостируемые решения для блокировки рекламы: Pi-hole, AdGuard",
"напиши детальное сравнение систем управления конфигурацией: Ansible, Puppet",
"исследуй все протоколы умного дома и их плюсы и минусы: Zigbee, Z-Wave, Matter",
"найди и сравни все фреймворки для создания локальных AI ассистентов",
"исследуй лучшие решения для автоматического управления медиатекой",
"изучи все варианты самохостируемых систем учёта расходов с возможностью импорта",
"напиши сравнение всех вариантов самохостинга для хранения и синхронизации файлов",
"исследуй все открытые протоколы для умного дома и их экосистемы",
"изучи лучшие инструменты для автоматизации домашней инфраструктуры",
]
COMPLEX = /think prefix only: # Medium: queries that require tools, actions, or real-time data (not static knowledge)
/think compare frameworks / /think plan a trip _MEDIUM_PATTERNS = re.compile(
r"(?:"
Message: {message} # Russian smart home commands — always need HA integration
Output (one word only — light, medium, or complex):""" r"(?:включи|выключи|открой|закрой|установи|поставь|убавь|прибавь|переключи)\s"
r"|(?:какая|какой|какое|каково)\s+(?:температура|влажность|потребление|состояние|статус)\s"
r"|(?:сколько|есть ли)\s.*(?:датчик|устройств|замк)"
# Russian memory queries
r"|как меня зовут|где я живу|что мы обсуждали|что я говорил|что я просил"
r"|напомни\b|что ты знаешь обо мне"
# Russian current info
r"|курс (?:доллара|биткоина|евро|рубл)"
r"|(?:последние |свежие )?новости\b"
r"|(?:погода|температура)\s+(?:на завтра|на неделю)"
r")",
re.IGNORECASE,
)
LIGHT_REPLY_PROMPT = """You are a helpful Telegram assistant. Answer briefly and naturally (1-3 sentences). Be friendly.""" LIGHT_REPLY_PROMPT = """You are a helpful Telegram assistant. Answer briefly and naturally (1-3 sentences). Be friendly."""
_EMBED_MODEL = "ollama/nomic-embed-text"
def _cosine(a: list[float], b: list[float]) -> float:
dot = sum(x * y for x, y in zip(a, b))
norm_a = math.sqrt(sum(x * x for x in a))
norm_b = math.sqrt(sum(x * x for x in b))
if norm_a == 0 or norm_b == 0:
return 0.0
return dot / (norm_a * norm_b)
def _centroid(embeddings: list[list[float]]) -> list[float]:
n = len(embeddings)
dim = len(embeddings[0])
return [sum(embeddings[i][d] for i in range(n)) / n for d in range(dim)]
def _format_history(history: list[dict]) -> str: def _format_history(history: list[dict]) -> str:
if not history: if not history:
@@ -56,71 +349,93 @@ def _format_history(history: list[dict]) -> str:
return "\n".join(lines) return "\n".join(lines)
def _parse_tier(text: str) -> str:
"""Extract tier from raw model output. Default to medium."""
t = text.strip().lower()
snippet = t[:60]
if "complex" in snippet:
return "complex"
if "medium" in snippet:
return "medium"
if "light" in snippet:
return "light"
# Model invented a descriptive category (e.g. "simplefact", "trivial", "basic") →
# treat as light since it recognised the question doesn't need tools
if any(w in snippet for w in ("simple", "fact", "trivial", "basic", "easy", "general")):
return "light"
return "medium" # safe default
class Router: class Router:
def __init__(self, model, fast_tool_runner: FastToolRunner | None = None): def __init__(
self.model = model self,
model,
embedder: AsyncOpenAI,
fast_tool_runner: FastToolRunner | None = None,
):
self.model = model # qwen2.5:1.5b — used only for generating light replies
self._embedder = embedder
self._fast_tool_runner = fast_tool_runner self._fast_tool_runner = fast_tool_runner
self._light_centroid: list[float] | None = None
self._medium_centroid: list[float] | None = None
self._complex_centroid: list[float] | None = None
async def initialize(self) -> None:
"""Pre-compute utterance embeddings. Call once at startup. Retries until LiteLLM is ready."""
print("[router] embedding utterances for semantic classifier...", flush=True)
texts = _LIGHT_UTTERANCES + _MEDIUM_UTTERANCES + _COMPLEX_UTTERANCES
for attempt in range(10):
try:
resp = await self._embedder.embeddings.create(model=_EMBED_MODEL, input=texts)
embeddings = [item.embedding for item in resp.data]
n_light = len(_LIGHT_UTTERANCES)
n_medium = len(_MEDIUM_UTTERANCES)
self._light_centroid = _centroid(embeddings[:n_light])
self._medium_centroid = _centroid(embeddings[n_light:n_light + n_medium])
self._complex_centroid = _centroid(embeddings[n_light + n_medium:])
print("[router] semantic classifier ready (3-tier)", flush=True)
return
except Exception as e:
print(f"[router] embedding attempt {attempt+1}/10 failed: {e}", flush=True)
await asyncio.sleep(3)
print("[router] WARNING: could not initialize semantic classifier — will default to medium", flush=True)
async def _classify_by_embedding(self, message: str) -> str:
"""Embed message and return 'light', 'medium', or 'complex' based on centroid similarity."""
if self._light_centroid is None or self._medium_centroid is None or self._complex_centroid is None:
return "medium"
try:
resp = await self._embedder.embeddings.create(model=_EMBED_MODEL, input=[message])
emb = resp.data[0].embedding
score_light = _cosine(emb, self._light_centroid)
score_medium = _cosine(emb, self._medium_centroid)
score_complex = _cosine(emb, self._complex_centroid)
tier = max(
[("light", score_light), ("medium", score_medium), ("complex", score_complex)],
key=lambda x: x[1],
)[0]
print(
f"[router] semantic: light={score_light:.3f} medium={score_medium:.3f} "
f"complex={score_complex:.3f}{tier}",
flush=True,
)
return tier
except Exception as e:
print(f"[router] embedding classify error, defaulting to medium: {e}", flush=True)
return "medium"
async def route( async def route(
self, self,
message: str, message: str,
history: list[dict], history: list[dict],
force_complex: bool = False,
) -> tuple[str, Optional[str]]: ) -> tuple[str, Optional[str]]:
""" """
Returns (tier, reply_or_None). Returns (tier, reply_or_None).
For light tier: also generates the reply with a second call. For light tier: also generates the reply inline.
For medium/complex: reply is None. For medium/complex: reply is None.
""" """
if force_complex:
return "complex", None
# Step 0a: fast tool match — agent.py short-circuits before reaching router
# This branch is only hit if force_complex=True with a fast-tool message (rare)
if self._fast_tool_runner and self._fast_tool_runner.any_matches(message.strip()): if self._fast_tool_runner and self._fast_tool_runner.any_matches(message.strip()):
names = self._fast_tool_runner.matching_names(message.strip()) names = self._fast_tool_runner.matching_names(message.strip())
print(f"[router] fast_tool_match={names} → medium", flush=True) print(f"[router] fast_tool_match={names} → medium", flush=True)
return "medium", None return "medium", None
# Step 0b: regex pre-classification for obvious light patterns
if _LIGHT_PATTERNS.match(message.strip()): if _LIGHT_PATTERNS.match(message.strip()):
print(f"[router] regex→light", flush=True) print("[router] regex→light", flush=True)
return await self._generate_light_reply(message, history) return await self._generate_light_reply(message, history)
# Step 1: LLM classification with raw text output if _COMPLEX_PATTERNS.search(message.strip()):
try: print("[router] regex→complex", flush=True)
classify_response = await self.model.ainvoke([ return "complex", None
HumanMessage(content=CLASSIFY_PROMPT.format(message=message)),
])
raw = classify_response.content or ""
raw = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
tier = _parse_tier(raw)
if tier == "complex" and not message.startswith("/think"): if _MEDIUM_PATTERNS.search(message.strip()):
tier = "medium" print("[router] regex→medium", flush=True)
print(f"[router] raw={raw[:30]!r} → tier={tier}", flush=True)
except Exception as e:
print(f"[router] classify error, defaulting to medium: {e}", flush=True)
return "medium", None return "medium", None
tier = await self._classify_by_embedding(message)
if tier != "light": if tier != "light":
return tier, None return tier, None
@@ -129,7 +444,7 @@ class Router:
async def _generate_light_reply( async def _generate_light_reply(
self, message: str, history: list[dict] self, message: str, history: list[dict]
) -> tuple[str, Optional[str]]: ) -> tuple[str, Optional[str]]:
"""Generate a short reply using the router model for light-tier messages.""" """Generate a short reply using qwen2.5:1.5b for light-tier messages."""
history_text = _format_history(history) history_text = _format_history(history)
context = f"\nConversation history:\n{history_text}" if history else "" context = f"\nConversation history:\n{history_text}" if history else ""
try: try:

View File

@@ -199,14 +199,13 @@ def parse_run_block(lines, msg_prefix):
if txt: if txt:
last_ai_text = txt last_ai_text = txt
m = re.search(r"replied in ([\d.]+)s \(llm=([\d.]+)s, send=([\d.]+)s\)", line) m = re.search(r"replied in ([\d.]+)s(?:\s+tier=(\w+))?", line)
if m: if m:
tier_m = re.search(r"\btier=(\w+)", line) tier = m.group(2) if m.group(2) else "unknown"
tier = tier_m.group(1) if tier_m else "unknown"
reply_data = { reply_data = {
"reply_total": float(m.group(1)), "reply_total": float(m.group(1)),
"llm": float(m.group(2)), "llm": None,
"send": float(m.group(3)), "send": None,
"tier": tier, "tier": tier,
"reply_text": last_ai_text, "reply_text": last_ai_text,
"memory_s": None, "memory_s": None,