Compare commits

..

4 Commits

Author SHA1 Message Date
Alvis
09a93c661e Add three-tier model routing with VRAM management and benchmark suite
- Three-tier routing: light (router answers directly ~3s), medium (qwen3:4b
  + tools ~60s), complex (/think prefix → qwen3:8b + subagents ~140s)
- Router: qwen2.5:1.5b, temp=0, regex pre-classifier + raw-text LLM classify
- VRAMManager: explicit flush/poll/prewarm to prevent Ollama CPU-spill bug
- agent_factory: build_medium_agent and build_complex_agent using deepagents
  (TodoListMiddleware + SubAgentMiddleware with research/memory subagents)
- Fix: split Telegram replies >4000 chars into multiple messages
- Benchmark: 30 questions (easy/medium/hard) — 10/10/10 verified passing
  easy→light, medium→medium, hard→complex with VRAM flush confirmed

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 17:54:51 +00:00
Alvis
ff20f8942d Fix system prompt: agent now correctly handles memory requests
- Tell agent that memory is saved automatically after every reply
- Instruct agent to never say it cannot store information
- Instruct agent to acknowledge and confirm when user asks to remember something
- Fix misleading startup log (gemma3:1b → qwen2.5:1.5b)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-23 05:22:08 +00:00
Alvis
d61dcfb83e Switch extraction model to qwen2.5:1.5b, fix mem0migrations dims, update tests
- openmemory: use qwen2.5:1.5b instead of gemma3:1b for fact extraction
- test_pipeline.py: check qwen2.5:1.5b, fix SSE checks, fix Qdrant payload
  parsing, relax SearXNG threshold to 5s, improve marker word test
- potential-directions.md: ranked CPU extraction model candidates
- Root cause: mem0migrations collection had stale 1536-dim vectors causing
  silent dedup failures; recreate both collections at 768 dims

All 18 pipeline tests now pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-23 05:11:29 +00:00
Alvis
f6714f9392 Add Adolf architecture doc and integration test script
- ARCHITECTURE.md: comprehensive pipeline description (copied from Gitea wiki)
- test_pipeline.py: tests all services, memory, async timing, and recall

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-23 04:52:40 +00:00
10 changed files with 1749 additions and 0 deletions

144
adolf/ARCHITECTURE.md Normal file
View File

@@ -0,0 +1,144 @@
# Adolf
Persistent AI assistant reachable via Telegram. Three-tier model routing with GPU VRAM management.
## Architecture
```
Telegram user
↕ (long-polling)
[grammy] Node.js — port 3001
- grammY bot polls Telegram
- on message: fire-and-forget POST /chat to deepagents
- exposes MCP SSE server: tool send_telegram_message(chat_id, text)
↓ POST /chat → 202 Accepted immediately
[deepagents] Python FastAPI — port 8000
Pre-check: starts with /think? → force_complex=True, strip prefix
Router (qwen2.5:0.5b, ~1-2s, always warm in VRAM)
Structured output: {tier: light|medium|complex, confidence: 0.0-1.0, reply?: str}
- light: simple conversational → router answers directly, ~1-2s
- medium: needs memory/web search → qwen3:4b + deepagents tools
- complex: multi-step research, planning, code → qwen3:8b + subagents
force_complex always overrides to complex
complex only if confidence >= 0.85 (else downgraded to medium)
├── light ─────────── router reply used directly (no extra LLM call)
├── medium ────────── deepagents qwen3:4b + TodoList + tools
└── complex ───────── VRAM flush → deepagents qwen3:8b + TodoList + subagents
└→ background: exit_complex_mode (flush 8b, prewarm 4b+router)
send_telegram_message via grammy MCP
asyncio.create_task(store_memory_async) — spin-wait GPU idle → openmemory add_memory
↕ MCP SSE ↕ HTTP
[openmemory] Python + mem0 — port 8765 [SearXNG — port 11437]
- add_memory, search_memory, get_all_memories
- extractor: qwen2.5:1.5b on GPU Ollama (11436) — 25s
- embedder: nomic-embed-text on CPU Ollama (11435) — 50150ms
- vector store: Qdrant (port 6333), 768 dims
```
## Three-Tier Model Routing
| Tier | Model | VRAM | Trigger | Latency |
|------|-------|------|---------|---------|
| Light | qwen2.5:1.5b (router answers) | ~1.2 GB (shared with extraction) | Router classifies as light | ~24s |
| Medium | qwen3:4b | ~2.5 GB | Default; router classifies medium | ~2040s |
| Complex | qwen3:8b | ~5.5 GB | `/think` prefix | ~60120s |
**Normal VRAM** (light + medium): router/extraction(1.2, shared) + medium(2.5) = ~3.7 GB
**Complex VRAM**: 8b alone = ~5.5 GB — must flush others first
### Router model: qwen2.5:1.5b (not 0.5b)
qwen2.5:0.5b is too small for reliable classification — tends to output "medium" for everything
or produces nonsensical output. qwen2.5:1.5b is already loaded in VRAM for memory extraction,
so switching adds zero net VRAM overhead while dramatically improving accuracy.
Router uses **raw text generation** (not structured output/JSON schema):
- Ask model to output one word: `light`, `medium`, or `complex`
- Parse with simple keyword matching (fallback: `medium`)
- For `light` tier: a second call generates the reply text
## VRAM Management
GTX 1070 has 8 GB VRAM. Ollama's auto-eviction can spill models to CPU RAM permanently
(all subsequent loads stay on CPU). To prevent this:
1. **Always flush explicitly** before loading qwen3:8b (`keep_alive=0`)
2. **Verify eviction** via `/api/ps` poll (15s timeout) before proceeding
3. **Fallback**: timeout → log warning, run medium agent instead
4. **Post-complex**: flush 8b immediately, pre-warm 4b + router
```python
# Flush (force immediate unload):
POST /api/generate {"model": "qwen3:4b", "prompt": "", "keep_alive": 0}
# Pre-warm (load into VRAM for 5 min):
POST /api/generate {"model": "qwen3:4b", "prompt": "", "keep_alive": 300}
```
## Agents
**Medium agent** (`build_medium_agent`):
- `create_deep_agent` with TodoListMiddleware (auto-included)
- Tools: `search_memory`, `get_all_memories`, `web_search`
- No subagents
**Complex agent** (`build_complex_agent`):
- `create_deep_agent` with TodoListMiddleware + SubAgentMiddleware
- Tools: all agent tools
- Subagents:
- `research`: web_search only, for thorough multi-query web research
- `memory`: search_memory + get_all_memories, for comprehensive context retrieval
## Concurrency
| Semaphore | Guards | Notes |
|-----------|--------|-------|
| `_reply_semaphore(1)` | GPU Ollama (all tiers) | One LLM reply inference at a time |
| `_memory_semaphore(1)` | GPU Ollama (qwen2.5:1.5b extraction) | One memory extraction at a time |
Light path holds `_reply_semaphore` briefly (no GPU inference).
Memory extraction spin-waits until `_reply_semaphore` is free (60s timeout).
## Pipeline
1. User message → Grammy → `POST /chat` → 202 Accepted
2. Background: acquire `_reply_semaphore` → route → run agent tier → send reply
3. `asyncio.create_task(store_memory_async)` — spin-waits GPU free, then extracts memories
4. For complex: `asyncio.create_task(exit_complex_mode)` — flushes 8b, pre-warms 4b+router
## External Services (from openai/ stack)
| Service | Host Port | Role |
|---------|-----------|------|
| Ollama GPU | 11436 | All reply inference + extraction (qwen2.5:1.5b) |
| Ollama CPU | 11435 | Memory embedding (nomic-embed-text) |
| Qdrant | 6333 | Vector store for memories |
| SearXNG | 11437 | Web search |
GPU Ollama config: `OLLAMA_MAX_LOADED_MODELS=2`, `OLLAMA_NUM_PARALLEL=1`.
## Files
```
adolf/
├── docker-compose.yml Services: deepagents, openmemory, grammy
├── Dockerfile deepagents container (Python 3.12)
├── agent.py FastAPI + three-tier routing + run_agent_task
├── router.py Router class — qwen2.5:0.5b structured output routing
├── vram_manager.py VRAMManager — flush/prewarm/poll Ollama VRAM
├── agent_factory.py build_medium_agent / build_complex_agent (deepagents)
├── .env TELEGRAM_BOT_TOKEN (not committed)
├── openmemory/
│ ├── server.py FastMCP + mem0 MCP tools
│ ├── requirements.txt
│ └── Dockerfile
└── grammy/
├── bot.mjs grammY bot + MCP SSE server
├── package.json
└── Dockerfile
```

10
adolf/Dockerfile Normal file
View File

@@ -0,0 +1,10 @@
FROM python:3.12-slim
WORKDIR /app
RUN pip install --no-cache-dir deepagents langchain-ollama langgraph \
fastapi uvicorn langchain-mcp-adapters langchain-community httpx
COPY agent.py vram_manager.py router.py agent_factory.py hello_world.py .
CMD ["uvicorn", "agent:app", "--host", "0.0.0.0", "--port", "8000"]

309
adolf/agent.py Normal file
View File

@@ -0,0 +1,309 @@
import asyncio
import os
import time
from contextlib import asynccontextmanager
from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from langchain_ollama import ChatOllama
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_community.utilities import SearxSearchWrapper
from langchain_core.tools import Tool
from vram_manager import VRAMManager
from router import Router
from agent_factory import build_medium_agent, build_complex_agent
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
ROUTER_MODEL = os.getenv("DEEPAGENTS_ROUTER_MODEL", "qwen2.5:0.5b")
MEDIUM_MODEL = os.getenv("DEEPAGENTS_MODEL", "qwen3:4b")
COMPLEX_MODEL = os.getenv("DEEPAGENTS_COMPLEX_MODEL", "qwen3:8b")
SEARXNG_URL = os.getenv("SEARXNG_URL", "http://host.docker.internal:11437")
OPENMEMORY_URL = os.getenv("OPENMEMORY_URL", "http://openmemory:8765")
GRAMMY_URL = os.getenv("GRAMMY_URL", "http://grammy:3001")
MAX_HISTORY_TURNS = 5
_conversation_buffers: dict[str, list] = {}
MEDIUM_SYSTEM_PROMPT = (
"You are a helpful AI assistant talking to a user via Telegram. "
"The user's ID is {user_id}. "
"IMPORTANT: When calling any memory tool (search_memory, get_all_memories), "
"always use user_id=\"{user_id}\". "
"Every conversation is automatically saved to memory after you reply — "
"you do NOT need to explicitly store anything. "
"NEVER tell the user you cannot remember or store information. "
"If the user asks you to remember something, acknowledge it and confirm it will be remembered. "
"Use search_memory when context from past conversations may be relevant. "
"Use web_search for questions about current events or facts you don't know. "
"Reply concisely."
)
COMPLEX_SYSTEM_PROMPT = (
"You are a capable AI assistant tackling a complex, multi-step task for a Telegram user. "
"The user's ID is {user_id}. "
"IMPORTANT: When calling any memory tool (search_memory, get_all_memories), "
"always use user_id=\"{user_id}\". "
"Plan your work using write_todos before diving in. "
"Delegate: use the 'research' subagent for thorough web research across multiple queries, "
"and the 'memory' subagent to gather comprehensive context from past conversations. "
"Every conversation is automatically saved to memory — you do NOT need to store anything. "
"NEVER tell the user you cannot remember or store information. "
"Produce a thorough, well-structured reply."
)
medium_agent = None
complex_agent = None
router: Router = None
vram_manager: VRAMManager = None
mcp_client = None
send_tool = None
add_memory_tool = None
# GPU mutex: one LLM inference at a time
_reply_semaphore = asyncio.Semaphore(1)
# Memory semaphore: one async extraction at a time
_memory_semaphore = asyncio.Semaphore(1)
@asynccontextmanager
async def lifespan(app: FastAPI):
global medium_agent, complex_agent, router, vram_manager
global mcp_client, send_tool, add_memory_tool
# Three model instances
router_model = ChatOllama(
model=ROUTER_MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=4096,
temperature=0, # deterministic classification
)
medium_model = ChatOllama(
model=MEDIUM_MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=8192
)
complex_model = ChatOllama(
model=COMPLEX_MODEL, base_url=OLLAMA_BASE_URL, think=True, num_ctx=16384
)
vram_manager = VRAMManager(base_url=OLLAMA_BASE_URL)
router = Router(model=router_model)
mcp_connections = {
"openmemory": {"transport": "sse", "url": f"{OPENMEMORY_URL}/sse"},
"grammy": {"transport": "sse", "url": f"{GRAMMY_URL}/sse"},
}
mcp_client = MultiServerMCPClient(mcp_connections)
for attempt in range(12):
try:
mcp_tools = await mcp_client.get_tools()
break
except Exception as e:
if attempt == 11:
raise
print(f"[agent] MCP not ready (attempt {attempt + 1}/12): {e}. Retrying in 5s...")
await asyncio.sleep(5)
send_tool = next((t for t in mcp_tools if t.name == "send_telegram_message"), None)
add_memory_tool = next((t for t in mcp_tools if t.name == "add_memory"), None)
agent_tools = [t for t in mcp_tools if t.name not in ("send_telegram_message", "add_memory")]
searx = SearxSearchWrapper(searx_host=SEARXNG_URL)
agent_tools.append(Tool(
name="web_search",
func=searx.run,
description="Search the web for current information",
))
# Build agents (system_prompt filled per-request with user_id)
medium_agent = build_medium_agent(
model=medium_model,
agent_tools=agent_tools,
system_prompt=MEDIUM_SYSTEM_PROMPT.format(user_id="{user_id}"),
)
complex_agent = build_complex_agent(
model=complex_model,
agent_tools=agent_tools,
system_prompt=COMPLEX_SYSTEM_PROMPT.format(user_id="{user_id}"),
)
print(
f"[agent] three-tier: router={ROUTER_MODEL} | medium={MEDIUM_MODEL} | complex={COMPLEX_MODEL}",
flush=True,
)
print(f"[agent] agent tools: {[t.name for t in agent_tools]}", flush=True)
yield
medium_agent = None
complex_agent = None
router = None
vram_manager = None
mcp_client = None
send_tool = None
add_memory_tool = None
app = FastAPI(lifespan=lifespan)
class ChatRequest(BaseModel):
message: str
chat_id: str
async def store_memory_async(conversation: str, user_id: str):
"""Fire-and-forget: extract and store memories after GPU is free."""
t_wait = time.monotonic()
while _reply_semaphore.locked():
if time.monotonic() - t_wait > 60:
print(f"[memory] spin-wait timeout 60s, proceeding for user {user_id}", flush=True)
break
await asyncio.sleep(0.5)
async with _memory_semaphore:
t0 = time.monotonic()
try:
await add_memory_tool.ainvoke({"text": conversation, "user_id": user_id})
print(f"[memory] stored in {time.monotonic() - t0:.1f}s for user {user_id}", flush=True)
except Exception as e:
print(f"[memory] error after {time.monotonic() - t0:.1f}s: {e}", flush=True)
def _extract_final_text(result) -> str | None:
"""Extract last AIMessage content from agent result."""
msgs = result.get("messages", [])
for m in reversed(msgs):
if type(m).__name__ == "AIMessage" and getattr(m, "content", ""):
return m.content
# deepagents may return output differently
if isinstance(result, dict) and result.get("output"):
return result["output"]
return None
def _log_messages(result):
msgs = result.get("messages", [])
for m in msgs:
role = type(m).__name__
content = getattr(m, "content", "")
tool_calls = getattr(m, "tool_calls", [])
if content:
print(f"[agent] {role}: {str(content)[:150]}", flush=True)
for tc in tool_calls:
print(f"[agent] {role}{tc['name']}({tc['args']})", flush=True)
async def run_agent_task(message: str, chat_id: str):
print(f"[agent] queued: {message[:80]!r} chat={chat_id}", flush=True)
# Pre-check: /think prefix forces complex tier
force_complex = False
clean_message = message
if message.startswith("/think "):
force_complex = True
clean_message = message[len("/think "):]
print("[agent] /think prefix → force_complex=True", flush=True)
async with _reply_semaphore:
t0 = time.monotonic()
history = _conversation_buffers.get(chat_id, [])
print(f"[agent] running: {clean_message[:80]!r}", flush=True)
# Route the message
tier, light_reply = await router.route(clean_message, history, force_complex)
print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True)
final_text = None
try:
if tier == "light":
final_text = light_reply
llm_elapsed = time.monotonic() - t0
print(f"[agent] light path: answered by router", flush=True)
elif tier == "medium":
system_prompt = MEDIUM_SYSTEM_PROMPT.format(user_id=chat_id)
result = await medium_agent.ainvoke({
"messages": [
{"role": "system", "content": system_prompt},
*history,
{"role": "user", "content": clean_message},
]
})
llm_elapsed = time.monotonic() - t0
_log_messages(result)
final_text = _extract_final_text(result)
else: # complex
ok = await vram_manager.enter_complex_mode()
if not ok:
print("[agent] complex→medium fallback (eviction timeout)", flush=True)
tier = "medium"
system_prompt = MEDIUM_SYSTEM_PROMPT.format(user_id=chat_id)
result = await medium_agent.ainvoke({
"messages": [
{"role": "system", "content": system_prompt},
*history,
{"role": "user", "content": clean_message},
]
})
else:
system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=chat_id)
result = await complex_agent.ainvoke({
"messages": [
{"role": "system", "content": system_prompt},
*history,
{"role": "user", "content": clean_message},
]
})
asyncio.create_task(vram_manager.exit_complex_mode())
llm_elapsed = time.monotonic() - t0
_log_messages(result)
final_text = _extract_final_text(result)
except Exception as e:
import traceback
llm_elapsed = time.monotonic() - t0
print(f"[agent] error after {llm_elapsed:.1f}s for chat {chat_id}: {e}", flush=True)
traceback.print_exc()
# Send reply via grammy MCP (split if > Telegram's 4096-char limit)
if final_text and send_tool:
t1 = time.monotonic()
MAX_TG = 4000 # leave headroom below the 4096 hard limit
chunks = [final_text[i:i + MAX_TG] for i in range(0, len(final_text), MAX_TG)]
for chunk in chunks:
await send_tool.ainvoke({"chat_id": chat_id, "text": chunk})
send_elapsed = time.monotonic() - t1
# Log in format compatible with test_pipeline.py parser
print(
f"[agent] replied in {time.monotonic() - t0:.1f}s "
f"(llm={llm_elapsed:.1f}s, send={send_elapsed:.1f}s) tier={tier}",
flush=True,
)
elif not final_text:
print("[agent] warning: no text reply from agent", flush=True)
# Update conversation buffer
if final_text:
buf = _conversation_buffers.get(chat_id, [])
buf.append({"role": "user", "content": clean_message})
buf.append({"role": "assistant", "content": final_text})
_conversation_buffers[chat_id] = buf[-(MAX_HISTORY_TURNS * 2):]
# Async memory storage (fire-and-forget)
if add_memory_tool and final_text:
conversation = f"User: {clean_message}\nAssistant: {final_text}"
asyncio.create_task(store_memory_async(conversation, chat_id))
@app.post("/chat")
async def chat(request: ChatRequest, background_tasks: BackgroundTasks):
if medium_agent is None:
return JSONResponse(status_code=503, content={"error": "Agent not ready"})
background_tasks.add_task(run_agent_task, request.message, request.chat_id)
return JSONResponse(status_code=202, content={"status": "accepted"})
@app.get("/health")
async def health():
return {"status": "ok", "agent_ready": medium_agent is not None}

54
adolf/agent_factory.py Normal file
View File

@@ -0,0 +1,54 @@
from deepagents import create_deep_agent, SubAgent
def build_medium_agent(model, agent_tools: list, system_prompt: str):
"""Medium agent: create_deep_agent with TodoList planning, no subagents."""
return create_deep_agent(
model=model,
tools=agent_tools,
system_prompt=system_prompt,
)
def build_complex_agent(model, agent_tools: list, system_prompt: str):
"""Complex agent: create_deep_agent with TodoList planning + research/memory subagents."""
web_tools = [t for t in agent_tools if getattr(t, "name", "") == "web_search"]
memory_tools = [
t for t in agent_tools
if getattr(t, "name", "") in ("search_memory", "get_all_memories")
]
research_sub: SubAgent = {
"name": "research",
"description": (
"Runs multiple web searches in parallel and synthesizes findings. "
"Use for thorough research tasks requiring several queries."
),
"system_prompt": (
"You are a research specialist. Search the web thoroughly using multiple queries. "
"Cite sources and synthesize information into a clear summary."
),
"tools": web_tools,
"model": model,
}
memory_sub: SubAgent = {
"name": "memory",
"description": (
"Searches and retrieves all relevant memories about the user comprehensively. "
"Use to gather full context from past conversations."
),
"system_prompt": (
"You are a memory specialist. Search broadly using multiple queries. "
"Return all relevant facts and context you find."
),
"tools": memory_tools,
"model": model,
}
return create_deep_agent(
model=model,
tools=agent_tools,
system_prompt=system_prompt,
subagents=[research_sub, memory_sub],
)

43
adolf/docker-compose.yml Normal file
View File

@@ -0,0 +1,43 @@
services:
deepagents:
build: .
container_name: deepagents
ports:
- "8000:8000"
environment:
- PYTHONUNBUFFERED=1
- OLLAMA_BASE_URL=http://host.docker.internal:11436
- DEEPAGENTS_MODEL=qwen3:4b
- DEEPAGENTS_COMPLEX_MODEL=qwen3:8b
- DEEPAGENTS_ROUTER_MODEL=qwen2.5:1.5b
- SEARXNG_URL=http://host.docker.internal:11437
extra_hosts:
- "host.docker.internal:host-gateway"
depends_on:
- openmemory
- grammy
restart: unless-stopped
openmemory:
build: ./openmemory
container_name: openmemory
ports:
- "8765:8765"
environment:
# Extraction LLM (qwen2.5:1.5b) runs on GPU after reply — fast 2-5s extraction
- OLLAMA_GPU_URL=http://host.docker.internal:11436
# Embedding (nomic-embed-text) runs on CPU — fast enough for search (50-150ms)
- OLLAMA_CPU_URL=http://host.docker.internal:11435
extra_hosts:
- "host.docker.internal:host-gateway"
restart: unless-stopped
grammy:
build: ./grammy
container_name: grammy
ports:
- "3001:3001"
environment:
- TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN}
- DEEPAGENTS_URL=http://deepagents:8000
restart: unless-stopped

View File

@@ -0,0 +1,62 @@
import os
from mcp.server.fastmcp import FastMCP
from mem0 import Memory
OLLAMA_CPU_URL = os.getenv("OLLAMA_CPU_URL", "http://host.docker.internal:11435")
QDRANT_HOST = os.getenv("QDRANT_HOST", "host.docker.internal")
QDRANT_PORT = int(os.getenv("QDRANT_PORT", "6333"))
config = {
"llm": {
"provider": "ollama",
"config": {
"model": "qwen2.5:1.5b",
"ollama_base_url": OLLAMA_CPU_URL,
},
},
"embedder": {
"provider": "ollama",
"config": {
"model": "nomic-embed-text",
"ollama_base_url": OLLAMA_CPU_URL,
},
},
"vector_store": {
"provider": "qdrant",
"config": {
"collection_name": "adolf_memories",
"embedding_model_dims": 768,
"host": QDRANT_HOST,
"port": QDRANT_PORT,
},
},
}
memory = Memory.from_config(config)
mcp = FastMCP("openmemory", host="0.0.0.0", port=8765)
@mcp.tool()
def add_memory(text: str, user_id: str = "default") -> str:
"""Store a memory for a user."""
result = memory.add(text, user_id=user_id)
return str(result)
@mcp.tool()
def search_memory(query: str, user_id: str = "default") -> str:
"""Search memories for a user using semantic similarity."""
results = memory.search(query, user_id=user_id)
return str(results)
@mcp.tool()
def get_all_memories(user_id: str = "default") -> str:
"""Get all stored memories for a user."""
results = memory.get_all(user_id=user_id)
return str(results)
if __name__ == "__main__":
mcp.run(transport="sse")

View File

@@ -0,0 +1,13 @@
# Potential Directions
## CPU Extraction Model Candidates (mem0 / openmemory)
Replacing `gemma3:1b` — documented JSON/structured output failures make it unreliable for mem0's extraction pipeline.
| Rank | Model | Size | CPU speed | JSON reliability | Notes |
|------|-------|------|-----------|-----------------|-------|
| 1 | `qwen2.5:1.5b` | ~934 MB | 2540 tok/s | Excellent | Best fit: fast + structured output, 18T token training |
| 2 | `qwen2.5:3b` | ~1.9 GB | 1525 tok/s | Excellent | Quality upgrade, same family |
| 3 | `llama3.2:3b` | ~2 GB | 1525 tok/s | Good | Highest IFEval score (77.4) in class |
| 4 | `smollm2:1.7b` | ~1.1 GB | 2535 tok/s | Moderate | Use temp=0; NuExtract-1.5-smol is fine-tuned variant |
| 5 | `phi4-mini` | ~2.5 GB | 1017 tok/s | Good | Function calling support, borderline CPU speed |

138
adolf/router.py Normal file
View File

@@ -0,0 +1,138 @@
import re
from typing import Optional
from langchain_core.messages import SystemMessage, HumanMessage
# ── Regex pre-classifier ──────────────────────────────────────────────────────
# Catches obvious light-tier patterns before calling the LLM.
# Keyed by regex → compiled pattern.
_LIGHT_PATTERNS = re.compile(
r"^("
# Greetings / farewells
r"hi|hello|hey|yo|sup|howdy|good morning|good evening|good night|good afternoon"
r"|bye|goodbye|see you|cya|later|ttyl"
# Acknowledgements / small talk
r"|thanks?|thank you|thx|ty|ok|okay|k|cool|great|awesome|perfect|sounds good|got it|nice|sure"
r"|how are you|how are you\?|how are you doing(\s+today)?[?!.]*"
r"|what.?s up"
# Calendar facts: "what day comes after X?" / "what comes after X?"
r"|what\s+day\s+(comes\s+after|follows|is\s+after)\s+\w+[?!.]*"
r"|what\s+comes\s+after\s+\w+[?!.]*"
# Acronym expansions: "what does X stand for?"
r"|what\s+does\s+\w+\s+stand\s+for[?!.]*"
r")[\s!.?]*$",
re.IGNORECASE,
)
# ── LLM classification prompt ─────────────────────────────────────────────────
CLASSIFY_PROMPT = """Classify the message. Output ONLY one word: light, medium, or complex.
LIGHT = answerable from general knowledge, no internet needed:
what is 2+2 / what is the capital of France / name the three primary colors
tell me a short joke / is the sky blue / is water wet
MEDIUM = requires web search or the user's stored memories:
current weather / today's news / Bitcoin price / what did we talk about
COMPLEX = /think prefix only:
/think compare frameworks / /think plan a trip
Message: {message}
Output (one word only — light, medium, or complex):"""
LIGHT_REPLY_PROMPT = """You are a helpful Telegram assistant. Answer briefly and naturally (1-3 sentences). Be friendly."""
def _format_history(history: list[dict]) -> str:
if not history:
return "(none)"
lines = []
for msg in history:
role = msg.get("role", "?")
content = str(msg.get("content", ""))[:200]
lines.append(f"{role}: {content}")
return "\n".join(lines)
def _parse_tier(text: str) -> str:
"""Extract tier from raw model output. Default to medium."""
t = text.strip().lower()
snippet = t[:60]
if "complex" in snippet:
return "complex"
if "medium" in snippet:
return "medium"
if "light" in snippet:
return "light"
# Model invented a descriptive category (e.g. "simplefact", "trivial", "basic") →
# treat as light since it recognised the question doesn't need tools
if any(w in snippet for w in ("simple", "fact", "trivial", "basic", "easy", "general")):
return "light"
return "medium" # safe default
class Router:
def __init__(self, model):
self.model = model
async def route(
self,
message: str,
history: list[dict],
force_complex: bool = False,
) -> tuple[str, Optional[str]]:
"""
Returns (tier, reply_or_None).
For light tier: also generates the reply with a second call.
For medium/complex: reply is None.
"""
if force_complex:
return "complex", None
# Step 0: regex pre-classification for obvious light patterns
if _LIGHT_PATTERNS.match(message.strip()):
print(f"[router] regex→light", flush=True)
return await self._generate_light_reply(message, history)
# Step 1: LLM classification with raw text output
try:
classify_response = await self.model.ainvoke([
HumanMessage(content=CLASSIFY_PROMPT.format(message=message)),
])
raw = classify_response.content or ""
raw = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
tier = _parse_tier(raw)
if tier == "complex" and not message.startswith("/think"):
tier = "medium"
print(f"[router] raw={raw[:30]!r} → tier={tier}", flush=True)
except Exception as e:
print(f"[router] classify error, defaulting to medium: {e}", flush=True)
return "medium", None
if tier != "light":
return tier, None
return await self._generate_light_reply(message, history)
async def _generate_light_reply(
self, message: str, history: list[dict]
) -> tuple[str, Optional[str]]:
"""Generate a short reply using the router model for light-tier messages."""
history_text = _format_history(history)
context = f"\nConversation history:\n{history_text}" if history else ""
try:
reply_response = await self.model.ainvoke([
SystemMessage(content=LIGHT_REPLY_PROMPT + context),
HumanMessage(content=message),
])
reply_text = reply_response.content or ""
reply_text = re.sub(r"<think>.*?</think>", "", reply_text, flags=re.DOTALL).strip()
if not reply_text:
print("[router] light reply empty, falling back to medium", flush=True)
return "medium", None
print(f"[router] light reply: {len(reply_text)} chars", flush=True)
return "light", reply_text
except Exception as e:
print(f"[router] light reply error, falling back to medium: {e}", flush=True)
return "medium", None

905
adolf/test_pipeline.py Normal file
View File

@@ -0,0 +1,905 @@
#!/usr/bin/env python3
"""
Adolf pipeline integration test with end-to-end timing profiling.
Tests:
1. Service health (deepagents, openmemory, grammy MCP SSE)
2. GPU Ollama models
3. CPU Ollama models
4. Qdrant collection + vector dims
5. SearXNG
6. Name store — "remember that your name is <RandomName>"
7. Qdrant point added after store
8. Name recall — "what is your name?" → reply contains <RandomName>
9. Timing profile + bottleneck report
10. Easy benchmark — 10 easy questions → all must route to light
11. Medium benchmark — 10 medium questions → must route to medium (or light, never complex)
12. Hard benchmark — 10 /think questions → all must route to complex; VRAM flush verified
Usage:
python3 test_pipeline.py [--chat-id CHAT_ID]
[--bench-only] skip sections 1-9, run 10+11+12
[--easy-only] skip 1-9 and 11+12, run only section 10
[--medium-only] skip 1-9 and 10+12, run only section 11
[--hard-only] skip 1-9 and 10+11, run only section 12
[--no-bench] skip sections 10-12
Timing is extracted from deepagents container logs, not estimated from sleeps.
"""
import argparse
import http.client
import json
import random
import re
import subprocess
import sys
import time
import urllib.request
# ── config ────────────────────────────────────────────────────────────────────
DEEPAGENTS = "http://localhost:8000"
OPENMEMORY = "http://localhost:8765"
GRAMMY_HOST = "localhost"
GRAMMY_PORT = 3001
OLLAMA_GPU = "http://localhost:11436"
OLLAMA_CPU = "http://localhost:11435"
QDRANT = "http://localhost:6333"
SEARXNG = "http://localhost:11437"
COMPOSE_FILE = "/home/alvis/agap_git/adolf/docker-compose.yml"
DEFAULT_CHAT_ID = "346967270"
NAMES = [
"Maximilian", "Cornelius", "Zephyr", "Archibald", "Balthazar",
"Ignatius", "Lysander", "Octavian", "Reginald", "Sylvester",
]
# ── benchmark questions ───────────────────────────────────────────────────────
BENCHMARK = {
"easy": [
"hi",
"what is 2+2?",
"what is the capital of France?",
"tell me a short joke",
"how are you doing today?",
"thanks!",
"what day comes after Wednesday?",
"name the three primary colors",
"is the sky blue?",
"what does CPU stand for?",
],
"medium": [
"what is the current weather in Berlin?",
"find the latest news about artificial intelligence",
"what is the current price of Bitcoin?",
"search for a good pasta carbonara recipe",
"what movies are in theaters this week?",
"find Python tutorials for beginners",
"who won the last FIFA World Cup?",
"do you remember what we talked about before?",
"search for the best coffee shops in Tokyo",
"what is happening in the tech industry this week?",
],
"hard": [
"/think compare the top 3 Python web frameworks (Django, FastAPI, Flask) and recommend one for a production REST API",
"/think research the history of artificial intelligence and create a timeline of key milestones",
"/think plan a 7-day trip to Japan with daily itinerary, accommodation suggestions, and budget breakdown",
"/think analyze microservices vs monolithic architecture: pros, cons, and when to choose each",
"/think write a Python script that reads a CSV file, cleans the data, and generates summary statistics",
"/think research quantum computing: explain the key concepts and how it differs from classical computing",
"/think compare PostgreSQL, MongoDB, and Redis — when to use each and what are the trade-offs?",
"/think create a comprehensive Docker deployment guide covering best practices for production",
"/think research climate change: summarize the latest IPCC findings and key data points",
"/think design a REST API with authentication, rate limiting, and proper error handling — provide architecture and code outline",
],
}
PASS = "\033[32mPASS\033[0m"
FAIL = "\033[31mFAIL\033[0m"
INFO = "\033[36mINFO\033[0m"
WARN = "\033[33mWARN\033[0m"
results = []
timings = {} # label → float seconds | None
# ── helpers ───────────────────────────────────────────────────────────────────
def report(name, ok, detail=""):
tag = PASS if ok else FAIL
print(f" [{tag}] {name}" + (f"{detail}" if detail else ""))
results.append((name, ok))
def tf(v):
"""Format timing value."""
return f"{v:6.2f}s" if v is not None else " n/a"
def get(url, timeout=5):
with urllib.request.urlopen(urllib.request.Request(url), timeout=timeout) as r:
return r.status, r.read().decode()
def post_json(url, payload, timeout=10):
data = json.dumps(payload).encode()
req = urllib.request.Request(url, data=data,
headers={"Content-Type": "application/json"},
method="POST")
with urllib.request.urlopen(req, timeout=timeout) as r:
return r.status, json.loads(r.read().decode())
def check_sse(host, port, path):
try:
conn = http.client.HTTPConnection(host, port, timeout=5)
conn.request("GET", path, headers={"Accept": "text/event-stream"})
r = conn.getresponse()
conn.close()
return r.status == 200, f"HTTP {r.status}"
except Exception as e:
return False, str(e)
def qdrant_count():
try:
_, body = get(f"{QDRANT}/collections/adolf_memories")
return json.loads(body).get("result", {}).get("points_count", 0)
except Exception:
return 0
def fetch_logs(since_s=600):
"""Return deepagents log lines from the last since_s seconds."""
try:
r = subprocess.run(
["docker", "compose", "-f", COMPOSE_FILE, "logs", "deepagents",
f"--since={int(since_s)}s", "--no-log-prefix"],
capture_output=True, text=True, timeout=15,
)
return r.stdout.splitlines()
except Exception:
return []
def parse_run_block(lines, msg_prefix):
"""
Scan log lines for the LAST '[agent] running: <msg_prefix>' block.
Extracts reply timing, tier, and memory timing from that block.
Returns dict or None if the reply has not appeared in logs yet.
Dict keys:
reply_total, llm, send, tier, reply_text — from "[agent] replied in ..."
memory_s — from "[memory] stored in ..."
memory_error — True if "[memory] error" found
"""
search = msg_prefix[:50]
start_idx = None
for i, line in enumerate(lines):
if "[agent] running:" in line and search in line:
start_idx = i # keep updating — we want the LAST occurrence
if start_idx is None:
return None
block = lines[start_idx:]
last_ai_text = None
reply_data = None
for j, line in enumerate(block):
# Track last non-tool AIMessage (the final reply)
if "AIMessage:" in line and "" not in line:
txt = line.split("AIMessage:", 1)[-1].strip()
if txt:
last_ai_text = txt
# For light tier: router reply is stored in _conversation_buffers directly
# so there may be no AIMessage log — grab from tier=light line
if "[agent] tier=light" in line and "message=" in line:
# Extract preview text logged elsewhere if available
pass
m = re.search(r"replied in ([\d.]+)s \(llm=([\d.]+)s, send=([\d.]+)s\)", line)
if m:
# Extract optional tier tag at end of line
tier_m = re.search(r"\btier=(\w+)", line)
tier = tier_m.group(1) if tier_m else "unknown"
reply_data = {
"reply_total": float(m.group(1)),
"llm": float(m.group(2)),
"send": float(m.group(3)),
"tier": tier,
"reply_text": last_ai_text,
"memory_s": None,
"memory_error": False,
"_j": j,
}
break
if reply_data is None:
return None # reply not in logs yet
# Memory line can appear after the next "[agent] running:" — no stop condition
for line in block[reply_data["_j"] + 1:]:
mm = re.search(r"\[memory\] stored in ([\d.]+)s", line)
if mm:
reply_data["memory_s"] = float(mm.group(1))
break
if "[memory] error" in line:
reply_data["memory_error"] = True
break
return reply_data
def wait_for(label, msg_prefix, timeout_s=200, need_memory=True):
"""
Poll deepagents logs until the message is fully processed.
Shows a live progress line.
Returns timing dict or None on timeout.
"""
t_start = time.monotonic()
deadline = t_start + timeout_s
tick = 0
last_result = None
while time.monotonic() < deadline:
# Window grows with elapsed time — never miss a line that appeared late
since = int(time.monotonic() - t_start) + 90
lines = fetch_logs(since_s=since)
result = parse_run_block(lines, msg_prefix)
if result:
last_result = result
has_mem = result["memory_s"] is not None or result["memory_error"]
if (not need_memory) or has_mem:
elapsed = time.monotonic() - t_start
print(f"\r [{label}] done after {elapsed:.0f}s{' ' * 30}")
return result
time.sleep(4)
tick += 1
rem = int(deadline - time.monotonic())
if last_result:
phase = "waiting for memory..." if need_memory else "done"
else:
phase = "waiting for LLM reply..."
print(f"\r [{label}] {tick*4}s elapsed, {rem}s left — {phase} ", end="", flush=True)
print(f"\r [{label}] TIMEOUT after {timeout_s}s{' ' * 30}")
return None
# ── args ──────────────────────────────────────────────────────────────────────
parser = argparse.ArgumentParser(description="Adolf pipeline test")
parser.add_argument("--chat-id", default=DEFAULT_CHAT_ID)
parser.add_argument("--bench-only", action="store_true",
help="Skip sections 1-9, run sections 10+11 (both benchmarks)")
parser.add_argument("--easy-only", action="store_true",
help="Skip sections 1-9 and 11, run only section 10 (easy benchmark)")
parser.add_argument("--medium-only", action="store_true",
help="Skip sections 1-9 and 10, run only section 11 (medium benchmark)")
parser.add_argument("--hard-only", action="store_true",
help="Skip sections 1-9 and 10+11, run only section 12 (hard benchmark)")
parser.add_argument("--no-bench", action="store_true",
help="Skip sections 10-12 (all benchmarks)")
args = parser.parse_args()
CHAT_ID = args.chat_id
# Derived flags for readability
_skip_pipeline = args.bench_only or args.easy_only or args.medium_only or args.hard_only
_run_easy = not args.no_bench and not args.medium_only and not args.hard_only
_run_medium = not args.no_bench and not args.easy_only and not args.hard_only
_run_hard = not args.no_bench and not args.easy_only and not args.medium_only
random_name = random.choice(NAMES)
if not _skip_pipeline:
print(f"\n Test name : \033[1m{random_name}\033[0m")
print(f" Chat ID : {CHAT_ID}")
# ── 1. service health ─────────────────────────────────────────────────────────
if not _skip_pipeline:
print(f"\n[{INFO}] 1. Service health")
t0 = time.monotonic()
try:
status, body = get(f"{DEEPAGENTS}/health")
data = json.loads(body)
ok = status == 200 and data.get("agent_ready") is True
report("deepagents /health — agent_ready", ok, f"agent_ready={data.get('agent_ready')}")
except Exception as e:
report("deepagents /health", False, str(e))
ok, detail = check_sse("localhost", 8765, "/sse")
report("openmemory /sse reachable", ok, detail)
ok, detail = check_sse(GRAMMY_HOST, GRAMMY_PORT, "/sse")
report("grammy /sse reachable", ok, detail)
timings["health_check"] = time.monotonic() - t0
# ── 2. GPU Ollama ─────────────────────────────────────────────────────────────
if not _skip_pipeline:
print(f"\n[{INFO}] 2. GPU Ollama (port 11436)")
t0 = time.monotonic()
try:
status, body = get(f"{OLLAMA_GPU}/api/tags")
models = [m["name"] for m in json.loads(body).get("models", [])]
has_qwen = any("qwen3" in m for m in models)
report("GPU Ollama reachable", True, f"models: {models}")
report("qwen3:8b present", has_qwen)
except Exception as e:
report("GPU Ollama reachable", False, str(e))
report("qwen3:8b present", False, "skipped")
timings["gpu_ollama_ping"] = time.monotonic() - t0
# ── 3. CPU Ollama ─────────────────────────────────────────────────────────────
if not _skip_pipeline:
print(f"\n[{INFO}] 3. CPU Ollama (port 11435)")
t0 = time.monotonic()
try:
status, body = get(f"{OLLAMA_CPU}/api/tags")
models = [m["name"] for m in json.loads(body).get("models", [])]
has_embed = any("nomic-embed-text" in m for m in models)
report("CPU Ollama reachable", True, f"models: {models}")
report("nomic-embed-text present", has_embed)
except Exception as e:
report("CPU Ollama reachable", False, str(e))
report("nomic-embed-text present", False, "skipped")
timings["cpu_ollama_ping"] = time.monotonic() - t0
# ── 4. Qdrant ─────────────────────────────────────────────────────────────────
if not _skip_pipeline:
print(f"\n[{INFO}] 4. Qdrant (port 6333)")
t0 = time.monotonic()
try:
status, body = get(f"{QDRANT}/collections")
cols = [c["name"] for c in json.loads(body).get("result", {}).get("collections", [])]
report("Qdrant reachable", True, f"collections: {cols}")
report("adolf_memories collection exists", "adolf_memories" in cols)
except Exception as e:
report("Qdrant reachable", False, str(e))
report("adolf_memories collection exists", False, "skipped")
try:
status, body = get(f"{QDRANT}/collections/adolf_memories")
info = json.loads(body).get("result", {})
dims = info.get("config", {}).get("params", {}).get("vectors", {}).get("size")
report("vector dims = 768", dims == 768, f"got {dims}")
except Exception as e:
report("adolf_memories collection info", False, str(e))
timings["qdrant_ping"] = time.monotonic() - t0
# ── 5. SearXNG ────────────────────────────────────────────────────────────────
if not _skip_pipeline:
print(f"\n[{INFO}] 5. SearXNG (port 11437)")
t0 = time.monotonic()
try:
status, body = get(f"{SEARXNG}/search?q=test&format=json", timeout=15)
elapsed = time.monotonic() - t0
n = len(json.loads(body).get("results", []))
report("SearXNG reachable + JSON results", status == 200 and n > 0, f"{n} results in {elapsed:.1f}s")
report("SearXNG response < 5s", elapsed < 5, f"{elapsed:.2f}s")
timings["searxng_latency"] = elapsed
except Exception as e:
report("SearXNG reachable", False, str(e))
report("SearXNG response < 5s", False, "skipped")
timings["searxng_latency"] = None
timings["searxng_check"] = time.monotonic() - t0
# ── 68. Name memory pipeline ─────────────────────────────────────────────────
if not _skip_pipeline:
print(f"\n[{INFO}] 68. Name memory pipeline")
print(f" chat_id={CHAT_ID} name={random_name}")
store_msg = f"remember that your name is {random_name}"
recall_msg = "what is your name?"
pts_before = qdrant_count()
print(f" Qdrant points before: {pts_before}")
# ── 6. Send store message ─────────────────────────────────────────────────────
print(f"\n [store] '{store_msg}'")
t_store = time.monotonic()
try:
status, _ = post_json(f"{DEEPAGENTS}/chat",
{"message": store_msg, "chat_id": CHAT_ID}, timeout=5)
t_accept = time.monotonic() - t_store
report("POST /chat (store) returns 202 immediately",
status == 202 and t_accept < 1, f"status={status}, t={t_accept:.3f}s")
timings["store_http_accept"] = t_accept
except Exception as e:
report("POST /chat (store)", False, str(e))
sys.exit(1)
store = wait_for("store", store_msg, timeout_s=220, need_memory=True)
if store:
timings["store_llm"] = store["llm"]
timings["store_send"] = store["send"]
timings["store_reply"] = store["reply_total"]
timings["store_memory"] = store["memory_s"]
report("Agent replied to store message", True,
f"{store['reply_total']:.1f}s total llm={store['llm']:.1f}s send={store['send']:.1f}s tier={store['tier']}")
if store["memory_s"] is not None:
report("Memory stored without error", True, f"{store['memory_s']:.1f}s")
elif store["memory_error"]:
report("Memory stored without error", False, "error in [memory] log")
else:
report("Memory stored without error", False, "not found in logs (still running?)")
print(f" Store reply: {store['reply_text']!r}")
else:
report("Agent replied to store message", False, "timeout")
report("Memory stored without error", False, "timeout")
sys.exit(1)
# ── 7. Verify Qdrant ──────────────────────────────────────────────────────────
pts_after = qdrant_count()
new_pts = pts_after - pts_before
report("New memory point(s) added to Qdrant", new_pts > 0,
f"{pts_before}{pts_after} (+{new_pts})")
timings["qdrant_new_points"] = new_pts
# ── 8. Send recall message ────────────────────────────────────────────────────
print(f"\n [recall] '{recall_msg}'")
t_recall = time.monotonic()
try:
status, _ = post_json(f"{DEEPAGENTS}/chat",
{"message": recall_msg, "chat_id": CHAT_ID}, timeout=5)
t_accept2 = time.monotonic() - t_recall
report("POST /chat (recall) returns 202 immediately",
status == 202 and t_accept2 < 1, f"status={status}, t={t_accept2:.3f}s")
timings["recall_http_accept"] = t_accept2
except Exception as e:
report("POST /chat (recall)", False, str(e))
recall = wait_for("recall", recall_msg, timeout_s=160, need_memory=False)
if recall:
timings["recall_llm"] = recall["llm"]
timings["recall_send"] = recall["send"]
timings["recall_reply"] = recall["reply_total"]
report("Agent replied to recall message", True,
f"{recall['reply_total']:.1f}s total llm={recall['llm']:.1f}s send={recall['send']:.1f}s tier={recall['tier']}")
reply_text = recall["reply_text"] or ""
name_in_reply = random_name.lower() in reply_text.lower()
report(f"Reply contains '{random_name}'", name_in_reply,
f"reply: {reply_text[:120]!r}")
else:
report("Agent replied to recall message", False, "timeout")
report(f"Reply contains '{random_name}'", False, "no reply")
# ── 9. Timing profile ─────────────────────────────────────────────────────────
if not _skip_pipeline:
print(f"\n[{INFO}] 9. Timing profile")
W = 36
print(f"\n {'Stage':<{W}} {'Time':>8}")
print(f" {''*W} {''*8}")
rows_store = [
("[GPU] HTTP accept — store turn", "store_http_accept"),
("[GPU] qwen3:Xb inference — store turn","store_llm"),
("[GPU] Telegram send — store turn", "store_send"),
("[GPU] Total reply latency — store", "store_reply"),
("[GPU] qwen2.5:1.5b+embed — async mem", "store_memory"),
]
rows_recall = [
("[GPU] HTTP accept — recall turn", "recall_http_accept"),
("[GPU] qwen3:Xb inference — recall", "recall_llm"),
("[GPU] Telegram send — recall turn", "recall_send"),
("[GPU] Total reply latency — recall", "recall_reply"),
]
for label, key in rows_store:
v = timings.get(key)
print(f" {label:<{W}} {tf(v):>8}")
print(f" {''*W} {''*8}")
for label, key in rows_recall:
v = timings.get(key)
print(f" {label:<{W}} {tf(v):>8}")
# Bottleneck bar chart
print(f"\n Bottleneck analysis (each █ ≈ 5s):")
print(f" {''*(W+12)}")
candidates = [
("[GPU] qwen3:Xb — store reply ", timings.get("store_llm") or 0),
("[GPU] qwen3:Xb — recall reply", timings.get("recall_llm") or 0),
("[GPU] qwen2.5:1.5b+embed (async)", timings.get("store_memory") or 0),
("[net] SearXNG ", timings.get("searxng_latency") or 0),
]
candidates.sort(key=lambda x: x[1], reverse=True)
for label, t in candidates:
bar = "" * min(int(t / 5), 24)
pct = ""
total_pipeline = (timings.get("store_reply") or 0) + (timings.get("store_memory") or 0)
if total_pipeline > 0:
pct = f" {t/total_pipeline*100:4.0f}%"
print(f" {label} {t:6.1f}s {bar}{pct}")
print()
# ── 10. Tier routing benchmark — easy questions → light path ──────────────────
if _run_easy:
print(f"\n[{INFO}] 10. Tier routing benchmark")
print(f" Sending {len(BENCHMARK['easy'])} easy questions — all must route to 'light'")
print(f" Chat ID: {CHAT_ID}")
print()
bench_results = [] # list of (question, tier, latency_s, ok)
LIGHT_TIMEOUT = 60 # seconds — light is fast but may queue behind prior messages
for i, question in enumerate(BENCHMARK["easy"], 1):
tag = f"easy-{i:02d}"
short_q = question[:55]
print(f" [{tag}] {short_q!r}")
# Send
t_send = time.monotonic()
try:
status, _ = post_json(f"{DEEPAGENTS}/chat",
{"message": question, "chat_id": CHAT_ID}, timeout=5)
if status != 202:
print(f" → [{FAIL}] POST returned {status}")
bench_results.append((question, "?", None, False))
continue
except Exception as e:
print(f" → [{FAIL}] POST error: {e}")
bench_results.append((question, "?", None, False))
continue
# Poll for reply
t_start = time.monotonic()
found = None
while time.monotonic() - t_start < LIGHT_TIMEOUT:
since = int(time.monotonic() - t_start) + 30
lines = fetch_logs(since_s=since)
found = parse_run_block(lines, question)
if found:
break
time.sleep(1)
elapsed = time.monotonic() - t_send
if not found:
print(f" → [{FAIL}] no reply within {LIGHT_TIMEOUT}s")
bench_results.append((question, "timeout", None, False))
continue
tier = found.get("tier", "unknown")
is_light = (tier == "light")
tag_str = PASS if is_light else FAIL
print(f" → [{tag_str}] tier={tier} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s")
bench_results.append((question, tier, found["reply_total"], is_light))
# Brief pause between questions to keep logs clean
time.sleep(1)
# Summary table
print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question'}")
print(f" {''*4} {''*8} {''*8} {''*50}")
for idx, (q, tier, lat, ok) in enumerate(bench_results, 1):
lat_str = f"{lat:.1f}s" if lat is not None else "timeout"
ok_str = "" if ok else ""
print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {q[:50]!r}")
light_count = sum(1 for _, _, _, ok in bench_results if ok)
total_bench = len(bench_results)
lats = [lat for _, _, lat, ok in bench_results if ok and lat is not None]
avg_lat = sum(lats) / len(lats) if lats else 0
print(f"\n Light-path score: {light_count}/{total_bench}")
if lats:
print(f" Avg latency (light): {avg_lat:.1f}s "
f"min={min(lats):.1f}s max={max(lats):.1f}s")
report(f"All easy questions routed to light ({light_count}/{total_bench})",
light_count == total_bench,
f"{light_count}/{total_bench} via light path, avg {avg_lat:.1f}s")
# ── 11. Medium benchmark — medium questions → medium or light, never complex ──
if _run_medium:
print(f"\n[{INFO}] 11. Medium routing benchmark")
print(f" Sending {len(BENCHMARK['medium'])} medium questions")
print(f" Expected: tier=medium (needs tools). Light is acceptable for factual questions.")
print(f" Fail condition: tier=complex or timeout.")
print(f" Chat ID: {CHAT_ID}")
print()
# Questions where light is a valid alternative (model may know from training data)
LIGHT_ACCEPTABLE = {
"who won the last FIFA World Cup?",
"search for a good pasta carbonara recipe",
"find Python tutorials for beginners",
"search for the best coffee shops in Tokyo",
}
med_results = [] # list of (question, tier, latency_s, correct)
MEDIUM_TIMEOUT = 120 # seconds — medium takes 20-100s, allow for queue buildup
for i, question in enumerate(BENCHMARK["medium"], 1):
tag = f"med-{i:02d}"
short_q = question[:60]
print(f" [{tag}] {short_q!r}")
# Send
t_send = time.monotonic()
try:
status, _ = post_json(f"{DEEPAGENTS}/chat",
{"message": question, "chat_id": CHAT_ID}, timeout=5)
if status != 202:
print(f" → [{FAIL}] POST returned {status}")
med_results.append((question, "?", None, False))
continue
except Exception as e:
print(f" → [{FAIL}] POST error: {e}")
med_results.append((question, "?", None, False))
continue
# Poll for reply
t_start = time.monotonic()
found = None
while time.monotonic() - t_start < MEDIUM_TIMEOUT:
since = int(time.monotonic() - t_start) + 60
lines = fetch_logs(since_s=since)
found = parse_run_block(lines, question)
if found:
break
time.sleep(3)
elapsed = time.monotonic() - t_send
if not found:
print(f" → [{FAIL}] no reply within {MEDIUM_TIMEOUT}s")
med_results.append((question, "timeout", None, False))
continue
tier = found.get("tier", "unknown")
light_ok = question in LIGHT_ACCEPTABLE
if tier == "medium":
correct = True
label = PASS
note = "medium ✓"
elif tier == "light":
correct = light_ok # light is only acceptable for certain questions
label = WARN if not light_ok else PASS
note = "light (acceptable)" if light_ok else "light (should be medium)"
elif tier == "complex":
correct = False
label = FAIL
note = "complex — wrong escalation"
else:
correct = False
label = FAIL
note = f"unknown tier {tier!r}"
print(f" → [{label}] {note} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s")
med_results.append((question, tier, found["reply_total"], correct))
# Brief pause between questions
time.sleep(1)
# Summary table
print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question'}")
print(f" {''*4} {''*8} {''*8} {''*55}")
for idx, (q, tier, lat, ok) in enumerate(med_results, 1):
lat_str = f"{lat:.1f}s" if lat is not None else "timeout"
ok_str = "" if ok else ("~" if tier == "light" else "")
print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {q[:55]!r}")
total_med = len(med_results)
medium_count = sum(1 for _, tier, _, _ in med_results if tier == "medium")
light_count = sum(1 for _, tier, _, _ in med_results if tier == "light")
complex_count = sum(1 for _, tier, _, _ in med_results if tier == "complex")
timeout_count = sum(1 for _, tier, _, _ in med_results if tier == "timeout")
light_misroute = sum(
1 for q, tier, _, _ in med_results
if tier == "light" and q not in LIGHT_ACCEPTABLE
)
lats = [lat for _, _, lat, _ in med_results if lat is not None]
correct_count = medium_count + (light_count - light_misroute)
print(f"\n Breakdown: medium={medium_count} light={light_count} complex={complex_count} timeout={timeout_count}")
if light_misroute:
print(f" [{WARN}] {light_misroute} question(s) answered via light when medium expected (check reply quality)")
if lats:
print(f" Avg latency: {sum(lats)/len(lats):.1f}s min={min(lats):.1f}s max={max(lats):.1f}s")
no_complex = complex_count == 0
no_timeout = timeout_count == 0
all_ok = no_complex and no_timeout
report(
f"Medium questions: no complex escalation ({medium_count + light_count}/{total_med} routed)",
no_complex,
f"medium={medium_count} light={light_count} complex={complex_count} timeout={timeout_count}",
)
if not no_timeout:
report(
f"Medium questions: all completed within {MEDIUM_TIMEOUT}s",
False,
f"{timeout_count} question(s) timed out (increase MEDIUM_TIMEOUT or check agent logs)",
)
# ── 12. Hard benchmark — /think questions → complex tier + VRAM flush verified ─
if _run_hard:
print(f"\n[{INFO}] 12. Hard routing benchmark")
print(f" Sending {len(BENCHMARK['hard'])} /think questions — all must route to 'complex'")
print(f" Verifies: /think prefix → force_complex=True → VRAM flush → qwen3:8b inference")
print(f" Acceptable fallback: 'medium' if VRAM eviction timed out (logged warning)")
print(f" Fail condition: tier=light or timeout")
print(f" Chat ID: {CHAT_ID}")
print()
hard_results = [] # list of (question, tier, latency_s, ok)
COMPLEX_TIMEOUT = 300 # seconds — complex takes 60-180s + VRAM flush overhead
# Log markers we expect to see for complex path
_VRAM_ENTER = "[vram] enter_complex_mode"
_VRAM_EXIT = "[vram] exit_complex_mode"
for i, question in enumerate(BENCHMARK["hard"], 1):
tag = f"hard-{i:02d}"
# Strip /think prefix for display
short_q = question[len("/think "):].strip()[:60]
print(f" [{tag}] /think {short_q!r}")
# Snapshot log window start time
t_send = time.monotonic()
try:
status, _ = post_json(f"{DEEPAGENTS}/chat",
{"message": question, "chat_id": CHAT_ID}, timeout=5)
if status != 202:
print(f" → [{FAIL}] POST returned {status}")
hard_results.append((question, "?", None, False))
continue
except Exception as e:
print(f" → [{FAIL}] POST error: {e}")
hard_results.append((question, "?", None, False))
continue
# Poll for reply
t_start = time.monotonic()
found = None
while time.monotonic() - t_start < COMPLEX_TIMEOUT:
since = int(time.monotonic() - t_start) + 90
lines = fetch_logs(since_s=since)
found = parse_run_block(lines, question[len("/think "):].strip())
if found:
break
time.sleep(5)
elapsed = time.monotonic() - t_send
if not found:
print(f" → [{FAIL}] no reply within {COMPLEX_TIMEOUT}s")
hard_results.append((question, "timeout", None, False))
continue
tier = found.get("tier", "unknown")
if tier == "complex":
ok = True
label = PASS
note = "complex ✓"
elif tier == "medium":
# Acceptable fallback if VRAM eviction timed out
ok = True
label = WARN
note = "medium (VRAM fallback — check [vram] logs)"
else:
ok = False
label = FAIL
note = f"tier={tier} — unexpected"
# Check if VRAM enter/exit were logged for this block
lines_block = fetch_logs(since_s=int(elapsed) + 120)
msg_key = question[len("/think "):].strip()[:40]
vram_enter_seen = any(_VRAM_ENTER in ln for ln in lines_block
if msg_key in ln or
any(msg_key[:15] in prev_ln
for prev_ln in lines_block[max(0, lines_block.index(ln)-10):lines_block.index(ln)]))
# Simpler: just check the recent log window for enter/exit markers
recent = "\n".join(lines_block[-200:])
vram_enter_seen = _VRAM_ENTER in recent
vram_exit_seen = _VRAM_EXIT in recent
vram_note = ""
if tier == "complex":
if vram_enter_seen:
vram_note = " [vram:flush✓]"
else:
vram_note = f" [{WARN}:no vram flush log]"
print(f" → [{label}] {note} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s{vram_note}")
hard_results.append((question, tier, found["reply_total"], ok))
# Pause to let exit_complex_mode background task complete before next question
# (flushes qwen3:8b and pre-warms 4b+router — avoids VRAM conflict on next enter)
time.sleep(5)
# Summary table
print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question (/think ...)'}")
print(f" {''*4} {''*8} {''*8} {''*55}")
for idx, (q, tier, lat, ok) in enumerate(hard_results, 1):
lat_str = f"{lat:.1f}s" if lat is not None else "timeout"
ok_str = "" if tier == "complex" else ("~" if tier == "medium" else "")
short = q[len("/think "):].strip()[:55]
print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {short!r}")
total_hard = len(hard_results)
complex_count = sum(1 for _, t, _, _ in hard_results if t == "complex")
medium_fb = sum(1 for _, t, _, _ in hard_results if t == "medium")
light_count = sum(1 for _, t, _, _ in hard_results if t == "light")
timeout_count = sum(1 for _, t, _, _ in hard_results if t == "timeout")
lats = [lat for _, _, lat, _ in hard_results if lat is not None]
print(f"\n Breakdown: complex={complex_count} medium(fallback)={medium_fb} light={light_count} timeout={timeout_count}")
if medium_fb:
print(f" [{WARN}] {medium_fb} question(s) fell back to medium (VRAM eviction timeout)")
if light_count:
print(f" [{FAIL}] {light_count} question(s) routed to light — /think prefix not detected")
if lats:
print(f" Avg latency: {sum(lats)/len(lats):.1f}s min={min(lats):.1f}s max={max(lats):.1f}s")
no_light = light_count == 0
no_timeout = timeout_count == 0
report(
f"Hard questions routed to complex (not light) ({complex_count + medium_fb}/{total_hard})",
no_light and no_timeout,
f"complex={complex_count} medium_fallback={medium_fb} light={light_count} timeout={timeout_count}",
)
# ── summary ───────────────────────────────────────────────────────────────────
print(f"\n{''*55}")
total = len(results)
passed = sum(1 for _, ok in results if ok)
failed = total - passed
print(f"Results: {passed}/{total} passed", end="")
if failed:
print(f" ({failed} failed)\n")
print("Failed checks:")
for name, ok in results:
if not ok:
print(f" - {name}")
else:
print(" — all good")
print()
# Print benchmark reference
print(f"[{INFO}] Benchmark questions reference:")
for tier_name, questions in BENCHMARK.items():
print(f"\n {tier_name.upper()} ({len(questions)} questions):")
for j, q in enumerate(questions, 1):
print(f" {j:2d}. {q}")
print()

71
adolf/vram_manager.py Normal file
View File

@@ -0,0 +1,71 @@
import asyncio
import os
import httpx
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
class VRAMManager:
MEDIUM_MODELS = ["qwen3:4b", "qwen2.5:1.5b"]
COMPLEX_MODEL = "qwen3:8b"
def __init__(self, base_url: str = OLLAMA_BASE_URL):
self.base_url = base_url
async def enter_complex_mode(self) -> bool:
"""Flush medium models before loading 8b. Returns False if eviction timed out."""
print("[vram] enter_complex_mode: flushing medium models", flush=True)
await asyncio.gather(*[self._flush(m) for m in self.MEDIUM_MODELS])
ok = await self._poll_evicted(self.MEDIUM_MODELS, timeout=15)
if ok:
print("[vram] enter_complex_mode: eviction confirmed, loading qwen3:8b", flush=True)
else:
print("[vram] enter_complex_mode: eviction timeout — falling back to medium", flush=True)
return ok
async def exit_complex_mode(self):
"""Flush 8b and pre-warm medium models. Run as background task after complex reply."""
print("[vram] exit_complex_mode: flushing qwen3:8b", flush=True)
await self._flush(self.COMPLEX_MODEL)
print("[vram] exit_complex_mode: pre-warming medium models", flush=True)
await asyncio.gather(*[self._prewarm(m) for m in self.MEDIUM_MODELS])
print("[vram] exit_complex_mode: done", flush=True)
async def _flush(self, model: str):
"""Send keep_alive=0 to force immediate unload from VRAM."""
try:
async with httpx.AsyncClient(timeout=10.0) as client:
await client.post(
f"{self.base_url}/api/generate",
json={"model": model, "prompt": "", "keep_alive": 0},
)
except Exception as e:
print(f"[vram] flush {model} error: {e}", flush=True)
async def _poll_evicted(self, models: list[str], timeout: float) -> bool:
"""Poll /api/ps until none of the given models appear (or timeout)."""
deadline = asyncio.get_event_loop().time() + timeout
while asyncio.get_event_loop().time() < deadline:
try:
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.get(f"{self.base_url}/api/ps")
data = resp.json()
loaded = {m.get("name", "") for m in data.get("models", [])}
if not any(m in loaded for m in models):
return True
except Exception as e:
print(f"[vram] poll_evicted error: {e}", flush=True)
await asyncio.sleep(0.5)
return False
async def _prewarm(self, model: str):
"""Load model into VRAM with keep_alive=300 (5 min)."""
try:
async with httpx.AsyncClient(timeout=60.0) as client:
await client.post(
f"{self.base_url}/api/generate",
json={"model": model, "prompt": "", "keep_alive": 300},
)
print(f"[vram] pre-warmed {model}", flush=True)
except Exception as e:
print(f"[vram] prewarm {model} error: {e}", flush=True)