Compare commits


7 Commits

Author SHA1 Message Date
Alvis
8cd41940f0 Update docs: streaming, CLI container, use_cases tests
- /stream/{session_id} SSE endpoint replaces /reply/ for CLI
- Medium tier streams per-token via astream() with in_think filtering
- CLI now runs as Docker container (Dockerfile.cli, profile:tools)
- Correct medium model to qwen3:4b with real-time think block filtering
- Add use_cases/ test category to commands section
- Update files tree and services table

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 17:31:36 +00:00
Alvis
b04e8a0925 Add Rich token streaming: server SSE + CLI live display + CLI container
Server (agent.py):
- _stream_queues: per-session asyncio.Queue for token chunks
- _push_stream_chunk() / _end_stream() helpers
- Medium tier: astream() with <think> block filtering — real token streaming
- Light tier: full reply pushed as single chunk then [DONE]
- Complex tier: full reply pushed after agent completes then [DONE]
- GET /stream/{session_id} SSE endpoint (data: <chunk>\n\n, data: [DONE]\n\n)
- medium_model promoted to module-level global for astream() access

CLI (cli.py):
- stream_reply(): reads /stream/ SSE, renders tokens live with Rich Live (transient)
- Final reply rendered as Markdown after stream completes
- os.getlogin() replaced with os.getenv("USER") for container compatibility

Dockerfile.cli + docker-compose cli service (profiles: tools):
- Run: docker compose --profile tools run --rm -it cli
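
The per-session queue and SSE framing described in this commit can be sketched as follows. This is a simplified, hypothetical sketch: the names mirror the commit message, but the real agent.py implementation may differ.

```python
import asyncio

# Per-session token queues, filled by the inference path and drained
# by the SSE endpoint (sketch only, not the actual agent.py source).
_stream_queues: dict[str, asyncio.Queue] = {}

async def push_chunk(session_id: str, chunk: str) -> None:
    q = _stream_queues.setdefault(session_id, asyncio.Queue())
    await q.put(chunk)

async def end_stream(session_id: str) -> None:
    # [DONE] sentinel tells SSE readers the reply is complete.
    await push_chunk(session_id, "[DONE]")

async def sse_events(session_id: str):
    """Yield SSE-framed chunks (data: <chunk>\\n\\n) until [DONE]."""
    q = _stream_queues.setdefault(session_id, asyncio.Queue())
    while True:
        chunk = await q.get()
        yield f"data: {chunk}\n\n"
        if chunk == "[DONE]":
            break

async def demo() -> list[str]:
    await push_chunk("cli-demo", "Hel")
    await push_chunk("cli-demo", "lo")
    await end_stream("cli-demo")
    return [event async for event in sse_events("cli-demo")]
```

In the real server this generator would back a FastAPI `StreamingResponse` on `GET /stream/{session_id}`.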

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 17:26:52 +00:00
Alvis
edc9a96f7a Add use_cases test category as Claude Code skill instructions
Use cases are markdown files that Claude Code reads, executes step by step
using its tools, and evaluates with its own judgment — not assertion scripts.

- cli_startup.md: pipe EOF into cli.py, verify banner and exit code 0
- apple_pie_research.md: /think query → complex tier → web_search + fetch →
  evaluate recipe quality, sources, and structure

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 17:01:13 +00:00
Alvis
a35ba83db7 Add use_cases test category with CLI startup test
tests/use_cases/ holds scenario-driven tests run by the Claude Code agent,
which acts as both the test runner and mock user. Each test prints a
structured transcript; Claude evaluates correctness.

First test: test_cli_startup.py — spawns cli.py with a subprocess, reads
the welcome banner, sends EOF, and verifies exit code 0.
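
The spawn-then-EOF pattern described above can be sketched like this. The sketch uses a `python3 -c` one-liner as a stand-in target; the real test_cli_startup.py spawns cli.py and checks its actual banner text.

```python
import subprocess
import sys

def check_startup(cmd: list[str], banner: str) -> bool:
    """Spawn the process, send EOF on stdin, verify banner + exit code 0.
    Sketch of the pattern only; not the actual test_cli_startup.py source."""
    proc = subprocess.run(
        cmd, input="", capture_output=True, text=True, timeout=30
    )
    return banner in proc.stdout and proc.returncode == 0

# Hypothetical stand-in: prints a banner and exits cleanly on EOF.
demo_cmd = [sys.executable, "-c", "print('Welcome to Adolf CLI')"]
```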

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 16:10:04 +00:00
Alvis
021104f510 Split monolithic test_pipeline.py into focused integration test scripts
- common.py: shared config, URL constants, benchmark questions, all helpers
  (get, post_json, check_sse, qdrant_count, fetch_logs, parse_run_block, wait_for, etc.)
- test_health.py: service health checks (deepagents, bifrost, GPU/CPU Ollama, Qdrant, SearXNG)
- test_memory.py: name store/recall pipeline, memory benchmark (5 facts + 10 recalls), dedup test
- test_routing.py: easy/medium/hard tier routing benchmarks with --easy/medium/hard-only flags
- Removed test_pipeline.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 16:02:57 +00:00
Alvis
50097d6092 Embed Crawl4AI at all tiers, restore qwen3:4b medium, update docs
- Pre-routing URL fetch: any message with URLs gets content fetched
  async (httpx.AsyncClient) before routing via _fetch_urls_from_message()
- URL context and memories gathered concurrently with asyncio.gather
- Light tier upgraded to medium when URL content is present
- url_context injected into system prompt for medium and complex agents
- Complex agent retains web_search/fetch_url tools + receives pre-fetched content
- Medium model restored to qwen3:4b (was temporarily qwen2.5:1.5b)
- Unit tests added for _extract_urls
- ARCHITECTURE.md: added Tool Handling, Crawl4AI Integration, Memory Pipeline sections
- CLAUDE.md: updated request flow and Crawl4AI integration docs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 15:49:34 +00:00
Alvis
f9618a9bbf Integrate Bifrost LLM gateway, add test suite, implement memory pipeline
- Add Bifrost (maximhq/bifrost) as LLM gateway: all inference routes through
  bifrost:8080/v1 with retry logic and observability; VRAMManager keeps direct
  Ollama access for VRAM flush/prewarm operations
- Switch medium model from qwen3:4b to qwen2.5:1.5b (direct call, no tools)
  via _DirectModel wrapper; complex keeps create_deep_agent with qwen3:8b
- Implement out-of-agent memory pipeline: _retrieve_memories pre-fetches
  relevant context (injected into all tiers), _store_memory runs as background
  task after each reply writing to openmemory/Qdrant
- Add tests/unit/ with 133 tests covering router, channels, vram_manager,
  agent helpers; move integration test to tests/integration/
- Add bifrost-config.json with GPU Ollama (qwen2.5:0.5b/1.5b, qwen3:4b/8b,
  gemma3:4b) and CPU Ollama providers
- Integration test 28/29 pass (only grammy fails — no TELEGRAM_BOT_TOKEN)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 13:50:12 +00:00
28 changed files with 2702 additions and 1254 deletions

.gitignore (new file, +2 lines)

@@ -0,0 +1,2 @@
__pycache__/
*.pyc

ARCHITECTURE.md

@@ -18,7 +18,8 @@ Autonomous personal assistant with a multi-channel gateway. Three-tier model rou
│ │ │ │
│ │ POST /message │ ← all inbound │
│ │ POST /chat (legacy) │ │
│ │ GET /reply/{id} SSE │ ← CLI polling
│ │ GET /stream/{id} SSE │ ← token stream
│ │ GET /reply/{id} SSE │ ← legacy poll │
│ │ GET /health │ │
│ │ │ │
│ │ channels.py registry │ │
@@ -42,7 +43,7 @@ Autonomous personal assistant with a multi-channel gateway. Three-tier model rou
| Channel | session_id | Inbound | Outbound |
|---------|-----------|---------|---------|
| Telegram | `tg-<chat_id>` | Grammy long-poll → POST /message | channels.py → POST grammy:3001/send |
| CLI | `cli-<user>` | POST /message directly | GET /reply/{id} SSE stream |
| CLI | `cli-<user>` | POST /message directly | GET /stream/{id} SSE — Rich Live streaming |
| Voice | `voice-<device>` | (future) | (future) |
## Unified Message Flow
@@ -52,23 +53,68 @@ Autonomous personal assistant with a multi-channel gateway. Three-tier model rou
2. POST /message {text, session_id, channel, user_id}
3. 202 Accepted immediately
4. Background: run_agent_task(message, session_id, channel)
5. Route → run agent tier → get reply text
6. channels.deliver(session_id, channel, reply_text)
- always puts reply in pending_replies[session_id] queue (for SSE)
- calls channel-specific send callback
7. GET /reply/{session_id} SSE clients receive the reply
5. Parallel IO (asyncio.gather):
a. _fetch_urls_from_message() — Crawl4AI fetches any URLs in message
b. _retrieve_memories() — openmemory semantic search for context
6. router.route() with enriched history (url_context + memories as system msgs)
- if URL content fetched and tier=light → upgrade to medium
7. Invoke agent for tier with url_context + memories in system prompt
8. Token streaming:
- medium: astream() pushes per-token chunks to _stream_queues[session_id]; <think> blocks filtered in real time
- light/complex: full reply pushed as single chunk after completion
- _end_stream() sends [DONE] sentinel
9. channels.deliver(session_id, channel, reply_text) — Telegram callback
10. _store_memory() background task — stores turn in openmemory
11. GET /stream/{session_id} SSE clients receive chunks; CLI renders with Rich Live + final Markdown
```
## Tool Handling
Adolf uses LangChain's tool interface but only the complex agent actually invokes tools at runtime.
**Complex agent (`/think` prefix):** `web_search` and `fetch_url` are defined as `langchain_core.tools.Tool` objects and passed to `create_deep_agent()`. The deepagents library runs an agentic loop (LangGraph `create_react_agent` under the hood) that sends the tool schema to the model via OpenAI function-calling format and handles tool dispatch.
**Medium agent (default):** `_DirectModel` makes a single `model.ainvoke(messages)` call with no tool schema. Context (memories, fetched URL content) is injected via the system prompt instead. This is intentional — `qwen3:4b` behaves unreliably when a tool array is present.
**Memory tools (out-of-loop):** `add_memory` and `search_memory` are LangChain MCP tool objects (via `langchain_mcp_adapters`) but are excluded from both agents' tool lists. They are called directly — `await _memory_add_tool.ainvoke(...)` — outside the agent loop, before and after each turn.
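
The out-of-loop call shape described above can be sketched with a stub standing in for the MCP tool object. Only the `ainvoke()` call pattern is taken from the text; the stub class is hypothetical, since the real tool objects come from `langchain_mcp_adapters`.

```python
import asyncio

class StubMemoryTool:
    """Stand-in for a LangChain MCP tool object; the real add_memory tool
    is produced by langchain_mcp_adapters and backed by openmemory."""
    def __init__(self):
        self.calls = []

    async def ainvoke(self, args: dict) -> str:
        self.calls.append(args)
        return "ok"

async def store_turn(tool, session_id: str, user_msg: str, reply: str) -> str:
    # Direct call outside the agent loop: the tool is never passed to
    # create_deep_agent, so the model never sees a memory tool schema.
    return await tool.ainvoke(
        {"text": f"User: {user_msg}\nAssistant: {reply}", "user_id": session_id}
    )
```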
## Three-Tier Model Routing
| Tier | Model | VRAM | Trigger | Latency |
|------|-------|------|---------|---------|
| Light | qwen2.5:1.5b (router answers) | ~1.2 GB | Router classifies as light | ~2–4s |
| Medium | qwen3:4b | ~2.5 GB | Default | ~20–40s |
| Complex | qwen3:8b | ~6.0 GB | `/think` prefix | ~60–120s |
| Tier | Model | Agent | Trigger | Latency |
|------|-------|-------|---------|---------|
| Light | `qwen2.5:1.5b` (router answers directly) | — | Regex pre-match or LLM classifies "light" | ~2–4s |
| Medium | `qwen3:4b` (`DEEPAGENTS_MODEL`) | `_DirectModel` — single LLM call, no tools | Default; also forced when message contains URLs | ~10–20s |
| Complex | `qwen3:8b` (`DEEPAGENTS_COMPLEX_MODEL`) | `create_deep_agent` — agentic loop with tools | `/think` prefix only | ~60–120s |
**`/think` prefix**: forces complex tier, stripped before sending to agent.
Complex tier is locked out unless the message starts with `/think` — any LLM classification of "complex" is downgraded to medium.
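
The gating rule above can be sketched as a small pure function. This is a sketch of the rule only; the real `Router` also does regex pre-matching and generates the light-tier reply itself.

```python
def decide_tier(message: str, llm_guess: str) -> tuple[str, str]:
    """Apply the /think gate: complex is reachable only via the prefix,
    and an LLM guess of 'complex' is downgraded to medium."""
    prefix = "/think "
    if message.startswith(prefix):
        # Prefix is stripped before the message reaches the agent.
        return "complex", message[len(prefix):]
    tier = "medium" if llm_guess == "complex" else llm_guess
    return tier, message
```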
## Crawl4AI Integration
Crawl4AI runs as a Docker service (`crawl4ai:11235`) providing JS-rendered, bot-bypass page fetching.
**Pre-routing fetch (all tiers):**
- `_URL_RE` detects `https?://` URLs in any incoming message
- `_crawl4ai_fetch_async()` uses `httpx.AsyncClient` to POST `{urls: [...]}` to `/crawl`
- Up to 3 URLs fetched concurrently via `asyncio.gather`
- Fetched content (up to 3000 chars/URL) injected as a system context block into enriched history before routing and into medium/complex system prompts
- If fetch succeeds and router returns light → tier upgraded to medium
**Complex agent tools:**
- `web_search`: SearXNG query + Crawl4AI auto-fetch of top 2 result URLs → combined snippet + page text
- `fetch_url`: Crawl4AI single-URL fetch for any specific URL
## Memory Pipeline
openmemory runs as a FastMCP server (`openmemory:8765`) backed by mem0 + Qdrant + nomic-embed-text.
**Retrieval (before routing):** `_retrieve_memories()` calls `search_memory` MCP tool with the user message as query. Results (threshold ≥ 0.5) are prepended to enriched history so all tiers benefit.
**Storage (after reply):** `_store_memory()` runs as an asyncio background task, calling `add_memory` with `"User: ...\nAssistant: ..."`. The extraction LLM (`qwen2.5:1.5b` on GPU Ollama) pulls facts; dedup is handled by mem0's update prompt.
Memory tools (`add_memory`, `search_memory`, `get_all_memories`) are excluded from agent tool lists — memory management happens outside the agent loop.
## VRAM Management
GTX 1070 — 8 GB. Ollama must be restarted if CUDA init fails (model loads on CPU).
@@ -76,7 +122,7 @@ GTX 1070 — 8 GB. Ollama must be restarted if CUDA init fails (model loads on C
1. Flush explicitly before loading qwen3:8b (`keep_alive=0`)
2. Verify eviction via `/api/ps` poll (15s timeout) before proceeding
3. Fallback: timeout → run medium agent instead
4. Post-complex: flush 8b, pre-warm 4b + router
4. Post-complex: flush 8b, pre-warm medium + router
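
Step 2 above (poll until eviction or timeout) can be sketched as follows. The callable argument is a stand-in; the real VRAMManager polls Ollama's `/api/ps` over HTTP.

```python
import asyncio

async def wait_for_eviction(list_models, model: str, timeout: float = 15.0) -> bool:
    """Poll a model-listing coroutine until `model` disappears, or time out.
    Returns False on timeout, which maps to the medium-tier fallback."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        if model not in await list_models():
            return True  # eviction confirmed, safe to load the 8b model
        await asyncio.sleep(0.05)
    return False
```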
## Session ID Convention
@@ -89,18 +135,22 @@ Conversation history is keyed by session_id (5-turn buffer).
```
adolf/
├── docker-compose.yml Services: deepagents, openmemory, grammy
├── docker-compose.yml Services: bifrost, deepagents, openmemory, grammy, crawl4ai, cli (profile:tools)
├── Dockerfile deepagents container (Python 3.12)
├── agent.py FastAPI gateway + three-tier routing
├── Dockerfile.cli CLI container (python:3.12-slim + rich)
├── agent.py FastAPI gateway, run_agent_task, Crawl4AI pre-fetch, memory pipeline, /stream/ SSE
├── channels.py Channel registry + deliver() + pending_replies
├── router.py Router class — qwen2.5:1.5b routing
├── router.py Router class — regex + LLM tier classification
├── vram_manager.py VRAMManager — flush/prewarm/poll Ollama VRAM
├── agent_factory.py build_medium_agent / build_complex_agent
├── cli.py Interactive CLI REPL client
├── agent_factory.py _DirectModel (medium) / create_deep_agent (complex)
├── cli.py Interactive CLI REPL — Rich Live streaming + Markdown render
├── wiki_research.py Batch wiki research pipeline (uses /message + SSE)
├── tests/
│ ├── integration/ Standalone integration test scripts (common.py + test_*.py)
│ └── use_cases/ Claude Code skill markdown files — Claude acts as user + evaluator
├── .env TELEGRAM_BOT_TOKEN (not committed)
├── openmemory/
│ ├── server.py FastMCP + mem0 MCP tools
│ ├── server.py FastMCP + mem0: add_memory, search_memory, get_all_memories
│ └── Dockerfile
└── grammy/
├── bot.mjs grammY Telegram bot + POST /send HTTP endpoint
@@ -108,11 +158,11 @@ adolf/
└── Dockerfile
```
## External Services (from openai/ stack)
## External Services (host ports, from openai/ stack)
| Service | Host Port | Role |
|---------|-----------|------|
| Ollama GPU | 11436 | All reply inference |
| Ollama CPU | 11435 | Memory embedding (nomic-embed-text) |
| Ollama GPU | 11436 | All LLM inference (via Bifrost) + VRAM management (direct) + memory extraction |
| Ollama CPU | 11435 | nomic-embed-text embeddings for openmemory |
| Qdrant | 6333 | Vector store for memories |
| SearXNG | 11437 | Web search |
| SearXNG | 11437 | Web search (used by `web_search` tool) |

CLAUDE.md (new file, +154 lines)

@@ -0,0 +1,154 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Commands
**Start all services:**
```bash
docker compose up --build
```
**Interactive CLI (Docker container, requires gateway running):**
```bash
docker compose --profile tools run --rm -it cli
# or with options:
docker compose --profile tools run --rm -it cli python3 cli.py --url http://deepagents:8000 --session cli-alvis
```
**Run integration tests** (from `tests/integration/`, require all Docker services running):
```bash
python3 test_health.py # service health: deepagents, bifrost, Ollama, Qdrant, SearXNG
python3 test_memory.py # name store/recall + memory benchmark + dedup
python3 test_memory.py --name-only # only name store/recall pipeline
python3 test_memory.py --bench-only # only 5-fact store + 10-question recall
python3 test_memory.py --dedup-only # only deduplication test
python3 test_routing.py # all routing benchmarks (easy + medium + hard)
python3 test_routing.py --easy-only # light-tier routing benchmark
python3 test_routing.py --medium-only # medium-tier routing benchmark
python3 test_routing.py --hard-only # complex-tier + VRAM flush benchmark
```
Shared config and helpers are in `tests/integration/common.py`.
**Use case tests** (`tests/use_cases/`) — markdown skill files executed by Claude Code, which acts as mock user and quality evaluator. Run by reading the `.md` file and following its steps with tools (Bash, WebFetch, etc.).
## Architecture
Adolf is a multi-channel personal assistant. All LLM inference is routed through **Bifrost**, an open-source Go-based LLM gateway that adds retry logic, failover, and observability in front of Ollama.
### Request flow
```
Channel adapter → POST /message {text, session_id, channel, user_id}
→ 202 Accepted (immediate)
→ background: run_agent_task()
→ asyncio.gather(
_fetch_urls_from_message() ← Crawl4AI, concurrent
_retrieve_memories() ← openmemory search, concurrent
)
→ router.route() → tier decision (light/medium/complex)
if URL content fetched → upgrade light→medium
→ invoke agent for tier via Bifrost (url_context + memories in system prompt)
deepagents:8000 → bifrost:8080/v1 → ollama:11436
→ _push_stream_chunk() per token (medium streaming) / full reply (light, complex)
→ _stream_queues[session_id] asyncio.Queue
→ _end_stream() sends [DONE] sentinel
→ channels.deliver(session_id, channel, reply)
→ channel-specific callback (Telegram POST)
→ _store_memory() background task (openmemory)
CLI streaming → GET /stream/{session_id} (SSE, per-token for medium, single-chunk for others)
```
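
The client side of the streaming step above can be sketched as a parser over SSE lines. This is a sketch of what `stream_reply()` in cli.py is described as doing; the real client reads `GET /stream/{session_id}` over HTTP and renders with Rich `Live`.

```python
def parse_sse_lines(lines) -> str:
    """Accumulate token chunks from an SSE line stream until [DONE]."""
    parts = []
    for line in lines:
        if not line.startswith("data: "):
            continue  # skip blank keep-alive lines between events
        chunk = line[len("data: "):]
        if chunk == "[DONE]":
            break
        parts.append(chunk)
    return "".join(parts)
```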
### Bifrost integration
Bifrost (`bifrost-config.json`) is configured with the `ollama` provider pointing to the GPU Ollama instance on host port 11436. It exposes an OpenAI-compatible API at `http://bifrost:8080/v1`.
`agent.py` uses `langchain_openai.ChatOpenAI` with `base_url=BIFROST_URL`. Model names use the `provider/model` format that Bifrost expects: `ollama/qwen3:4b`, `ollama/qwen3:8b`, `ollama/qwen2.5:1.5b`. Bifrost strips the `ollama/` prefix before forwarding to Ollama.
`VRAMManager` bypasses Bifrost and talks directly to Ollama via `OLLAMA_BASE_URL` (host:11436) for flush/poll/prewarm operations — Bifrost cannot manage GPU VRAM.
### Three-tier routing (`router.py`, `agent.py`)
| Tier | Model (env var) | Trigger |
|------|-----------------|---------|
| light | `qwen2.5:1.5b` (`DEEPAGENTS_ROUTER_MODEL`) | Regex pre-match or LLM classifies "light" — answered by router model directly, no agent invoked |
| medium | `qwen3:4b` (`DEEPAGENTS_MODEL`) | Default for tool-requiring queries |
| complex | `qwen3:8b` (`DEEPAGENTS_COMPLEX_MODEL`) | `/think ` prefix only |
The router does regex pre-classification first, then LLM classification. Complex tier is blocked unless the message starts with `/think ` — any LLM classification of "complex" is downgraded to medium.
A global `asyncio.Semaphore(1)` (`_reply_semaphore`) serializes all LLM inference — one request at a time.
### Thinking mode and streaming
qwen3 models produce chain-of-thought `<think>...</think>` tokens. Handling differs by tier:
- **Medium** (`qwen3:4b`): streams via `astream()`. A state machine (`in_think` flag) filters `<think>` blocks in real time — only non-think tokens are pushed to `_stream_queues` and displayed to the user.
- **Complex** (`qwen3:8b`): `create_deep_agent` returns a complete reply; `_strip_think()` filters think blocks before the reply is pushed as a single chunk.
- **Router/light** (`qwen2.5:1.5b`): no thinking support; `_strip_think()` used defensively.
`_strip_think()` in `agent.py` and `router.py` strips any `<think>` blocks from non-streaming output.
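
The non-streaming filter can be sketched directly from the regex shown later in this diff; a sketch mirroring the described behavior, not the exact source.

```python
import re

def strip_think(text: str) -> str:
    """Remove qwen3 <think>...</think> blocks from a complete reply.
    Non-greedy match with DOTALL so a block may span multiple lines."""
    return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()
```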
### VRAM management (`vram_manager.py`)
Hardware: GTX 1070 (8 GB). Before running the 8b model, medium models are flushed via Ollama `keep_alive=0`, then `/api/ps` is polled (15s timeout) to confirm eviction. On timeout, falls back to medium tier. After complex reply, 8b is flushed and medium models are pre-warmed as a background task.
### Channel adapters (`channels.py`)
- **Telegram**: Grammy Node.js bot (`grammy/bot.mjs`) long-polls Telegram → `POST /message`; replies delivered via `POST grammy:3001/send`
- **CLI**: `cli.py` (Docker container, `profiles: [tools]`) posts to `/message`, then streams from `GET /stream/{session_id}` SSE with Rich `Live` display and final Markdown render.
Session IDs: `tg-<chat_id>` for Telegram, `cli-<username>` for CLI. Conversation history: 5-turn buffer per session.
### Services (`docker-compose.yml`)
| Service | Port | Role |
|---------|------|------|
| `bifrost` | 8080 | LLM gateway — retries, failover, observability; config from `bifrost-config.json` |
| `deepagents` | 8000 | FastAPI gateway + agent core |
| `openmemory` | 8765 | FastMCP server + mem0 memory tools (Qdrant-backed) |
| `grammy` | 3001 | grammY Telegram bot + `/send` HTTP endpoint |
| `crawl4ai` | 11235 | JS-rendered page fetching |
| `cli` | — | Interactive CLI container (`profiles: [tools]`), Rich streaming display |
External (from `openai/` stack, host ports):
- Ollama GPU: `11436` — all reply inference (via Bifrost) + VRAM management (direct)
- Ollama CPU: `11435` — nomic-embed-text embeddings for openmemory
- Qdrant: `6333` — vector store for memories
- SearXNG: `11437` — web search
### Bifrost config (`bifrost-config.json`)
The file is mounted into the bifrost container at `/app/data/config.json`. It declares one Ollama provider key pointing to `host.docker.internal:11436` with 2 retries and 300s timeout. To add fallback providers or adjust weights, edit this file and restart the bifrost container.
### Crawl4AI integration
Crawl4AI is embedded at all levels of the pipeline:
- **Pre-routing (all tiers)**: `_fetch_urls_from_message()` detects URLs in any message via `_URL_RE`, fetches up to 3 URLs concurrently with `_crawl4ai_fetch_async()` (async httpx). URL content is injected as a system context block into enriched history before routing, and into the system prompt for medium/complex agents.
- **Tier upgrade**: if URL content is successfully fetched, light tier is upgraded to medium (light model cannot process page content).
- **Complex agent tools**: `web_search` (SearXNG + Crawl4AI auto-fetch of top 2 results) and `fetch_url` (single-URL Crawl4AI fetch) remain available for the complex agent's agentic loop. Complex tier also receives the pre-fetched content in system prompt to avoid redundant re-fetching.
MCP tools from openmemory (`add_memory`, `search_memory`, `get_all_memories`) are **excluded** from agent tools — memory management is handled outside the agent loop.
### Medium vs Complex agent
| Agent | Builder | Speed | Use case |
|-------|---------|-------|----------|
| medium | `_DirectModel` (single LLM call, no tools) | ~3s | General questions, conversation |
| complex | `create_deep_agent` (deepagents) | Slow — multi-step planner | Deep research via `/think` prefix |
### Key files
- `agent.py` — FastAPI app, lifespan wiring, `run_agent_task()`, Crawl4AI pre-fetch, memory pipeline, all endpoints
- `bifrost-config.json` — Bifrost provider config (Ollama GPU, retries, timeouts)
- `channels.py` — channel registry and `deliver()` dispatcher
- `router.py` — `Router` class: regex + LLM classification, light-tier reply generation
- `vram_manager.py` — `VRAMManager`: flush/poll/prewarm Ollama VRAM directly
- `agent_factory.py` — `build_medium_agent` (`_DirectModel`, single call) / `build_complex_agent` (`create_deep_agent`)
- `openmemory/server.py` — FastMCP + mem0 config with custom extraction/dedup prompts
- `wiki_research.py` — batch research pipeline using `/message` + SSE polling
- `grammy/bot.mjs` — Telegram long-poll + HTTP `/send` endpoint

Dockerfile

@@ -2,7 +2,7 @@ FROM python:3.12-slim
WORKDIR /app
RUN pip install --no-cache-dir deepagents langchain-ollama langgraph \
RUN pip install --no-cache-dir deepagents langchain-openai langgraph \
fastapi uvicorn langchain-mcp-adapters langchain-community httpx
COPY agent.py channels.py vram_manager.py router.py agent_factory.py hello_world.py .

Dockerfile.cli (new file, +9 lines)

@@ -0,0 +1,9 @@
FROM python:3.12-slim
WORKDIR /app
RUN pip install --no-cache-dir rich
COPY cli.py .
CMD ["python3", "cli.py"]

agent.py (269 lines changed)

@@ -10,7 +10,13 @@ from pydantic import BaseModel
import re as _re
import httpx as _httpx
from langchain_ollama import ChatOllama
_URL_RE = _re.compile(r'https?://[^\s<>"\']+')
def _extract_urls(text: str) -> list[str]:
return _URL_RE.findall(text)
from langchain_openai import ChatOpenAI
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_community.utilities import SearxSearchWrapper
from langchain_core.tools import Tool
@@ -20,8 +26,12 @@ from router import Router
from agent_factory import build_medium_agent, build_complex_agent
import channels
# Bifrost gateway — all LLM inference goes through here
BIFROST_URL = os.getenv("BIFROST_URL", "http://bifrost:8080/v1")
# Direct Ollama URL — used only by VRAMManager for flush/prewarm/poll
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
ROUTER_MODEL = os.getenv("DEEPAGENTS_ROUTER_MODEL", "qwen2.5:0.5b")
ROUTER_MODEL = os.getenv("DEEPAGENTS_ROUTER_MODEL", "qwen2.5:1.5b")
MEDIUM_MODEL = os.getenv("DEEPAGENTS_MODEL", "qwen3:4b")
COMPLEX_MODEL = os.getenv("DEEPAGENTS_COMPLEX_MODEL", "qwen3:8b")
SEARXNG_URL = os.getenv("SEARXNG_URL", "http://host.docker.internal:11437")
@@ -31,10 +41,59 @@ CRAWL4AI_URL = os.getenv("CRAWL4AI_URL", "http://crawl4ai:11235")
MAX_HISTORY_TURNS = 5
_conversation_buffers: dict[str, list] = {}
# Per-session streaming queues — filled during inference, read by /stream/{session_id}
_stream_queues: dict[str, asyncio.Queue] = {}
async def _push_stream_chunk(session_id: str, chunk: str) -> None:
q = _stream_queues.setdefault(session_id, asyncio.Queue())
await q.put(chunk)
async def _end_stream(session_id: str) -> None:
q = _stream_queues.setdefault(session_id, asyncio.Queue())
await q.put("[DONE]")
async def _crawl4ai_fetch_async(url: str) -> str:
"""Async fetch via Crawl4AI — JS-rendered, bot-bypass, returns clean markdown."""
try:
async with _httpx.AsyncClient(timeout=60) as client:
r = await client.post(f"{CRAWL4AI_URL}/crawl", json={"urls": [url]})
r.raise_for_status()
results = r.json().get("results", [])
if not results or not results[0].get("success"):
return ""
md_obj = results[0].get("markdown") or {}
md = md_obj.get("raw_markdown") if isinstance(md_obj, dict) else str(md_obj)
return (md or "")[:5000]
except Exception as e:
return f"[fetch error: {e}]"
async def _fetch_urls_from_message(message: str) -> str:
"""If message contains URLs, fetch their content concurrently via Crawl4AI.
Returns a formatted context block, or '' if no URLs or all fetches fail."""
urls = _extract_urls(message)
if not urls:
return ""
# Fetch up to 3 URLs concurrently
results = await asyncio.gather(*[_crawl4ai_fetch_async(u) for u in urls[:3]])
parts = []
for url, content in zip(urls[:3], results):
if content and not content.startswith("[fetch error"):
parts.append(f"### {url}\n{content[:3000]}")
if not parts:
return ""
return "User's message contains URLs. Fetched content:\n\n" + "\n\n".join(parts)
# /no_think at the start of the system prompt disables qwen3 chain-of-thought.
# create_deep_agent prepends our system_prompt before BASE_AGENT_PROMPT, so
# /no_think lands at position 0 and is respected by qwen3 models via Ollama.
MEDIUM_SYSTEM_PROMPT = (
"You are a helpful AI assistant. "
"Use web_search for questions about current events or facts you don't know. "
"Reply concisely."
"You are a helpful AI assistant. Reply concisely. "
"If asked to remember a fact or name, simply confirm: 'Got it, I'll remember that.'"
)
COMPLEX_SYSTEM_PROMPT = (
@@ -49,11 +108,14 @@ COMPLEX_SYSTEM_PROMPT = (
"NEVER invent URLs. End with: **Sources checked: N**"
)
medium_model = None
medium_agent = None
complex_agent = None
router: Router = None
vram_manager: VRAMManager = None
mcp_client = None
_memory_add_tool = None
_memory_search_tool = None
# GPU mutex: one LLM inference at a time
_reply_semaphore = asyncio.Semaphore(1)
@@ -61,21 +123,34 @@ _reply_semaphore = asyncio.Semaphore(1)
@asynccontextmanager
async def lifespan(app: FastAPI):
global medium_agent, complex_agent, router, vram_manager, mcp_client
global medium_model, medium_agent, complex_agent, router, vram_manager, mcp_client, \
_memory_add_tool, _memory_search_tool
# Register channel adapters
channels.register_defaults()
# Three model instances
router_model = ChatOllama(
model=ROUTER_MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=4096,
# All three models route through Bifrost → Ollama GPU.
# Bifrost adds retry logic, observability, and failover.
# Model names use provider/model format: Bifrost strips the "ollama/" prefix
# before forwarding to Ollama's /v1/chat/completions endpoint.
router_model = ChatOpenAI(
model=f"ollama/{ROUTER_MODEL}",
base_url=BIFROST_URL,
api_key="dummy",
temperature=0,
timeout=30,
)
medium_model = ChatOllama(
model=MEDIUM_MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=8192
medium_model = ChatOpenAI(
model=f"ollama/{MEDIUM_MODEL}",
base_url=BIFROST_URL,
api_key="dummy",
timeout=180,
)
complex_model = ChatOllama(
model=COMPLEX_MODEL, base_url=OLLAMA_BASE_URL, think=True, num_ctx=16384
complex_model = ChatOpenAI(
model=f"ollama/{COMPLEX_MODEL}",
base_url=BIFROST_URL,
api_key="dummy",
timeout=600,
)
vram_manager = VRAMManager(base_url=OLLAMA_BASE_URL)
@@ -97,6 +172,13 @@ async def lifespan(app: FastAPI):
agent_tools = [t for t in mcp_tools if t.name not in ("add_memory", "search_memory", "get_all_memories")]
# Expose memory tools directly so run_agent_task can call them outside the agent loop
for t in mcp_tools:
if t.name == "add_memory":
_memory_add_tool = t
elif t.name == "search_memory":
_memory_search_tool = t
searx = SearxSearchWrapper(searx_host=SEARXNG_URL)
def _crawl4ai_fetch(url: str) -> str:
@@ -187,13 +269,15 @@ async def lifespan(app: FastAPI):
)
print(
f"[agent] three-tier: router={ROUTER_MODEL} | medium={MEDIUM_MODEL} | complex={COMPLEX_MODEL}",
f"[agent] bifrost={BIFROST_URL} | router=ollama/{ROUTER_MODEL} | "
f"medium=ollama/{MEDIUM_MODEL} | complex=ollama/{COMPLEX_MODEL}",
flush=True,
)
print(f"[agent] agent tools: {[t.name for t in agent_tools]}", flush=True)
yield
medium_model = None
medium_agent = None
complex_agent = None
router = None
@@ -222,13 +306,19 @@ class ChatRequest(BaseModel):
# ── helpers ────────────────────────────────────────────────────────────────────
def _strip_think(text: str) -> str:
"""Strip qwen3 chain-of-thought blocks that appear inline in content
when using Ollama's OpenAI-compatible endpoint (/v1/chat/completions)."""
return _re.sub(r"<think>.*?</think>", "", text, flags=_re.DOTALL).strip()
def _extract_final_text(result) -> str | None:
msgs = result.get("messages", [])
for m in reversed(msgs):
if type(m).__name__ == "AIMessage" and getattr(m, "content", ""):
return m.content
return _strip_think(m.content)
if isinstance(result, dict) and result.get("output"):
return result["output"]
return _strip_think(result["output"])
return None
@@ -244,6 +334,34 @@ def _log_messages(result):
print(f"[agent] {role}{tc['name']}({tc['args']})", flush=True)
# ── memory helpers ─────────────────────────────────────────────────────────────
async def _store_memory(session_id: str, user_msg: str, assistant_reply: str) -> None:
"""Store a conversation turn in openmemory (runs as a background task)."""
if _memory_add_tool is None:
return
t0 = time.monotonic()
try:
text = f"User: {user_msg}\nAssistant: {assistant_reply}"
await _memory_add_tool.ainvoke({"text": text, "user_id": session_id})
print(f"[memory] stored in {time.monotonic() - t0:.1f}s", flush=True)
except Exception as e:
print(f"[memory] error: {e}", flush=True)
async def _retrieve_memories(message: str, session_id: str) -> str:
"""Search openmemory for relevant context. Returns formatted string or ''."""
if _memory_search_tool is None:
return ""
try:
result = await _memory_search_tool.ainvoke({"query": message, "user_id": session_id})
if result and result.strip() and result.strip() != "[]":
return f"Relevant memories:\n{result}"
except Exception:
pass
return ""
# ── core task ──────────────────────────────────────────────────────────────────
async def run_agent_task(message: str, session_id: str, channel: str = "telegram"):
@@ -261,7 +379,28 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
history = _conversation_buffers.get(session_id, [])
print(f"[agent] running: {clean_message[:80]!r}", flush=True)
tier, light_reply = await router.route(clean_message, history, force_complex)
# Fetch URL content and memories concurrently — both are IO-bound, neither needs GPU
url_context, memories = await asyncio.gather(
_fetch_urls_from_message(clean_message),
_retrieve_memories(clean_message, session_id),
)
if url_context:
print(f"[agent] crawl4ai: {len(url_context)} chars fetched from message URLs", flush=True)
# Build enriched history: memories + url_context as system context for ALL tiers
enriched_history = list(history)
if url_context:
enriched_history = [{"role": "system", "content": url_context}] + enriched_history
if memories:
enriched_history = [{"role": "system", "content": memories}] + enriched_history
tier, light_reply = await router.route(clean_message, enriched_history, force_complex)
# Messages with URL content must be handled by at least medium tier
if url_context and tier == "light":
tier = "medium"
light_reply = None
print("[agent] URL in message → upgraded light→medium", flush=True)
print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True)
final_text = None
@@ -270,34 +409,70 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
final_text = light_reply
llm_elapsed = time.monotonic() - t0
print(f"[agent] light path: answered by router", flush=True)
await _push_stream_chunk(session_id, final_text)
await _end_stream(session_id)
elif tier == "medium":
system_prompt = MEDIUM_SYSTEM_PROMPT
result = await medium_agent.ainvoke({
"messages": [
{"role": "system", "content": system_prompt},
*history,
{"role": "user", "content": clean_message},
]
})
if memories:
system_prompt = system_prompt + "\n\n" + memories
if url_context:
system_prompt = system_prompt + "\n\n" + url_context
# Stream tokens directly — filter out qwen3 <think> blocks
in_think = False
response_parts = []
async for chunk in medium_model.astream([
{"role": "system", "content": system_prompt},
*history,
{"role": "user", "content": clean_message},
]):
token = chunk.content or ""
if not token:
continue
if in_think:
if "</think>" in token:
in_think = False
after = token.split("</think>", 1)[1]
if after:
await _push_stream_chunk(session_id, after)
response_parts.append(after)
else:
if "<think>" in token:
in_think = True
before = token.split("<think>", 1)[0]
if before:
await _push_stream_chunk(session_id, before)
response_parts.append(before)
else:
await _push_stream_chunk(session_id, token)
response_parts.append(token)
await _end_stream(session_id)
llm_elapsed = time.monotonic() - t0
_log_messages(result)
final_text = _extract_final_text(result)
final_text = "".join(response_parts).strip() or None
else: # complex
ok = await vram_manager.enter_complex_mode()
if not ok:
print("[agent] complex→medium fallback (eviction timeout)", flush=True)
tier = "medium"
system_prompt = MEDIUM_SYSTEM_PROMPT
if memories:
system_prompt = system_prompt + "\n\n" + memories
if url_context:
system_prompt = system_prompt + "\n\n" + url_context
result = await medium_agent.ainvoke({
"messages": [
{"role": "system", "content": MEDIUM_SYSTEM_PROMPT},
{"role": "system", "content": system_prompt},
*history,
{"role": "user", "content": clean_message},
]
})
else:
system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id)
if url_context:
system_prompt = system_prompt + "\n\n[Pre-fetched URL content from user's message:]\n" + url_context
result = await complex_agent.ainvoke({
"messages": [
{"role": "system", "content": system_prompt},
@@ -310,17 +485,24 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
llm_elapsed = time.monotonic() - t0
_log_messages(result)
final_text = _extract_final_text(result)
if final_text:
await _push_stream_chunk(session_id, final_text)
await _end_stream(session_id)
except Exception as e:
import traceback
llm_elapsed = time.monotonic() - t0
print(f"[agent] error after {llm_elapsed:.1f}s for chat {session_id}: {e}", flush=True)
traceback.print_exc()
await _end_stream(session_id)
# Deliver reply through the originating channel
if final_text:
t1 = time.monotonic()
await channels.deliver(session_id, channel, final_text)
try:
await channels.deliver(session_id, channel, final_text)
except Exception as e:
print(f"[agent] delivery error (non-fatal): {e}", flush=True)
send_elapsed = time.monotonic() - t1
print(
f"[agent] replied in {time.monotonic() - t0:.1f}s "
@@ -331,12 +513,13 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
else:
print("[agent] warning: no text reply from agent", flush=True)
# Update conversation buffer
# Update conversation buffer and schedule memory storage
if final_text:
buf = _conversation_buffers.get(session_id, [])
buf.append({"role": "user", "content": clean_message})
buf.append({"role": "assistant", "content": final_text})
_conversation_buffers[session_id] = buf[-(MAX_HISTORY_TURNS * 2):]
asyncio.create_task(_store_memory(session_id, clean_message, final_text))
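The per-token `<think>` filtering in the medium tier above can be sketched as a standalone generator (a hypothetical helper, not part of this diff). It carries the same simplifying assumption as the tier code: each `<think>`/`</think>` tag arrives whole inside a single chunk.

```python
def filter_think(tokens):
    """Drop <think>...</think> spans from a token stream.

    Hypothetical helper mirroring the medium-tier loop; assumes each
    tag arrives whole inside one chunk (a tag split across chunks would
    leak through, just as in the tier code).
    """
    in_think = False
    for token in tokens:
        if in_think:
            if "</think>" in token:
                in_think = False
                after = token.split("</think>", 1)[1]
                if after:
                    yield after
        elif "<think>" in token:
            in_think = True
            before = token.split("<think>", 1)[0]
            if before:
                yield before
        else:
            yield token

print("".join(filter_think(["<think>", "plan...", "</think>", "Hi", "!"])))  # prints: Hi!
```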
# ── endpoints ──────────────────────────────────────────────────────────────────
@@ -374,13 +557,39 @@ async def reply_stream(session_id: str):
try:
text = await asyncio.wait_for(q.get(), timeout=900)
# Escape newlines so entire reply fits in one SSE data line
yield f"data: {text.replace(chr(10), '\\n').replace(chr(13), '')}\n\n"
yield f"data: {text.replace(chr(10), chr(92) + 'n').replace(chr(13), '')}\n\n"
except asyncio.TimeoutError:
yield "data: [timeout]\n\n"
return StreamingResponse(event_generator(), media_type="text/event-stream")
@app.get("/stream/{session_id}")
async def stream_reply(session_id: str):
"""
SSE endpoint — streams reply tokens as they are generated.
Each chunk: data: <token>\\n\\n
Signals completion: data: [DONE]\\n\\n
Medium tier: real token-by-token streaming (think blocks filtered out).
Light and complex tiers: full reply delivered as one chunk then [DONE].
"""
q = _stream_queues.setdefault(session_id, asyncio.Queue())
async def event_generator():
try:
while True:
chunk = await asyncio.wait_for(q.get(), timeout=900)
escaped = chunk.replace("\n", "\\n").replace("\r", "")
yield f"data: {escaped}\n\n"
if chunk == "[DONE]":
break
except asyncio.TimeoutError:
yield "data: [DONE]\n\n"
return StreamingResponse(event_generator(), media_type="text/event-stream")
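Client side, each SSE event is a single `data: <chunk>` line with newlines escaped as literal `\n`, terminated by a `data: [DONE]` sentinel. A minimal sketch of the decoding (the sample lines below are hypothetical, not captured output):

```python
def parse_sse(lines):
    """Assemble a reply from /stream/{session_id} SSE lines (sketch)."""
    parts = []
    for line in lines:
        if not line.startswith("data:"):
            continue
        chunk = line[5:].strip()
        if chunk == "[DONE]":
            break
        parts.append(chunk.replace("\\n", "\n"))  # un-escape newlines
    return "".join(parts)

sample = ["data: Hello", "", "data: \\nworld", "", "data: [DONE]"]  # hypothetical stream
print(parse_sse(sample))
```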
@app.get("/health")
async def health():
return {"status": "ok", "agent_ready": medium_agent is not None}



@@ -1,13 +1,21 @@
from deepagents import create_deep_agent
class _DirectModel:
"""Thin wrapper: single LLM call, no tools, same ainvoke interface as a graph."""
def __init__(self, model):
self._model = model
async def ainvoke(self, input_dict: dict) -> dict:
messages = input_dict["messages"]
response = await self._model.ainvoke(messages)
return {"messages": list(messages) + [response]}
def build_medium_agent(model, agent_tools: list, system_prompt: str):
"""Medium agent: create_deep_agent with TodoList planning, no subagents."""
return create_deep_agent(
model=model,
tools=agent_tools,
system_prompt=system_prompt,
)
"""Medium agent: single LLM call, no tools — fast ~3s response."""
return _DirectModel(model)
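`_DirectModel` preserves the graph-style `ainvoke` contract while skipping the deepagents graph entirely; a self-contained sketch with a stand-in model (both classes below are illustrative replicas, not part of this diff):

```python
import asyncio

class DirectModel:
    """Replica of _DirectModel: one LLM call, graph-style ainvoke interface."""
    def __init__(self, model):
        self._model = model
    async def ainvoke(self, input_dict):
        messages = input_dict["messages"]
        response = await self._model.ainvoke(messages)
        return {"messages": list(messages) + [response]}

class EchoModel:
    """Hypothetical stand-in chat model: echoes the last user message."""
    async def ainvoke(self, messages):
        return {"role": "assistant", "content": messages[-1]["content"]}

async def main():
    agent = DirectModel(EchoModel())
    out = await agent.ainvoke({"messages": [{"role": "user", "content": "hi"}]})
    return out["messages"][-1]["content"]

print(asyncio.run(main()))  # prints: hi
```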
def build_complex_agent(model, agent_tools: list, system_prompt: str):

bifrost-config.json Normal file

@@ -0,0 +1,58 @@
{
"client": {
"drop_excess_requests": false
},
"providers": {
"ollama": {
"keys": [
{
"name": "ollama-gpu",
"value": "dummy",
"models": [
"qwen2.5:0.5b",
"qwen2.5:1.5b",
"qwen3:4b",
"gemma3:4b",
"qwen3:8b"
],
"weight": 1.0
}
],
"network_config": {
"base_url": "http://host.docker.internal:11436",
"default_request_timeout_in_seconds": 300,
"max_retries": 2,
"retry_backoff_initial_ms": 500,
"retry_backoff_max_ms": 10000
}
},
"ollama-cpu": {
"keys": [
{
"name": "ollama-cpu-key",
"value": "dummy",
"models": [
"gemma3:1b",
"qwen2.5:1.5b",
"qwen2.5:3b"
],
"weight": 1.0
}
],
"network_config": {
"base_url": "http://host.docker.internal:11435",
"default_request_timeout_in_seconds": 120,
"max_retries": 2,
"retry_backoff_initial_ms": 500,
"retry_backoff_max_ms": 10000
},
"custom_provider_config": {
"base_provider_type": "openai",
"allowed_requests": {
"chat_completion": true,
"chat_completion_stream": true
}
}
}
}
}

cli.py

@@ -1,9 +1,9 @@
#!/usr/bin/env python3
"""
Adolf CLI — interactive REPL for the multi-channel gateway.
Adolf CLI — interactive REPL with Rich streaming display.
Usage:
python3 cli.py [--url http://localhost:8000] [--session cli-alvis]
python3 cli.py [--url http://deepagents:8000] [--session cli-alvis]
"""
import argparse
@@ -12,7 +12,13 @@ import os
import sys
import urllib.request
GATEWAY = "http://localhost:8000"
from rich.console import Console
from rich.live import Live
from rich.markdown import Markdown
from rich.text import Text
GATEWAY = "http://deepagents:8000"
console = Console()
def post_message(gateway: str, text: str, session_id: str) -> None:
@@ -20,7 +26,7 @@ def post_message(gateway: str, text: str, session_id: str) -> None:
"text": text,
"session_id": session_id,
"channel": "cli",
"user_id": os.getlogin(),
"user_id": os.getenv("USER", "user"),
}).encode()
req = urllib.request.Request(
f"{gateway}/message",
@@ -30,33 +36,49 @@ def post_message(gateway: str, text: str, session_id: str) -> None:
)
with urllib.request.urlopen(req, timeout=10) as r:
if r.status != 202:
print(f"[error] gateway returned {r.status}", file=sys.stderr)
console.print(f"[red][error] gateway returned {r.status}[/red]")
sys.exit(1)
def wait_for_reply(gateway: str, session_id: str, timeout: int = 400) -> str:
"""Open SSE stream and return first data event."""
def stream_reply(gateway: str, session_id: str, timeout: int = 400) -> str:
"""
Open the /stream/{session_id} SSE endpoint and display tokens live with
Rich. Returns the full assembled reply text.
"""
req = urllib.request.Request(
f"{gateway}/reply/{session_id}",
f"{gateway}/stream/{session_id}",
headers={"Accept": "text/event-stream"},
)
buffer = ""
with urllib.request.urlopen(req, timeout=timeout + 5) as r:
for raw_line in r:
line = raw_line.decode("utf-8").rstrip("\n")
if line.startswith("data:"):
return line[5:].strip().replace("\\n", "\n")
return ""
with Live(Text(""), console=console, refresh_per_second=20, transient=True) as live:
for raw_line in r:
line = raw_line.decode("utf-8").rstrip("\n")
if not line.startswith("data:"):
continue
chunk = line[5:].strip()
if chunk == "[DONE]":
break
chunk = chunk.replace("\\n", "\n")
buffer += chunk
live.update(Text(buffer))
# Render the complete reply as Markdown once streaming is done
console.print(Markdown(buffer))
return buffer
def main():
parser = argparse.ArgumentParser(description="Adolf CLI")
parser.add_argument("--url", default=GATEWAY, help="Gateway URL")
parser.add_argument("--session", default=f"cli-{os.getlogin()}", help="Session ID")
parser.add_argument("--session", default=f"cli-{os.getenv('USER', 'user')}",
help="Session ID")
parser.add_argument("--timeout", type=int, default=400, help="Reply timeout (seconds)")
args = parser.parse_args()
print(f"Adolf CLI (session={args.session}, gateway={args.url})")
print("Type your message and press Enter. Ctrl+C or Ctrl+D to exit.\n")
console.print(f"[bold]Adolf CLI[/bold] (session=[cyan]{args.session}[/cyan], "
f"gateway=[cyan]{args.url}[/cyan])")
console.print("Type your message and press Enter. Ctrl+C or Ctrl+D to exit.\n")
try:
while True:
@@ -68,12 +90,11 @@ def main():
continue
post_message(args.url, text, args.session)
print("...", end="", flush=True)
reply = wait_for_reply(args.url, args.session, timeout=args.timeout)
print(f"\r{reply}\n")
stream_reply(args.url, args.session, timeout=args.timeout)
console.print()
except KeyboardInterrupt:
print("\nbye")
console.print("\n[dim]bye[/dim]")
if __name__ == "__main__":


@@ -1,4 +1,19 @@
services:
bifrost:
image: maximhq/bifrost
container_name: bifrost
ports:
- "8080:8080"
volumes:
- ./bifrost-config.json:/app/data/config.json:ro
environment:
- APP_DIR=/app/data
- APP_PORT=8080
- LOG_LEVEL=info
extra_hosts:
- "host.docker.internal:host-gateway"
restart: unless-stopped
deepagents:
build: .
container_name: deepagents
@@ -6,6 +21,9 @@ services:
- "8000:8000"
environment:
- PYTHONUNBUFFERED=1
# Bifrost gateway — all LLM inference goes through here
- BIFROST_URL=http://bifrost:8080/v1
# Direct Ollama GPU URL — used only by VRAMManager for flush/prewarm
- OLLAMA_BASE_URL=http://host.docker.internal:11436
- DEEPAGENTS_MODEL=qwen3:4b
- DEEPAGENTS_COMPLEX_MODEL=qwen3:8b
@@ -19,6 +37,7 @@ services:
- openmemory
- grammy
- crawl4ai
- bifrost
restart: unless-stopped
openmemory:
@@ -27,8 +46,9 @@ services:
ports:
- "8765:8765"
environment:
# Extraction LLM (qwen2.5:1.5b) runs on GPU after reply — fast 2-5s extraction
# Extraction LLM runs on GPU — qwen2.5:1.5b for speed (~3s)
- OLLAMA_GPU_URL=http://host.docker.internal:11436
- OLLAMA_EXTRACTION_MODEL=qwen2.5:1.5b
# Embedding (nomic-embed-text) runs on CPU — fast enough for search (50-150ms)
- OLLAMA_CPU_URL=http://host.docker.internal:11435
extra_hosts:
@@ -45,6 +65,20 @@ services:
- DEEPAGENTS_URL=http://deepagents:8000
restart: unless-stopped
cli:
build:
context: .
dockerfile: Dockerfile.cli
container_name: cli
environment:
- DEEPAGENTS_URL=http://deepagents:8000
depends_on:
- deepagents
stdin_open: true
tty: true
profiles:
- tools
crawl4ai:
image: unclecode/crawl4ai:latest
container_name: crawl4ai


@@ -6,6 +6,7 @@ from mem0 import Memory
# Extraction LLM — GPU Ollama (default qwen2.5:1.5b, overridable via OLLAMA_EXTRACTION_MODEL)
# Runs after reply when GPU is idle; spin-wait in agent.py prevents contention
OLLAMA_GPU_URL = os.getenv("OLLAMA_GPU_URL", "http://host.docker.internal:11436")
EXTRACTION_MODEL = os.getenv("OLLAMA_EXTRACTION_MODEL", "qwen2.5:1.5b")
# Embedding — CPU Ollama (nomic-embed-text, 137 MB RAM)
# Used for both search (50-150ms, acceptable) and store-time embedding
@@ -94,7 +95,7 @@ config = {
"llm": {
"provider": "ollama",
"config": {
"model": "qwen3:4b",
"model": EXTRACTION_MODEL,
"ollama_base_url": OLLAMA_GPU_URL,
"temperature": 0.1, # consistent JSON output
},

pytest.ini Normal file

@@ -0,0 +1,4 @@
[pytest]
testpaths = tests/unit
pythonpath = .
asyncio_mode = auto


tests/__init__.py Normal file

tests/integration/common.py Normal file

@@ -0,0 +1,273 @@
"""
Shared config, helpers, and utilities for Adolf integration tests.
"""
import http.client
import json
import re
import subprocess
import time
import urllib.request
# ── config ────────────────────────────────────────────────────────────────────
DEEPAGENTS = "http://localhost:8000"
BIFROST = "http://localhost:8080"
OPENMEMORY = "http://localhost:8765"
GRAMMY_HOST = "localhost"
GRAMMY_PORT = 3001
OLLAMA_GPU = "http://localhost:11436"
OLLAMA_CPU = "http://localhost:11435"
QDRANT = "http://localhost:6333"
SEARXNG = "http://localhost:11437"
COMPOSE_FILE = "/home/alvis/adolf/docker-compose.yml"
DEFAULT_CHAT_ID = "346967270"
NAMES = [
"Maximilian", "Cornelius", "Zephyr", "Archibald", "Balthazar",
"Ignatius", "Lysander", "Octavian", "Reginald", "Sylvester",
]
BENCHMARK = {
"easy": [
"hi",
"what is 2+2?",
"what is the capital of France?",
"tell me a short joke",
"how are you doing today?",
"thanks!",
"what day comes after Wednesday?",
"name the three primary colors",
"is the sky blue?",
"what does CPU stand for?",
],
"medium": [
"what is the current weather in Berlin?",
"find the latest news about artificial intelligence",
"what is the current price of Bitcoin?",
"search for a good pasta carbonara recipe",
"what movies are in theaters this week?",
"find Python tutorials for beginners",
"who won the last FIFA World Cup?",
"do you remember what we talked about before?",
"search for the best coffee shops in Tokyo",
"what is happening in the tech industry this week?",
"what's the weather like today?",
],
"hard": [
"/think compare the top 3 Python web frameworks (Django, FastAPI, Flask) and recommend one for a production REST API",
"/think research the history of artificial intelligence and create a timeline of key milestones",
"/think plan a 7-day trip to Japan with daily itinerary, accommodation suggestions, and budget breakdown",
"/think analyze microservices vs monolithic architecture: pros, cons, and when to choose each",
"/think write a Python script that reads a CSV file, cleans the data, and generates summary statistics",
"/think research quantum computing: explain the key concepts and how it differs from classical computing",
"/think compare PostgreSQL, MongoDB, and Redis — when to use each and what are the trade-offs?",
"/think create a comprehensive Docker deployment guide covering best practices for production",
"/think research climate change: summarize the latest IPCC findings and key data points",
"/think design a REST API with authentication, rate limiting, and proper error handling — provide architecture and code outline",
],
}
# ── terminal colours ──────────────────────────────────────────────────────────
PASS = "\033[32mPASS\033[0m"
FAIL = "\033[31mFAIL\033[0m"
INFO = "\033[36mINFO\033[0m"
WARN = "\033[33mWARN\033[0m"
# ── result helpers ────────────────────────────────────────────────────────────
def report(results: list, name: str, ok: bool, detail: str = ""):
tag = PASS if ok else FAIL
print(f" [{tag}] {name}" + (f" — {detail}" if detail else ""))
results.append((name, ok))
def print_summary(results: list):
print(f"\n{'─' * 55}")
total = len(results)
passed = sum(1 for _, ok in results if ok)
failed = total - passed
print(f"Results: {passed}/{total} passed", end="")
if failed:
print(f" ({failed} failed)\n")
print("Failed checks:")
for name, ok in results:
if not ok:
print(f" - {name}")
else:
print(" — all good")
print()
def tf(v):
"""Format timing value."""
return f"{v:6.2f}s" if v is not None else " n/a"
# ── HTTP helpers ──────────────────────────────────────────────────────────────
def get(url, timeout=5):
with urllib.request.urlopen(urllib.request.Request(url), timeout=timeout) as r:
return r.status, r.read().decode()
def post_json(url, payload, timeout=10):
data = json.dumps(payload).encode()
req = urllib.request.Request(
url, data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=timeout) as r:
return r.status, json.loads(r.read().decode())
def check_sse(host, port, path):
try:
conn = http.client.HTTPConnection(host, port, timeout=5)
conn.request("GET", path, headers={"Accept": "text/event-stream"})
r = conn.getresponse()
conn.close()
return r.status == 200, f"HTTP {r.status}"
except Exception as e:
return False, str(e)
def qdrant_count():
try:
_, body = get(f"{QDRANT}/collections/adolf_memories")
return json.loads(body).get("result", {}).get("points_count", 0)
except Exception:
return 0
# ── log helpers ───────────────────────────────────────────────────────────────
def fetch_logs(since_s=600):
"""Return deepagents log lines from the last since_s seconds."""
try:
r = subprocess.run(
["docker", "compose", "-f", COMPOSE_FILE, "logs", "deepagents",
f"--since={int(since_s)}s", "--no-log-prefix"],
capture_output=True, text=True, timeout=15,
)
return r.stdout.splitlines()
except Exception:
return []
def fetch_bifrost_logs(since_s=120):
"""Return bifrost container log lines from the last since_s seconds."""
try:
r = subprocess.run(
["docker", "compose", "-f", COMPOSE_FILE, "logs", "bifrost",
f"--since={int(since_s)}s", "--no-log-prefix"],
capture_output=True, text=True, timeout=10,
)
return r.stdout.splitlines()
except Exception:
return []
def parse_run_block(lines, msg_prefix):
"""
Scan log lines for the LAST '[agent] running: <msg_prefix>' block.
Extracts reply timing, tier, and memory timing from that block.
Returns dict or None if the reply has not appeared in logs yet.
Dict keys:
reply_total, llm, send, tier, reply_text — from "[agent] replied in ..."
memory_s — from "[memory] stored in ..."
memory_error — True if "[memory] error" found
"""
search = msg_prefix[:50]
start_idx = None
for i, line in enumerate(lines):
if "[agent] running:" in line and search in line:
start_idx = i # keep updating — we want the LAST occurrence
if start_idx is None:
return None
block = lines[start_idx:]
last_ai_text = None
reply_data = None
for j, line in enumerate(block):
if "AIMessage:" in line:
txt = line.split("AIMessage:", 1)[-1].strip()
if txt:
last_ai_text = txt
m = re.search(r"replied in ([\d.]+)s \(llm=([\d.]+)s, send=([\d.]+)s\)", line)
if m:
tier_m = re.search(r"\btier=(\w+)", line)
tier = tier_m.group(1) if tier_m else "unknown"
reply_data = {
"reply_total": float(m.group(1)),
"llm": float(m.group(2)),
"send": float(m.group(3)),
"tier": tier,
"reply_text": last_ai_text,
"memory_s": None,
"memory_error": False,
"_j": j,
}
break
if reply_data is not None:
next_lines = block[reply_data["_j"] + 1: reply_data["_j"] + 3]
for line in next_lines:
if line.startswith("[agent] reply_text:"):
reply_data["reply_text"] = line[len("[agent] reply_text:"):].strip()
break
if reply_data is None:
return None
for line in block[reply_data["_j"] + 1:]:
mm = re.search(r"\[memory\] stored in ([\d.]+)s", line)
if mm:
reply_data["memory_s"] = float(mm.group(1))
break
if "[memory] error" in line:
reply_data["memory_error"] = True
break
return reply_data
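The timing regex in `parse_run_block` expects the `[agent] replied in ...` line shape emitted by agent.py; a quick check against a hypothetical sample line:

```python
import re

# Hypothetical sample of the "[agent] replied in ..." log line parsed above.
line = "[agent] replied in 12.3s (llm=11.0s, send=0.4s) tier=medium"
m = re.search(r"replied in ([\d.]+)s \(llm=([\d.]+)s, send=([\d.]+)s\)", line)
assert m is not None
reply_total, llm, send = (float(m.group(i)) for i in (1, 2, 3))
tier_m = re.search(r"\btier=(\w+)", line)
print(reply_total, llm, send, tier_m.group(1))  # 12.3 11.0 0.4 medium
```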
def wait_for(label, msg_prefix, timeout_s=200, need_memory=True):
"""
Poll deepagents logs until the message is fully processed.
Shows a live progress line. Returns timing dict or None on timeout.
"""
t_start = time.monotonic()
deadline = t_start + timeout_s
tick = 0
last_result = None
while time.monotonic() < deadline:
since = int(time.monotonic() - t_start) + 90
lines = fetch_logs(since_s=since)
result = parse_run_block(lines, msg_prefix)
if result:
last_result = result
has_mem = result["memory_s"] is not None or result["memory_error"]
if (not need_memory) or has_mem:
elapsed = time.monotonic() - t_start
print(f"\r [{label}] done after {elapsed:.0f}s{' ' * 30}")
return result
time.sleep(4)
tick += 1
rem = int(deadline - time.monotonic())
if last_result:
phase = "waiting for memory..." if need_memory else "done"
else:
phase = "waiting for LLM reply..."
print(f"\r [{label}] {tick*4}s elapsed, {rem}s left — {phase} ", end="", flush=True)
print(f"\r [{label}] TIMEOUT after {timeout_s}s{' ' * 30}")
return None


@@ -0,0 +1,214 @@
#!/usr/bin/env python3
"""
Adolf service health integration tests.
Checks:
1. deepagents /health — agent_ready
1b. openmemory /sse reachable
1c. grammy /sse reachable
2. Bifrost /health, /v1/models, direct inference, deepagents startup log
3. GPU Ollama — reachable, qwen3:8b present
4. CPU Ollama — reachable, nomic-embed-text present
5. Qdrant — reachable, adolf_memories collection, vector dims=768
6. SearXNG — reachable, JSON results, latency < 5s
Usage:
python3 test_health.py
"""
import json
import sys
import time
import urllib.request
from common import (
DEEPAGENTS, BIFROST, GRAMMY_HOST, GRAMMY_PORT,
OLLAMA_GPU, OLLAMA_CPU, QDRANT, SEARXNG, COMPOSE_FILE,
INFO, FAIL,
report, print_summary, tf,
get, post_json, check_sse, fetch_logs,
)
results = []
timings = {}
# ── 1. Service health ─────────────────────────────────────────────────────────
print(f"\n[{INFO}] 1. Service health")
t0 = time.monotonic()
try:
status, body = get(f"{DEEPAGENTS}/health")
data = json.loads(body)
ok = status == 200 and data.get("agent_ready") is True
report(results, "deepagents /health — agent_ready", ok,
f"agent_ready={data.get('agent_ready')}")
except Exception as e:
report(results, "deepagents /health", False, str(e))
ok, detail = check_sse("localhost", 8765, "/sse")
report(results, "openmemory /sse reachable", ok, detail)
ok, detail = check_sse(GRAMMY_HOST, GRAMMY_PORT, "/sse")
report(results, "grammy /sse reachable", ok, detail)
timings["health_check"] = time.monotonic() - t0
# ── 2. Bifrost gateway ────────────────────────────────────────────────────────
print(f"\n[{INFO}] 2. Bifrost gateway (port 8080)")
t0 = time.monotonic()
try:
status, body = get(f"{BIFROST}/health", timeout=5)
report(results, "Bifrost /health reachable", status == 200, f"HTTP {status}")
except Exception as e:
report(results, "Bifrost /health reachable", False, str(e))
try:
status, body = get(f"{BIFROST}/v1/models", timeout=5)
data = json.loads(body)
model_ids = [m.get("id", "") for m in data.get("data", [])]
gpu_models = [m for m in model_ids if m.startswith("ollama/")]
report(results, "Bifrost lists ollama GPU models", len(gpu_models) > 0,
f"found: {gpu_models}")
for expected in ["ollama/qwen3:4b", "ollama/qwen3:8b", "ollama/qwen2.5:1.5b"]:
report(results, f" model {expected} listed", expected in model_ids)
except Exception as e:
report(results, "Bifrost /v1/models", False, str(e))
print(f" [bifrost-infer] POST /v1/chat/completions → ollama/qwen2.5:0.5b ...")
t_infer = time.monotonic()
try:
infer_payload = {
"model": "ollama/qwen2.5:0.5b",
"messages": [{"role": "user", "content": "Reply with exactly one word: pong"}],
"max_tokens": 16,
}
data = json.dumps(infer_payload).encode()
req = urllib.request.Request(
f"{BIFROST}/v1/chat/completions",
data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=60) as r:
infer_status = r.status
infer_body = json.loads(r.read().decode())
infer_elapsed = time.monotonic() - t_infer
reply_content = infer_body.get("choices", [{}])[0].get("message", {}).get("content", "")
used_model = infer_body.get("model", "")
report(results, "Bifrost → Ollama GPU inference succeeds",
infer_status == 200 and bool(reply_content),
f"{infer_elapsed:.1f}s model={used_model!r} reply={reply_content[:60]!r}")
timings["bifrost_direct_infer"] = infer_elapsed
except Exception as e:
report(results, "Bifrost → Ollama GPU inference succeeds", False, str(e))
timings["bifrost_direct_infer"] = None
try:
import subprocess
r = subprocess.run(
["docker", "compose", "-f", COMPOSE_FILE, "logs", "deepagents",
"--since=3600s", "--no-log-prefix"],
capture_output=True, text=True, timeout=10,
)
log_lines = r.stdout.splitlines()
bifrost_line = next(
(l for l in log_lines if "[agent] bifrost=" in l and "bifrost:8080" in l),
None,
)
report(results, "deepagents startup log confirms bifrost URL",
bifrost_line is not None,
bifrost_line.strip() if bifrost_line else "line not found in logs")
if bifrost_line:
has_prefix = "router=ollama/" in bifrost_line and "medium=ollama/" in bifrost_line
report(results, "deepagents model names use ollama/ prefix", has_prefix,
bifrost_line.strip())
except Exception as e:
report(results, "deepagents startup log check", False, str(e))
timings["bifrost_check"] = time.monotonic() - t0
# ── 3. GPU Ollama ─────────────────────────────────────────────────────────────
print(f"\n[{INFO}] 3. GPU Ollama (port 11436)")
t0 = time.monotonic()
try:
status, body = get(f"{OLLAMA_GPU}/api/tags")
models = [m["name"] for m in json.loads(body).get("models", [])]
has_qwen = any("qwen3" in m for m in models)
report(results, "GPU Ollama reachable", True, f"models: {models}")
report(results, "qwen3:8b present", has_qwen)
except Exception as e:
report(results, "GPU Ollama reachable", False, str(e))
report(results, "qwen3:8b present", False, "skipped")
timings["gpu_ollama_ping"] = time.monotonic() - t0
# ── 4. CPU Ollama ─────────────────────────────────────────────────────────────
print(f"\n[{INFO}] 4. CPU Ollama (port 11435)")
t0 = time.monotonic()
try:
status, body = get(f"{OLLAMA_CPU}/api/tags")
models = [m["name"] for m in json.loads(body).get("models", [])]
has_embed = any("nomic-embed-text" in m for m in models)
report(results, "CPU Ollama reachable", True, f"models: {models}")
report(results, "nomic-embed-text present", has_embed)
except Exception as e:
report(results, "CPU Ollama reachable", False, str(e))
report(results, "nomic-embed-text present", False, "skipped")
timings["cpu_ollama_ping"] = time.monotonic() - t0
# ── 5. Qdrant ─────────────────────────────────────────────────────────────────
print(f"\n[{INFO}] 5. Qdrant (port 6333)")
t0 = time.monotonic()
try:
status, body = get(f"{QDRANT}/collections")
cols = [c["name"] for c in json.loads(body).get("result", {}).get("collections", [])]
report(results, "Qdrant reachable", True, f"collections: {cols}")
report(results, "adolf_memories collection exists", "adolf_memories" in cols)
except Exception as e:
report(results, "Qdrant reachable", False, str(e))
report(results, "adolf_memories collection exists", False, "skipped")
try:
status, body = get(f"{QDRANT}/collections/adolf_memories")
info = json.loads(body).get("result", {})
dims = info.get("config", {}).get("params", {}).get("vectors", {}).get("size")
report(results, "vector dims = 768", dims == 768, f"got {dims}")
except Exception as e:
report(results, "adolf_memories collection info", False, str(e))
timings["qdrant_ping"] = time.monotonic() - t0
# ── 6. SearXNG ────────────────────────────────────────────────────────────────
print(f"\n[{INFO}] 6. SearXNG (port 11437)")
t0 = time.monotonic()
try:
status, body = get(f"{SEARXNG}/search?q=test&format=json", timeout=15)
elapsed = time.monotonic() - t0
n = len(json.loads(body).get("results", []))
report(results, "SearXNG reachable + JSON results", status == 200 and n > 0,
f"{n} results in {elapsed:.1f}s")
report(results, "SearXNG response < 5s", elapsed < 5, f"{elapsed:.2f}s")
timings["searxng_latency"] = elapsed
except Exception as e:
report(results, "SearXNG reachable", False, str(e))
report(results, "SearXNG response < 5s", False, "skipped")
timings["searxng_latency"] = None
timings["searxng_check"] = time.monotonic() - t0
# ── summary ───────────────────────────────────────────────────────────────────
print_summary(results)
sys.exit(0 if all(ok for _, ok in results) else 1)


@@ -0,0 +1,438 @@
#!/usr/bin/env python3
"""
Adolf memory integration tests.
Tests:
1. Name store — POST "remember that your name is <RandomName>"
2. Qdrant point — verifies a new vector was written after store
3. Name recall — POST "what is your name?" → reply must contain <RandomName>
4. Bifrost — verifies store/recall requests passed through Bifrost
5. Timing profile — breakdown of store and recall latencies
6. Memory benchmark — store 5 personal facts, recall with 10 questions
7. Dedup test — same fact stored twice must not grow Qdrant by 2 points
Usage:
python3 test_memory.py [--chat-id CHAT_ID] [--name-only] [--bench-only] [--dedup-only]
"""
import argparse
import json
import random
import subprocess
import sys
import time
import urllib.request
from common import (
DEEPAGENTS, QDRANT, COMPOSE_FILE, DEFAULT_CHAT_ID,
NAMES,
INFO, PASS, FAIL, WARN,
report, print_summary, tf,
get, post_json, qdrant_count, fetch_logs, fetch_bifrost_logs,
parse_run_block, wait_for,
)
# ── args ──────────────────────────────────────────────────────────────────────
parser = argparse.ArgumentParser(description="Adolf memory integration tests")
parser.add_argument("--chat-id", default=DEFAULT_CHAT_ID)
parser.add_argument("--name-only", action="store_true", help="Run only the name store/recall test")
parser.add_argument("--bench-only", action="store_true", help="Run only the memory benchmark")
parser.add_argument("--dedup-only", action="store_true", help="Run only the deduplication test")
args = parser.parse_args()
CHAT_ID = args.chat_id
_only = args.name_only or args.bench_only or args.dedup_only
_run_name = not _only or args.name_only
_run_bench = not _only or args.bench_only
_run_dedup = not _only or args.dedup_only
results = []
timings = {}
random_name = random.choice(NAMES)
TEST_CHAT_ID = f"{CHAT_ID}-{random_name.lower()}"
if _run_name:
print(f"\n Test name : \033[1m{random_name}\033[0m")
print(f" Chat ID : {TEST_CHAT_ID}")
# ── 1. Name store / recall pipeline ────────────────────────────
if _run_name:
print(f"\n[{INFO}] 1. Name store / recall pipeline")
store_msg = f"remember that your name is {random_name}"
recall_msg = "what is your name?"
# Clear memories so each run starts clean
try:
post_json(f"{QDRANT}/collections/adolf_memories/points/delete",
{"filter": {}}, timeout=5)
except Exception:
pass
pts_before = qdrant_count()
print(f" Qdrant points before: {pts_before}")
# ── 1. Store ──────────────────────────────────────────────────────────────
print(f"\n [store] '{store_msg}'")
t_store = time.monotonic()
try:
status, _ = post_json(f"{DEEPAGENTS}/chat",
{"message": store_msg, "chat_id": TEST_CHAT_ID}, timeout=5)
t_accept = time.monotonic() - t_store
report(results, "POST /chat (store) returns 202 immediately",
status == 202 and t_accept < 1, f"status={status}, t={t_accept:.3f}s")
timings["store_http_accept"] = t_accept
except Exception as e:
report(results, "POST /chat (store)", False, str(e))
print_summary(results)
sys.exit(1)
store = wait_for("store", store_msg, timeout_s=220, need_memory=True)
if store:
timings.update({
"store_llm": store["llm"],
"store_send": store["send"],
"store_reply": store["reply_total"],
"store_memory": store["memory_s"],
})
report(results, "Agent replied to store message", True,
f"{store['reply_total']:.1f}s total llm={store['llm']:.1f}s "
f"send={store['send']:.1f}s tier={store['tier']}")
if store["memory_s"] is not None:
report(results, "Memory stored without error", True, f"{store['memory_s']:.1f}s")
elif store["memory_error"]:
report(results, "Memory stored without error", False, "error in [memory] log")
else:
report(results, "Memory stored without error", False, "not found in logs")
print(f" Store reply: {store['reply_text']!r}")
else:
report(results, "Agent replied to store message", False, "timeout")
report(results, "Memory stored without error", False, "timeout")
print_summary(results)
sys.exit(1)
# ── 2. Qdrant point check ─────────────────────────────────────────────────
pts_after = qdrant_count()
new_pts = pts_after - pts_before
report(results, "New memory point(s) added to Qdrant", new_pts > 0,
f"{pts_before}{pts_after} (+{new_pts})")
timings["qdrant_new_points"] = new_pts
# ── 3. Recall ─────────────────────────────────────────────────────────────
print(f"\n [recall] '{recall_msg}'")
t_recall = time.monotonic()
try:
status, _ = post_json(f"{DEEPAGENTS}/chat",
{"message": recall_msg, "chat_id": TEST_CHAT_ID}, timeout=5)
t_accept2 = time.monotonic() - t_recall
report(results, "POST /chat (recall) returns 202 immediately",
status == 202 and t_accept2 < 1, f"status={status}, t={t_accept2:.3f}s")
timings["recall_http_accept"] = t_accept2
except Exception as e:
report(results, "POST /chat (recall)", False, str(e))
recall = wait_for("recall", recall_msg, timeout_s=160, need_memory=False)
if recall:
timings.update({
"recall_llm": recall["llm"],
"recall_send": recall["send"],
"recall_reply": recall["reply_total"],
})
report(results, "Agent replied to recall message", True,
f"{recall['reply_total']:.1f}s total llm={recall['llm']:.1f}s "
f"send={recall['send']:.1f}s tier={recall['tier']}")
reply_text = recall["reply_text"] or ""
name_in_reply = random_name.lower() in reply_text.lower()
report(results, f"Reply contains '{random_name}'", name_in_reply,
f"reply: {reply_text[:120]!r}")
else:
report(results, "Agent replied to recall message", False, "timeout")
report(results, f"Reply contains '{random_name}'", False, "no reply")
# ── 4. Bifrost pass-through check ─────────────────────────────────────────
bifrost_lines = fetch_bifrost_logs(since_s=300)
report(results, "Bifrost container has log output (requests forwarded)",
len(bifrost_lines) > 0, f"{len(bifrost_lines)} lines in bifrost logs")
bifrost_raw = "\n".join(bifrost_lines)
report(results, " Bifrost log shows AsyncOpenAI agent requests",
"AsyncOpenAI" in bifrost_raw,
f"{'found' if 'AsyncOpenAI' in bifrost_raw else 'NOT found'} in bifrost logs")
# ── 5. Timing profile ─────────────────────────────────────────────────────
print(f"\n[{INFO}] 5. Timing profile")
W = 36
print(f"\n {'Stage':<{W}} {'Time':>8}")
print(f" {'─'*W} {'─'*8}")
for label, key in [
("[GPU] HTTP accept — store turn", "store_http_accept"),
("[GPU] qwen3:Xb inference — store turn", "store_llm"),
("[GPU] Telegram send — store turn", "store_send"),
("[GPU] Total reply latency — store", "store_reply"),
("[GPU] qwen2.5:1.5b+embed — async mem", "store_memory"),
]:
print(f" {label:<{W}} {tf(timings.get(key)):>8}")
print(f" {'─'*W} {'─'*8}")
for label, key in [
("[GPU] HTTP accept — recall turn", "recall_http_accept"),
("[GPU] qwen3:Xb inference — recall", "recall_llm"),
("[GPU] Telegram send — recall turn", "recall_send"),
("[GPU] Total reply latency — recall", "recall_reply"),
]:
print(f" {label:<{W}} {tf(timings.get(key)):>8}")
print(f"\n Bottleneck analysis (each █ ≈ 5s):")
print(f" {'─'*(W+12)}")
candidates = [
("[GPU] qwen3:Xb — store reply ", timings.get("store_llm") or 0),
("[GPU] qwen3:Xb — recall reply", timings.get("recall_llm") or 0),
("[GPU] qwen2.5:1.5b+embed (async)", timings.get("store_memory") or 0),
]
candidates.sort(key=lambda x: x[1], reverse=True)
for label, t in candidates:
bar = "" * min(int(t / 5), 24)
total_pipeline = (timings.get("store_reply") or 0) + (timings.get("store_memory") or 0)
pct = f" {t/total_pipeline*100:4.0f}%" if total_pipeline > 0 else ""
print(f" {label} {t:6.1f}s {bar}{pct}")
print()
# ── 6. Memory benchmark ───────────────────────────────────────────────────────
if _run_bench:
_mem_name = random.choice(["Alice", "Bruno", "Camille", "Diego", "Elena",
"Farid", "Greta", "Hiroshi", "Irina", "Jonas"])
_mem_city = random.choice(["Tokyo", "Berlin", "Cairo", "Sydney", "Oslo",
"Nairobi", "Lisbon", "Seoul", "Montreal", "Bangkok"])
_mem_allergy = random.choice(["nuts", "gluten", "dairy", "shellfish", "eggs"])
_mem_job = random.choice([
("software engineer", "startup"),
("data scientist", "research lab"),
("product manager", "tech company"),
("DevOps engineer", "cloud provider"),
])
_mem_lang = random.choice(["Python", "Rust", "Go", "TypeScript", "Kotlin"])
_mem_pet_name = random.choice(["Whiskers", "Biscuit", "Mango", "Pebble", "Shadow",
"Noodle", "Cheddar", "Cosmo", "Pippin", "Ziggy"])
print(f"\n[{INFO}] 6. Memory benchmark")
print(f" name={_mem_name} city={_mem_city} allergy={_mem_allergy} "
f"job={_mem_job[0]}@{_mem_job[1]} lang={_mem_lang} pet={_mem_pet_name}")
print(f" Storing 5 facts, then querying with 10 recall questions")
print(f" Chat ID: {CHAT_ID}")
print()
# Wipe collection and restart openmemory for a clean slate
try:
req = urllib.request.Request(f"{QDRANT}/collections/adolf_memories", method="DELETE")
with urllib.request.urlopen(req, timeout=5):
pass
print(f" [{INFO}] Wiped adolf_memories collection")
except Exception as e:
print(f" [{WARN}] Could not wipe collection: {e}")
try:
subprocess.run(
["docker", "compose", "-f", COMPOSE_FILE, "restart", "openmemory"],
capture_output=True, timeout=30,
)
time.sleep(6)
print(f" [{INFO}] Restarted openmemory — fresh collection ready")
except Exception as e:
print(f" [{WARN}] Could not restart openmemory: {e}")
MEMORY_FACTS = [
f"My name is {_mem_name} and I live in {_mem_city}",
f"I prefer vegetarian food and I'm allergic to {_mem_allergy}",
f"I work as a {_mem_job[0]} at a {_mem_job[1]}",
f"My favorite programming language is {_mem_lang}",
f"I have a cat named {_mem_pet_name}",
]
MEMORY_RECALLS = [
("What is my name?", [_mem_name.lower()]),
("Where do I live?", [_mem_city.lower()]),
("Do I have any food allergies?", [_mem_allergy.lower()]),
("What is my job?", [_mem_job[0].split()[0].lower()]),
("What programming language do I prefer?", [_mem_lang.lower()]),
("Do I have any pets?", [_mem_pet_name.lower()]),
("Am I vegetarian or do I eat meat?", ["vegetarian"]),
("What city am I in?", [_mem_city.lower()]),
("Tell me what you know about me", [_mem_name.lower(), _mem_city.lower()]),
("What's the name of my pet?", [_mem_pet_name.lower()]),
]
STORE_TIMEOUT = 180
RECALL_TIMEOUT = 180
print(f" Storing {len(MEMORY_FACTS)} facts...")
store_ok = 0
for i, fact in enumerate(MEMORY_FACTS, 1):
print(f" [mem-store-{i:02d}] {fact!r}")
try:
status, _ = post_json(f"{DEEPAGENTS}/chat",
{"message": fact, "chat_id": CHAT_ID}, timeout=5)
if status != 202:
print(f" → [{FAIL}] POST returned {status}")
continue
except Exception as e:
print(f" → [{FAIL}] POST error: {e}")
continue
found = wait_for(f"mem-store-{i:02d}", fact, timeout_s=STORE_TIMEOUT, need_memory=True)
if found:
store_ok += 1
print(f" → [{PASS}] stored tier={found['tier']} mem={found['memory_s']}s")
else:
print(f" → [{FAIL}] timeout")
report(results, f"All memory facts stored ({store_ok}/{len(MEMORY_FACTS)})",
store_ok == len(MEMORY_FACTS))
# Wait for async extraction to settle
print(f"\n Waiting for memory extraction to settle (up to 60s)...")
_prev_count = -1
_stable_ticks = 0
_cur_count = 0
for _ in range(30):
time.sleep(2)
try:
_, body = get(f"{QDRANT}/collections/adolf_memories")
_cur_count = json.loads(body).get("result", {}).get("points_count", 0)
except Exception:
_cur_count = _prev_count
if _cur_count == _prev_count:
_stable_ticks += 1
if _stable_ticks >= 3:
break
else:
_stable_ticks = 0
_prev_count = _cur_count
print(f" Memory settled: {_cur_count} points in Qdrant")
print(f"\n Querying with {len(MEMORY_RECALLS)} recall questions...")
recall_results = []
for i, (question, keywords) in enumerate(MEMORY_RECALLS, 1):
print(f" [mem-recall-{i:02d}] {question!r}")
try:
status, _ = post_json(f"{DEEPAGENTS}/chat",
{"message": question, "chat_id": CHAT_ID}, timeout=5)
if status != 202:
print(f" → [{FAIL}] POST returned {status}")
recall_results.append((question, keywords, None, False))
continue
except Exception as e:
print(f" → [{FAIL}] POST error: {e}")
recall_results.append((question, keywords, None, False))
continue
t_start = time.monotonic()
found = None
while time.monotonic() - t_start < RECALL_TIMEOUT:
since = int(time.monotonic() - t_start) + 30
lines = fetch_logs(since_s=since)
found = parse_run_block(lines, question)
if found:
break
time.sleep(2)
if not found:
print(f" → [{FAIL}] timeout")
recall_results.append((question, keywords, None, False))
continue
reply_text = (found.get("reply_text") or "").lower()
hit_keywords = [kw for kw in keywords if kw.lower() in reply_text]
passed = len(hit_keywords) == len(keywords)
tag_str = PASS if passed else WARN
missing = [kw for kw in keywords if kw.lower() not in reply_text]
detail = f"tier={found['tier']} lat={found['reply_total']:.1f}s"
if missing:
detail += f" missing keywords: {missing}"
print(f" → [{tag_str}] {detail}")
recall_results.append((question, keywords, found.get("reply_text"), passed))
time.sleep(1)
print(f"\n {'#':<4} {'Pass':<5} {'Question':<45} {'Keywords'}")
print(f" {'─'*4} {'─'*5} {'─'*45} {'─'*30}")
for idx, (q, kws, reply, ok) in enumerate(recall_results, 1):
ok_str = "" if ok else ""
print(f" {ok_str} {idx:<3} {'yes' if ok else 'no':<5} {q[:45]:<45} {kws}")
recall_pass = sum(1 for _, _, _, ok in recall_results if ok)
total_recall = len(recall_results)
print(f"\n Memory recall score: {recall_pass}/{total_recall}")
report(results, f"Memory recall ({recall_pass}/{total_recall} keywords found)",
recall_pass == total_recall,
f"{recall_pass}/{total_recall} questions had all expected keywords in reply")
# ── 7. Deduplication test ─────────────────────────────────────────────────────
if _run_dedup:
print(f"\n[{INFO}] 7. Memory deduplication test")
print(f" Sends the same fact twice — Qdrant point count must not increase by 2")
print(f" Chat ID: {CHAT_ID}")
print()
DEDUP_TIMEOUT = 120
_dedup_fact = f"My lucky number is {random.randint(1000, 9999)}"
print(f" Fact: {_dedup_fact!r}")
pts_before = qdrant_count()
print(f" Qdrant points before: {pts_before}")
print(f" [dedup-1] sending fact (first time)")
found1 = None
try:
status, _ = post_json(f"{DEEPAGENTS}/chat",
{"message": _dedup_fact, "chat_id": CHAT_ID}, timeout=5)
if status != 202:
report(results, "Dedup: first POST accepted", False, f"status={status}")
else:
found1 = wait_for("dedup-1", _dedup_fact, timeout_s=DEDUP_TIMEOUT, need_memory=True)
if found1:
print(f" [dedup-1] stored tier={found1['tier']} mem={found1['memory_s']}s")
else:
print(f" [dedup-1] timeout")
except Exception as e:
report(results, "Dedup: first POST accepted", False, str(e))
pts_after_first = qdrant_count()
new_first = pts_after_first - pts_before
print(f" Qdrant after first send: {pts_before}{pts_after_first} (+{new_first})")
print(f" [dedup-2] sending same fact (second time)")
try:
status, _ = post_json(f"{DEEPAGENTS}/chat",
{"message": _dedup_fact, "chat_id": CHAT_ID}, timeout=5)
if status != 202:
report(results, "Dedup: second POST accepted", False, f"status={status}")
else:
found2 = wait_for("dedup-2", _dedup_fact, timeout_s=DEDUP_TIMEOUT, need_memory=True)
if found2:
print(f" [dedup-2] stored tier={found2['tier']} mem={found2['memory_s']}s")
else:
print(f" [dedup-2] timeout")
except Exception as e:
report(results, "Dedup: second POST accepted", False, str(e))
pts_after_second = qdrant_count()
new_second = pts_after_second - pts_after_first
print(f" Qdrant after second send: {pts_after_first}{pts_after_second} (+{new_second})")
dedup_ok = new_second <= new_first
report(results, "Deduplication: second identical fact not added to Qdrant", dedup_ok,
f"first send: +{new_first} pts, second send: +{new_second} pts (want second ≤ first)")
# ── summary ───────────────────────────────────────────────────────────────────
print_summary(results)
sys.exit(0 if all(ok for _, ok in results) else 1)
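The memory-settle loop in section 6 (poll the Qdrant point count every 2 s, stop once it is unchanged for three consecutive polls) is a reusable pattern. A minimal sketch, with hypothetical names, of the same stabilization logic factored into a helper:

```python
import time


def wait_until_stable(get_count, interval=2.0, needed_ticks=3, max_polls=30):
    """Poll get_count() until the value repeats needed_ticks times in a row.

    A failing get_count() (e.g. Qdrant briefly unreachable) is treated as
    "no change", mirroring the except-branch in the settle loop above.
    """
    prev, stable, cur = None, 0, None
    for _ in range(max_polls):
        if interval:
            time.sleep(interval)
        try:
            cur = get_count()
        except Exception:
            cur = prev  # unreadable → assume unchanged
        if cur == prev:
            stable += 1
            if stable >= needed_ticks:
                break
        else:
            stable = 0
        prev = cur
    return cur
```

With `interval=0` this also makes the settle logic unit-testable without real sleeps.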


@@ -0,0 +1,317 @@
#!/usr/bin/env python3
"""
Adolf tier routing benchmark.
Tests:
easy — 10 questions that must route to 'light' tier
medium — 11 questions that must route to 'medium' (light acceptable for some; complex = fail)
hard — 10 /think questions that must route to 'complex' (medium fallback acceptable)
Usage:
python3 test_routing.py [--chat-id CHAT_ID]
[--easy-only] # only easy benchmark
[--medium-only] # only medium benchmark
[--hard-only] # only hard benchmark
"""
import argparse
import sys
import time
from common import (
DEEPAGENTS, COMPOSE_FILE, DEFAULT_CHAT_ID,
BENCHMARK,
INFO, PASS, FAIL, WARN,
report, print_summary,
post_json, fetch_logs,
parse_run_block,
)
# ── args ──────────────────────────────────────────────────────────────────────
parser = argparse.ArgumentParser(description="Adolf routing benchmark")
parser.add_argument("--chat-id", default=DEFAULT_CHAT_ID)
parser.add_argument("--easy-only", action="store_true")
parser.add_argument("--medium-only", action="store_true")
parser.add_argument("--hard-only", action="store_true")
args = parser.parse_args()
CHAT_ID = args.chat_id
_only = args.easy_only or args.medium_only or args.hard_only
_run_easy = not _only or args.easy_only
_run_medium = not _only or args.medium_only
_run_hard = not _only or args.hard_only
results = []
# ── easy benchmark ────────────────────────────────────────────────────────────
if _run_easy:
print(f"\n[{INFO}] Easy routing benchmark")
print(f" {len(BENCHMARK['easy'])} questions — all must route to 'light'")
print(f" Chat ID: {CHAT_ID}")
print()
bench_results = []
LIGHT_TIMEOUT = 60
for i, question in enumerate(BENCHMARK["easy"], 1):
tag = f"easy-{i:02d}"
print(f" [{tag}] {question[:55]!r}")
t_send = time.monotonic()
try:
status, _ = post_json(f"{DEEPAGENTS}/chat",
{"message": question, "chat_id": CHAT_ID}, timeout=5)
if status != 202:
print(f" → [{FAIL}] POST returned {status}")
bench_results.append((question, "?", None, False))
continue
except Exception as e:
print(f" → [{FAIL}] POST error: {e}")
bench_results.append((question, "?", None, False))
continue
t_start = time.monotonic()
found = None
while time.monotonic() - t_start < LIGHT_TIMEOUT:
since = int(time.monotonic() - t_start) + 30
lines = fetch_logs(since_s=since)
found = parse_run_block(lines, question)
if found:
break
time.sleep(1)
if not found:
print(f" → [{FAIL}] no reply within {LIGHT_TIMEOUT}s")
bench_results.append((question, "timeout", None, False))
continue
tier = found.get("tier", "unknown")
is_light = (tier == "light")
tag_str = PASS if is_light else FAIL
print(f" → [{tag_str}] tier={tier} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s")
bench_results.append((question, tier, found["reply_total"], is_light))
time.sleep(1)
print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question'}")
print(f" {'─'*4} {'─'*8} {'─'*8} {'─'*50}")
for idx, (q, tier, lat, ok) in enumerate(bench_results, 1):
lat_str = f"{lat:.1f}s" if lat is not None else "timeout"
ok_str = "" if ok else ""
print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {q[:50]!r}")
light_count = sum(1 for _, _, _, ok in bench_results if ok)
total_bench = len(bench_results)
lats = [lat for _, _, lat, ok in bench_results if ok and lat is not None]
avg_lat = sum(lats) / len(lats) if lats else 0
print(f"\n Light-path score: {light_count}/{total_bench}")
if lats:
print(f" Avg latency (light): {avg_lat:.1f}s min={min(lats):.1f}s max={max(lats):.1f}s")
report(results, f"All easy questions routed to light ({light_count}/{total_bench})",
light_count == total_bench,
f"{light_count}/{total_bench} via light path, avg {avg_lat:.1f}s")
# ── medium benchmark ──────────────────────────────────────────────────────────
if _run_medium:
print(f"\n[{INFO}] Medium routing benchmark")
print(f" {len(BENCHMARK['medium'])} questions — must route to medium (light ok for some; complex = fail)")
print(f" Chat ID: {CHAT_ID}")
print()
LIGHT_ACCEPTABLE = {
"who won the last FIFA World Cup?",
"search for a good pasta carbonara recipe",
"find Python tutorials for beginners",
"search for the best coffee shops in Tokyo",
}
med_results = []
MEDIUM_TIMEOUT = 120
for i, question in enumerate(BENCHMARK["medium"], 1):
tag = f"med-{i:02d}"
print(f" [{tag}] {question[:60]!r}")
t_send = time.monotonic()
try:
status, _ = post_json(f"{DEEPAGENTS}/chat",
{"message": question, "chat_id": CHAT_ID}, timeout=5)
if status != 202:
print(f" → [{FAIL}] POST returned {status}")
med_results.append((question, "?", None, False))
continue
except Exception as e:
print(f" → [{FAIL}] POST error: {e}")
med_results.append((question, "?", None, False))
continue
t_start = time.monotonic()
found = None
while time.monotonic() - t_start < MEDIUM_TIMEOUT:
since = int(time.monotonic() - t_start) + 60
lines = fetch_logs(since_s=since)
found = parse_run_block(lines, question)
if found:
break
time.sleep(3)
if not found:
print(f" → [{FAIL}] no reply within {MEDIUM_TIMEOUT}s")
med_results.append((question, "timeout", None, False))
continue
tier = found.get("tier", "unknown")
light_ok = question in LIGHT_ACCEPTABLE
if tier == "medium":
correct, label, note = True, PASS, "medium ✓"
elif tier == "light":
correct = light_ok
label = PASS if light_ok else WARN
note = "light (acceptable)" if light_ok else "light (should be medium)"
elif tier == "complex":
correct, label, note = False, FAIL, "complex — wrong escalation"
else:
correct, label, note = False, FAIL, f"unknown tier {tier!r}"
print(f" → [{label}] {note} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s")
med_results.append((question, tier, found["reply_total"], correct))
time.sleep(1)
print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question'}")
print(f" {'─'*4} {'─'*8} {'─'*8} {'─'*55}")
for idx, (q, tier, lat, ok) in enumerate(med_results, 1):
lat_str = f"{lat:.1f}s" if lat is not None else "timeout"
ok_str = "" if ok else ("~" if tier == "light" else "")
print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {q[:55]!r}")
total_med = len(med_results)
medium_count = sum(1 for _, tier, _, _ in med_results if tier == "medium")
light_count = sum(1 for _, tier, _, _ in med_results if tier == "light")
complex_count = sum(1 for _, tier, _, _ in med_results if tier == "complex")
timeout_count = sum(1 for _, tier, _, _ in med_results if tier == "timeout")
light_misroute = sum(1 for q, tier, _, _ in med_results
if tier == "light" and q not in LIGHT_ACCEPTABLE)
lats = [lat for _, _, lat, _ in med_results if lat is not None]
print(f"\n Breakdown: medium={medium_count} light={light_count} "
f"complex={complex_count} timeout={timeout_count}")
if light_misroute:
print(f" [{WARN}] {light_misroute} question(s) answered via light when medium expected")
if lats:
print(f" Avg latency: {sum(lats)/len(lats):.1f}s min={min(lats):.1f}s max={max(lats):.1f}s")
report(results,
f"Medium questions: no complex escalation ({medium_count + light_count}/{total_med} routed)",
complex_count == 0,
f"medium={medium_count} light={light_count} complex={complex_count} timeout={timeout_count}")
if timeout_count:
report(results, f"Medium questions: all completed within {MEDIUM_TIMEOUT}s", False,
f"{timeout_count} question(s) timed out")
# ── hard benchmark ────────────────────────────────────────────────────────────
if _run_hard:
print(f"\n[{INFO}] Hard routing benchmark")
print(f" {len(BENCHMARK['hard'])} /think questions — must route to 'complex'")
print(f" Acceptable fallback: 'medium' if VRAM eviction timed out")
print(f" Fail condition: tier=light or timeout")
print(f" Chat ID: {CHAT_ID}")
print()
hard_results = []
COMPLEX_TIMEOUT = 300
_VRAM_ENTER = "[vram] enter_complex_mode"
_VRAM_EXIT = "[vram] exit_complex_mode"
for i, question in enumerate(BENCHMARK["hard"], 1):
tag = f"hard-{i:02d}"
short_q = question[len("/think "):].strip()[:60]
print(f" [{tag}] /think {short_q!r}")
t_send = time.monotonic()
try:
status, _ = post_json(f"{DEEPAGENTS}/chat",
{"message": question, "chat_id": CHAT_ID}, timeout=5)
if status != 202:
print(f" → [{FAIL}] POST returned {status}")
hard_results.append((question, "?", None, False))
continue
except Exception as e:
print(f" → [{FAIL}] POST error: {e}")
hard_results.append((question, "?", None, False))
continue
t_start = time.monotonic()
found = None
while time.monotonic() - t_start < COMPLEX_TIMEOUT:
since = int(time.monotonic() - t_start) + 90
lines = fetch_logs(since_s=since)
found = parse_run_block(lines, question[len("/think "):].strip())
if found:
break
time.sleep(5)
elapsed = time.monotonic() - t_send
if not found:
print(f" → [{FAIL}] no reply within {COMPLEX_TIMEOUT}s")
hard_results.append((question, "timeout", None, False))
continue
tier = found.get("tier", "unknown")
if tier == "complex":
ok, label, note = True, PASS, "complex ✓"
elif tier == "medium":
ok, label, note = True, WARN, "medium (VRAM fallback — check [vram] logs)"
else:
ok, label, note = False, FAIL, f"tier={tier} — unexpected"
lines_block = fetch_logs(since_s=int(elapsed) + 120)
recent = "\n".join(lines_block[-200:])
vram_enter_seen = _VRAM_ENTER in recent
vram_note = ""
if tier == "complex":
vram_note = " [vram:flush✓]" if vram_enter_seen else f" [{WARN}:no vram flush log]"
print(f" → [{label}] {note} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s{vram_note}")
hard_results.append((question, tier, found["reply_total"], ok))
time.sleep(5)
print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question (/think ...)'}")
print(f" {'─'*4} {'─'*8} {'─'*8} {'─'*55}")
for idx, (q, tier, lat, ok) in enumerate(hard_results, 1):
lat_str = f"{lat:.1f}s" if lat is not None else "timeout"
ok_str = "" if tier == "complex" else ("~" if tier == "medium" else "")
short = q[len("/think "):].strip()[:55]
print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {short!r}")
total_hard = len(hard_results)
complex_count = sum(1 for _, t, _, _ in hard_results if t == "complex")
medium_fb = sum(1 for _, t, _, _ in hard_results if t == "medium")
light_count = sum(1 for _, t, _, _ in hard_results if t == "light")
timeout_count = sum(1 for _, t, _, _ in hard_results if t == "timeout")
lats = [lat for _, _, lat, _ in hard_results if lat is not None]
print(f"\n Breakdown: complex={complex_count} medium(fallback)={medium_fb} "
f"light={light_count} timeout={timeout_count}")
if medium_fb:
print(f" [{WARN}] {medium_fb} question(s) fell back to medium (VRAM eviction timeout)")
if light_count:
print(f" [{FAIL}] {light_count} question(s) routed to light — /think prefix not detected")
if lats:
print(f" Avg latency: {sum(lats)/len(lats):.1f}s min={min(lats):.1f}s max={max(lats):.1f}s")
report(results,
f"Hard questions routed to complex (not light) ({complex_count + medium_fb}/{total_hard})",
light_count == 0 and timeout_count == 0,
f"complex={complex_count} medium_fallback={medium_fb} light={light_count} timeout={timeout_count}")
# ── summary ───────────────────────────────────────────────────────────────────
print_summary(results)
sys.exit(0 if all(ok for _, ok in results) else 1)
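The medium and hard benchmarks each re-derive per-tier counts with a row of generator expressions over the same `(question, tier, latency, ok)` tuples. A sketch (hypothetical helper name) of the same tally collapsed into one function:

```python
from collections import Counter


def tier_breakdown(rows):
    """rows: (question, tier, latency_s_or_None, ok) tuples as the benchmarks build them.

    Returns ({tier: count}, average latency over rows that produced a reply).
    """
    tiers = Counter(tier for _, tier, _, _ in rows)
    lats = [lat for _, _, lat, _ in rows if lat is not None]
    avg = sum(lats) / len(lats) if lats else 0.0
    return dict(tiers), avg
```

Timed-out rows carry `tier == "timeout"` and `latency is None`, so they count in the breakdown but are excluded from the average.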

tests/requirements.txt Normal file

@@ -0,0 +1,2 @@
pytest>=8.0
pytest-asyncio>=0.23

tests/unit/__init__.py Normal file

tests/unit/conftest.py Normal file

@@ -0,0 +1,80 @@
"""
Stub out all third-party packages that Adolf's source modules import.
This lets the unit tests run without a virtualenv or Docker environment.
Stubs are installed into sys.modules before any test file is collected.
"""
import sys
from unittest.mock import MagicMock
# ── helpers ────────────────────────────────────────────────────────────────────
def _mock(name: str) -> MagicMock:
m = MagicMock(name=name)
sys.modules[name] = m
return m
# ── pydantic: BaseModel must be a real class so `class Foo(BaseModel)` works ──
class _FakeBaseModel:
model_fields: dict = {}
def __init_subclass__(cls, **kwargs):
pass
def __init__(self, **data):
for k, v in data.items():
setattr(self, k, v)
_pydantic = _mock("pydantic")
_pydantic.BaseModel = _FakeBaseModel
# ── httpx: used by channels.py, vram_manager.py, agent.py ────────────────────
_mock("httpx")
# ── fastapi ───────────────────────────────────────────────────────────────────
_fastapi = _mock("fastapi")
_mock("fastapi.responses")
# ── langchain stack ───────────────────────────────────────────────────────────
_mock("langchain_openai")
_lc_core = _mock("langchain_core")
_lc_msgs = _mock("langchain_core.messages")
_mock("langchain_core.tools")
# Provide real-ish message classes so router.py can instantiate them
class _FakeMsg:
def __init__(self, content=""):
self.content = content
class SystemMessage(_FakeMsg):
pass
class HumanMessage(_FakeMsg):
pass
class AIMessage(_FakeMsg):
def __init__(self, content="", tool_calls=None):
super().__init__(content)
self.tool_calls = tool_calls or []
_lc_msgs.SystemMessage = SystemMessage
_lc_msgs.HumanMessage = HumanMessage
_lc_msgs.AIMessage = AIMessage
_mock("langchain_mcp_adapters")
_mock("langchain_mcp_adapters.client")
_mock("langchain_community")
_mock("langchain_community.utilities")
# ── deepagents (agent_factory.py) ─────────────────────────────────────────────
_mock("deepagents")


@@ -0,0 +1,198 @@
"""
Unit tests for agent.py helper functions:
- _strip_think(text)
- _extract_final_text(result)
agent.py has heavy FastAPI/LangChain imports; conftest.py stubs them out so
these pure functions can be imported and tested in isolation.
"""
import pytest
# conftest.py has already installed all stubs into sys.modules.
# The FastAPI app is instantiated at module level in agent.py —
# with the mocked fastapi, that just creates a MagicMock() object
# and the route decorators are no-ops.
from agent import _strip_think, _extract_final_text, _extract_urls
# ── _strip_think ───────────────────────────────────────────────────────────────
class TestStripThink:
def test_removes_single_think_block(self):
text = "<think>internal reasoning</think>Final answer."
assert _strip_think(text) == "Final answer."
def test_removes_multiline_think_block(self):
text = "<think>\nLine one.\nLine two.\n</think>\nResult here."
assert _strip_think(text) == "Result here."
def test_no_think_block_unchanged(self):
text = "This is a plain answer with no think block."
assert _strip_think(text) == text
def test_removes_multiple_think_blocks(self):
text = "<think>step 1</think>middle<think>step 2</think>end"
assert _strip_think(text) == "middleend"
def test_strips_surrounding_whitespace(self):
text = " <think>stuff</think> answer "
assert _strip_think(text) == "answer"
def test_empty_think_block(self):
text = "<think></think>Hello."
assert _strip_think(text) == "Hello."
def test_empty_string(self):
assert _strip_think("") == ""
def test_only_think_block_returns_empty(self):
text = "<think>nothing useful</think>"
assert _strip_think(text) == ""
def test_think_block_with_nested_tags(self):
text = "<think>I should use <b>bold</b> here</think>Done."
assert _strip_think(text) == "Done."
def test_preserves_markdown(self):
text = "<think>plan</think>## Report\n\n- Point one\n- Point two"
result = _strip_think(text)
assert result == "## Report\n\n- Point one\n- Point two"
# ── _extract_final_text ────────────────────────────────────────────────────────
class TestExtractFinalText:
def _ai_msg(self, content: str, tool_calls=None):
"""Create a minimal AIMessage-like object."""
class AIMessage:
pass
m = AIMessage()
m.content = content
m.tool_calls = tool_calls or []
return m
def _human_msg(self, content: str):
class HumanMessage:
pass
m = HumanMessage()
m.content = content
return m
def test_returns_last_ai_message_content(self):
result = {
"messages": [
self._human_msg("what is 2+2"),
self._ai_msg("The answer is 4."),
]
}
assert _extract_final_text(result) == "The answer is 4."
def test_returns_last_of_multiple_ai_messages(self):
result = {
"messages": [
self._ai_msg("First response."),
self._human_msg("follow-up"),
self._ai_msg("Final response."),
]
}
assert _extract_final_text(result) == "Final response."
def test_skips_empty_ai_messages(self):
result = {
"messages": [
self._ai_msg("Real answer."),
self._ai_msg(""), # empty — should be skipped
]
}
assert _extract_final_text(result) == "Real answer."
def test_strips_think_tags_from_ai_message(self):
result = {
"messages": [
self._ai_msg("<think>reasoning here</think>Clean reply."),
]
}
assert _extract_final_text(result) == "Clean reply."
def test_falls_back_to_output_field(self):
result = {
"messages": [],
"output": "Fallback output.",
}
assert _extract_final_text(result) == "Fallback output."
def test_strips_think_from_output_field(self):
result = {
"messages": [],
"output": "<think>thoughts</think>Actual output.",
}
assert _extract_final_text(result) == "Actual output."
def test_returns_none_when_no_content(self):
result = {"messages": []}
assert _extract_final_text(result) is None
def test_returns_none_when_no_messages_and_no_output(self):
result = {"messages": [], "output": ""}
# output is falsy → returns None
assert _extract_final_text(result) is None
def test_skips_non_ai_messages(self):
result = {
"messages": [
self._human_msg("user question"),
]
}
assert _extract_final_text(result) is None
def test_handles_ai_message_with_tool_calls_but_no_content(self):
"""AIMessage that only has tool_calls (no content) should be skipped."""
msg = self._ai_msg("", tool_calls=[{"name": "web_search", "args": {}}])
result = {"messages": [msg]}
assert _extract_final_text(result) is None
def test_multiline_think_stripped_correctly(self):
result = {
"messages": [
self._ai_msg("<think>\nLong\nreasoning\nblock\n</think>\n## Report\n\nSome content."),
]
}
assert _extract_final_text(result) == "## Report\n\nSome content."
# ── _extract_urls ──────────────────────────────────────────────────────────────
class TestExtractUrls:
def test_single_url(self):
assert _extract_urls("check this out https://example.com please") == ["https://example.com"]
def test_multiple_urls(self):
urls = _extract_urls("see https://foo.com and https://bar.org/path?q=1")
assert urls == ["https://foo.com", "https://bar.org/path?q=1"]
def test_no_urls(self):
assert _extract_urls("no links here at all") == []
def test_http_and_https(self):
urls = _extract_urls("http://old.site and https://new.site")
assert "http://old.site" in urls
assert "https://new.site" in urls
def test_url_at_start_of_message(self):
assert _extract_urls("https://example.com is interesting") == ["https://example.com"]
def test_url_only(self):
assert _extract_urls("https://example.com/page") == ["https://example.com/page"]
def test_url_with_path_and_query(self):
url = "https://example.com/articles/123?ref=home&page=2"
assert _extract_urls(url) == [url]
def test_empty_string(self):
assert _extract_urls("") == []
def test_does_not_include_surrounding_quotes(self):
# URLs inside quotes should not include the quote character
urls = _extract_urls('visit "https://example.com" today')
assert urls == ["https://example.com"]
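The tests above fully pin down the observable behavior of `_strip_think`, `_extract_final_text`, and `_extract_urls`. One implementation consistent with these tests — a sketch, not necessarily what agent.py actually does (the real code presumably uses `isinstance(msg, AIMessage)` rather than a type-name check, which works here only because the test doubles are named `AIMessage`):

```python
import re

_THINK_RE = re.compile(r"<think>.*?</think>", re.DOTALL)  # non-greedy, spans newlines
_URL_RE = re.compile(r"https?://[^\s\"'<>]+")             # stop at whitespace/quotes


def strip_think(text: str) -> str:
    # Remove every <think>…</think> block, then trim surrounding whitespace.
    return _THINK_RE.sub("", text).strip()


def extract_urls(text: str) -> list:
    return _URL_RE.findall(text)


def extract_final_text(result):
    # Walk messages from the end; take the last AI message with non-empty content.
    for msg in reversed(result.get("messages", [])):
        if type(msg).__name__ == "AIMessage" and getattr(msg, "content", ""):
            return strip_think(msg.content)
    out = result.get("output", "")
    return strip_think(out) if out else None
```

The non-greedy `.*?` is what makes `<think>step 1</think>middle<think>step 2</think>end` collapse to `middleend` instead of swallowing `middle`.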

tests/unit/test_channels.py Normal file

@@ -0,0 +1,125 @@
"""Unit tests for channels.py — register, deliver, pending_replies queue."""
import asyncio
import pytest
from unittest.mock import AsyncMock, patch

import channels


@pytest.fixture(autouse=True)
def reset_channels_state():
    """Clear module-level state before and after every test."""
    channels._callbacks.clear()
    channels.pending_replies.clear()
    yield
    channels._callbacks.clear()
    channels.pending_replies.clear()


# ── register ───────────────────────────────────────────────────────────────────
class TestRegister:
    def test_register_stores_callback(self):
        cb = AsyncMock()
        channels.register("test_channel", cb)
        assert channels._callbacks["test_channel"] is cb

    def test_register_overwrites_existing(self):
        cb1 = AsyncMock()
        cb2 = AsyncMock()
        channels.register("ch", cb1)
        channels.register("ch", cb2)
        assert channels._callbacks["ch"] is cb2

    def test_register_multiple_channels(self):
        cb_a = AsyncMock()
        cb_b = AsyncMock()
        channels.register("a", cb_a)
        channels.register("b", cb_b)
        assert channels._callbacks["a"] is cb_a
        assert channels._callbacks["b"] is cb_b


# ── deliver ────────────────────────────────────────────────────────────────────
class TestDeliver:
    async def test_deliver_enqueues_reply(self):
        channels.register("cli", AsyncMock())
        await channels.deliver("cli-alvis", "cli", "hello world")
        q = channels.pending_replies["cli-alvis"]
        assert not q.empty()
        assert await q.get() == "hello world"

    async def test_deliver_calls_channel_callback(self):
        cb = AsyncMock()
        channels.register("telegram", cb)
        await channels.deliver("tg-123", "telegram", "reply text")
        cb.assert_awaited_once_with("tg-123", "reply text")

    async def test_deliver_unknown_channel_still_enqueues(self):
        """No registered callback for channel → reply still goes to the queue."""
        await channels.deliver("cli-bob", "nonexistent", "fallback reply")
        q = channels.pending_replies["cli-bob"]
        assert await q.get() == "fallback reply"

    async def test_deliver_unknown_channel_does_not_raise(self):
        """Missing callback must not raise an exception."""
        await channels.deliver("cli-x", "ghost_channel", "msg")

    async def test_deliver_creates_queue_if_absent(self):
        channels.register("cli", AsyncMock())
        assert "cli-new" not in channels.pending_replies
        await channels.deliver("cli-new", "cli", "hi")
        assert "cli-new" in channels.pending_replies

    async def test_deliver_reuses_existing_queue(self):
        """Second deliver to the same session appends to the same queue."""
        channels.register("cli", AsyncMock())
        await channels.deliver("cli-alvis", "cli", "first")
        await channels.deliver("cli-alvis", "cli", "second")
        q = channels.pending_replies["cli-alvis"]
        assert await q.get() == "first"
        assert await q.get() == "second"

    async def test_deliver_telegram_sends_to_callback(self):
        sent = []

        async def fake_tg(session_id, text):
            sent.append((session_id, text))

        channels.register("telegram", fake_tg)
        await channels.deliver("tg-999", "telegram", "test message")
        assert sent == [("tg-999", "test message")]


# ── register_defaults ──────────────────────────────────────────────────────────
class TestRegisterDefaults:
    def test_registers_telegram_and_cli(self):
        channels.register_defaults()
        assert "telegram" in channels._callbacks
        assert "cli" in channels._callbacks

    async def test_cli_callback_is_noop(self):
        """CLI send callback does nothing (replies are handled via SSE queue)."""
        channels.register_defaults()
        cb = channels._callbacks["cli"]
        # Should not raise and should return None
        result = await cb("cli-alvis", "some reply")
        assert result is None

    async def test_telegram_callback_chunks_long_messages(self):
        """Telegram callback splits messages > 4000 chars into chunks."""
        channels.register_defaults()
        cb = channels._callbacks["telegram"]
        long_text = "x" * 9000  # > 4000 chars → should produce 3 chunks
        with patch("channels.httpx.AsyncClient") as mock_client_cls:
            mock_client = AsyncMock()
            mock_client.__aenter__ = AsyncMock(return_value=mock_client)
            mock_client.__aexit__ = AsyncMock(return_value=False)
            mock_client.post = AsyncMock()
            mock_client_cls.return_value = mock_client
            await cb("tg-123", long_text)
        # 9000 chars / 4000 per chunk = 3 POST calls
        assert mock_client.post.await_count == 3
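For reference, the contract these tests exercise can be captured in a minimal sketch of the module under test. This is inferred from the assertions above, not the actual channels.py (`register_defaults` and the Telegram chunking are omitted):

```python
import asyncio
from collections.abc import Awaitable, Callable

# Per-channel async send callbacks, and per-session queues the SSE endpoint drains.
_callbacks: dict[str, Callable[[str, str], Awaitable[None]]] = {}
pending_replies: dict[str, asyncio.Queue] = {}

def register(channel: str, callback: Callable[[str, str], Awaitable[None]]) -> None:
    """Register (or replace) the async send callback for a channel."""
    _callbacks[channel] = callback

async def deliver(session_id: str, channel: str, text: str) -> None:
    """Queue the reply for this session, then hand it to the channel callback.

    A missing callback is not an error: the reply is still enqueued.
    """
    queue = pending_replies.setdefault(session_id, asyncio.Queue())
    await queue.put(text)
    cb = _callbacks.get(channel)
    if cb is not None:
        await cb(session_id, text)
```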

tests/unit/test_router.py

@@ -0,0 +1,200 @@
"""Unit tests for router.py — Router, _parse_tier, _format_history, _LIGHT_PATTERNS."""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch

from router import Router, _parse_tier, _format_history, _LIGHT_PATTERNS


# ── _LIGHT_PATTERNS regex ──────────────────────────────────────────────────────
class TestLightPatterns:
    @pytest.mark.parametrize("text", [
        "hi", "Hi", "HI",
        "hello", "hey", "yo", "sup",
        "good morning", "good evening", "good night", "good afternoon",
        "bye", "goodbye", "see you", "cya", "later", "ttyl",
        "thanks", "thank you", "thx", "ty",
        "ok", "okay", "k", "cool", "great", "awesome", "perfect",
        "sounds good", "got it", "nice", "sure",
        "how are you", "how are you?", "how are you doing today?",
        "what's up",
        "what day comes after Monday?",
        "what day follows Friday?",
        "what comes after summer?",
        "what does NASA stand for?",
        "what does AI stand for?",
        # with trailing punctuation
        "hi!", "hello.", "thanks!",
    ])
    def test_matches(self, text):
        assert _LIGHT_PATTERNS.match(text.strip()), f"Expected light match for: {text!r}"

    @pytest.mark.parametrize("text", [
        "what is the capital of France",
        "tell me about bitcoin",
        "what is 2+2",
        "write me a poem",
        "search for news about the election",
        "what did we talk about last time",
        "what is my name",
        "/think compare these frameworks",
        "how do I install Python",
        "explain machine learning",
        "",  # empty string doesn't match the pattern
    ])
    def test_no_match(self, text):
        assert not _LIGHT_PATTERNS.match(text.strip()), f"Expected NO light match for: {text!r}"


# ── _parse_tier ────────────────────────────────────────────────────────────────
class TestParseTier:
    @pytest.mark.parametrize("raw,expected", [
        ("light", "light"),
        ("Light", "light"),
        ("LIGHT\n", "light"),
        ("medium", "medium"),
        ("Medium.", "medium"),
        ("complex", "complex"),
        ("Complex!", "complex"),
        # descriptive words → light
        ("simplefact", "light"),
        ("trivial question", "light"),
        ("basic", "light"),
        ("easy answer", "light"),
        ("general knowledge", "light"),
        # unknown → medium
        ("unknown_category", "medium"),
        ("", "medium"),
        ("I don't know", "medium"),
        # complex only if 'complex' appears in first 60 chars
        ("this is a complex query requiring search", "complex"),
        # _parse_tier checks "complex" before "medium", so complex wins even if medium appears first
        ("medium complexity, not complex", "complex"),
    ])
    def test_parse_tier(self, raw, expected):
        assert _parse_tier(raw) == expected


# ── _format_history ────────────────────────────────────────────────────────────
class TestFormatHistory:
    def test_empty(self):
        assert _format_history([]) == "(none)"

    def test_single_user_message(self):
        history = [{"role": "user", "content": "hello there"}]
        result = _format_history(history)
        assert "user: hello there" in result

    def test_multiple_turns(self):
        history = [
            {"role": "user", "content": "What is Python?"},
            {"role": "assistant", "content": "Python is a programming language."},
        ]
        result = _format_history(history)
        assert "user: What is Python?" in result
        assert "assistant: Python is a programming language." in result

    def test_truncates_long_content(self):
        long_content = "x" * 300
        history = [{"role": "user", "content": long_content}]
        result = _format_history(history)
        # content is truncated to 200 chars in _format_history
        assert len(result) < 250

    def test_missing_keys_handled(self):
        # Should not raise — uses .get() with defaults
        history = [{"role": "user"}]  # no content key
        result = _format_history(history)
        assert "user:" in result


# ── Router.route() ─────────────────────────────────────────────────────────────
class TestRouterRoute:
    def _make_router(self, classify_response: str, reply_response: str = "Sure!") -> Router:
        """Return a Router with a mock model that returns given classification and reply."""
        model = MagicMock()
        classify_msg = MagicMock()
        classify_msg.content = classify_response
        reply_msg = MagicMock()
        reply_msg.content = reply_response
        # First ainvoke call → classification; second → reply
        model.ainvoke = AsyncMock(side_effect=[classify_msg, reply_msg])
        return Router(model=model)

    async def test_force_complex_bypasses_classification(self):
        router = self._make_router("medium")
        tier, reply = await router.route("some question", [], force_complex=True)
        assert tier == "complex"
        assert reply is None
        # Model should NOT have been called
        router.model.ainvoke.assert_not_called()

    async def test_regex_light_skips_llm_classification(self):
        # Regex match bypasses classification entirely; the only ainvoke call is the reply.
        model = MagicMock()
        reply_msg = MagicMock()
        reply_msg.content = "I'm doing great!"
        model.ainvoke = AsyncMock(return_value=reply_msg)
        router = Router(model=model)
        tier, reply = await router.route("how are you", [], force_complex=False)
        assert tier == "light"
        assert reply == "I'm doing great!"
        # Exactly one model call — no classification step
        assert router.model.ainvoke.call_count == 1

    async def test_llm_classifies_medium(self):
        router = self._make_router("medium")
        tier, reply = await router.route("what is the bitcoin price?", [], force_complex=False)
        assert tier == "medium"
        assert reply is None

    async def test_llm_classifies_light_generates_reply(self):
        router = self._make_router("light", "Paris is the capital of France.")
        tier, reply = await router.route("what is the capital of France?", [], force_complex=False)
        assert tier == "light"
        assert reply == "Paris is the capital of France."

    async def test_llm_classifies_complex_downgraded_to_medium(self):
        # Without /think prefix, complex classification → downgraded to medium
        router = self._make_router("complex")
        tier, reply = await router.route("compare React and Vue", [], force_complex=False)
        assert tier == "medium"
        assert reply is None

    async def test_llm_error_falls_back_to_medium(self):
        model = MagicMock()
        model.ainvoke = AsyncMock(side_effect=Exception("connection error"))
        router = Router(model=model)
        tier, reply = await router.route("some question", [], force_complex=False)
        assert tier == "medium"
        assert reply is None

    async def test_light_reply_empty_falls_back_to_medium(self):
        """If the light reply comes back empty, router returns medium instead."""
        router = self._make_router("light", "")  # empty reply
        tier, reply = await router.route("what is 2+2", [], force_complex=False)
        assert tier == "medium"
        assert reply is None

    async def test_strips_think_tags_from_classification(self):
        """Router strips <think>...</think> from model output before parsing tier."""
        model = MagicMock()
        classify_msg = MagicMock()
        classify_msg.content = "<think>Hmm let me think...</think>medium"
        reply_msg = MagicMock()
        reply_msg.content = "I'm fine!"
        model.ainvoke = AsyncMock(side_effect=[classify_msg, reply_msg])
        router = Router(model=model)
        tier, _ = await router.route("what is the news?", [], force_complex=False)
        assert tier == "medium"

    async def test_think_prefix_forces_complex(self):
        """/think prefix is already stripped by agent.py; force_complex=True is passed."""
        router = self._make_router("medium")
        tier, reply = await router.route("analyse this", [], force_complex=True)
        assert tier == "complex"
        assert reply is None


@@ -0,0 +1,164 @@
"""Unit tests for vram_manager.py — VRAMManager flush/poll/prewarm logic."""
import asyncio
import pytest
from unittest.mock import AsyncMock, MagicMock, patch

from vram_manager import VRAMManager

BASE_URL = "http://localhost:11434"


def _make_manager() -> VRAMManager:
    return VRAMManager(base_url=BASE_URL)


def _mock_client(get_response=None, post_response=None):
    """Return a context-manager mock for httpx.AsyncClient."""
    client = AsyncMock()
    client.__aenter__ = AsyncMock(return_value=client)
    client.__aexit__ = AsyncMock(return_value=False)
    if get_response is not None:
        client.get = AsyncMock(return_value=get_response)
    if post_response is not None:
        client.post = AsyncMock(return_value=post_response)
    return client


# ── _flush ─────────────────────────────────────────────────────────────────────
class TestFlush:
    async def test_sends_keep_alive_zero(self):
        client = _mock_client(post_response=MagicMock())
        with patch("vram_manager.httpx.AsyncClient", return_value=client):
            mgr = _make_manager()
            await mgr._flush("qwen3:4b")
        client.post.assert_awaited_once()
        _, kwargs = client.post.await_args
        body = kwargs.get("json") or client.post.call_args[1].get("json") or client.post.call_args[0][1]
        assert body["model"] == "qwen3:4b"
        assert body["keep_alive"] == 0

    async def test_posts_to_correct_endpoint(self):
        client = _mock_client(post_response=MagicMock())
        with patch("vram_manager.httpx.AsyncClient", return_value=client):
            mgr = _make_manager()
            await mgr._flush("qwen3:8b")
        url = client.post.call_args[0][0]
        assert url == f"{BASE_URL}/api/generate"

    async def test_ignores_exceptions_silently(self):
        client = AsyncMock()
        client.__aenter__ = AsyncMock(return_value=client)
        client.__aexit__ = AsyncMock(return_value=False)
        client.post = AsyncMock(side_effect=Exception("connection refused"))
        with patch("vram_manager.httpx.AsyncClient", return_value=client):
            mgr = _make_manager()
            # Should not raise
            await mgr._flush("qwen3:4b")


# ── _prewarm ───────────────────────────────────────────────────────────────────
class TestPrewarm:
    async def test_sends_keep_alive_300(self):
        client = _mock_client(post_response=MagicMock())
        with patch("vram_manager.httpx.AsyncClient", return_value=client):
            mgr = _make_manager()
            await mgr._prewarm("qwen3:4b")
        _, kwargs = client.post.await_args
        body = kwargs.get("json") or client.post.call_args[1].get("json") or client.post.call_args[0][1]
        assert body["keep_alive"] == 300
        assert body["model"] == "qwen3:4b"

    async def test_ignores_exceptions_silently(self):
        client = AsyncMock()
        client.__aenter__ = AsyncMock(return_value=client)
        client.__aexit__ = AsyncMock(return_value=False)
        client.post = AsyncMock(side_effect=Exception("timeout"))
        with patch("vram_manager.httpx.AsyncClient", return_value=client):
            mgr = _make_manager()
            await mgr._prewarm("qwen3:4b")


# ── _poll_evicted ──────────────────────────────────────────────────────────────
class TestPollEvicted:
    async def test_returns_true_when_models_absent(self):
        resp = MagicMock()
        resp.json.return_value = {"models": [{"name": "some_other_model"}]}
        client = _mock_client(get_response=resp)
        with patch("vram_manager.httpx.AsyncClient", return_value=client):
            mgr = _make_manager()
            result = await mgr._poll_evicted(["qwen3:4b", "qwen2.5:1.5b"], timeout=5)
        assert result is True

    async def test_returns_false_on_timeout_when_model_still_loaded(self):
        resp = MagicMock()
        resp.json.return_value = {"models": [{"name": "qwen3:4b"}]}
        client = _mock_client(get_response=resp)
        with patch("vram_manager.httpx.AsyncClient", return_value=client):
            mgr = _make_manager()
            result = await mgr._poll_evicted(["qwen3:4b"], timeout=0.1)
        assert result is False

    async def test_returns_true_immediately_if_already_empty(self):
        resp = MagicMock()
        resp.json.return_value = {"models": []}
        client = _mock_client(get_response=resp)
        with patch("vram_manager.httpx.AsyncClient", return_value=client):
            mgr = _make_manager()
            result = await mgr._poll_evicted(["qwen3:4b"], timeout=5)
        assert result is True

    async def test_handles_poll_error_and_continues(self):
        """If /api/ps errors, polling continues until timeout."""
        client = AsyncMock()
        client.__aenter__ = AsyncMock(return_value=client)
        client.__aexit__ = AsyncMock(return_value=False)
        client.get = AsyncMock(side_effect=Exception("network error"))
        with patch("vram_manager.httpx.AsyncClient", return_value=client):
            mgr = _make_manager()
            result = await mgr._poll_evicted(["qwen3:4b"], timeout=0.2)
        assert result is False


# ── enter_complex_mode / exit_complex_mode ─────────────────────────────────────
class TestComplexMode:
    async def test_enter_complex_mode_returns_true_on_success(self):
        mgr = _make_manager()
        mgr._flush = AsyncMock()
        mgr._poll_evicted = AsyncMock(return_value=True)
        result = await mgr.enter_complex_mode()
        assert result is True

    async def test_enter_complex_mode_flushes_medium_models(self):
        mgr = _make_manager()
        mgr._flush = AsyncMock()
        mgr._poll_evicted = AsyncMock(return_value=True)
        await mgr.enter_complex_mode()
        flushed = {call.args[0] for call in mgr._flush.call_args_list}
        assert "qwen3:4b" in flushed
        assert "qwen2.5:1.5b" in flushed

    async def test_enter_complex_mode_returns_false_on_eviction_timeout(self):
        mgr = _make_manager()
        mgr._flush = AsyncMock()
        mgr._poll_evicted = AsyncMock(return_value=False)
        result = await mgr.enter_complex_mode()
        assert result is False

    async def test_exit_complex_mode_flushes_complex_and_prewarms_medium(self):
        mgr = _make_manager()
        mgr._flush = AsyncMock()
        mgr._prewarm = AsyncMock()
        await mgr.exit_complex_mode()
        # Must flush 8b
        flushed = {call.args[0] for call in mgr._flush.call_args_list}
        assert "qwen3:8b" in flushed
        # Must prewarm medium models
        prewarmed = {call.args[0] for call in mgr._prewarm.call_args_list}
        assert "qwen3:4b" in prewarmed
        assert "qwen2.5:1.5b" in prewarmed


@@ -0,0 +1,41 @@
# Use Case: Apple Pie Research

Verify that a deep research query triggers the complex tier, uses web search and
page fetching, and produces a substantive, well-sourced recipe response.

## Steps

**1. Send the research query** (the `/think` prefix forces complex tier):

```bash
curl -s -X POST http://localhost:8000/message \
  -H "Content-Type: application/json" \
  -d '{"text": "/think what is the best recipe for an apple pie?", "session_id": "use-case-apple-pie", "channel": "cli", "user_id": "claude"}'
```

**2. Wait for the streaming reply** (complex tier can take up to 5 minutes):

```bash
curl -s -N --max-time 300 "http://localhost:8000/stream/use-case-apple-pie"
```

**3. Confirm tier and tool usage in agent logs:**

```bash
docker compose -f /home/alvis/adolf/docker-compose.yml logs deepagents \
  --since=600s | grep -E "tier=complex|web_search|fetch_url|crawl4ai"
```

## Evaluate (use your judgment)

Check each of the following:

- **Tier**: logs show `tier=complex` for this session
- **Tool use**: logs show `web_search` or `fetch_url` calls during the request
- **Ingredients**: response lists specific apple pie ingredients (apples, flour, butter, sugar, etc.)
- **Method**: response includes preparation or baking steps
- **Sources**: response cites real URLs it fetched, not invented links
- **Quality**: response is structured and practical — not a refusal, stub, or generic placeholder

Report PASS only if all six criteria are met. For any failure, state which criterion
failed and quote the relevant part of the response or logs.


@@ -0,0 +1,18 @@
# Use Case: CLI Startup

Verify the Adolf CLI container starts cleanly, shows the welcome banner,
and exits without error when the user closes input.

## Steps

```bash
echo "" | docker compose --profile tools run --rm -T cli \
  python3 cli.py --url http://deepagents:8000 --session use-case-cli-startup
echo "exit code: $?"
```

## Pass if

- Output contains `Adolf CLI`
- Output contains the session name and gateway URL
- Exit code is 0