25 Commits

Author SHA1 Message Date
887d4b8d90 voice benchmark: rename --dry-run → --no-inference, fix log extraction
- --no-inference applies to all tiers (not just complex)
- metadata key: dry_run → no_inference
- extract_tier_from_logs: forward iteration (not reversed), updated regex
- GPU check skipped when --no-inference
- Fix TypeError in misclassified print when actual=None

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 07:58:05 +00:00
4e6d3090c2 Remove benchmark.json from gitignore — dataset is now tracked
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 07:53:35 +00:00
5b09a99a7f Routing: 100% accuracy on realistic home assistant dataset
- router.py: skip light reply generation when no_inference=True;
  add control words (да/нет/стоп/отмена/повтори/подожди/etc.) to _LIGHT_PATTERNS
- agent.py: pass no_inference to router.route(); skip preflight IO in no_inference mode
- benchmarks/benchmark.json: replace definition-heavy queries with realistic
  Alexa/Google-Home style queries (greetings, smart home, timers, shopping,
  weather, personal memory, cooking) — 30 light / 60 medium / 30 complex

Routing benchmark: 120/120 (100%), all under 0.1s per query

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 07:53:01 +00:00
3fb90ae083 Skip _reply_semaphore in no_inference mode
No GPU inference happens in this mode, so serialization is not needed.
Without this, timed-out routing benchmark queries hold the semaphore
and cascade-block all subsequent queries.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 07:40:07 +00:00
4d37ac65b2 Skip preflight IO (memory/URL/fast-tools) when no_inference=True
In no_inference mode only the routing decision matters — fetching
memories and URLs adds latency without affecting the classification.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 07:37:55 +00:00
b7d5896076 routing benchmark: 1s strict deadline per query
QUERY_TIMEOUT=1s — classification and routing must complete within
1 second or the query is recorded as 'timeout'.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 07:35:13 +00:00
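A strict per-query deadline like this is typically enforced with `asyncio.wait_for`. The sketch below assumes a hypothetical `classify` coroutine standing in for the routing call; only the timeout-to-`'timeout'` mapping mirrors what the commit describes.

```python
import asyncio

QUERY_TIMEOUT = 1.0  # strict per-query deadline in seconds

async def classify(query: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for classification + routing work
    return "light"

async def run_query(query: str) -> str:
    try:
        return await asyncio.wait_for(classify(query), timeout=QUERY_TIMEOUT)
    except asyncio.TimeoutError:
        # Recorded as 'timeout' instead of a tier, per the benchmark rules
        return "timeout"
```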
fc53632c7b Merge pull request 'feat: rename dry_run to no_inference for all tiers' (#17) from worktree-agent-afc013ce into main
Reviewed-on: #17
2026-03-24 07:27:04 +00:00
47a1166be6 Merge pull request 'feat: rename --dry-run to --no-inference in run_benchmark.py' (#18) from feat/no-inference-benchmark into main
Reviewed-on: #18
2026-03-24 07:26:44 +00:00
74e5b1758d Merge pull request 'feat: add run_routing_benchmark.py — routing-only benchmark' (#19) from feat/routing-benchmark into main
Reviewed-on: #19
2026-03-24 07:26:31 +00:00
0fbdbf3a5e Add run_routing_benchmark.py — dedicated routing-only benchmark
Tests routing accuracy for all tiers with no_inference=True hardcoded.
Fast (QUERY_TIMEOUT=30s), no GPU check, shares benchmark.json dataset.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 07:25:16 +00:00
77db739819 Rename --dry-run to --no-inference, apply to all tiers in run_benchmark.py
No-inference mode now skips LLM for all tiers (not just complex),
GPU check is auto-skipped, and the metadata key matches agent.py.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 03:49:09 +00:00
9c2f27eed4 Rename dry_run → no_inference, extend to all tiers in agent.py
When no_inference=True, routing decision is captured but all LLM
inference is skipped — yields constant "I don't know" immediately.
Also disables fast-tool short-circuit so routing path always runs.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 03:43:42 +00:00
a363347ae5 Merge pull request 'Fix routing: add Russian tech def patterns to light, strengthen medium smart home' (#13) from fix/routing-accuracy into main
Reviewed-on: #13
2026-03-24 02:51:17 +00:00
1d2787766e Merge pull request 'Remove Bifrost: replace test 4 with LiteLLM health check' (#14) from fix/remove-bifrost into main
Reviewed-on: #14
2026-03-24 02:48:40 +00:00
abf792a2ec Remove Bifrost: replace test 4 with LiteLLM health check
- Remove BIFROST constant and fetch_bifrost_logs() from common.py
- Add LITELLM constant (localhost:4000)
- Replace test_memory.py test 4 (Bifrost pass-through) with LiteLLM health check

Fixes #5

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 02:46:01 +00:00
537e927146 Fix routing: add Russian tech def patterns to light, strengthen medium smart home
- _LIGHT_PATTERNS: add что\s+такое, что\s+означает, сколько бит/байт,
  compound greetings (привет, как дела) — these fell through to embedding
  which sometimes misclassified short Russian phrases as medium
- _MEDIUM_PATTERNS: add non-verb-first smart home patterns (свет/лампочка
  as subject, режим/сцена commands) for benchmark queries with different phrasing

Fixes #8, #9

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 02:45:42 +00:00
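A regex pre-classifier of the kind this commit extends can be sketched like this. The pattern list is a hypothetical excerpt — the real `_LIGHT_PATTERNS` lives in `router.py` and is larger — but it shows how such patterns catch short Russian phrases before they reach the embedding classifier.

```python
import re

# Hypothetical excerpt — the actual list in router.py is longer
_LIGHT_PATTERNS = [
    re.compile(r"что\s+такое", re.IGNORECASE),
    re.compile(r"что\s+означает", re.IGNORECASE),
    re.compile(r"сколько\s+(бит|байт)", re.IGNORECASE),
    re.compile(r"привет,?\s+как\s+дела", re.IGNORECASE),
]

def pre_classify_light(query: str) -> bool:
    # Any match short-circuits to light tier, bypassing the embedding step
    return any(p.search(query) for p in _LIGHT_PATTERNS)
```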
186e16284b Merge pull request 'Fix tier logging: capture actual_tier, fix parse_run_block regex, remove reply_text truncation' (#11) from fix/tier-logging into main
Reviewed-on: #11
2026-03-24 02:44:35 +00:00
0b428e4ada Merge pull request 'Fix benchmark log extraction: first tier match, increase log tail to 300' (#12) from fix/benchmark-log-extraction into main
Reviewed-on: #12
2026-03-24 02:43:26 +00:00
98095679be Fix benchmark log extraction: first tier match, increase log tail to 300
- Remove reversed() from extract_tier_from_logs: first match = routing decision
  (dry-run complex logs tier=complex early, then overwrites with tier=medium at done)
- Increase log tail from 80→300 to handle concurrent log activity

Fixes #7, #10

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 02:42:27 +00:00
8ef4897869 Fix tier logging: capture actual_tier, fix parse_run_block regex, remove reply_text truncation
- Add tier_capture param to _run_agent_pipeline; append tier after determination
- Capture actual_tier in run_agent_task from tier_capture list
- Log tier in replied-in line: [agent] replied in Xs tier=Y
- Remove reply_text[:200] truncation (was breaking benchmark keyword matching)
- Update parse_run_block regex to match new log format; llm/send fields now None

Fixes #1, #3, #4

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 02:41:59 +00:00
Alvis
1f5e272600 Switch from Bifrost to LiteLLM; add Matrix channel; update rules
Infrastructure:
- docker-compose.yml: replace bifrost container with LiteLLM proxy
  (host.docker.internal:4000); complex model → deepseek-r1:free via
  OpenRouter; add Matrix URL env var; mount logs volume
- bifrost-config.json: add auth_config + postgres config_store (archived)

Routing:
- router.py: full semantic 3-tier classifier rewrite — nomic-embed-text
  centroids for light/medium/complex; regex pre-classifiers for all tiers;
  Russian utterance sets expanded
- agent.py: wire LiteLLM URL; add dry_run support; add Matrix channel

Channels:
- channels.py: add Matrix adapter (_matrix_send via mx- session prefix)

Rules / docs:
- agent-pipeline.md: remove /think prefix requirement; document automatic
  complex tier classification
- llm-inference.md: update BIFROST_URL → LITELLM_URL references; add
  remote model note for complex tier
- ARCHITECTURE.md: deleted (superseded by README.md)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 02:14:13 +00:00
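The centroid-based 3-tier classification this commit introduces can be sketched as: embed each tier's utterance set, average into one centroid per tier, then pick the tier whose centroid has the highest cosine similarity to the query embedding. The sketch below assumes pre-computed vectors; the real router obtains them from nomic-embed-text.

```python
import math

def _cos(a: list[float], b: list[float]) -> float:
    # Plain cosine similarity; assumes non-zero vectors
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb)

def classify(query_vec: list[float], centroids: dict[str, list[float]]) -> str:
    # centroids: mean embedding of the utterance set for each tier
    return max(centroids, key=lambda tier: _cos(query_vec, centroids[tier]))
```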
Alvis
54cb940279 Update docs: add benchmarks/ section, fix complex tier description
- CLAUDE.md: add benchmark commands (run_benchmark.py flags, dry-run,
  categories, voice benchmark)
- README.md: add benchmarks/ to Files tree; fix incorrect claim that
  complex tier requires /think prefix — it is auto-classified via regex
  and embedding similarity; fix "Complex agent (/think prefix)" heading

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 02:13:14 +00:00
Alvis
bd951f943f Move benchmark scripts into benchmarks/ subdir
- benchmarks/run_benchmark.py (was run_benchmark.py)
- benchmarks/run_voice_benchmark.py (was run_voice_benchmark.py)
- Scripts use Path(__file__).parent so paths resolve correctly in subdir
- .gitignore updated: ignore benchmarks/benchmark.json,
  results_latest.json, voice_results*.json, voice_audio/

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 02:02:46 +00:00
Alvis
ab68bba935 Add routing benchmark scripts; gitignore dataset and results
- run_benchmark.py: sends queries to /message, extracts tier= from docker
  logs, reports per-tier accuracy, saves results_latest.json
- run_voice_benchmark.py: voice path benchmark
- .gitignore: ignore benchmark.json (dataset) and results_latest.json
  (runtime output); benchmark scripts are tracked, data files are not

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 02:00:17 +00:00
Alvis
3ae1cefbd4 WeatherTool: fetch open-meteo directly, skip LLM for fast tool replies
- Replace SearXNG search with direct open-meteo.com API call (no key needed)
- WeatherTool now returns a ready-to-deliver reply string
- agent.py: short-circuit router+LLM when fast tools return a result (tier=fast)
- router.py: fast tool match no longer triggers light reply generation

Weather latency: 105-190s → ~1s

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-15 09:42:55 +00:00
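A direct open-meteo call of the kind this commit describes can be sketched as below. The open-meteo `/v1/forecast` endpoint with `current_weather=true` genuinely needs no API key; the reply-string format is a hypothetical stand-in for whatever WeatherTool actually returns.

```python
import json
import urllib.request

def format_weather(data: dict) -> str:
    # Ready-to-deliver reply string built from the current_weather block
    cw = data["current_weather"]
    return f"Now: {cw['temperature']}°C, wind {cw['windspeed']} km/h"

def fetch_weather(lat: float, lon: float) -> str:
    # No API key required; current_weather=true returns a compact block
    url = (
        "https://api.open-meteo.com/v1/forecast"
        f"?latitude={lat}&longitude={lon}&current_weather=true"
    )
    with urllib.request.urlopen(url, timeout=10) as resp:
        return format_weather(json.load(resp))
```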
17 changed files with 1966 additions and 294 deletions


@@ -1,9 +1,11 @@
 # Agent Pipeline Rules
 ## Tiers
-- Complex tier requires `/think ` prefix. Any LLM classification of "complex" is downgraded to medium. Do not change this.
-- Medium is the default tier. Light is only for trivial static-knowledge queries matched by regex or LLM.
+- Routing is fully automatic: router classifies into light/medium/complex via 3-way embedding similarity.
+- Complex tier is reached automatically for deep research queries — no prefix required.
+- Medium is the default tier. Light is only for trivial static-knowledge queries matched by regex or embedding.
 - Light tier upgrade to medium is automatic when URL content is pre-fetched or a fast tool matches.
+- `tier_override` API parameter still allows callers to force a specific tier (e.g. `adolf-deep` model → complex).
 ## Medium agent
 - `_DirectModel` makes a single `ainvoke()` call with no tool schema. Do not add tools to the medium agent.


@@ -1,7 +1,8 @@
# LLM Inference Rules
-- All LLM calls must use `base_url=BIFROST_URL` with model name `ollama/<model>`. Never call Ollama directly for inference.
+- All LLM calls must use `base_url=LITELLM_URL` (points to LiteLLM at `host.docker.internal:4000/v1`). Never call Ollama directly for inference.
 - `_reply_semaphore` (asyncio.Semaphore(1)) serializes all GPU inference. Never bypass it or add a second semaphore.
-- Model names in code always use the `ollama/` prefix: `ollama/qwen3:4b`, `ollama/qwen3:8b`, `ollama/qwen2.5:1.5b`.
+- Local Ollama models use the `ollama/` prefix: `ollama/qwen3:4b`, `ollama/qwen2.5:1.5b`. Remote models (e.g. OpenRouter) use their full LiteLLM name: `openrouter/deepseek-r1`.
-- Timeout values: router=30s, medium=180s, complex=600s. Do not reduce them — GPU inference under load is slow.
+- Timeout values: router=30s, medium=180s, complex=600s. Do not reduce them.
-- `VRAMManager` is the only component that contacts Ollama directly (for flush/prewarm/poll). This is intentional — Bifrost cannot manage VRAM.
+- `VRAMManager` is the only component that contacts Ollama directly (for flush/prewarm/poll). This is intentional — LiteLLM cannot manage VRAM.
+- Complex tier uses a remote model (`DEEPAGENTS_COMPLEX_MODEL`) — no VRAM management is needed for it.

.gitignore (vendored, 5 changed lines)

@@ -1,2 +1,7 @@
 __pycache__/
 *.pyc
+logs/*.jsonl
+adolf_tuning_data/voice_audio/
+benchmarks/results_latest.json
+benchmarks/voice_results*.json
+benchmarks/voice_audio/


@@ -18,8 +18,24 @@ python3 test_routing.py [--easy-only|--medium-only|--hard-only]
 # Use case tests — read the .md file and follow its steps as Claude Code
 # example: read tests/use_cases/weather_now.md and execute it
+# Routing benchmark — measures tier classification accuracy across 120 queries
+# Run from benchmarks/ — Adolf must be running. DO NOT run during active use (holds GPU).
+cd benchmarks
+python3 run_benchmark.py                            # full run (120 queries)
+python3 run_benchmark.py --tier light               # light tier only (30 queries)
+python3 run_benchmark.py --tier medium              # medium tier only (50 queries)
+python3 run_benchmark.py --tier complex --dry-run   # complex tier, medium model (no API cost)
+python3 run_benchmark.py --category smart_home_control
+python3 run_benchmark.py --ids 1,2,3
+python3 run_benchmark.py --list-categories
+# Voice benchmark
+python3 run_voice_benchmark.py
+# benchmark.json (dataset) and results_latest.json are gitignored — not committed
 ```
 ## Architecture
-@ARCHITECTURE.md
+@README.md


@@ -74,7 +74,7 @@ Autonomous personal assistant with a multi-channel gateway. Three-tier model rou
 Adolf uses LangChain's tool interface but only the complex agent actually invokes tools at runtime.
-**Complex agent (`/think` prefix):** `web_search` and `fetch_url` are defined as `langchain_core.tools.Tool` objects and passed to `create_deep_agent()`. The deepagents library runs an agentic loop (LangGraph `create_react_agent` under the hood) that sends the tool schema to the model via OpenAI function-calling format and handles tool dispatch.
+**Complex agent:** `web_search` and `fetch_url` are defined as `langchain_core.tools.Tool` objects and passed to `create_deep_agent()`. The deepagents library runs an agentic loop (LangGraph `create_react_agent` under the hood) that sends the tool schema to the model via OpenAI function-calling format and handles tool dispatch.
 **Medium agent (default):** `_DirectModel` makes a single `model.ainvoke(messages)` call with no tool schema. Context (memories, fetched URL content) is injected via the system prompt instead. This is intentional — `qwen3:4b` behaves unreliably when a tool array is present.
@@ -84,13 +84,13 @@ Adolf uses LangChain's tool interface but only the complex agent actually invoke
 | Tier | Model | Agent | Trigger | Latency |
 |------|-------|-------|---------|---------|
-| Light | `qwen2.5:1.5b` (router answers directly) | — | Regex pre-match or LLM classifies "light" | ~2-4s |
+| Light | `qwen2.5:1.5b` (router answers directly) | — | Regex pre-match or 3-way embedding classifies "light" | ~2-4s |
 | Medium | `qwen3:4b` (`DEEPAGENTS_MODEL`) | `_DirectModel` — single LLM call, no tools | Default; also forced when message contains URLs | ~10-20s |
-| Complex | `qwen3:8b` (`DEEPAGENTS_COMPLEX_MODEL`) | `create_deep_agent` — agentic loop with tools | `/think` prefix only | ~60-120s |
+| Complex | `deepseek/deepseek-r1:free` via LiteLLM (`DEEPAGENTS_COMPLEX_MODEL`) | `create_deep_agent` — agentic loop with tools | Auto-classified by embedding similarity | ~30-90s |
-**`/think` prefix**: forces complex tier, stripped before sending to agent. Complex tier is locked out unless the message starts with `/think` — any LLM classification of "complex" is downgraded to medium.
+Routing is fully automatic via 3-way cosine similarity over pre-embedded utterance centroids (light / medium / complex). Complex tier is reached automatically for deep research queries — `исследуй`, `изучи все`, `напиши подробный`, etc. — via regex pre-classifier and embedding similarity. No prefix required. Use `adolf-deep` model name to force complex tier via API.
## Fast Tools (`fast_tools.py`) ## Fast Tools (`fast_tools.py`)
@@ -164,7 +164,7 @@ Conversation history is keyed by session_id (5-turn buffer).
 ```
 adolf/
-├── docker-compose.yml    Services: bifrost, deepagents, openmemory, grammy, crawl4ai, routecheck, cli
+├── docker-compose.yml    Services: deepagents, openmemory, grammy, crawl4ai, routecheck, cli
 ├── Dockerfile            deepagents container (Python 3.12)
 ├── Dockerfile.cli        CLI container (python:3.12-slim + rich)
 ├── agent.py              FastAPI gateway, run_agent_task, Crawl4AI pre-fetch, fast tools, memory pipeline
@@ -175,6 +175,11 @@ adolf/
 ├── agent_factory.py      _DirectModel (medium) / create_deep_agent (complex)
 ├── cli.py                Interactive CLI REPL — Rich Live streaming + Markdown render
 ├── wiki_research.py      Batch wiki research pipeline (uses /message + SSE)
+├── benchmarks/
+│   ├── run_benchmark.py        Routing accuracy benchmark — 120 queries across 3 tiers
+│   ├── run_voice_benchmark.py  Voice path benchmark
+│   ├── benchmark.json          Query dataset (gitignored)
+│   └── results_latest.json     Last run results (gitignored)
 ├── .env                  TELEGRAM_BOT_TOKEN, ROUTECHECK_TOKEN, YANDEX_ROUTING_KEY (not committed)
 ├── routecheck/
 │   ├── app.py            FastAPI: image captcha + /api/route Yandex proxy
@@ -195,7 +200,9 @@ adolf/
 | Service | Host Port | Role |
 |---------|-----------|------|
-| Ollama GPU | 11436 | All LLM inference (via Bifrost) + VRAM management (direct) + memory extraction |
+| LiteLLM | 4000 | LLM proxy — all inference goes through here (`LITELLM_URL` env var) |
+| Ollama GPU | 11436 | GPU inference backend + VRAM management (direct) + memory extraction |
 | Ollama CPU | 11435 | nomic-embed-text embeddings for openmemory |
+| Langfuse | 3200 | LLM observability — traces all requests via LiteLLM callbacks |
 | Qdrant | 6333 | Vector store for memories |
 | SearXNG | 11437 | Web search (used by `web_search` tool) |

agent.py (380 changed lines)

@@ -1,7 +1,9 @@
 import asyncio
+import json as _json_module
 import os
 import time
-from contextlib import asynccontextmanager
+from contextlib import asynccontextmanager, nullcontext
+from pathlib import Path
 from fastapi import FastAPI, BackgroundTasks, Request
 from fastapi.responses import JSONResponse, StreamingResponse
@@ -16,6 +18,7 @@ _URL_RE = _re.compile(r'https?://[^\s<>"\']+')
 def _extract_urls(text: str) -> list[str]:
     return _URL_RE.findall(text)
+from openai import AsyncOpenAI
 from langchain_openai import ChatOpenAI
 from langchain_mcp_adapters.client import MultiServerMCPClient
 from langchain_community.utilities import SearxSearchWrapper
@@ -27,8 +30,9 @@ from agent_factory import build_medium_agent, build_complex_agent
 from fast_tools import FastToolRunner, WeatherTool, CommuteTool
 import channels
-# Bifrost gateway — all LLM inference goes through here
-BIFROST_URL = os.getenv("BIFROST_URL", "http://bifrost:8080/v1")
+# LiteLLM proxy — all LLM inference goes through here
+LITELLM_URL = os.getenv("LITELLM_URL", "http://host.docker.internal:4000/v1")
+LITELLM_API_KEY = os.getenv("LITELLM_API_KEY", "dummy")
 # Direct Ollama URL — used only by VRAMManager for flush/prewarm/poll
 OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
@@ -44,6 +48,45 @@ ROUTECHECK_TOKEN = os.getenv("ROUTECHECK_TOKEN", "")
 MAX_HISTORY_TURNS = 5
 _conversation_buffers: dict[str, list] = {}
+# ── Interaction logging (RLHF data collection) ─────────────────────────────────
+_LOG_DIR = Path(os.getenv("ADOLF_LOG_DIR", "/app/logs"))
+_INTERACTIONS_LOG = _LOG_DIR / "interactions.jsonl"
+
+def _ensure_log_dir() -> None:
+    try:
+        _LOG_DIR.mkdir(parents=True, exist_ok=True)
+    except Exception as e:
+        print(f"[log] cannot create log dir {_LOG_DIR}: {e}", flush=True)
+
+async def _log_interaction(
+    session_id: str,
+    channel: str,
+    tier: str,
+    input_text: str,
+    response_text: str | None,
+    latency_ms: int,
+    metadata: dict | None = None,
+) -> None:
+    """Append one interaction record to the JSONL log for future RLHF/finetuning."""
+    record = {
+        "ts": time.time(),
+        "session_id": session_id,
+        "channel": channel,
+        "tier": tier,
+        "input": input_text,
+        "output": response_text or "",
+        "latency_ms": latency_ms,
+    }
+    if metadata:
+        record["metadata"] = metadata
+    try:
+        _ensure_log_dir()
+        with open(_INTERACTIONS_LOG, "a", encoding="utf-8") as f:
+            f.write(_json_module.dumps(record, ensure_ascii=False) + "\n")
+    except Exception as e:
+        print(f"[log] write error: {e}", flush=True)
 # Per-session streaming queues — filled during inference, read by /stream/{session_id}
 _stream_queues: dict[str, asyncio.Queue] = {}
@@ -123,7 +166,7 @@ _memory_search_tool = None
 # Fast tools run before the LLM — classifier + context enricher
 _fast_tool_runner = FastToolRunner([
-    WeatherTool(searxng_url=SEARXNG_URL),
+    WeatherTool(),
     CommuteTool(routecheck_url=ROUTECHECK_URL, internal_token=ROUTECHECK_TOKEN),
 ])
@@ -140,31 +183,30 @@ async def lifespan(app: FastAPI):
     channels.register_defaults()
     # All three models route through Bifrost → Ollama GPU.
-    # Bifrost adds retry logic, observability, and failover.
-    # Model names use provider/model format: Bifrost strips the "ollama/" prefix
-    # before forwarding to Ollama's /v1/chat/completions endpoint.
     router_model = ChatOpenAI(
         model=f"ollama/{ROUTER_MODEL}",
-        base_url=BIFROST_URL,
+        base_url=LITELLM_URL,
-        api_key="dummy",
+        api_key=LITELLM_API_KEY,
         temperature=0,
         timeout=30,
     )
+    embedder = AsyncOpenAI(base_url=LITELLM_URL, api_key=LITELLM_API_KEY)
     medium_model = ChatOpenAI(
         model=f"ollama/{MEDIUM_MODEL}",
-        base_url=BIFROST_URL,
+        base_url=LITELLM_URL,
-        api_key="dummy",
+        api_key=LITELLM_API_KEY,
         timeout=180,
     )
     complex_model = ChatOpenAI(
-        model=f"ollama/{COMPLEX_MODEL}",
+        model=COMPLEX_MODEL,  # full model name — may be remote (OpenRouter) or local ollama/*
-        base_url=BIFROST_URL,
+        base_url=LITELLM_URL,
-        api_key="dummy",
+        api_key=LITELLM_API_KEY,
         timeout=600,
     )
     vram_manager = VRAMManager(base_url=OLLAMA_BASE_URL)
-    router = Router(model=router_model, fast_tool_runner=_fast_tool_runner)
+    router = Router(model=router_model, embedder=embedder, fast_tool_runner=_fast_tool_runner)
+    await router.initialize()
     mcp_connections = {
         "openmemory": {"transport": "sse", "url": f"{OPENMEMORY_URL}/sse"},
@@ -279,8 +321,8 @@ async def lifespan(app: FastAPI):
     )
     print(
-        f"[agent] bifrost={BIFROST_URL} | router=ollama/{ROUTER_MODEL} | "
-        f"medium=ollama/{MEDIUM_MODEL} | complex=ollama/{COMPLEX_MODEL}",
+        f"[agent] litellm={LITELLM_URL} | router=semantic(ollama/{ROUTER_MODEL}+nomic-embed-text) | "
+        f"medium=ollama/{MEDIUM_MODEL} | complex={COMPLEX_MODEL}",
         flush=True,
     )
     print(f"[agent] agent tools: {[t.name for t in agent_tools]}", flush=True)
@@ -346,6 +388,12 @@ def _log_messages(result):
 # ── memory helpers ─────────────────────────────────────────────────────────────
+def _resolve_user_id(session_id: str) -> str:
+    """Map any session_id to a canonical user identity for openmemory.
+    All channels share the same memory pool for the single user."""
+    return "alvis"
+
 async def _store_memory(session_id: str, user_msg: str, assistant_reply: str) -> None:
     """Store a conversation turn in openmemory (runs as a background task)."""
     if _memory_add_tool is None:
@@ -353,7 +401,8 @@ async def _store_memory(session_id: str, user_msg: str, assistant_reply: str) ->
     t0 = time.monotonic()
     try:
         text = f"User: {user_msg}\nAssistant: {assistant_reply}"
-        await _memory_add_tool.ainvoke({"text": text, "user_id": session_id})
+        user_id = _resolve_user_id(session_id)
+        await _memory_add_tool.ainvoke({"text": text, "user_id": user_id})
         print(f"[memory] stored in {time.monotonic() - t0:.1f}s", flush=True)
     except Exception as e:
         print(f"[memory] error: {e}", flush=True)
@@ -364,7 +413,8 @@ async def _retrieve_memories(message: str, session_id: str) -> str:
     if _memory_search_tool is None:
         return ""
     try:
-        result = await _memory_search_tool.ainvoke({"query": message, "user_id": session_id})
+        user_id = _resolve_user_id(session_id)
+        result = await _memory_search_tool.ainvoke({"query": message, "user_id": user_id})
         if result and result.strip() and result.strip() != "[]":
             return f"Relevant memories:\n{result}"
     except Exception:
@@ -372,36 +422,46 @@ async def _retrieve_memories(message: str, session_id: str) -> str:
         return ""
-# ── core task ──────────────────────────────────────────────────────────────────
-async def run_agent_task(message: str, session_id: str, channel: str = "telegram"):
-    print(f"[agent] queued: {message[:80]!r} chat={session_id}", flush=True)
-    force_complex = False
-    clean_message = message
-    if message.startswith("/think "):
-        force_complex = True
-        clean_message = message[len("/think "):]
-        print("[agent] /think prefix → force_complex=True", flush=True)
-    async with _reply_semaphore:
+# ── core pipeline ──────────────────────────────────────────────────────────────
+from typing import AsyncGenerator
+
+async def _run_agent_pipeline(
+    message: str,
+    history: list[dict],
+    session_id: str,
+    tier_override: str | None = None,
+    no_inference: bool = False,
+    tier_capture: list | None = None,
+) -> AsyncGenerator[str, None]:
+    """Core pipeline: pre-flight → routing → inference. Yields text chunks.
+
+    tier_override: "light" | "medium" | "complex" | None (auto-route)
+    no_inference: if True, routing decision is still made but inference is skipped — yields "I don't know" immediately
+    Caller is responsible for scheduling _store_memory after consuming all chunks.
+    """
+    async with (nullcontext() if no_inference else _reply_semaphore):
         t0 = time.monotonic()
-        history = _conversation_buffers.get(session_id, [])
+        clean_message = message
         print(f"[agent] running: {clean_message[:80]!r}", flush=True)
-        # Fetch URL content, memories, and fast-tool context concurrently — all IO-bound
-        url_context, memories, fast_context = await asyncio.gather(
-            _fetch_urls_from_message(clean_message),
-            _retrieve_memories(clean_message, session_id),
-            _fast_tool_runner.run_matching(clean_message),
-        )
+        # Fetch URL content, memories, and fast-tool context concurrently
+        # Skip preflight IO in no_inference mode — only routing decision needed
+        if no_inference:
+            url_context = memories = fast_context = None
+        else:
+            url_context, memories, fast_context = await asyncio.gather(
+                _fetch_urls_from_message(clean_message),
+                _retrieve_memories(clean_message, session_id),
+                _fast_tool_runner.run_matching(clean_message),
+            )
         if url_context:
-            print(f"[agent] crawl4ai: {len(url_context)} chars fetched from message URLs", flush=True)
+            print(f"[agent] crawl4ai: {len(url_context)} chars fetched", flush=True)
         if fast_context:
             names = _fast_tool_runner.matching_names(clean_message)
             print(f"[agent] fast_tools={names}: {len(fast_context)} chars injected", flush=True)
-        # Build enriched history: memories + url_context + fast_context for ALL tiers
+        # Build enriched history
         enriched_history = list(history)
         if url_context:
             enriched_history = [{"role": "system", "content": url_context}] + enriched_history
@@ -410,34 +470,57 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
         if memories:
             enriched_history = [{"role": "system", "content": memories}] + enriched_history
-        tier, light_reply = await router.route(clean_message, enriched_history, force_complex)
-        # Messages with URL content must be handled by at least medium tier
-        if url_context and tier == "light":
-            tier = "medium"
-            light_reply = None
-            print("[agent] URL in message → upgraded light→medium", flush=True)
-        print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True)
-        final_text = None
-        try:
+        final_text = None
+        llm_elapsed = 0.0
+        try:
+            # Short-circuit: fast tool already has the answer
+            if fast_context and tier_override is None and not url_context and not no_inference:
+                tier = "fast"
+                final_text = fast_context
+                llm_elapsed = time.monotonic() - t0
+                names = _fast_tool_runner.matching_names(clean_message)
+                print(f"[agent] tier=fast tools={names} — delivering directly", flush=True)
+                yield final_text
+            else:
+                # Determine tier
+                if tier_override in ("light", "medium", "complex"):
+                    tier = tier_override
+                    light_reply = None
+                    if tier_override == "light":
+                        tier, light_reply = await router.route(clean_message, enriched_history, no_inference=no_inference)
+                        tier = "light"
+                else:
+                    tier, light_reply = await router.route(clean_message, enriched_history, no_inference=no_inference)
+                if url_context and tier == "light":
+                    tier = "medium"
+                    light_reply = None
+                    print("[agent] URL in message → upgraded light→medium", flush=True)
+                print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True)
+                if tier_capture is not None:
+                    tier_capture.append(tier)
+                if no_inference:
+                    yield "I don't know"
+                    return
                 if tier == "light":
                     final_text = light_reply
                     llm_elapsed = time.monotonic() - t0
-                    print(f"[agent] light path: answered by router", flush=True)
-                    await _push_stream_chunk(session_id, final_text)
-                    await _end_stream(session_id)
+                    print("[agent] light path: answered by router", flush=True)
+                    yield final_text
                 elif tier == "medium":
                     system_prompt = MEDIUM_SYSTEM_PROMPT
                     if memories:
-                        system_prompt = system_prompt + "\n\n" + memories
+                        system_prompt += "\n\n" + memories
                     if url_context:
-                        system_prompt = system_prompt + "\n\n" + url_context
+                        system_prompt += "\n\n" + url_context
                     if fast_context:
-                        system_prompt = system_prompt + "\n\nLive web search results (use these to answer):\n\n" + fast_context
+                        system_prompt += "\n\nLive web search results (use these to answer):\n\n" + fast_context
+                    # Stream tokens directly — filter out qwen3 <think> blocks
                     in_think = False
                     response_parts = []
                     async for chunk in medium_model.astream([
@@ -453,44 +536,26 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
                             in_think = False
                             after = token.split("</think>", 1)[1]
                             if after:
-                                await _push_stream_chunk(session_id, after)
+                                yield after
                                 response_parts.append(after)
                         else:
                             if "<think>" in token:
                                 in_think = True
                                 before = token.split("<think>", 1)[0]
                                 if before:
-                                    await _push_stream_chunk(session_id, before)
+                                    yield before
                                     response_parts.append(before)
                             else:
-                                await _push_stream_chunk(session_id, token)
+                                yield token
                                 response_parts.append(token)
-                await _end_stream(session_id)
                 llm_elapsed = time.monotonic() - t0
                 final_text = "".join(response_parts).strip() or None
-            else:  # complex
-                ok = await vram_manager.enter_complex_mode()
-                if not ok:
-                    print("[agent] complex→medium fallback (eviction timeout)", flush=True)
-                    tier = "medium"
-                    system_prompt = MEDIUM_SYSTEM_PROMPT
-                    if memories:
-                        system_prompt = system_prompt + "\n\n" + memories
-                    if url_context:
-                        system_prompt = system_prompt + "\n\n" + url_context
-                    result = await medium_agent.ainvoke({
-                        "messages": [
-                            {"role": "system", "content": system_prompt},
-                            *history,
-                            {"role": "user", "content": clean_message},
-                        ]
-                    })
-                else:
-                    system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id)
+            else:  # complex — remote model, no VRAM management needed
+                system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id)
                 if url_context:
-                    system_prompt = system_prompt + "\n\n[Pre-fetched URL content from user's message:]\n" + url_context
+                    system_prompt += "\n\n[Pre-fetched URL content from user's message:]\n" + url_context
                 result = await complex_agent.ainvoke({
                     "messages": [
                         {"role": "system", "content": system_prompt},
@@ -498,46 +563,90 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
                         {"role": "user", "content": clean_message},
                     ]
                 })
-                asyncio.create_task(vram_manager.exit_complex_mode())
                 llm_elapsed = time.monotonic() - t0
                 _log_messages(result)
                 final_text = _extract_final_text(result)
                 if final_text:
-                    await _push_stream_chunk(session_id, final_text)
-                    await _end_stream(session_id)
+                    yield final_text
     except Exception as e:
         import traceback
         llm_elapsed = time.monotonic() - t0
-        print(f"[agent] error after {llm_elapsed:.1f}s for chat {session_id}: {e}", flush=True)
+        print(f"[agent] error after {llm_elapsed:.1f}s for {session_id}: {e}", flush=True)
         traceback.print_exc()
-        await _end_stream(session_id)
-    # Deliver reply through the originating channel
+    print(f"[agent] pipeline done in {time.monotonic() - t0:.1f}s tier={tier if 'tier' in dir() else '?'}", flush=True)
+    # Store memory as side-effect (non-blocking, best-effort)
     if final_text:
-        t1 = time.monotonic()
+        asyncio.create_task(_store_memory(session_id, clean_message, final_text))
+
+
+# ── core task (Telegram / Matrix / CLI wrapper) ─────────────────────────────────
+async def run_agent_task(
+    message: str,
+    session_id: str,
+    channel: str = "telegram",
+    metadata: dict | None = None,
+):
+    print(f"[agent] queued: {message[:80]!r} chat={session_id}", flush=True)
+    t0 = time.monotonic()
+    meta = metadata or {}
+    no_inference = bool(meta.get("no_inference", False))
+    is_benchmark = bool(meta.get("benchmark", False))
+    history = _conversation_buffers.get(session_id, [])
+    final_text = None
+    actual_tier = "unknown"
+    tier_capture: list = []
+    async for chunk in _run_agent_pipeline(message, history, session_id, no_inference=no_inference, tier_capture=tier_capture):
+        await _push_stream_chunk(session_id, chunk)
+        if final_text is None:
+            final_text = chunk
+        else:
+            final_text += chunk
+    await _end_stream(session_id)
+    actual_tier = tier_capture[0] if tier_capture else "unknown"
+    elapsed_ms = int((time.monotonic() - t0) * 1000)
+    if final_text:
+        final_text = final_text.strip()
+        # Skip channel delivery for benchmark sessions (no Telegram spam)
+        if not is_benchmark:
             try:
                 await channels.deliver(session_id, channel, final_text)
             except Exception as e:
                 print(f"[agent] delivery error (non-fatal): {e}", flush=True)
-        send_elapsed = time.monotonic() - t1
-        print(
-            f"[agent] replied in {time.monotonic() - t0:.1f}s "
-            f"(llm={llm_elapsed:.1f}s, send={send_elapsed:.1f}s) tier={tier}",
-            flush=True,
-        )
-        print(f"[agent] reply_text: {final_text}", flush=True)
-    else:
-        print("[agent] warning: no text reply from agent", flush=True)
-    # Update conversation buffer and schedule memory storage
-    if final_text:
+        print(f"[agent] replied in {elapsed_ms / 1000:.1f}s tier={actual_tier}", flush=True)
+        print(f"[agent] reply_text: {final_text}", flush=True)
+        # Update conversation buffer
         buf = _conversation_buffers.get(session_id, [])
-        buf.append({"role": "user", "content": clean_message})
+        buf.append({"role": "user", "content": message})
         buf.append({"role": "assistant", "content": final_text})
         _conversation_buffers[session_id] = buf[-(MAX_HISTORY_TURNS * 2):]
-        asyncio.create_task(_store_memory(session_id, clean_message, final_text))
+        # Log interaction for RLHF data collection (skip benchmark sessions to avoid noise)
+        if not is_benchmark:
+            asyncio.create_task(_log_interaction(
+                session_id=session_id,
+                channel=channel,
+                tier=actual_tier,
+                input_text=message,
+                response_text=final_text,
+                latency_ms=elapsed_ms,
+                metadata=meta if meta else None,
+            ))
+    else:
+        print("[agent] warning: no text reply from agent", flush=True)
 # ── endpoints ──────────────────────────────────────────────────────────────────
@@ -549,7 +658,7 @@ async def message(request: InboundMessage, background_tasks: BackgroundTasks):
         return JSONResponse(status_code=503, content={"error": "Agent not ready"})
     session_id = request.session_id
     channel = request.channel
-    background_tasks.add_task(run_agent_task, request.text, session_id, channel)
+    background_tasks.add_task(run_agent_task, request.text, session_id, channel, request.metadata)
     return JSONResponse(status_code=202, content={"status": "accepted"})
@@ -611,3 +720,96 @@ async def stream_reply(session_id: str):
 @app.get("/health")
 async def health():
     return {"status": "ok", "agent_ready": medium_agent is not None}
+# ── OpenAI-compatible API (for OpenWebUI and other clients) ────────────────────
+_TIER_MAP = {
+    "adolf": None,
+    "adolf-light": "light",
+    "adolf-medium": "medium",
+    "adolf-deep": "complex",
+}
+
+
+@app.get("/v1/models")
+async def list_models():
+    return {
+        "object": "list",
+        "data": [
+            {"id": "adolf", "object": "model", "owned_by": "adolf"},
+            {"id": "adolf-light", "object": "model", "owned_by": "adolf"},
+            {"id": "adolf-medium", "object": "model", "owned_by": "adolf"},
+            {"id": "adolf-deep", "object": "model", "owned_by": "adolf"},
+        ],
+    }
+
+
+@app.post("/v1/chat/completions")
+async def chat_completions(request: Request):
+    if medium_agent is None:
+        return JSONResponse(status_code=503, content={"error": {"message": "Agent not ready", "type": "server_error"}})
+    body = await request.json()
+    model = body.get("model", "adolf")
+    messages = body.get("messages", [])
+    stream = body.get("stream", True)
+    # Extract current user message and history
+    user_messages = [m for m in messages if m.get("role") == "user"]
+    if not user_messages:
+        return JSONResponse(status_code=400, content={"error": {"message": "No user message", "type": "invalid_request_error"}})
+    current_message = user_messages[-1]["content"]
+    # History = everything before the last user message (excluding system messages from OpenWebUI)
+    last_user_idx = len(messages) - 1 - next(
+        i for i, m in enumerate(reversed(messages)) if m.get("role") == "user"
+    )
+    history = [m for m in messages[:last_user_idx] if m.get("role") in ("user", "assistant")]
+    session_id = request.headers.get("X-Session-Id", "owui-default")
+    tier_override = _TIER_MAP.get(model)
+    import json as _json
+    import uuid as _uuid
+    response_id = f"chatcmpl-{_uuid.uuid4().hex[:12]}"
+    if stream:
+        async def event_stream():
+            # Opening chunk with role
+            opening = {
+                "id": response_id, "object": "chat.completion.chunk",
+                "choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}]
+            }
+            yield f"data: {_json.dumps(opening)}\n\n"
+            async for chunk in _run_agent_pipeline(current_message, history, session_id, tier_override):
+                data = {
+                    "id": response_id, "object": "chat.completion.chunk",
+                    "choices": [{"index": 0, "delta": {"content": chunk}, "finish_reason": None}]
+                }
+                yield f"data: {_json.dumps(data)}\n\n"
+            # Final chunk
+            final = {
+                "id": response_id, "object": "chat.completion.chunk",
+                "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
+            }
+            yield f"data: {_json.dumps(final)}\n\n"
+            yield "data: [DONE]\n\n"
+        return StreamingResponse(event_stream(), media_type="text/event-stream")
+    else:
+        # Non-streaming: collect all chunks
+        parts = []
+        async for chunk in _run_agent_pipeline(current_message, history, session_id, tier_override):
+            if chunk:
+                parts.append(chunk)
+        full_text = "".join(parts).strip()
+        return {
+            "id": response_id, "object": "chat.completion",
+            "choices": [{"index": 0, "message": {"role": "assistant", "content": full_text}, "finish_reason": "stop"}],
+            "model": model,
+        }
benchmarks/benchmark.json (new file, 137 lines)
@@ -0,0 +1,137 @@
{
"description": "Adolf routing benchmark — домашние сценарии, Alexa/Google-Home стиль, русский язык",
"tiers": {
"light": "Приветствия, прощания, подтверждения, простые разговорные фразы. Не требуют поиска или действий.",
"medium": "Управление домом, погода/пробки, таймеры, напоминания, покупки, личная память, быстрые вопросы.",
"complex": "Глубокое исследование, сравнение технологий, подробные руководства с несколькими источниками."
},
"queries": [
{"id": 1, "tier": "light", "category": "greetings", "query": "привет"},
{"id": 2, "tier": "light", "category": "greetings", "query": "пока"},
{"id": 3, "tier": "light", "category": "greetings", "query": "спасибо"},
{"id": 4, "tier": "light", "category": "greetings", "query": "привет, как дела?"},
{"id": 5, "tier": "light", "category": "greetings", "query": "окей"},
{"id": 6, "tier": "light", "category": "greetings", "query": "добрый вечер"},
{"id": 7, "tier": "light", "category": "greetings", "query": "доброе утро"},
{"id": 8, "tier": "light", "category": "greetings", "query": "добрый день"},
{"id": 9, "tier": "light", "category": "greetings", "query": "hi"},
{"id": 10, "tier": "light", "category": "greetings", "query": "thanks"},
{"id": 11, "tier": "light", "category": "greetings", "query": "отлично, спасибо"},
{"id": 12, "tier": "light", "category": "greetings", "query": "понятно"},
{"id": 13, "tier": "light", "category": "greetings", "query": "ясно"},
{"id": 14, "tier": "light", "category": "greetings", "query": "ладно"},
{"id": 15, "tier": "light", "category": "greetings", "query": "договорились"},
{"id": 16, "tier": "light", "category": "greetings", "query": "good morning"},
{"id": 17, "tier": "light", "category": "greetings", "query": "good night"},
{"id": 18, "tier": "light", "category": "greetings", "query": "всё понятно"},
{"id": 19, "tier": "light", "category": "greetings", "query": "да"},
{"id": 20, "tier": "light", "category": "greetings", "query": "нет"},
{"id": 21, "tier": "light", "category": "greetings", "query": "не нужно"},
{"id": 22, "tier": "light", "category": "greetings", "query": "отмена"},
{"id": 23, "tier": "light", "category": "greetings", "query": "стоп"},
{"id": 24, "tier": "light", "category": "greetings", "query": "подожди"},
{"id": 25, "tier": "light", "category": "greetings", "query": "повтори"},
{"id": 26, "tier": "light", "category": "greetings", "query": "ты тут?"},
{"id": 27, "tier": "light", "category": "greetings", "query": "слышишь меня?"},
{"id": 28, "tier": "light", "category": "greetings", "query": "всё ок"},
{"id": 29, "tier": "light", "category": "greetings", "query": "хорошо"},
{"id": 30, "tier": "light", "category": "greetings", "query": "пожалуйста"},
{"id": 31, "tier": "medium", "category": "weather_commute", "query": "какая сегодня погода в Балашихе"},
{"id": 32, "tier": "medium", "category": "weather_commute", "query": "пойдет ли сегодня дождь"},
{"id": 33, "tier": "medium", "category": "weather_commute", "query": "какая температура на улице сейчас"},
{"id": 34, "tier": "medium", "category": "weather_commute", "query": "будет ли снег сегодня"},
{"id": 35, "tier": "medium", "category": "weather_commute", "query": "погода на завтра"},
{"id": 36, "tier": "medium", "category": "weather_commute", "query": "сколько ехать до Москвы сейчас"},
{"id": 37, "tier": "medium", "category": "weather_commute", "query": "какие пробки на дороге до Москвы"},
{"id": 38, "tier": "medium", "category": "weather_commute", "query": "время в пути на работу"},
{"id": 39, "tier": "medium", "category": "weather_commute", "query": "есть ли пробки сейчас"},
{"id": 40, "tier": "medium", "category": "weather_commute", "query": "стоит ли брать зонтик"},
{"id": 41, "tier": "medium", "category": "smart_home_control", "query": "включи свет в гостиной"},
{"id": 42, "tier": "medium", "category": "smart_home_control", "query": "выключи свет на кухне"},
{"id": 43, "tier": "medium", "category": "smart_home_control", "query": "какая температура дома"},
{"id": 44, "tier": "medium", "category": "smart_home_control", "query": "установи температуру 22 градуса"},
{"id": 45, "tier": "medium", "category": "smart_home_control", "query": "включи свет в спальне на 50 процентов"},
{"id": 46, "tier": "medium", "category": "smart_home_control", "query": "выключи все лампочки"},
{"id": 47, "tier": "medium", "category": "smart_home_control", "query": "какие устройства сейчас включены"},
{"id": 48, "tier": "medium", "category": "smart_home_control", "query": "закрыты ли все окна"},
{"id": 49, "tier": "medium", "category": "smart_home_control", "query": "включи вентилятор в детской"},
{"id": 50, "tier": "medium", "category": "smart_home_control", "query": "есть ли кто-нибудь дома"},
{"id": 51, "tier": "medium", "category": "smart_home_control", "query": "включи ночной режим"},
{"id": 52, "tier": "medium", "category": "smart_home_control", "query": "какое потребление электричества сегодня"},
{"id": 53, "tier": "medium", "category": "smart_home_control", "query": "выключи телевизор"},
{"id": 54, "tier": "medium", "category": "smart_home_control", "query": "открой шторы в гостиной"},
{"id": 55, "tier": "medium", "category": "smart_home_control", "query": "установи будильник на 7 утра"},
{"id": 56, "tier": "medium", "category": "smart_home_control", "query": "включи кофемашину"},
{"id": 57, "tier": "medium", "category": "smart_home_control", "query": "выключи свет во всём доме"},
{"id": 58, "tier": "medium", "category": "smart_home_control", "query": "сколько у нас датчиков движения"},
{"id": 59, "tier": "medium", "category": "smart_home_control", "query": "состояние всех дверных замков"},
{"id": 60, "tier": "medium", "category": "smart_home_control", "query": "включи режим кино в гостиной"},
{"id": 61, "tier": "medium", "category": "smart_home_control", "query": "прибавь яркость в детской"},
{"id": 62, "tier": "medium", "category": "smart_home_control", "query": "закрой все шторы"},
{"id": 63, "tier": "medium", "category": "smart_home_control", "query": "кто последний открывал входную дверь"},
{"id": 64, "tier": "medium", "category": "smart_home_control", "query": "заблокируй входную дверь"},
{"id": 65, "tier": "medium", "category": "smart_home_control", "query": "покажи камеру у входа"},
{"id": 66, "tier": "medium", "category": "timers_reminders", "query": "поставь таймер на 10 минут"},
{"id": 67, "tier": "medium", "category": "timers_reminders", "query": "напомни мне позвонить врачу в 15:00"},
{"id": 68, "tier": "medium", "category": "timers_reminders", "query": "поставь будильник на завтра в 6:30"},
{"id": 69, "tier": "medium", "category": "timers_reminders", "query": "напомни выключить плиту через 20 минут"},
{"id": 70, "tier": "medium", "category": "timers_reminders", "query": "сколько времени осталось на таймере"},
{"id": 71, "tier": "medium", "category": "shopping_cooking", "query": "добавь молоко в список покупок"},
{"id": 72, "tier": "medium", "category": "shopping_cooking", "query": "что есть в списке покупок"},
{"id": 73, "tier": "medium", "category": "shopping_cooking", "query": "добавь хлеб и яйца в список покупок"},
{"id": 74, "tier": "medium", "category": "shopping_cooking", "query": "сколько граммов муки нужно для блинов на 4 человека"},
{"id": 75, "tier": "medium", "category": "shopping_cooking", "query": "какой рецепт борща ты знаешь"},
{"id": 76, "tier": "medium", "category": "personal_memory", "query": "как меня зовут"},
{"id": 77, "tier": "medium", "category": "personal_memory", "query": "где я живу"},
{"id": 78, "tier": "medium", "category": "personal_memory", "query": "что мы обсуждали в прошлый раз"},
{"id": 79, "tier": "medium", "category": "personal_memory", "query": "что ты знаешь о моем домашнем сервере"},
{"id": 80, "tier": "medium", "category": "personal_memory", "query": "напомни, какие сервисы я запускаю"},
{"id": 81, "tier": "medium", "category": "personal_memory", "query": "что я говорил о своей сети"},
{"id": 82, "tier": "medium", "category": "personal_memory", "query": "что я просил тебя запомнить"},
{"id": 83, "tier": "medium", "category": "quick_info", "query": "какой сейчас курс биткоина"},
{"id": 84, "tier": "medium", "category": "quick_info", "query": "курс доллара к рублю сейчас"},
{"id": 85, "tier": "medium", "category": "quick_info", "query": "есть ли проблемы у Cloudflare сегодня"},
{"id": 86, "tier": "medium", "category": "quick_info", "query": "какая последняя версия Docker"},
{"id": 87, "tier": "medium", "category": "quick_info", "query": "какие новые функции в Home Assistant 2024"},
{"id": 88, "tier": "medium", "category": "quick_info", "query": "как проверить использование диска в Linux"},
{"id": 89, "tier": "medium", "category": "quick_info", "query": "как перезапустить Docker контейнер"},
{"id": 90, "tier": "medium", "category": "quick_info", "query": "как посмотреть логи Docker контейнера"},
{"id": 91, "tier": "complex", "category": "infrastructure", "query": "исследуй и сравни Proxmox, Unraid и TrueNAS для домашней лаборатории"},
{"id": 92, "tier": "complex", "category": "infrastructure", "query": "напиши подробное руководство по безопасности домашнего сервера, подключенного к интернету"},
{"id": 93, "tier": "complex", "category": "infrastructure", "query": "исследуй все доступные дашборды для самохостинга и сравни их функции"},
{"id": 94, "tier": "complex", "category": "infrastructure", "query": "исследуй лучший стек мониторинга для самохостинга в 2024 году со всеми вариантами"},
{"id": 95, "tier": "complex", "category": "infrastructure", "query": "сравни все системы резервного копирования для Linux: Restic, Borg, Duplicati, Timeshift"},
{"id": 96, "tier": "complex", "category": "infrastructure", "query": "напиши полное руководство по настройке обратного прокси Caddy для домашнего сервера с SSL"},
{"id": 97, "tier": "complex", "category": "network", "query": "исследуй и сравни WireGuard, OpenVPN и Tailscale для домашней VPN с детальными плюсами и минусами"},
{"id": 98, "tier": "complex", "category": "network", "query": "исследуй лучшие практики сегментации домашней сети с VLAN и правилами файрвола"},
{"id": 99, "tier": "complex", "category": "network", "query": "изучи все самохостируемые DNS решения и их возможности"},
{"id": 100, "tier": "complex", "category": "network", "query": "исследуй лучшие самохостируемые системы мониторинга сети: Zabbix, Grafana, Prometheus, Netdata"},
{"id": 101, "tier": "complex", "category": "home_assistant", "query": "исследуй и сравни все платформы умного дома: Home Assistant, OpenHAB и Domoticz"},
{"id": 102, "tier": "complex", "category": "home_assistant", "query": "изучи лучшие Zigbee координаторы и их совместимость с Home Assistant в 2024 году"},
{"id": 103, "tier": "complex", "category": "home_assistant", "query": "напиши детальный отчет о поддержке протокола Matter и совместимых устройствах"},
{"id": 104, "tier": "complex", "category": "home_assistant", "query": "исследуй все способы интеграции умных ламп с Home Assistant: Zigbee, WiFi, Bluetooth"},
{"id": 105, "tier": "complex", "category": "home_assistant", "query": "найди и сравни все варианты датчиков движения для умного дома с оценками и ценами"},
{"id": 106, "tier": "complex", "category": "home_assistant", "query": "напиши подробное руководство по настройке автоматизаций в Home Assistant для умного освещения"},
{"id": 107, "tier": "complex", "category": "home_assistant", "query": "исследуй все варианты голосового управления умным домом на русском языке, включая локальные решения"},
{"id": 108, "tier": "complex", "category": "home_assistant", "query": "исследуй все протоколы умного дома и их плюсы и минусы: Zigbee, Z-Wave, WiFi, Thread, Bluetooth"},
{"id": 109, "tier": "complex", "category": "media_files", "query": "исследуй и сравни все самохостируемые решения для хранения фотографий с детальным сравнением функций"},
{"id": 110, "tier": "complex", "category": "media_files", "query": "изучи лучшие самохостируемые медиасерверы: Jellyfin, Plex и Emby — с характеристиками и отзывами"},
{"id": 111, "tier": "complex", "category": "media_files", "query": "сравни все самохостируемые облачные хранилища: Nextcloud, Seafile, Owncloud — производительность и функции"},
{"id": 112, "tier": "complex", "category": "research", "query": "исследуй последние достижения в локальном LLM инференсе и оборудовании для него"},
{"id": 113, "tier": "complex", "category": "research", "query": "изучи лучшие опенсорс альтернативы Google сервисов для приватного домашнего окружения"},
{"id": 114, "tier": "complex", "category": "research", "query": "изучи все варианты локального запуска языковых моделей на видеокарте 8 ГБ VRAM"},
{"id": 115, "tier": "complex", "category": "research", "query": "найди и сравни все фреймворки для создания локальных AI ассистентов с открытым исходным кодом"},
{"id": 116, "tier": "complex", "category": "research", "query": "изучи все доступные локальные ассистенты с голосовым управлением на русском языке"},
{"id": 117, "tier": "complex", "category": "infrastructure", "query": "изучи свежие CVE и уязвимости в популярном самохостируемом ПО: Gitea, Nextcloud, Jellyfin"},
{"id": 118, "tier": "complex", "category": "infrastructure", "query": "напиши детальное сравнение систем управления конфигурацией: Ansible, Salt, Puppet для домашнего окружения"},
{"id": 119, "tier": "complex", "category": "network", "query": "исследуй все самохостируемые решения для блокировки рекламы: Pi-hole, AdGuard Home, NextDNS"},
{"id": 120, "tier": "complex", "category": "research", "query": "напиши подробный отчет о технологиях синтеза речи с открытым исходным кодом на русском языке"}
]
}
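A quick sanity check for the dataset's tier distribution (the 30 light / 60 medium / 30 complex split comes from the commit message). This is an illustrative sketch; the inline sample below mirrors the shape of the `queries` array but is not taken from the file:

```python
from collections import Counter

def tier_counts(queries):
    """Count benchmark queries per tier, verifying that ids are unique."""
    ids = [q["id"] for q in queries]
    assert len(ids) == len(set(ids)), "duplicate query ids"
    return Counter(q["tier"] for q in queries)

# Illustrative sample in the same shape as benchmark.json's "queries" array.
sample = [
    {"id": 1, "tier": "light", "category": "greetings", "query": "hi"},
    {"id": 2, "tier": "medium", "category": "timers_reminders", "query": "set a timer"},
    {"id": 3, "tier": "medium", "category": "smart_home_control", "query": "lights on"},
]
print(tier_counts(sample))  # prints: Counter({'medium': 2, 'light': 1})
```

With the real file, `tier_counts(json.load(open("benchmarks/benchmark.json"))["queries"])` should report 30 light, 60 medium, and 30 complex.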
benchmarks/run_benchmark.py (new file, 316 lines)
@@ -0,0 +1,316 @@
#!/usr/bin/env python3
"""
Adolf routing benchmark.

Sends each query to Adolf's /message endpoint, waits for the query to complete
via the SSE stream, then reads the routing decision (tier) from docker logs.

Usage:
    python3 run_benchmark.py [options]
    python3 run_benchmark.py --tier light|medium|complex
    python3 run_benchmark.py --category <name>
    python3 run_benchmark.py --ids 1,2,3
    python3 run_benchmark.py --list-categories
    python3 run_benchmark.py --no-inference   # skip all LLM inference — routing decisions only, all tiers

IMPORTANT: Always check the GPU is free before running. This script does it automatically.
Adolf must be running at http://localhost:8000.
"""
import argparse
import asyncio
import json
import re
import subprocess
import sys
import time
from pathlib import Path
import httpx
ADOLF_URL = "http://localhost:8000"
OLLAMA_URL = "http://localhost:11436" # GPU Ollama
DATASET = Path(__file__).parent / "benchmark.json"
RESULTS = Path(__file__).parent / "results_latest.json"
# Max time to wait for each query to fully complete via SSE stream
QUERY_TIMEOUT = 300 # seconds — generous to handle GPU semaphore waits
# Memory thresholds
MIN_FREE_RAM_MB = 1500 # abort if less than this is free
MIN_FREE_VRAM_MB = 500 # warn if less than this is free on GPU
# ── Pre-flight checks ──────────────────────────────────────────────────────────
def check_ram() -> tuple[bool, str]:
    """Check available system RAM. Returns (ok, message)."""
    try:
        with open("/proc/meminfo") as f:
            info = {}
            for line in f:
                parts = line.split()
                if len(parts) >= 2:
                    info[parts[0].rstrip(":")] = int(parts[1])
        free_mb = info.get("MemAvailable", 0) // 1024
        total_mb = info.get("MemTotal", 0) // 1024
        msg = f"RAM: {free_mb} MB free / {total_mb} MB total"
        if free_mb < MIN_FREE_RAM_MB:
            return False, f"CRITICAL: {msg} — need at least {MIN_FREE_RAM_MB} MB free"
        return True, msg
    except Exception as e:
        return True, f"RAM check failed (non-fatal): {e}"
def check_gpu() -> tuple[bool, str]:
    """Check GPU VRAM via Ollama /api/ps. Returns (ok, message)."""
    try:
        r = httpx.get(f"{OLLAMA_URL}/api/ps", timeout=5)
        r.raise_for_status()
        data = r.json()
        models = data.get("models", [])
        if models:
            names = [m.get("name", "?") for m in models]
            sizes_mb = [m.get("size_vram", 0) // (1024 * 1024) for m in models]
            loaded = ", ".join(f"{n} ({s}MB)" for n, s in zip(names, sizes_mb))
            total_vram = sum(sizes_mb)
            if total_vram > 7000:
                return False, f"GPU BUSY: models loaded = {loaded} — total VRAM used {total_vram}MB. Wait for models to unload."
            return True, f"GPU: models loaded = {loaded} (total {total_vram}MB VRAM)"
        return True, "GPU: idle (no models loaded)"
    except httpx.ConnectError:
        return True, "GPU check skipped (Ollama not reachable at localhost:11436)"
    except Exception as e:
        return True, f"GPU check failed (non-fatal): {e}"
def preflight_checks(skip_gpu_check: bool = False) -> bool:
    """Run all pre-flight checks. Returns True if safe to proceed."""
    print("\n── Pre-flight checks ──────────────────────────────────────────")
    ram_ok, ram_msg = check_ram()
    print(f"  {'✓' if ram_ok else '✗'} {ram_msg}")
    if not ram_ok:
        print("\nABORTING: not enough RAM. Free up memory before running benchmark.")
        return False
    if not skip_gpu_check:
        gpu_ok, gpu_msg = check_gpu()
        print(f"  {'✓' if gpu_ok else '✗'} {gpu_msg}")
        if not gpu_ok:
            print("\nABORTING: GPU is busy. Wait for current inference to finish, then retry.")
            return False
    print("  ✓ All checks passed.\n")
    return True
# ── Log helpers ────────────────────────────────────────────────────────────────
def get_log_tail(n: int = 50) -> str:
    result = subprocess.run(
        ["docker", "logs", "deepagents", "--tail", str(n)],
        capture_output=True, text=True,
    )
    return result.stdout + result.stderr


def extract_tier_from_logs(logs_before: str, logs_after: str) -> str | None:
    """Find new tier= lines that appeared after we sent the query."""
    before_lines = set(logs_before.splitlines())
    new_lines = [l for l in logs_after.splitlines() if l not in before_lines]
    for line in new_lines:
        m = re.search(r"tier=(\w+(?:\s*\(no-inference\))?)", line)
        if m:
            tier_raw = m.group(1)
            # Normalise: "complex (no-inference)" → "complex"
            return tier_raw.split()[0]
    return None
# ── Request helpers ────────────────────────────────────────────────────────────
async def post_message(
    client: httpx.AsyncClient,
    query_id: int,
    query: str,
    no_inference: bool = False,
) -> bool:
    payload = {
        "text": query,
        "session_id": f"benchmark-{query_id}",
        "channel": "cli",
        "user_id": "benchmark",
        "metadata": {"no_inference": no_inference, "benchmark": True},
    }
    try:
        r = await client.post(f"{ADOLF_URL}/message", json=payload, timeout=10)
        r.raise_for_status()
        return True
    except Exception as e:
        print(f" POST_ERROR: {e}", end="")
        return False
# ── Dataset ────────────────────────────────────────────────────────────────────
def load_dataset() -> list[dict]:
    with open(DATASET) as f:
        return json.load(f)["queries"]


def filter_queries(queries, tier, category, ids):
    if tier:
        queries = [q for q in queries if q["tier"] == tier]
    if category:
        queries = [q for q in queries if q["category"] == category]
    if ids:
        queries = [q for q in queries if q["id"] in ids]
    return queries
# ── Main run ───────────────────────────────────────────────────────────────────
async def run(queries: list[dict], no_inference: bool = False) -> list[dict]:
    results = []
    async with httpx.AsyncClient() as client:
        try:
            r = await client.get(f"{ADOLF_URL}/health", timeout=5)
            r.raise_for_status()
        except Exception as e:
            print(f"ERROR: Adolf not reachable: {e}", file=sys.stderr)
            sys.exit(1)
        total = len(queries)
        correct = 0
        dry_label = " [NO-INFERENCE: routing only]" if no_inference else ""
        print(f"\nRunning {total} queries{dry_label}\n")
        print(f"{'ID':>3} {'EXPECTED':8} {'ACTUAL':8} {'OK':3} {'TIME':6} {'CATEGORY':22} QUERY")
        print("─" * 110)
        for q in queries:
            qid = q["id"]
            expected = q["tier"]
            category = q["category"]
            query_text = q["query"]
            session_id = f"benchmark-{qid}"
            print(f"{qid:>3} {expected:8} ", end="", flush=True)
            logs_before = get_log_tail(300)
            t0 = time.monotonic()
            ok_post = await post_message(client, qid, query_text, no_inference=no_inference)
            if not ok_post:
                print(f"{'?':8} {'ERR':3} {'?':6} {category:22} {query_text[:40]}")
                results.append({"id": qid, "expected": expected, "actual": None, "ok": False})
                continue
            # Wait for query to complete via SSE stream (handles GPU semaphore waits)
            try:
                async with client.stream(
                    "GET", f"{ADOLF_URL}/stream/{session_id}", timeout=QUERY_TIMEOUT
                ) as sse:
                    async for line in sse.aiter_lines():
                        if "data: [DONE]" in line:
                            break
            except Exception:
                pass  # timeout or connection issue — check logs anyway
            # Now the query is done — check logs for tier
            await asyncio.sleep(0.3)
            logs_after = get_log_tail(300)
            actual = extract_tier_from_logs(logs_before, logs_after)
            elapsed = time.monotonic() - t0
            match = actual == expected or (actual == "fast" and expected == "medium")
            if match:
                correct += 1
            mark = "✓" if match else "✗"
            actual_str = actual or "?"
            print(f"{actual_str:8} {mark:3} {elapsed:5.1f}s {category:22} {query_text[:40]}")
            results.append({
                "id": qid,
                "expected": expected,
                "actual": actual_str,
                "ok": match,
                "elapsed": round(elapsed, 1),
                "category": category,
                "query": query_text,
                "no_inference": no_inference,
            })
    print("─" * 110)
    accuracy = correct / total * 100 if total else 0
    print(f"\nAccuracy: {correct}/{total} ({accuracy:.0f}%)")
    for tier_name in ["light", "medium", "complex"]:
        tier_qs = [r for r in results if r["expected"] == tier_name]
        if tier_qs:
            tier_ok = sum(1 for r in tier_qs if r["ok"])
            print(f"  {tier_name:8}: {tier_ok}/{len(tier_qs)}")
    wrong = [r for r in results if not r["ok"]]
    if wrong:
        print(f"\nMisclassified ({len(wrong)}):")
        for r in wrong:
            # actual can be None for POST errors — guard before formatting
            print(f"  id={r['id']:3} expected={r['expected']:8} actual={r['actual'] or '?':8} {r['query'][:60]}")
    with open(RESULTS, "w") as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    print(f"\nResults saved to {RESULTS}")
    return results
def main():
parser = argparse.ArgumentParser(
description="Adolf routing benchmark",
epilog="IMPORTANT: Always check GPU is free before running. This is done automatically."
)
parser.add_argument("--tier", choices=["light", "medium", "complex"])
parser.add_argument("--category")
parser.add_argument("--ids", help="Comma-separated IDs")
parser.add_argument("--list-categories", action="store_true")
parser.add_argument(
"--no-inference",
action="store_true",
help="Skip LLM inference for all tiers — only routing decisions are tested (no GPU/API cost)",
)
parser.add_argument(
"--skip-gpu-check",
action="store_true",
help="Skip GPU availability check (use only if you know GPU is free)",
)
args = parser.parse_args()
queries = load_dataset()
if args.list_categories:
cats = sorted(set(q["category"] for q in queries))
tiers = {t: sum(1 for q in queries if q["tier"] == t) for t in ["light", "medium", "complex"]}
print(f"Total: {len(queries)} | Tiers: {tiers}")
print(f"Categories: {cats}")
return
# ALWAYS check GPU and RAM before running
if not preflight_checks(skip_gpu_check=args.skip_gpu_check or args.no_inference):
sys.exit(1)
ids = [int(i) for i in args.ids.split(",")] if args.ids else None
queries = filter_queries(queries, args.tier, args.category, ids)
if not queries:
print("No queries match filters.")
sys.exit(1)
asyncio.run(run(queries, no_inference=args.no_inference))
if __name__ == "__main__":
main()


@@ -0,0 +1,218 @@
#!/usr/bin/env python3
"""
Adolf routing benchmark — tests routing decisions only, no LLM inference.
Sends each query with no_inference=True, waits for the routing decision to
appear in docker logs, and records whether the correct tier was selected.
Usage:
python3 run_routing_benchmark.py [options]
python3 run_routing_benchmark.py --tier light|medium|complex
python3 run_routing_benchmark.py --category <name>
python3 run_routing_benchmark.py --ids 1,2,3
python3 run_routing_benchmark.py --list-categories
No GPU check needed — inference is disabled for all queries.
Adolf must be running at http://localhost:8000.
"""
import argparse
import asyncio
import json
import re
import subprocess
import sys
import time
from pathlib import Path
import httpx
ADOLF_URL = "http://localhost:8000"
DATASET = Path(__file__).parent / "benchmark.json"
RESULTS = Path(__file__).parent / "routing_results_latest.json"
QUERY_TIMEOUT = 1 # 1s strict deadline — routing must decide within 1 second
# ── Log helpers ────────────────────────────────────────────────────────────────
def get_log_tail(n: int = 50) -> str:
result = subprocess.run(
["docker", "logs", "deepagents", "--tail", str(n)],
capture_output=True, text=True,
)
return result.stdout + result.stderr
def extract_tier_from_logs(logs_before: str, logs_after: str) -> str | None:
"""Find new tier= lines that appeared after we sent the query."""
before_lines = set(logs_before.splitlines())
new_lines = [line for line in logs_after.splitlines() if line not in before_lines]
for line in new_lines:
m = re.search(r"tier=(\w+(?:\s*\(no-inference\))?)", line)
if m:
tier_raw = m.group(1)
return tier_raw.split()[0]
return None
# ── Request helpers ────────────────────────────────────────────────────────────
async def post_message(client: httpx.AsyncClient, query_id: int, query: str) -> bool:
payload = {
"text": query,
"session_id": f"routing-bench-{query_id}",
"channel": "cli",
"user_id": "benchmark",
"metadata": {"no_inference": True, "benchmark": True},
}
try:
r = await client.post(f"{ADOLF_URL}/message", json=payload, timeout=10)
r.raise_for_status()
return True
except Exception as e:
print(f" POST_ERROR: {e}", end="")
return False
# ── Dataset ────────────────────────────────────────────────────────────────────
def load_dataset() -> list[dict]:
with open(DATASET) as f:
return json.load(f)["queries"]
def filter_queries(queries, tier, category, ids):
if tier:
queries = [q for q in queries if q["tier"] == tier]
if category:
queries = [q for q in queries if q["category"] == category]
if ids:
queries = [q for q in queries if q["id"] in ids]
return queries
# ── Main run ───────────────────────────────────────────────────────────────────
async def run(queries: list[dict]) -> list[dict]:
results = []
async with httpx.AsyncClient() as client:
try:
r = await client.get(f"{ADOLF_URL}/health", timeout=5)
r.raise_for_status()
except Exception as e:
print(f"ERROR: Adolf not reachable: {e}", file=sys.stderr)
sys.exit(1)
total = len(queries)
correct = 0
print(f"\nRunning {total} queries [NO-INFERENCE: routing only]\n")
print(f"{'ID':>3} {'EXPECTED':8} {'ACTUAL':8} {'OK':3} {'TIME':6} {'CATEGORY':22} QUERY")
print("" * 110)
for q in queries:
qid = q["id"]
expected = q["tier"]
category = q["category"]
query_text = q["query"]
session_id = f"routing-bench-{qid}"
print(f"{qid:>3} {expected:8} ", end="", flush=True)
logs_before = get_log_tail(300)
t0 = time.monotonic()
ok_post = await post_message(client, qid, query_text)
if not ok_post:
print(f"{'?':8} {'ERR':3} {'?':6} {category:22} {query_text[:40]}")
results.append({"id": qid, "expected": expected, "actual": None, "ok": False})
continue
try:
async with client.stream(
"GET", f"{ADOLF_URL}/stream/{session_id}", timeout=QUERY_TIMEOUT
) as sse:
async for line in sse.aiter_lines():
if "data: [DONE]" in line:
break
except Exception:
pass # timeout or connection issue — check logs anyway
logs_after = get_log_tail(300)
actual = extract_tier_from_logs(logs_before, logs_after)
if actual is None:
actual = "timeout"
elapsed = time.monotonic() - t0
match = actual == expected or (actual == "fast" and expected == "medium")
if match:
correct += 1
mark = "" if match else ""
actual_str = actual
print(f"{actual_str:8} {mark:3} {elapsed:5.1f}s {category:22} {query_text[:40]}")
results.append({
"id": qid,
"expected": expected,
"actual": actual_str,
"ok": match,
"elapsed": round(elapsed, 1),
"category": category,
"query": query_text,
})
print("" * 110)
accuracy = correct / total * 100 if total else 0
print(f"\nAccuracy: {correct}/{total} ({accuracy:.0f}%)")
for tier_name in ["light", "medium", "complex"]:
tier_qs = [r for r in results if r["expected"] == tier_name]
if tier_qs:
tier_ok = sum(1 for r in tier_qs if r["ok"])
print(f" {tier_name:8}: {tier_ok}/{len(tier_qs)}")
wrong = [r for r in results if not r["ok"]]
if wrong:
print(f"\nMisclassified ({len(wrong)}):")
for r in wrong:
print(f" id={r['id']:3} expected={r['expected']:8} actual={r['actual']:8} {r['query'][:60]}")
with open(RESULTS, "w") as f:
json.dump(results, f, indent=2, ensure_ascii=False)
print(f"\nResults saved to {RESULTS}")
return results
def main():
parser = argparse.ArgumentParser(
description="Adolf routing benchmark — routing decisions only, no LLM inference",
)
parser.add_argument("--tier", choices=["light", "medium", "complex"])
parser.add_argument("--category")
parser.add_argument("--ids", help="Comma-separated IDs")
parser.add_argument("--list-categories", action="store_true")
args = parser.parse_args()
queries = load_dataset()
if args.list_categories:
cats = sorted(set(q["category"] for q in queries))
tiers = {t: sum(1 for q in queries if q["tier"] == t) for t in ["light", "medium", "complex"]}
print(f"Total: {len(queries)} | Tiers: {tiers}")
print(f"Categories: {cats}")
return
ids = [int(i) for i in args.ids.split(",")] if args.ids else None
queries = filter_queries(queries, args.tier, args.category, ids)
if not queries:
print("No queries match filters.")
sys.exit(1)
asyncio.run(run(queries))
if __name__ == "__main__":
main()
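The accuracy figure printed above uses a deliberate allowance: routing a "medium" query to the "fast" path counts as correct, since the fast path is a cheaper superset of medium handling. The scoring rule in isolation, with hypothetical results:

```python
# Sample results — shapes match the benchmark's result dicts, values are made up.
results = [
    {"expected": "light", "actual": "light"},
    {"expected": "medium", "actual": "fast"},     # allowed downgrade → counts as ok
    {"expected": "complex", "actual": "medium"},  # genuine miss
]

def is_ok(r: dict) -> bool:
    return r["actual"] == r["expected"] or (
        r["actual"] == "fast" and r["expected"] == "medium"
    )

correct = sum(1 for r in results if is_ok(r))
print(f"{correct}/{len(results)}")  # → 2/3
```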


@@ -0,0 +1,425 @@
#!/usr/bin/env python3
"""
Adolf voice benchmark.
Pipeline for each query:
1. Synthesize query text → WAV via Silero TTS (localhost:8881)
2. Transcribe WAV → text via faster-whisper STT (localhost:8880)
3. Send transcription to Adolf → check routing tier
4. Report: WER per query, routing accuracy vs text baseline
Usage:
python3 run_voice_benchmark.py [options]
python3 run_voice_benchmark.py --tier light|medium|complex
python3 run_voice_benchmark.py --ids 1,2,3
python3 run_voice_benchmark.py --no-inference # skip LLM inference — routing only, all tiers
IMPORTANT: Always check GPU is free before running. Done automatically.
Services required:
- Adolf: http://localhost:8000
- Silero TTS: http://localhost:8881 (openai/silero-tts container)
- faster-whisper: http://localhost:8880 (faster-whisper container)
"""
import argparse
import asyncio
import io
import json
import re
import subprocess
import sys
import tempfile
import time
import unicodedata
from pathlib import Path
import httpx
ADOLF_URL = "http://localhost:8000"
OLLAMA_URL = "http://localhost:11436"
TTS_URL = "http://localhost:8881" # Silero TTS — OpenAI-compatible /v1/audio/speech
STT_URL = "http://localhost:8880" # faster-whisper — OpenAI-compatible /v1/audio/transcriptions
DATASET = Path(__file__).parent / "benchmark.json"
RESULTS_DIR = Path(__file__).parent
TIER_WAIT = 15 # seconds to wait for tier= in docker logs
MIN_FREE_RAM_MB = 1500
MIN_FREE_VRAM_MB = 500
# ── Pre-flight ─────────────────────────────────────────────────────────────────
def check_ram() -> tuple[bool, str]:
try:
with open("/proc/meminfo") as f:
info = {}
for line in f:
parts = line.split()
if len(parts) >= 2:
info[parts[0].rstrip(":")] = int(parts[1])
free_mb = info.get("MemAvailable", 0) // 1024
total_mb = info.get("MemTotal", 0) // 1024
msg = f"RAM: {free_mb} MB free / {total_mb} MB total"
if free_mb < MIN_FREE_RAM_MB:
return False, f"CRITICAL: {msg} — need at least {MIN_FREE_RAM_MB} MB free"
return True, msg
except Exception as e:
return True, f"RAM check failed (non-fatal): {e}"
def check_gpu() -> tuple[bool, str]:
try:
r = httpx.get(f"{OLLAMA_URL}/api/ps", timeout=5)
r.raise_for_status()
data = r.json()
models = data.get("models", [])
if models:
names = [m.get("name", "?") for m in models]
sizes_mb = [m.get("size_vram", 0) // (1024 * 1024) for m in models]
loaded = ", ".join(f"{n} ({s}MB)" for n, s in zip(names, sizes_mb))
total_vram = sum(sizes_mb)
if total_vram > 7000:
return False, f"GPU BUSY: {loaded}{total_vram}MB VRAM used. Wait for models to unload."
return True, f"GPU: {loaded} ({total_vram}MB VRAM)"
return True, "GPU: idle"
except httpx.ConnectError:
return True, "GPU check skipped (Ollama not reachable)"
except Exception as e:
return True, f"GPU check failed (non-fatal): {e}"
def check_services() -> tuple[bool, str]:
"""Check TTS and STT are reachable."""
msgs = []
ok = True
for name, url, path in [("TTS", TTS_URL, "/"), ("STT", STT_URL, "/")]:
try:
r = httpx.get(url + path, timeout=5)
msgs.append(f"{name}: reachable (HTTP {r.status_code})")
except Exception as e:
msgs.append(f"{name}: NOT REACHABLE — {e}")
ok = False
return ok, " | ".join(msgs)
def preflight_checks(skip_gpu_check: bool = False) -> bool:
print("\n── Pre-flight checks ──────────────────────────────────────────")
ram_ok, ram_msg = check_ram()
print(f" {'✓' if ram_ok else '✗'} {ram_msg}")
if not ram_ok:
print("\nABORTING: not enough RAM.")
return False
if not skip_gpu_check:
gpu_ok, gpu_msg = check_gpu()
print(f" {'✓' if gpu_ok else '✗'} {gpu_msg}")
if not gpu_ok:
print("\nABORTING: GPU is busy.")
return False
svc_ok, svc_msg = check_services()
print(f" {'✓' if svc_ok else '✗'} {svc_msg}")
if not svc_ok:
print("\nABORTING: required voice services not running.")
print("Start them with: cd /home/alvis/agap_git/openai && docker compose up -d faster-whisper silero-tts")
return False
print(" All checks passed.\n")
return True
# ── TTS ────────────────────────────────────────────────────────────────────────
async def synthesize(client: httpx.AsyncClient, text: str) -> bytes | None:
"""Synthesize text to WAV via Silero TTS (OpenAI-compatible /v1/audio/speech)."""
try:
r = await client.post(
f"{TTS_URL}/v1/audio/speech",
json={"model": "tts-1", "input": text, "voice": "alloy", "response_format": "wav"},
timeout=30,
)
r.raise_for_status()
return r.content
except Exception as e:
print(f"\n [TTS error: {e}]", end="")
return None
# ── STT ────────────────────────────────────────────────────────────────────────
async def transcribe(client: httpx.AsyncClient, wav_bytes: bytes) -> str | None:
"""Transcribe WAV to text via faster-whisper (OpenAI-compatible /v1/audio/transcriptions)."""
try:
files = {"file": ("audio.wav", wav_bytes, "audio/wav")}
data = {"model": "whisper-1", "language": "ru", "response_format": "json"}
r = await client.post(
f"{STT_URL}/v1/audio/transcriptions",
files=files,
data=data,
timeout=60,
)
r.raise_for_status()
result = r.json()
return result.get("text", "").strip()
except Exception as e:
print(f"\n [STT error: {e}]", end="")
return None
# ── WER ────────────────────────────────────────────────────────────────────────
def normalize(text: str) -> str:
"""Lowercase, strip punctuation, normalize unicode for WER calculation."""
text = unicodedata.normalize("NFC", text.lower())
text = re.sub(r"[^\w\s]", " ", text)
return re.sub(r"\s+", " ", text).strip()
def word_error_rate(reference: str, hypothesis: str) -> float:
"""Compute WER between reference and hypothesis."""
ref = normalize(reference).split()
hyp = normalize(hypothesis).split()
if not ref:
return 0.0 if not hyp else 1.0
# Dynamic programming edit distance
d = [[0] * (len(hyp) + 1) for _ in range(len(ref) + 1)]
for i in range(len(ref) + 1):
d[i][0] = i
for j in range(len(hyp) + 1):
d[0][j] = j
for i in range(1, len(ref) + 1):
for j in range(1, len(hyp) + 1):
if ref[i - 1] == hyp[j - 1]:
d[i][j] = d[i - 1][j - 1]
else:
d[i][j] = 1 + min(d[i - 1][j], d[i][j - 1], d[i - 1][j - 1])
return d[len(ref)][len(hyp)] / len(ref)
# ── Adolf interaction ──────────────────────────────────────────────────────────
def get_log_tail(n: int = 60) -> str:
result = subprocess.run(
["docker", "logs", "deepagents", "--tail", str(n)],
capture_output=True, text=True,
)
return result.stdout + result.stderr
def extract_tier_from_logs(logs_before: str, logs_after: str) -> str | None:
before_lines = set(logs_before.splitlines())
new_lines = [line for line in logs_after.splitlines() if line not in before_lines]
for line in new_lines:
m = re.search(r"tier=(\w+(?:\s*\(no-inference\))?)", line)
if m:
return m.group(1).split()[0]
return None
async def post_to_adolf(
client: httpx.AsyncClient,
query_id: int,
text: str,
no_inference: bool = False,
) -> bool:
payload = {
"text": text,
"session_id": f"voice-bench-{query_id}",
"channel": "cli",
"user_id": "benchmark",
"metadata": {"no_inference": no_inference, "benchmark": True, "voice": True},
}
try:
r = await client.post(f"{ADOLF_URL}/message", json=payload, timeout=10)
r.raise_for_status()
return True
except Exception as e:
print(f"\n [Adolf error: {e}]", end="")
return False
# ── Dataset ────────────────────────────────────────────────────────────────────
def load_dataset() -> list[dict]:
with open(DATASET) as f:
return json.load(f)["queries"]
def filter_queries(queries, tier, category, ids):
if tier:
queries = [q for q in queries if q["tier"] == tier]
if category:
queries = [q for q in queries if q["category"] == category]
if ids:
queries = [q for q in queries if q["id"] in ids]
return queries
# ── Main run ───────────────────────────────────────────────────────────────────
async def run(queries: list[dict], no_inference: bool = False, save_audio: bool = False) -> None:
async with httpx.AsyncClient() as client:
# Check Adolf
try:
r = await client.get(f"{ADOLF_URL}/health", timeout=5)
r.raise_for_status()
except Exception as e:
print(f"ERROR: Adolf not reachable: {e}", file=sys.stderr)
sys.exit(1)
total = len(queries)
results = []
dry_label = " [NO-INFERENCE: routing only]" if no_inference else ""
print(f"Voice benchmark: {total} queries{dry_label}\n")
print(f"{'ID':>3} {'EXP':8} {'ACT':8} {'OK':3} {'WER':5} {'TRANSCRIPT'}")
print("" * 100)
total_wer = 0.0
wer_count = 0
correct = 0
for q in queries:
qid = q["id"]
expected = q["tier"]
original = q["query"]
print(f"{qid:>3} {expected:8} ", end="", flush=True)
# Step 1: TTS
wav = await synthesize(client, original)
if wav is None:
print(f"{'?':8} {'ERR':3} {'?':5} [TTS failed]")
results.append({"id": qid, "expected": expected, "actual": None, "ok": False, "wer": None, "error": "tts"})
continue
if save_audio:
audio_path = RESULTS_DIR / "voice_audio" / f"{qid}.wav"
audio_path.parent.mkdir(exist_ok=True)
audio_path.write_bytes(wav)
# Step 2: STT
transcript = await transcribe(client, wav)
if transcript is None:
print(f"{'?':8} {'ERR':3} {'?':5} [STT failed]")
results.append({"id": qid, "expected": expected, "actual": None, "ok": False, "wer": None, "error": "stt"})
continue
# Calculate WER
wer = word_error_rate(original, transcript)
total_wer += wer
wer_count += 1
# Step 3: Send to Adolf
logs_before = get_log_tail(60)
t0 = time.monotonic()
ok_post = await post_to_adolf(client, qid, transcript, no_inference=no_inference)
if not ok_post:
print(f"{'?':8} {'ERR':3} {wer:4.2f} {transcript[:50]}")
results.append({"id": qid, "expected": expected, "actual": None, "ok": False, "wer": wer, "transcript": transcript})
continue
# Step 4: Wait for routing decision
actual = None
for _ in range(TIER_WAIT * 2):
await asyncio.sleep(0.5)
logs_after = get_log_tail(60)
actual = extract_tier_from_logs(logs_before, logs_after)
if actual and actual in ("light", "medium", "complex", "fast"):
break
elapsed = time.monotonic() - t0
match = actual == expected or (actual == "fast" and expected == "medium")
if match:
correct += 1
mark = "" if match else ""
actual_str = actual or "?"
print(f"{actual_str:8} {mark:3} {wer:4.2f} {transcript[:60]}")
results.append({
"id": qid,
"expected": expected,
"actual": actual_str,
"ok": match,
"wer": round(wer, 3),
"original": original,
"transcript": transcript,
"elapsed": round(elapsed, 1),
"no_inference": no_inference,
})
await asyncio.sleep(0.5)
print("" * 100)
# Summary
accuracy = correct / total * 100 if total else 0
avg_wer = total_wer / wer_count * 100 if wer_count else 0
print(f"\nRouting accuracy: {correct}/{total} ({accuracy:.0f}%)")
print(f"Average WER: {avg_wer:.1f}% (lower is better; 0% = perfect transcription)")
for tier_name in ["light", "medium", "complex"]:
tier_qs = [r for r in results if r["expected"] == tier_name]
if tier_qs:
tier_ok = sum(1 for r in tier_qs if r["ok"])
tier_wers = [r["wer"] for r in tier_qs if r.get("wer") is not None]
avg = sum(tier_wers) / len(tier_wers) * 100 if tier_wers else 0
print(f" {tier_name:8}: routing {tier_ok}/{len(tier_qs)} avg WER {avg:.1f}%")
wrong = [r for r in results if not r["ok"]]
if wrong:
print(f"\nMisclassified after voice ({len(wrong)}):")
for r in wrong:
print(f" id={r['id']:3} expected={r.get('expected') or '?':8} actual={r.get('actual') or '?':8} transcript={r.get('transcript','')[:50]}")
high_wer = [r for r in results if r.get("wer") and r["wer"] > 0.3]
if high_wer:
print(f"\nHigh WER queries (>30%) — transcription quality issues:")
for r in high_wer:
print(f" id={r['id']:3} WER={r['wer']*100:.0f}% original: {r.get('original','')[:50]}")
print(f" transcript: {r.get('transcript','')[:50]}")
# Save results
ts = int(time.time())
out_path = RESULTS_DIR / f"voice_results_{ts}.json"
latest_path = RESULTS_DIR / "voice_results_latest.json"
with open(out_path, "w") as f:
json.dump({"summary": {"accuracy": accuracy, "avg_wer": avg_wer, "total": total}, "results": results}, f, indent=2, ensure_ascii=False)
with open(latest_path, "w") as f:
json.dump({"summary": {"accuracy": accuracy, "avg_wer": avg_wer, "total": total}, "results": results}, f, indent=2, ensure_ascii=False)
print(f"\nResults saved to {latest_path}")
def main():
parser = argparse.ArgumentParser(
description="Adolf voice benchmark — TTS→STT→routing pipeline",
epilog="Requires: Silero TTS (port 8881) and faster-whisper (port 8880) running."
)
parser.add_argument("--tier", choices=["light", "medium", "complex"])
parser.add_argument("--category")
parser.add_argument("--ids", help="Comma-separated IDs")
parser.add_argument("--no-inference", action="store_true",
help="Skip LLM inference for all tiers — routing decisions only (no GPU/API cost)")
parser.add_argument("--save-audio", action="store_true",
help="Save synthesized WAV files to voice_audio/ directory")
parser.add_argument("--skip-gpu-check", action="store_true")
args = parser.parse_args()
if not preflight_checks(skip_gpu_check=args.skip_gpu_check or args.no_inference):
sys.exit(1)
queries = load_dataset()
ids = [int(i) for i in args.ids.split(",")] if args.ids else None
queries = filter_queries(queries, args.tier, args.category, ids)
if not queries:
print("No queries match filters.")
sys.exit(1)
asyncio.run(run(queries, no_inference=args.no_inference, save_audio=args.save_audio))
if __name__ == "__main__":
main()
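The `word_error_rate()` helper above is plain Levenshtein edit distance over normalized word lists, divided by the reference length. A runnable standalone copy makes the behavior easy to spot-check (the Russian sample phrases are made up):

```python
import re
import unicodedata

def normalize(text: str) -> str:
    # Lowercase, strip punctuation, collapse whitespace — same as the script.
    text = unicodedata.normalize("NFC", text.lower())
    text = re.sub(r"[^\w\s]", " ", text)
    return re.sub(r"\s+", " ", text).strip()

def wer(reference: str, hypothesis: str) -> float:
    ref, hyp = normalize(reference).split(), normalize(hypothesis).split()
    if not ref:
        return 0.0 if not hyp else 1.0
    # Word-level edit distance via dynamic programming.
    d = [[0] * (len(hyp) + 1) for _ in range(len(ref) + 1)]
    for i in range(len(ref) + 1):
        d[i][0] = i
    for j in range(len(hyp) + 1):
        d[0][j] = j
    for i in range(1, len(ref) + 1):
        for j in range(1, len(hyp) + 1):
            if ref[i - 1] == hyp[j - 1]:
                d[i][j] = d[i - 1][j - 1]
            else:
                d[i][j] = 1 + min(d[i - 1][j], d[i][j - 1], d[i - 1][j - 1])
    return d[len(ref)][len(hyp)] / len(ref)

# One substitution ("на" → "в") over a 4-word reference → WER 0.25.
print(wer("включи свет на кухне", "включи свет в кухне"))  # → 0.25
```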


@@ -1,4 +1,21 @@
 {
+    "auth_config": {
+        "is_enabled": true,
+        "admin_username": "admin",
+        "admin_password": "env.BIFROST_ADMIN_PASSWORD"
+    },
+    "config_store": {
+        "enabled": true,
+        "type": "postgres",
+        "config": {
+            "host": "bifrost-db",
+            "port": "5432",
+            "user": "bifrost",
+            "password": "bifrost",
+            "db_name": "bifrost",
+            "ssl_mode": "disable"
+        }
+    },
     "client": {
         "drop_excess_requests": false
     },


@@ -49,6 +49,7 @@ async def deliver(session_id: str, channel: str, text: str) -> None:
 # ── built-in channel adapters ─────────────────────────────────────────────────
 GRAMMY_URL = os.getenv("GRAMMY_URL", "http://grammy:3001")
+MATRIX_URL = os.getenv("MATRIX_URL", "http://matrix:3002")

 async def _telegram_send(session_id: str, text: str) -> None:
@@ -64,12 +65,26 @@ async def _telegram_send(session_id: str, text: str) -> None:
     )

+async def _matrix_send(session_id: str, text: str) -> None:
+    """Send reply to Matrix via the matrix adapter POST /send endpoint."""
+    room_id = session_id.removeprefix("mx-")
+    MAX_MTX = 4000
+    chunks = [text[i:i + MAX_MTX] for i in range(0, len(text), MAX_MTX)]
+    async with httpx.AsyncClient(timeout=15) as client:
+        for chunk in chunks:
+            await client.post(
+                f"{MATRIX_URL}/send",
+                json={"room_id": room_id, "text": chunk},
+            )
+
 async def _cli_send(session_id: str, text: str) -> None:
     """CLI replies are handled entirely through the pending_replies queue — no-op here."""
     pass

 def register_defaults() -> None:
-    """Register the built-in Telegram and CLI channel adapters."""
+    """Register the built-in Telegram, Matrix, and CLI channel adapters."""
     register("telegram", _telegram_send)
+    register("matrix", _matrix_send)
     register("cli", _cli_send)


@@ -1,19 +1,4 @@
 services:
-  bifrost:
-    image: maximhq/bifrost
-    container_name: bifrost
-    ports:
-      - "8080:8080"
-    volumes:
-      - ./bifrost-config.json:/app/data/config.json:ro
-    environment:
-      - APP_DIR=/app/data
-      - APP_PORT=8080
-      - LOG_LEVEL=info
-    extra_hosts:
-      - "host.docker.internal:host-gateway"
-    restart: unless-stopped
   deepagents:
     build: .
     container_name: deepagents
@@ -21,25 +6,28 @@ services:
       - "8000:8000"
     environment:
       - PYTHONUNBUFFERED=1
-      # Bifrost gateway — all LLM inference goes through here
-      - BIFROST_URL=http://bifrost:8080/v1
+      # LiteLLM proxy — all LLM inference goes through here
+      - LITELLM_URL=http://host.docker.internal:4000/v1
+      - LITELLM_API_KEY=sk-fjQC1BxAiGFSMs
       # Direct Ollama GPU URL — used only by VRAMManager for flush/prewarm
       - OLLAMA_BASE_URL=http://host.docker.internal:11436
       - DEEPAGENTS_MODEL=qwen3:4b
-      - DEEPAGENTS_COMPLEX_MODEL=qwen3:8b
+      - DEEPAGENTS_COMPLEX_MODEL=deepseek/deepseek-r1:free
       - DEEPAGENTS_ROUTER_MODEL=qwen2.5:1.5b
       - SEARXNG_URL=http://host.docker.internal:11437
       - GRAMMY_URL=http://grammy:3001
+      - MATRIX_URL=http://host.docker.internal:3002
       - CRAWL4AI_URL=http://crawl4ai:11235
       - ROUTECHECK_URL=http://routecheck:8090
       - ROUTECHECK_TOKEN=${ROUTECHECK_TOKEN}
+    volumes:
+      - ./logs:/app/logs
     extra_hosts:
       - "host.docker.internal:host-gateway"
     depends_on:
       - openmemory
       - grammy
       - crawl4ai
-      - bifrost
       - routecheck
     restart: unless-stopped


@@ -35,13 +35,22 @@ class FastTool(ABC):
     async def run(self, message: str) -> str: ...

+_WMO_CODES = {
+    0: "clear sky", 1: "mainly clear", 2: "partly cloudy", 3: "overcast",
+    45: "fog", 48: "icy fog",
+    51: "light drizzle", 53: "drizzle", 55: "heavy drizzle",
+    61: "light rain", 63: "rain", 65: "heavy rain",
+    71: "light snow", 73: "snow", 75: "heavy snow", 77: "snow grains",
+    80: "light showers", 81: "showers", 82: "heavy showers",
+    85: "snow showers", 86: "heavy snow showers",
+    95: "thunderstorm", 96: "thunderstorm with hail", 99: "thunderstorm with heavy hail",
+}
+
 class WeatherTool(FastTool):
     """
-    Fetches current weather for the user's location (Balashikha, Moscow region)
-    by querying SearXNG, which has external internet access.
-    Triggered by any weather-related query. The Router also forces medium tier
-    when this tool matches so the richer model handles the injected data.
+    Fetches current weather for Balashikha, Moscow region directly from open-meteo.com.
+    No API key required. Returns a ready-to-deliver reply — no LLM reformatting needed.
     """
     _PATTERN = re.compile(
@@ -51,11 +60,13 @@ class WeatherTool(FastTool):
         re.IGNORECASE,
     )

-    # Fixed query — always fetch home location weather
-    _SEARCH_QUERY = "погода Балашиха сейчас"  # Russian query → Celsius sources
-
-    def __init__(self, searxng_url: str):
-        self._searxng_url = searxng_url
+    _URL = (
+        "https://api.open-meteo.com/v1/forecast"
+        "?latitude=55.7963&longitude=37.9382"
+        "&current=temperature_2m,apparent_temperature,relative_humidity_2m"
+        ",wind_speed_10m,weather_code"
+        "&wind_speed_unit=ms"
+    )

     @property
     def name(self) -> str:
@@ -65,31 +76,24 @@ class WeatherTool(FastTool):
         return bool(self._PATTERN.search(message))

     async def run(self, message: str) -> str:
-        """Query SearXNG for Balashikha weather and return current conditions snippet."""
         try:
-            async with httpx.AsyncClient(timeout=15) as client:
-                r = await client.get(
-                    f"{self._searxng_url}/search",
-                    params={"q": self._SEARCH_QUERY, "format": "json"},
-                )
+            async with httpx.AsyncClient(timeout=10) as client:
+                r = await client.get(self._URL)
                 r.raise_for_status()
-                items = r.json().get("results", [])[:5]
+                c = r.json()["current"]
         except Exception as e:
             return f"[weather error: {e}]"
-        if not items:
-            return ""
-
-        # Prefer results whose snippets contain actual current conditions
-        lines = ["Current weather data for Balashikha, Moscow region (temperatures in °C):\n"]
-        for item in items:
-            snippet = item.get("content", "")
-            title = item.get("title", "")
-            url = item.get("url", "")
-            if snippet:
-                lines.append(f"[{title}]\n{snippet}\nSource: {url}\n")
-        return "\n".join(lines) if len(lines) > 1 else ""
+        temp = c["temperature_2m"]
+        feels = c["apparent_temperature"]
+        humidity = c["relative_humidity_2m"]
+        wind = c["wind_speed_10m"]
+        condition = _WMO_CODES.get(c.get("weather_code", 0), "unknown")
+        return (
+            f"Balashikha: {condition}, {temp:+.0f}°C (feels like {feels:+.0f}°C), "
+            f"wind {wind:.1f} m/s, humidity {humidity}%."
+        )

 class CommuteTool(FastTool):
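The new `WeatherTool.run()` body formats a reply directly from the open-meteo `current` payload. Fed a hypothetical payload (field names match the API request above; the values are made up), the formatting yields a single deliverable line:

```python
# Excerpt of the WMO-code table above, enough for this sample.
_WMO_CODES = {0: "clear sky", 71: "light snow"}

# Hypothetical open-meteo "current" object.
c = {
    "temperature_2m": -3.4,
    "apparent_temperature": -8.1,
    "relative_humidity_2m": 87,
    "wind_speed_10m": 4.56,
    "weather_code": 71,
}

# Same format spec as run(): "+.0f" always shows the sign, ".1f" rounds wind.
reply = (
    f"Balashikha: {_WMO_CODES.get(c.get('weather_code', 0), 'unknown')}, "
    f"{c['temperature_2m']:+.0f}°C (feels like {c['apparent_temperature']:+.0f}°C), "
    f"wind {c['wind_speed_10m']:.1f} m/s, humidity {c['relative_humidity_2m']}%."
)
print(reply)  # → Balashikha: light snow, -3°C (feels like -8°C), wind 4.6 m/s, humidity 87%.
```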

router.py

@@ -1,11 +1,38 @@
import asyncio
import re import re
import math
from typing import Optional from typing import Optional
from openai import AsyncOpenAI
from langchain_core.messages import SystemMessage, HumanMessage from langchain_core.messages import SystemMessage, HumanMessage
from fast_tools import FastToolRunner from fast_tools import FastToolRunner
# ── Regex pre-classifier ───────────────────────────────────────────────────── # ── Regex pre-classifiers ─────────────────────────────────────────────────────
# Catches obvious light-tier patterns before calling the LLM.
# Keyed by regex → compiled pattern. # Complex: keyword triggers that reliably signal deep multi-source research
_COMPLEX_PATTERNS = re.compile(
r"(?:^|\s)("
r"research|investigate|deep.dive|think carefully"
r"|write a (?:detailed|comprehensive|full|thorough|complete)"
r"|compare all|find and (?:compare|summarize|analyze)"
r"|in[- ]depth analysis|comprehensive guide"
r"|detailed (?:report|analysis|comparison|breakdown|overview)"
r"|everything about|all (?:major|available|self-hosted|open.source)"
r"|pros and cons|with (?:sources|citations|references)"
# Russian complex research keywords (no trailing \b — stems like подробн match подробное/подробный)
r"|исследуй|изучи все|сравни все|найди и сравни|найди и опиши"
r"|напиши подробн|напиши детальн|напиши полн"
r"|подробный отчет|детальн\w+ (?:анализ|сравнение|отчет)"
r"|подробное (?:руководство|сравнение)|полное руководство"
r"|все варианты|все способы|все доступные|все самохостируемые|все платформы"
r"|лучшие практики|все инструменты|все решения|все протоколы"
r"|найди детальн|найди и кратко опиши"
r"|изучи свежие|изучи лучши|изучи все"
r"|сравни все\b"
r")",
re.IGNORECASE,
)
# Light: trivial queries that need no tools or memory
_LIGHT_PATTERNS = re.compile( _LIGHT_PATTERNS = re.compile(
r"^(" r"^("
# Greetings / farewells # Greetings / farewells
@@ -15,35 +42,316 @@ _LIGHT_PATTERNS = re.compile(
r"|thanks?|thank you|thx|ty|ok|okay|k|cool|great|awesome|perfect|sounds good|got it|nice|sure"
r"|how are you|how are you\?|how are you doing(\s+today)?[?!.]*"
r"|what.?s up"
# Calendar facts
r"|what\s+day\s+(comes\s+after|follows|is\s+after)\s+\w+[?!.]*"
r"|what\s+comes\s+after\s+\w+[?!.]*"
# Acronym expansions
r"|what\s+does\s+\w+\s+stand\s+for[?!.]*"
# Russian greetings / farewells / acknowledgements
r"|привет|пока|спасибо|здравствуй|здравствуйте|добрый день|добрый вечер|доброе утро"
r"|окей|хорошо|отлично|понятно|ок|ладно|договорились|спс|благодарю"
r"|пожалуйста|не за что|всё понятно|ясно"
r"|как дела|как ты|как жизнь|всё хорошо|всё ок"
# Assistant control words / confirmations
r"|да|нет|стоп|отмена|отменить|подожди|повтори|повторить|не нужно|не надо"
r"|слышишь\s+меня|ты\s+тут|отлично[,!]?\s+спасибо"
r"|yes|no|stop|cancel|wait|repeat"
# Russian tech definitions — static knowledge (no tools needed)
r"|что\s+такое\s+\S+"
r"|что\s+означает\s+\S+"
r"|сколько\s+(?:бит|байт|байтов|мегабайт|мегабайтов|гигабайт|гигабайтов)(?:\s+\w+)*"
# Compound Russian greetings
r"|привет[,!]?\s+как\s+дела"
r"|добрый\s+(?:день|вечер|утро)[,!]?\s+как\s+дела"
r")[\s!.?]*$",
re.IGNORECASE,
)
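Note that unlike the complex pattern, the light pattern is anchored with `^(…)[\s!.?]*$`, so a greeting only routes light when it is the entire message. A minimal check with a trimmed, illustrative pattern:

```python
import re

# Trimmed light-tier pattern (illustrative): anchored so greetings match only
# when they are the whole message, optionally followed by punctuation.
LIGHT = re.compile(r"^(hi|hello|thanks?|привет|спасибо)[\s!.?]*$", re.IGNORECASE)

assert LIGHT.match("Hello!")
assert LIGHT.match("привет")
# A greeting followed by a real request must NOT short-circuit to light.
assert LIGHT.match("hello, compare all cloud providers for me") is None
```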
# ── Semantic router utterances ────────────────────────────────────────────────
# These are embedded at startup. New messages are classified by cosine
# similarity — whichever tier's centroid is closest wins.
_LIGHT_UTTERANCES = [
# General facts (English)
"what is 2+2",
"what is the capital of France",
"name the three primary colors",
"tell me a short joke",
"is the sky blue",
"is water wet",
"how many days in a week",
"what is the speed of light",
"what is the boiling point of water",
"spell the word beautiful",
"what color is the ocean",
"how many inches in a foot",
"who wrote hamlet",
"what is pi",
"what year did world war two end",
"what is the largest planet",
"how many continents are there",
"what does DNA stand for",
"what language do they speak in Brazil",
"what is the square root of 144",
# Tech definitions — static knowledge (English)
"what is Docker",
"what is a VPN",
"what is SSH",
"what is a reverse proxy",
"what is an API",
"what is a firewall",
"what is a container",
"what is DNS",
"what is HTTPS",
"what is a load balancer",
"what is Kubernetes",
"what is Git",
"what is a network port",
"what is an IP address",
"what is a subnet mask",
"what is the OSI model",
"how many bits in a byte",
"how many bytes in a gigabyte",
"what is TCP",
"what is a REST API",
# Russian — static facts and definitions
"что такое IP-адрес",
"что такое VPN",
"что такое Docker",
"что такое DNS",
"что такое SSH",
"что означает API",
"сколько байт в гигабайте",
"сколько бит в байте",
"что такое Zigbee",
"что такое Z-Wave",
"что такое брандмауэр",
"что такое виртуальная машина",
"что такое обратный прокси",
"привет",
"пока",
"спасибо",
"как дела",
"что такое Matter протокол",
"сколько планет в солнечной системе",
"чему равно число Пи",
# Russian — more static definitions
"что такое TCP/IP",
"что такое подсеть",
"скорость света",
"сколько дней в году",
"что такое Kubernetes",
"что такое Git",
"что такое REST API",
"что такое TCP",
"что такое UDP",
"что такое VLAN",
"сколько мегабайт в гигабайте",
"что такое процессор",
"что такое оперативная память",
"что такое виртуализация",
"что такое Linux",
"что такое умный дом",
"что такое Home Assistant",
"что такое Matter",
]
_MEDIUM_UTTERANCES = [
# English — current data, memory, actions
"what is the weather today",
"what is the bitcoin price right now",
"what are the latest news",
"what did we talk about last time",
"what is my name",
"where do I live",
"what do you know about me",
"what did I tell you before",
"what is the current temperature outside",
"remind me what I said about my project",
"search for the latest iPhone release",
"find me a restaurant nearby",
"turn on the lights in the living room",
"turn off all lights",
"set temperature to 22 degrees",
"what is the current traffic to Moscow",
"check if anyone is home",
"what devices are currently on",
"look up my public IP address",
"show me recent news about Proxmox",
# Russian — weather and commute
"какая сегодня погода в Балашихе",
"пойдет ли сегодня дождь",
"какая температура на улице сейчас",
"погода на завтра",
"будет ли снег сегодня",
"сколько ехать до Москвы сейчас",
"какие пробки на дороге до Москвы",
"время в пути на работу",
"есть ли пробки сейчас",
"стоит ли брать зонтик",
# Russian — smart home control
"включи свет в гостиной",
"выключи свет на кухне",
"какая температура дома",
"установи температуру 22 градуса",
"выключи все лампочки",
"какие устройства сейчас включены",
"включи ночной режим",
"открой шторы в гостиной",
"включи свет в спальне на 50 процентов",
"выключи свет во всём доме",
"включи вентилятор в детской",
"закрыты ли все окна",
"выключи телевизор",
"какое потребление электричества сегодня",
"включи кофемашину",
"сколько у нас датчиков движения",
"состояние всех дверных замков",
"есть ли кто-нибудь дома",
"установи будильник на 7 утра",
# Russian — personal memory
"как меня зовут",
"где я живу",
"что мы обсуждали в прошлый раз",
"что ты знаешь о моем домашнем сервере",
"напомни, какие сервисы я запускаю",
"что я просил тебя запомнить",
"что я говорил о своей сети",
# Russian — current info lookups requiring network/tools
"какой сейчас курс биткоина",
"курс доллара к рублю сейчас",
"какая последняя версия Docker",
"как перезапустить Docker контейнер",
"как посмотреть логи Docker контейнера",
"какие новые функции в Home Assistant 2024",
"есть ли проблемы у Cloudflare сегодня",
"какие новые Zigbee устройства вышли в 2024 году",
"найди хороший опенсорс менеджер фотографий",
"последние новости Proxmox",
"напиши bash команду для поиска больших файлов",
"как вывести список всех запущенных контейнеров",
"как проверить использование диска в Linux",
]
_COMPLEX_UTTERANCES = [
# English
"research everything about Elon Musk's recent projects and investments",
"write a detailed report on climate change solutions with sources",
"investigate the history and current state of quantum computing",
"find and summarize the latest academic papers on transformer architectures",
"analyze in depth the pros and cons of nuclear energy with citations",
"research the background and controversies around this person",
"compare all major cloud providers with detailed pricing and features",
"write a comprehensive biography of this historical figure",
"investigate what caused the 2008 financial crisis with multiple sources",
"research the best programming languages in 2024 with detailed comparison",
"find everything published about this medical condition and treatments",
"do a deep dive into the latest developments in artificial general intelligence",
"research and compare all options for starting a business in Europe",
"investigate recent news and controversies around this company",
"write a thorough analysis of geopolitical tensions in the Middle East",
"find detailed information on the side effects and studies for this medication",
"research the top 10 JavaScript frameworks with benchmarks and community data",
"investigate who is funding AI research and what their goals are",
"write a detailed market analysis for the electric vehicle industry",
"research everything you can find about this startup or technology",
# Russian — deep research
"исследуй и сравни все варианты умного домашнего освещения",
"напиши подробный отчет о протоколах умного дома",
"изучи все самохостируемые медиасерверы и сравни их",
"исследуй лучшие практики безопасности домашнего сервера",
"сравни все системы резервного копирования для Linux",
"напиши детальное сравнение WireGuard и OpenVPN",
"исследуй все варианты голосового управления на русском языке",
"изучи все опенсорс альтернативы Google сервисам",
"напиши подробный анализ локальных языковых моделей",
"исследуй лучшие инструменты мониторинга для домашнего сервера",
# Russian — more deep research queries matching benchmark
"исследуй и сравни Proxmox, Unraid и TrueNAS для домашней лаборатории",
"напиши подробное руководство по безопасности домашнего сервера",
"исследуй все доступные дашборды для самохостинга и сравни их",
"найди детальные бенчмарки ARM одноплатных компьютеров для домашней лаборатории",
"исследуй лучший стек мониторинга для самохостинга в 2024 году",
"исследуй и сравни WireGuard, OpenVPN и Tailscale для домашней сети",
"исследуй лучшие практики сегментации домашней сети с VLAN",
"изучи все самохостируемые DNS решения и их возможности",
"исследуй и сравни все платформы умного дома: Home Assistant и другие",
"изучи лучшие Zigbee координаторы и их совместимость с Home Assistant",
"напиши детальный отчет о поддержке протокола Matter и совместимости устройств",
"исследуй все способы интеграции умных ламп с Home Assistant",
"найди и сравни все варианты датчиков движения для умного дома",
"исследуй и сравни все самохостируемые решения для хранения фотографий",
"изучи лучшие самохостируемые медиасерверы: Jellyfin, Plex и Emby",
"исследуй последние достижения в локальном LLM инференсе и обзор моделей",
"изучи лучшие опенсорс альтернативы Google сервисов для приватности",
"найди и кратко опиши все крупные самохостируемые менеджеры паролей",
"напиши детальный анализ текущего состояния AI ассистентов для самохостинга",
"исследуй и сравни все инструменты оркестрации контейнеров для домашней лаборатории",
"изучи лучшие подходы к автоматическому резервному копированию в Linux",
"исследуй и сравни все самохостируемые инструменты личных финансов",
"изучи свежие CVE и уязвимости в популярном самохостируемом ПО",
"напиши подробное руководство по настройке автоматизаций в Home Assistant",
"исследуй все варианты голосового управления умным домом на русском языке",
"сравни все системы резервного копирования для Linux: Restic, BorgBackup и другие",
"исследуй лучшие самохостируемые системы мониторинга сети: Zabbix, Grafana",
"изучи все варианты локального запуска языковых моделей на видеокарте",
"напиши подробный отчет о технологиях синтеза речи с открытым исходным кодом",
"исследуй все способы интеграции умных розеток с мониторингом потребления",
"напиши полное руководство по настройке обратного прокси Caddy",
"исследуй лучшие практики написания Docker Compose файлов для продакшена",
"сравни все самохостируемые облачные хранилища: Nextcloud, Seafile и другие",
"изучи все доступные локальные ассистенты с голосовым управлением",
"исследуй все самохостируемые решения для блокировки рекламы: Pi-hole, AdGuard",
"напиши детальное сравнение систем управления конфигурацией: Ansible, Puppet",
"исследуй все протоколы умного дома и их плюсы и минусы: Zigbee, Z-Wave, Matter",
"найди и сравни все фреймворки для создания локальных AI ассистентов",
"исследуй лучшие решения для автоматического управления медиатекой",
"изучи все варианты самохостируемых систем учёта расходов с возможностью импорта",
"напиши сравнение всех вариантов самохостинга для хранения и синхронизации файлов",
"исследуй все открытые протоколы для умного дома и их экосистемы",
"изучи лучшие инструменты для автоматизации домашней инфраструктуры",
]
# Medium: queries that require tools, actions, or real-time data (not static knowledge)
_MEDIUM_PATTERNS = re.compile(
r"(?:"
# Russian smart home commands — always need HA integration
r"(?:включи|выключи|открой|закрой|установи|поставь|убавь|прибавь|переключи)\s"
r"|(?:какая|какой|какое|каково)\s+(?:температура|влажность|потребление|состояние|статус)\s"
r"|(?:сколько|есть ли)\s.*(?:датчик|устройств|замк)"
# Russian memory queries
r"|как меня зовут|где я живу|что мы обсуждали|что я говорил|что я просил"
r"|напомни\b|что ты знаешь обо мне"
# Russian current info
r"|курс (?:доллара|биткоина|евро|рубл)"
r"|(?:последние |свежие )?новости\b"
r"|(?:погода|температура)\s+(?:на завтра|на неделю)"
# Smart home commands that don't use verb-first pattern
r"|(?:свет|лампочк|освещени)\w*\s+(?:включ|выключ|убавь|прибавь)"
r"|(?:дома|в доме|по всему дому)\s+(?:свет|лампочк)"
r"|(?:режим|сцена)\s+(?:ночной|утренний|вечерний|кинотеатр)"
r")",
re.IGNORECASE,
)
LIGHT_REPLY_PROMPT = """You are a helpful Telegram assistant. Answer briefly and naturally (1-3 sentences). Be friendly."""
_EMBED_MODEL = "ollama/nomic-embed-text"
def _cosine(a: list[float], b: list[float]) -> float:
dot = sum(x * y for x, y in zip(a, b))
norm_a = math.sqrt(sum(x * x for x in a))
norm_b = math.sqrt(sum(x * x for x in b))
if norm_a == 0 or norm_b == 0:
return 0.0
return dot / (norm_a * norm_b)
def _centroid(embeddings: list[list[float]]) -> list[float]:
n = len(embeddings)
dim = len(embeddings[0])
return [sum(embeddings[i][d] for i in range(n)) / n for d in range(dim)]
def _format_history(history: list[dict]) -> str:
if not history:
@@ -56,71 +364,97 @@ def _format_history(history: list[dict]) -> str:
return "\n".join(lines)
class Router:
def __init__(
self,
model,
embedder: AsyncOpenAI,
fast_tool_runner: FastToolRunner | None = None,
):
self.model = model # qwen2.5:1.5b — used only for generating light replies
self._embedder = embedder
self._fast_tool_runner = fast_tool_runner
self._light_centroid: list[float] | None = None
self._medium_centroid: list[float] | None = None
self._complex_centroid: list[float] | None = None
async def initialize(self) -> None:
"""Pre-compute utterance embeddings. Call once at startup. Retries until LiteLLM is ready."""
print("[router] embedding utterances for semantic classifier...", flush=True)
texts = _LIGHT_UTTERANCES + _MEDIUM_UTTERANCES + _COMPLEX_UTTERANCES
for attempt in range(10):
try:
resp = await self._embedder.embeddings.create(model=_EMBED_MODEL, input=texts)
embeddings = [item.embedding for item in resp.data]
n_light = len(_LIGHT_UTTERANCES)
n_medium = len(_MEDIUM_UTTERANCES)
self._light_centroid = _centroid(embeddings[:n_light])
self._medium_centroid = _centroid(embeddings[n_light:n_light + n_medium])
self._complex_centroid = _centroid(embeddings[n_light + n_medium:])
print("[router] semantic classifier ready (3-tier)", flush=True)
return
except Exception as e:
print(f"[router] embedding attempt {attempt+1}/10 failed: {e}", flush=True)
await asyncio.sleep(3)
print("[router] WARNING: could not initialize semantic classifier — will default to medium", flush=True)
async def _classify_by_embedding(self, message: str) -> str:
"""Embed message and return 'light', 'medium', or 'complex' based on centroid similarity."""
if self._light_centroid is None or self._medium_centroid is None or self._complex_centroid is None:
return "medium"
try:
resp = await self._embedder.embeddings.create(model=_EMBED_MODEL, input=[message])
emb = resp.data[0].embedding
score_light = _cosine(emb, self._light_centroid)
score_medium = _cosine(emb, self._medium_centroid)
score_complex = _cosine(emb, self._complex_centroid)
tier = max(
[("light", score_light), ("medium", score_medium), ("complex", score_complex)],
key=lambda x: x[1],
)[0]
print(
f"[router] semantic: light={score_light:.3f} medium={score_medium:.3f} "
f"complex={score_complex:.3f}{tier}",
flush=True,
)
return tier
except Exception as e:
print(f"[router] embedding classify error, defaulting to medium: {e}", flush=True)
return "medium"
async def route(
self,
message: str,
history: list[dict],
no_inference: bool = False,
) -> tuple[str, Optional[str]]:
"""
Returns (tier, reply_or_None).
For light tier: also generates the reply inline (unless no_inference=True).
For medium/complex: reply is None.
"""
if self._fast_tool_runner and self._fast_tool_runner.any_matches(message.strip()):
names = self._fast_tool_runner.matching_names(message.strip())
print(f"[router] fast_tool_match={names} → medium", flush=True)
return "medium", None
if _LIGHT_PATTERNS.match(message.strip()):
print("[router] regex→light", flush=True)
if no_inference:
return "light", None
return await self._generate_light_reply(message, history)
if _COMPLEX_PATTERNS.search(message.strip()):
print("[router] regex→complex", flush=True)
return "complex", None
if _MEDIUM_PATTERNS.search(message.strip()):
print("[router] regex→medium", flush=True)
return "medium", None
tier = await self._classify_by_embedding(message)
if tier != "light" or no_inference:
return tier, None
return await self._generate_light_reply(message, history)
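Stripped of IO and logging, the routing order in `route()` is a fall-through cascade: fast-tool match → light regex → complex regex → medium regex → embedding fallback. A minimal sketch of that decision order (names and toy patterns are illustrative, not the production code):

```python
import re

def classify(message, light_re, complex_re, medium_re, embed_tier):
    """Cheapest checks first; the embedding classifier is the fallback."""
    msg = message.strip()
    if light_re.match(msg):        # anchored full-message match
        return "light"
    if complex_re.search(msg):     # research keyword anywhere
        return "complex"
    if medium_re.search(msg):      # tool/real-time keyword anywhere
        return "medium"
    return embed_tier(msg)         # semantic fallback, e.g. centroid similarity

light = re.compile(r"^(hi|hello)[\s!.?]*$", re.IGNORECASE)
comp = re.compile(r"research|deep.dive", re.IGNORECASE)
med = re.compile(r"weather|news", re.IGNORECASE)
fallback = lambda m: "medium"  # stand-in for _classify_by_embedding

assert classify("hello!", light, comp, med, fallback) == "light"
assert classify("research VPNs", light, comp, med, fallback) == "complex"
assert classify("any news today?", light, comp, med, fallback) == "medium"
assert classify("explain monads", light, comp, med, fallback) == "medium"
```

The order matters: a message matching both the complex and medium patterns routes complex, and only the light check uses an anchored `match`.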
@@ -128,7 +462,7 @@ class Router:
async def _generate_light_reply(
self, message: str, history: list[dict]
) -> tuple[str, Optional[str]]:
"""Generate a short reply using qwen2.5:1.5b for light-tier messages."""
history_text = _format_history(history)
context = f"\nConversation history:\n{history_text}" if history else ""
try:

View File

@@ -11,7 +11,7 @@ import urllib.request
# ── config ────────────────────────────────────────────────────────────────────
DEEPAGENTS = "http://localhost:8000"
LITELLM = "http://localhost:4000"
OPENMEMORY = "http://localhost:8765"
GRAMMY_HOST = "localhost"
GRAMMY_PORT = 3001
@@ -156,19 +156,6 @@ def fetch_logs(since_s=600):
return []
def parse_run_block(lines, msg_prefix):
"""
Scan log lines for the LAST '[agent] running: <msg_prefix>' block.
@@ -199,14 +186,13 @@ def parse_run_block(lines, msg_prefix):
if txt:
last_ai_text = txt
m = re.search(r"replied in ([\d.]+)s(?:\s+tier=(\w+))?", line)
if m:
tier = m.group(2) if m.group(2) else "unknown"
reply_data = {
"reply_total": float(m.group(1)),
"llm": None,
"send": None,
"tier": tier,
"reply_text": last_ai_text,
"memory_s": None,

View File

@@ -6,7 +6,7 @@ Tests:
1. Name store — POST "remember that your name is <RandomName>"
2. Qdrant point — verifies a new vector was written after store
3. Name recall — POST "what is your name?" → reply must contain <RandomName>
4. LiteLLM — verifies LiteLLM proxy is reachable (replaced Bifrost)
5. Timing profile — breakdown of store and recall latencies
6. Memory benchmark — store 5 personal facts, recall with 10 questions
7. Dedup test — same fact stored twice must not grow Qdrant by 2 points
@@ -24,11 +24,11 @@ import time
import urllib.request
from common import (
DEEPAGENTS, LITELLM, QDRANT, COMPOSE_FILE, DEFAULT_CHAT_ID,
NAMES,
INFO, PASS, FAIL, WARN,
report, print_summary, tf,
get, post_json, qdrant_count, fetch_logs,
parse_run_block, wait_for,
)
@@ -155,14 +155,13 @@ if _run_name:
report(results, "Agent replied to recall message", False, "timeout")
report(results, f"Reply contains '{random_name}'", False, "no reply")
# ── 4. LiteLLM proxy reachable (replaced Bifrost) ─────────────────────────
try:
status, _ = get(f"{LITELLM}/health", timeout=5)
litellm_ok = status == 200
except Exception:
litellm_ok = False
report(results, "LiteLLM proxy reachable", litellm_ok)
# ── 5. Timing profile ─────────────────────────────────────────────────────
print(f"\n[{INFO}] 5. Timing profile")
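The health probe above relies on the suite's `get` helper. A standalone stdlib-only equivalent might look like this (the function name, base URL, and timeout here are assumptions, not part of the benchmark code):

```python
import urllib.request

def litellm_reachable(base="http://localhost:4000", timeout=5):
    """Return True if the LiteLLM /health endpoint answers with HTTP 200."""
    try:
        with urllib.request.urlopen(f"{base}/health", timeout=timeout) as resp:
            return resp.status == 200
    except Exception:
        # Connection refused, DNS failure, timeout, non-2xx → not reachable.
        return False

# An unreachable port fails fast and cleanly rather than raising.
assert litellm_reachable("http://localhost:1", timeout=1) is False
```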