14 Commits

Author SHA1 Message Date
8ef4897869 Fix tier logging: capture actual_tier, fix parse_run_block regex, remove reply_text truncation
- Add tier_capture param to _run_agent_pipeline; append tier after determination
- Capture actual_tier in run_agent_task from tier_capture list
- Log tier in replied-in line: [agent] replied in Xs tier=Y
- Remove reply_text[:200] truncation (was breaking benchmark keyword matching)
- Update parse_run_block regex to match new log format; llm/send fields now None

Fixes #1, #3, #4

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 02:41:59 +00:00
Alvis
1f5e272600 Switch from Bifrost to LiteLLM; add Matrix channel; update rules
Infrastructure:
- docker-compose.yml: replace bifrost container with LiteLLM proxy
  (host.docker.internal:4000); complex model → deepseek-r1:free via
  OpenRouter; add Matrix URL env var; mount logs volume
- bifrost-config.json: add auth_config + postgres config_store (archived)

Routing:
- router.py: full semantic 3-tier classifier rewrite — nomic-embed-text
  centroids for light/medium/complex; regex pre-classifiers for all tiers;
  Russian utterance sets expanded
- agent.py: wire LiteLLM URL; add dry_run support; add Matrix channel

Channels:
- channels.py: add Matrix adapter (_matrix_send via mx- session prefix)

Rules / docs:
- agent-pipeline.md: remove /think prefix requirement; document automatic
  complex tier classification
- llm-inference.md: update BIFROST_URL → LITELLM_URL references; add
  remote model note for complex tier
- ARCHITECTURE.md: deleted (superseded by README.md)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 02:14:13 +00:00
Alvis
54cb940279 Update docs: add benchmarks/ section, fix complex tier description
- CLAUDE.md: add benchmark commands (run_benchmark.py flags, dry-run,
  categories, voice benchmark)
- README.md: add benchmarks/ to Files tree; fix incorrect claim that
  complex tier requires /think prefix — it is auto-classified via regex
  and embedding similarity; fix "Complex agent (/think prefix)" heading

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 02:13:14 +00:00
Alvis
bd951f943f Move benchmark scripts into benchmarks/ subdir
- benchmarks/run_benchmark.py (was run_benchmark.py)
- benchmarks/run_voice_benchmark.py (was run_voice_benchmark.py)
- Scripts use Path(__file__).parent so paths resolve correctly in subdir
- .gitignore updated: ignore benchmarks/benchmark.json,
  results_latest.json, voice_results*.json, voice_audio/

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 02:02:46 +00:00
Alvis
ab68bba935 Add routing benchmark scripts; gitignore dataset and results
- run_benchmark.py: sends queries to /message, extracts tier= from docker
  logs, reports per-tier accuracy, saves results_latest.json
- run_voice_benchmark.py: voice path benchmark
- .gitignore: ignore benchmark.json (dataset) and results_latest.json
  (runtime output); benchmark scripts are tracked, data files are not

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 02:00:17 +00:00
Alvis
3ae1cefbd4 WeatherTool: fetch open-meteo directly, skip LLM for fast tool replies
- Replace SearXNG search with direct open-meteo.com API call (no key needed)
- WeatherTool now returns a ready-to-deliver reply string
- agent.py: short-circuit router+LLM when fast tools return a result (tier=fast)
- router.py: fast tool match no longer triggers light reply generation

Weather latency: 105-190s → ~1s

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-15 09:42:55 +00:00
Alvis
957360f6ce Restructure CLAUDE.md per official Claude Code recommendations
CLAUDE.md: 178→25 lines — commands + @ARCHITECTURE.md import only

Rules split into .claude/rules/ (load at startup, topic-scoped):
  llm-inference.md  — Bifrost-only, semaphore, model name format, timeouts
  agent-pipeline.md — tier rules, no tools in medium, memory outside loop
  fast-tools.md     — extension guide (path-scoped: fast_tools.py + agent.py)
  secrets.md        — .env keys, Vaultwarden, no hardcoding

Path-scoped rule: fast-tools.md only loads when editing fast_tools.py or agent.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-13 07:19:09 +00:00
Alvis
3ed47b45da Split CLAUDE.md per official Claude Code recommendations
CLAUDE.md: lean — commands, key conventions, fast tool guide, @ARCHITECTURE.md import
routecheck/CLAUDE.md: purpose, access paths, env vars, gotchas
openmemory/CLAUDE.md: tools, two Ollama instances, prompts, notes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-13 07:15:51 +00:00
Alvis
eba805f787 Update docs: fast tools, routecheck service, commute tool
- Request flow: add fast_tool_runner.run_matching() to pre-flight gather
- New Fast Tools section: WeatherTool + CommuteTool table, extension guide
- New routecheck section: captcha UI, internal API, proxy requirements
- Services table: add routecheck:8090
- Files tree: add fast_tools.py, routecheck/, updated .env note

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-13 07:10:30 +00:00
Alvis
32089ed596 Add routecheck service and CommuteTool fast tool
routecheck/ — FastAPI service (port 8090):
  - Image captcha (PIL: arithmetic problem, noise, wave distortion)
  - POST /api/captcha/new + /api/captcha/solve → short-lived token
  - GET /api/route?from=lat,lon&to=lat,lon&token=... → Yandex Routing API
  - Internal bypass via INTERNAL_TOKEN env var (for CommuteTool)
  - HTTPS proxy forwarded to reach Yandex API from container

CommuteTool (fast_tools.py):
  - Matches commute/traffic/arrival time queries
  - Calls routecheck /api/route with ROUTECHECK_TOKEN
  - Hardcoded route: Balashikha home → Moscow center
  - Returns traffic-adjusted travel time + delay annotation

Needs: YANDEX_ROUTING_KEY + ROUTECHECK_TOKEN in .env

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 07:08:48 +00:00
Alvis
d2ca1926f8 WeatherTool: use Russian query for Celsius sources
'погода Балашиха сейчас' ("weather Balashikha now") returns Russian weather
sites (gismeteo, meteotrend) that report in °C, whereas English queries return
Fahrenheit snippets that the model misreads as Celsius.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 06:25:53 +00:00
Alvis
af181ba7ec Rename RealTimeSearchTool → WeatherTool, fetch Balashikha weather via SearXNG
WeatherTool queries SearXNG with a fixed 'weather Balashikha Moscow now'
query instead of passing the user message as-is. SearXNG has external
internet access and returns snippets with actual current conditions.
Direct wttr.in fetch not possible — deepagents container has no external
internet routing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 05:40:10 +00:00
Alvis
f5fc2e9bfb Introduce FastTools: pre-flight classifier + context enrichment
New fast_tools.py module:
- FastTool base class (matches + run interface)
- RealTimeSearchTool: SearXNG search for weather/news/prices/scores
- FastToolRunner: classifier that checks all tools, runs matching
  ones concurrently and returns combined context

Router accepts FastToolRunner; any_matches() forces medium tier
before LLM classification (replaces _MEDIUM_FORCE_PATTERNS regex).

agent.py: _REALTIME_RE and _searxng_search_async removed; pre-flight
gather now includes fast_tool_runner.run_matching() alongside URL
fetch and memory retrieval.

To add a new fast tool: subclass FastTool, add to the list in agent.py.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 05:18:44 +00:00
Alvis
436299f7e2 Add real-time query handling: pre-search enrichment + routing fix
- router.py: add _MEDIUM_FORCE_PATTERNS to block weather/news/price
  queries from light tier regardless of LLM classification
- agent.py: add _REALTIME_RE and _searxng_search_async(); real-time
  queries now run SearXNG search concurrently with URL fetch + memory
  retrieval, injecting snippets into medium system prompt
- tests/use_cases/weather_now.md: use case test for weather queries

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 05:08:08 +00:00
22 changed files with 2342 additions and 364 deletions


@@ -0,0 +1,22 @@
# Agent Pipeline Rules
## Tiers
- Routing is fully automatic: router classifies into light/medium/complex via 3-way embedding similarity.
- Complex tier is reached automatically for deep research queries — no prefix required.
- Medium is the default tier. Light is only for trivial static-knowledge queries matched by regex or embedding.
- Light tier upgrade to medium is automatic when URL content is pre-fetched or a fast tool matches.
- `tier_override` API parameter still allows callers to force a specific tier (e.g. `adolf-deep` model → complex).
## Medium agent
- `_DirectModel` makes a single `ainvoke()` call with no tool schema. Do not add tools to the medium agent.
- `qwen3:4b` behaves unreliably when a tool array is present in the request — inject context via system prompt instead.
## Memory
- `add_memory` and `search_memory` are called directly in `run_agent_task()`, outside the agent loop.
- Never add memory tools to any agent's tool list.
- Memory storage (`_store_memory`) runs as an asyncio background task after the semaphore is released.
## Fast tools
- `FastToolRunner.run_matching()` runs in the pre-flight `asyncio.gather` alongside URL fetch and memory retrieval.
- Fast tool results are injected as a system prompt block, not returned to the user directly.
- When `any_matches()` is true, the router forces medium tier before LLM classification.
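The pre-flight gather these rules refer to can be sketched as follows. The stub bodies and the name `_run_matching_fast_tools` are illustrative only (the real call is `FastToolRunner.run_matching()` in `agent.py`):

```python
import asyncio

# Illustrative stubs for the three pre-flight IO steps; the real
# implementations live in agent.py and fast_tools.py.
async def _fetch_urls_from_message(message: str) -> str:
    return ""  # Crawl4AI fetch of any URLs found in the message

async def _retrieve_memories(message: str) -> str:
    return ""  # openmemory semantic search

async def _run_matching_fast_tools(message: str) -> str:
    return ""  # every FastTool whose matches() fired, run concurrently

async def preflight(message: str) -> list[str]:
    # All three IO tasks run concurrently; routing waits on the slowest one.
    return await asyncio.gather(
        _fetch_urls_from_message(message),
        _retrieve_memories(message),
        _run_matching_fast_tools(message),
    )

url_ctx, memories, fast_ctx = asyncio.run(preflight("какая погода?"))
```

Because the results arrive together, the router sees URL context, memories, and fast-tool context in one enriched history before any tier decision is made.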


@@ -0,0 +1,24 @@
---
paths:
- "fast_tools.py"
- "agent.py"
---
# Fast Tools — Extension Guide
To add a new fast tool:
1. In `fast_tools.py`, subclass `FastTool` and implement:
- `name` (str property) — unique identifier, used in logs
- `matches(message: str) -> bool` — regex or logic; keep it cheap, runs on every message
- `run(message: str) -> str` — async fetch; return a short context block or `""` on failure; never raise
2. In `agent.py`, add an instance to the `_fast_tool_runner` list (module level, after env vars are defined).
3. The router will automatically force medium tier when `matches()` returns true — no router changes needed.
## Constraints
- `run()` must return in under 15s — it runs in the pre-flight gather that blocks routing.
- Return `""` or a `[tool error: ...]` string on failure — never raise exceptions.
- Keep returned context under ~1000 chars — larger contexts slow down `qwen3:4b` streaming significantly.
- The deepagents container has no direct external internet. Use SearXNG (`host.docker.internal:11437`) or internal services.
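A minimal sketch of a new tool following this guide. `PingTool` is hypothetical, and the inlined `FastTool` base is a stand-in assumed from the interface described above (the real base class lives in `fast_tools.py`):

```python
import re

# Stand-in for the FastTool base class; interface assumed from this guide.
class FastTool:
    name: str = "base"
    def matches(self, message: str) -> bool: ...
    async def run(self, message: str) -> str: ...

class PingTool(FastTool):
    """Hypothetical example tool: answers latency questions."""
    name = "ping"
    _RE = re.compile(r"\b(ping|latency|пинг)\b", re.IGNORECASE)

    def matches(self, message: str) -> bool:
        # Cheap regex check -- this runs on every incoming message.
        return bool(self._RE.search(message))

    async def run(self, message: str) -> str:
        try:
            # A real tool would do an async fetch here, via SearXNG or an
            # internal service (no direct external internet), under 15s.
            return "[ping] gateway reachable"
        except Exception as e:
            # Never raise; return an error string instead.
            return f"[tool error: {e}]"
```

An instance would then be added to the `_fast_tool_runner` list in `agent.py`; no router changes are needed.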


@@ -0,0 +1,8 @@
# LLM Inference Rules
- All LLM calls must use `base_url=LITELLM_URL` (points to LiteLLM at `host.docker.internal:4000/v1`). Never call Ollama directly for inference.
- `_reply_semaphore` (asyncio.Semaphore(1)) serializes all GPU inference. Never bypass it or add a second semaphore.
- Local Ollama models use the `ollama/` prefix: `ollama/qwen3:4b`, `ollama/qwen2.5:1.5b`. Remote models (e.g. OpenRouter) use their full LiteLLM name: `openrouter/deepseek-r1`.
- Timeout values: router=30s, medium=180s, complex=600s. Do not reduce them.
- `VRAMManager` is the only component that contacts Ollama directly (for flush/prewarm/poll). This is intentional — LiteLLM cannot manage VRAM.
- Complex tier uses a remote model (`DEEPAGENTS_COMPLEX_MODEL`) — no VRAM management is needed for it.
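How these rules translate into a per-tier request can be sketched with plain dicts (no client library; the helper `call_llm` is illustrative, values are taken from the rules above):

```python
import asyncio

LITELLM_URL = "http://host.docker.internal:4000/v1"

# Timeouts per tier (seconds) -- do not reduce.
TIER_TIMEOUTS = {"router": 30, "medium": 180, "complex": 600}

# Local Ollama models take the "ollama/" prefix; remote models use
# their full LiteLLM name.
TIER_MODELS = {
    "router": "ollama/qwen2.5:1.5b",
    "medium": "ollama/qwen3:4b",
    "complex": "openrouter/deepseek-r1",
}

# One semaphore serializes all GPU inference -- never bypass it.
_reply_semaphore = asyncio.Semaphore(1)

async def call_llm(tier: str, messages: list[dict]) -> dict:
    """Build the request that would be POSTed to LITELLM_URL/chat/completions."""
    async with _reply_semaphore:
        return {
            "base_url": LITELLM_URL,
            "model": TIER_MODELS[tier],
            "timeout": TIER_TIMEOUTS[tier],
            "messages": messages,
        }
```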

.claude/rules/secrets.md

@@ -0,0 +1,7 @@
# Secrets and Environment
- `.env` is required at project root and must never be committed. It is in `.gitignore`.
- Required keys: `TELEGRAM_BOT_TOKEN`, `ROUTECHECK_TOKEN`, `YANDEX_ROUTING_KEY`.
- `ROUTECHECK_TOKEN` is a shared secret between `deepagents` and `routecheck` containers — generate once with `python3 -c "import uuid; print(uuid.uuid4())"`.
- All tokens are stored in Vaultwarden (AI collection). Fetch with `bw get password "<NAME>"` — see `~/.claude/CLAUDE.md` for the full procedure.
- Do not hardcode tokens, URLs, or credentials anywhere in source code.

.gitignore

@@ -1,2 +1,8 @@
__pycache__/
*.pyc
logs/*.jsonl
adolf_tuning_data/voice_audio/
benchmarks/benchmark.json
benchmarks/results_latest.json
benchmarks/voice_results*.json
benchmarks/voice_audio/

CLAUDE.md

@@ -4,151 +4,38 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
## Commands
**Start all services:**
```bash
# Start all services
docker compose up --build
```
**Interactive CLI (Docker container, requires gateway running):**
```bash
# Interactive CLI (requires services running)
docker compose --profile tools run --rm -it cli
# or with options:
docker compose --profile tools run --rm -it cli python3 cli.py --url http://deepagents:8000 --session cli-alvis
# Integration tests — run from tests/integration/, require all services up
python3 test_health.py
python3 test_memory.py [--name-only|--bench-only|--dedup-only]
python3 test_routing.py [--easy-only|--medium-only|--hard-only]
# Use case tests — read the .md file and follow its steps as Claude Code
# example: read tests/use_cases/weather_now.md and execute it
# Routing benchmark — measures tier classification accuracy across 120 queries
# Run from benchmarks/ — Adolf must be running. DO NOT run during active use (holds GPU).
cd benchmarks
python3 run_benchmark.py # full run (120 queries)
python3 run_benchmark.py --tier light # light tier only (30 queries)
python3 run_benchmark.py --tier medium # medium tier only (50 queries)
python3 run_benchmark.py --tier complex --dry-run # complex tier, medium model (no API cost)
python3 run_benchmark.py --category smart_home_control
python3 run_benchmark.py --ids 1,2,3
python3 run_benchmark.py --list-categories
# Voice benchmark
python3 run_voice_benchmark.py
# benchmark.json (dataset) and results_latest.json are gitignored — not committed
```
**Run integration tests** (from `tests/integration/`, require all Docker services running):
```bash
python3 test_health.py # service health: deepagents, bifrost, Ollama, Qdrant, SearXNG
python3 test_memory.py # name store/recall + memory benchmark + dedup
python3 test_memory.py --name-only # only name store/recall pipeline
python3 test_memory.py --bench-only # only 5-fact store + 10-question recall
python3 test_memory.py --dedup-only # only deduplication test
python3 test_routing.py # all routing benchmarks (easy + medium + hard)
python3 test_routing.py --easy-only # light-tier routing benchmark
python3 test_routing.py --medium-only # medium-tier routing benchmark
python3 test_routing.py --hard-only # complex-tier + VRAM flush benchmark
```
Shared config and helpers are in `tests/integration/common.py`.
**Use case tests** (`tests/use_cases/`) — markdown skill files executed by Claude Code, which acts as mock user and quality evaluator. Run by reading the `.md` file and following its steps with tools (Bash, WebFetch, etc.).
## Architecture
Adolf is a multi-channel personal assistant. All LLM inference is routed through **Bifrost**, an open-source Go-based LLM gateway that adds retry logic, failover, and observability in front of Ollama.
### Request flow
```
Channel adapter → POST /message {text, session_id, channel, user_id}
→ 202 Accepted (immediate)
→ background: run_agent_task()
→ asyncio.gather(
_fetch_urls_from_message() ← Crawl4AI, concurrent
_retrieve_memories() ← openmemory search, concurrent
)
→ router.route() → tier decision (light/medium/complex)
if URL content fetched → upgrade light→medium
→ invoke agent for tier via Bifrost (url_context + memories in system prompt)
deepagents:8000 → bifrost:8080/v1 → ollama:11436
→ _push_stream_chunk() per token (medium streaming) / full reply (light, complex)
→ _stream_queues[session_id] asyncio.Queue
→ _end_stream() sends [DONE] sentinel
→ channels.deliver(session_id, channel, reply)
→ channel-specific callback (Telegram POST)
→ _store_memory() background task (openmemory)
CLI streaming → GET /stream/{session_id} (SSE, per-token for medium, single-chunk for others)
```
### Bifrost integration
Bifrost (`bifrost-config.json`) is configured with the `ollama` provider pointing to the GPU Ollama instance on host port 11436. It exposes an OpenAI-compatible API at `http://bifrost:8080/v1`.
`agent.py` uses `langchain_openai.ChatOpenAI` with `base_url=BIFROST_URL`. Model names use the `provider/model` format that Bifrost expects: `ollama/qwen3:4b`, `ollama/qwen3:8b`, `ollama/qwen2.5:1.5b`. Bifrost strips the `ollama/` prefix before forwarding to Ollama.
`VRAMManager` bypasses Bifrost and talks directly to Ollama via `OLLAMA_BASE_URL` (host:11436) for flush/poll/prewarm operations — Bifrost cannot manage GPU VRAM.
### Three-tier routing (`router.py`, `agent.py`)
| Tier | Model (env var) | Trigger |
|------|-----------------|---------|
| light | `qwen2.5:1.5b` (`DEEPAGENTS_ROUTER_MODEL`) | Regex pre-match or LLM classifies "light" — answered by router model directly, no agent invoked |
| medium | `qwen3:4b` (`DEEPAGENTS_MODEL`) | Default for tool-requiring queries |
| complex | `qwen3:8b` (`DEEPAGENTS_COMPLEX_MODEL`) | `/think ` prefix only |
The router does regex pre-classification first, then LLM classification. Complex tier is blocked unless the message starts with `/think ` — any LLM classification of "complex" is downgraded to medium.
A global `asyncio.Semaphore(1)` (`_reply_semaphore`) serializes all LLM inference — one request at a time.
### Thinking mode and streaming
qwen3 models produce chain-of-thought `<think>...</think>` tokens. Handling differs by tier:
- **Medium** (`qwen3:4b`): streams via `astream()`. A state machine (`in_think` flag) filters `<think>` blocks in real time — only non-think tokens are pushed to `_stream_queues` and displayed to the user.
- **Complex** (`qwen3:8b`): `create_deep_agent` returns a complete reply; `_strip_think()` filters think blocks before the reply is pushed as a single chunk.
- **Router/light** (`qwen2.5:1.5b`): no thinking support; `_strip_think()` used defensively.
`_strip_think()` in `agent.py` and `router.py` strips any `<think>` blocks from non-streaming output.
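The `in_think` state machine can be sketched as a token filter. This is a simplification that assumes the tags arrive as whole tokens; the real streaming filter in `agent.py` has to handle tags split across token boundaries:

```python
def filter_think(tokens):
    """Drop everything between <think> and </think> in a token stream.

    Sketch of the in_think state machine described above, under the
    assumption that tags arrive as whole tokens.
    """
    in_think = False
    for tok in tokens:
        if tok == "<think>":
            in_think = True        # entering chain-of-thought: stop emitting
        elif tok == "</think>":
            in_think = False       # thought finished: resume emitting
        elif not in_think:
            yield tok              # visible token, push to the stream queue
```

Only the yielded tokens would reach `_stream_queues` and the user.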
### VRAM management (`vram_manager.py`)
Hardware: GTX 1070 (8 GB). Before running the 8b model, medium models are flushed via Ollama `keep_alive=0`, then `/api/ps` is polled (15s timeout) to confirm eviction. On timeout, falls back to medium tier. After complex reply, 8b is flushed and medium models are pre-warmed as a background task.
### Channel adapters (`channels.py`)
- **Telegram**: Grammy Node.js bot (`grammy/bot.mjs`) long-polls Telegram → `POST /message`; replies delivered via `POST grammy:3001/send`
- **CLI**: `cli.py` (Docker container, `profiles: [tools]`) posts to `/message`, then streams from `GET /stream/{session_id}` SSE with Rich `Live` display and final Markdown render.
Session IDs: `tg-<chat_id>` for Telegram, `cli-<username>` for CLI. Conversation history: 5-turn buffer per session.
### Services (`docker-compose.yml`)
| Service | Port | Role |
|---------|------|------|
| `bifrost` | 8080 | LLM gateway — retries, failover, observability; config from `bifrost-config.json` |
| `deepagents` | 8000 | FastAPI gateway + agent core |
| `openmemory` | 8765 | FastMCP server + mem0 memory tools (Qdrant-backed) |
| `grammy` | 3001 | grammY Telegram bot + `/send` HTTP endpoint |
| `crawl4ai` | 11235 | JS-rendered page fetching |
| `cli` | — | Interactive CLI container (`profiles: [tools]`), Rich streaming display |
External (from `openai/` stack, host ports):
- Ollama GPU: `11436` — all reply inference (via Bifrost) + VRAM management (direct)
- Ollama CPU: `11435` — nomic-embed-text embeddings for openmemory
- Qdrant: `6333` — vector store for memories
- SearXNG: `11437` — web search
### Bifrost config (`bifrost-config.json`)
The file is mounted into the bifrost container at `/app/data/config.json`. It declares one Ollama provider key pointing to `host.docker.internal:11436` with 2 retries and 300s timeout. To add fallback providers or adjust weights, edit this file and restart the bifrost container.
### Crawl4AI integration
Crawl4AI is embedded at all levels of the pipeline:
- **Pre-routing (all tiers)**: `_fetch_urls_from_message()` detects URLs in any message via `_URL_RE`, fetches up to 3 URLs concurrently with `_crawl4ai_fetch_async()` (async httpx). URL content is injected as a system context block into enriched history before routing, and into the system prompt for medium/complex agents.
- **Tier upgrade**: if URL content is successfully fetched, light tier is upgraded to medium (light model cannot process page content).
- **Complex agent tools**: `web_search` (SearXNG + Crawl4AI auto-fetch of top 2 results) and `fetch_url` (single-URL Crawl4AI fetch) remain available for the complex agent's agentic loop. Complex tier also receives the pre-fetched content in system prompt to avoid redundant re-fetching.
MCP tools from openmemory (`add_memory`, `search_memory`, `get_all_memories`) are **excluded** from agent tools — memory management is handled outside the agent loop.
### Medium vs Complex agent
| Agent | Builder | Speed | Use case |
|-------|---------|-------|----------|
| medium | `_DirectModel` (single LLM call, no tools) | ~3s | General questions, conversation |
| complex | `create_deep_agent` (deepagents) | Slow — multi-step planner | Deep research via `/think` prefix |
### Key files
- `agent.py` — FastAPI app, lifespan wiring, `run_agent_task()`, Crawl4AI pre-fetch, memory pipeline, all endpoints
- `bifrost-config.json` — Bifrost provider config (Ollama GPU, retries, timeouts)
- `channels.py` — channel registry and `deliver()` dispatcher
- `router.py` — `Router` class: regex + LLM classification, light-tier reply generation
- `vram_manager.py` — `VRAMManager`: flush/poll/prewarm Ollama VRAM directly
- `agent_factory.py` — `build_medium_agent` (`_DirectModel`, single call) / `build_complex_agent` (`create_deep_agent`)
- `openmemory/server.py` — FastMCP + mem0 config with custom extraction/dedup prompts
- `wiki_research.py` — batch research pipeline using `/message` + SSE polling
- `grammy/bot.mjs` — Telegram long-poll + HTTP `/send` endpoint
@README.md


@@ -5,6 +5,6 @@ WORKDIR /app
RUN pip install --no-cache-dir deepagents langchain-openai langgraph \
fastapi uvicorn langchain-mcp-adapters langchain-community httpx
COPY agent.py channels.py vram_manager.py router.py agent_factory.py hello_world.py .
COPY agent.py channels.py vram_manager.py router.py agent_factory.py fast_tools.py hello_world.py ./
CMD ["uvicorn", "agent:app", "--host", "0.0.0.0", "--port", "8000"]


@@ -54,9 +54,11 @@ Autonomous personal assistant with a multi-channel gateway. Three-tier model rou
3. 202 Accepted immediately
4. Background: run_agent_task(message, session_id, channel)
5. Parallel IO (asyncio.gather):
a. _fetch_urls_from_message() — Crawl4AI fetches any URLs in message
b. _retrieve_memories() — openmemory semantic search for context
6. router.route() with enriched history (url_context + memories as system msgs)
a. _fetch_urls_from_message() — Crawl4AI fetches any URLs in message
b. _retrieve_memories() — openmemory semantic search for context
c. _fast_tool_runner.run_matching() — FastTools (weather, commute) if pattern matches
6. router.route() with enriched history (url_context + fast_context + memories)
- fast tool match → force medium (real-time data, no point routing to light)
- if URL content fetched and tier=light → upgrade to medium
7. Invoke agent for tier with url_context + memories in system prompt
8. Token streaming:
@@ -72,7 +74,7 @@ Autonomous personal assistant with a multi-channel gateway. Three-tier model rou
Adolf uses LangChain's tool interface but only the complex agent actually invokes tools at runtime.
**Complex agent (`/think` prefix):** `web_search` and `fetch_url` are defined as `langchain_core.tools.Tool` objects and passed to `create_deep_agent()`. The deepagents library runs an agentic loop (LangGraph `create_react_agent` under the hood) that sends the tool schema to the model via OpenAI function-calling format and handles tool dispatch.
**Complex agent:** `web_search` and `fetch_url` are defined as `langchain_core.tools.Tool` objects and passed to `create_deep_agent()`. The deepagents library runs an agentic loop (LangGraph `create_react_agent` under the hood) that sends the tool schema to the model via OpenAI function-calling format and handles tool dispatch.
**Medium agent (default):** `_DirectModel` makes a single `model.ainvoke(messages)` call with no tool schema. Context (memories, fetched URL content) is injected via the system prompt instead. This is intentional — `qwen3:4b` behaves unreliably when a tool array is present.
@@ -82,13 +84,40 @@ Adolf uses LangChain's tool interface but only the complex agent actually invoke
| Tier | Model | Agent | Trigger | Latency |
|------|-------|-------|---------|---------|
| Light | `qwen2.5:1.5b` (router answers directly) | — | Regex pre-match or LLM classifies "light" | ~24s |
| Light | `qwen2.5:1.5b` (router answers directly) | — | Regex pre-match or 3-way embedding classifies "light" | ~24s |
| Medium | `qwen3:4b` (`DEEPAGENTS_MODEL`) | `_DirectModel` — single LLM call, no tools | Default; also forced when message contains URLs | ~10–20s |
| Complex | `qwen3:8b` (`DEEPAGENTS_COMPLEX_MODEL`) | `create_deep_agent` — agentic loop with tools | `/think` prefix only | ~60–120s |
| Complex | `deepseek/deepseek-r1:free` via LiteLLM (`DEEPAGENTS_COMPLEX_MODEL`) | `create_deep_agent` — agentic loop with tools | Auto-classified by embedding similarity | ~30–90s |
**`/think` prefix**: forces complex tier, stripped before sending to agent.
Routing is fully automatic via 3-way cosine similarity over pre-embedded utterance centroids (light / medium / complex). No prefix required. Use `adolf-deep` model name to force complex tier via API.
Complex tier is locked out unless the message starts with `/think` — any LLM classification of "complex" is downgraded to medium.
Complex tier is reached automatically for deep research queries — `исследуй` ("research"), `изучи все` ("study everything"), `напиши подробный` ("write a detailed…"), etc. — via regex pre-classifier and embedding similarity. No prefix required. Use `adolf-deep` model name to force it via API.
## Fast Tools (`fast_tools.py`)
Pre-flight tools that run concurrently with URL fetch and memory retrieval before any LLM call. Each tool has two methods:
- `matches(message) → bool` — regex classifier; also used by `Router` to force medium tier
- `run(message) → str` — async fetch returning a context block injected into system prompt
`FastToolRunner` holds all tools. `any_matches()` is called by the Router at step 0a; `run_matching()` is called in the pre-flight `asyncio.gather` in `run_agent_task()`.
| Tool | Pattern | Source | Context returned |
|------|---------|--------|-----------------|
| `WeatherTool` | weather/forecast/temperature/snow/rain | SearXNG `"погода Балашиха сейчас"` | Current conditions in °C from Russian weather sites |
| `CommuteTool` | commute/traffic/arrival/пробки | `routecheck:8090/api/route` (Yandex Routing API) | Drive time with/without traffic, Balashikha→Moscow |
**To add a new fast tool:** subclass `FastTool` in `fast_tools.py`, implement `name`/`matches`/`run`, add an instance to `_fast_tool_runner` in `agent.py`.
## routecheck Service (`routecheck/`)
Local web service on port 8090. It exists because the Yandex Routing API free tier requires a web UI that uses the API.
**Web UI** (`http://localhost:8090`): PIL-generated arithmetic captcha → lat/lon form → travel time result.
**Internal API**: `GET /api/route?from=lat,lon&to=lat,lon&token=ROUTECHECK_TOKEN` — bypasses captcha, used by `CommuteTool`. The `ROUTECHECK_TOKEN` shared secret is set in `.env` and passed to both `routecheck` and `deepagents` containers.
Yandex API calls are routed through the host HTTPS proxy (`host.docker.internal:56928`) since the container has no direct external internet access.
**Requires** `.env`: `YANDEX_ROUTING_KEY` (free from `developer.tech.yandex.ru`) + `ROUTECHECK_TOKEN`.
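A minimal sketch of how `CommuteTool` would build the internal API URL, using only the query parameters documented above (`build_route_url` and the coordinates are illustrative):

```python
from urllib.parse import urlencode

ROUTECHECK_URL = "http://routecheck:8090"  # internal service address

def build_route_url(frm: tuple[float, float],
                    to: tuple[float, float],
                    token: str) -> str:
    """Build the internal /api/route URL; the token bypasses the captcha."""
    qs = urlencode({
        "from": f"{frm[0]},{frm[1]}",
        "to": f"{to[0]},{to[1]}",
        "token": token,
    })
    return f"{ROUTECHECK_URL}/api/route?{qs}"

# Hypothetical Balashikha -> Moscow center route.
url = build_route_url((55.80, 37.95), (55.75, 37.62), "ROUTECHECK_TOKEN-value")
```

The caller would then GET this URL with an async HTTP client through the pre-flight gather.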
## Crawl4AI Integration
@@ -135,20 +164,29 @@ Conversation history is keyed by session_id (5-turn buffer).
```
adolf/
├── docker-compose.yml Services: bifrost, deepagents, openmemory, grammy, crawl4ai, cli (profile:tools)
├── docker-compose.yml Services: deepagents, openmemory, grammy, crawl4ai, routecheck, cli
├── Dockerfile deepagents container (Python 3.12)
├── Dockerfile.cli CLI container (python:3.12-slim + rich)
├── agent.py FastAPI gateway, run_agent_task, Crawl4AI pre-fetch, memory pipeline, /stream/ SSE
├── agent.py FastAPI gateway, run_agent_task, Crawl4AI pre-fetch, fast tools, memory pipeline
├── fast_tools.py FastTool base, FastToolRunner, WeatherTool, CommuteTool
├── channels.py Channel registry + deliver() + pending_replies
├── router.py Router class — regex + LLM tier classification
├── router.py Router class — regex + LLM tier classification, FastToolRunner integration
├── vram_manager.py VRAMManager — flush/prewarm/poll Ollama VRAM
├── agent_factory.py _DirectModel (medium) / create_deep_agent (complex)
├── cli.py Interactive CLI REPL — Rich Live streaming + Markdown render
├── wiki_research.py Batch wiki research pipeline (uses /message + SSE)
├── benchmarks/
│ ├── run_benchmark.py Routing accuracy benchmark — 120 queries across 3 tiers
│ ├── run_voice_benchmark.py Voice path benchmark
│ ├── benchmark.json Query dataset (gitignored)
│ └── results_latest.json Last run results (gitignored)
├── .env TELEGRAM_BOT_TOKEN, ROUTECHECK_TOKEN, YANDEX_ROUTING_KEY (not committed)
├── routecheck/
│ ├── app.py FastAPI: image captcha + /api/route Yandex proxy
│ └── Dockerfile
├── tests/
│ ├── integration/ Standalone integration test scripts (common.py + test_*.py)
│ └── use_cases/ Claude Code skill markdown files — Claude acts as user + evaluator
├── .env TELEGRAM_BOT_TOKEN (not committed)
├── openmemory/
│ ├── server.py FastMCP + mem0: add_memory, search_memory, get_all_memories
│ └── Dockerfile
@@ -162,7 +200,9 @@ adolf/
| Service | Host Port | Role |
|---------|-----------|------|
| Ollama GPU | 11436 | All LLM inference (via Bifrost) + VRAM management (direct) + memory extraction |
| LiteLLM | 4000 | LLM proxy — all inference goes through here (`LITELLM_URL` env var) |
| Ollama GPU | 11436 | GPU inference backend + VRAM management (direct) + memory extraction |
| Ollama CPU | 11435 | nomic-embed-text embeddings for openmemory |
| Langfuse | 3200 | LLM observability — traces all requests via LiteLLM callbacks |
| Qdrant | 6333 | Vector store for memories |
| SearXNG | 11437 | Web search (used by `web_search` tool) |

agent.py

@@ -1,7 +1,9 @@
import asyncio
import json as _json_module
import os
import time
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI, BackgroundTasks, Request
from fastapi.responses import JSONResponse, StreamingResponse
@@ -16,6 +18,7 @@ _URL_RE = _re.compile(r'https?://[^\s<>"\']+')
def _extract_urls(text: str) -> list[str]:
    return _URL_RE.findall(text)
from openai import AsyncOpenAI
from langchain_openai import ChatOpenAI
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_community.utilities import SearxSearchWrapper
@@ -24,10 +27,12 @@ from langchain_core.tools import Tool
from vram_manager import VRAMManager
from router import Router
from agent_factory import build_medium_agent, build_complex_agent
from fast_tools import FastToolRunner, WeatherTool, CommuteTool
import channels
# Bifrost gateway — all LLM inference goes through here
BIFROST_URL = os.getenv("BIFROST_URL", "http://bifrost:8080/v1")
# LiteLLM proxy — all LLM inference goes through here
LITELLM_URL = os.getenv("LITELLM_URL", "http://host.docker.internal:4000/v1")
LITELLM_API_KEY = os.getenv("LITELLM_API_KEY", "dummy")
# Direct Ollama URL — used only by VRAMManager for flush/prewarm/poll
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
@@ -37,10 +42,51 @@ COMPLEX_MODEL = os.getenv("DEEPAGENTS_COMPLEX_MODEL", "qwen3:8b")
SEARXNG_URL = os.getenv("SEARXNG_URL", "http://host.docker.internal:11437")
OPENMEMORY_URL = os.getenv("OPENMEMORY_URL", "http://openmemory:8765")
CRAWL4AI_URL = os.getenv("CRAWL4AI_URL", "http://crawl4ai:11235")
ROUTECHECK_URL = os.getenv("ROUTECHECK_URL", "http://routecheck:8090")
ROUTECHECK_TOKEN = os.getenv("ROUTECHECK_TOKEN", "")
MAX_HISTORY_TURNS = 5
_conversation_buffers: dict[str, list] = {}
# ── Interaction logging (RLHF data collection) ─────────────────────────────────
_LOG_DIR = Path(os.getenv("ADOLF_LOG_DIR", "/app/logs"))
_INTERACTIONS_LOG = _LOG_DIR / "interactions.jsonl"
def _ensure_log_dir() -> None:
try:
_LOG_DIR.mkdir(parents=True, exist_ok=True)
except Exception as e:
print(f"[log] cannot create log dir {_LOG_DIR}: {e}", flush=True)
async def _log_interaction(
session_id: str,
channel: str,
tier: str,
input_text: str,
response_text: str | None,
latency_ms: int,
metadata: dict | None = None,
) -> None:
"""Append one interaction record to the JSONL log for future RLHF/finetuning."""
record = {
"ts": time.time(),
"session_id": session_id,
"channel": channel,
"tier": tier,
"input": input_text,
"output": response_text or "",
"latency_ms": latency_ms,
}
if metadata:
record["metadata"] = metadata
try:
_ensure_log_dir()
with open(_INTERACTIONS_LOG, "a", encoding="utf-8") as f:
f.write(_json_module.dumps(record, ensure_ascii=False) + "\n")
except Exception as e:
print(f"[log] write error: {e}", flush=True)
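Since the log is append-only JSONL, downstream finetuning prep can stream it back with a tolerant reader. A minimal sketch — the `load_interactions` helper is hypothetical, not part of agent.py:

```python
import json
from pathlib import Path

def load_interactions(path: Path, min_latency_ms: int = 0) -> list[dict]:
    """Parse interactions.jsonl, skipping lines that were cut off mid-write."""
    records = []
    for line in path.read_text(encoding="utf-8").splitlines():
        try:
            rec = json.loads(line)
        except json.JSONDecodeError:
            continue  # tolerate a partially flushed trailing line
        if rec.get("latency_ms", 0) >= min_latency_ms:
            records.append(rec)
    return records
```

The `min_latency_ms` filter is just one example of slicing the data before export; any field in the record works the same way.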
# Per-session streaming queues — filled during inference, read by /stream/{session_id}
_stream_queues: dict[str, asyncio.Queue] = {}
@@ -88,6 +134,7 @@ async def _fetch_urls_from_message(message: str) -> str:
return "User's message contains URLs. Fetched content:\n\n" + "\n\n".join(parts)
# /no_think at the start of the system prompt disables qwen3 chain-of-thought.
# create_deep_agent prepends our system_prompt before BASE_AGENT_PROMPT, so
# /no_think lands at position 0 and is respected by qwen3 models via Ollama.
@@ -117,6 +164,12 @@ mcp_client = None
_memory_add_tool = None
_memory_search_tool = None
# Fast tools run before the LLM — classifier + context enricher
_fast_tool_runner = FastToolRunner([
WeatherTool(),
CommuteTool(routecheck_url=ROUTECHECK_URL, internal_token=ROUTECHECK_TOKEN),
])
# GPU mutex: one LLM inference at a time
_reply_semaphore = asyncio.Semaphore(1)
@@ -130,31 +183,30 @@ async def lifespan(app: FastAPI):
channels.register_defaults()
# All three models route through Bifrost → Ollama GPU.
# Bifrost adds retry logic, observability, and failover.
# Model names use provider/model format: Bifrost strips the "ollama/" prefix
# before forwarding to Ollama's /v1/chat/completions endpoint.
router_model = ChatOpenAI(
model=f"ollama/{ROUTER_MODEL}",
base_url=BIFROST_URL,
api_key="dummy",
base_url=LITELLM_URL,
api_key=LITELLM_API_KEY,
temperature=0,
timeout=30,
)
embedder = AsyncOpenAI(base_url=LITELLM_URL, api_key=LITELLM_API_KEY)
medium_model = ChatOpenAI(
model=f"ollama/{MEDIUM_MODEL}",
base_url=BIFROST_URL,
api_key="dummy",
base_url=LITELLM_URL,
api_key=LITELLM_API_KEY,
timeout=180,
)
complex_model = ChatOpenAI(
model=f"ollama/{COMPLEX_MODEL}",
base_url=BIFROST_URL,
api_key="dummy",
model=COMPLEX_MODEL, # full model name — may be remote (OpenRouter) or local ollama/*
base_url=LITELLM_URL,
api_key=LITELLM_API_KEY,
timeout=600,
)
vram_manager = VRAMManager(base_url=OLLAMA_BASE_URL)
router = Router(model=router_model)
router = Router(model=router_model, embedder=embedder, fast_tool_runner=_fast_tool_runner)
await router.initialize()
mcp_connections = {
"openmemory": {"transport": "sse", "url": f"{OPENMEMORY_URL}/sse"},
@@ -269,8 +321,8 @@ async def lifespan(app: FastAPI):
)
print(
f"[agent] bifrost={BIFROST_URL} | router=ollama/{ROUTER_MODEL} | "
f"medium=ollama/{MEDIUM_MODEL} | complex=ollama/{COMPLEX_MODEL}",
f"[agent] litellm={LITELLM_URL} | router=semantic(ollama/{ROUTER_MODEL}+nomic-embed-text) | "
f"medium=ollama/{MEDIUM_MODEL} | complex={COMPLEX_MODEL}",
flush=True,
)
print(f"[agent] agent tools: {[t.name for t in agent_tools]}", flush=True)
@@ -336,6 +388,12 @@ def _log_messages(result):
# ── memory helpers ─────────────────────────────────────────────────────────────
def _resolve_user_id(session_id: str) -> str:
"""Map any session_id to a canonical user identity for openmemory.
All channels share the same memory pool for the single user."""
return "alvis"
async def _store_memory(session_id: str, user_msg: str, assistant_reply: str) -> None:
"""Store a conversation turn in openmemory (runs as a background task)."""
if _memory_add_tool is None:
@@ -343,7 +401,8 @@ async def _store_memory(session_id: str, user_msg: str, assistant_reply: str) ->
t0 = time.monotonic()
try:
text = f"User: {user_msg}\nAssistant: {assistant_reply}"
await _memory_add_tool.ainvoke({"text": text, "user_id": session_id})
user_id = _resolve_user_id(session_id)
await _memory_add_tool.ainvoke({"text": text, "user_id": user_id})
print(f"[memory] stored in {time.monotonic() - t0:.1f}s", flush=True)
except Exception as e:
print(f"[memory] error: {e}", flush=True)
@@ -354,7 +413,8 @@ async def _retrieve_memories(message: str, session_id: str) -> str:
if _memory_search_tool is None:
return ""
try:
result = await _memory_search_tool.ainvoke({"query": message, "user_id": session_id})
user_id = _resolve_user_id(session_id)
result = await _memory_search_tool.ainvoke({"query": message, "user_id": user_id})
if result and result.strip() and result.strip() != "[]":
return f"Relevant memories:\n{result}"
except Exception:
@@ -362,117 +422,139 @@ async def _retrieve_memories(message: str, session_id: str) -> str:
return ""
# ── core task ──────────────────────────────────────────────────────────────────
# ── core pipeline ──────────────────────────────────────────────────────────────
async def run_agent_task(message: str, session_id: str, channel: str = "telegram"):
print(f"[agent] queued: {message[:80]!r} chat={session_id}", flush=True)
from typing import AsyncGenerator
force_complex = False
clean_message = message
if message.startswith("/think "):
force_complex = True
clean_message = message[len("/think "):]
print("[agent] /think prefix → force_complex=True", flush=True)
async def _run_agent_pipeline(
message: str,
history: list[dict],
session_id: str,
tier_override: str | None = None,
dry_run: bool = False,
tier_capture: list | None = None,
) -> AsyncGenerator[str, None]:
"""Core pipeline: pre-flight → routing → inference. Yields text chunks.
tier_override: "light" | "medium" | "complex" | None (auto-route)
dry_run: if True and tier=complex, log tier=complex but use medium model (avoids API cost)
Caller is responsible for scheduling _store_memory after consuming all chunks.
"""
async with _reply_semaphore:
t0 = time.monotonic()
history = _conversation_buffers.get(session_id, [])
clean_message = message
print(f"[agent] running: {clean_message[:80]!r}", flush=True)
# Fetch URL content and memories concurrently — both are IO-bound, neither needs GPU
url_context, memories = await asyncio.gather(
# Fetch URL content, memories, and fast-tool context concurrently
url_context, memories, fast_context = await asyncio.gather(
_fetch_urls_from_message(clean_message),
_retrieve_memories(clean_message, session_id),
_fast_tool_runner.run_matching(clean_message),
)
if url_context:
print(f"[agent] crawl4ai: {len(url_context)} chars fetched from message URLs", flush=True)
print(f"[agent] crawl4ai: {len(url_context)} chars fetched", flush=True)
if fast_context:
names = _fast_tool_runner.matching_names(clean_message)
print(f"[agent] fast_tools={names}: {len(fast_context)} chars injected", flush=True)
# Build enriched history: memories + url_context as system context for ALL tiers
# Build enriched history
enriched_history = list(history)
if url_context:
enriched_history = [{"role": "system", "content": url_context}] + enriched_history
if fast_context:
enriched_history = [{"role": "system", "content": fast_context}] + enriched_history
if memories:
enriched_history = [{"role": "system", "content": memories}] + enriched_history
tier, light_reply = await router.route(clean_message, enriched_history, force_complex)
# Messages with URL content must be handled by at least medium tier
if url_context and tier == "light":
tier = "medium"
light_reply = None
print("[agent] URL in message → upgraded light→medium", flush=True)
print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True)
final_text = None
llm_elapsed = 0.0
try:
if tier == "light":
final_text = light_reply
# Short-circuit: fast tool already has the answer
if fast_context and tier_override is None and not url_context:
tier = "fast"
final_text = fast_context
llm_elapsed = time.monotonic() - t0
print(f"[agent] light path: answered by router", flush=True)
await _push_stream_chunk(session_id, final_text)
await _end_stream(session_id)
names = _fast_tool_runner.matching_names(clean_message)
print(f"[agent] tier=fast tools={names} — delivering directly", flush=True)
yield final_text
elif tier == "medium":
system_prompt = MEDIUM_SYSTEM_PROMPT
if memories:
system_prompt = system_prompt + "\n\n" + memories
if url_context:
system_prompt = system_prompt + "\n\n" + url_context
else:
# Determine tier
if tier_override in ("light", "medium", "complex"):
tier = tier_override
light_reply = None
if tier_override == "light":
tier, light_reply = await router.route(clean_message, enriched_history)
tier = "light"
else:
tier, light_reply = await router.route(clean_message, enriched_history)
if url_context and tier == "light":
tier = "medium"
light_reply = None
print("[agent] URL in message → upgraded light→medium", flush=True)
# Stream tokens directly — filter out qwen3 <think> blocks
in_think = False
response_parts = []
async for chunk in medium_model.astream([
{"role": "system", "content": system_prompt},
*history,
{"role": "user", "content": clean_message},
]):
token = chunk.content or ""
if not token:
continue
if in_think:
if "</think>" in token:
in_think = False
after = token.split("</think>", 1)[1]
if after:
await _push_stream_chunk(session_id, after)
response_parts.append(after)
else:
if "<think>" in token:
in_think = True
before = token.split("<think>", 1)[0]
if before:
await _push_stream_chunk(session_id, before)
response_parts.append(before)
else:
await _push_stream_chunk(session_id, token)
response_parts.append(token)
# Dry-run: log as complex but infer with medium (no remote API call)
effective_tier = tier
if dry_run and tier == "complex":
effective_tier = "medium"
print(f"[agent] tier=complex (dry-run) → using medium model, message={clean_message[:60]!r}", flush=True)
else:
print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True)
tier = effective_tier
if tier_capture is not None:
tier_capture.append(tier)
await _end_stream(session_id)
llm_elapsed = time.monotonic() - t0
final_text = "".join(response_parts).strip() or None
if tier == "light":
final_text = light_reply
llm_elapsed = time.monotonic() - t0
print("[agent] light path: answered by router", flush=True)
yield final_text
else: # complex
ok = await vram_manager.enter_complex_mode()
if not ok:
print("[agent] complex→medium fallback (eviction timeout)", flush=True)
tier = "medium"
elif tier == "medium":
system_prompt = MEDIUM_SYSTEM_PROMPT
if memories:
system_prompt = system_prompt + "\n\n" + memories
system_prompt += "\n\n" + memories
if url_context:
system_prompt = system_prompt + "\n\n" + url_context
result = await medium_agent.ainvoke({
"messages": [
{"role": "system", "content": system_prompt},
*history,
{"role": "user", "content": clean_message},
]
})
else:
system_prompt += "\n\n" + url_context
if fast_context:
system_prompt += "\n\nLive web search results (use these to answer):\n\n" + fast_context
in_think = False
response_parts = []
async for chunk in medium_model.astream([
{"role": "system", "content": system_prompt},
*history,
{"role": "user", "content": clean_message},
]):
token = chunk.content or ""
if not token:
continue
if in_think:
if "</think>" in token:
in_think = False
after = token.split("</think>", 1)[1]
if after:
yield after
response_parts.append(after)
else:
if "<think>" in token:
in_think = True
before = token.split("<think>", 1)[0]
if before:
yield before
response_parts.append(before)
else:
yield token
response_parts.append(token)
llm_elapsed = time.monotonic() - t0
final_text = "".join(response_parts).strip() or None
else: # complex — remote model, no VRAM management needed
system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id)
if url_context:
system_prompt = system_prompt + "\n\n[Pre-fetched URL content from user's message:]\n" + url_context
system_prompt += "\n\n[Pre-fetched URL content from user's message:]\n" + url_context
result = await complex_agent.ainvoke({
"messages": [
{"role": "system", "content": system_prompt},
@@ -480,46 +562,90 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
{"role": "user", "content": clean_message},
]
})
asyncio.create_task(vram_manager.exit_complex_mode())
llm_elapsed = time.monotonic() - t0
_log_messages(result)
final_text = _extract_final_text(result)
if final_text:
await _push_stream_chunk(session_id, final_text)
await _end_stream(session_id)
llm_elapsed = time.monotonic() - t0
_log_messages(result)
final_text = _extract_final_text(result)
if final_text:
yield final_text
except Exception as e:
import traceback
llm_elapsed = time.monotonic() - t0
print(f"[agent] error after {llm_elapsed:.1f}s for chat {session_id}: {e}", flush=True)
print(f"[agent] error after {llm_elapsed:.1f}s for {session_id}: {e}", flush=True)
traceback.print_exc()
await _end_stream(session_id)
# Deliver reply through the originating channel
print(f"[agent] pipeline done in {time.monotonic() - t0:.1f}s tier={tier if 'tier' in dir() else '?'}", flush=True)
# Store memory as side-effect (non-blocking, best-effort)
if final_text:
t1 = time.monotonic()
asyncio.create_task(_store_memory(session_id, clean_message, final_text))
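The `<think>`-block filtering in the medium branch is a small state machine over streamed tokens. Isolated as a standalone generator it looks roughly like this — same logic, same caveat that a tag split across two tokens is not handled:

```python
def strip_think(tokens):
    """Drop text between <think> and </think>, assuming each tag arrives whole in one token."""
    in_think = False
    for token in tokens:
        if in_think:
            if "</think>" in token:
                in_think = False
                after = token.split("</think>", 1)[1]
                if after:
                    yield after
        elif "<think>" in token:
            in_think = True
            before = token.split("<think>", 1)[0]
            if before:
                yield before
        else:
            yield token
```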
# ── core task (Telegram / Matrix / CLI wrapper) ─────────────────────────────────
async def run_agent_task(
message: str,
session_id: str,
channel: str = "telegram",
metadata: dict | None = None,
):
print(f"[agent] queued: {message[:80]!r} chat={session_id}", flush=True)
t0 = time.monotonic()
meta = metadata or {}
dry_run = bool(meta.get("dry_run", False))
is_benchmark = bool(meta.get("benchmark", False))
history = _conversation_buffers.get(session_id, [])
final_text = None
actual_tier = "unknown"
tier_capture: list = []
async for chunk in _run_agent_pipeline(message, history, session_id, dry_run=dry_run, tier_capture=tier_capture):
await _push_stream_chunk(session_id, chunk)
if final_text is None:
final_text = chunk
else:
final_text += chunk
await _end_stream(session_id)
actual_tier = tier_capture[0] if tier_capture else "unknown"
elapsed_ms = int((time.monotonic() - t0) * 1000)
if final_text:
final_text = final_text.strip()
# Skip channel delivery for benchmark sessions (no Telegram spam)
if not is_benchmark:
try:
await channels.deliver(session_id, channel, final_text)
except Exception as e:
print(f"[agent] delivery error (non-fatal): {e}", flush=True)
send_elapsed = time.monotonic() - t1
print(
f"[agent] replied in {time.monotonic() - t0:.1f}s "
f"(llm={llm_elapsed:.1f}s, send={send_elapsed:.1f}s) tier={tier}",
flush=True,
)
print(f"[agent] reply_text: {final_text}", flush=True)
else:
print("[agent] warning: no text reply from agent", flush=True)
# Update conversation buffer and schedule memory storage
if final_text:
buf = _conversation_buffers.get(session_id, [])
buf.append({"role": "user", "content": clean_message})
buf.append({"role": "assistant", "content": final_text})
_conversation_buffers[session_id] = buf[-(MAX_HISTORY_TURNS * 2):]
asyncio.create_task(_store_memory(session_id, clean_message, final_text))
print(f"[agent] replied in {elapsed_ms / 1000:.1f}s tier={actual_tier}", flush=True)
print(f"[agent] reply_text: {final_text}", flush=True)
# Update conversation buffer
buf = _conversation_buffers.get(session_id, [])
buf.append({"role": "user", "content": message})
buf.append({"role": "assistant", "content": final_text})
_conversation_buffers[session_id] = buf[-(MAX_HISTORY_TURNS * 2):]
# Log interaction for RLHF data collection (skip benchmark sessions to avoid noise)
if not is_benchmark:
asyncio.create_task(_log_interaction(
session_id=session_id,
channel=channel,
tier=actual_tier,
input_text=message,
response_text=final_text,
latency_ms=elapsed_ms,
metadata=meta if meta else None,
))
else:
print("[agent] warning: no text reply from agent", flush=True)
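The conversation-buffer update keeps a fixed window of whole turns; the trim in isolation (with `MAX_HISTORY_TURNS` as in agent.py, helper name made up):

```python
MAX_HISTORY_TURNS = 5

def trim_history(buf: list[dict]) -> list[dict]:
    # One turn = a user message plus the assistant reply, hence 2 * N trailing messages.
    return buf[-(MAX_HISTORY_TURNS * 2):]
```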
# ── endpoints ──────────────────────────────────────────────────────────────────
@@ -531,7 +657,7 @@ async def message(request: InboundMessage, background_tasks: BackgroundTasks):
return JSONResponse(status_code=503, content={"error": "Agent not ready"})
session_id = request.session_id
channel = request.channel
background_tasks.add_task(run_agent_task, request.text, session_id, channel)
background_tasks.add_task(run_agent_task, request.text, session_id, channel, request.metadata)
return JSONResponse(status_code=202, content={"status": "accepted"})
@@ -593,3 +719,96 @@ async def stream_reply(session_id: str):
@app.get("/health")
async def health():
return {"status": "ok", "agent_ready": medium_agent is not None}
# ── OpenAI-compatible API (for OpenWebUI and other clients) ────────────────────
_TIER_MAP = {
"adolf": None,
"adolf-light": "light",
"adolf-medium": "medium",
"adolf-deep": "complex",
}
@app.get("/v1/models")
async def list_models():
return {
"object": "list",
"data": [
{"id": "adolf", "object": "model", "owned_by": "adolf"},
{"id": "adolf-light", "object": "model", "owned_by": "adolf"},
{"id": "adolf-medium", "object": "model", "owned_by": "adolf"},
{"id": "adolf-deep", "object": "model", "owned_by": "adolf"},
],
}
@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
if medium_agent is None:
return JSONResponse(status_code=503, content={"error": {"message": "Agent not ready", "type": "server_error"}})
body = await request.json()
model = body.get("model", "adolf")
messages = body.get("messages", [])
stream = body.get("stream", True)
# Extract current user message and history
user_messages = [m for m in messages if m.get("role") == "user"]
if not user_messages:
return JSONResponse(status_code=400, content={"error": {"message": "No user message", "type": "invalid_request_error"}})
current_message = user_messages[-1]["content"]
# History = everything before the last user message (excluding system messages from OpenWebUI)
last_user_idx = len(messages) - 1 - next(
i for i, m in enumerate(reversed(messages)) if m.get("role") == "user"
)
history = [m for m in messages[:last_user_idx] if m.get("role") in ("user", "assistant")]
session_id = request.headers.get("X-Session-Id", "owui-default")
tier_override = _TIER_MAP.get(model)
import json as _json
import uuid as _uuid
response_id = f"chatcmpl-{_uuid.uuid4().hex[:12]}"
if stream:
async def event_stream():
# Opening chunk with role
opening = {
"id": response_id, "object": "chat.completion.chunk",
"choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}]
}
yield f"data: {_json.dumps(opening)}\n\n"
async for chunk in _run_agent_pipeline(current_message, history, session_id, tier_override):
data = {
"id": response_id, "object": "chat.completion.chunk",
"choices": [{"index": 0, "delta": {"content": chunk}, "finish_reason": None}]
}
yield f"data: {_json.dumps(data)}\n\n"
# Final chunk
final = {
"id": response_id, "object": "chat.completion.chunk",
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
}
yield f"data: {_json.dumps(final)}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(event_stream(), media_type="text/event-stream")
else:
# Non-streaming: collect all chunks
parts = []
async for chunk in _run_agent_pipeline(current_message, history, session_id, tier_override):
if chunk:
parts.append(chunk)
full_text = "".join(parts).strip()
return {
"id": response_id, "object": "chat.completion",
"choices": [{"index": 0, "message": {"role": "assistant", "content": full_text}, "finish_reason": "stop"}],
"model": model,
}
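A client of the streaming endpoint has to splice the delta chunks back together. A sketch of just the event-line parsing (no HTTP), matching the chunk shape emitted by `event_stream` — the `collect_sse_text` name is made up:

```python
import json

def collect_sse_text(lines: list[str]) -> str:
    """Reassemble assistant text from chat.completion.chunk SSE data lines."""
    parts = []
    for line in lines:
        if not line.startswith("data: "):
            continue
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break
        delta = json.loads(payload)["choices"][0]["delta"]
        if delta.get("content"):
            parts.append(delta["content"])
    return "".join(parts)
```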

benchmarks/run_benchmark.py Normal file

@@ -0,0 +1,318 @@
#!/usr/bin/env python3
"""
Adolf routing benchmark.
Sends each query to Adolf's /message endpoint, waits for completion via the
SSE stream, then reads the routing decision from docker logs and records the actual tier.
Usage:
python3 run_benchmark.py [options]
python3 run_benchmark.py --tier light|medium|complex
python3 run_benchmark.py --category <name>
python3 run_benchmark.py --ids 1,2,3
python3 run_benchmark.py --list-categories
python3 run_benchmark.py --dry-run # complex queries use medium model (no API cost)
IMPORTANT: Always check GPU is free before running. This script does it automatically.
Adolf must be running at http://localhost:8000.
"""
import argparse
import asyncio
import json
import re
import subprocess
import sys
import time
from pathlib import Path
import httpx
ADOLF_URL = "http://localhost:8000"
OLLAMA_URL = "http://localhost:11436" # GPU Ollama
DATASET = Path(__file__).parent / "benchmark.json"
RESULTS = Path(__file__).parent / "results_latest.json"
# Max time to wait for each query to fully complete via SSE stream
QUERY_TIMEOUT = 300 # seconds — generous to handle GPU semaphore waits
# Memory thresholds
MIN_FREE_RAM_MB = 1500 # abort if less than this is free
MIN_FREE_VRAM_MB = 500 # warn if less than this is free on GPU
# ── Pre-flight checks ──────────────────────────────────────────────────────────
def check_ram() -> tuple[bool, str]:
"""Check available system RAM. Returns (ok, message)."""
try:
with open("/proc/meminfo") as f:
info = {}
for line in f:
parts = line.split()
if len(parts) >= 2:
info[parts[0].rstrip(":")] = int(parts[1])
free_mb = (info.get("MemAvailable", 0)) // 1024
total_mb = info.get("MemTotal", 0) // 1024
msg = f"RAM: {free_mb} MB free / {total_mb} MB total"
if free_mb < MIN_FREE_RAM_MB:
return False, f"CRITICAL: {msg} — need at least {MIN_FREE_RAM_MB} MB free"
return True, msg
except Exception as e:
return True, f"RAM check failed (non-fatal): {e}"
def check_gpu() -> tuple[bool, str]:
"""Check GPU VRAM via Ollama /api/ps. Returns (ok, message)."""
try:
r = httpx.get(f"{OLLAMA_URL}/api/ps", timeout=5)
r.raise_for_status()
data = r.json()
models = data.get("models", [])
if models:
names = [m.get("name", "?") for m in models]
sizes_mb = [m.get("size_vram", 0) // (1024 * 1024) for m in models]
loaded = ", ".join(f"{n} ({s}MB)" for n, s in zip(names, sizes_mb))
total_vram = sum(sizes_mb)
if total_vram > 7000:
return False, f"GPU BUSY: models loaded = {loaded} — total VRAM used {total_vram}MB. Wait for models to unload."
return True, f"GPU: models loaded = {loaded} (total {total_vram}MB VRAM)"
return True, "GPU: idle (no models loaded)"
except httpx.ConnectError:
return True, "GPU check skipped (Ollama not reachable at localhost:11436)"
except Exception as e:
return True, f"GPU check failed (non-fatal): {e}"
def preflight_checks(skip_gpu_check: bool = False) -> bool:
"""Run all pre-flight checks. Returns True if safe to proceed."""
print("\n── Pre-flight checks ──────────────────────────────────────────")
ram_ok, ram_msg = check_ram()
print(f" {'✓' if ram_ok else '✗'} {ram_msg}")
if not ram_ok:
print("\nABORTING: not enough RAM. Free up memory before running benchmark.")
return False
if not skip_gpu_check:
gpu_ok, gpu_msg = check_gpu()
print(f" {'✓' if gpu_ok else '✗'} {gpu_msg}")
if not gpu_ok:
print("\nABORTING: GPU is busy. Wait for current inference to finish, then retry.")
return False
print(" All checks passed.\n")
return True
# ── Log helpers ────────────────────────────────────────────────────────────────
def get_log_tail(n: int = 50) -> str:
result = subprocess.run(
["docker", "logs", "deepagents", "--tail", str(n)],
capture_output=True, text=True,
)
return result.stdout + result.stderr
def extract_tier_from_logs(logs_before: str, logs_after: str) -> str | None:
"""Find new tier= lines that appeared after we sent the query."""
before_lines = set(logs_before.splitlines())
new_lines = [l for l in logs_after.splitlines() if l not in before_lines]
for line in reversed(new_lines):
m = re.search(r"tier=(\w+(?:\s*\(dry-run\))?)", line)
if m:
tier_raw = m.group(1)
# Normalise: "complex (dry-run)" → "complex"
return tier_raw.split()[0]
return None
# ── Request helpers ────────────────────────────────────────────────────────────
async def post_message(
client: httpx.AsyncClient,
query_id: int,
query: str,
dry_run: bool = False,
) -> bool:
payload = {
"text": query,
"session_id": f"benchmark-{query_id}",
"channel": "cli",
"user_id": "benchmark",
"metadata": {"dry_run": dry_run, "benchmark": True},
}
try:
r = await client.post(f"{ADOLF_URL}/message", json=payload, timeout=10)
r.raise_for_status()
return True
except Exception as e:
print(f" POST_ERROR: {e}", end="")
return False
# ── Dataset ────────────────────────────────────────────────────────────────────
def load_dataset() -> list[dict]:
with open(DATASET) as f:
return json.load(f)["queries"]
def filter_queries(queries, tier, category, ids):
if tier:
queries = [q for q in queries if q["tier"] == tier]
if category:
queries = [q for q in queries if q["category"] == category]
if ids:
queries = [q for q in queries if q["id"] in ids]
return queries
# ── Main run ───────────────────────────────────────────────────────────────────
async def run(queries: list[dict], dry_run: bool = False) -> list[dict]:
results = []
async with httpx.AsyncClient() as client:
try:
r = await client.get(f"{ADOLF_URL}/health", timeout=5)
r.raise_for_status()
except Exception as e:
print(f"ERROR: Adolf not reachable: {e}", file=sys.stderr)
sys.exit(1)
total = len(queries)
correct = 0
dry_label = " [DRY-RUN: complex→medium]" if dry_run else ""
print(f"\nRunning {total} queries{dry_label}\n")
print(f"{'ID':>3} {'EXPECTED':8} {'ACTUAL':8} {'OK':3} {'TIME':6} {'CATEGORY':22} QUERY")
print("─" * 110)
for q in queries:
qid = q["id"]
expected = q["tier"]
category = q["category"]
query_text = q["query"]
# In dry-run mode, complex queries are still classified (and logged) as complex, but inference falls back to the medium model
send_dry = dry_run and expected == "complex"
session_id = f"benchmark-{qid}"
print(f"{qid:>3} {expected:8} ", end="", flush=True)
logs_before = get_log_tail(80)
t0 = time.monotonic()
ok_post = await post_message(client, qid, query_text, dry_run=send_dry)
if not ok_post:
print(f"{'?':8} {'ERR':3} {'?':6} {category:22} {query_text[:40]}")
results.append({"id": qid, "expected": expected, "actual": None, "ok": False})
continue
# Wait for query to complete via SSE stream (handles GPU semaphore waits)
try:
async with client.stream(
"GET", f"{ADOLF_URL}/stream/{session_id}", timeout=QUERY_TIMEOUT
) as sse:
async for line in sse.aiter_lines():
if "data: [DONE]" in line:
break
except Exception:
pass # timeout or connection issue — check logs anyway
# Now the query is done — check logs for tier
await asyncio.sleep(0.3)
logs_after = get_log_tail(80)
actual = extract_tier_from_logs(logs_before, logs_after)
elapsed = time.monotonic() - t0
match = actual == expected or (actual == "fast" and expected == "medium")
if match:
correct += 1
mark = "✓" if match else "✗"
actual_str = actual or "?"
print(f"{actual_str:8} {mark:3} {elapsed:5.1f}s {category:22} {query_text[:40]}")
results.append({
"id": qid,
"expected": expected,
"actual": actual_str,
"ok": match,
"elapsed": round(elapsed, 1),
"category": category,
"query": query_text,
"dry_run": send_dry,
})
print("─" * 110)
accuracy = correct / total * 100 if total else 0
print(f"\nAccuracy: {correct}/{total} ({accuracy:.0f}%)")
for tier_name in ["light", "medium", "complex"]:
tier_qs = [r for r in results if r["expected"] == tier_name]
if tier_qs:
tier_ok = sum(1 for r in tier_qs if r["ok"])
print(f" {tier_name:8}: {tier_ok}/{len(tier_qs)}")
wrong = [r for r in results if not r["ok"]]
if wrong:
print(f"\nMisclassified ({len(wrong)}):")
for r in wrong:
print(f" id={r['id']:3} expected={r['expected']:8} actual={r['actual']:8} {r['query'][:60]}")
with open(RESULTS, "w") as f:
json.dump(results, f, indent=2, ensure_ascii=False)
print(f"\nResults saved to {RESULTS}")
return results
def main():
parser = argparse.ArgumentParser(
description="Adolf routing benchmark",
epilog="IMPORTANT: Always check GPU is free before running. This is done automatically."
)
parser.add_argument("--tier", choices=["light", "medium", "complex"])
parser.add_argument("--category")
parser.add_argument("--ids", help="Comma-separated IDs")
parser.add_argument("--list-categories", action="store_true")
parser.add_argument(
"--dry-run",
action="store_true",
help="For complex queries: route classification is tested but medium model is used for inference (no API cost)",
)
parser.add_argument(
"--skip-gpu-check",
action="store_true",
help="Skip GPU availability check (use only if you know GPU is free)",
)
args = parser.parse_args()
queries = load_dataset()
if args.list_categories:
cats = sorted(set(q["category"] for q in queries))
tiers = {t: sum(1 for q in queries if q["tier"] == t) for t in ["light", "medium", "complex"]}
print(f"Total: {len(queries)} | Tiers: {tiers}")
print(f"Categories: {cats}")
return
# ALWAYS check GPU and RAM before running
if not preflight_checks(skip_gpu_check=args.skip_gpu_check):
sys.exit(1)
ids = [int(i) for i in args.ids.split(",")] if args.ids else None
queries = filter_queries(queries, args.tier, args.category, ids)
if not queries:
print("No queries match filters.")
sys.exit(1)
asyncio.run(run(queries, dry_run=args.dry_run))
if __name__ == "__main__":
main()
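Because results land in plain JSON, post-hoc slicing needs no rerun. A sketch that recomputes accuracy per category from a saved results list (helper name is illustrative):

```python
from collections import defaultdict

def accuracy_by_category(results: list[dict]) -> dict[str, float]:
    """Fraction of correctly routed queries per benchmark category."""
    totals: dict[str, int] = defaultdict(int)
    correct: dict[str, int] = defaultdict(int)
    for r in results:
        cat = r.get("category", "unknown")
        totals[cat] += 1
        if r.get("ok"):
            correct[cat] += 1
    return {c: correct[c] / totals[c] for c in totals}
```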

benchmarks/run_voice_benchmark.py Normal file

@@ -0,0 +1,426 @@
#!/usr/bin/env python3
"""
Adolf voice benchmark.
Pipeline for each query:
1. Synthesize query text → WAV via Silero TTS (localhost:8881)
2. Transcribe WAV → text via faster-whisper STT (localhost:8880)
3. Send transcription to Adolf → check routing tier
4. Report: WER per query, routing accuracy vs text baseline
Usage:
python3 run_voice_benchmark.py [options]
python3 run_voice_benchmark.py --tier light|medium|complex
python3 run_voice_benchmark.py --ids 1,2,3
python3 run_voice_benchmark.py --dry-run # complex queries use medium model
IMPORTANT: Always check GPU is free before running. Done automatically.
Services required:
- Adolf: http://localhost:8000
- Silero TTS: http://localhost:8881 (openai/silero-tts container)
- faster-whisper: http://localhost:8880 (faster-whisper container)
"""
import argparse
import asyncio
import io
import json
import re
import subprocess
import sys
import tempfile
import time
import unicodedata
from pathlib import Path
import httpx
ADOLF_URL = "http://localhost:8000"
OLLAMA_URL = "http://localhost:11436"
TTS_URL = "http://localhost:8881" # Silero TTS — OpenAI-compatible /v1/audio/speech
STT_URL = "http://localhost:8880" # faster-whisper — OpenAI-compatible /v1/audio/transcriptions
DATASET = Path(__file__).parent / "benchmark.json"
RESULTS_DIR = Path(__file__).parent
TIER_WAIT = 15 # seconds to wait for tier= in docker logs
MIN_FREE_RAM_MB = 1500
MIN_FREE_VRAM_MB = 500
# ── Pre-flight ─────────────────────────────────────────────────────────────────
def check_ram() -> tuple[bool, str]:
try:
with open("/proc/meminfo") as f:
info = {}
for line in f:
parts = line.split()
if len(parts) >= 2:
info[parts[0].rstrip(":")] = int(parts[1])
free_mb = info.get("MemAvailable", 0) // 1024
total_mb = info.get("MemTotal", 0) // 1024
msg = f"RAM: {free_mb} MB free / {total_mb} MB total"
if free_mb < MIN_FREE_RAM_MB:
return False, f"CRITICAL: {msg} — need at least {MIN_FREE_RAM_MB} MB free"
return True, msg
except Exception as e:
return True, f"RAM check failed (non-fatal): {e}"
def check_gpu() -> tuple[bool, str]:
try:
r = httpx.get(f"{OLLAMA_URL}/api/ps", timeout=5)
r.raise_for_status()
data = r.json()
models = data.get("models", [])
if models:
names = [m.get("name", "?") for m in models]
sizes_mb = [m.get("size_vram", 0) // (1024 * 1024) for m in models]
loaded = ", ".join(f"{n} ({s}MB)" for n, s in zip(names, sizes_mb))
total_vram = sum(sizes_mb)
if total_vram > 7000:
return False, f"GPU BUSY: {loaded} — {total_vram}MB VRAM used. Wait for models to unload."
return True, f"GPU: {loaded} ({total_vram}MB VRAM)"
return True, "GPU: idle"
except httpx.ConnectError:
return True, "GPU check skipped (Ollama not reachable)"
except Exception as e:
return True, f"GPU check failed (non-fatal): {e}"
def check_services() -> tuple[bool, str]:
"""Check TTS and STT are reachable."""
msgs = []
ok = True
for name, url, path in [("TTS", TTS_URL, "/"), ("STT", STT_URL, "/")]:
try:
r = httpx.get(url + path, timeout=5)
msgs.append(f"{name}: reachable (HTTP {r.status_code})")
except Exception as e:
msgs.append(f"{name}: NOT REACHABLE — {e}")
ok = False
return ok, " | ".join(msgs)
def preflight_checks(skip_gpu_check: bool = False) -> bool:
print("\n── Pre-flight checks ──────────────────────────────────────────")
ram_ok, ram_msg = check_ram()
print(f" {'✓' if ram_ok else '✗'} {ram_msg}")
if not ram_ok:
print("\nABORTING: not enough RAM.")
return False
if not skip_gpu_check:
gpu_ok, gpu_msg = check_gpu()
print(f" {'✓' if gpu_ok else '✗'} {gpu_msg}")
if not gpu_ok:
print("\nABORTING: GPU is busy.")
return False
svc_ok, svc_msg = check_services()
print(f" {'✓' if svc_ok else '✗'} {svc_msg}")
if not svc_ok:
print("\nABORTING: required voice services not running.")
print("Start them with: cd /home/alvis/agap_git/openai && docker compose up -d faster-whisper silero-tts")
return False
print(" All checks passed.\n")
return True
# ── TTS ────────────────────────────────────────────────────────────────────────
async def synthesize(client: httpx.AsyncClient, text: str) -> bytes | None:
"""Synthesize text to WAV via Silero TTS (OpenAI-compatible /v1/audio/speech)."""
try:
r = await client.post(
f"{TTS_URL}/v1/audio/speech",
json={"model": "tts-1", "input": text, "voice": "alloy", "response_format": "wav"},
timeout=30,
)
r.raise_for_status()
return r.content
except Exception as e:
print(f"\n [TTS error: {e}]", end="")
return None
# ── STT ────────────────────────────────────────────────────────────────────────
async def transcribe(client: httpx.AsyncClient, wav_bytes: bytes) -> str | None:
"""Transcribe WAV to text via faster-whisper (OpenAI-compatible /v1/audio/transcriptions)."""
try:
files = {"file": ("audio.wav", wav_bytes, "audio/wav")}
data = {"model": "whisper-1", "language": "ru", "response_format": "json"}
r = await client.post(
f"{STT_URL}/v1/audio/transcriptions",
files=files,
data=data,
timeout=60,
)
r.raise_for_status()
result = r.json()
return result.get("text", "").strip()
except Exception as e:
print(f"\n [STT error: {e}]", end="")
return None
# ── WER ────────────────────────────────────────────────────────────────────────
def normalize(text: str) -> str:
"""Lowercase, strip punctuation, normalize unicode for WER calculation."""
text = unicodedata.normalize("NFC", text.lower())
text = re.sub(r"[^\w\s]", " ", text)
return re.sub(r"\s+", " ", text).strip()
def word_error_rate(reference: str, hypothesis: str) -> float:
"""Compute WER between reference and hypothesis."""
ref = normalize(reference).split()
hyp = normalize(hypothesis).split()
if not ref:
return 0.0 if not hyp else 1.0
# Dynamic programming edit distance
d = [[0] * (len(hyp) + 1) for _ in range(len(ref) + 1)]
for i in range(len(ref) + 1):
d[i][0] = i
for j in range(len(hyp) + 1):
d[0][j] = j
for i in range(1, len(ref) + 1):
for j in range(1, len(hyp) + 1):
if ref[i - 1] == hyp[j - 1]:
d[i][j] = d[i - 1][j - 1]
else:
d[i][j] = 1 + min(d[i - 1][j], d[i][j - 1], d[i - 1][j - 1])
return d[len(ref)][len(hyp)] / len(ref)
# ── Adolf interaction ──────────────────────────────────────────────────────────
def get_log_tail(n: int = 60) -> str:
result = subprocess.run(
["docker", "logs", "deepagents", "--tail", str(n)],
capture_output=True, text=True,
)
return result.stdout + result.stderr
def extract_tier_from_logs(logs_before: str, logs_after: str) -> str | None:
before_lines = set(logs_before.splitlines())
new_lines = [l for l in logs_after.splitlines() if l not in before_lines]
for line in reversed(new_lines):
m = re.search(r"tier=(\w+(?:\s*\(dry-run\))?)", line)
if m:
return m.group(1).split()[0]
return None
async def post_to_adolf(
client: httpx.AsyncClient,
query_id: int,
text: str,
dry_run: bool = False,
) -> bool:
payload = {
"text": text,
"session_id": f"voice-bench-{query_id}",
"channel": "cli",
"user_id": "benchmark",
"metadata": {"dry_run": dry_run, "benchmark": True, "voice": True},
}
try:
r = await client.post(f"{ADOLF_URL}/message", json=payload, timeout=10)
r.raise_for_status()
return True
except Exception as e:
print(f"\n [Adolf error: {e}]", end="")
return False
# ── Dataset ────────────────────────────────────────────────────────────────────
def load_dataset() -> list[dict]:
with open(DATASET) as f:
return json.load(f)["queries"]
def filter_queries(queries, tier, category, ids):
if tier:
queries = [q for q in queries if q["tier"] == tier]
if category:
queries = [q for q in queries if q["category"] == category]
if ids:
queries = [q for q in queries if q["id"] in ids]
return queries
# ── Main run ───────────────────────────────────────────────────────────────────
async def run(queries: list[dict], dry_run: bool = False, save_audio: bool = False) -> None:
async with httpx.AsyncClient() as client:
# Check Adolf
try:
r = await client.get(f"{ADOLF_URL}/health", timeout=5)
r.raise_for_status()
except Exception as e:
print(f"ERROR: Adolf not reachable: {e}", file=sys.stderr)
sys.exit(1)
total = len(queries)
results = []
dry_label = " [DRY-RUN]" if dry_run else ""
print(f"Voice benchmark: {total} queries{dry_label}\n")
print(f"{'ID':>3} {'EXP':8} {'ACT':8} {'OK':3} {'WER':5} {'TRANSCRIPT'}")
print("─" * 100)
total_wer = 0.0
wer_count = 0
correct = 0
for q in queries:
qid = q["id"]
expected = q["tier"]
original = q["query"]
print(f"{qid:>3} {expected:8} ", end="", flush=True)
# Step 1: TTS
wav = await synthesize(client, original)
if wav is None:
print(f"{'?':8} {'ERR':3} {'?':5} [TTS failed]")
results.append({"id": qid, "expected": expected, "actual": None, "ok": False, "wer": None, "error": "tts"})
continue
if save_audio:
audio_path = RESULTS_DIR / "voice_audio" / f"{qid}.wav"
audio_path.parent.mkdir(exist_ok=True)
audio_path.write_bytes(wav)
# Step 2: STT
transcript = await transcribe(client, wav)
if transcript is None:
print(f"{'?':8} {'ERR':3} {'?':5} [STT failed]")
results.append({"id": qid, "expected": expected, "actual": None, "ok": False, "wer": None, "error": "stt"})
continue
# Calculate WER
wer = word_error_rate(original, transcript)
total_wer += wer
wer_count += 1
# Step 3: Send to Adolf
send_dry = dry_run and expected == "complex"
logs_before = get_log_tail(60)
t0 = time.monotonic()
ok_post = await post_to_adolf(client, qid, transcript, dry_run=send_dry)
if not ok_post:
print(f"{'?':8} {'ERR':3} {wer:4.2f} {transcript[:50]}")
results.append({"id": qid, "expected": expected, "actual": None, "ok": False, "wer": wer, "transcript": transcript})
continue
# Step 4: Wait for routing decision
actual = None
for _ in range(TIER_WAIT * 2):
await asyncio.sleep(0.5)
logs_after = get_log_tail(60)
actual = extract_tier_from_logs(logs_before, logs_after)
if actual and actual in ("light", "medium", "complex", "fast"):
break
elapsed = time.monotonic() - t0
match = actual == expected or (actual == "fast" and expected == "medium")
if match:
correct += 1
mark = "✓" if match else "✗"
actual_str = actual or "?"
print(f"{actual_str:8} {mark:3} {wer:4.2f} {transcript[:60]}")
results.append({
"id": qid,
"expected": expected,
"actual": actual_str,
"ok": match,
"wer": round(wer, 3),
"original": original,
"transcript": transcript,
"elapsed": round(elapsed, 1),
"dry_run": send_dry,
})
await asyncio.sleep(0.5)
print("─" * 100)
# Summary
accuracy = correct / total * 100 if total else 0
avg_wer = total_wer / wer_count * 100 if wer_count else 0
print(f"\nRouting accuracy: {correct}/{total} ({accuracy:.0f}%)")
print(f"Average WER: {avg_wer:.1f}% (lower is better; 0% = perfect transcription)")
for tier_name in ["light", "medium", "complex"]:
tier_qs = [r for r in results if r["expected"] == tier_name]
if tier_qs:
tier_ok = sum(1 for r in tier_qs if r["ok"])
tier_wers = [r["wer"] for r in tier_qs if r.get("wer") is not None]
avg = sum(tier_wers) / len(tier_wers) * 100 if tier_wers else 0
print(f" {tier_name:8}: routing {tier_ok}/{len(tier_qs)} avg WER {avg:.1f}%")
wrong = [r for r in results if not r["ok"]]
if wrong:
print(f"\nMisclassified after voice ({len(wrong)}):")
for r in wrong:
print(f" id={r['id']:3} expected={r.get('expected','?'):8} actual={r.get('actual','?'):8} transcript={r.get('transcript','')[:50]}")
high_wer = [r for r in results if r.get("wer") and r["wer"] > 0.3]
if high_wer:
print(f"\nHigh WER queries (>30%) — transcription quality issues:")
for r in high_wer:
print(f" id={r['id']:3} WER={r['wer']*100:.0f}% original: {r.get('original','')[:50]}")
print(f" transcript: {r.get('transcript','')[:50]}")
# Save results
ts = int(time.time())
out_path = RESULTS_DIR / f"voice_results_{ts}.json"
latest_path = RESULTS_DIR / "voice_results_latest.json"
payload = {"summary": {"accuracy": accuracy, "avg_wer": avg_wer, "total": total}, "results": results}
for path in (out_path, latest_path):
    with open(path, "w") as f:
        json.dump(payload, f, indent=2, ensure_ascii=False)
print(f"\nResults saved to {latest_path}")
def main():
parser = argparse.ArgumentParser(
description="Adolf voice benchmark — TTS→STT→routing pipeline",
epilog="Requires: Silero TTS (port 8881) and faster-whisper (port 8880) running."
)
parser.add_argument("--tier", choices=["light", "medium", "complex"])
parser.add_argument("--category")
parser.add_argument("--ids", help="Comma-separated IDs")
parser.add_argument("--dry-run", action="store_true",
help="Complex queries use medium model for inference (no API cost)")
parser.add_argument("--save-audio", action="store_true",
help="Save synthesized WAV files to voice_audio/ directory")
parser.add_argument("--skip-gpu-check", action="store_true")
args = parser.parse_args()
if not preflight_checks(skip_gpu_check=args.skip_gpu_check):
sys.exit(1)
queries = load_dataset()
ids = [int(i) for i in args.ids.split(",")] if args.ids else None
queries = filter_queries(queries, args.tier, args.category, ids)
if not queries:
print("No queries match filters.")
sys.exit(1)
asyncio.run(run(queries, dry_run=args.dry_run, save_audio=args.save_audio))
if __name__ == "__main__":
main()
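The `word_error_rate` routine above is a word-level Levenshtein distance normalized by reference length. A standalone restatement of the same logic, handy for sanity-checking a transcript by hand:

```python
import re
import unicodedata

def normalize(text: str) -> str:
    # Same normalization as the benchmark: lowercase, strip punctuation, NFC
    text = unicodedata.normalize("NFC", text.lower())
    text = re.sub(r"[^\w\s]", " ", text)
    return re.sub(r"\s+", " ", text).strip()

def word_error_rate(reference: str, hypothesis: str) -> float:
    ref, hyp = normalize(reference).split(), normalize(hypothesis).split()
    if not ref:
        return 0.0 if not hyp else 1.0
    # Word-level edit distance (insertions, deletions, substitutions)
    d = [[0] * (len(hyp) + 1) for _ in range(len(ref) + 1)]
    for i in range(len(ref) + 1):
        d[i][0] = i
    for j in range(len(hyp) + 1):
        d[0][j] = j
    for i in range(1, len(ref) + 1):
        for j in range(1, len(hyp) + 1):
            cost = 0 if ref[i - 1] == hyp[j - 1] else 1
            d[i][j] = min(d[i - 1][j] + 1, d[i][j - 1] + 1, d[i - 1][j - 1] + cost)
    return d[len(ref)][len(hyp)] / len(ref)

print(word_error_rate("the cat sat down", "the cat sat"))  # 0.25 (one deletion over 4 words)
```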


@@ -1,4 +1,21 @@
{
"auth_config": {
"is_enabled": true,
"admin_username": "admin",
"admin_password": "env.BIFROST_ADMIN_PASSWORD"
},
"config_store": {
"enabled": true,
"type": "postgres",
"config": {
"host": "bifrost-db",
"port": "5432",
"user": "bifrost",
"password": "bifrost",
"db_name": "bifrost",
"ssl_mode": "disable"
}
},
"client": {
"drop_excess_requests": false
},


@@ -49,6 +49,7 @@ async def deliver(session_id: str, channel: str, text: str) -> None:
# ── built-in channel adapters ─────────────────────────────────────────────────
GRAMMY_URL = os.getenv("GRAMMY_URL", "http://grammy:3001")
MATRIX_URL = os.getenv("MATRIX_URL", "http://matrix:3002")
async def _telegram_send(session_id: str, text: str) -> None:
@@ -64,12 +65,26 @@ async def _telegram_send(session_id: str, text: str) -> None:
)
async def _matrix_send(session_id: str, text: str) -> None:
"""Send reply to Matrix via the matrix adapter POST /send endpoint."""
room_id = session_id.removeprefix("mx-")
MAX_MTX = 4000
chunks = [text[i:i + MAX_MTX] for i in range(0, len(text), MAX_MTX)]
async with httpx.AsyncClient(timeout=15) as client:
for chunk in chunks:
await client.post(
f"{MATRIX_URL}/send",
json={"room_id": room_id, "text": chunk},
)
async def _cli_send(session_id: str, text: str) -> None:
"""CLI replies are handled entirely through the pending_replies queue — no-op here."""
pass
def register_defaults() -> None:
"""Register the built-in Telegram and CLI channel adapters."""
"""Register the built-in Telegram, Matrix, and CLI channel adapters."""
register("telegram", _telegram_send)
register("matrix", _matrix_send)
register("cli", _cli_send)
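The `_matrix_send` adapter above splits long replies into fixed 4000-character windows before posting. A standalone check of that slicing:

```python
MAX_MTX = 4000  # Matrix message size cap used by _matrix_send

def chunk(text: str, size: int = MAX_MTX) -> list[str]:
    # Same slicing as _matrix_send: fixed-width windows over the text
    return [text[i:i + size] for i in range(0, len(text), size)]

print([len(c) for c in chunk("x" * 9000)])  # [4000, 4000, 1000]
```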


@@ -1,19 +1,4 @@
services:
bifrost:
image: maximhq/bifrost
container_name: bifrost
ports:
- "8080:8080"
volumes:
- ./bifrost-config.json:/app/data/config.json:ro
environment:
- APP_DIR=/app/data
- APP_PORT=8080
- LOG_LEVEL=info
extra_hosts:
- "host.docker.internal:host-gateway"
restart: unless-stopped
deepagents:
build: .
container_name: deepagents
@@ -21,23 +6,29 @@ services:
- "8000:8000"
environment:
- PYTHONUNBUFFERED=1
# Bifrost gateway — all LLM inference goes through here
- BIFROST_URL=http://bifrost:8080/v1
# LiteLLM proxy — all LLM inference goes through here
- LITELLM_URL=http://host.docker.internal:4000/v1
- LITELLM_API_KEY=sk-fjQC1BxAiGFSMs
# Direct Ollama GPU URL — used only by VRAMManager for flush/prewarm
- OLLAMA_BASE_URL=http://host.docker.internal:11436
- DEEPAGENTS_MODEL=qwen3:4b
- DEEPAGENTS_COMPLEX_MODEL=qwen3:8b
- DEEPAGENTS_COMPLEX_MODEL=deepseek/deepseek-r1:free
- DEEPAGENTS_ROUTER_MODEL=qwen2.5:1.5b
- SEARXNG_URL=http://host.docker.internal:11437
- GRAMMY_URL=http://grammy:3001
- MATRIX_URL=http://host.docker.internal:3002
- CRAWL4AI_URL=http://crawl4ai:11235
- ROUTECHECK_URL=http://routecheck:8090
- ROUTECHECK_TOKEN=${ROUTECHECK_TOKEN}
volumes:
- ./logs:/app/logs
extra_hosts:
- "host.docker.internal:host-gateway"
depends_on:
- openmemory
- grammy
- crawl4ai
- bifrost
- routecheck
restart: unless-stopped
openmemory:
@@ -79,6 +70,19 @@ services:
profiles:
- tools
routecheck:
build: ./routecheck
container_name: routecheck
ports:
- "8090:8090"
environment:
- YANDEX_ROUTING_KEY=${YANDEX_ROUTING_KEY}
- INTERNAL_TOKEN=${ROUTECHECK_TOKEN}
- HTTPS_PROXY=http://host.docker.internal:56928
extra_hosts:
- "host.docker.internal:host-gateway"
restart: unless-stopped
crawl4ai:
image: unclecode/crawl4ai:latest
container_name: crawl4ai

fast_tools.py Normal file

@@ -0,0 +1,188 @@
"""
Fast Tools — pre-flight tools invoked by a classifier before the main LLM call.
Each FastTool has:
- matches(message) → bool : regex classifier that determines if this tool applies
- run(message) → str : async fetch that returns enrichment context
FastToolRunner holds a list of FastTools. The Router uses any_matches() to force
the tier to medium before LLM classification. run_agent_task() calls run_matching()
to build extra context that is injected into the system prompt.
To add a new fast tool:
1. Subclass FastTool, implement name/matches/run
2. Add an instance to the list passed to FastToolRunner in agent.py
"""
import asyncio
import re
from abc import ABC, abstractmethod
import httpx
class FastTool(ABC):
"""Base class for all pre-flight fast tools."""
@property
@abstractmethod
def name(self) -> str: ...
@abstractmethod
def matches(self, message: str) -> bool: ...
@abstractmethod
async def run(self, message: str) -> str: ...
_WMO_CODES = {
0: "clear sky", 1: "mainly clear", 2: "partly cloudy", 3: "overcast",
45: "fog", 48: "icy fog",
51: "light drizzle", 53: "drizzle", 55: "heavy drizzle",
61: "light rain", 63: "rain", 65: "heavy rain",
71: "light snow", 73: "snow", 75: "heavy snow", 77: "snow grains",
80: "light showers", 81: "showers", 82: "heavy showers",
85: "snow showers", 86: "heavy snow showers",
95: "thunderstorm", 96: "thunderstorm with hail", 99: "thunderstorm with heavy hail",
}
class WeatherTool(FastTool):
"""
Fetches current weather for Balashikha, Moscow region directly from open-meteo.com.
No API key required. Returns a ready-to-deliver reply — no LLM reformatting needed.
"""
_PATTERN = re.compile(
r"\b(weather|forecast|temperature|rain(ing)?|snow(ing)?|humidity|wind\s*speed"
r"|холодно|тепло|погода|прогноз погоды"
r"|how (hot|cold|warm) is it|what.?s the (weather|temp)|dress for the weather)\b",
re.IGNORECASE,
)
_URL = (
"https://api.open-meteo.com/v1/forecast"
"?latitude=55.7963&longitude=37.9382"
"&current=temperature_2m,apparent_temperature,relative_humidity_2m"
",wind_speed_10m,weather_code"
"&wind_speed_unit=ms"
)
@property
def name(self) -> str:
return "weather"
def matches(self, message: str) -> bool:
return bool(self._PATTERN.search(message))
async def run(self, message: str) -> str:
try:
async with httpx.AsyncClient(timeout=10) as client:
r = await client.get(self._URL)
r.raise_for_status()
c = r.json()["current"]
except Exception as e:
return f"[weather error: {e}]"
temp = c["temperature_2m"]
feels = c["apparent_temperature"]
humidity = c["relative_humidity_2m"]
wind = c["wind_speed_10m"]
condition = _WMO_CODES.get(c.get("weather_code", 0), "unknown")
return (
f"Balashikha: {condition}, {temp:+.0f}°C (feels like {feels:+.0f}°C), "
f"wind {wind:.1f} m/s, humidity {humidity}%."
)
class CommuteTool(FastTool):
"""
Returns real-time driving time from home (Balashikha) to a destination
using Yandex traffic data via the local routecheck service.
Triggered by queries about commute time, arrival, or road traffic.
The routecheck service handles Yandex API auth and the HTTPS proxy.
"""
_PATTERN = re.compile(
r"\b(commute|arrival time|how long.{0,20}(drive|get|travel|reach)"
r"|сколько.{0,20}(ехать|добираться|минут)"
r"|пробки|traffic|road.{0,10}now|drive to (work|office|center|москва|moscow)"
r"|when (will i|do i) (arrive|get there|reach))\b",
re.IGNORECASE,
)
# Home: Balashikha. Default destination: Moscow city center.
_HOME = "55.7963,37.9382"
_DEST = "55.7558,37.6173"
def __init__(self, routecheck_url: str, internal_token: str):
self._url = routecheck_url.rstrip("/")
self._token = internal_token
@property
def name(self) -> str:
return "commute"
def matches(self, message: str) -> bool:
return bool(self._PATTERN.search(message))
async def run(self, message: str) -> str:
if not self._token:
return "[commute: ROUTECHECK_TOKEN not configured]"
try:
async with httpx.AsyncClient(timeout=15) as client:
r = await client.get(
f"{self._url}/api/route",
params={"from": self._HOME, "to": self._DEST, "token": self._token},
)
r.raise_for_status()
d = r.json()
except Exception as e:
return f"[commute error: {e}]"
traffic = d["duration_traffic_min"]
normal = d["duration_min"]
dist = d["distance_km"]
delay = traffic - normal
lines = [
"Current drive time from Balashikha to Moscow center:",
f" With traffic: {traffic} min",
f" Without traffic: {normal} min",
f" Distance: {dist} km",
]
if delay > 5:
lines.append(f" Traffic delay: +{delay} min")
return "\n".join(lines)
class FastToolRunner:
"""
Classifier + executor for fast tools.
Used in two places:
- Router.route(): any_matches() forces medium tier before LLM classification
- run_agent_task(): run_matching() builds enrichment context in the pre-flight gather
"""
def __init__(self, tools: list[FastTool]):
self._tools = tools
def any_matches(self, message: str) -> bool:
"""True if any fast tool applies to this message."""
return any(t.matches(message) for t in self._tools)
def matching_names(self, message: str) -> list[str]:
"""Names of tools that match this message (for logging)."""
return [t.name for t in self._tools if t.matches(message)]
async def run_matching(self, message: str) -> str:
"""Run all matching tools concurrently and return combined context."""
matching = [t for t in self._tools if t.matches(message)]
if not matching:
return ""
results = await asyncio.gather(*[t.run(message) for t in matching])
parts = [r for r in results if r and not r.startswith("[")]
return "\n\n".join(parts)
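Following the recipe in the module docstring (subclass `FastTool`, implement `name`/`matches`/`run`, register the instance), a self-contained sketch of a new tool. `CurrencyTool` is hypothetical, and the base class is restated here so the snippet runs standalone:

```python
import asyncio
import re
from abc import ABC, abstractmethod

class FastTool(ABC):
    """Minimal restatement of the base class in fast_tools.py."""
    @property
    @abstractmethod
    def name(self) -> str: ...
    @abstractmethod
    def matches(self, message: str) -> bool: ...
    @abstractmethod
    async def run(self, message: str) -> str: ...

class CurrencyTool(FastTool):
    """Hypothetical tool: triggers on exchange-rate questions."""
    _PATTERN = re.compile(r"\b(exchange rate|курс)\b", re.IGNORECASE)

    @property
    def name(self) -> str:
        return "currency"

    def matches(self, message: str) -> bool:
        return bool(self._PATTERN.search(message))

    async def run(self, message: str) -> str:
        # A real implementation would fetch rates over httpx; this is a stub
        return "USD/RUB: (stub)"

tool = CurrencyTool()
print(tool.name, tool.matches("what is the exchange rate today?"))  # currency True
print(asyncio.run(tool.run("exchange rate")))
```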

openmemory/CLAUDE.md Normal file

@@ -0,0 +1,26 @@
# openmemory
FastMCP server wrapping mem0 for persistent per-session memory, backed by Qdrant + nomic-embed-text.
## Tools exposed (MCP)
- `add_memory(text, user_id)` — extract facts from a conversation turn and store in Qdrant
- `search_memory(query, user_id)` — semantic search, returns results with score ≥ 0.5
- `get_all_memories(user_id)` — dump all stored memories for a session
These are called directly by `agent.py` (outside the agent loop), never exposed to the LLM as tools.
## Two Ollama instances
- **GPU** (`OLLAMA_GPU_URL`, port 11436) — extraction model (`qwen2.5:1.5b`): pulls facts from conversation text
- **CPU** (`OLLAMA_CPU_URL`, port 11435) — embedding model (`nomic-embed-text`): 50–150 ms per query
## Prompts
Custom `EXTRACTION_PROMPT` starts with `/no_think` to suppress qwen3 chain-of-thought and get clean JSON output. Custom `UPDATE_MEMORY_PROMPT` handles deduplication — mem0 merges new facts with existing ones rather than creating duplicates.
## Notes
- Qdrant collection is created automatically on first use
- Memory is keyed by `user_id` which equals `session_id` in `agent.py`
- Extraction runs after the reply is sent (background task) — GPU contention with medium model is avoided since the semaphore is released before `_store_memory()` is scheduled
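The score ≥ 0.5 cutoff on `search_memory` results can be sketched as follows (the hit shape is an assumption for illustration):

```python
def filter_hits(hits: list[dict], threshold: float = 0.5) -> list[dict]:
    # Keep only semantic-search hits at or above the score cutoff
    return [h for h in hits if h.get("score", 0.0) >= threshold]

hits = [{"text": "user lives in Balashikha", "score": 0.82},
        {"text": "unrelated memory", "score": 0.31}]
print(filter_hits(hits))  # only the 0.82 hit survives
```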

routecheck/CLAUDE.md Normal file

@@ -0,0 +1,25 @@
# routecheck
FastAPI service providing a Yandex Routing API proxy behind an image captcha.
## Purpose
Yandex Routing API free tier requires a website that uses the API. This service is that website.
It also exposes an internal endpoint (`/api/route`) used by `CommuteTool` in `fast_tools.py`.
## Two access paths
- **Web UI** (`/`): solve PIL arithmetic captcha → get a token → query any two lat/lon points
- **Internal API**: `GET /api/route?from=lat,lon&to=lat,lon&token=$ROUTECHECK_TOKEN` — no captcha
## Key env vars
- `YANDEX_ROUTING_KEY` — from developer.tech.yandex.ru, Router API, free tier
- `INTERNAL_TOKEN` — equals `ROUTECHECK_TOKEN` from root `.env`; shared with deepagents
- `HTTPS_PROXY` — set to `http://host.docker.internal:56928`; container has no direct external internet
## Notes
- Captchas expire after 5 min, route tokens after 1 hour, both stored in-memory (restart clears them)
- Yandex API expects `lon,lat` order (not `lat,lon`) — `app.py` swaps automatically
- Captcha image endpoint: `GET /captcha/image/{id}` — regenerates on each call with random noise
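The lat/lon → lon,lat swap noted above can be sketched as a small helper (mirrors the waypoint string that `app.py` builds):

```python
def to_yandex_waypoints(frm: str, to: str) -> str:
    # Inputs are "lat,lon"; Yandex Routing expects "lon,lat|lon,lat"
    def swap(point: str) -> str:
        lat, lon = (c.strip() for c in point.split(","))
        return f"{lon},{lat}"
    return f"{swap(frm)}|{swap(to)}"

print(to_yandex_waypoints("55.7963,37.9382", "55.7558,37.6173"))
# 37.9382,55.7963|37.6173,55.7558
```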

routecheck/Dockerfile Normal file

@@ -0,0 +1,6 @@
FROM python:3.12-slim
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends fonts-dejavu-core && rm -rf /var/lib/apt/lists/*
RUN pip install --no-cache-dir fastapi uvicorn pillow httpx
COPY app.py .
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8090"]

routecheck/app.py Normal file

@@ -0,0 +1,377 @@
"""
RouteCheck — local routing web service with image captcha.
Endpoints:
GET / — web UI
GET /captcha/image/{id} — PNG captcha image
POST /api/captcha/new — create captcha, return {id}
POST /api/captcha/solve — {id, answer} → {token} or 400
GET /api/route — ?from=lat,lon&to=lat,lon&token=...
token = solved captcha token OR INTERNAL_TOKEN env var
"""
import io
import math
import os
import random
import string
import time
import uuid
from typing import Optional
import httpx
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from PIL import Image, ImageDraw, ImageFilter, ImageFont
from pydantic import BaseModel
app = FastAPI(title="RouteCheck")
# ── Config ─────────────────────────────────────────────────────────────────────
YANDEX_KEY = os.getenv("YANDEX_ROUTING_KEY", "")
INTERNAL_TOKEN = os.getenv("INTERNAL_TOKEN", "")
HTTPS_PROXY = os.getenv("HTTPS_PROXY", "")
CAPTCHA_TTL = 300 # seconds a captcha is valid
TOKEN_TTL = 3600 # seconds a solved token is valid
# ── In-memory captcha store ────────────────────────────────────────────────────
_captchas: dict[str, dict] = {} # id → {problem, answer, expires}
_tokens: dict[str, float] = {} # token → expires
def _purge():
now = time.time()
for k in list(_captchas.keys()):
if _captchas[k]["expires"] < now:
del _captchas[k]
for k in list(_tokens.keys()):
if _tokens[k] < now:
del _tokens[k]
# ── Captcha image generation ───────────────────────────────────────────────────
def _rand_color(dark=False):
if dark:
return tuple(random.randint(0, 100) for _ in range(3))
return tuple(random.randint(140, 255) for _ in range(3))
def _make_captcha_image(text: str) -> bytes:
W, H = 220, 80
img = Image.new("RGB", (W, H), color=_rand_color())
draw = ImageDraw.Draw(img)
# Background noise: random lines
for _ in range(8):
x1, y1 = random.randint(0, W), random.randint(0, H)
x2, y2 = random.randint(0, W), random.randint(0, H)
draw.line([(x1, y1), (x2, y2)], fill=_rand_color(dark=True), width=2)
# Background noise: random dots
for _ in range(300):
x, y = random.randint(0, W), random.randint(0, H)
draw.point((x, y), fill=_rand_color(dark=True))
# Draw each character with slight random offset and rotation
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 36)
except Exception:
font = ImageFont.load_default()
char_w = W // (len(text) + 2)
for i, ch in enumerate(text):
x = char_w + i * char_w + random.randint(-4, 4)
y = (H - 40) // 2 + random.randint(-6, 6)
# Draw shadow
draw.text((x + 2, y + 2), ch, font=font, fill=_rand_color(dark=True))
draw.text((x, y), ch, font=font, fill=_rand_color(dark=True))
# Wavy distortion via pixel manipulation
pixels = img.load()
for x in range(W):
shift = int(4 * math.sin(x / 15.0))
col = [pixels[x, y] for y in range(H)]
for y in range(H):
pixels[x, y] = col[(y - shift) % H]
img = img.filter(ImageFilter.SMOOTH)
buf = io.BytesIO()
img.save(buf, format="PNG")
return buf.getvalue()
def _generate_problem() -> tuple[str, int]:
"""Return (display_text, answer)."""
ops = [
lambda a, b: (f"{a} + {b} = ?", a + b),
lambda a, b: (f"{a} × {b} = ?", a * b),
lambda a, b: (f"{max(a,b)} − {min(a,b)} = ?", max(a, b) - min(a, b)),
]
op = random.choice(ops)
a, b = random.randint(2, 9), random.randint(2, 9)
text, answer = op(a, b)
return text, answer
# ── Routes ─────────────────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
async def index():
return HTML_PAGE
@app.get("/captcha/image/{captcha_id}")
async def captcha_image(captcha_id: str):
_purge()
entry = _captchas.get(captcha_id)
if not entry:
raise HTTPException(404, "Captcha not found or expired")
png = _make_captcha_image(entry["problem"])
return StreamingResponse(io.BytesIO(png), media_type="image/png",
headers={"Cache-Control": "no-store"})
class CaptchaNewResponse(BaseModel):
id: str
@app.post("/api/captcha/new")
async def captcha_new():
_purge()
problem_text, answer = _generate_problem()
cid = str(uuid.uuid4())
_captchas[cid] = {
"problem": problem_text,
"answer": answer,
"expires": time.time() + CAPTCHA_TTL,
}
return {"id": cid}
class SolveRequest(BaseModel):
id: str
answer: int
@app.post("/api/captcha/solve")
async def captcha_solve(req: SolveRequest):
_purge()
entry = _captchas.get(req.id)
if not entry:
raise HTTPException(400, "Captcha expired or not found")
if entry["answer"] != req.answer:
raise HTTPException(400, "Wrong answer")
token = str(uuid.uuid4())
_tokens[token] = time.time() + TOKEN_TTL
del _captchas[req.id]
return {"token": token}
@app.get("/api/route")
async def route(
from_coords: str = Query(..., alias="from", description="lat,lon"),
to_coords: str = Query(..., alias="to", description="lat,lon"),
token: str = Query(...),
):
_purge()
# Auth: internal service token or valid captcha token
if token != INTERNAL_TOKEN:
if token not in _tokens:
raise HTTPException(401, "Invalid or expired token — solve captcha first")
if not YANDEX_KEY:
raise HTTPException(503, "YANDEX_ROUTING_KEY not configured")
# Parse coords
try:
from_lat, from_lon = map(float, from_coords.split(","))
to_lat, to_lon = map(float, to_coords.split(","))
except ValueError:
raise HTTPException(400, "coords must be lat,lon")
# Yandex Routing API expects lon,lat order
waypoints = f"{from_lon},{from_lat}|{to_lon},{to_lat}"
transport = httpx.AsyncHTTPTransport(proxy=HTTPS_PROXY) if HTTPS_PROXY else None
async with httpx.AsyncClient(timeout=15, transport=transport) as client:
try:
r = await client.get(
"https://api.routing.yandex.net/v2/route",
params={"apikey": YANDEX_KEY, "waypoints": waypoints, "mode": "driving"},
)
except Exception as e:
raise HTTPException(502, f"Yandex API unreachable: {e}")
if r.status_code != 200:
raise HTTPException(502, f"Yandex API error {r.status_code}: {r.text[:200]}")
data = r.json()
try:
leg = data["route"]["legs"][0]
duration_s = leg["duration"]
duration_traffic_s = leg.get("duration_in_traffic", duration_s)
distance_m = leg["distance"]
except (KeyError, IndexError) as e:
raise HTTPException(502, f"Unexpected Yandex response: {e} — {str(data)[:200]}")
return {
"duration_min": round(duration_s / 60),
"duration_traffic_min": round(duration_traffic_s / 60),
"distance_km": round(distance_m / 1000, 1),
}
# ── HTML ───────────────────────────────────────────────────────────────────────
HTML_PAGE = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>RouteCheck</title>
<style>
* { box-sizing: border-box; margin: 0; padding: 0; }
body { font-family: system-ui, sans-serif; background: #0f172a; color: #e2e8f0; min-height: 100vh;
display: flex; align-items: center; justify-content: center; }
.card { background: #1e293b; border-radius: 12px; padding: 2rem; width: 420px;
box-shadow: 0 20px 60px rgba(0,0,0,.5); }
h1 { font-size: 1.4rem; font-weight: 700; color: #38bdf8; margin-bottom: .3rem; }
.sub { color: #94a3b8; font-size: .85rem; margin-bottom: 1.5rem; }
label { display: block; font-size: .8rem; color: #94a3b8; margin-bottom: .3rem; margin-top: 1rem; }
input { width: 100%; background: #0f172a; border: 1px solid #334155; border-radius: 6px;
color: #e2e8f0; padding: .55rem .75rem; font-size: .95rem; outline: none; }
input:focus { border-color: #38bdf8; }
button { width: 100%; margin-top: 1.2rem; padding: .7rem; background: #0ea5e9;
border: none; border-radius: 6px; color: #fff; font-size: 1rem;
font-weight: 600; cursor: pointer; transition: background .2s; }
button:hover { background: #0284c7; }
button:disabled { background: #334155; cursor: default; }
.captcha-row { display: flex; gap: .75rem; align-items: center; margin-top: 1rem; }
.captcha-row img { border-radius: 6px; border: 1px solid #334155; cursor: pointer; }
.captcha-row input { flex: 1; }
.result { margin-top: 1.2rem; background: #0f172a; border-radius: 8px; padding: 1rem;
border-left: 3px solid #38bdf8; display: none; }
.result .big { font-size: 1.6rem; font-weight: 700; color: #38bdf8; }
.result .label { font-size: .8rem; color: #64748b; margin-top: .2rem; }
.result .row { display: flex; gap: 1.5rem; margin-top: .8rem; }
.result .metric { flex: 1; }
.result .metric .val { font-size: 1.1rem; font-weight: 600; }
.error { color: #f87171; margin-top: .8rem; font-size: .85rem; display: none; }
.step { display: none; }
.step.active { display: block; }
a.refresh { font-size: .75rem; color: #38bdf8; text-decoration: none; display: block;
margin-top: .4rem; }
a.refresh:hover { text-decoration: underline; }
</style>
</head>
<body>
<div class="card">
<h1>RouteCheck</h1>
<p class="sub">Real-time driving time with Yandex traffic data</p>
<!-- Step 1: captcha -->
<div class="step active" id="step-captcha">
<label>Prove you are human</label>
<div class="captcha-row">
<img id="captcha-img" src="" alt="captcha" width="160" height="60"
title="Click to refresh" onclick="loadCaptcha()">
<input id="captcha-ans" type="number" placeholder="Answer" min="0" max="999">
</div>
<a class="refresh" href="#" onclick="loadCaptcha();return false;">↻ New challenge</a>
<div class="error" id="captcha-err">Wrong answer, try again.</div>
<button id="captcha-btn" onclick="solveCaptcha()">Verify →</button>
</div>
<!-- Step 2: route query -->
<div class="step" id="step-route">
<label>From (lat, lon)</label>
<input id="from" type="text" placeholder="55.7963, 37.9382" value="55.7963, 37.9382">
<label>To (lat, lon)</label>
<input id="to" type="text" placeholder="55.7558, 37.6173" value="55.7558, 37.6173">
<button id="route-btn" onclick="queryRoute()">Get travel time</button>
<div class="error" id="route-err"></div>
<div class="result" id="result">
<div class="big" id="res-traffic"></div>
<div class="label">with current traffic</div>
<div class="row">
<div class="metric"><div class="val" id="res-normal"></div>
<div class="label">without traffic</div></div>
<div class="metric"><div class="val" id="res-dist"></div>
<div class="label">distance</div></div>
</div>
</div>
</div>
</div>
<script>
let captchaId = null;
let routeToken = null;
async function loadCaptcha() {
const r = await fetch('/api/captcha/new', {method: 'POST'});
const d = await r.json();
captchaId = d.id;
document.getElementById('captcha-img').src = '/captcha/image/' + captchaId + '?t=' + Date.now();
document.getElementById('captcha-ans').value = '';
document.getElementById('captcha-err').style.display = 'none';
}
async function solveCaptcha() {
const ans = parseInt(document.getElementById('captcha-ans').value);
if (isNaN(ans)) return;
const btn = document.getElementById('captcha-btn');
btn.disabled = true;
const r = await fetch('/api/captcha/solve', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({id: captchaId, answer: ans})
});
if (r.ok) {
const d = await r.json();
routeToken = d.token;
document.getElementById('step-captcha').classList.remove('active');
document.getElementById('step-route').classList.add('active');
} else {
document.getElementById('captcha-err').style.display = 'block';
loadCaptcha();
}
btn.disabled = false;
}
async function queryRoute() {
const from = document.getElementById('from').value.trim();
const to = document.getElementById('to').value.trim();
const btn = document.getElementById('route-btn');
const err = document.getElementById('route-err');
err.style.display = 'none';
document.getElementById('result').style.display = 'none';
btn.disabled = true;
btn.textContent = 'Fetching…';
const r = await fetch(`/api/route?from=${encodeURIComponent(from)}&to=${encodeURIComponent(to)}&token=${routeToken}`);
btn.disabled = false;
btn.textContent = 'Get travel time';
if (!r.ok) {
const d = await r.json();
err.textContent = d.detail || 'Error';
err.style.display = 'block';
return;
}
const d = await r.json();
document.getElementById('res-traffic').textContent = d.duration_traffic_min + ' min';
document.getElementById('res-normal').textContent = d.duration_min + ' min';
document.getElementById('res-dist').textContent = d.distance_km + ' km';
document.getElementById('result').style.display = 'block';
}
loadCaptcha();
document.getElementById('captcha-ans').addEventListener('keydown', e => {
if (e.key === 'Enter') solveCaptcha();
});
</script>
</body>
</html>
"""

router.py

@@ -1,10 +1,38 @@
import asyncio
import re
import math
from typing import Optional
from openai import AsyncOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
from fast_tools import FastToolRunner
# ── Regex pre-classifier ─────────────────────────────────────────────────────
# Catches obvious light-tier patterns before calling the LLM.
# Keyed by regex → compiled pattern.
# ── Regex pre-classifiers ─────────────────────────────────────────────────────
# Complex: keyword triggers that reliably signal deep multi-source research
_COMPLEX_PATTERNS = re.compile(
r"(?:^|\s)("
r"research|investigate|deep.dive|think carefully"
r"|write a (?:detailed|comprehensive|full|thorough|complete)"
r"|compare all|find and (?:compare|summarize|analyze)"
r"|in[- ]depth analysis|comprehensive guide"
r"|detailed (?:report|analysis|comparison|breakdown|overview)"
r"|everything about|all (?:major|available|self-hosted|open.source)"
r"|pros and cons|with (?:sources|citations|references)"
# Russian complex research keywords (no trailing \b — stems like подробн match подробное/подробный)
r"|исследуй|изучи все|сравни все|найди и сравни|найди и опиши"
r"|напиши подробн|напиши детальн|напиши полн"
r"|подробный отчет|детальн\w+ (?:анализ|сравнение|отчет)"
r"|подробное (?:руководство|сравнение)|полное руководство"
r"|все варианты|все способы|все доступные|все самохостируемые|все платформы"
r"|лучшие практики|все инструменты|все решения|все протоколы"
r"|найди детальн|найди и кратко опиши"
r"|изучи свежие|изучи лучши|изучи все"
r"|сравни все\b"
r")",
re.IGNORECASE,
)
# Light: trivial queries that need no tools or memory
_LIGHT_PATTERNS = re.compile(
r"^("
# Greetings / farewells
@@ -14,35 +42,301 @@ _LIGHT_PATTERNS = re.compile(
r"|thanks?|thank you|thx|ty|ok|okay|k|cool|great|awesome|perfect|sounds good|got it|nice|sure"
r"|how are you|how are you\?|how are you doing(\s+today)?[?!.]*"
r"|what.?s up"
# Calendar facts: "what day comes after X?" / "what comes after X?"
# Calendar facts
r"|what\s+day\s+(comes\s+after|follows|is\s+after)\s+\w+[?!.]*"
r"|what\s+comes\s+after\s+\w+[?!.]*"
# Acronym expansions: "what does X stand for?"
# Acronym expansions
r"|what\s+does\s+\w+\s+stand\s+for[?!.]*"
# Russian greetings / farewells / acknowledgements
r"|привет|пока|спасибо|здравствуй|здравствуйте|добрый день|добрый вечер|доброе утро"
r"|окей|хорошо|отлично|понятно|ок|ладно|договорились|спс|благодарю"
r"|пожалуйста|не за что|всё понятно|ясно"
r"|как дела|как ты|как жизнь|всё хорошо|всё ок"
r")[\s!.?]*$",
re.IGNORECASE,
)
# ── LLM classification prompt ─────────────────────────────────────────────────
CLASSIFY_PROMPT = """Classify the message. Output ONLY one word: light, medium, or complex.
# ── Semantic router utterances ────────────────────────────────────────────────
# These are embedded at startup. New messages are classified by cosine
# similarity — whichever tier's centroid is closest wins.
_LIGHT_UTTERANCES = [
# General facts (English)
"what is 2+2",
"what is the capital of France",
"name the three primary colors",
"tell me a short joke",
"is the sky blue",
"is water wet",
"how many days in a week",
"what is the speed of light",
"what is the boiling point of water",
"spell the word beautiful",
"what color is the ocean",
"how many inches in a foot",
"who wrote hamlet",
"what is pi",
"what year did world war two end",
"what is the largest planet",
"how many continents are there",
"what does DNA stand for",
"what language do they speak in Brazil",
"what is the square root of 144",
# Tech definitions — static knowledge (English)
"what is Docker",
"what is a VPN",
"what is SSH",
"what is a reverse proxy",
"what is an API",
"what is a firewall",
"what is a container",
"what is DNS",
"what is HTTPS",
"what is a load balancer",
"what is Kubernetes",
"what is Git",
"what is a network port",
"what is an IP address",
"what is a subnet mask",
"what is the OSI model",
"how many bits in a byte",
"how many bytes in a gigabyte",
"what is TCP",
"what is a REST API",
# Russian — static facts and definitions
"что такое IP-адрес",
"что такое VPN",
"что такое Docker",
"что такое DNS",
"что такое SSH",
"что означает API",
"сколько байт в гигабайте",
"сколько бит в байте",
"что такое Zigbee",
"что такое Z-Wave",
"что такое брандмауэр",
"что такое виртуальная машина",
"что такое обратный прокси",
"привет",
"пока",
"спасибо",
"как дела",
"что такое Matter протокол",
"сколько планет в солнечной системе",
"чему равно число Пи",
# Russian — more static definitions
"что такое TCP/IP",
"что такое подсеть",
"скорость света",
"сколько дней в году",
"что такое Kubernetes",
"что такое Git",
"что такое REST API",
"что такое TCP",
"что такое UDP",
"что такое VLAN",
"сколько мегабайт в гигабайте",
"что такое процессор",
"что такое оперативная память",
"что такое виртуализация",
"что такое Linux",
"что такое умный дом",
"что такое Home Assistant",
"что такое Matter",
]
LIGHT = answerable from general knowledge, no internet needed:
what is 2+2 / what is the capital of France / name the three primary colors
tell me a short joke / is the sky blue / is water wet
_MEDIUM_UTTERANCES = [
# English — current data, memory, actions
"what is the weather today",
"what is the bitcoin price right now",
"what are the latest news",
"what did we talk about last time",
"what is my name",
"where do I live",
"what do you know about me",
"what did I tell you before",
"what is the current temperature outside",
"remind me what I said about my project",
"search for the latest iPhone release",
"find me a restaurant nearby",
"turn on the lights in the living room",
"turn off all lights",
"set temperature to 22 degrees",
"what is the current traffic to Moscow",
"check if anyone is home",
"what devices are currently on",
"look up my public IP address",
"show me recent news about Proxmox",
# Russian — weather and commute
"какая сегодня погода в Балашихе",
"пойдет ли сегодня дождь",
"какая температура на улице сейчас",
"погода на завтра",
"будет ли снег сегодня",
"сколько ехать до Москвы сейчас",
"какие пробки на дороге до Москвы",
"время в пути на работу",
"есть ли пробки сейчас",
"стоит ли брать зонтик",
# Russian — smart home control
"включи свет в гостиной",
"выключи свет на кухне",
"какая температура дома",
"установи температуру 22 градуса",
"выключи все лампочки",
"какие устройства сейчас включены",
"включи ночной режим",
"открой шторы в гостиной",
"включи свет в спальне на 50 процентов",
"выключи свет во всём доме",
"включи вентилятор в детской",
"закрыты ли все окна",
"выключи телевизор",
"какое потребление электричества сегодня",
"включи кофемашину",
"сколько у нас датчиков движения",
"состояние всех дверных замков",
"есть ли кто-нибудь дома",
"установи будильник на 7 утра",
# Russian — personal memory
"как меня зовут",
"где я живу",
"что мы обсуждали в прошлый раз",
"что ты знаешь о моем домашнем сервере",
"напомни, какие сервисы я запускаю",
"что я просил тебя запомнить",
"что я говорил о своей сети",
# Russian — current info lookups requiring network/tools
"какой сейчас курс биткоина",
"курс доллара к рублю сейчас",
"какая последняя версия Docker",
"как перезапустить Docker контейнер",
"как посмотреть логи Docker контейнера",
"какие новые функции в Home Assistant 2024",
"есть ли проблемы у Cloudflare сегодня",
"какие новые Zigbee устройства вышли в 2024 году",
"найди хороший опенсорс менеджер фотографий",
"последние новости Proxmox",
"напиши bash команду для поиска больших файлов",
"как вывести список всех запущенных контейнеров",
"как проверить использование диска в Linux",
]
MEDIUM = requires web search or the user's stored memories:
current weather / today's news / Bitcoin price / what did we talk about
what is my name / where do I live / what is my job / do I have any pets
what do you know about me / what are my preferences / what did I tell you
_COMPLEX_UTTERANCES = [
# English
"research everything about Elon Musk's recent projects and investments",
"write a detailed report on climate change solutions with sources",
"investigate the history and current state of quantum computing",
"find and summarize the latest academic papers on transformer architectures",
"analyze in depth the pros and cons of nuclear energy with citations",
"research the background and controversies around this person",
"compare all major cloud providers with detailed pricing and features",
"write a comprehensive biography of this historical figure",
"investigate what caused the 2008 financial crisis with multiple sources",
"research the best programming languages in 2024 with detailed comparison",
"find everything published about this medical condition and treatments",
"do a deep dive into the latest developments in artificial general intelligence",
"research and compare all options for starting a business in Europe",
"investigate recent news and controversies around this company",
"write a thorough analysis of geopolitical tensions in the Middle East",
"find detailed information on the side effects and studies for this medication",
"research the top 10 JavaScript frameworks with benchmarks and community data",
"investigate who is funding AI research and what their goals are",
"write a detailed market analysis for the electric vehicle industry",
"research everything you can find about this startup or technology",
# Russian — deep research
"исследуй и сравни все варианты умного домашнего освещения",
"напиши подробный отчет о протоколах умного дома",
"изучи все самохостируемые медиасерверы и сравни их",
"исследуй лучшие практики безопасности домашнего сервера",
"сравни все системы резервного копирования для Linux",
"напиши детальное сравнение WireGuard и OpenVPN",
"исследуй все варианты голосового управления на русском языке",
"изучи все опенсорс альтернативы Google сервисам",
"напиши подробный анализ локальных языковых моделей",
"исследуй лучшие инструменты мониторинга для домашнего сервера",
# Russian — more deep research queries matching benchmark
"исследуй и сравни Proxmox, Unraid и TrueNAS для домашней лаборатории",
"напиши подробное руководство по безопасности домашнего сервера",
"исследуй все доступные дашборды для самохостинга и сравни их",
"найди детальные бенчмарки ARM одноплатных компьютеров для домашней лаборатории",
"исследуй лучший стек мониторинга для самохостинга в 2024 году",
"исследуй и сравни WireGuard, OpenVPN и Tailscale для домашней сети",
"исследуй лучшие практики сегментации домашней сети с VLAN",
"изучи все самохостируемые DNS решения и их возможности",
"исследуй и сравни все платформы умного дома: Home Assistant и другие",
"изучи лучшие Zigbee координаторы и их совместимость с Home Assistant",
"напиши детальный отчет о поддержке протокола Matter и совместимости устройств",
"исследуй все способы интеграции умных ламп с Home Assistant",
"найди и сравни все варианты датчиков движения для умного дома",
"исследуй и сравни все самохостируемые решения для хранения фотографий",
"изучи лучшие самохостируемые медиасерверы: Jellyfin, Plex и Emby",
"исследуй последние достижения в локальном LLM инференсе и обзор моделей",
"изучи лучшие опенсорс альтернативы Google сервисов для приватности",
"найди и кратко опиши все крупные самохостируемые менеджеры паролей",
"напиши детальный анализ текущего состояния AI ассистентов для самохостинга",
"исследуй и сравни все инструменты оркестрации контейнеров для домашней лаборатории",
"изучи лучшие подходы к автоматическому резервному копированию в Linux",
"исследуй и сравни все самохостируемые инструменты личных финансов",
"изучи свежие CVE и уязвимости в популярном самохостируемом ПО",
"напиши подробное руководство по настройке автоматизаций в Home Assistant",
"исследуй все варианты голосового управления умным домом на русском языке",
"сравни все системы резервного копирования для Linux: Restic, BorgBackup и другие",
"исследуй лучшие самохостируемые системы мониторинга сети: Zabbix, Grafana",
"изучи все варианты локального запуска языковых моделей на видеокарте",
"напиши подробный отчет о технологиях синтеза речи с открытым исходным кодом",
"исследуй все способы интеграции умных розеток с мониторингом потребления",
"напиши полное руководство по настройке обратного прокси Caddy",
"исследуй лучшие практики написания Docker Compose файлов для продакшена",
"сравни все самохостируемые облачные хранилища: Nextcloud, Seafile и другие",
"изучи все доступные локальные ассистенты с голосовым управлением",
"исследуй все самохостируемые решения для блокировки рекламы: Pi-hole, AdGuard",
"напиши детальное сравнение систем управления конфигурацией: Ansible, Puppet",
"исследуй все протоколы умного дома и их плюсы и минусы: Zigbee, Z-Wave, Matter",
"найди и сравни все фреймворки для создания локальных AI ассистентов",
"исследуй лучшие решения для автоматического управления медиатекой",
"изучи все варианты самохостируемых систем учёта расходов с возможностью импорта",
"напиши сравнение всех вариантов самохостинга для хранения и синхронизации файлов",
"исследуй все открытые протоколы для умного дома и их экосистемы",
"изучи лучшие инструменты для автоматизации домашней инфраструктуры",
]
COMPLEX = /think prefix only:
/think compare frameworks / /think plan a trip
Message: {message}
Output (one word only — light, medium, or complex):"""
# Medium: queries that require tools, actions, or real-time data (not static knowledge)
_MEDIUM_PATTERNS = re.compile(
r"(?:"
# Russian smart home commands — always need HA integration
r"(?:включи|выключи|открой|закрой|установи|поставь|убавь|прибавь|переключи)\s"
r"|(?:какая|какой|какое|каково)\s+(?:температура|влажность|потребление|состояние|статус)\s"
r"|(?:сколько|есть ли)\s.*(?:датчик|устройств|замк)"
# Russian memory queries
r"|как меня зовут|где я живу|что мы обсуждали|что я говорил|что я просил"
r"|напомни\b|что ты знаешь обо мне"
# Russian current info
r"|курс (?:доллара|биткоина|евро|рубл)"
r"|(?:последние |свежие )?новости\b"
r"|(?:погода|температура)\s+(?:на завтра|на неделю)"
r")",
re.IGNORECASE,
)
LIGHT_REPLY_PROMPT = """You are a helpful Telegram assistant. Answer briefly and naturally (1-3 sentences). Be friendly."""
_EMBED_MODEL = "ollama/nomic-embed-text"
def _cosine(a: list[float], b: list[float]) -> float:
dot = sum(x * y for x, y in zip(a, b))
norm_a = math.sqrt(sum(x * x for x in a))
norm_b = math.sqrt(sum(x * x for x in b))
if norm_a == 0 or norm_b == 0:
return 0.0
return dot / (norm_a * norm_b)
def _centroid(embeddings: list[list[float]]) -> list[float]:
n = len(embeddings)
dim = len(embeddings[0])
return [sum(embeddings[i][d] for i in range(n)) / n for d in range(dim)]
def _format_history(history: list[dict]) -> str:
if not history:
@@ -55,63 +349,93 @@ def _format_history(history: list[dict]) -> str:
return "\n".join(lines)
def _parse_tier(text: str) -> str:
"""Extract tier from raw model output. Default to medium."""
t = text.strip().lower()
snippet = t[:60]
if "complex" in snippet:
return "complex"
if "medium" in snippet:
return "medium"
if "light" in snippet:
return "light"
# Model invented a descriptive category (e.g. "simplefact", "trivial", "basic") →
# treat as light since it recognised the question doesn't need tools
if any(w in snippet for w in ("simple", "fact", "trivial", "basic", "easy", "general")):
return "light"
return "medium" # safe default
class Router:
def __init__(self, model):
self.model = model
def __init__(
self,
model,
embedder: AsyncOpenAI,
fast_tool_runner: FastToolRunner | None = None,
):
self.model = model # qwen2.5:1.5b — used only for generating light replies
self._embedder = embedder
self._fast_tool_runner = fast_tool_runner
self._light_centroid: list[float] | None = None
self._medium_centroid: list[float] | None = None
self._complex_centroid: list[float] | None = None
async def initialize(self) -> None:
"""Pre-compute utterance embeddings. Call once at startup. Retries until LiteLLM is ready."""
print("[router] embedding utterances for semantic classifier...", flush=True)
texts = _LIGHT_UTTERANCES + _MEDIUM_UTTERANCES + _COMPLEX_UTTERANCES
for attempt in range(10):
try:
resp = await self._embedder.embeddings.create(model=_EMBED_MODEL, input=texts)
embeddings = [item.embedding for item in resp.data]
n_light = len(_LIGHT_UTTERANCES)
n_medium = len(_MEDIUM_UTTERANCES)
self._light_centroid = _centroid(embeddings[:n_light])
self._medium_centroid = _centroid(embeddings[n_light:n_light + n_medium])
self._complex_centroid = _centroid(embeddings[n_light + n_medium:])
print("[router] semantic classifier ready (3-tier)", flush=True)
return
except Exception as e:
print(f"[router] embedding attempt {attempt+1}/10 failed: {e}", flush=True)
await asyncio.sleep(3)
print("[router] WARNING: could not initialize semantic classifier — will default to medium", flush=True)
async def _classify_by_embedding(self, message: str) -> str:
"""Embed message and return 'light', 'medium', or 'complex' based on centroid similarity."""
if self._light_centroid is None or self._medium_centroid is None or self._complex_centroid is None:
return "medium"
try:
resp = await self._embedder.embeddings.create(model=_EMBED_MODEL, input=[message])
emb = resp.data[0].embedding
score_light = _cosine(emb, self._light_centroid)
score_medium = _cosine(emb, self._medium_centroid)
score_complex = _cosine(emb, self._complex_centroid)
tier = max(
[("light", score_light), ("medium", score_medium), ("complex", score_complex)],
key=lambda x: x[1],
)[0]
print(
f"[router] semantic: light={score_light:.3f} medium={score_medium:.3f} "
f"complex={score_complex:.3f}{tier}",
flush=True,
)
return tier
except Exception as e:
print(f"[router] embedding classify error, defaulting to medium: {e}", flush=True)
return "medium"
async def route(
self,
message: str,
history: list[dict],
force_complex: bool = False,
) -> tuple[str, Optional[str]]:
"""
Returns (tier, reply_or_None).
For light tier: also generates the reply with a second call.
For light tier: also generates the reply inline.
For medium/complex: reply is None.
"""
if force_complex:
return "complex", None
if self._fast_tool_runner and self._fast_tool_runner.any_matches(message.strip()):
names = self._fast_tool_runner.matching_names(message.strip())
print(f"[router] fast_tool_match={names} → medium", flush=True)
return "medium", None
# Step 0: regex pre-classification for obvious light patterns
if _LIGHT_PATTERNS.match(message.strip()):
print(f"[router] regex→light", flush=True)
print("[router] regex→light", flush=True)
return await self._generate_light_reply(message, history)
# Step 1: LLM classification with raw text output
try:
classify_response = await self.model.ainvoke([
HumanMessage(content=CLASSIFY_PROMPT.format(message=message)),
])
raw = classify_response.content or ""
raw = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
tier = _parse_tier(raw)
if _COMPLEX_PATTERNS.search(message.strip()):
print("[router] regex→complex", flush=True)
return "complex", None
if tier == "complex" and not message.startswith("/think"):
tier = "medium"
print(f"[router] raw={raw[:30]!r} → tier={tier}", flush=True)
except Exception as e:
print(f"[router] classify error, defaulting to medium: {e}", flush=True)
if _MEDIUM_PATTERNS.search(message.strip()):
print("[router] regex→medium", flush=True)
return "medium", None
tier = await self._classify_by_embedding(message)
if tier != "light":
return tier, None
@@ -120,7 +444,7 @@ class Router:
async def _generate_light_reply(
self, message: str, history: list[dict]
) -> tuple[str, Optional[str]]:
"""Generate a short reply using the router model for light-tier messages."""
"""Generate a short reply using qwen2.5:1.5b for light-tier messages."""
history_text = _format_history(history)
context = f"\nConversation history:\n{history_text}" if history else ""
try:


@@ -199,14 +199,13 @@ def parse_run_block(lines, msg_prefix):
if txt:
last_ai_text = txt
m = re.search(r"replied in ([\d.]+)s \(llm=([\d.]+)s, send=([\d.]+)s\)", line)
m = re.search(r"replied in ([\d.]+)s(?:\s+tier=(\w+))?", line)
if m:
tier_m = re.search(r"\btier=(\w+)", line)
tier = tier_m.group(1) if tier_m else "unknown"
tier = m.group(2) if m.group(2) else "unknown"
reply_data = {
"reply_total": float(m.group(1)),
"llm": float(m.group(2)),
"send": float(m.group(3)),
"llm": None,
"send": None,
"tier": tier,
"reply_text": last_ai_text,
"memory_s": None,


@@ -0,0 +1,40 @@
# Use Case: Current Weather Query
Verify how Adolf handles a real-time information request ("what's the weather now?").
This question requires live data that an LLM cannot answer from training alone.
## Steps
**1. Send the weather query:**
```bash
curl -s -X POST http://localhost:8000/message \
-H "Content-Type: application/json" \
-d '{"text": "whats the weather right now?", "session_id": "use-case-weather", "channel": "cli", "user_id": "claude"}'
```
**2. Stream the reply** (medium tier should respond within 30s):
```bash
curl -s -N --max-time 60 "http://localhost:8000/stream/use-case-weather"
```
**3. Check routing tier and any tool usage in logs:**
```bash
docker compose -f /home/alvis/adolf/docker-compose.yml logs deepagents \
--since=120s | grep -E "tier=|web_search|fetch_url|crawl4ai"
```
## Evaluate (use your judgment)
Check each of the following:
- **Routing**: which tier was selected? Was it appropriate for a real-time query?
- **Tool use**: did the agent use web_search or any external data source?
- **Accuracy**: does the response contain actual current weather data (temperature, conditions) or is it a guess/refusal?
- **Honesty**: if the agent cannot fetch weather, does it say so — or does it hallucinate fake data?
- **Helpfulness**: does the response suggest how the user could get weather info (e.g. check a website, use /think)?
Report PASS only if the response is both honest and helpful. A hallucinated weather
report is a FAIL. An honest "I can't check weather" with guidance is a PASS.