From ea77b2308b0f517095c3500c0c03e526f26dd0df Mon Sep 17 00:00:00 2001 From: Alvis Date: Sat, 28 Feb 2026 17:54:51 +0000 Subject: [PATCH] Add three-tier model routing with VRAM management and benchmark suite MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Three-tier routing: light (router answers directly ~3s), medium (qwen3:4b + tools ~60s), complex (/think prefix → qwen3:8b + subagents ~140s) - Router: qwen2.5:1.5b, temp=0, regex pre-classifier + raw-text LLM classify - VRAMManager: explicit flush/poll/prewarm to prevent Ollama CPU-spill bug - agent_factory: build_medium_agent and build_complex_agent using deepagents (TodoListMiddleware + SubAgentMiddleware with research/memory subagents) - Fix: split Telegram replies >4000 chars into multiple messages - Benchmark: 30 questions (easy/medium/hard) — 10/10/10 verified passing easy→light, medium→medium, hard→complex with VRAM flush confirmed Co-Authored-By: Claude Sonnet 4.6 --- ARCHITECTURE.md | 144 ++++--- Dockerfile | 10 + agent.py | 249 ++++++++--- agent_factory.py | 54 +++ docker-compose.yml | 43 ++ router.py | 138 +++++++ test_pipeline.py | 999 +++++++++++++++++++++++++++++++++++---------- vram_manager.py | 71 ++++ 8 files changed, 1400 insertions(+), 308 deletions(-) create mode 100644 Dockerfile create mode 100644 agent_factory.py create mode 100644 docker-compose.yml create mode 100644 router.py create mode 100644 vram_manager.py diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 43f31cc..5c98ec7 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -1,6 +1,6 @@ # Adolf -Persistent AI assistant reachable via Telegram. GPU-accelerated inference with long-term memory and web search. +Persistent AI assistant reachable via Telegram. Three-tier model routing with GPU VRAM management. 
## Architecture @@ -11,67 +11,116 @@ Telegram user - grammY bot polls Telegram - on message: fire-and-forget POST /chat to deepagents - exposes MCP SSE server: tool send_telegram_message(chat_id, text) - ↕ fire-and-forget HTTP ↕ MCP SSE tool call + ↓ POST /chat → 202 Accepted immediately [deepagents] Python FastAPI — port 8000 - - POST /chat → 202 Accepted immediately - - background task: run LangGraph react agent - - LLM: qwen3:8b via Ollama GPU (host port 11436) - - tools: search_memory, get_all_memories, web_search - - after reply: async fire-and-forget → store memory on CPU - ↕ MCP SSE ↕ HTTP (SearXNG) -[openmemory] Python + mem0 — port 8765 [SearXNG — port 11437] - - MCP tools: add_memory, search_memory, get_all_memories - - mem0 backend: Qdrant (port 6333) + CPU Ollama (port 11435) - - embedder: nomic-embed-text (768 dims) - - extractor: gemma3:1b - - collection: adolf_memories + ↓ +Pre-check: starts with /think? → force_complex=True, strip prefix + ↓ +Router (qwen2.5:1.5b, ~1-2s, always warm in VRAM) + Raw-text output: one word — light | medium | complex (regex pre-classifier first, no JSON schema) + - light: simple conversational → router answers directly, ~2-4s + - medium: needs memory/web search → qwen3:4b + deepagents tools + - complex: multi-step research, planning, code → qwen3:8b + subagents + force_complex always overrides to complex + complex only via /think prefix (router-classified complex is downgraded to medium) + ↓ + ├── light ─────────── router generates the reply (second router call, no agent run) + ├── medium ────────── deepagents qwen3:4b + TodoList + tools + └── complex ───────── VRAM flush → deepagents qwen3:8b + TodoList + subagents + └→ background: exit_complex_mode (flush 8b, prewarm 4b+router) + ↓ +send_telegram_message via grammy MCP + ↓ +asyncio.create_task(store_memory_async) — spin-wait GPU idle → openmemory add_memory + ↕ MCP SSE ↕ HTTP +[openmemory] Python + mem0 — port 8765 [SearXNG — port 11437] + - add_memory, search_memory, get_all_memories + - extractor: qwen2.5:1.5b on GPU 
Ollama (11436) — 2–5s + - embedder: nomic-embed-text on CPU Ollama (11435) — 50–150ms + - vector store: Qdrant (port 6333), 768 dims ``` -## Queuing and Concurrency +## Three-Tier Model Routing -Two semaphores prevent resource contention: +| Tier | Model | VRAM | Trigger | Latency | +|------|-------|------|---------|---------| +| Light | qwen2.5:1.5b (router answers) | ~1.2 GB (shared with extraction) | Router classifies as light | ~2–4s | +| Medium | qwen3:4b | ~2.5 GB | Default; router classifies medium | ~20–40s | +| Complex | qwen3:8b | ~5.5 GB | `/think` prefix | ~60–120s | + +**Normal VRAM** (light + medium): router/extraction(1.2, shared) + medium(2.5) = ~3.7 GB +**Complex VRAM**: 8b alone = ~5.5 GB — must flush others first + +### Router model: qwen2.5:1.5b (not 0.5b) + +qwen2.5:0.5b is too small for reliable classification — tends to output "medium" for everything +or produces nonsensical output. qwen2.5:1.5b is already loaded in VRAM for memory extraction, +so switching adds zero net VRAM overhead while dramatically improving accuracy. + +Router uses **raw text generation** (not structured output/JSON schema): +- Ask model to output one word: `light`, `medium`, or `complex` +- Parse with simple keyword matching (fallback: `medium`) +- For `light` tier: a second call generates the reply text + +## VRAM Management + +GTX 1070 has 8 GB VRAM. Ollama's auto-eviction can spill models to CPU RAM permanently +(all subsequent loads stay on CPU). To prevent this: + +1. **Always flush explicitly** before loading qwen3:8b (`keep_alive=0`) +2. **Verify eviction** via `/api/ps` poll (15s timeout) before proceeding +3. **Fallback**: timeout → log warning, run medium agent instead +4. 
**Post-complex**: flush 8b immediately, pre-warm 4b + router + +```python +# Flush (force immediate unload): +POST /api/generate {"model": "qwen3:4b", "prompt": "", "keep_alive": 0} + +# Pre-warm (load into VRAM for 5 min): +POST /api/generate {"model": "qwen3:4b", "prompt": "", "keep_alive": 300} +``` + +## Agents + +**Medium agent** (`build_medium_agent`): +- `create_deep_agent` with TodoListMiddleware (auto-included) +- Tools: `search_memory`, `get_all_memories`, `web_search` +- No subagents + +**Complex agent** (`build_complex_agent`): +- `create_deep_agent` with TodoListMiddleware + SubAgentMiddleware +- Tools: all agent tools +- Subagents: + - `research`: web_search only, for thorough multi-query web research + - `memory`: search_memory + get_all_memories, for comprehensive context retrieval + +## Concurrency | Semaphore | Guards | Notes | |-----------|--------|-------| -| `_reply_semaphore(1)` | GPU Ollama (qwen3:8b) | One LLM inference at a time | -| `_memory_semaphore(1)` | CPU Ollama (gemma3:1b) | One memory store at a time | +| `_reply_semaphore(1)` | GPU Ollama (all tiers) | One LLM reply inference at a time | +| `_memory_semaphore(1)` | GPU Ollama (qwen2.5:1.5b extraction) | One memory extraction at a time | -**Reply-first pipeline:** -1. User message arrives via Telegram → Grammy forwards to deepagents (fire-and-forget) -2. Deepagents queues behind `_reply_semaphore`, runs agent, sends reply via Grammy MCP tool -3. After reply is sent, `asyncio.create_task` fires `store_memory_async` in background -4. Memory task queues behind `_memory_semaphore`, calls `add_memory` on openmemory -5. openmemory uses CPU Ollama: embedding (~0.3s) + extraction (~1.6s) → stored in Qdrant +Light path holds `_reply_semaphore` only briefly (router inference only, no agent run). +Memory extraction spin-waits until `_reply_semaphore` is free (60s timeout). -Reply latency: ~10–18s (GPU qwen3:8b inference + tool calls). -Memory latency: ~5–16s (runs async, never blocks replies). +## Pipeline + +1. 
User message → Grammy → `POST /chat` → 202 Accepted +2. Background: acquire `_reply_semaphore` → route → run agent tier → send reply +3. `asyncio.create_task(store_memory_async)` — spin-waits until the GPU is free, then extracts memories +4. For complex: `asyncio.create_task(exit_complex_mode)` — flushes 8b, pre-warms 4b+router ## External Services (from openai/ stack) | Service | Host Port | Role | |---------|-----------|------| -| Ollama GPU | 11436 | Main LLM (qwen3:8b) | -| Ollama CPU | 11435 | Memory embedding + extraction | +| Ollama GPU | 11436 | All reply inference + extraction (qwen2.5:1.5b) | +| Ollama CPU | 11435 | Memory embedding (nomic-embed-text) | | Qdrant | 6333 | Vector store for memories | | SearXNG | 11437 | Web search | -## Compose Stack - -Config: `agap_git/adolf/docker-compose.yml` - -```bash -cd agap_git/adolf -docker compose up -d -``` - -Requires `TELEGRAM_BOT_TOKEN` in `adolf/.env`. - -## Memory - -- Stored per `chat_id` (Telegram user ID) as `user_id` in mem0 -- Semantic search via Qdrant (cosine similarity, 768-dim nomic-embed-text vectors) -- mem0 uses gemma3:1b to extract structured facts before embedding -- Collection: `adolf_memories` in Qdrant +GPU Ollama config: `OLLAMA_MAX_LOADED_MODELS=2`, `OLLAMA_NUM_PARALLEL=1`. ## Files @@ -79,7 +128,10 @@ Requires `TELEGRAM_BOT_TOKEN` in `adolf/.env`. 
adolf/ ├── docker-compose.yml Services: deepagents, openmemory, grammy ├── Dockerfile deepagents container (Python 3.12) -├── agent.py FastAPI + LangGraph react agent +├── agent.py FastAPI + three-tier routing + run_agent_task +├── router.py Router class — qwen2.5:0.5b structured output routing +├── vram_manager.py VRAMManager — flush/prewarm/poll Ollama VRAM +├── agent_factory.py build_medium_agent / build_complex_agent (deepagents) ├── .env TELEGRAM_BOT_TOKEN (not committed) ├── openmemory/ │ ├── server.py FastMCP + mem0 MCP tools diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..d4cd94f --- /dev/null +++ b/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.12-slim + +WORKDIR /app + +RUN pip install --no-cache-dir deepagents langchain-ollama langgraph \ + fastapi uvicorn langchain-mcp-adapters langchain-community httpx + +COPY agent.py vram_manager.py router.py agent_factory.py hello_world.py . + +CMD ["uvicorn", "agent:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/agent.py b/agent.py index c54952e..2ab6351 100644 --- a/agent.py +++ b/agent.py @@ -11,15 +11,23 @@ from langchain_ollama import ChatOllama from langchain_mcp_adapters.client import MultiServerMCPClient from langchain_community.utilities import SearxSearchWrapper from langchain_core.tools import Tool -from langgraph.prebuilt import create_react_agent + +from vram_manager import VRAMManager +from router import Router +from agent_factory import build_medium_agent, build_complex_agent OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434") -MODEL = os.getenv("DEEPAGENTS_MODEL", "qwen3:8b") +ROUTER_MODEL = os.getenv("DEEPAGENTS_ROUTER_MODEL", "qwen2.5:0.5b") +MEDIUM_MODEL = os.getenv("DEEPAGENTS_MODEL", "qwen3:4b") +COMPLEX_MODEL = os.getenv("DEEPAGENTS_COMPLEX_MODEL", "qwen3:8b") SEARXNG_URL = os.getenv("SEARXNG_URL", "http://host.docker.internal:11437") OPENMEMORY_URL = os.getenv("OPENMEMORY_URL", "http://openmemory:8765") GRAMMY_URL = os.getenv("GRAMMY_URL", 
"http://grammy:3001") -SYSTEM_PROMPT_TEMPLATE = ( +MAX_HISTORY_TURNS = 5 +_conversation_buffers: dict[str, list] = {} + +MEDIUM_SYSTEM_PROMPT = ( "You are a helpful AI assistant talking to a user via Telegram. " "The user's ID is {user_id}. " "IMPORTANT: When calling any memory tool (search_memory, get_all_memories), " @@ -28,33 +36,62 @@ SYSTEM_PROMPT_TEMPLATE = ( "you do NOT need to explicitly store anything. " "NEVER tell the user you cannot remember or store information. " "If the user asks you to remember something, acknowledge it and confirm it will be remembered. " - "Always call search_memory before answering to recall relevant past context. " - "Use web_search for questions about current events. " + "Use search_memory when context from past conversations may be relevant. " + "Use web_search for questions about current events or facts you don't know. " "Reply concisely." ) -agent = None +COMPLEX_SYSTEM_PROMPT = ( + "You are a capable AI assistant tackling a complex, multi-step task for a Telegram user. " + "The user's ID is {user_id}. " + "IMPORTANT: When calling any memory tool (search_memory, get_all_memories), " + "always use user_id=\"{user_id}\". " + "Plan your work using write_todos before diving in. " + "Delegate: use the 'research' subagent for thorough web research across multiple queries, " + "and the 'memory' subagent to gather comprehensive context from past conversations. " + "Every conversation is automatically saved to memory — you do NOT need to store anything. " + "NEVER tell the user you cannot remember or store information. " + "Produce a thorough, well-structured reply." 
+) + +medium_agent = None +complex_agent = None +router: Router = None +vram_manager: VRAMManager = None mcp_client = None send_tool = None add_memory_tool = None -# GPU semaphore: one LLM inference at a time +# GPU mutex: one LLM inference at a time _reply_semaphore = asyncio.Semaphore(1) -# CPU semaphore: one memory store at a time (runs on CPU Ollama, no GPU contention) +# Memory semaphore: one async extraction at a time _memory_semaphore = asyncio.Semaphore(1) @asynccontextmanager async def lifespan(app: FastAPI): - global agent, mcp_client, send_tool, add_memory_tool + global medium_agent, complex_agent, router, vram_manager + global mcp_client, send_tool, add_memory_tool - model = ChatOllama(model=MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=8192) + # Three model instances + router_model = ChatOllama( + model=ROUTER_MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=4096, + temperature=0, # deterministic classification + ) + medium_model = ChatOllama( + model=MEDIUM_MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=8192 + ) + complex_model = ChatOllama( + model=COMPLEX_MODEL, base_url=OLLAMA_BASE_URL, think=True, num_ctx=16384 + ) + + vram_manager = VRAMManager(base_url=OLLAMA_BASE_URL) + router = Router(model=router_model) mcp_connections = { "openmemory": {"transport": "sse", "url": f"{OPENMEMORY_URL}/sse"}, "grammy": {"transport": "sse", "url": f"{GRAMMY_URL}/sse"}, } - mcp_client = MultiServerMCPClient(mcp_connections) for attempt in range(12): try: @@ -66,10 +103,8 @@ async def lifespan(app: FastAPI): print(f"[agent] MCP not ready (attempt {attempt + 1}/12): {e}. 
Retrying in 5s...") await asyncio.sleep(5) - # Split tools: send is called by us, add_memory runs async after reply send_tool = next((t for t in mcp_tools if t.name == "send_telegram_message"), None) add_memory_tool = next((t for t in mcp_tools if t.name == "add_memory"), None) - # Agent only gets read/search tools — no add_memory (would block reply) agent_tools = [t for t in mcp_tools if t.name not in ("send_telegram_message", "add_memory")] searx = SearxSearchWrapper(searx_host=SEARXNG_URL) @@ -79,13 +114,30 @@ async def lifespan(app: FastAPI): description="Search the web for current information", )) - agent = create_react_agent(model, agent_tools) - print(f"[agent] ready — agent tools: {[t.name for t in agent_tools]}", flush=True) - print(f"[agent] async memory: add_memory via CPU Ollama (qwen2.5:1.5b + nomic-embed-text)", flush=True) + # Build agents (system_prompt filled per-request with user_id) + medium_agent = build_medium_agent( + model=medium_model, + agent_tools=agent_tools, + system_prompt=MEDIUM_SYSTEM_PROMPT.format(user_id="{user_id}"), + ) + complex_agent = build_complex_agent( + model=complex_model, + agent_tools=agent_tools, + system_prompt=COMPLEX_SYSTEM_PROMPT.format(user_id="{user_id}"), + ) + + print( + f"[agent] three-tier: router={ROUTER_MODEL} | medium={MEDIUM_MODEL} | complex={COMPLEX_MODEL}", + flush=True, + ) + print(f"[agent] agent tools: {[t.name for t in agent_tools]}", flush=True) yield - agent = None + medium_agent = None + complex_agent = None + router = None + vram_manager = None mcp_client = None send_tool = None add_memory_tool = None @@ -100,7 +152,13 @@ class ChatRequest(BaseModel): async def store_memory_async(conversation: str, user_id: str): - """Fire-and-forget: extract and store memories using CPU Ollama. 
Never blocks replies.""" + """Fire-and-forget: extract and store memories after GPU is free.""" + t_wait = time.monotonic() + while _reply_semaphore.locked(): + if time.monotonic() - t_wait > 60: + print(f"[memory] spin-wait timeout 60s, proceeding for user {user_id}", flush=True) + break + await asyncio.sleep(0.5) async with _memory_semaphore: t0 = time.monotonic() try: @@ -110,60 +168,137 @@ async def store_memory_async(conversation: str, user_id: str): print(f"[memory] error after {time.monotonic() - t0:.1f}s: {e}", flush=True) +def _extract_final_text(result) -> str | None: + """Extract last AIMessage content from agent result.""" + msgs = result.get("messages", []) + for m in reversed(msgs): + if type(m).__name__ == "AIMessage" and getattr(m, "content", ""): + return m.content + # deepagents may return output differently + if isinstance(result, dict) and result.get("output"): + return result["output"] + return None + + +def _log_messages(result): + msgs = result.get("messages", []) + for m in msgs: + role = type(m).__name__ + content = getattr(m, "content", "") + tool_calls = getattr(m, "tool_calls", []) + if content: + print(f"[agent] {role}: {str(content)[:150]}", flush=True) + for tc in tool_calls: + print(f"[agent] {role} → {tc['name']}({tc['args']})", flush=True) + + async def run_agent_task(message: str, chat_id: str): print(f"[agent] queued: {message[:80]!r} chat={chat_id}", flush=True) + + # Pre-check: /think prefix forces complex tier + force_complex = False + clean_message = message + if message.startswith("/think "): + force_complex = True + clean_message = message[len("/think "):] + print("[agent] /think prefix → force_complex=True", flush=True) + async with _reply_semaphore: t0 = time.monotonic() - print(f"[agent] running: {message[:80]!r}", flush=True) + history = _conversation_buffers.get(chat_id, []) + print(f"[agent] running: {clean_message[:80]!r}", flush=True) + + # Route the message + tier, light_reply = await router.route(clean_message, 
history, force_complex) + print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True) + + final_text = None try: - system_prompt = SYSTEM_PROMPT_TEMPLATE.format(user_id=chat_id) - result = await agent.ainvoke( - {"messages": [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": message}, - ]} - ) - llm_elapsed = time.monotonic() - t0 + if tier == "light": + final_text = light_reply + llm_elapsed = time.monotonic() - t0 + print(f"[agent] light path: answered by router", flush=True) - # Log trace - msgs = result.get("messages", []) - for m in msgs: - role = type(m).__name__ - content = getattr(m, "content", "") - tool_calls = getattr(m, "tool_calls", []) - if content: - print(f"[agent] {role}: {str(content)[:150]}", flush=True) - for tc in tool_calls: - print(f"[agent] {role} → {tc['name']}({tc['args']})", flush=True) + elif tier == "medium": + system_prompt = MEDIUM_SYSTEM_PROMPT.format(user_id=chat_id) + result = await medium_agent.ainvoke({ + "messages": [ + {"role": "system", "content": system_prompt}, + *history, + {"role": "user", "content": clean_message}, + ] + }) + llm_elapsed = time.monotonic() - t0 + _log_messages(result) + final_text = _extract_final_text(result) - # Send reply immediately - final_text = None - for m in reversed(msgs): - if type(m).__name__ == "AIMessage" and getattr(m, "content", ""): - final_text = m.content - break + else: # complex + ok = await vram_manager.enter_complex_mode() + if not ok: + print("[agent] complex→medium fallback (eviction timeout)", flush=True) + tier = "medium" + system_prompt = MEDIUM_SYSTEM_PROMPT.format(user_id=chat_id) + result = await medium_agent.ainvoke({ + "messages": [ + {"role": "system", "content": system_prompt}, + *history, + {"role": "user", "content": clean_message}, + ] + }) + else: + system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=chat_id) + result = await complex_agent.ainvoke({ + "messages": [ + {"role": "system", "content": system_prompt}, + *history, 
+ {"role": "user", "content": clean_message}, + ] + }) + asyncio.create_task(vram_manager.exit_complex_mode()) - if final_text and send_tool: - t1 = time.monotonic() - await send_tool.ainvoke({"chat_id": chat_id, "text": final_text}) - print(f"[agent] replied in {time.monotonic() - t0:.1f}s (llm={llm_elapsed:.1f}s, send={time.monotonic()-t1:.1f}s)", flush=True) - elif not final_text: - print(f"[agent] warning: no text reply from agent", flush=True) - - # Async memoization: runs on CPU Ollama, does not block next reply - if add_memory_tool and final_text: - conversation = f"User: {message}\nAssistant: {final_text}" - asyncio.create_task(store_memory_async(conversation, chat_id)) + llm_elapsed = time.monotonic() - t0 + _log_messages(result) + final_text = _extract_final_text(result) except Exception as e: import traceback - print(f"[agent] error after {time.monotonic()-t0:.1f}s for chat {chat_id}: {e}", flush=True) + llm_elapsed = time.monotonic() - t0 + print(f"[agent] error after {llm_elapsed:.1f}s for chat {chat_id}: {e}", flush=True) traceback.print_exc() + # Send reply via grammy MCP (split if > Telegram's 4096-char limit) + if final_text and send_tool: + t1 = time.monotonic() + MAX_TG = 4000 # leave headroom below the 4096 hard limit + chunks = [final_text[i:i + MAX_TG] for i in range(0, len(final_text), MAX_TG)] + for chunk in chunks: + await send_tool.ainvoke({"chat_id": chat_id, "text": chunk}) + send_elapsed = time.monotonic() - t1 + # Log in format compatible with test_pipeline.py parser + print( + f"[agent] replied in {time.monotonic() - t0:.1f}s " + f"(llm={llm_elapsed:.1f}s, send={send_elapsed:.1f}s) tier={tier}", + flush=True, + ) + elif not final_text: + print("[agent] warning: no text reply from agent", flush=True) + + # Update conversation buffer + if final_text: + buf = _conversation_buffers.get(chat_id, []) + buf.append({"role": "user", "content": clean_message}) + buf.append({"role": "assistant", "content": final_text}) + 
_conversation_buffers[chat_id] = buf[-(MAX_HISTORY_TURNS * 2):] + + # Async memory storage (fire-and-forget) + if add_memory_tool and final_text: + conversation = f"User: {clean_message}\nAssistant: {final_text}" + asyncio.create_task(store_memory_async(conversation, chat_id)) + @app.post("/chat") async def chat(request: ChatRequest, background_tasks: BackgroundTasks): - if agent is None: + if medium_agent is None: return JSONResponse(status_code=503, content={"error": "Agent not ready"}) background_tasks.add_task(run_agent_task, request.message, request.chat_id) return JSONResponse(status_code=202, content={"status": "accepted"}) @@ -171,4 +306,4 @@ async def chat(request: ChatRequest, background_tasks: BackgroundTasks): @app.get("/health") async def health(): - return {"status": "ok", "agent_ready": agent is not None} + return {"status": "ok", "agent_ready": medium_agent is not None} diff --git a/agent_factory.py b/agent_factory.py new file mode 100644 index 0000000..5926504 --- /dev/null +++ b/agent_factory.py @@ -0,0 +1,54 @@ +from deepagents import create_deep_agent, SubAgent + + +def build_medium_agent(model, agent_tools: list, system_prompt: str): + """Medium agent: create_deep_agent with TodoList planning, no subagents.""" + return create_deep_agent( + model=model, + tools=agent_tools, + system_prompt=system_prompt, + ) + + +def build_complex_agent(model, agent_tools: list, system_prompt: str): + """Complex agent: create_deep_agent with TodoList planning + research/memory subagents.""" + web_tools = [t for t in agent_tools if getattr(t, "name", "") == "web_search"] + memory_tools = [ + t for t in agent_tools + if getattr(t, "name", "") in ("search_memory", "get_all_memories") + ] + + research_sub: SubAgent = { + "name": "research", + "description": ( + "Runs multiple web searches in parallel and synthesizes findings. " + "Use for thorough research tasks requiring several queries." + ), + "system_prompt": ( + "You are a research specialist. 
Search the web thoroughly using multiple queries. " + "Cite sources and synthesize information into a clear summary." + ), + "tools": web_tools, + "model": model, + } + + memory_sub: SubAgent = { + "name": "memory", + "description": ( + "Searches and retrieves all relevant memories about the user comprehensively. " + "Use to gather full context from past conversations." + ), + "system_prompt": ( + "You are a memory specialist. Search broadly using multiple queries. " + "Return all relevant facts and context you find." + ), + "tools": memory_tools, + "model": model, + } + + return create_deep_agent( + model=model, + tools=agent_tools, + system_prompt=system_prompt, + subagents=[research_sub, memory_sub], + ) diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..2469c37 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,43 @@ +services: + deepagents: + build: . + container_name: deepagents + ports: + - "8000:8000" + environment: + - PYTHONUNBUFFERED=1 + - OLLAMA_BASE_URL=http://host.docker.internal:11436 + - DEEPAGENTS_MODEL=qwen3:4b + - DEEPAGENTS_COMPLEX_MODEL=qwen3:8b + - DEEPAGENTS_ROUTER_MODEL=qwen2.5:1.5b + - SEARXNG_URL=http://host.docker.internal:11437 + extra_hosts: + - "host.docker.internal:host-gateway" + depends_on: + - openmemory + - grammy + restart: unless-stopped + + openmemory: + build: ./openmemory + container_name: openmemory + ports: + - "8765:8765" + environment: + # Extraction LLM (qwen2.5:1.5b) runs on GPU after reply — fast 2-5s extraction + - OLLAMA_GPU_URL=http://host.docker.internal:11436 + # Embedding (nomic-embed-text) runs on CPU — fast enough for search (50-150ms) + - OLLAMA_CPU_URL=http://host.docker.internal:11435 + extra_hosts: + - "host.docker.internal:host-gateway" + restart: unless-stopped + + grammy: + build: ./grammy + container_name: grammy + ports: + - "3001:3001" + environment: + - TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN} + - DEEPAGENTS_URL=http://deepagents:8000 + restart: unless-stopped 
diff --git a/router.py b/router.py new file mode 100644 index 0000000..31bce20 --- /dev/null +++ b/router.py @@ -0,0 +1,138 @@ +import re +from typing import Optional +from langchain_core.messages import SystemMessage, HumanMessage + +# ── Regex pre-classifier ────────────────────────────────────────────────────── +# Catches obvious light-tier patterns before calling the LLM. +# Keyed by regex → compiled pattern. +_LIGHT_PATTERNS = re.compile( + r"^(" + # Greetings / farewells + r"hi|hello|hey|yo|sup|howdy|good morning|good evening|good night|good afternoon" + r"|bye|goodbye|see you|cya|later|ttyl" + # Acknowledgements / small talk + r"|thanks?|thank you|thx|ty|ok|okay|k|cool|great|awesome|perfect|sounds good|got it|nice|sure" + r"|how are you|how are you\?|how are you doing(\s+today)?[?!.]*" + r"|what.?s up" + # Calendar facts: "what day comes after X?" / "what comes after X?" + r"|what\s+day\s+(comes\s+after|follows|is\s+after)\s+\w+[?!.]*" + r"|what\s+comes\s+after\s+\w+[?!.]*" + # Acronym expansions: "what does X stand for?" + r"|what\s+does\s+\w+\s+stand\s+for[?!.]*" + r")[\s!.?]*$", + re.IGNORECASE, +) + +# ── LLM classification prompt ───────────────────────────────────────────────── +CLASSIFY_PROMPT = """Classify the message. Output ONLY one word: light, medium, or complex. + +LIGHT = answerable from general knowledge, no internet needed: + what is 2+2 / what is the capital of France / name the three primary colors + tell me a short joke / is the sky blue / is water wet + +MEDIUM = requires web search or the user's stored memories: + current weather / today's news / Bitcoin price / what did we talk about + +COMPLEX = /think prefix only: + /think compare frameworks / /think plan a trip + +Message: {message} +Output (one word only — light, medium, or complex):""" + +LIGHT_REPLY_PROMPT = """You are a helpful Telegram assistant. Answer briefly and naturally (1-3 sentences). 
Be friendly.""" + + +def _format_history(history: list[dict]) -> str: + if not history: + return "(none)" + lines = [] + for msg in history: + role = msg.get("role", "?") + content = str(msg.get("content", ""))[:200] + lines.append(f"{role}: {content}") + return "\n".join(lines) + + +def _parse_tier(text: str) -> str: + """Extract tier from raw model output. Default to medium.""" + t = text.strip().lower() + snippet = t[:60] + if "complex" in snippet: + return "complex" + if "medium" in snippet: + return "medium" + if "light" in snippet: + return "light" + # Model invented a descriptive category (e.g. "simplefact", "trivial", "basic") → + # treat as light since it recognised the question doesn't need tools + if any(w in snippet for w in ("simple", "fact", "trivial", "basic", "easy", "general")): + return "light" + return "medium" # safe default + + +class Router: + def __init__(self, model): + self.model = model + + async def route( + self, + message: str, + history: list[dict], + force_complex: bool = False, + ) -> tuple[str, Optional[str]]: + """ + Returns (tier, reply_or_None). + For light tier: also generates the reply with a second call. + For medium/complex: reply is None. 
+ """ + if force_complex: + return "complex", None + + # Step 0: regex pre-classification for obvious light patterns + if _LIGHT_PATTERNS.match(message.strip()): + print(f"[router] regex→light", flush=True) + return await self._generate_light_reply(message, history) + + # Step 1: LLM classification with raw text output + try: + classify_response = await self.model.ainvoke([ + HumanMessage(content=CLASSIFY_PROMPT.format(message=message)), + ]) + raw = classify_response.content or "" + raw = re.sub(r".*?", "", raw, flags=re.DOTALL).strip() + tier = _parse_tier(raw) + + if tier == "complex" and not message.startswith("/think"): + tier = "medium" + + print(f"[router] raw={raw[:30]!r} → tier={tier}", flush=True) + except Exception as e: + print(f"[router] classify error, defaulting to medium: {e}", flush=True) + return "medium", None + + if tier != "light": + return tier, None + + return await self._generate_light_reply(message, history) + + async def _generate_light_reply( + self, message: str, history: list[dict] + ) -> tuple[str, Optional[str]]: + """Generate a short reply using the router model for light-tier messages.""" + history_text = _format_history(history) + context = f"\nConversation history:\n{history_text}" if history else "" + try: + reply_response = await self.model.ainvoke([ + SystemMessage(content=LIGHT_REPLY_PROMPT + context), + HumanMessage(content=message), + ]) + reply_text = reply_response.content or "" + reply_text = re.sub(r".*?", "", reply_text, flags=re.DOTALL).strip() + if not reply_text: + print("[router] light reply empty, falling back to medium", flush=True) + return "medium", None + print(f"[router] light reply: {len(reply_text)} chars", flush=True) + return "light", reply_text + except Exception as e: + print(f"[router] light reply error, falling back to medium: {e}", flush=True) + return "medium", None diff --git a/test_pipeline.py b/test_pipeline.py index edacf6b..c696a41 100644 --- a/test_pipeline.py +++ b/test_pipeline.py @@ -1,307 
+1,888 @@ #!/usr/bin/env python3 """ -Adolf pipeline integration test. +Adolf pipeline integration test with end-to-end timing profiling. Tests: 1. Service health (deepagents, openmemory, grammy MCP SSE) - 2. GPU Ollama reachability and model availability - 3. CPU Ollama reachability and model availability - 4. Qdrant reachability and adolf_memories collection - 5. SearXNG reachability and JSON results - 6. Full chat pipeline — POST /chat returns 202 immediately - 7. Async memory storage — memories appear in Qdrant after reply - 8. Memory recall — agent retrieves stored facts on next query - 9. Async timing — reply logged before memory stored (from deepagents logs) + 2. GPU Ollama models + 3. CPU Ollama models + 4. Qdrant collection + vector dims + 5. SearXNG + 6. Name store — "remember that your name is " + 7. Qdrant point added after store + 8. Name recall — "what is your name?" → reply contains + 9. Timing profile + bottleneck report + 10. Easy benchmark — 10 easy questions → all must route to light + 11. Medium benchmark — 10 medium questions → must route to medium (or light, never complex) + 12. Hard benchmark — 10 /think questions → all must route to complex; VRAM flush verified Usage: python3 test_pipeline.py [--chat-id CHAT_ID] + [--bench-only] skip sections 1-9, run 10+11+12 + [--easy-only] skip 1-9 and 11+12, run only section 10 + [--medium-only] skip 1-9 and 10+12, run only section 11 + [--hard-only] skip 1-9 and 10+11, run only section 12 + [--no-bench] skip sections 10-12 -Does NOT send real Telegram messages — calls deepagents /chat directly and -reads Qdrant to verify memory. Grammy delivery is confirmed via MCP tool -visible in deepagents logs ('[agent] replied in Xs ... send=Ys'). - -Known limitation: gemma3:1b (CPU extraction model) may abstract or -deduplicate memories rather than storing raw text verbatim. +Timing is extracted from deepagents container logs, not estimated from sleeps. 
""" import argparse import http.client import json import random +import re +import subprocess import sys import time -import urllib.error import urllib.request # ── config ──────────────────────────────────────────────────────────────────── -DEEPAGENTS = "http://localhost:8000" -OPENMEMORY = "http://localhost:8765" -GRAMMY_HOST = "localhost" -GRAMMY_PORT = 3001 -OLLAMA_GPU = "http://localhost:11436" -OLLAMA_CPU = "http://localhost:11435" -QDRANT = "http://localhost:6333" -SEARXNG = "http://localhost:11437" - +DEEPAGENTS = "http://localhost:8000" +OPENMEMORY = "http://localhost:8765" +GRAMMY_HOST = "localhost" +GRAMMY_PORT = 3001 +OLLAMA_GPU = "http://localhost:11436" +OLLAMA_CPU = "http://localhost:11435" +QDRANT = "http://localhost:6333" +SEARXNG = "http://localhost:11437" +COMPOSE_FILE = "/home/alvis/agap_git/adolf/docker-compose.yml" DEFAULT_CHAT_ID = "346967270" +NAMES = [ + "Maximilian", "Cornelius", "Zephyr", "Archibald", "Balthazar", + "Ignatius", "Lysander", "Octavian", "Reginald", "Sylvester", +] + +# ── benchmark questions ─────────────────────────────────────────────────────── +BENCHMARK = { + "easy": [ + "hi", + "what is 2+2?", + "what is the capital of France?", + "tell me a short joke", + "how are you doing today?", + "thanks!", + "what day comes after Wednesday?", + "name the three primary colors", + "is the sky blue?", + "what does CPU stand for?", + ], + "medium": [ + "what is the current weather in Berlin?", + "find the latest news about artificial intelligence", + "what is the current price of Bitcoin?", + "search for a good pasta carbonara recipe", + "what movies are in theaters this week?", + "find Python tutorials for beginners", + "who won the last FIFA World Cup?", + "do you remember what we talked about before?", + "search for the best coffee shops in Tokyo", + "what is happening in the tech industry this week?", + ], + "hard": [ + "/think compare the top 3 Python web frameworks (Django, FastAPI, Flask) and recommend one for a production 
REST API", + "/think research the history of artificial intelligence and create a timeline of key milestones", + "/think plan a 7-day trip to Japan with daily itinerary, accommodation suggestions, and budget breakdown", + "/think analyze microservices vs monolithic architecture: pros, cons, and when to choose each", + "/think write a Python script that reads a CSV file, cleans the data, and generates summary statistics", + "/think research quantum computing: explain the key concepts and how it differs from classical computing", + "/think compare PostgreSQL, MongoDB, and Redis — when to use each and what are the trade-offs?", + "/think create a comprehensive Docker deployment guide covering best practices for production", + "/think research climate change: summarize the latest IPCC findings and key data points", + "/think design a REST API with authentication, rate limiting, and proper error handling — provide architecture and code outline", + ], +} + PASS = "\033[32mPASS\033[0m" FAIL = "\033[31mFAIL\033[0m" INFO = "\033[36mINFO\033[0m" +WARN = "\033[33mWARN\033[0m" results = [] +timings = {} # label → float seconds | None +# ── helpers ─────────────────────────────────────────────────────────────────── + def report(name, ok, detail=""): tag = PASS if ok else FAIL - line = f" [{tag}] {name}" - if detail: - line += f" — {detail}" - print(line) + print(f" [{tag}] {name}" + (f" — {detail}" if detail else "")) results.append((name, ok)) +def tf(v): + """Format timing value.""" + return f"{v:6.2f}s" if v is not None else " n/a" + + def get(url, timeout=5): - req = urllib.request.Request(url) - with urllib.request.urlopen(req, timeout=timeout) as r: + with urllib.request.urlopen(urllib.request.Request(url), timeout=timeout) as r: return r.status, r.read().decode() -def post_json(url, payload, timeout=30): +def post_json(url, payload, timeout=10): data = json.dumps(payload).encode() - req = urllib.request.Request( - url, data=data, - headers={"Content-Type": 
"application/json"}, - method="POST" - ) + req = urllib.request.Request(url, data=data, + headers={"Content-Type": "application/json"}, + method="POST") with urllib.request.urlopen(req, timeout=timeout) as r: return r.status, json.loads(r.read().decode()) def check_sse(host, port, path): - """ - SSE endpoints stream indefinitely — urlopen would hang waiting for body. - Use http.client directly to read just the response status line and headers. - """ try: conn = http.client.HTTPConnection(host, port, timeout=5) conn.request("GET", path, headers={"Accept": "text/event-stream"}) r = conn.getresponse() - ok = r.status == 200 conn.close() - return ok, f"HTTP {r.status}" + return r.status == 200, f"HTTP {r.status}" except Exception as e: return False, str(e) +def qdrant_count(): + try: + _, body = get(f"{QDRANT}/collections/adolf_memories") + return json.loads(body).get("result", {}).get("points_count", 0) + except Exception: + return 0 + + +def fetch_logs(since_s=600): + """Return deepagents log lines from the last since_s seconds.""" + try: + r = subprocess.run( + ["docker", "compose", "-f", COMPOSE_FILE, "logs", "deepagents", + f"--since={int(since_s)}s", "--no-log-prefix"], + capture_output=True, text=True, timeout=15, + ) + return r.stdout.splitlines() + except Exception: + return [] + + +def parse_run_block(lines, msg_prefix): + """ + Scan log lines for the LAST '[agent] running: ' block. + Extracts reply timing, tier, and memory timing from that block. + + Returns dict or None if the reply has not appeared in logs yet. + Dict keys: + reply_total, llm, send, tier, reply_text — from "[agent] replied in ..." + memory_s — from "[memory] stored in ..." 
+ memory_error — True if "[memory] error" found + """ + search = msg_prefix[:50] + start_idx = None + for i, line in enumerate(lines): + if "[agent] running:" in line and search in line: + start_idx = i # keep updating — we want the LAST occurrence + + if start_idx is None: + return None + + block = lines[start_idx:] + last_ai_text = None + reply_data = None + + for j, line in enumerate(block): + # Track last non-tool AIMessage (the final reply) + if "AIMessage:" in line and "→" not in line: + txt = line.split("AIMessage:", 1)[-1].strip() + if txt: + last_ai_text = txt + + # For light tier: router reply is stored in _conversation_buffers directly + # so there may be no AIMessage log — grab from tier=light line + if "[agent] tier=light" in line and "message=" in line: + # Extract preview text logged elsewhere if available + pass + + m = re.search(r"replied in ([\d.]+)s \(llm=([\d.]+)s, send=([\d.]+)s\)", line) + if m: + # Extract optional tier tag at end of line + tier_m = re.search(r"\btier=(\w+)", line) + tier = tier_m.group(1) if tier_m else "unknown" + reply_data = { + "reply_total": float(m.group(1)), + "llm": float(m.group(2)), + "send": float(m.group(3)), + "tier": tier, + "reply_text": last_ai_text, + "memory_s": None, + "memory_error": False, + "_j": j, + } + break + + if reply_data is None: + return None # reply not in logs yet + + # Memory line can appear after the next "[agent] running:" — no stop condition + for line in block[reply_data["_j"] + 1:]: + mm = re.search(r"\[memory\] stored in ([\d.]+)s", line) + if mm: + reply_data["memory_s"] = float(mm.group(1)) + break + if "[memory] error" in line: + reply_data["memory_error"] = True + break + + return reply_data + + +def wait_for(label, msg_prefix, timeout_s=200, need_memory=True): + """ + Poll deepagents logs until the message is fully processed. + Shows a live progress line. + Returns timing dict or None on timeout. 
+    """
+    t_start = time.monotonic()
+    deadline = t_start + timeout_s
+    tick = 0
+    last_result = None
+
+    while time.monotonic() < deadline:
+        # Window grows with elapsed time — never miss a line that appeared late
+        since = int(time.monotonic() - t_start) + 90
+        lines = fetch_logs(since_s=since)
+        result = parse_run_block(lines, msg_prefix)
+
+        if result:
+            last_result = result
+            has_mem = result["memory_s"] is not None or result["memory_error"]
+            if (not need_memory) or has_mem:
+                elapsed = time.monotonic() - t_start
+                print(f"\r [{label}] done after {elapsed:.0f}s{' ' * 30}")
+                return result
+
+        time.sleep(4)
+        tick += 1
+        rem = int(deadline - time.monotonic())
+        if last_result:
+            phase = "waiting for memory..." if need_memory else "done"
+        else:
+            phase = "waiting for LLM reply..."
+        print(f"\r [{label}] {tick*4}s elapsed, {rem}s left — {phase} ", end="", flush=True)
+
+    print(f"\r [{label}] TIMEOUT after {timeout_s}s{' ' * 30}")
+    return None
+
+
+# ── args ──────────────────────────────────────────────────────────────────────
+parser = argparse.ArgumentParser(description="Adolf pipeline test")
+parser.add_argument("--chat-id", default=DEFAULT_CHAT_ID)
+parser.add_argument("--bench-only", action="store_true",
+                    help="Skip sections 1-9, run sections 10+11+12 (all benchmarks)")
+parser.add_argument("--easy-only", action="store_true",
+                    help="Skip sections 1-9 and 11+12, run only section 10 (easy benchmark)")
+parser.add_argument("--medium-only", action="store_true",
+                    help="Skip sections 1-9 and 10+12, run only section 11 (medium benchmark)")
+parser.add_argument("--hard-only", action="store_true",
+                    help="Skip sections 1-9 and 10+11, run only section 12 (hard benchmark)")
+parser.add_argument("--no-bench", action="store_true",
+                    help="Skip sections 10-12 (all benchmarks)")
+args = parser.parse_args()
+CHAT_ID = args.chat_id
+
+# Derived flags for readability
+_skip_pipeline = args.bench_only or args.easy_only or args.medium_only or args.hard_only
+_run_easy = not
args.no_bench and not args.medium_only and not args.hard_only +_run_medium = not args.no_bench and not args.easy_only and not args.hard_only +_run_hard = not args.no_bench and not args.easy_only and not args.medium_only + +random_name = random.choice(NAMES) + +if not _skip_pipeline: + print(f"\n Test name : \033[1m{random_name}\033[0m") + print(f" Chat ID : {CHAT_ID}") + + # ── 1. service health ───────────────────────────────────────────────────────── -print(f"\n[{INFO}] 1. Service health") +if not _skip_pipeline: + print(f"\n[{INFO}] 1. Service health") + t0 = time.monotonic() -try: - status, body = get(f"{DEEPAGENTS}/health") - data = json.loads(body) - ok = status == 200 and data.get("agent_ready") is True - report("deepagents /health — agent_ready", ok, f"agent_ready={data.get('agent_ready')}") -except Exception as e: - report("deepagents /health", False, str(e)) + try: + status, body = get(f"{DEEPAGENTS}/health") + data = json.loads(body) + ok = status == 200 and data.get("agent_ready") is True + report("deepagents /health — agent_ready", ok, f"agent_ready={data.get('agent_ready')}") + except Exception as e: + report("deepagents /health", False, str(e)) -ok, detail = check_sse("localhost", 8765, "/sse") -report("openmemory /sse reachable (HTTP 200)", ok, detail) + ok, detail = check_sse("localhost", 8765, "/sse") + report("openmemory /sse reachable", ok, detail) -ok, detail = check_sse(GRAMMY_HOST, GRAMMY_PORT, "/sse") -report("grammy /sse reachable (HTTP 200)", ok, detail) + ok, detail = check_sse(GRAMMY_HOST, GRAMMY_PORT, "/sse") + report("grammy /sse reachable", ok, detail) + + timings["health_check"] = time.monotonic() - t0 # ── 2. GPU Ollama ───────────────────────────────────────────────────────────── -print(f"\n[{INFO}] 2. GPU Ollama (port 11436)") +if not _skip_pipeline: + print(f"\n[{INFO}] 2. 
GPU Ollama (port 11436)") + t0 = time.monotonic() -try: - status, body = get(f"{OLLAMA_GPU}/api/tags") - models = [m["name"] for m in json.loads(body).get("models", [])] - has_qwen = any("qwen3" in m for m in models) - report("GPU Ollama reachable", True, f"models: {models}") - report("qwen3:8b present on GPU Ollama", has_qwen) -except Exception as e: - report("GPU Ollama reachable", False, str(e)) - report("qwen3:8b present on GPU Ollama", False, "skipped") + try: + status, body = get(f"{OLLAMA_GPU}/api/tags") + models = [m["name"] for m in json.loads(body).get("models", [])] + has_qwen = any("qwen3" in m for m in models) + report("GPU Ollama reachable", True, f"models: {models}") + report("qwen3:8b present", has_qwen) + except Exception as e: + report("GPU Ollama reachable", False, str(e)) + report("qwen3:8b present", False, "skipped") + + timings["gpu_ollama_ping"] = time.monotonic() - t0 # ── 3. CPU Ollama ───────────────────────────────────────────────────────────── -print(f"\n[{INFO}] 3. CPU Ollama (port 11435)") +if not _skip_pipeline: + print(f"\n[{INFO}] 3. 
CPU Ollama (port 11435)") + t0 = time.monotonic() -try: - status, body = get(f"{OLLAMA_CPU}/api/tags") - models = [m["name"] for m in json.loads(body).get("models", [])] - has_embed = any("nomic-embed-text" in m for m in models) - has_qwen = any("qwen2.5:1.5b" in m for m in models) - report("CPU Ollama reachable", True, f"models: {models}") - report("nomic-embed-text present on CPU Ollama", has_embed) - report("qwen2.5:1.5b present on CPU Ollama", has_qwen) -except Exception as e: - report("CPU Ollama reachable", False, str(e)) - report("nomic-embed-text present on CPU Ollama", False, "skipped") - report("qwen2.5:1.5b present on CPU Ollama", False, "skipped") + try: + status, body = get(f"{OLLAMA_CPU}/api/tags") + models = [m["name"] for m in json.loads(body).get("models", [])] + has_embed = any("nomic-embed-text" in m for m in models) + report("CPU Ollama reachable", True, f"models: {models}") + report("nomic-embed-text present", has_embed) + except Exception as e: + report("CPU Ollama reachable", False, str(e)) + report("nomic-embed-text present", False, "skipped") + + timings["cpu_ollama_ping"] = time.monotonic() - t0 # ── 4. Qdrant ───────────────────────────────────────────────────────────────── -print(f"\n[{INFO}] 4. Qdrant (port 6333)") +if not _skip_pipeline: + print(f"\n[{INFO}] 4. 
Qdrant (port 6333)") + t0 = time.monotonic() -try: - status, body = get(f"{QDRANT}/collections") - collections = [c["name"] for c in json.loads(body).get("result", {}).get("collections", [])] - has_col = "adolf_memories" in collections - report("Qdrant reachable", True, f"collections: {collections}") - report("adolf_memories collection exists", has_col) -except Exception as e: - report("Qdrant reachable", False, str(e)) - report("adolf_memories collection exists", False, "skipped") + try: + status, body = get(f"{QDRANT}/collections") + cols = [c["name"] for c in json.loads(body).get("result", {}).get("collections", [])] + report("Qdrant reachable", True, f"collections: {cols}") + report("adolf_memories collection exists", "adolf_memories" in cols) + except Exception as e: + report("Qdrant reachable", False, str(e)) + report("adolf_memories collection exists", False, "skipped") -try: - status, body = get(f"{QDRANT}/collections/adolf_memories") - info = json.loads(body).get("result", {}) - dims = info.get("config", {}).get("params", {}).get("vectors", {}).get("size") - report("adolf_memories vector dims = 768", dims == 768, f"got {dims}") -except Exception as e: - report("adolf_memories collection info", False, str(e)) + try: + status, body = get(f"{QDRANT}/collections/adolf_memories") + info = json.loads(body).get("result", {}) + dims = info.get("config", {}).get("params", {}).get("vectors", {}).get("size") + report("vector dims = 768", dims == 768, f"got {dims}") + except Exception as e: + report("adolf_memories collection info", False, str(e)) + + timings["qdrant_ping"] = time.monotonic() - t0 # ── 5. SearXNG ──────────────────────────────────────────────────────────────── -print(f"\n[{INFO}] 5. SearXNG (port 11437)") - -try: +if not _skip_pipeline: + print(f"\n[{INFO}] 5. 
SearXNG (port 11437)") t0 = time.monotonic() - status, body = get(f"{SEARXNG}/search?q=test&format=json", timeout=15) - elapsed = time.monotonic() - t0 - data = json.loads(body) - n_results = len(data.get("results", [])) - report("SearXNG reachable + JSON format enabled", status == 200 and n_results > 0, - f"{n_results} results in {elapsed:.1f}s") - report("SearXNG response < 5s", elapsed < 5, f"{elapsed:.2f}s") -except Exception as e: - report("SearXNG reachable", False, str(e)) - report("SearXNG response < 5s", False, "skipped") + + try: + status, body = get(f"{SEARXNG}/search?q=test&format=json", timeout=15) + elapsed = time.monotonic() - t0 + n = len(json.loads(body).get("results", [])) + report("SearXNG reachable + JSON results", status == 200 and n > 0, f"{n} results in {elapsed:.1f}s") + report("SearXNG response < 5s", elapsed < 5, f"{elapsed:.2f}s") + timings["searxng_latency"] = elapsed + except Exception as e: + report("SearXNG reachable", False, str(e)) + report("SearXNG response < 5s", False, "skipped") + timings["searxng_latency"] = None + + timings["searxng_check"] = time.monotonic() - t0 -# ── 6. POST /chat returns 202 immediately ───────────────────────────────────── -print(f"\n[{INFO}] 6–8. Full pipeline (chat → reply → memory → recall)") -print(f" Using chat_id={DEFAULT_CHAT_ID}") +# ── 6–8. Name memory pipeline ───────────────────────────────────────────────── +if not _skip_pipeline: + print(f"\n[{INFO}] 6–8. Name memory pipeline") + print(f" chat_id={CHAT_ID} name={random_name}") -marker_word = f"testword{random.randint(1000, 9999)}" -marker_msg = f"My test marker for this run is: {marker_word}. Please acknowledge." + store_msg = f"remember that your name is {random_name}" + recall_msg = "what is your name?" 
-# Record point count before test so we can verify new points are added -try: - _, col_body = get(f"{QDRANT}/collections/adolf_memories") - points_before = json.loads(col_body).get("result", {}).get("points_count", 0) -except Exception: - points_before = 0 + pts_before = qdrant_count() + print(f" Qdrant points before: {pts_before}") -print(f"\n [send] '{marker_msg}'") -print(f" Qdrant points before: {points_before}") + # ── 6. Send store message ───────────────────────────────────────────────────── + print(f"\n [store] '{store_msg}'") + t_store = time.monotonic() -t_send = time.monotonic() -try: - status, resp = post_json(f"{DEEPAGENTS}/chat", - {"message": marker_msg, "chat_id": DEFAULT_CHAT_ID}, - timeout=5) - t_accepted = time.monotonic() - t_send - report("POST /chat returns 202 immediately (< 1s)", status == 202 and t_accepted < 1, - f"status={status}, t={t_accepted:.3f}s") -except Exception as e: - report("POST /chat returns 202 immediately", False, str(e)) - print(" Cannot continue pipeline tests.") - sys.exit(1) + try: + status, _ = post_json(f"{DEEPAGENTS}/chat", + {"message": store_msg, "chat_id": CHAT_ID}, timeout=5) + t_accept = time.monotonic() - t_store + report("POST /chat (store) returns 202 immediately", + status == 202 and t_accept < 1, f"status={status}, t={t_accept:.3f}s") + timings["store_http_accept"] = t_accept + except Exception as e: + report("POST /chat (store)", False, str(e)) + sys.exit(1) + + store = wait_for("store", store_msg, timeout_s=220, need_memory=True) + + if store: + timings["store_llm"] = store["llm"] + timings["store_send"] = store["send"] + timings["store_reply"] = store["reply_total"] + timings["store_memory"] = store["memory_s"] + report("Agent replied to store message", True, + f"{store['reply_total']:.1f}s total llm={store['llm']:.1f}s send={store['send']:.1f}s tier={store['tier']}") + if store["memory_s"] is not None: + report("Memory stored without error", True, f"{store['memory_s']:.1f}s") + elif 
store["memory_error"]: + report("Memory stored without error", False, "error in [memory] log") + else: + report("Memory stored without error", False, "not found in logs (still running?)") + print(f" Store reply: {store['reply_text']!r}") + else: + report("Agent replied to store message", False, "timeout") + report("Memory stored without error", False, "timeout") + sys.exit(1) + + # ── 7. Verify Qdrant ────────────────────────────────────────────────────────── + pts_after = qdrant_count() + new_pts = pts_after - pts_before + report("New memory point(s) added to Qdrant", new_pts > 0, + f"{pts_before} → {pts_after} (+{new_pts})") + timings["qdrant_new_points"] = new_pts + + # ── 8. Send recall message ──────────────────────────────────────────────────── + print(f"\n [recall] '{recall_msg}'") + t_recall = time.monotonic() + + try: + status, _ = post_json(f"{DEEPAGENTS}/chat", + {"message": recall_msg, "chat_id": CHAT_ID}, timeout=5) + t_accept2 = time.monotonic() - t_recall + report("POST /chat (recall) returns 202 immediately", + status == 202 and t_accept2 < 1, f"status={status}, t={t_accept2:.3f}s") + timings["recall_http_accept"] = t_accept2 + except Exception as e: + report("POST /chat (recall)", False, str(e)) + + recall = wait_for("recall", recall_msg, timeout_s=160, need_memory=False) + + if recall: + timings["recall_llm"] = recall["llm"] + timings["recall_send"] = recall["send"] + timings["recall_reply"] = recall["reply_total"] + report("Agent replied to recall message", True, + f"{recall['reply_total']:.1f}s total llm={recall['llm']:.1f}s send={recall['send']:.1f}s tier={recall['tier']}") + reply_text = recall["reply_text"] or "" + name_in_reply = random_name.lower() in reply_text.lower() + report(f"Reply contains '{random_name}'", name_in_reply, + f"reply: {reply_text[:120]!r}") + else: + report("Agent replied to recall message", False, "timeout") + report(f"Reply contains '{random_name}'", False, "no reply") -# ── 7. 
Async memory storage ─────────────────────────────────────────────────── -# Wait long enough for: GPU reply (~20s) + async CPU memory store (~20s) = ~40s -print(f" Waiting 50s for agent reply + async memory store…") -for i in range(10): - time.sleep(5) - print(f" …{(i+1)*5}s", end="\r") -print() +# ── 9. Timing profile ───────────────────────────────────────────────────────── +if not _skip_pipeline: + print(f"\n[{INFO}] 9. Timing profile") -try: - _, col_body = get(f"{QDRANT}/collections/adolf_memories") - points_after = json.loads(col_body).get("result", {}).get("points_count", 0) - new_points = points_after - points_before - report("New memory point(s) added to Qdrant after reply", new_points > 0, - f"{points_before} → {points_after} (+{new_points})") -except Exception as e: - report("Qdrant points after reply", False, str(e)) + W = 36 -# Inspect Qdrant payloads — the `data` field holds what mem0 stored -# Note: gemma3:1b may abstract/rewrite facts; raw marker_word may or may not appear -try: - _, scroll_body = post_json( - f"{QDRANT}/collections/adolf_memories/points/scroll", - {"limit": 50, "with_payload": True, "with_vector": False}, - timeout=10 + print(f"\n {'Stage':<{W}} {'Time':>8}") + print(f" {'─'*W} {'─'*8}") + + rows_store = [ + ("[GPU] HTTP accept — store turn", "store_http_accept"), + ("[GPU] qwen3:Xb inference — store turn","store_llm"), + ("[GPU] Telegram send — store turn", "store_send"), + ("[GPU] Total reply latency — store", "store_reply"), + ("[GPU] qwen2.5:1.5b+embed — async mem", "store_memory"), + ] + rows_recall = [ + ("[GPU] HTTP accept — recall turn", "recall_http_accept"), + ("[GPU] qwen3:Xb inference — recall", "recall_llm"), + ("[GPU] Telegram send — recall turn", "recall_send"), + ("[GPU] Total reply latency — recall", "recall_reply"), + ] + + for label, key in rows_store: + v = timings.get(key) + print(f" {label:<{W}} {tf(v):>8}") + + print(f" {'─'*W} {'─'*8}") + + for label, key in rows_recall: + v = timings.get(key) + print(f" 
{label:<{W}} {tf(v):>8}") + + # Bottleneck bar chart + print(f"\n Bottleneck analysis (each █ ≈ 5s):") + print(f" {'─'*(W+12)}") + + candidates = [ + ("[GPU] qwen3:Xb — store reply ", timings.get("store_llm") or 0), + ("[GPU] qwen3:Xb — recall reply", timings.get("recall_llm") or 0), + ("[GPU] qwen2.5:1.5b+embed (async)", timings.get("store_memory") or 0), + ("[net] SearXNG ", timings.get("searxng_latency") or 0), + ] + candidates.sort(key=lambda x: x[1], reverse=True) + + for label, t in candidates: + bar = "█" * min(int(t / 5), 24) + pct = "" + total_pipeline = (timings.get("store_reply") or 0) + (timings.get("store_memory") or 0) + if total_pipeline > 0: + pct = f" {t/total_pipeline*100:4.0f}%" + print(f" {label} {t:6.1f}s {bar}{pct}") + + print() + + +# ── 10. Tier routing benchmark — easy questions → light path ────────────────── +if _run_easy: + print(f"\n[{INFO}] 10. Tier routing benchmark") + print(f" Sending {len(BENCHMARK['easy'])} easy questions — all must route to 'light'") + print(f" Chat ID: {CHAT_ID}") + print() + + bench_results = [] # list of (question, tier, latency_s, ok) + LIGHT_TIMEOUT = 60 # seconds — light is fast but may queue behind prior messages + + for i, question in enumerate(BENCHMARK["easy"], 1): + tag = f"easy-{i:02d}" + short_q = question[:55] + print(f" [{tag}] {short_q!r}") + + # Send + t_send = time.monotonic() + try: + status, _ = post_json(f"{DEEPAGENTS}/chat", + {"message": question, "chat_id": CHAT_ID}, timeout=5) + if status != 202: + print(f" → [{FAIL}] POST returned {status}") + bench_results.append((question, "?", None, False)) + continue + except Exception as e: + print(f" → [{FAIL}] POST error: {e}") + bench_results.append((question, "?", None, False)) + continue + + # Poll for reply + t_start = time.monotonic() + found = None + while time.monotonic() - t_start < LIGHT_TIMEOUT: + since = int(time.monotonic() - t_start) + 30 + lines = fetch_logs(since_s=since) + found = parse_run_block(lines, question) + if found: + 
break + time.sleep(1) + + elapsed = time.monotonic() - t_send + + if not found: + print(f" → [{FAIL}] no reply within {LIGHT_TIMEOUT}s") + bench_results.append((question, "timeout", None, False)) + continue + + tier = found.get("tier", "unknown") + is_light = (tier == "light") + tag_str = PASS if is_light else FAIL + print(f" → [{tag_str}] tier={tier} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s") + bench_results.append((question, tier, found["reply_total"], is_light)) + + # Brief pause between questions to keep logs clean + time.sleep(1) + + # Summary table + print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question'}") + print(f" {'─'*4} {'─'*8} {'─'*8} {'─'*50}") + for idx, (q, tier, lat, ok) in enumerate(bench_results, 1): + lat_str = f"{lat:.1f}s" if lat is not None else "timeout" + ok_str = "✓" if ok else "✗" + print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {q[:50]!r}") + + light_count = sum(1 for _, _, _, ok in bench_results if ok) + total_bench = len(bench_results) + lats = [lat for _, _, lat, ok in bench_results if ok and lat is not None] + avg_lat = sum(lats) / len(lats) if lats else 0 + + print(f"\n Light-path score: {light_count}/{total_bench}") + if lats: + print(f" Avg latency (light): {avg_lat:.1f}s " + f"min={min(lats):.1f}s max={max(lats):.1f}s") + + report(f"All easy questions routed to light ({light_count}/{total_bench})", + light_count == total_bench, + f"{light_count}/{total_bench} via light path, avg {avg_lat:.1f}s") + + +# ── 11. Medium benchmark — medium questions → medium or light, never complex ── +if _run_medium: + print(f"\n[{INFO}] 11. Medium routing benchmark") + print(f" Sending {len(BENCHMARK['medium'])} medium questions") + print(f" Expected: tier=medium (needs tools). 
Light is acceptable for factual questions.") + print(f" Fail condition: tier=complex or timeout.") + print(f" Chat ID: {CHAT_ID}") + print() + + # Questions where light is a valid alternative (model may know from training data) + LIGHT_ACCEPTABLE = { + "who won the last FIFA World Cup?", + "search for a good pasta carbonara recipe", + "find Python tutorials for beginners", + "search for the best coffee shops in Tokyo", + } + + med_results = [] # list of (question, tier, latency_s, correct) + MEDIUM_TIMEOUT = 120 # seconds — medium takes 20-100s, allow for queue buildup + + for i, question in enumerate(BENCHMARK["medium"], 1): + tag = f"med-{i:02d}" + short_q = question[:60] + print(f" [{tag}] {short_q!r}") + + # Send + t_send = time.monotonic() + try: + status, _ = post_json(f"{DEEPAGENTS}/chat", + {"message": question, "chat_id": CHAT_ID}, timeout=5) + if status != 202: + print(f" → [{FAIL}] POST returned {status}") + med_results.append((question, "?", None, False)) + continue + except Exception as e: + print(f" → [{FAIL}] POST error: {e}") + med_results.append((question, "?", None, False)) + continue + + # Poll for reply + t_start = time.monotonic() + found = None + while time.monotonic() - t_start < MEDIUM_TIMEOUT: + since = int(time.monotonic() - t_start) + 60 + lines = fetch_logs(since_s=since) + found = parse_run_block(lines, question) + if found: + break + time.sleep(3) + + elapsed = time.monotonic() - t_send + + if not found: + print(f" → [{FAIL}] no reply within {MEDIUM_TIMEOUT}s") + med_results.append((question, "timeout", None, False)) + continue + + tier = found.get("tier", "unknown") + light_ok = question in LIGHT_ACCEPTABLE + + if tier == "medium": + correct = True + label = PASS + note = "medium ✓" + elif tier == "light": + correct = light_ok # light is only acceptable for certain questions + label = WARN if not light_ok else PASS + note = "light (acceptable)" if light_ok else "light (should be medium)" + elif tier == "complex": + correct = False + 
label = FAIL + note = "complex — wrong escalation" + else: + correct = False + label = FAIL + note = f"unknown tier {tier!r}" + + print(f" → [{label}] {note} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s") + med_results.append((question, tier, found["reply_total"], correct)) + + # Brief pause between questions + time.sleep(1) + + # Summary table + print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question'}") + print(f" {'─'*4} {'─'*8} {'─'*8} {'─'*55}") + for idx, (q, tier, lat, ok) in enumerate(med_results, 1): + lat_str = f"{lat:.1f}s" if lat is not None else "timeout" + ok_str = "✓" if ok else ("~" if tier == "light" else "✗") + print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {q[:55]!r}") + + total_med = len(med_results) + medium_count = sum(1 for _, tier, _, _ in med_results if tier == "medium") + light_count = sum(1 for _, tier, _, _ in med_results if tier == "light") + complex_count = sum(1 for _, tier, _, _ in med_results if tier == "complex") + timeout_count = sum(1 for _, tier, _, _ in med_results if tier == "timeout") + light_misroute = sum( + 1 for q, tier, _, _ in med_results + if tier == "light" and q not in LIGHT_ACCEPTABLE ) - points = scroll_body.get("result", {}).get("points", []) - all_data = [str(p.get("payload", {}).get("data", "")) for p in points] - marker_in_data = any(marker_word in d for d in all_data) + lats = [lat for _, _, lat, _ in med_results if lat is not None] + correct_count = medium_count + (light_count - light_misroute) + + print(f"\n Breakdown: medium={medium_count} light={light_count} complex={complex_count} timeout={timeout_count}") + if light_misroute: + print(f" [{WARN}] {light_misroute} question(s) answered via light when medium expected (check reply quality)") + if lats: + print(f" Avg latency: {sum(lats)/len(lats):.1f}s min={min(lats):.1f}s max={max(lats):.1f}s") + + no_complex = complex_count == 0 + no_timeout = timeout_count == 0 + all_ok = no_complex and no_timeout + report( - f"Marker '{marker_word}' 
found verbatim in Qdrant payloads", - marker_in_data, - "(gemma3:1b may abstract facts — check logs if FAIL)" if not marker_in_data else "found" + f"Medium questions: no complex escalation ({medium_count + light_count}/{total_med} routed)", + no_complex, + f"medium={medium_count} light={light_count} complex={complex_count} timeout={timeout_count}", ) - if not marker_in_data and all_data: - print(f" Most recent stored data: {all_data[-1][:120]!r}") -except Exception as e: - report("Qdrant payload inspection", False, str(e)) + if not no_timeout: + report( + f"Medium questions: all completed within {MEDIUM_TIMEOUT}s", + False, + f"{timeout_count} question(s) timed out (increase MEDIUM_TIMEOUT or check agent logs)", + ) -# ── 8. Memory recall ────────────────────────────────────────────────────────── -recall_msg = f"What is the test marker word I just told you? (hint: it starts with 'testword')" -print(f"\n [recall] '{recall_msg}'") +# ── 12. Hard benchmark — /think questions → complex tier + VRAM flush verified ─ +if _run_hard: + print(f"\n[{INFO}] 12. 
Hard routing benchmark") + print(f" Sending {len(BENCHMARK['hard'])} /think questions — all must route to 'complex'") + print(f" Verifies: /think prefix → force_complex=True → VRAM flush → qwen3:8b inference") + print(f" Acceptable fallback: 'medium' if VRAM eviction timed out (logged warning)") + print(f" Fail condition: tier=light or timeout") + print(f" Chat ID: {CHAT_ID}") + print() -try: - status, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": recall_msg, "chat_id": DEFAULT_CHAT_ID}, - timeout=5) - report("Recall query accepted (202)", status == 202) -except Exception as e: - report("Recall query accepted", False, str(e)) + hard_results = [] # list of (question, tier, latency_s, ok) + COMPLEX_TIMEOUT = 300 # seconds — complex takes 60-180s + VRAM flush overhead -print(f" Waiting 35s for recall reply (check Telegram for actual answer)…") -for i in range(7): - time.sleep(5) - print(f" …{(i+1)*5}s", end="\r") -print() -print(f" NOTE: Check Telegram — the bot should reply with '{marker_word}'.") -print(f" Check deepagents logs for: search_memory tool call and correct result.") + # Log markers we expect to see for complex path + _VRAM_ENTER = "[vram] enter_complex_mode" + _VRAM_EXIT = "[vram] exit_complex_mode" + for i, question in enumerate(BENCHMARK["hard"], 1): + tag = f"hard-{i:02d}" + # Strip /think prefix for display + short_q = question[len("/think "):].strip()[:60] + print(f" [{tag}] /think {short_q!r}") -# ── 9. Async timing verification ────────────────────────────────────────────── -print(f"\n[{INFO}] 9. 
Async pipeline timing") + # Snapshot log window start time + t_send = time.monotonic() + try: + status, _ = post_json(f"{DEEPAGENTS}/chat", + {"message": question, "chat_id": CHAT_ID}, timeout=5) + if status != 202: + print(f" → [{FAIL}] POST returned {status}") + hard_results.append((question, "?", None, False)) + continue + except Exception as e: + print(f" → [{FAIL}] POST error: {e}") + hard_results.append((question, "?", None, False)) + continue -# Verify two rapid POSTs both return 202 quickly (queuing, not blocking) -t0 = time.monotonic() -try: - s1, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": "async timing check one", "chat_id": DEFAULT_CHAT_ID}, - timeout=3) - s2, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": "async timing check two", "chat_id": DEFAULT_CHAT_ID}, - timeout=3) - t_both = time.monotonic() - t0 - report("Two consecutive POSTs both 202, total < 1s (fire-and-forget queue)", - s1 == 202 and s2 == 202 and t_both < 1, f"{t_both:.3f}s") -except Exception as e: - report("Consecutive POST queueing", False, str(e)) + # Poll for reply + t_start = time.monotonic() + found = None + while time.monotonic() - t_start < COMPLEX_TIMEOUT: + since = int(time.monotonic() - t_start) + 90 + lines = fetch_logs(since_s=since) + found = parse_run_block(lines, question[len("/think "):].strip()) + if found: + break + time.sleep(5) -print() -print(f" To confirm reply-before-memory async ordering, run:") -print(f" docker compose -f adolf/docker-compose.yml logs deepagents | grep -E 'replied|stored'") -print(f" Expected pattern per message:") -print(f" [agent] replied in Xs ← GPU reply first") -print(f" [memory] stored in Ys ← CPU memory after (Y > X - reply_time)") + elapsed = time.monotonic() - t_send + + if not found: + print(f" → [{FAIL}] no reply within {COMPLEX_TIMEOUT}s") + hard_results.append((question, "timeout", None, False)) + continue + + tier = found.get("tier", "unknown") + + if tier == "complex": + ok = True + label = PASS + note = "complex ✓" + 
elif tier == "medium": + # Acceptable fallback if VRAM eviction timed out + ok = True + label = WARN + note = "medium (VRAM fallback — check [vram] logs)" + else: + ok = False + label = FAIL + note = f"tier={tier} — unexpected" + + # Check if VRAM enter/exit were logged for this block + lines_block = fetch_logs(since_s=int(elapsed) + 120) + msg_key = question[len("/think "):].strip()[:40] + vram_enter_seen = any(_VRAM_ENTER in ln for ln in lines_block + if msg_key in ln or + any(msg_key[:15] in prev_ln + for prev_ln in lines_block[max(0, lines_block.index(ln)-10):lines_block.index(ln)])) + + # Simpler: just check the recent log window for enter/exit markers + recent = "\n".join(lines_block[-200:]) + vram_enter_seen = _VRAM_ENTER in recent + vram_exit_seen = _VRAM_EXIT in recent + + vram_note = "" + if tier == "complex": + if vram_enter_seen: + vram_note = " [vram:flush✓]" + else: + vram_note = f" [{WARN}:no vram flush log]" + + print(f" → [{label}] {note} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s{vram_note}") + hard_results.append((question, tier, found["reply_total"], ok)) + + # Pause to let exit_complex_mode background task complete before next question + # (flushes qwen3:8b and pre-warms 4b+router — avoids VRAM conflict on next enter) + time.sleep(5) + + # Summary table + print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question (/think ...)'}") + print(f" {'─'*4} {'─'*8} {'─'*8} {'─'*55}") + for idx, (q, tier, lat, ok) in enumerate(hard_results, 1): + lat_str = f"{lat:.1f}s" if lat is not None else "timeout" + ok_str = "✓" if tier == "complex" else ("~" if tier == "medium" else "✗") + short = q[len("/think "):].strip()[:55] + print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {short!r}") + + total_hard = len(hard_results) + complex_count = sum(1 for _, t, _, _ in hard_results if t == "complex") + medium_fb = sum(1 for _, t, _, _ in hard_results if t == "medium") + light_count = sum(1 for _, t, _, _ in hard_results if t == "light") + 
timeout_count = sum(1 for _, t, _, _ in hard_results if t == "timeout") + lats = [lat for _, _, lat, _ in hard_results if lat is not None] + + print(f"\n Breakdown: complex={complex_count} medium(fallback)={medium_fb} light={light_count} timeout={timeout_count}") + if medium_fb: + print(f" [{WARN}] {medium_fb} question(s) fell back to medium (VRAM eviction timeout)") + if light_count: + print(f" [{FAIL}] {light_count} question(s) routed to light — /think prefix not detected") + if lats: + print(f" Avg latency: {sum(lats)/len(lats):.1f}s min={min(lats):.1f}s max={max(lats):.1f}s") + + no_light = light_count == 0 + no_timeout = timeout_count == 0 + + report( + f"Hard questions routed to complex (not light) ({complex_count + medium_fb}/{total_hard})", + no_light and no_timeout, + f"complex={complex_count} medium_fallback={medium_fb} light={light_count} timeout={timeout_count}", + ) # ── summary ─────────────────────────────────────────────────────────────────── print(f"\n{'─'*55}") -total = len(results) +total = len(results) passed = sum(1 for _, ok in results if ok) failed = total - passed print(f"Results: {passed}/{total} passed", end="") @@ -314,3 +895,11 @@ if failed: else: print(" — all good") print() + +# Print benchmark reference +print(f"[{INFO}] Benchmark questions reference:") +for tier_name, questions in BENCHMARK.items(): + print(f"\n {tier_name.upper()} ({len(questions)} questions):") + for j, q in enumerate(questions, 1): + print(f" {j:2d}. 
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")


class VRAMManager:
    """Explicitly move Ollama models in and out of VRAM between routing tiers.

    The "medium" tier keeps ``MEDIUM_MODELS`` resident; the "complex" tier
    needs ``COMPLEX_MODEL`` instead.  Before a complex run the medium models
    are unloaded and their eviction is confirmed via ``/api/ps``; afterwards
    the complex model is unloaded and the medium models are loaded again.

    All HTTP helpers are best-effort: failures are logged with a ``[vram]``
    prefix and never raised, so a flaky Ollama cannot crash the caller.
    """

    MEDIUM_MODELS = ["qwen3:4b", "qwen2.5:1.5b"]
    COMPLEX_MODEL = "qwen3:8b"

    # Seconds to wait between two /api/ps polls while waiting for eviction.
    _POLL_INTERVAL_S = 0.5

    def __init__(self, base_url: str = OLLAMA_BASE_URL):
        """Remember the Ollama base URL (no trailing slash expected)."""
        self.base_url = base_url

    async def enter_complex_mode(self, eviction_timeout: float = 15.0) -> bool:
        """Flush the medium models so the complex model can be loaded.

        Args:
            eviction_timeout: Maximum number of seconds to wait for the
                medium models to disappear from ``/api/ps``.

        Returns:
            True when eviction was confirmed, False when it timed out (the
            log line tells the operator that the caller falls back to medium).
        """
        print("[vram] enter_complex_mode: flushing medium models", flush=True)
        await asyncio.gather(*(self._flush(m) for m in self.MEDIUM_MODELS))
        ok = await self._poll_evicted(self.MEDIUM_MODELS, timeout=eviction_timeout)
        if ok:
            print("[vram] enter_complex_mode: eviction confirmed, loading qwen3:8b", flush=True)
        else:
            print("[vram] enter_complex_mode: eviction timeout — falling back to medium", flush=True)
        return ok

    async def exit_complex_mode(self) -> None:
        """Flush the complex model and pre-warm the medium models again.

        Intended to run as a background task after a complex reply was sent.
        """
        print("[vram] exit_complex_mode: flushing qwen3:8b", flush=True)
        await self._flush(self.COMPLEX_MODEL)
        print("[vram] exit_complex_mode: pre-warming medium models", flush=True)
        await asyncio.gather(*(self._prewarm(m) for m in self.MEDIUM_MODELS))
        print("[vram] exit_complex_mode: done", flush=True)

    async def _flush(self, model: str) -> None:
        """Ask Ollama to unload ``model`` by sending ``keep_alive=0``.

        Errors (connection problems or HTTP error statuses) are logged and
        swallowed; eviction is verified separately by ``_poll_evicted``.
        """
        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                resp = await client.post(
                    f"{self.base_url}/api/generate",
                    json={"model": model, "prompt": "", "keep_alive": 0},
                )
                # Surface 4xx/5xx in the log instead of ignoring them.
                resp.raise_for_status()
        except Exception as e:  # best-effort boundary: log, never raise
            print(f"[vram] flush {model} error: {e}", flush=True)

    @staticmethod
    def _loaded_model_names(payload: dict) -> set[str]:
        """Extract the model names from a decoded ``/api/ps`` response body.

        A missing or null ``models`` entry is treated as "no models loaded".
        """
        return {entry.get("name", "") for entry in (payload.get("models") or [])}

    async def _poll_evicted(self, models: list[str], timeout: float) -> bool:
        """Poll ``/api/ps`` until none of ``models`` is listed, or time runs out.

        Args:
            models: Model names that must no longer be loaded.
            timeout: Maximum number of seconds to keep polling.

        Returns:
            True as soon as a successful poll lists none of ``models``;
            False when the deadline passes first.  Failed polls (network
            error, HTTP error status, undecodable body) are logged and
            retried; they never count as confirmation.
        """
        if not models:
            return True  # nothing to wait for
        if timeout <= 0:
            return False  # no time budget to confirm anything

        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        # One client for the whole wait instead of one per poll iteration.
        async with httpx.AsyncClient(timeout=5.0) as client:
            while loop.time() < deadline:
                try:
                    resp = await client.get(f"{self.base_url}/api/ps")
                    # An error body has no "models" key and would otherwise
                    # be mistaken for "nothing loaded".
                    resp.raise_for_status()
                    if self._loaded_model_names(resp.json()).isdisjoint(models):
                        return True
                except Exception as e:  # best-effort boundary: log and retry
                    print(f"[vram] poll_evicted error: {e}", flush=True)
                await asyncio.sleep(self._POLL_INTERVAL_S)
        return False

    async def _prewarm(self, model: str) -> None:
        """Load ``model`` into memory with ``keep_alive=300`` (5 minutes).

        Success is only logged when Ollama answered with a non-error status.
        """
        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                resp = await client.post(
                    f"{self.base_url}/api/generate",
                    json={"model": model, "prompt": "", "keep_alive": 300},
                )
                resp.raise_for_status()
            print(f"[vram] pre-warmed {model}", flush=True)
        except Exception as e:  # best-effort boundary: log, never raise
            print(f"[vram] prewarm {model} error: {e}", flush=True)