From 021104f5103649fc3aa2bf666d9a7385a893ecc8 Mon Sep 17 00:00:00 2001 From: Alvis Date: Thu, 12 Mar 2026 16:02:57 +0000 Subject: [PATCH] Split monolithic test_pipeline.py into focused integration test scripts - common.py: shared config, URL constants, benchmark questions, all helpers (get, post_json, check_sse, qdrant_count, fetch_logs, parse_run_block, wait_for, etc.) - test_health.py: service health checks (deepagents, bifrost, GPU/CPU Ollama, Qdrant, SearXNG) - test_memory.py: name store/recall pipeline, memory benchmark (5 facts + 10 recalls), dedup test - test_routing.py: easy/medium/hard tier routing benchmarks with --easy/medium/hard-only flags - Removed test_pipeline.py Co-Authored-By: Claude Sonnet 4.6 --- CLAUDE.md | 22 +- tests/integration/common.py | 273 ++++++ tests/integration/test_health.py | 214 +++++ tests/integration/test_memory.py | 438 ++++++++++ tests/integration/test_pipeline.py | 1295 ---------------------------- tests/integration/test_routing.py | 317 +++++++ 6 files changed, 1255 insertions(+), 1304 deletions(-) create mode 100644 tests/integration/common.py create mode 100644 tests/integration/test_health.py create mode 100644 tests/integration/test_memory.py delete mode 100644 tests/integration/test_pipeline.py create mode 100644 tests/integration/test_routing.py diff --git a/CLAUDE.md b/CLAUDE.md index 5f6b428..0e077c4 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -14,19 +14,23 @@ docker compose up --build python3 cli.py [--url http://localhost:8000] [--session cli-alvis] [--timeout 400] ``` -**Run integration tests:** +**Run integration tests** (from `tests/integration/`, require all Docker services running): ```bash -python3 test_pipeline.py [--chat-id CHAT_ID] +python3 test_health.py # service health: deepagents, bifrost, Ollama, Qdrant, SearXNG -# Selective sections: -python3 test_pipeline.py --bench-only # routing + memory benchmarks only (sections 10–13) -python3 test_pipeline.py --easy-only # light-tier routing benchmark -python3 test_pipeline.py --medium-only # medium-tier routing benchmark -python3 test_pipeline.py --hard-only # complex-tier + VRAM flush benchmark -python3 test_pipeline.py --memory-only # memory store/recall/dedup benchmark -python3 test_pipeline.py --no-bench # service health + single name store/recall only +python3 test_memory.py # name store/recall + memory benchmark + dedup +python3 test_memory.py --name-only # only name store/recall pipeline +python3 test_memory.py --bench-only # only 5-fact store + 10-question recall +python3 test_memory.py --dedup-only # only deduplication test + +python3 test_routing.py # all routing benchmarks (easy + medium + hard) +python3 test_routing.py --easy-only # light-tier routing benchmark +python3 test_routing.py --medium-only # medium-tier routing benchmark +python3 test_routing.py --hard-only # complex-tier + VRAM flush benchmark ``` +Shared config and helpers are in `tests/integration/common.py`. + ## Architecture Adolf is a multi-channel personal assistant. All LLM inference is routed through **Bifrost**, an open-source Go-based LLM gateway that adds retry logic, failover, and observability in front of Ollama. diff --git a/tests/integration/common.py b/tests/integration/common.py new file mode 100644 index 0000000..6390096 --- /dev/null +++ b/tests/integration/common.py @@ -0,0 +1,273 @@ +""" +Shared config, helpers, and utilities for Adolf integration tests. +""" + +import http.client +import json +import re +import subprocess +import time +import urllib.request + +# ── config ──────────────────────────────────────────────────────────────────── +DEEPAGENTS = "http://localhost:8000" +BIFROST = "http://localhost:8080" +OPENMEMORY = "http://localhost:8765" +GRAMMY_HOST = "localhost" +GRAMMY_PORT = 3001 +OLLAMA_GPU = "http://localhost:11436" +OLLAMA_CPU = "http://localhost:11435" +QDRANT = "http://localhost:6333" +SEARXNG = "http://localhost:11437" +COMPOSE_FILE = "/home/alvis/adolf/docker-compose.yml" +DEFAULT_CHAT_ID = "346967270" + +NAMES = [ + "Maximilian", "Cornelius", "Zephyr", "Archibald", "Balthazar", + "Ignatius", "Lysander", "Octavian", "Reginald", "Sylvester", +] + +BENCHMARK = { + "easy": [ + "hi", + "what is 2+2?", + "what is the capital of France?", + "tell me a short joke", + "how are you doing today?", + "thanks!", + "what day comes after Wednesday?", + "name the three primary colors", + "is the sky blue?", + "what does CPU stand for?", + ], + "medium": [ + "what is the current weather in Berlin?", + "find the latest news about artificial intelligence", + "what is the current price of Bitcoin?", + "search for a good pasta carbonara recipe", + "what movies are in theaters this week?", + "find Python tutorials for beginners", + "who won the last FIFA World Cup?", + "do you remember what we talked about before?", + "search for the best coffee shops in Tokyo", + "what is happening in the tech industry this week?", + "what's the weather like today?", + ], + "hard": [ + "/think compare the top 3 Python web frameworks (Django, FastAPI, Flask) and recommend one for a production REST API", + "/think research the history of artificial intelligence and create a timeline of key milestones", + "/think plan a 7-day trip to Japan with daily itinerary, accommodation suggestions, and budget breakdown", + "/think analyze microservices vs monolithic architecture: pros, cons, and when to choose each", + "/think write a Python script that reads a CSV file, cleans the data, and generates summary statistics", + "/think research quantum computing: explain the key concepts and how it differs from classical computing", + "/think compare PostgreSQL, MongoDB, and Redis — when to use each and what are the trade-offs?", + "/think create a comprehensive Docker deployment guide covering best practices for production", + "/think research climate change: summarize the latest IPCC findings and key data points", + "/think design a REST API with authentication, rate limiting, and proper error handling — provide architecture and code outline", + ], +} + +# ── terminal colours ────────────────────────────────────────────────────────── +PASS = "\033[32mPASS\033[0m" +FAIL = "\033[31mFAIL\033[0m" +INFO = "\033[36mINFO\033[0m" +WARN = "\033[33mWARN\033[0m" + + +# ── result helpers ──────────────────────────────────────────────────────────── + +def report(results: list, name: str, ok: bool, detail: str = ""): + tag = PASS if ok else FAIL + print(f" [{tag}] {name}" + (f" — {detail}" if detail else "")) + results.append((name, ok)) + + +def print_summary(results: list): + print(f"\n{'─'*55}") + total = len(results) + passed = sum(1 for _, ok in results if ok) + failed = total - passed + print(f"Results: {passed}/{total} passed", end="") + if failed: + print(f" ({failed} failed)\n") + print("Failed checks:") + for name, ok in results: + if not ok: + print(f" - {name}") + else: + print(" — all good") + print() + + +def tf(v): + """Format timing value.""" + return f"{v:6.2f}s" if v is not None else " n/a" + + +# ── HTTP helpers ────────────────────────────────────────────────────────────── + +def get(url, timeout=5): + with urllib.request.urlopen(urllib.request.Request(url), timeout=timeout) as r: + return r.status, r.read().decode() + + +def post_json(url, payload, timeout=10): + data = json.dumps(payload).encode() + req = urllib.request.Request( + url, data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=timeout) as r: + return r.status, json.loads(r.read().decode()) + + +def check_sse(host, port, path): + try: + conn = http.client.HTTPConnection(host, port, timeout=5) + conn.request("GET", path, headers={"Accept": "text/event-stream"}) + r = conn.getresponse() + conn.close() + return r.status == 200, f"HTTP {r.status}" + except Exception as e: + return False, str(e) + + +def qdrant_count(): + try: + _, body = get(f"{QDRANT}/collections/adolf_memories") + return json.loads(body).get("result", {}).get("points_count", 0) + except Exception: + return 0 + + +# ── log helpers ─────────────────────────────────────────────────────────────── + +def fetch_logs(since_s=600): + """Return deepagents log lines from the last since_s seconds.""" + try: + r = subprocess.run( + ["docker", "compose", "-f", COMPOSE_FILE, "logs", "deepagents", + f"--since={int(since_s)}s", "--no-log-prefix"], + capture_output=True, text=True, timeout=15, + ) + return r.stdout.splitlines() + except Exception: + return [] + + +def fetch_bifrost_logs(since_s=120): + """Return bifrost container log lines from the last since_s seconds.""" + try: + r = subprocess.run( + ["docker", "compose", "-f", COMPOSE_FILE, "logs", "bifrost", + f"--since={int(since_s)}s", "--no-log-prefix"], + capture_output=True, text=True, timeout=10, + ) + return r.stdout.splitlines() + except Exception: + return [] + + +def parse_run_block(lines, msg_prefix): + """ + Scan log lines for the LAST '[agent] running: ' block. + Extracts reply timing, tier, and memory timing from that block. + + Returns dict or None if the reply has not appeared in logs yet. + Dict keys: + reply_total, llm, send, tier, reply_text — from "[agent] replied in ..." + memory_s — from "[memory] stored in ..." + memory_error — True if "[memory] error" found + """ + search = msg_prefix[:50] + start_idx = None + for i, line in enumerate(lines): + if "[agent] running:" in line and search in line: + start_idx = i # keep updating — we want the LAST occurrence + + if start_idx is None: + return None + + block = lines[start_idx:] + last_ai_text = None + reply_data = None + + for j, line in enumerate(block): + if "AIMessage:" in line and "→" not in line: + txt = line.split("AIMessage:", 1)[-1].strip() + if txt: + last_ai_text = txt + + m = re.search(r"replied in ([\d.]+)s \(llm=([\d.]+)s, send=([\d.]+)s\)", line) + if m: + tier_m = re.search(r"\btier=(\w+)", line) + tier = tier_m.group(1) if tier_m else "unknown" + reply_data = { + "reply_total": float(m.group(1)), + "llm": float(m.group(2)), + "send": float(m.group(3)), + "tier": tier, + "reply_text": last_ai_text, + "memory_s": None, + "memory_error": False, + "_j": j, + } + break + + if reply_data is not None: + next_lines = block[reply_data["_j"] + 1: reply_data["_j"] + 3] + for line in next_lines: + if line.startswith("[agent] reply_text:"): + reply_data["reply_text"] = line[len("[agent] reply_text:"):].strip() + break + + if reply_data is None: + return None + + for line in block[reply_data["_j"] + 1:]: + mm = re.search(r"\[memory\] stored in ([\d.]+)s", line) + if mm: + reply_data["memory_s"] = float(mm.group(1)) + break + if "[memory] error" in line: + reply_data["memory_error"] = True + break + + return reply_data + + +def wait_for(label, msg_prefix, timeout_s=200, need_memory=True): + """ + Poll deepagents logs until the message is fully processed. + Shows a live progress line. Returns timing dict or None on timeout. + """ + t_start = time.monotonic() + deadline = t_start + timeout_s + tick = 0 + last_result = None + + while time.monotonic() < deadline: + since = int(time.monotonic() - t_start) + 90 + lines = fetch_logs(since_s=since) + result = parse_run_block(lines, msg_prefix) + + if result: + last_result = result + has_mem = result["memory_s"] is not None or result["memory_error"] + if (not need_memory) or has_mem: + elapsed = time.monotonic() - t_start + print(f"\r [{label}] done after {elapsed:.0f}s{' ' * 30}") + return result + + time.sleep(4) + tick += 1 + rem = int(deadline - time.monotonic()) + if last_result: + phase = "waiting for memory..." if need_memory else "done" + else: + phase = "waiting for LLM reply..." + print(f"\r [{label}] {tick*4}s elapsed, {rem}s left — {phase} ", end="", flush=True) + + print(f"\r [{label}] TIMEOUT after {timeout_s}s{' ' * 30}") + return None diff --git a/tests/integration/test_health.py b/tests/integration/test_health.py new file mode 100644 index 0000000..6bd1fec --- /dev/null +++ b/tests/integration/test_health.py @@ -0,0 +1,214 @@ +#!/usr/bin/env python3 +""" +Adolf service health integration tests. + +Checks: + 1. deepagents /health — agent_ready + 1b. openmemory /sse reachable + 1c. grammy /sse reachable + 2. Bifrost /health, /v1/models, direct inference, deepagents startup log + 3. GPU Ollama — reachable, qwen3:8b present + 4. CPU Ollama — reachable, nomic-embed-text present + 5. Qdrant — reachable, adolf_memories collection, vector dims=768 + 6. SearXNG — reachable, JSON results, latency < 5s + +Usage: + python3 test_health.py +""" + +import json +import sys +import time +import urllib.request + +from common import ( + DEEPAGENTS, BIFROST, GRAMMY_HOST, GRAMMY_PORT, + OLLAMA_GPU, OLLAMA_CPU, QDRANT, SEARXNG, COMPOSE_FILE, + INFO, FAIL, + report, print_summary, tf, + get, post_json, check_sse, fetch_logs, +) + +results = [] +timings = {} + + +# ── 1. Service health ───────────────────────────────────────────────────────── +print(f"\n[{INFO}] 1. Service health") +t0 = time.monotonic() + +try: + status, body = get(f"{DEEPAGENTS}/health") + data = json.loads(body) + ok = status == 200 and data.get("agent_ready") is True + report(results, "deepagents /health — agent_ready", ok, + f"agent_ready={data.get('agent_ready')}") +except Exception as e: + report(results, "deepagents /health", False, str(e)) + +ok, detail = check_sse("localhost", 8765, "/sse") +report(results, "openmemory /sse reachable", ok, detail) + +ok, detail = check_sse(GRAMMY_HOST, GRAMMY_PORT, "/sse") +report(results, "grammy /sse reachable", ok, detail) + +timings["health_check"] = time.monotonic() - t0 + + +# ── 2. Bifrost gateway ──────────────────────────────────────────────────────── +print(f"\n[{INFO}] 2. Bifrost gateway (port 8080)") +t0 = time.monotonic() + +try: + status, body = get(f"{BIFROST}/health", timeout=5) + report(results, "Bifrost /health reachable", status == 200, f"HTTP {status}") +except Exception as e: + report(results, "Bifrost /health reachable", False, str(e)) + +try: + status, body = get(f"{BIFROST}/v1/models", timeout=5) + data = json.loads(body) + model_ids = [m.get("id", "") for m in data.get("data", [])] + gpu_models = [m for m in model_ids if m.startswith("ollama/")] + report(results, "Bifrost lists ollama GPU models", len(gpu_models) > 0, + f"found: {gpu_models}") + for expected in ["ollama/qwen3:4b", "ollama/qwen3:8b", "ollama/qwen2.5:1.5b"]: + report(results, f" model {expected} listed", expected in model_ids) +except Exception as e: + report(results, "Bifrost /v1/models", False, str(e)) + +print(f" [bifrost-infer] POST /v1/chat/completions → ollama/qwen2.5:0.5b ...") +t_infer = time.monotonic() +try: + infer_payload = { + "model": "ollama/qwen2.5:0.5b", + "messages": [{"role": "user", "content": "Reply with exactly one word: pong"}], + "max_tokens": 16, + } + data = json.dumps(infer_payload).encode() + req = urllib.request.Request( + f"{BIFROST}/v1/chat/completions", + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=60) as r: + infer_status = r.status + infer_body = json.loads(r.read().decode()) + infer_elapsed = time.monotonic() - t_infer + reply_content = infer_body.get("choices", [{}])[0].get("message", {}).get("content", "") + used_model = infer_body.get("model", "") + report(results, "Bifrost → Ollama GPU inference succeeds", + infer_status == 200 and bool(reply_content), + f"{infer_elapsed:.1f}s model={used_model!r} reply={reply_content[:60]!r}") + timings["bifrost_direct_infer"] = infer_elapsed +except Exception as e: + report(results, "Bifrost → Ollama GPU inference succeeds", False, str(e)) + timings["bifrost_direct_infer"] = None + +try: + import subprocess + r = subprocess.run( + ["docker", "compose", "-f", COMPOSE_FILE, "logs", "deepagents", + "--since=3600s", "--no-log-prefix"], + capture_output=True, text=True, timeout=10, + ) + log_lines = r.stdout.splitlines() + bifrost_line = next( + (l for l in log_lines if "[agent] bifrost=" in l and "bifrost:8080" in l), + None, + ) + report(results, "deepagents startup log confirms bifrost URL", + bifrost_line is not None, + bifrost_line.strip() if bifrost_line else "line not found in logs") + if bifrost_line: + has_prefix = "router=ollama/" in bifrost_line and "medium=ollama/" in bifrost_line + report(results, "deepagents model names use ollama/ prefix", has_prefix, + bifrost_line.strip()) +except Exception as e: + report(results, "deepagents startup log check", False, str(e)) + +timings["bifrost_check"] = time.monotonic() - t0 + + +# ── 3. GPU Ollama ───────────────────────────────────────────────────────────── +print(f"\n[{INFO}] 3. GPU Ollama (port 11436)") +t0 = time.monotonic() + +try: + status, body = get(f"{OLLAMA_GPU}/api/tags") + models = [m["name"] for m in json.loads(body).get("models", [])] + has_qwen = any("qwen3" in m for m in models) + report(results, "GPU Ollama reachable", True, f"models: {models}") + report(results, "qwen3:8b present", has_qwen) +except Exception as e: + report(results, "GPU Ollama reachable", False, str(e)) + report(results, "qwen3:8b present", False, "skipped") + +timings["gpu_ollama_ping"] = time.monotonic() - t0 + + +# ── 4. CPU Ollama ───────────────────────────────────────────────────────────── +print(f"\n[{INFO}] 4. CPU Ollama (port 11435)") +t0 = time.monotonic() + +try: + status, body = get(f"{OLLAMA_CPU}/api/tags") + models = [m["name"] for m in json.loads(body).get("models", [])] + has_embed = any("nomic-embed-text" in m for m in models) + report(results, "CPU Ollama reachable", True, f"models: {models}") + report(results, "nomic-embed-text present", has_embed) +except Exception as e: + report(results, "CPU Ollama reachable", False, str(e)) + report(results, "nomic-embed-text present", False, "skipped") + +timings["cpu_ollama_ping"] = time.monotonic() - t0 + + +# ── 5. Qdrant ───────────────────────────────────────────────────────────────── +print(f"\n[{INFO}] 5. Qdrant (port 6333)") +t0 = time.monotonic() + +try: + status, body = get(f"{QDRANT}/collections") + cols = [c["name"] for c in json.loads(body).get("result", {}).get("collections", [])] + report(results, "Qdrant reachable", True, f"collections: {cols}") + report(results, "adolf_memories collection exists", "adolf_memories" in cols) +except Exception as e: + report(results, "Qdrant reachable", False, str(e)) + report(results, "adolf_memories collection exists", False, "skipped") + +try: + status, body = get(f"{QDRANT}/collections/adolf_memories") + info = json.loads(body).get("result", {}) + dims = info.get("config", {}).get("params", {}).get("vectors", {}).get("size") + report(results, "vector dims = 768", dims == 768, f"got {dims}") +except Exception as e: + report(results, "adolf_memories collection info", False, str(e)) + +timings["qdrant_ping"] = time.monotonic() - t0 + + +# ── 6. SearXNG ──────────────────────────────────────────────────────────────── +print(f"\n[{INFO}] 6. SearXNG (port 11437)") +t0 = time.monotonic() + +try: + status, body = get(f"{SEARXNG}/search?q=test&format=json", timeout=15) + elapsed = time.monotonic() - t0 + n = len(json.loads(body).get("results", [])) + report(results, "SearXNG reachable + JSON results", status == 200 and n > 0, + f"{n} results in {elapsed:.1f}s") + report(results, "SearXNG response < 5s", elapsed < 5, f"{elapsed:.2f}s") + timings["searxng_latency"] = elapsed +except Exception as e: + report(results, "SearXNG reachable", False, str(e)) + report(results, "SearXNG response < 5s", False, "skipped") + timings["searxng_latency"] = None + +timings["searxng_check"] = time.monotonic() - t0 + + +# ── summary ─────────────────────────────────────────────────────────────────── +print_summary(results) +sys.exit(0 if all(ok for _, ok in results) else 1) diff --git a/tests/integration/test_memory.py b/tests/integration/test_memory.py new file mode 100644 index 0000000..7d713b2 --- /dev/null +++ b/tests/integration/test_memory.py @@ -0,0 +1,438 @@ +#!/usr/bin/env python3 +""" +Adolf memory integration tests. + +Tests: + 1. Name store — POST "remember that your name is " + 2. Qdrant point — verifies a new vector was written after store + 3. Name recall — POST "what is your name?" → reply must contain + 4. Bifrost — verifies store/recall requests passed through Bifrost + 5. Timing profile — breakdown of store and recall latencies + 6. Memory benchmark — store 5 personal facts, recall with 10 questions + 7. Dedup test — same fact stored twice must not grow Qdrant by 2 points + +Usage: + python3 test_memory.py [--chat-id CHAT_ID] [--name-only] [--bench-only] [--dedup-only] +""" + +import argparse +import json +import random +import subprocess +import sys +import time +import urllib.request + +from common import ( + DEEPAGENTS, QDRANT, COMPOSE_FILE, DEFAULT_CHAT_ID, + NAMES, + INFO, PASS, FAIL, WARN, + report, print_summary, tf, + get, post_json, qdrant_count, fetch_logs, fetch_bifrost_logs, + parse_run_block, wait_for, +) + +# ── args ────────────────────────────────────────────────────────────────────── +parser = argparse.ArgumentParser(description="Adolf memory integration tests") +parser.add_argument("--chat-id", default=DEFAULT_CHAT_ID) +parser.add_argument("--name-only", action="store_true", help="Run only the name store/recall test") +parser.add_argument("--bench-only", action="store_true", help="Run only the memory benchmark") +parser.add_argument("--dedup-only", action="store_true", help="Run only the deduplication test") +args = parser.parse_args() + +CHAT_ID = args.chat_id +_only = args.name_only or args.bench_only or args.dedup_only +_run_name = not _only or args.name_only +_run_bench = not _only or args.bench_only +_run_dedup = not _only or args.dedup_only + +results = [] +timings = {} + +random_name = random.choice(NAMES) +TEST_CHAT_ID = f"{CHAT_ID}-{random_name.lower()}" + +if _run_name: + print(f"\n Test name : \033[1m{random_name}\033[0m") + print(f" Chat ID : {TEST_CHAT_ID}") + + +# ── 1–4. Name store / recall pipeline ──────────────────────────────────────── +if _run_name: + print(f"\n[{INFO}] 1. Name store / recall pipeline") + + store_msg = f"remember that your name is {random_name}" + recall_msg = "what is your name?" + + # Clear memories so each run starts clean + try: + post_json(f"{QDRANT}/collections/adolf_memories/points/delete", + {"filter": {}}, timeout=5) + except Exception: + pass + + pts_before = qdrant_count() + print(f" Qdrant points before: {pts_before}") + + # ── 1. Store ────────────────────────────────────────────────────────────── + print(f"\n [store] '{store_msg}'") + t_store = time.monotonic() + + try: + status, _ = post_json(f"{DEEPAGENTS}/chat", + {"message": store_msg, "chat_id": TEST_CHAT_ID}, timeout=5) + t_accept = time.monotonic() - t_store + report(results, "POST /chat (store) returns 202 immediately", + status == 202 and t_accept < 1, f"status={status}, t={t_accept:.3f}s") + timings["store_http_accept"] = t_accept + except Exception as e: + report(results, "POST /chat (store)", False, str(e)) + print_summary(results) + sys.exit(1) + + store = wait_for("store", store_msg, timeout_s=220, need_memory=True) + + if store: + timings.update({ + "store_llm": store["llm"], + "store_send": store["send"], + "store_reply": store["reply_total"], + "store_memory": store["memory_s"], + }) + report(results, "Agent replied to store message", True, + f"{store['reply_total']:.1f}s total llm={store['llm']:.1f}s " + f"send={store['send']:.1f}s tier={store['tier']}") + if store["memory_s"] is not None: + report(results, "Memory stored without error", True, f"{store['memory_s']:.1f}s") + elif store["memory_error"]: + report(results, "Memory stored without error", False, "error in [memory] log") + else: + report(results, "Memory stored without error", False, "not found in logs") + print(f" Store reply: {store['reply_text']!r}") + else: + report(results, "Agent replied to store message", False, "timeout") + report(results, "Memory stored without error", False, "timeout") + print_summary(results) + sys.exit(1) + + # ── 2. Qdrant point check ───────────────────────────────────────────────── + pts_after = qdrant_count() + new_pts = pts_after - pts_before + report(results, "New memory point(s) added to Qdrant", new_pts > 0, + f"{pts_before} → {pts_after} (+{new_pts})") + timings["qdrant_new_points"] = new_pts + + # ── 3. Recall ───────────────────────────────────────────────────────────── + print(f"\n [recall] '{recall_msg}'") + t_recall = time.monotonic() + + try: + status, _ = post_json(f"{DEEPAGENTS}/chat", + {"message": recall_msg, "chat_id": TEST_CHAT_ID}, timeout=5) + t_accept2 = time.monotonic() - t_recall + report(results, "POST /chat (recall) returns 202 immediately", + status == 202 and t_accept2 < 1, f"status={status}, t={t_accept2:.3f}s") + timings["recall_http_accept"] = t_accept2 + except Exception as e: + report(results, "POST /chat (recall)", False, str(e)) + + recall = wait_for("recall", recall_msg, timeout_s=160, need_memory=False) + + if recall: + timings.update({ + "recall_llm": recall["llm"], + "recall_send": recall["send"], + "recall_reply": recall["reply_total"], + }) + report(results, "Agent replied to recall message", True, + f"{recall['reply_total']:.1f}s total llm={recall['llm']:.1f}s " + f"send={recall['send']:.1f}s tier={recall['tier']}") + reply_text = recall["reply_text"] or "" + name_in_reply = random_name.lower() in reply_text.lower() + report(results, f"Reply contains '{random_name}'", name_in_reply, + f"reply: {reply_text[:120]!r}") + else: + report(results, "Agent replied to recall message", False, "timeout") + report(results, f"Reply contains '{random_name}'", False, "no reply") + + # ── 4. Bifrost pass-through check ───────────────────────────────────────── + bifrost_lines = fetch_bifrost_logs(since_s=300) + report(results, "Bifrost container has log output (requests forwarded)", + len(bifrost_lines) > 0, f"{len(bifrost_lines)} lines in bifrost logs") + bifrost_raw = "\n".join(bifrost_lines) + report(results, " Bifrost log shows AsyncOpenAI agent requests", + "AsyncOpenAI" in bifrost_raw, + f"{'found' if 'AsyncOpenAI' in bifrost_raw else 'NOT found'} in bifrost logs") + + # ── 5. Timing profile ───────────────────────────────────────────────────── + print(f"\n[{INFO}] 5. Timing profile") + W = 36 + print(f"\n {'Stage':<{W}} {'Time':>8}") + print(f" {'─'*W} {'─'*8}") + + for label, key in [ + ("[GPU] HTTP accept — store turn", "store_http_accept"), + ("[GPU] qwen3:Xb inference — store turn", "store_llm"), + ("[GPU] Telegram send — store turn", "store_send"), + ("[GPU] Total reply latency — store", "store_reply"), + ("[GPU] qwen2.5:1.5b+embed — async mem", "store_memory"), + ]: + print(f" {label:<{W}} {tf(timings.get(key)):>8}") + + print(f" {'─'*W} {'─'*8}") + + for label, key in [ + ("[GPU] HTTP accept — recall turn", "recall_http_accept"), + ("[GPU] qwen3:Xb inference — recall", "recall_llm"), + ("[GPU] Telegram send — recall turn", "recall_send"), + ("[GPU] Total reply latency — recall", "recall_reply"), + ]: + print(f" {label:<{W}} {tf(timings.get(key)):>8}") + + print(f"\n Bottleneck analysis (each █ ≈ 5s):") + print(f" {'─'*(W+12)}") + candidates = [ + ("[GPU] qwen3:Xb — store reply ", timings.get("store_llm") or 0), + ("[GPU] qwen3:Xb — recall reply", timings.get("recall_llm") or 0), + ("[GPU] qwen2.5:1.5b+embed (async)", timings.get("store_memory") or 0), + ] + candidates.sort(key=lambda x: x[1], reverse=True) + for label, t in candidates: + bar = "█" * min(int(t / 5), 24) + total_pipeline = (timings.get("store_reply") or 0) + (timings.get("store_memory") or 0) + pct = f" {t/total_pipeline*100:4.0f}%" if total_pipeline > 0 else "" + print(f" {label} {t:6.1f}s {bar}{pct}") + print() + + +# ── 6. Memory benchmark ─────────────────────────────────────────────────────── +if _run_bench: + _mem_name = random.choice(["Alice", "Bruno", "Camille", "Diego", "Elena", + "Farid", "Greta", "Hiroshi", "Irina", "Jonas"]) + _mem_city = random.choice(["Tokyo", "Berlin", "Cairo", "Sydney", "Oslo", + "Nairobi", "Lisbon", "Seoul", "Montreal", "Bangkok"]) + _mem_allergy = random.choice(["nuts", "gluten", "dairy", "shellfish", "eggs"]) + _mem_job = random.choice([ + ("software engineer", "startup"), + ("data scientist", "research lab"), + ("product manager", "tech company"), + ("DevOps engineer", "cloud provider"), + ]) + _mem_lang = random.choice(["Python", "Rust", "Go", "TypeScript", "Kotlin"]) + _mem_pet_name = random.choice(["Whiskers", "Biscuit", "Mango", "Pebble", "Shadow", + "Noodle", "Cheddar", "Cosmo", "Pippin", "Ziggy"]) + + print(f"\n[{INFO}] 6. Memory benchmark") + print(f" name={_mem_name} city={_mem_city} allergy={_mem_allergy} " + f"job={_mem_job[0]}@{_mem_job[1]} lang={_mem_lang} pet={_mem_pet_name}") + print(f" Storing 5 facts, then querying with 10 recall questions") + print(f" Chat ID: {CHAT_ID}") + print() + + # Wipe collection and restart openmemory for a clean slate + try: + req = urllib.request.Request(f"{QDRANT}/collections/adolf_memories", method="DELETE") + with urllib.request.urlopen(req, timeout=5): + pass + print(f" [{INFO}] Wiped adolf_memories collection") + except Exception as e: + print(f" [{WARN}] Could not wipe collection: {e}") + + try: + subprocess.run( + ["docker", "compose", "-f", COMPOSE_FILE, "restart", "openmemory"], + capture_output=True, timeout=30, + ) + time.sleep(6) + print(f" [{INFO}] Restarted openmemory — fresh collection ready") + except Exception as e: + print(f" [{WARN}] Could not restart openmemory: {e}") + + MEMORY_FACTS = [ + f"My name is {_mem_name} and I live in {_mem_city}", + f"I prefer vegetarian food and I'm allergic to {_mem_allergy}", + f"I work as a {_mem_job[0]} at a {_mem_job[1]}", + f"My favorite programming language is {_mem_lang}", + f"I have a cat named {_mem_pet_name}", + ] + + MEMORY_RECALLS = [ + ("What is my name?", [_mem_name.lower()]), + ("Where do I live?", [_mem_city.lower()]), + ("Do I have any food allergies?", [_mem_allergy.lower()]), + ("What is my job?", [_mem_job[0].split()[0].lower()]), + ("What programming language do I prefer?", [_mem_lang.lower()]), + ("Do I have any pets?", [_mem_pet_name.lower()]), + ("Am I vegetarian or do I eat meat?", ["vegetarian"]), + ("What city am I in?", [_mem_city.lower()]), + ("Tell me what you know about me", [_mem_name.lower(), _mem_city.lower()]), + ("What's the name of my pet?", [_mem_pet_name.lower()]), + ] + + STORE_TIMEOUT = 180 + RECALL_TIMEOUT = 180 + + print(f" Storing {len(MEMORY_FACTS)} facts...") + store_ok = 0 + for i, fact in enumerate(MEMORY_FACTS, 1): + print(f" [mem-store-{i:02d}] {fact!r}") + try: + status, _ = post_json(f"{DEEPAGENTS}/chat", + {"message": fact, "chat_id": CHAT_ID}, timeout=5) + if status != 202: + print(f" → [{FAIL}] POST returned {status}") + continue + except Exception as e: + print(f" → [{FAIL}] POST error: {e}") + continue + + found = wait_for(f"mem-store-{i:02d}", fact, timeout_s=STORE_TIMEOUT, need_memory=True) + if found: + store_ok += 1 + print(f" → [{PASS}] stored tier={found['tier']} mem={found['memory_s']}s") + else: + print(f" → [{FAIL}] timeout") + + report(results, f"All memory facts stored ({store_ok}/{len(MEMORY_FACTS)})", + store_ok == len(MEMORY_FACTS)) + + # Wait for async extraction to settle + print(f"\n Waiting for memory extraction to settle (up to 60s)...") + _prev_count = -1 + _stable_ticks = 0 + _cur_count = 0 + for _ in range(30): + time.sleep(2) + try: + _, body = get(f"{QDRANT}/collections/adolf_memories") + _cur_count = json.loads(body).get("result", {}).get("points_count", 0) + except Exception: + _cur_count = _prev_count + if _cur_count == _prev_count: + _stable_ticks += 1 + if _stable_ticks >= 3: + break + else: + _stable_ticks = 0 + _prev_count = _cur_count + print(f" Memory settled: {_cur_count} points in Qdrant") + + print(f"\n Querying with {len(MEMORY_RECALLS)} recall questions...") + recall_results = [] + + for i, (question, keywords) in enumerate(MEMORY_RECALLS, 1): + print(f" [mem-recall-{i:02d}] {question!r}") + try: + status, _ = post_json(f"{DEEPAGENTS}/chat", + {"message": question, "chat_id": CHAT_ID}, timeout=5) + if status != 202: + print(f" → [{FAIL}] POST returned {status}") + recall_results.append((question, keywords, None, False)) + continue + except Exception as e: + print(f" → [{FAIL}] POST error: {e}") + recall_results.append((question, keywords, None, False)) + continue + + t_start = time.monotonic() + found = None + while time.monotonic() - t_start < RECALL_TIMEOUT: + since = int(time.monotonic() - t_start) + 30 + lines = fetch_logs(since_s=since) + found = parse_run_block(lines, question) + if found: + break + time.sleep(2) + + if not found: + print(f" → [{FAIL}] timeout") + recall_results.append((question, keywords, None, False)) + continue + + reply_text = (found.get("reply_text") or "").lower() + hit_keywords = [kw for kw in keywords if kw.lower() in reply_text] + passed = len(hit_keywords) == len(keywords) + tag_str = PASS if passed else WARN + missing = [kw for kw in keywords if kw.lower() not in reply_text] + detail = f"tier={found['tier']} lat={found['reply_total']:.1f}s" + if missing: + detail += f" missing keywords: {missing}" + print(f" → [{tag_str}] {detail}") + recall_results.append((question, keywords, found.get("reply_text"), passed)) + time.sleep(1) + + print(f"\n {'#':<4} {'Pass':<5} {'Question':<45} {'Keywords'}") + print(f" {'─'*4} {'─'*5} {'─'*45} {'─'*30}") + for idx, (q, kws, reply, ok) in enumerate(recall_results, 1): + ok_str = "✓" if ok else "✗" + print(f" {ok_str} {idx:<3} {'yes' if ok else 'no':<5} {q[:45]:<45} {kws}") + + recall_pass = sum(1 for _, _, _, ok in recall_results if ok) + total_recall = len(recall_results) + print(f"\n Memory recall score: {recall_pass}/{total_recall}") + report(results, f"Memory recall ({recall_pass}/{total_recall} keywords found)", + recall_pass == total_recall, + f"{recall_pass}/{total_recall} questions had all expected keywords in reply") + + +# ── 7. Deduplication test ───────────────────────────────────────────────────── +if _run_dedup: + print(f"\n[{INFO}] 7. Memory deduplication test") + print(f" Sends the same fact twice — Qdrant point count must not increase by 2") + print(f" Chat ID: {CHAT_ID}") + print() + + DEDUP_TIMEOUT = 120 + _dedup_fact = f"My lucky number is {random.randint(1000, 9999)}" + print(f" Fact: {_dedup_fact!r}") + + pts_before = qdrant_count() + print(f" Qdrant points before: {pts_before}") + + print(f" [dedup-1] sending fact (first time)") + found1 = None + try: + status, _ = post_json(f"{DEEPAGENTS}/chat", + {"message": _dedup_fact, "chat_id": CHAT_ID}, timeout=5) + if status != 202: + report(results, "Dedup: first POST accepted", False, f"status={status}") + else: + found1 = wait_for("dedup-1", _dedup_fact, timeout_s=DEDUP_TIMEOUT, need_memory=True) + if found1: + print(f" [dedup-1] stored tier={found1['tier']} mem={found1['memory_s']}s") + else: + print(f" [dedup-1] timeout") + except Exception as e: + report(results, "Dedup: first POST accepted", False, str(e)) + + pts_after_first = qdrant_count() + new_first = pts_after_first - pts_before + print(f" Qdrant after first send: {pts_before} → {pts_after_first} (+{new_first})") + + print(f" [dedup-2] sending same fact (second time)") + try: + status, _ = post_json(f"{DEEPAGENTS}/chat", + {"message": _dedup_fact, "chat_id": CHAT_ID}, timeout=5) + if status != 202: + report(results, "Dedup: second POST accepted", False, f"status={status}") + else: + found2 = wait_for("dedup-2", _dedup_fact, timeout_s=DEDUP_TIMEOUT, need_memory=True) + if found2: + print(f" [dedup-2] stored tier={found2['tier']} mem={found2['memory_s']}s") + else: + print(f" [dedup-2] timeout") + except Exception as e: + report(results, "Dedup: second POST accepted", False, str(e)) + + pts_after_second = qdrant_count() + new_second = pts_after_second - pts_after_first + print(f" Qdrant after second send: {pts_after_first} → {pts_after_second} (+{new_second})") + + dedup_ok = new_second <= new_first + report(results, "Deduplication: second identical fact not added to Qdrant", dedup_ok, + f"first send: +{new_first} pts, second send: +{new_second} pts (want second ≤ first)") + + +# ── summary ─────────────────────────────────────────────────────────────────── +print_summary(results) +sys.exit(0 if all(ok for _, ok in results) else 1) diff --git a/tests/integration/test_pipeline.py b/tests/integration/test_pipeline.py deleted file mode 100644 index ce13775..0000000 --- a/tests/integration/test_pipeline.py +++ /dev/null @@ -1,1295 +0,0 @@ -#!/usr/bin/env python3 -""" -Adolf pipeline integration test with end-to-end timing profiling. - -Tests: - 1. Service health (deepagents, openmemory, grammy MCP SSE) - 2. GPU Ollama models - 3. CPU Ollama models - 4. Qdrant collection + vector dims - 5. SearXNG - 6. Name store — "remember that your name is " - 7. Qdrant point added after store - 8. Name recall — "what is your name?" → reply contains - 9. Timing profile + bottleneck report - 10. Easy benchmark — 10 easy questions → all must route to light - 11. Medium benchmark — 11 medium questions → must route to medium (or light, never complex) - 12. Hard benchmark — 10 /think questions → all must route to complex; VRAM flush verified - 13. Memory benchmark — store 5 facts, recall with 10 questions → verify keyword presence - 14. Dedup test — same fact sent twice → Qdrant must not grow by 2 - -Usage: - python3 test_pipeline.py [--chat-id CHAT_ID] - [--bench-only] skip sections 1-9, run 10+11+12+13 - [--easy-only] skip 1-9 and 11+12+13, run only section 10 - [--medium-only] skip 1-9 and 10+12+13, run only section 11 - [--hard-only] skip 1-9 and 10+11+13, run only section 12 - [--memory-only] skip 1-9 and 10+11+12, run only section 13 - [--no-bench] skip sections 10-13 - -Timing is extracted from deepagents container logs, not estimated from sleeps. -""" - -import argparse -import http.client -import json -import random -import re -import subprocess -import sys -import time -import urllib.request - -# ── config ──────────────────────────────────────────────────────────────────── -DEEPAGENTS = "http://localhost:8000" -BIFROST = "http://localhost:8080" -OPENMEMORY = "http://localhost:8765" -GRAMMY_HOST = "localhost" -GRAMMY_PORT = 3001 -OLLAMA_GPU = "http://localhost:11436" -OLLAMA_CPU = "http://localhost:11435" -QDRANT = "http://localhost:6333" -SEARXNG = "http://localhost:11437" -COMPOSE_FILE = "/home/alvis/adolf/docker-compose.yml" -DEFAULT_CHAT_ID = "346967270" - -NAMES = [ - "Maximilian", "Cornelius", "Zephyr", "Archibald", "Balthazar", - "Ignatius", "Lysander", "Octavian", "Reginald", "Sylvester", -] - -# ── benchmark questions ─────────────────────────────────────────────────────── -BENCHMARK = { - "easy": [ - "hi", - "what is 2+2?", - "what is the capital of France?", - "tell me a short joke", - "how are you doing today?", - "thanks!", - "what day comes after Wednesday?", - "name the three primary colors", - "is the sky blue?", - "what does CPU stand for?", - ], - "medium": [ - "what is the current weather in Berlin?", - "find the latest news about artificial intelligence", - "what is the current price of Bitcoin?", - "search for a good pasta carbonara recipe", - "what movies are in theaters this week?", - "find Python tutorials for beginners", - "who won the last FIFA World Cup?", - "do you remember what we talked about before?", - "search for the best coffee shops in Tokyo", - "what is happening in the tech industry this week?", - "what's the weather like today?", - ], - "hard": [ - "/think compare the top 3 Python web frameworks (Django, FastAPI, Flask) and recommend one for a production REST API", - "/think research the history of artificial intelligence and create a timeline of key milestones", - "/think plan a 7-day trip to Japan with daily itinerary, accommodation suggestions, and budget breakdown", - "/think analyze microservices vs monolithic architecture: pros, cons, and when to choose each", - "/think write a Python script that reads a CSV file, cleans the data, and generates summary statistics", - "/think research quantum computing: explain the key concepts and how it differs from classical computing", - "/think compare PostgreSQL, MongoDB, and Redis — when to use each and what are the trade-offs?", - "/think create a comprehensive Docker deployment guide covering best practices for production", - "/think research climate change: summarize the latest IPCC findings and key data points", - "/think design a REST API with authentication, rate limiting, and proper error handling — provide architecture and code outline", - ], -} - -PASS = "\033[32mPASS\033[0m" -FAIL = "\033[31mFAIL\033[0m" -INFO = "\033[36mINFO\033[0m" -WARN = "\033[33mWARN\033[0m" - -results = [] -timings = {} # label → float seconds | None - - -# ── helpers ─────────────────────────────────────────────────────────────────── - -def report(name, ok, detail=""): - tag = PASS if ok else FAIL - print(f" [{tag}] {name}" + (f" — {detail}" if detail else "")) - results.append((name, ok)) - - -def tf(v): - """Format timing value.""" - return f"{v:6.2f}s" if v is not None else " n/a" - - -def get(url, timeout=5): - with urllib.request.urlopen(urllib.request.Request(url), timeout=timeout) as r: - return r.status, r.read().decode() - - -def post_json(url, payload, timeout=10): - data = json.dumps(payload).encode() - req = urllib.request.Request(url, data=data, - headers={"Content-Type": "application/json"}, - method="POST") - with urllib.request.urlopen(req, timeout=timeout) as r: - return r.status, json.loads(r.read().decode()) - - -def check_sse(host, port, path): - try: - conn = http.client.HTTPConnection(host, port, timeout=5) - conn.request("GET", path, headers={"Accept": "text/event-stream"}) - r = conn.getresponse() - conn.close() - return r.status == 200, f"HTTP {r.status}" - except Exception as e: - return False, str(e) - - -def qdrant_count(): - try: - _, body = get(f"{QDRANT}/collections/adolf_memories") - return json.loads(body).get("result", {}).get("points_count", 0) - except Exception: - return 0 - - -def fetch_logs(since_s=600): - """Return deepagents log lines from the last since_s seconds.""" - try: - r = subprocess.run( - ["docker", "compose", "-f", COMPOSE_FILE, "logs", "deepagents", - f"--since={int(since_s)}s", "--no-log-prefix"], - capture_output=True, text=True, timeout=15, - ) - return r.stdout.splitlines() - except Exception: - return [] - - -def fetch_bifrost_logs(since_s=120): - """Return bifrost container log lines from the last since_s seconds.""" - try: - r = subprocess.run( - ["docker", "compose", "-f", COMPOSE_FILE, "logs", "bifrost", - f"--since={int(since_s)}s", "--no-log-prefix"], - capture_output=True, text=True, timeout=10, - ) - return r.stdout.splitlines() - except Exception: - return [] - - -def parse_run_block(lines, msg_prefix): - """ - Scan log lines for the LAST '[agent] running: ' block. - Extracts reply timing, tier, and memory timing from that block. - - Returns dict or None if the reply has not appeared in logs yet. - Dict keys: - reply_total, llm, send, tier, reply_text — from "[agent] replied in ..." - memory_s — from "[memory] stored in ..." - memory_error — True if "[memory] error" found - """ - search = msg_prefix[:50] - start_idx = None - for i, line in enumerate(lines): - if "[agent] running:" in line and search in line: - start_idx = i # keep updating — we want the LAST occurrence - - if start_idx is None: - return None - - block = lines[start_idx:] - last_ai_text = None - reply_data = None - - for j, line in enumerate(block): - # Track last non-tool AIMessage (the final reply) — truncated at 150 chars in logs, - # used only as fallback if reply_text line is absent (older server versions) - if "AIMessage:" in line and "→" not in line: - txt = line.split("AIMessage:", 1)[-1].strip() - if txt: - last_ai_text = txt - - m = re.search(r"replied in ([\d.]+)s \(llm=([\d.]+)s, send=([\d.]+)s\)", line) - if m: - # Extract optional tier tag at end of line - tier_m = re.search(r"\btier=(\w+)", line) - tier = tier_m.group(1) if tier_m else "unknown" - reply_data = { - "reply_total": float(m.group(1)), - "llm": float(m.group(2)), - "send": float(m.group(3)), - "tier": tier, - "reply_text": last_ai_text, # may be overwritten by reply_text line below - "memory_s": None, - "memory_error": False, - "_j": j, - } - break - - # Read full reply_text from dedicated log line (written immediately after replied-in line) - if reply_data is not None: - next_lines = block[reply_data["_j"] + 1: reply_data["_j"] + 3] - for line in next_lines: - if line.startswith("[agent] reply_text:"): - reply_data["reply_text"] = line[len("[agent] reply_text:"):].strip() - break - - if reply_data is None: - return None # reply not in logs yet - - # Memory line can appear after the next "[agent] running:" — no stop condition - for line in block[reply_data["_j"] + 1:]: - mm = re.search(r"\[memory\] stored in ([\d.]+)s", line) - if mm: - reply_data["memory_s"] = float(mm.group(1)) - break - if "[memory] error" in line: - reply_data["memory_error"] = True - break - - return reply_data - - -def wait_for(label, msg_prefix, timeout_s=200, need_memory=True): - """ - Poll deepagents logs until the message is fully processed. - Shows a live progress line. - Returns timing dict or None on timeout. - """ - t_start = time.monotonic() - deadline = t_start + timeout_s - tick = 0 - last_result = None - - while time.monotonic() < deadline: - # Window grows with elapsed time — never miss a line that appeared late - since = int(time.monotonic() - t_start) + 90 - lines = fetch_logs(since_s=since) - result = parse_run_block(lines, msg_prefix) - - if result: - last_result = result - has_mem = result["memory_s"] is not None or result["memory_error"] - if (not need_memory) or has_mem: - elapsed = time.monotonic() - t_start - print(f"\r [{label}] done after {elapsed:.0f}s{' ' * 30}") - return result - - time.sleep(4) - tick += 1 - rem = int(deadline - time.monotonic()) - if last_result: - phase = "waiting for memory..." if need_memory else "done" - else: - phase = "waiting for LLM reply..." - print(f"\r [{label}] {tick*4}s elapsed, {rem}s left — {phase} ", end="", flush=True) - - print(f"\r [{label}] TIMEOUT after {timeout_s}s{' ' * 30}") - return None - - -# ── args ────────────────────────────────────────────────────────────────────── -parser = argparse.ArgumentParser(description="Adolf pipeline test") -parser.add_argument("--chat-id", default=DEFAULT_CHAT_ID) -parser.add_argument("--bench-only", action="store_true", - help="Skip sections 1-9, run sections 10+11 (both benchmarks)") -parser.add_argument("--easy-only", action="store_true", - help="Skip sections 1-9 and 11, run only section 10 (easy benchmark)") -parser.add_argument("--medium-only", action="store_true", - help="Skip sections 1-9 and 10, run only section 11 (medium benchmark)") -parser.add_argument("--hard-only", action="store_true", - help="Skip sections 1-9 and 10+11, run only section 12 (hard benchmark)") -parser.add_argument("--memory-only", action="store_true", - help="Skip sections 1-9 and 10+11+12, run only section 13 (memory benchmark)") -parser.add_argument("--no-bench", action="store_true", - help="Skip sections 10-13 (all benchmarks)") -args = parser.parse_args() -CHAT_ID = args.chat_id - -# Derived flags for readability -_skip_pipeline = args.bench_only or args.easy_only or args.medium_only or args.hard_only or args.memory_only -_run_easy = not args.no_bench and not args.medium_only and not args.hard_only and not args.memory_only -_run_medium = not args.no_bench and not args.easy_only and not args.hard_only and not args.memory_only -_run_hard = not args.no_bench and not args.easy_only and not args.medium_only and not args.memory_only -_run_memory = not args.no_bench and not args.easy_only and not args.medium_only and not args.hard_only - -random_name = random.choice(NAMES) -# Use a unique chat_id per run to avoid cross-run history contamination -TEST_CHAT_ID = f"{CHAT_ID}-{random_name.lower()}" - -if not _skip_pipeline: - print(f"\n Test name : \033[1m{random_name}\033[0m") - print(f" Chat ID : {TEST_CHAT_ID}") - - -# ── 1. service health ───────────────────────────────────────────────────────── -if not _skip_pipeline: - print(f"\n[{INFO}] 1. Service health") - t0 = time.monotonic() - - try: - status, body = get(f"{DEEPAGENTS}/health") - data = json.loads(body) - ok = status == 200 and data.get("agent_ready") is True - report("deepagents /health — agent_ready", ok, f"agent_ready={data.get('agent_ready')}") - except Exception as e: - report("deepagents /health", False, str(e)) - - ok, detail = check_sse("localhost", 8765, "/sse") - report("openmemory /sse reachable", ok, detail) - - ok, detail = check_sse(GRAMMY_HOST, GRAMMY_PORT, "/sse") - report("grammy /sse reachable", ok, detail) - - timings["health_check"] = time.monotonic() - t0 - - -# ── 1b. Bifrost gateway ─────────────────────────────────────────────────────── -if not _skip_pipeline: - print(f"\n[{INFO}] 1b. Bifrost gateway (port 8080)") - t0 = time.monotonic() - - # Health ────────────────────────────────────────────────────────────────── - try: - status, body = get(f"{BIFROST}/health", timeout=5) - ok = status == 200 - report("Bifrost /health reachable", ok, f"HTTP {status}") - except Exception as e: - report("Bifrost /health reachable", False, str(e)) - - # Ollama GPU models listed ──────────────────────────────────────────────── - try: - status, body = get(f"{BIFROST}/v1/models", timeout=5) - data = json.loads(body) - model_ids = [m.get("id", "") for m in data.get("data", [])] - gpu_models = [m for m in model_ids if m.startswith("ollama/")] - report("Bifrost lists ollama GPU models", len(gpu_models) > 0, - f"found: {gpu_models}") - for expected in ["ollama/qwen3:4b", "ollama/qwen3:8b", "ollama/qwen2.5:1.5b"]: - report(f" model {expected} listed", expected in model_ids) - except Exception as e: - report("Bifrost /v1/models", False, str(e)) - - # Direct inference through Bifrost → GPU Ollama ─────────────────────────── - # Uses the smallest GPU model (qwen2.5:0.5b) to keep latency low. - print(f" [bifrost-infer] direct POST /v1/chat/completions → ollama/qwen2.5:0.5b ...") - t_infer = time.monotonic() - try: - infer_payload = { - "model": "ollama/qwen2.5:0.5b", - "messages": [{"role": "user", "content": "Reply with exactly one word: pong"}], - "max_tokens": 16, - } - infer_data = json.dumps(infer_payload).encode() - req = urllib.request.Request( - f"{BIFROST}/v1/chat/completions", - data=infer_data, - headers={"Content-Type": "application/json"}, - method="POST", - ) - with urllib.request.urlopen(req, timeout=60) as r: - infer_status = r.status - infer_body = json.loads(r.read().decode()) - infer_elapsed = time.monotonic() - t_infer - reply_content = infer_body.get("choices", [{}])[0].get("message", {}).get("content", "") - used_model = infer_body.get("model", "") - report("Bifrost → Ollama GPU inference succeeds", - infer_status == 200 and bool(reply_content), - f"{infer_elapsed:.1f}s model={used_model!r} reply={reply_content[:60]!r}") - timings["bifrost_direct_infer"] = infer_elapsed - except Exception as e: - report("Bifrost → Ollama GPU inference succeeds", False, str(e)) - timings["bifrost_direct_infer"] = None - - # deepagents is configured to route through Bifrost ─────────────────────── - # The startup log line "[agent] bifrost=http://bifrost:8080/v1 | ..." is emitted - # during lifespan setup and confirms deepagents is using Bifrost as the LLM gateway. - try: - r = subprocess.run( - ["docker", "compose", "-f", COMPOSE_FILE, "logs", "deepagents", - "--since=3600s", "--no-log-prefix"], - capture_output=True, text=True, timeout=10, - ) - log_lines = r.stdout.splitlines() - bifrost_line = next( - (l for l in log_lines if "[agent] bifrost=" in l and "bifrost:8080" in l), - None, - ) - report( - "deepagents startup log confirms bifrost URL", - bifrost_line is not None, - bifrost_line.strip() if bifrost_line else "line not found in logs", - ) - # Also confirm model names use provider/model format (ollama/...) - if bifrost_line: - has_prefix = "router=ollama/" in bifrost_line and "medium=ollama/" in bifrost_line - report("deepagents model names use ollama/ prefix", has_prefix, - bifrost_line.strip()) - except Exception as e: - report("deepagents startup log check", False, str(e)) - - timings["bifrost_check"] = time.monotonic() - t0 - - -# ── 2. GPU Ollama ───────────────────────────────────────────────────────────── -if not _skip_pipeline: - print(f"\n[{INFO}] 2. GPU Ollama (port 11436)") - t0 = time.monotonic() - - try: - status, body = get(f"{OLLAMA_GPU}/api/tags") - models = [m["name"] for m in json.loads(body).get("models", [])] - has_qwen = any("qwen3" in m for m in models) - report("GPU Ollama reachable", True, f"models: {models}") - report("qwen3:8b present", has_qwen) - except Exception as e: - report("GPU Ollama reachable", False, str(e)) - report("qwen3:8b present", False, "skipped") - - timings["gpu_ollama_ping"] = time.monotonic() - t0 - - -# ── 3. CPU Ollama ───────────────────────────────────────────────────────────── -if not _skip_pipeline: - print(f"\n[{INFO}] 3. CPU Ollama (port 11435)") - t0 = time.monotonic() - - try: - status, body = get(f"{OLLAMA_CPU}/api/tags") - models = [m["name"] for m in json.loads(body).get("models", [])] - has_embed = any("nomic-embed-text" in m for m in models) - report("CPU Ollama reachable", True, f"models: {models}") - report("nomic-embed-text present", has_embed) - except Exception as e: - report("CPU Ollama reachable", False, str(e)) - report("nomic-embed-text present", False, "skipped") - - timings["cpu_ollama_ping"] = time.monotonic() - t0 - - -# ── 4. Qdrant ───────────────────────────────────────────────────────────────── -if not _skip_pipeline: - print(f"\n[{INFO}] 4. Qdrant (port 6333)") - t0 = time.monotonic() - - try: - status, body = get(f"{QDRANT}/collections") - cols = [c["name"] for c in json.loads(body).get("result", {}).get("collections", [])] - report("Qdrant reachable", True, f"collections: {cols}") - report("adolf_memories collection exists", "adolf_memories" in cols) - except Exception as e: - report("Qdrant reachable", False, str(e)) - report("adolf_memories collection exists", False, "skipped") - - try: - status, body = get(f"{QDRANT}/collections/adolf_memories") - info = json.loads(body).get("result", {}) - dims = info.get("config", {}).get("params", {}).get("vectors", {}).get("size") - report("vector dims = 768", dims == 768, f"got {dims}") - except Exception as e: - report("adolf_memories collection info", False, str(e)) - - timings["qdrant_ping"] = time.monotonic() - t0 - - -# ── 5. SearXNG ──────────────────────────────────────────────────────────────── -if not _skip_pipeline: - print(f"\n[{INFO}] 5. SearXNG (port 11437)") - t0 = time.monotonic() - - try: - status, body = get(f"{SEARXNG}/search?q=test&format=json", timeout=15) - elapsed = time.monotonic() - t0 - n = len(json.loads(body).get("results", [])) - report("SearXNG reachable + JSON results", status == 200 and n > 0, f"{n} results in {elapsed:.1f}s") - report("SearXNG response < 5s", elapsed < 5, f"{elapsed:.2f}s") - timings["searxng_latency"] = elapsed - except Exception as e: - report("SearXNG reachable", False, str(e)) - report("SearXNG response < 5s", False, "skipped") - timings["searxng_latency"] = None - - timings["searxng_check"] = time.monotonic() - t0 - - -# ── 6–8. Name memory pipeline ───────────────────────────────────────────────── -if not _skip_pipeline: - print(f"\n[{INFO}] 6–8. Name memory pipeline") - print(f" chat_id={TEST_CHAT_ID} name={random_name}") - - store_msg = f"remember that your name is {random_name}" - recall_msg = "what is your name?" - - # Clear adolf_memories so each run starts clean (avoids cross-run stale memories) - try: - post_json(f"{QDRANT}/collections/adolf_memories/points/delete", - {"filter": {}}, timeout=5) - except Exception: - pass - - pts_before = qdrant_count() - print(f" Qdrant points before: {pts_before}") - - # ── 6. Send store message ───────────────────────────────────────────────────── - print(f"\n [store] '{store_msg}'") - t_store = time.monotonic() - - try: - status, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": store_msg, "chat_id": TEST_CHAT_ID}, timeout=5) - t_accept = time.monotonic() - t_store - report("POST /chat (store) returns 202 immediately", - status == 202 and t_accept < 1, f"status={status}, t={t_accept:.3f}s") - timings["store_http_accept"] = t_accept - except Exception as e: - report("POST /chat (store)", False, str(e)) - sys.exit(1) - - store = wait_for("store", store_msg, timeout_s=220, need_memory=True) - - if store: - timings["store_llm"] = store["llm"] - timings["store_send"] = store["send"] - timings["store_reply"] = store["reply_total"] - timings["store_memory"] = store["memory_s"] - report("Agent replied to store message", True, - f"{store['reply_total']:.1f}s total llm={store['llm']:.1f}s send={store['send']:.1f}s tier={store['tier']}") - if store["memory_s"] is not None: - report("Memory stored without error", True, f"{store['memory_s']:.1f}s") - elif store["memory_error"]: - report("Memory stored without error", False, "error in [memory] log") - else: - report("Memory stored without error", False, "not found in logs (still running?)") - print(f" Store reply: {store['reply_text']!r}") - else: - report("Agent replied to store message", False, "timeout") - report("Memory stored without error", False, "timeout") - sys.exit(1) - - # ── 7. Verify Qdrant ────────────────────────────────────────────────────────── - pts_after = qdrant_count() - new_pts = pts_after - pts_before - report("New memory point(s) added to Qdrant", new_pts > 0, - f"{pts_before} → {pts_after} (+{new_pts})") - timings["qdrant_new_points"] = new_pts - - # ── 8. Send recall message ──────────────────────────────────────────────────── - print(f"\n [recall] '{recall_msg}'") - t_recall = time.monotonic() - - try: - status, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": recall_msg, "chat_id": TEST_CHAT_ID}, timeout=5) - t_accept2 = time.monotonic() - t_recall - report("POST /chat (recall) returns 202 immediately", - status == 202 and t_accept2 < 1, f"status={status}, t={t_accept2:.3f}s") - timings["recall_http_accept"] = t_accept2 - except Exception as e: - report("POST /chat (recall)", False, str(e)) - - recall = wait_for("recall", recall_msg, timeout_s=160, need_memory=False) - - if recall: - timings["recall_llm"] = recall["llm"] - timings["recall_send"] = recall["send"] - timings["recall_reply"] = recall["reply_total"] - report("Agent replied to recall message", True, - f"{recall['reply_total']:.1f}s total llm={recall['llm']:.1f}s send={recall['send']:.1f}s tier={recall['tier']}") - reply_text = recall["reply_text"] or "" - name_in_reply = random_name.lower() in reply_text.lower() - report(f"Reply contains '{random_name}'", name_in_reply, - f"reply: {reply_text[:120]!r}") - else: - report("Agent replied to recall message", False, "timeout") - report(f"Reply contains '{random_name}'", False, "no reply") - - # ── 8b. Verify requests passed through Bifrost ──────────────────────────── - # After the store+recall round-trip, Bifrost logs must show forwarded - # requests. An empty Bifrost log means deepagents bypasses the gateway. - bifrost_lines = fetch_bifrost_logs(since_s=300) - report("Bifrost container has log output (requests forwarded)", - len(bifrost_lines) > 0, - f"{len(bifrost_lines)} lines in bifrost logs") - # Bifrost logs contain the request body; AsyncOpenAI user-agent confirms the path - bifrost_raw = "\n".join(bifrost_lines) - report(" Bifrost log shows AsyncOpenAI agent requests", - "AsyncOpenAI" in bifrost_raw, - f"{'found' if 'AsyncOpenAI' in bifrost_raw else 'NOT found'} in bifrost logs") - - -# ── 9. Timing profile ───────────────────────────────────────────────────────── -if not _skip_pipeline: - print(f"\n[{INFO}] 9. Timing profile") - - W = 36 - - print(f"\n {'Stage':<{W}} {'Time':>8}") - print(f" {'─'*W} {'─'*8}") - - rows_store = [ - ("[GPU] HTTP accept — store turn", "store_http_accept"), - ("[GPU] qwen3:Xb inference — store turn","store_llm"), - ("[GPU] Telegram send — store turn", "store_send"), - ("[GPU] Total reply latency — store", "store_reply"), - ("[GPU] qwen2.5:1.5b+embed — async mem", "store_memory"), - ] - rows_recall = [ - ("[GPU] HTTP accept — recall turn", "recall_http_accept"), - ("[GPU] qwen3:Xb inference — recall", "recall_llm"), - ("[GPU] Telegram send — recall turn", "recall_send"), - ("[GPU] Total reply latency — recall", "recall_reply"), - ] - - for label, key in rows_store: - v = timings.get(key) - print(f" {label:<{W}} {tf(v):>8}") - - print(f" {'─'*W} {'─'*8}") - - for label, key in rows_recall: - v = timings.get(key) - print(f" {label:<{W}} {tf(v):>8}") - - # Bottleneck bar chart - print(f"\n Bottleneck analysis (each █ ≈ 5s):") - print(f" {'─'*(W+12)}") - - candidates = [ - ("[GPU] qwen3:Xb — store reply ", timings.get("store_llm") or 0), - ("[GPU] qwen3:Xb — recall reply", timings.get("recall_llm") or 0), - ("[GPU] qwen2.5:1.5b+embed (async)", timings.get("store_memory") or 0), - ("[net] SearXNG ", timings.get("searxng_latency") or 0), - ] - candidates.sort(key=lambda x: x[1], reverse=True) - - for label, t in candidates: - bar = "█" * min(int(t / 5), 24) - pct = "" - total_pipeline = (timings.get("store_reply") or 0) + (timings.get("store_memory") or 0) - if total_pipeline > 0: - pct = f" {t/total_pipeline*100:4.0f}%" - print(f" {label} {t:6.1f}s {bar}{pct}") - - print() - - -# ── 10. Tier routing benchmark — easy questions → light path ────────────────── -if _run_easy: - print(f"\n[{INFO}] 10. Tier routing benchmark") - print(f" Sending {len(BENCHMARK['easy'])} easy questions — all must route to 'light'") - print(f" Chat ID: {CHAT_ID}") - print() - - bench_results = [] # list of (question, tier, latency_s, ok) - LIGHT_TIMEOUT = 60 # seconds — light is fast but may queue behind prior messages - - for i, question in enumerate(BENCHMARK["easy"], 1): - tag = f"easy-{i:02d}" - short_q = question[:55] - print(f" [{tag}] {short_q!r}") - - # Send - t_send = time.monotonic() - try: - status, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": question, "chat_id": CHAT_ID}, timeout=5) - if status != 202: - print(f" → [{FAIL}] POST returned {status}") - bench_results.append((question, "?", None, False)) - continue - except Exception as e: - print(f" → [{FAIL}] POST error: {e}") - bench_results.append((question, "?", None, False)) - continue - - # Poll for reply - t_start = time.monotonic() - found = None - while time.monotonic() - t_start < LIGHT_TIMEOUT: - since = int(time.monotonic() - t_start) + 30 - lines = fetch_logs(since_s=since) - found = parse_run_block(lines, question) - if found: - break - time.sleep(1) - - elapsed = time.monotonic() - t_send - - if not found: - print(f" → [{FAIL}] no reply within {LIGHT_TIMEOUT}s") - bench_results.append((question, "timeout", None, False)) - continue - - tier = found.get("tier", "unknown") - is_light = (tier == "light") - tag_str = PASS if is_light else FAIL - print(f" → [{tag_str}] tier={tier} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s") - bench_results.append((question, tier, found["reply_total"], is_light)) - - # Brief pause between questions to keep logs clean - time.sleep(1) - - # Summary table - print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question'}") - print(f" {'─'*4} {'─'*8} {'─'*8} {'─'*50}") - for idx, (q, tier, lat, ok) in enumerate(bench_results, 1): - lat_str = f"{lat:.1f}s" if lat is not None else "timeout" - ok_str = "✓" if ok else "✗" - print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {q[:50]!r}") - - light_count = sum(1 for _, _, _, ok in bench_results if ok) - total_bench = len(bench_results) - lats = [lat for _, _, lat, ok in bench_results if ok and lat is not None] - avg_lat = sum(lats) / len(lats) if lats else 0 - - print(f"\n Light-path score: {light_count}/{total_bench}") - if lats: - print(f" Avg latency (light): {avg_lat:.1f}s " - f"min={min(lats):.1f}s max={max(lats):.1f}s") - - report(f"All easy questions routed to light ({light_count}/{total_bench})", - light_count == total_bench, - f"{light_count}/{total_bench} via light path, avg {avg_lat:.1f}s") - - -# ── 11. Medium benchmark — medium questions → medium or light, never complex ── -if _run_medium: - print(f"\n[{INFO}] 11. Medium routing benchmark") - print(f" Sending {len(BENCHMARK['medium'])} medium questions") - print(f" Expected: tier=medium (needs tools). Light is acceptable for factual questions.") - print(f" Fail condition: tier=complex or timeout.") - print(f" Chat ID: {CHAT_ID}") - print() - - # Questions where light is a valid alternative (model may know from training data) - LIGHT_ACCEPTABLE = { - "who won the last FIFA World Cup?", - "search for a good pasta carbonara recipe", - "find Python tutorials for beginners", - "search for the best coffee shops in Tokyo", - } - - med_results = [] # list of (question, tier, latency_s, correct) - MEDIUM_TIMEOUT = 120 # seconds — medium takes 20-100s, allow for queue buildup - - for i, question in enumerate(BENCHMARK["medium"], 1): - tag = f"med-{i:02d}" - short_q = question[:60] - print(f" [{tag}] {short_q!r}") - - # Send - t_send = time.monotonic() - try: - status, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": question, "chat_id": CHAT_ID}, timeout=5) - if status != 202: - print(f" → [{FAIL}] POST returned {status}") - med_results.append((question, "?", None, False)) - continue - except Exception as e: - print(f" → [{FAIL}] POST error: {e}") - med_results.append((question, "?", None, False)) - continue - - # Poll for reply - t_start = time.monotonic() - found = None - while time.monotonic() - t_start < MEDIUM_TIMEOUT: - since = int(time.monotonic() - t_start) + 60 - lines = fetch_logs(since_s=since) - found = parse_run_block(lines, question) - if found: - break - time.sleep(3) - - elapsed = time.monotonic() - t_send - - if not found: - print(f" → [{FAIL}] no reply within {MEDIUM_TIMEOUT}s") - med_results.append((question, "timeout", None, False)) - continue - - tier = found.get("tier", "unknown") - light_ok = question in LIGHT_ACCEPTABLE - - if tier == "medium": - correct = True - label = PASS - note = "medium ✓" - elif tier == "light": - correct = light_ok # light is only acceptable for certain questions - label = WARN if not light_ok else PASS - note = "light (acceptable)" if light_ok else "light (should be medium)" - elif tier == "complex": - correct = False - label = FAIL - note = "complex — wrong escalation" - else: - correct = False - label = FAIL - note = f"unknown tier {tier!r}" - - print(f" → [{label}] {note} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s") - med_results.append((question, tier, found["reply_total"], correct)) - - # Brief pause between questions - time.sleep(1) - - # Summary table - print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question'}") - print(f" {'─'*4} {'─'*8} {'─'*8} {'─'*55}") - for idx, (q, tier, lat, ok) in enumerate(med_results, 1): - lat_str = f"{lat:.1f}s" if lat is not None else "timeout" - ok_str = "✓" if ok else ("~" if tier == "light" else "✗") - print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {q[:55]!r}") - - total_med = len(med_results) - medium_count = sum(1 for _, tier, _, _ in med_results if tier == "medium") - light_count = sum(1 for _, tier, _, _ in med_results if tier == "light") - complex_count = sum(1 for _, tier, _, _ in med_results if tier == "complex") - timeout_count = sum(1 for _, tier, _, _ in med_results if tier == "timeout") - light_misroute = sum( - 1 for q, tier, _, _ in med_results - if tier == "light" and q not in LIGHT_ACCEPTABLE - ) - lats = [lat for _, _, lat, _ in med_results if lat is not None] - correct_count = medium_count + (light_count - light_misroute) - - print(f"\n Breakdown: medium={medium_count} light={light_count} complex={complex_count} timeout={timeout_count}") - if light_misroute: - print(f" [{WARN}] {light_misroute} question(s) answered via light when medium expected (check reply quality)") - if lats: - print(f" Avg latency: {sum(lats)/len(lats):.1f}s min={min(lats):.1f}s max={max(lats):.1f}s") - - no_complex = complex_count == 0 - no_timeout = timeout_count == 0 - all_ok = no_complex and no_timeout - - report( - f"Medium questions: no complex escalation ({medium_count + light_count}/{total_med} routed)", - no_complex, - f"medium={medium_count} light={light_count} complex={complex_count} timeout={timeout_count}", - ) - if not no_timeout: - report( - f"Medium questions: all completed within {MEDIUM_TIMEOUT}s", - False, - f"{timeout_count} question(s) timed out (increase MEDIUM_TIMEOUT or check agent logs)", - ) - - -# ── 12. Hard benchmark — /think questions → complex tier + VRAM flush verified ─ -if _run_hard: - print(f"\n[{INFO}] 12. Hard routing benchmark") - print(f" Sending {len(BENCHMARK['hard'])} /think questions — all must route to 'complex'") - print(f" Verifies: /think prefix → force_complex=True → VRAM flush → qwen3:8b inference") - print(f" Acceptable fallback: 'medium' if VRAM eviction timed out (logged warning)") - print(f" Fail condition: tier=light or timeout") - print(f" Chat ID: {CHAT_ID}") - print() - - hard_results = [] # list of (question, tier, latency_s, ok) - COMPLEX_TIMEOUT = 300 # seconds — complex takes 60-180s + VRAM flush overhead - - # Log markers we expect to see for complex path - _VRAM_ENTER = "[vram] enter_complex_mode" - _VRAM_EXIT = "[vram] exit_complex_mode" - - for i, question in enumerate(BENCHMARK["hard"], 1): - tag = f"hard-{i:02d}" - # Strip /think prefix for display - short_q = question[len("/think "):].strip()[:60] - print(f" [{tag}] /think {short_q!r}") - - # Snapshot log window start time - t_send = time.monotonic() - try: - status, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": question, "chat_id": CHAT_ID}, timeout=5) - if status != 202: - print(f" → [{FAIL}] POST returned {status}") - hard_results.append((question, "?", None, False)) - continue - except Exception as e: - print(f" → [{FAIL}] POST error: {e}") - hard_results.append((question, "?", None, False)) - continue - - # Poll for reply - t_start = time.monotonic() - found = None - while time.monotonic() - t_start < COMPLEX_TIMEOUT: - since = int(time.monotonic() - t_start) + 90 - lines = fetch_logs(since_s=since) - found = parse_run_block(lines, question[len("/think "):].strip()) - if found: - break - time.sleep(5) - - elapsed = time.monotonic() - t_send - - if not found: - print(f" → [{FAIL}] no reply within {COMPLEX_TIMEOUT}s") - hard_results.append((question, "timeout", None, False)) - continue - - tier = found.get("tier", "unknown") - - if tier == "complex": - ok = True - label = PASS - note = "complex ✓" - elif tier == "medium": - # Acceptable fallback if VRAM eviction timed out - ok = True - label = WARN - note = "medium (VRAM fallback — check [vram] logs)" - else: - ok = False - label = FAIL - note = f"tier={tier} — unexpected" - - # Check if VRAM enter/exit were logged for this block - lines_block = fetch_logs(since_s=int(elapsed) + 120) - msg_key = question[len("/think "):].strip()[:40] - vram_enter_seen = any(_VRAM_ENTER in ln for ln in lines_block - if msg_key in ln or - any(msg_key[:15] in prev_ln - for prev_ln in lines_block[max(0, lines_block.index(ln)-10):lines_block.index(ln)])) - - # Simpler: just check the recent log window for enter/exit markers - recent = "\n".join(lines_block[-200:]) - vram_enter_seen = _VRAM_ENTER in recent - vram_exit_seen = _VRAM_EXIT in recent - - vram_note = "" - if tier == "complex": - if vram_enter_seen: - vram_note = " [vram:flush✓]" - else: - vram_note = f" [{WARN}:no vram flush log]" - - print(f" → [{label}] {note} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s{vram_note}") - hard_results.append((question, tier, found["reply_total"], ok)) - - # Pause to let exit_complex_mode background task complete before next question - # (flushes qwen3:8b and pre-warms 4b+router — avoids VRAM conflict on next enter) - time.sleep(5) - - # Summary table - print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question (/think ...)'}") - print(f" {'─'*4} {'─'*8} {'─'*8} {'─'*55}") - for idx, (q, tier, lat, ok) in enumerate(hard_results, 1): - lat_str = f"{lat:.1f}s" if lat is not None else "timeout" - ok_str = "✓" if tier == "complex" else ("~" if tier == "medium" else "✗") - short = q[len("/think "):].strip()[:55] - print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {short!r}") - - total_hard = len(hard_results) - complex_count = sum(1 for _, t, _, _ in hard_results if t == "complex") - medium_fb = sum(1 for _, t, _, _ in hard_results if t == "medium") - light_count = sum(1 for _, t, _, _ in hard_results if t == "light") - timeout_count = sum(1 for _, t, _, _ in hard_results if t == "timeout") - lats = [lat for _, _, lat, _ in hard_results if lat is not None] - - print(f"\n Breakdown: complex={complex_count} medium(fallback)={medium_fb} light={light_count} timeout={timeout_count}") - if medium_fb: - print(f" [{WARN}] {medium_fb} question(s) fell back to medium (VRAM eviction timeout)") - if light_count: - print(f" [{FAIL}] {light_count} question(s) routed to light — /think prefix not detected") - if lats: - print(f" Avg latency: {sum(lats)/len(lats):.1f}s min={min(lats):.1f}s max={max(lats):.1f}s") - - no_light = light_count == 0 - no_timeout = timeout_count == 0 - - report( - f"Hard questions routed to complex (not light) ({complex_count + medium_fb}/{total_hard})", - no_light and no_timeout, - f"complex={complex_count} medium_fallback={medium_fb} light={light_count} timeout={timeout_count}", - ) - - -# ── 13. Memory benchmark — store facts, recall with keyword verification ─────── -if _run_memory: - _mem_name = random.choice([ - "Alice", "Bruno", "Camille", "Diego", "Elena", - "Farid", "Greta", "Hiroshi", "Irina", "Jonas", - ]) - _mem_city = random.choice([ - "Tokyo", "Berlin", "Cairo", "Sydney", "Oslo", - "Nairobi", "Lisbon", "Seoul", "Montreal", "Bangkok", - ]) - _mem_allergy = random.choice(["nuts", "gluten", "dairy", "shellfish", "eggs"]) - _mem_job = random.choice([ - ("software engineer", "startup"), - ("data scientist", "research lab"), - ("product manager", "tech company"), - ("DevOps engineer", "cloud provider"), - ]) - _mem_lang = random.choice(["Python", "Rust", "Go", "TypeScript", "Kotlin"]) - _mem_pet_name = random.choice([ - "Whiskers", "Biscuit", "Mango", "Pebble", "Shadow", - "Noodle", "Cheddar", "Cosmo", "Pippin", "Ziggy", - ]) - - print(f"\n[{INFO}] 13. Memory benchmark") - print(f" name={_mem_name} city={_mem_city} allergy={_mem_allergy} " - f"job={_mem_job[0]}@{_mem_job[1]} lang={_mem_lang} pet={_mem_pet_name}") - print(f" Storing 5 facts, then querying with 10 recall questions") - print(f" Chat ID: {CHAT_ID}") - print() - - # Wipe Qdrant collection and restart openmemory to eliminate stale data interference. - # Deleting the collection alone causes 404s — openmemory holds a live reference to it. - try: - import urllib.request as _ur - _req = _ur.Request(f"{QDRANT}/collections/adolf_memories", method="DELETE") - with _ur.urlopen(_req, timeout=5): - pass - print(f" [{INFO}] Wiped adolf_memories collection") - except Exception as e: - print(f" [{WARN}] Could not wipe collection: {e}") - - try: - subprocess.run( - ["docker", "compose", "-f", COMPOSE_FILE, "restart", "openmemory"], - capture_output=True, timeout=30, - ) - time.sleep(6) # wait for openmemory to reinitialize and recreate collection - print(f" [{INFO}] Restarted openmemory — fresh collection ready") - except Exception as e: - print(f" [{WARN}] Could not restart openmemory: {e}") - - MEMORY_FACTS = [ - f"My name is {_mem_name} and I live in {_mem_city}", - f"I prefer vegetarian food and I'm allergic to {_mem_allergy}", - f"I work as a {_mem_job[0]} at a {_mem_job[1]}", - f"My favorite programming language is {_mem_lang}", - f"I have a cat named {_mem_pet_name}", - ] - - MEMORY_RECALLS = [ - # (question, [keywords that must appear in reply]) - ("What is my name?", [_mem_name.lower()]), - ("Where do I live?", [_mem_city.lower()]), - ("Do I have any food allergies?", [_mem_allergy.lower()]), - ("What is my job?", [_mem_job[0].split()[0].lower()]), - ("What programming language do I prefer?", [_mem_lang.lower()]), - ("Do I have any pets?", [_mem_pet_name.lower()]), - ("Am I vegetarian or do I eat meat?", ["vegetarian"]), - ("What city am I in?", [_mem_city.lower()]), - ("Tell me what you know about me", [_mem_name.lower(), _mem_city.lower()]), - ("What's the name of my pet?", [_mem_pet_name.lower()]), - ] - - MEMORY_STORE_TIMEOUT = 180 # seconds per fact - MEMORY_RECALL_TIMEOUT = 180 # seconds per question - - # ── Store facts ────────────────────────────────────────────────────────── - print(f" Storing {len(MEMORY_FACTS)} facts...") - store_ok = 0 - for i, fact in enumerate(MEMORY_FACTS, 1): - print(f" [mem-store-{i:02d}] {fact!r}") - try: - status, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": fact, "chat_id": CHAT_ID}, timeout=5) - if status != 202: - print(f" → [{FAIL}] POST returned {status}") - continue - except Exception as e: - print(f" → [{FAIL}] POST error: {e}") - continue - - found = wait_for(f"mem-store-{i:02d}", fact, timeout_s=MEMORY_STORE_TIMEOUT, need_memory=True) - if found: - store_ok += 1 - print(f" → [{PASS}] stored tier={found['tier']} mem={found['memory_s']}s") - else: - print(f" → [{FAIL}] timeout") - - report(f"All memory facts stored ({store_ok}/{len(MEMORY_FACTS)})", - store_ok == len(MEMORY_FACTS)) - - # Wait for async memory extraction to settle — poll Qdrant until point count stabilises - print(f"\n Waiting for memory extraction to settle (up to 60s)...") - _prev_count = -1 - _stable_ticks = 0 - for _ in range(30): - time.sleep(2) - try: - _, body = get(f"{QDRANT}/collections/adolf_memories") - _cur_count = json.loads(body).get("result", {}).get("points_count", 0) - except Exception: - _cur_count = _prev_count - if _cur_count == _prev_count: - _stable_ticks += 1 - if _stable_ticks >= 3: # stable for 6s - break - else: - _stable_ticks = 0 - _prev_count = _cur_count - print(f" Memory settled: {_cur_count} points in Qdrant") - - # ── Recall questions ───────────────────────────────────────────────────── - print(f"\n Querying with {len(MEMORY_RECALLS)} recall questions...") - recall_results = [] # (question, keywords, reply_text, passed) - - for i, (question, keywords) in enumerate(MEMORY_RECALLS, 1): - print(f" [mem-recall-{i:02d}] {question!r}") - - try: - status, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": question, "chat_id": CHAT_ID}, timeout=5) - if status != 202: - print(f" → [{FAIL}] POST returned {status}") - recall_results.append((question, keywords, None, False)) - continue - except Exception as e: - print(f" → [{FAIL}] POST error: {e}") - recall_results.append((question, keywords, None, False)) - continue - - t_start = time.monotonic() - found = None - while time.monotonic() - t_start < MEMORY_RECALL_TIMEOUT: - since = int(time.monotonic() - t_start) + 30 - lines = fetch_logs(since_s=since) - found = parse_run_block(lines, question) - if found: - break - time.sleep(2) - - if not found: - print(f" → [{FAIL}] timeout") - recall_results.append((question, keywords, None, False)) - continue - - reply_text = (found.get("reply_text") or "").lower() - hit_keywords = [kw for kw in keywords if kw.lower() in reply_text] - passed = len(hit_keywords) == len(keywords) - tag_str = PASS if passed else WARN - missing = [kw for kw in keywords if kw.lower() not in reply_text] - detail = f"tier={found['tier']} lat={found['reply_total']:.1f}s" - if missing: - detail += f" missing keywords: {missing}" - print(f" → [{tag_str}] {detail}") - recall_results.append((question, keywords, found.get("reply_text"), passed)) - - time.sleep(1) - - # Summary - print(f"\n {'#':<4} {'Pass':<5} {'Question':<45} {'Keywords'}") - print(f" {'─'*4} {'─'*5} {'─'*45} {'─'*30}") - for idx, (q, kws, reply, ok) in enumerate(recall_results, 1): - ok_str = "✓" if ok else "✗" - print(f" {ok_str} {idx:<3} {'yes' if ok else 'no':<5} {q[:45]:<45} {kws}") - - recall_pass = sum(1 for _, _, _, ok in recall_results if ok) - total_recall = len(recall_results) - print(f"\n Memory recall score: {recall_pass}/{total_recall}") - - report(f"Memory recall ({recall_pass}/{total_recall} keywords found)", - recall_pass == total_recall, - f"{recall_pass}/{total_recall} questions had all expected keywords in reply") - - -# ── 14. Deduplication test — same fact stored twice must not grow Qdrant by 2 ─ -if _run_memory: - print(f"\n[{INFO}] 14. Memory deduplication test") - print(f" Sends the same fact twice — Qdrant point count must not increase by 2") - print(f" Chat ID: {CHAT_ID}") - print() - - DEDUP_TIMEOUT = 120 - - _dedup_fact = f"My lucky number is {random.randint(1000, 9999)}" - print(f" Fact: {_dedup_fact!r}") - - def _qdrant_count_dedup(): - try: - _, body = get(f"{QDRANT}/collections/adolf_memories") - return json.loads(body).get("result", {}).get("points_count", 0) - except Exception: - return 0 - - pts_before = _qdrant_count_dedup() - print(f" Qdrant points before: {pts_before}") - - # Send fact the first time - print(f" [dedup-1] sending fact (first time)") - try: - status, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": _dedup_fact, "chat_id": CHAT_ID}, timeout=5) - if status != 202: - report("Dedup: first POST accepted", False, f"status={status}") - else: - found1 = wait_for("dedup-1", _dedup_fact, timeout_s=DEDUP_TIMEOUT, need_memory=True) - if found1: - print(f" [dedup-1] stored tier={found1['tier']} mem={found1['memory_s']}s") - else: - print(f" [dedup-1] timeout") - except Exception as e: - report("Dedup: first POST accepted", False, str(e)) - found1 = None - - pts_after_first = _qdrant_count_dedup() - new_first = pts_after_first - pts_before - print(f" Qdrant after first send: {pts_before} → {pts_after_first} (+{new_first})") - - # Send exact same fact again - print(f" [dedup-2] sending same fact (second time)") - try: - status, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": _dedup_fact, "chat_id": CHAT_ID}, timeout=5) - if status != 202: - report("Dedup: second POST accepted", False, f"status={status}") - else: - found2 = wait_for("dedup-2", _dedup_fact, timeout_s=DEDUP_TIMEOUT, need_memory=True) - if found2: - print(f" [dedup-2] stored tier={found2['tier']} mem={found2['memory_s']}s") - else: - print(f" [dedup-2] timeout") - except Exception as e: - report("Dedup: second POST accepted", False, str(e)) - - pts_after_second = _qdrant_count_dedup() - new_second = pts_after_second - pts_after_first - print(f" Qdrant after second send: {pts_after_first} → {pts_after_second} (+{new_second})") - - # Pass: second store added no MORE points than the first (NOOP or UPDATE, not ADD) - # If first send stored 0 points (fact too trivial), dedup is vacuously satisfied. - dedup_ok = new_second <= new_first - report( - "Deduplication: second identical fact not added to Qdrant", - dedup_ok, - f"first send: +{new_first} pts, second send: +{new_second} pts (want second ≤ first)", - ) - - -# ── summary ─────────────────────────────────────────────────────────────────── -print(f"\n{'─'*55}") -total = len(results) -passed = sum(1 for _, ok in results if ok) -failed = total - passed -print(f"Results: {passed}/{total} passed", end="") -if failed: - print(f" ({failed} failed)\n") - print("Failed checks:") - for name, ok in results: - if not ok: - print(f" - {name}") -else: - print(" — all good") -print() - -# Print benchmark reference -print(f"[{INFO}] Benchmark questions reference:") -for tier_name, questions in BENCHMARK.items(): - print(f"\n {tier_name.upper()} ({len(questions)} questions):") - for j, q in enumerate(questions, 1): - print(f" {j:2d}. {q}") -print() diff --git a/tests/integration/test_routing.py b/tests/integration/test_routing.py new file mode 100644 index 0000000..d6257af --- /dev/null +++ b/tests/integration/test_routing.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python3 +""" +Adolf tier routing benchmark. + +Tests: + easy — 10 questions that must route to 'light' tier + medium — 11 questions that must route to 'medium' (light acceptable for some; complex = fail) + hard — 10 /think questions that must route to 'complex' (medium fallback acceptable) + +Usage: + python3 test_routing.py [--chat-id CHAT_ID] + [--easy-only] # only easy benchmark + [--medium-only] # only medium benchmark + [--hard-only] # only hard benchmark +""" + +import argparse +import sys +import time + +from common import ( + DEEPAGENTS, COMPOSE_FILE, DEFAULT_CHAT_ID, + BENCHMARK, + INFO, PASS, FAIL, WARN, + report, print_summary, + post_json, fetch_logs, + parse_run_block, +) + +# ── args ────────────────────────────────────────────────────────────────────── +parser = argparse.ArgumentParser(description="Adolf routing benchmark") +parser.add_argument("--chat-id", default=DEFAULT_CHAT_ID) +parser.add_argument("--easy-only", action="store_true") +parser.add_argument("--medium-only", action="store_true") +parser.add_argument("--hard-only", action="store_true") +args = parser.parse_args() + +CHAT_ID = args.chat_id +_only = args.easy_only or args.medium_only or args.hard_only +_run_easy = not _only or args.easy_only +_run_medium = not _only or args.medium_only +_run_hard = not _only or args.hard_only + +results = [] + + +# ── easy benchmark ──────────────────────────────────────────────────────────── +if _run_easy: + print(f"\n[{INFO}] Easy routing benchmark") + print(f" {len(BENCHMARK['easy'])} questions — all must route to 'light'") + print(f" Chat ID: {CHAT_ID}") + print() + + bench_results = [] + LIGHT_TIMEOUT = 60 + + for i, question in enumerate(BENCHMARK["easy"], 1): + tag = f"easy-{i:02d}" + print(f" [{tag}] {question[:55]!r}") + + t_send = time.monotonic() + try: + status, _ = post_json(f"{DEEPAGENTS}/chat", + {"message": question, "chat_id": CHAT_ID}, timeout=5) + if status != 202: + print(f" → [{FAIL}] POST returned {status}") + bench_results.append((question, "?", None, False)) + continue + except Exception as e: + print(f" → [{FAIL}] POST error: {e}") + bench_results.append((question, "?", None, False)) + continue + + t_start = time.monotonic() + found = None + while time.monotonic() - t_start < LIGHT_TIMEOUT: + since = int(time.monotonic() - t_start) + 30 + lines = fetch_logs(since_s=since) + found = parse_run_block(lines, question) + if found: + break + time.sleep(1) + + if not found: + print(f" → [{FAIL}] no reply within {LIGHT_TIMEOUT}s") + bench_results.append((question, "timeout", None, False)) + continue + + tier = found.get("tier", "unknown") + is_light = (tier == "light") + tag_str = PASS if is_light else FAIL + print(f" → [{tag_str}] tier={tier} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s") + bench_results.append((question, tier, found["reply_total"], is_light)) + time.sleep(1) + + print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question'}") + print(f" {'─'*4} {'─'*8} {'─'*8} {'─'*50}") + for idx, (q, tier, lat, ok) in enumerate(bench_results, 1): + lat_str = f"{lat:.1f}s" if lat is not None else "timeout" + ok_str = "✓" if ok else "✗" + print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {q[:50]!r}") + + light_count = sum(1 for _, _, _, ok in bench_results if ok) + total_bench = len(bench_results) + lats = [lat for _, _, lat, ok in bench_results if ok and lat is not None] + avg_lat = sum(lats) / len(lats) if lats else 0 + + print(f"\n Light-path score: {light_count}/{total_bench}") + if lats: + print(f" Avg latency (light): {avg_lat:.1f}s min={min(lats):.1f}s max={max(lats):.1f}s") + + report(results, f"All easy questions routed to light ({light_count}/{total_bench})", + light_count == total_bench, + f"{light_count}/{total_bench} via light path, avg {avg_lat:.1f}s") + + +# ── medium benchmark ────────────────────────────────────────────────────────── +if _run_medium: + print(f"\n[{INFO}] Medium routing benchmark") + print(f" {len(BENCHMARK['medium'])} questions — must route to medium (light ok for some; complex = fail)") + print(f" Chat ID: {CHAT_ID}") + print() + + LIGHT_ACCEPTABLE = { + "who won the last FIFA World Cup?", + "search for a good pasta carbonara recipe", + "find Python tutorials for beginners", + "search for the best coffee shops in Tokyo", + } + + med_results = [] + MEDIUM_TIMEOUT = 120 + + for i, question in enumerate(BENCHMARK["medium"], 1): + tag = f"med-{i:02d}" + print(f" [{tag}] {question[:60]!r}") + + t_send = time.monotonic() + try: + status, _ = post_json(f"{DEEPAGENTS}/chat", + {"message": question, "chat_id": CHAT_ID}, timeout=5) + if status != 202: + print(f" → [{FAIL}] POST returned {status}") + med_results.append((question, "?", None, False)) + continue + except Exception as e: + print(f" → [{FAIL}] POST error: {e}") + med_results.append((question, "?", None, False)) + continue + + t_start = time.monotonic() + found = None + while time.monotonic() - t_start < MEDIUM_TIMEOUT: + since = int(time.monotonic() - t_start) + 60 + lines = fetch_logs(since_s=since) + found = parse_run_block(lines, question) + if found: + break + time.sleep(3) + + if not found: + print(f" → [{FAIL}] no reply within {MEDIUM_TIMEOUT}s") + med_results.append((question, "timeout", None, False)) + continue + + tier = found.get("tier", "unknown") + light_ok = question in LIGHT_ACCEPTABLE + + if tier == "medium": + correct, label, note = True, PASS, "medium ✓" + elif tier == "light": + correct = light_ok + label = PASS if light_ok else WARN + note = "light (acceptable)" if light_ok else "light (should be medium)" + elif tier == "complex": + correct, label, note = False, FAIL, "complex — wrong escalation" + else: + correct, label, note = False, FAIL, f"unknown tier {tier!r}" + + print(f" → [{label}] {note} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s") + med_results.append((question, tier, found["reply_total"], correct)) + time.sleep(1) + + print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question'}") + print(f" {'─'*4} {'─'*8} {'─'*8} {'─'*55}") + for idx, (q, tier, lat, ok) in enumerate(med_results, 1): + lat_str = f"{lat:.1f}s" if lat is not None else "timeout" + ok_str = "✓" if ok else ("~" if tier == "light" else "✗") + print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {q[:55]!r}") + + total_med = len(med_results) + medium_count = sum(1 for _, tier, _, _ in med_results if tier == "medium") + light_count = sum(1 for _, tier, _, _ in med_results if tier == "light") + complex_count = sum(1 for _, tier, _, _ in med_results if tier == "complex") + timeout_count = sum(1 for _, tier, _, _ in med_results if tier == "timeout") + light_misroute = sum(1 for q, tier, _, _ in med_results + if tier == "light" and q not in LIGHT_ACCEPTABLE) + lats = [lat for _, _, lat, _ in med_results if lat is not None] + + print(f"\n Breakdown: medium={medium_count} light={light_count} " + f"complex={complex_count} timeout={timeout_count}") + if light_misroute: + print(f" [{WARN}] {light_misroute} question(s) answered via light when medium expected") + if lats: + print(f" Avg latency: {sum(lats)/len(lats):.1f}s min={min(lats):.1f}s max={max(lats):.1f}s") + + report(results, + f"Medium questions: no complex escalation ({medium_count + light_count}/{total_med} routed)", + complex_count == 0, + f"medium={medium_count} light={light_count} complex={complex_count} timeout={timeout_count}") + if timeout_count: + report(results, f"Medium questions: all completed within {MEDIUM_TIMEOUT}s", False, + f"{timeout_count} question(s) timed out") + + +# ── hard benchmark ──────────────────────────────────────────────────────────── +if _run_hard: + print(f"\n[{INFO}] Hard routing benchmark") + print(f" {len(BENCHMARK['hard'])} /think questions — must route to 'complex'") + print(f" Acceptable fallback: 'medium' if VRAM eviction timed out") + print(f" Fail condition: tier=light or timeout") + print(f" Chat ID: {CHAT_ID}") + print() + + hard_results = [] + COMPLEX_TIMEOUT = 300 + _VRAM_ENTER = "[vram] enter_complex_mode" + _VRAM_EXIT = "[vram] exit_complex_mode" + + for i, question in enumerate(BENCHMARK["hard"], 1): + tag = f"hard-{i:02d}" + short_q = question[len("/think "):].strip()[:60] + print(f" [{tag}] /think {short_q!r}") + + t_send = time.monotonic() + try: + status, _ = post_json(f"{DEEPAGENTS}/chat", + {"message": question, "chat_id": CHAT_ID}, timeout=5) + if status != 202: + print(f" → [{FAIL}] POST returned {status}") + hard_results.append((question, "?", None, False)) + continue + except Exception as e: + print(f" → [{FAIL}] POST error: {e}") + hard_results.append((question, "?", None, False)) + continue + + t_start = time.monotonic() + found = None + while time.monotonic() - t_start < COMPLEX_TIMEOUT: + since = int(time.monotonic() - t_start) + 90 + lines = fetch_logs(since_s=since) + found = parse_run_block(lines, question[len("/think "):].strip()) + if found: + break + time.sleep(5) + + elapsed = time.monotonic() - t_send + + if not found: + print(f" → [{FAIL}] no reply within {COMPLEX_TIMEOUT}s") + hard_results.append((question, "timeout", None, False)) + continue + + tier = found.get("tier", "unknown") + + if tier == "complex": + ok, label, note = True, PASS, "complex ✓" + elif tier == "medium": + ok, label, note = True, WARN, "medium (VRAM fallback — check [vram] logs)" + else: + ok, label, note = False, FAIL, f"tier={tier} — unexpected" + + lines_block = fetch_logs(since_s=int(elapsed) + 120) + recent = "\n".join(lines_block[-200:]) + vram_enter_seen = _VRAM_ENTER in recent + vram_note = "" + if tier == "complex": + vram_note = " [vram:flush✓]" if vram_enter_seen else f" [{WARN}:no vram flush log]" + + print(f" → [{label}] {note} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s{vram_note}") + hard_results.append((question, tier, found["reply_total"], ok)) + time.sleep(5) + + print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question (/think ...)'}") + print(f" {'─'*4} {'─'*8} {'─'*8} {'─'*55}") + for idx, (q, tier, lat, ok) in enumerate(hard_results, 1): + lat_str = f"{lat:.1f}s" if lat is not None else "timeout" + ok_str = "✓" if tier == "complex" else ("~" if tier == "medium" else "✗") + short = q[len("/think "):].strip()[:55] + print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {short!r}") + + total_hard = len(hard_results) + complex_count = sum(1 for _, t, _, _ in hard_results if t == "complex") + medium_fb = sum(1 for _, t, _, _ in hard_results if t == "medium") + light_count = sum(1 for _, t, _, _ in hard_results if t == "light") + timeout_count = sum(1 for _, t, _, _ in hard_results if t == "timeout") + lats = [lat for _, _, lat, _ in hard_results if lat is not None] + + print(f"\n Breakdown: complex={complex_count} medium(fallback)={medium_fb} " + f"light={light_count} timeout={timeout_count}") + if medium_fb: + print(f" [{WARN}] {medium_fb} question(s) fell back to medium (VRAM eviction timeout)") + if light_count: + print(f" [{FAIL}] {light_count} question(s) routed to light — /think prefix not detected") + if lats: + print(f" Avg latency: {sum(lats)/len(lats):.1f}s min={min(lats):.1f}s max={max(lats):.1f}s") + + report(results, + f"Hard questions routed to complex (not light) ({complex_count + medium_fb}/{total_hard})", + light_count == 0 and timeout_count == 0, + f"complex={complex_count} medium_fallback={medium_fb} light={light_count} timeout={timeout_count}") + + +# ── summary ─────────────────────────────────────────────────────────────────── +print_summary(results) +sys.exit(0 if all(ok for _, ok in results) else 1)