#!/usr/bin/env python3
"""
Adolf pipeline integration test with end-to-end timing profiling.

Tests:
 1. Service health (deepagents, openmemory, grammy MCP SSE)
 2. GPU Ollama models
 3. CPU Ollama models
 4. Qdrant collection + vector dims
 5. SearXNG
 6. Name store — "remember that your name is <random>"
 7. Qdrant point added after store
 8. Name recall — "what is your name?" → reply contains <random>
 9. Timing profile + bottleneck report
10. Easy benchmark — 10 easy questions → all must route to light
11. Medium benchmark — 11 medium questions → must route to medium (or light, never complex)
12. Hard benchmark — 10 /think questions → all must route to complex; VRAM flush verified
13. Memory benchmark — store 5 facts, recall with 10 questions → verify keyword presence
14. Dedup test — same fact sent twice → Qdrant must not grow by 2

Usage:
    python3 test_pipeline.py [--chat-id CHAT_ID]
        [--bench-only]   skip sections 1-9, run 10+11+12+13
        [--easy-only]    skip 1-9 and 11+12+13, run only section 10
        [--medium-only]  skip 1-9 and 10+12+13, run only section 11
        [--hard-only]    skip 1-9 and 10+11+13, run only section 12
        [--memory-only]  skip 1-9 and 10+11+12, run only section 13
        [--no-bench]     skip sections 10-13

Timing is extracted from deepagents container logs, not estimated from sleeps.
"""

import argparse
import http.client
import json
import random
import re
import subprocess
import sys
import time
import urllib.request

# ── config ────────────────────────────────────────────────────────────────────
DEEPAGENTS = "http://localhost:8000"
OPENMEMORY = "http://localhost:8765"
GRAMMY_HOST = "localhost"
GRAMMY_PORT = 3001
OLLAMA_GPU = "http://localhost:11436"
OLLAMA_CPU = "http://localhost:11435"
QDRANT = "http://localhost:6333"
SEARXNG = "http://localhost:11437"
COMPOSE_FILE = "/home/alvis/agap_git/adolf/docker-compose.yml"
DEFAULT_CHAT_ID = "346967270"

# Random agent names used by the store/recall round-trip (section 6–8).
NAMES = [
    "Maximilian", "Cornelius", "Zephyr", "Archibald", "Balthazar",
    "Ignatius", "Lysander", "Octavian", "Reginald", "Sylvester",
]

# ── benchmark questions ───────────────────────────────────────────────────────
# NOTE: the medium questions are matched by exact string in LIGHT_ACCEPTABLE
# (section 11) — do not edit them without updating that set.
BENCHMARK = {
    "easy": [
        "hi",
        "what is 2+2?",
        "what is the capital of France?",
        "tell me a short joke",
        "how are you doing today?",
        "thanks!",
        "what day comes after Wednesday?",
        "name the three primary colors",
        "is the sky blue?",
        "what does CPU stand for?",
    ],
    "medium": [
        "what is the current weather in Berlin?",
        "find the latest news about artificial intelligence",
        "what is the current price of Bitcoin?",
        "search for a good pasta carbonara recipe",
        "what movies are in theaters this week?",
        "find Python tutorials for beginners",
        "who won the last FIFA World Cup?",
        "do you remember what we talked about before?",
        "search for the best coffee shops in Tokyo",
        "what is happening in the tech industry this week?",
        "what's the weather like today?",
    ],
    "hard": [
        "/think compare the top 3 Python web frameworks (Django, FastAPI, Flask) and recommend one for a production REST API",
        "/think research the history of artificial intelligence and create a timeline of key milestones",
        "/think plan a 7-day trip to Japan with daily itinerary, accommodation suggestions, and budget breakdown",
        "/think analyze microservices vs monolithic architecture: pros, cons, and when to choose each",
        "/think write a Python script that reads a CSV file, cleans the data, and generates summary statistics",
        "/think research quantum computing: explain the key concepts and how it differs from classical computing",
        "/think compare PostgreSQL, MongoDB, and Redis — when to use each and what are the trade-offs?",
        "/think create a comprehensive Docker deployment guide covering best practices for production",
        "/think research climate change: summarize the latest IPCC findings and key data points",
        "/think design a REST API with authentication, rate limiting, and proper error handling — provide architecture and code outline",
    ],
}

# ANSI-colored status tags used throughout the report output.
PASS = "\033[32mPASS\033[0m"
FAIL = "\033[31mFAIL\033[0m"
INFO = "\033[36mINFO\033[0m"
WARN = "\033[33mWARN\033[0m"

results = []   # (check name, bool ok) — summarized at exit
timings = {}   # label → float seconds | None


# ── helpers ───────────────────────────────────────────────────────────────────
def report(name, ok, detail=""):
    """Print one PASS/FAIL line and record (name, ok) in the global results."""
    status = PASS if ok else FAIL
    suffix = f" — {detail}" if detail else ""
    print(f" [{status}] {name}{suffix}")
    results.append((name, ok))


def tf(v):
    """Format timing value."""
    if v is None:
        return " n/a"
    return f"{v:6.2f}s"


def get(url, timeout=5):
    """HTTP GET; return (status, decoded body)."""
    request = urllib.request.Request(url)
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return resp.status, resp.read().decode()


def post_json(url, payload, timeout=10):
    """HTTP POST a JSON payload; return (status, parsed JSON body)."""
    encoded = json.dumps(payload).encode()
    request = urllib.request.Request(
        url,
        data=encoded,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return resp.status, json.loads(resp.read().decode())


def check_sse(host, port, path):
    """Probe an SSE endpoint; return (reachable, detail string)."""
    try:
        conn = http.client.HTTPConnection(host, port, timeout=5)
        conn.request("GET", path, headers={"Accept": "text/event-stream"})
        status = conn.getresponse().status
        conn.close()
        return status == 200, f"HTTP {status}"
    except Exception as e:
        return False, str(e)


def qdrant_count():
    """Current point count of the adolf_memories collection; 0 on any error."""
    try:
        _, raw = get(f"{QDRANT}/collections/adolf_memories")
        return json.loads(raw).get("result", {}).get("points_count", 0)
    except Exception:
        return 0
def fetch_logs(since_s=600):
    """Return deepagents log lines from the last since_s seconds."""
    try:
        r = subprocess.run(
            ["docker", "compose", "-f", COMPOSE_FILE, "logs", "deepagents",
             f"--since={int(since_s)}s", "--no-log-prefix"],
            capture_output=True, text=True, timeout=15,
        )
        return r.stdout.splitlines()
    except Exception:
        # Best-effort: a docker hiccup must not kill the test run —
        # callers poll repeatedly and tolerate an empty window.
        return []


def parse_run_block(lines, msg_prefix):
    """
    Scan log lines for the LAST '[agent] running: <msg_prefix>' block.
    Extracts reply timing, tier, and memory timing from that block.
    Returns dict or None if the reply has not appeared in logs yet.
    Dict keys:
        reply_total, llm, send, tier, reply_text — from "[agent] replied in ..."
        memory_s     — from "[memory] stored in ..."
        memory_error — True if "[memory] error" found
    """
    # Only the first 50 chars of the message are echoed in the running-line.
    search = msg_prefix[:50]
    start_idx = None
    for i, line in enumerate(lines):
        if "[agent] running:" in line and search in line:
            start_idx = i  # keep updating — we want the LAST occurrence
    if start_idx is None:
        return None
    block = lines[start_idx:]
    last_ai_text = None
    reply_data = None
    for j, line in enumerate(block):
        # Track last non-tool AIMessage (the final reply) — truncated at 150 chars
        # in logs, used only as fallback if reply_text line is absent (older
        # server versions).
        if "AIMessage:" in line and "→" not in line:
            txt = line.split("AIMessage:", 1)[-1].strip()
            if txt:
                last_ai_text = txt
        m = re.search(r"replied in ([\d.]+)s \(llm=([\d.]+)s, send=([\d.]+)s\)", line)
        if m:
            # Extract optional tier tag at end of line
            tier_m = re.search(r"\btier=(\w+)", line)
            tier = tier_m.group(1) if tier_m else "unknown"
            reply_data = {
                "reply_total": float(m.group(1)),
                "llm": float(m.group(2)),
                "send": float(m.group(3)),
                "tier": tier,
                "reply_text": last_ai_text,  # may be overwritten by reply_text line below
                "memory_s": None,
                "memory_error": False,
                "_j": j,  # index of the replied-in line within `block`
            }
            break
    # Read full reply_text from dedicated log line (written immediately after
    # the replied-in line).
    if reply_data is not None:
        next_lines = block[reply_data["_j"] + 1: reply_data["_j"] + 3]
        for line in next_lines:
            if line.startswith("[agent] reply_text:"):
                reply_data["reply_text"] = line[len("[agent] reply_text:"):].strip()
                break
    if reply_data is None:
        return None  # reply not in logs yet
    # Memory line can appear after the next "[agent] running:" — no stop condition
    for line in block[reply_data["_j"] + 1:]:
        mm = re.search(r"\[memory\] stored in ([\d.]+)s", line)
        if mm:
            reply_data["memory_s"] = float(mm.group(1))
            break
        if "[memory] error" in line:
            reply_data["memory_error"] = True
            break
    return reply_data


def wait_for(label, msg_prefix, timeout_s=200, need_memory=True):
    """
    Poll deepagents logs until the message is fully processed.
    Shows a live progress line. Returns timing dict or None on timeout.
    """
    t_start = time.monotonic()
    deadline = t_start + timeout_s
    tick = 0
    last_result = None
    while time.monotonic() < deadline:
        # Window grows with elapsed time — never miss a line that appeared late
        since = int(time.monotonic() - t_start) + 90
        lines = fetch_logs(since_s=since)
        result = parse_run_block(lines, msg_prefix)
        if result:
            last_result = result
            has_mem = result["memory_s"] is not None or result["memory_error"]
            if (not need_memory) or has_mem:
                elapsed = time.monotonic() - t_start
                print(f"\r [{label}] done after {elapsed:.0f}s{' ' * 30}")
                return result
        time.sleep(4)
        tick += 1
        rem = int(deadline - time.monotonic())
        if last_result:
            phase = "waiting for memory..." if need_memory else "done"
        else:
            phase = "waiting for LLM reply..."
        print(f"\r [{label}] {tick*4}s elapsed, {rem}s left — {phase} ", end="", flush=True)
    print(f"\r [{label}] TIMEOUT after {timeout_s}s{' ' * 30}")
    return None
# ── args ──────────────────────────────────────────────────────────────────────
parser = argparse.ArgumentParser(description="Adolf pipeline test")
parser.add_argument("--chat-id", default=DEFAULT_CHAT_ID)
parser.add_argument("--bench-only", action="store_true",
                    help="Skip sections 1-9, run sections 10+11 (both benchmarks)")
parser.add_argument("--easy-only", action="store_true",
                    help="Skip sections 1-9 and 11, run only section 10 (easy benchmark)")
parser.add_argument("--medium-only", action="store_true",
                    help="Skip sections 1-9 and 10, run only section 11 (medium benchmark)")
parser.add_argument("--hard-only", action="store_true",
                    help="Skip sections 1-9 and 10+11, run only section 12 (hard benchmark)")
parser.add_argument("--memory-only", action="store_true",
                    help="Skip sections 1-9 and 10+11+12, run only section 13 (memory benchmark)")
parser.add_argument("--no-bench", action="store_true",
                    help="Skip sections 10-13 (all benchmarks)")
args = parser.parse_args()
CHAT_ID = args.chat_id

# Derived flags for readability
_skip_pipeline = args.bench_only or args.easy_only or args.medium_only or args.hard_only or args.memory_only
_run_easy = not args.no_bench and not args.medium_only and not args.hard_only and not args.memory_only
_run_medium = not args.no_bench and not args.easy_only and not args.hard_only and not args.memory_only
_run_hard = not args.no_bench and not args.easy_only and not args.medium_only and not args.memory_only
_run_memory = not args.no_bench and not args.easy_only and not args.medium_only and not args.hard_only

random_name = random.choice(NAMES)
if not _skip_pipeline:
    print(f"\n Test name : \033[1m{random_name}\033[0m")
    print(f" Chat ID : {CHAT_ID}")

# ── 1. service health ─────────────────────────────────────────────────────────
if not _skip_pipeline:
    print(f"\n[{INFO}] 1. Service health")
    t0 = time.monotonic()
    try:
        status, body = get(f"{DEEPAGENTS}/health")
        data = json.loads(body)
        ok = status == 200 and data.get("agent_ready") is True
        report("deepagents /health — agent_ready", ok, f"agent_ready={data.get('agent_ready')}")
    except Exception as e:
        report("deepagents /health", False, str(e))
    ok, detail = check_sse("localhost", 8765, "/sse")
    report("openmemory /sse reachable", ok, detail)
    ok, detail = check_sse(GRAMMY_HOST, GRAMMY_PORT, "/sse")
    report("grammy /sse reachable", ok, detail)
    timings["health_check"] = time.monotonic() - t0

# ── 2. GPU Ollama ─────────────────────────────────────────────────────────────
if not _skip_pipeline:
    print(f"\n[{INFO}] 2. GPU Ollama (port 11436)")
    t0 = time.monotonic()
    try:
        status, body = get(f"{OLLAMA_GPU}/api/tags")
        models = [m["name"] for m in json.loads(body).get("models", [])]
        has_qwen = any("qwen3" in m for m in models)
        report("GPU Ollama reachable", True, f"models: {models}")
        report("qwen3:8b present", has_qwen)
    except Exception as e:
        report("GPU Ollama reachable", False, str(e))
        report("qwen3:8b present", False, "skipped")
    timings["gpu_ollama_ping"] = time.monotonic() - t0

# ── 3. CPU Ollama ─────────────────────────────────────────────────────────────
if not _skip_pipeline:
    print(f"\n[{INFO}] 3. CPU Ollama (port 11435)")
    t0 = time.monotonic()
    try:
        status, body = get(f"{OLLAMA_CPU}/api/tags")
        models = [m["name"] for m in json.loads(body).get("models", [])]
        has_embed = any("nomic-embed-text" in m for m in models)
        report("CPU Ollama reachable", True, f"models: {models}")
        report("nomic-embed-text present", has_embed)
    except Exception as e:
        report("CPU Ollama reachable", False, str(e))
        report("nomic-embed-text present", False, "skipped")
    timings["cpu_ollama_ping"] = time.monotonic() - t0

# ── 4. Qdrant ─────────────────────────────────────────────────────────────────
if not _skip_pipeline:
    print(f"\n[{INFO}] 4. Qdrant (port 6333)")
    t0 = time.monotonic()
    try:
        status, body = get(f"{QDRANT}/collections")
        cols = [c["name"] for c in json.loads(body).get("result", {}).get("collections", [])]
        report("Qdrant reachable", True, f"collections: {cols}")
        report("adolf_memories collection exists", "adolf_memories" in cols)
    except Exception as e:
        report("Qdrant reachable", False, str(e))
        report("adolf_memories collection exists", False, "skipped")
    try:
        status, body = get(f"{QDRANT}/collections/adolf_memories")
        info = json.loads(body).get("result", {})
        # 768 matches the nomic-embed-text embedding dimension checked in section 3.
        dims = info.get("config", {}).get("params", {}).get("vectors", {}).get("size")
        report("vector dims = 768", dims == 768, f"got {dims}")
    except Exception as e:
        report("adolf_memories collection info", False, str(e))
    timings["qdrant_ping"] = time.monotonic() - t0

# ── 5. SearXNG ────────────────────────────────────────────────────────────────
if not _skip_pipeline:
    print(f"\n[{INFO}] 5. SearXNG (port 11437)")
    t0 = time.monotonic()
    try:
        status, body = get(f"{SEARXNG}/search?q=test&format=json", timeout=15)
        elapsed = time.monotonic() - t0
        n = len(json.loads(body).get("results", []))
        report("SearXNG reachable + JSON results", status == 200 and n > 0, f"{n} results in {elapsed:.1f}s")
        report("SearXNG response < 5s", elapsed < 5, f"{elapsed:.2f}s")
        timings["searxng_latency"] = elapsed
    except Exception as e:
        report("SearXNG reachable", False, str(e))
        report("SearXNG response < 5s", False, "skipped")
        timings["searxng_latency"] = None
    timings["searxng_check"] = time.monotonic() - t0

# ── 6–8. Name memory pipeline ─────────────────────────────────────────────────
if not _skip_pipeline:
    print(f"\n[{INFO}] 6–8. Name memory pipeline")
    print(f" chat_id={CHAT_ID} name={random_name}")
    store_msg = f"remember that your name is {random_name}"
    recall_msg = "what is your name?"
    pts_before = qdrant_count()
    print(f" Qdrant points before: {pts_before}")
    # ── 6. Send store message ─────────────────────────────────────────────────
    print(f"\n [store] '{store_msg}'")
    t_store = time.monotonic()
    try:
        status, _ = post_json(f"{DEEPAGENTS}/chat", {"message": store_msg, "chat_id": CHAT_ID}, timeout=5)
        t_accept = time.monotonic() - t_store
        # /chat is async: the server must ACK with 202 well before the LLM runs.
        report("POST /chat (store) returns 202 immediately", status == 202 and t_accept < 1,
               f"status={status}, t={t_accept:.3f}s")
        timings["store_http_accept"] = t_accept
    except Exception as e:
        report("POST /chat (store)", False, str(e))
        sys.exit(1)
    store = wait_for("store", store_msg, timeout_s=220, need_memory=True)
    if store:
        timings["store_llm"] = store["llm"]
        timings["store_send"] = store["send"]
        timings["store_reply"] = store["reply_total"]
        timings["store_memory"] = store["memory_s"]
        report("Agent replied to store message", True,
               f"{store['reply_total']:.1f}s total llm={store['llm']:.1f}s send={store['send']:.1f}s tier={store['tier']}")
        if store["memory_s"] is not None:
            report("Memory stored without error", True, f"{store['memory_s']:.1f}s")
        elif store["memory_error"]:
            report("Memory stored without error", False, "error in [memory] log")
        else:
            report("Memory stored without error", False, "not found in logs (still running?)")
        print(f" Store reply: {store['reply_text']!r}")
    else:
        # Sections 7-8 depend on the stored fact — abort the whole run.
        report("Agent replied to store message", False, "timeout")
        report("Memory stored without error", False, "timeout")
        sys.exit(1)

    # ── 7. Verify Qdrant ──────────────────────────────────────────────────────
    pts_after = qdrant_count()
    new_pts = pts_after - pts_before
    report("New memory point(s) added to Qdrant", new_pts > 0,
           f"{pts_before} → {pts_after} (+{new_pts})")
    timings["qdrant_new_points"] = new_pts

    # ── 8. Send recall message ────────────────────────────────────────────────
    print(f"\n [recall] '{recall_msg}'")
    t_recall = time.monotonic()
    try:
        status, _ = post_json(f"{DEEPAGENTS}/chat", {"message": recall_msg, "chat_id": CHAT_ID}, timeout=5)
        t_accept2 = time.monotonic() - t_recall
        report("POST /chat (recall) returns 202 immediately", status == 202 and t_accept2 < 1,
               f"status={status}, t={t_accept2:.3f}s")
        timings["recall_http_accept"] = t_accept2
    except Exception as e:
        report("POST /chat (recall)", False, str(e))
    # Recall does not need a [memory] stored line — only the reply itself.
    recall = wait_for("recall", recall_msg, timeout_s=160, need_memory=False)
    if recall:
        timings["recall_llm"] = recall["llm"]
        timings["recall_send"] = recall["send"]
        timings["recall_reply"] = recall["reply_total"]
        report("Agent replied to recall message", True,
               f"{recall['reply_total']:.1f}s total llm={recall['llm']:.1f}s send={recall['send']:.1f}s tier={recall['tier']}")
        reply_text = recall["reply_text"] or ""
        name_in_reply = random_name.lower() in reply_text.lower()
        report(f"Reply contains '{random_name}'", name_in_reply, f"reply: {reply_text[:120]!r}")
    else:
        report("Agent replied to recall message", False, "timeout")
        report(f"Reply contains '{random_name}'", False, "no reply")

# ── 9. Timing profile ─────────────────────────────────────────────────────────
if not _skip_pipeline:
    print(f"\n[{INFO}] 9. Timing profile")
    W = 36  # label column width for the timing table
    print(f"\n {'Stage':<{W}} {'Time':>8}")
    print(f" {'─'*W} {'─'*8}")
    rows_store = [
        ("[GPU] HTTP accept — store turn", "store_http_accept"),
        ("[GPU] qwen3:Xb inference — store turn", "store_llm"),
        ("[GPU] Telegram send — store turn", "store_send"),
        ("[GPU] Total reply latency — store", "store_reply"),
        ("[GPU] qwen2.5:1.5b+embed — async mem", "store_memory"),
    ]
    rows_recall = [
        ("[GPU] HTTP accept — recall turn", "recall_http_accept"),
        ("[GPU] qwen3:Xb inference — recall", "recall_llm"),
        ("[GPU] Telegram send — recall turn", "recall_send"),
        ("[GPU] Total reply latency — recall", "recall_reply"),
    ]
    for label, key in rows_store:
        v = timings.get(key)
        print(f" {label:<{W}} {tf(v):>8}")
    print(f" {'─'*W} {'─'*8}")
    for label, key in rows_recall:
        v = timings.get(key)
        print(f" {label:<{W}} {tf(v):>8}")

    # Bottleneck bar chart
    print(f"\n Bottleneck analysis (each █ ≈ 5s):")
    print(f" {'─'*(W+12)}")
    candidates = [
        ("[GPU] qwen3:Xb — store reply ", timings.get("store_llm") or 0),
        ("[GPU] qwen3:Xb — recall reply", timings.get("recall_llm") or 0),
        ("[GPU] qwen2.5:1.5b+embed (async)", timings.get("store_memory") or 0),
        ("[net] SearXNG ", timings.get("searxng_latency") or 0),
    ]
    candidates.sort(key=lambda x: x[1], reverse=True)
    for label, t in candidates:
        bar = "█" * min(int(t / 5), 24)  # cap the bar at 24 glyphs (~2 min)
        pct = ""
        total_pipeline = (timings.get("store_reply") or 0) + (timings.get("store_memory") or 0)
        if total_pipeline > 0:
            pct = f" {t/total_pipeline*100:4.0f}%"
        print(f" {label} {t:6.1f}s {bar}{pct}")
    print()

# ── 10. Tier routing benchmark — easy questions → light path ──────────────────
if _run_easy:
    print(f"\n[{INFO}] 10. Tier routing benchmark")
    print(f" Sending {len(BENCHMARK['easy'])} easy questions — all must route to 'light'")
    print(f" Chat ID: {CHAT_ID}")
    print()
    bench_results = []  # list of (question, tier, latency_s, ok)
    LIGHT_TIMEOUT = 60  # seconds — light is fast but may queue behind prior messages
    for i, question in enumerate(BENCHMARK["easy"], 1):
        tag = f"easy-{i:02d}"
        short_q = question[:55]
        print(f" [{tag}] {short_q!r}")
        # Send
        t_send = time.monotonic()
        try:
            status, _ = post_json(f"{DEEPAGENTS}/chat", {"message": question, "chat_id": CHAT_ID}, timeout=5)
            if status != 202:
                print(f" → [{FAIL}] POST returned {status}")
                bench_results.append((question, "?", None, False))
                continue
        except Exception as e:
            print(f" → [{FAIL}] POST error: {e}")
            bench_results.append((question, "?", None, False))
            continue
        # Poll for reply
        t_start = time.monotonic()
        found = None
        while time.monotonic() - t_start < LIGHT_TIMEOUT:
            since = int(time.monotonic() - t_start) + 30
            lines = fetch_logs(since_s=since)
            found = parse_run_block(lines, question)
            if found:
                break
            time.sleep(1)
        elapsed = time.monotonic() - t_send
        if not found:
            print(f" → [{FAIL}] no reply within {LIGHT_TIMEOUT}s")
            bench_results.append((question, "timeout", None, False))
            continue
        tier = found.get("tier", "unknown")
        is_light = (tier == "light")
        tag_str = PASS if is_light else FAIL
        print(f" → [{tag_str}] tier={tier} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s")
        bench_results.append((question, tier, found["reply_total"], is_light))
        # Brief pause between questions to keep logs clean
        time.sleep(1)

    # Summary table
    print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question'}")
    print(f" {'─'*4} {'─'*8} {'─'*8} {'─'*50}")
    for idx, (q, tier, lat, ok) in enumerate(bench_results, 1):
        lat_str = f"{lat:.1f}s" if lat is not None else "timeout"
        ok_str = "✓" if ok else "✗"
        print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {q[:50]!r}")
    light_count = sum(1 for _, _, _, ok in bench_results if ok)
    total_bench = len(bench_results)
    lats = [lat for _, _, lat, ok in bench_results if ok and lat is not None]
    avg_lat = sum(lats) / len(lats) if lats else 0
    print(f"\n Light-path score: {light_count}/{total_bench}")
    if lats:
        print(f" Avg latency (light): {avg_lat:.1f}s "
              f"min={min(lats):.1f}s max={max(lats):.1f}s")
    report(f"All easy questions routed to light ({light_count}/{total_bench})",
           light_count == total_bench,
           f"{light_count}/{total_bench} via light path, avg {avg_lat:.1f}s")

# ── 11. Medium benchmark — medium questions → medium or light, never complex ──
if _run_medium:
    print(f"\n[{INFO}] 11. Medium routing benchmark")
    print(f" Sending {len(BENCHMARK['medium'])} medium questions")
    print(f" Expected: tier=medium (needs tools). Light is acceptable for factual questions.")
    print(f" Fail condition: tier=complex or timeout.")
    print(f" Chat ID: {CHAT_ID}")
    print()
    # Questions where light is a valid alternative (model may know from training data).
    # NOTE: must match BENCHMARK["medium"] entries byte-for-byte.
    LIGHT_ACCEPTABLE = {
        "who won the last FIFA World Cup?",
        "search for a good pasta carbonara recipe",
        "find Python tutorials for beginners",
        "search for the best coffee shops in Tokyo",
    }
    med_results = []  # list of (question, tier, latency_s, correct)
    MEDIUM_TIMEOUT = 120  # seconds — medium takes 20-100s, allow for queue buildup
    for i, question in enumerate(BENCHMARK["medium"], 1):
        tag = f"med-{i:02d}"
        short_q = question[:60]
        print(f" [{tag}] {short_q!r}")
        # Send
        t_send = time.monotonic()
        try:
            status, _ = post_json(f"{DEEPAGENTS}/chat", {"message": question, "chat_id": CHAT_ID}, timeout=5)
            if status != 202:
                print(f" → [{FAIL}] POST returned {status}")
                med_results.append((question, "?", None, False))
                continue
        except Exception as e:
            print(f" → [{FAIL}] POST error: {e}")
            med_results.append((question, "?", None, False))
            continue
        # Poll for reply
        t_start = time.monotonic()
        found = None
        while time.monotonic() - t_start < MEDIUM_TIMEOUT:
            since = int(time.monotonic() - t_start) + 60
            lines = fetch_logs(since_s=since)
            found = parse_run_block(lines, question)
            if found:
                break
            time.sleep(3)
        elapsed = time.monotonic() - t_send
        if not found:
            print(f" → [{FAIL}] no reply within {MEDIUM_TIMEOUT}s")
            med_results.append((question, "timeout", None, False))
            continue
        tier = found.get("tier", "unknown")
        light_ok = question in LIGHT_ACCEPTABLE
        if tier == "medium":
            correct = True
            label = PASS
            note = "medium ✓"
        elif tier == "light":
            correct = light_ok  # light is only acceptable for certain questions
            label = WARN if not light_ok else PASS
            note = "light (acceptable)" if light_ok else "light (should be medium)"
        elif tier == "complex":
            correct = False
            label = FAIL
            note = "complex — wrong escalation"
        else:
            correct = False
            label = FAIL
            note = f"unknown tier {tier!r}"
        print(f" → [{label}] {note} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s")
        med_results.append((question, tier, found["reply_total"], correct))
        # Brief pause between questions
        time.sleep(1)

    # Summary table
    print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question'}")
    print(f" {'─'*4} {'─'*8} {'─'*8} {'─'*55}")
    for idx, (q, tier, lat, ok) in enumerate(med_results, 1):
        lat_str = f"{lat:.1f}s" if lat is not None else "timeout"
        ok_str = "✓" if ok else ("~" if tier == "light" else "✗")
        print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {q[:55]!r}")
    total_med = len(med_results)
    medium_count = sum(1 for _, tier, _, _ in med_results if tier == "medium")
    light_count = sum(1 for _, tier, _, _ in med_results if tier == "light")
    complex_count = sum(1 for _, tier, _, _ in med_results if tier == "complex")
    timeout_count = sum(1 for _, tier, _, _ in med_results if tier == "timeout")
    light_misroute = sum(
        1 for q, tier, _, _ in med_results if tier == "light" and q not in LIGHT_ACCEPTABLE
    )
    lats = [lat for _, _, lat, _ in med_results if lat is not None]
    correct_count = medium_count + (light_count - light_misroute)
    print(f"\n Breakdown: medium={medium_count} light={light_count} complex={complex_count} timeout={timeout_count}")
    if light_misroute:
        print(f" [{WARN}] {light_misroute} question(s) answered via light when medium expected (check reply quality)")
    if lats:
        print(f" Avg latency: {sum(lats)/len(lats):.1f}s min={min(lats):.1f}s max={max(lats):.1f}s")
    no_complex = complex_count == 0
    no_timeout = timeout_count == 0
    all_ok = no_complex and no_timeout
    report(
        f"Medium questions: no complex escalation ({medium_count + light_count}/{total_med} routed)",
        no_complex,
        f"medium={medium_count} light={light_count} complex={complex_count} timeout={timeout_count}",
    )
    if not no_timeout:
        report(
            f"Medium questions: all completed within {MEDIUM_TIMEOUT}s",
            False,
            f"{timeout_count} question(s) timed out (increase MEDIUM_TIMEOUT or check agent logs)",
        )

# ── 12. Hard benchmark — /think questions → complex tier + VRAM flush verified ─
if _run_hard:
    print(f"\n[{INFO}] 12. Hard routing benchmark")
    print(f" Sending {len(BENCHMARK['hard'])} /think questions — all must route to 'complex'")
    print(f" Verifies: /think prefix → force_complex=True → VRAM flush → qwen3:8b inference")
    print(f" Acceptable fallback: 'medium' if VRAM eviction timed out (logged warning)")
    print(f" Fail condition: tier=light or timeout")
    print(f" Chat ID: {CHAT_ID}")
    print()
    hard_results = []  # list of (question, tier, latency_s, ok)
    COMPLEX_TIMEOUT = 300  # seconds — complex takes 60-180s + VRAM flush overhead
    # Log markers we expect to see for complex path
    _VRAM_ENTER = "[vram] enter_complex_mode"
    _VRAM_EXIT = "[vram] exit_complex_mode"
    for i, question in enumerate(BENCHMARK["hard"], 1):
        tag = f"hard-{i:02d}"
        # Strip /think prefix for display
        short_q = question[len("/think "):].strip()[:60]
        print(f" [{tag}] /think {short_q!r}")
        # Snapshot log window start time
        t_send = time.monotonic()
        try:
            status, _ = post_json(f"{DEEPAGENTS}/chat", {"message": question, "chat_id": CHAT_ID}, timeout=5)
            if status != 202:
                print(f" → [{FAIL}] POST returned {status}")
                hard_results.append((question, "?", None, False))
                continue
        except Exception as e:
            print(f" → [{FAIL}] POST error: {e}")
            hard_results.append((question, "?", None, False))
            continue
"?", None, False)) continue # Poll for reply t_start = time.monotonic() found = None while time.monotonic() - t_start < COMPLEX_TIMEOUT: since = int(time.monotonic() - t_start) + 90 lines = fetch_logs(since_s=since) found = parse_run_block(lines, question[len("/think "):].strip()) if found: break time.sleep(5) elapsed = time.monotonic() - t_send if not found: print(f" → [{FAIL}] no reply within {COMPLEX_TIMEOUT}s") hard_results.append((question, "timeout", None, False)) continue tier = found.get("tier", "unknown") if tier == "complex": ok = True label = PASS note = "complex ✓" elif tier == "medium": # Acceptable fallback if VRAM eviction timed out ok = True label = WARN note = "medium (VRAM fallback — check [vram] logs)" else: ok = False label = FAIL note = f"tier={tier} — unexpected" # Check if VRAM enter/exit were logged for this block lines_block = fetch_logs(since_s=int(elapsed) + 120) msg_key = question[len("/think "):].strip()[:40] vram_enter_seen = any(_VRAM_ENTER in ln for ln in lines_block if msg_key in ln or any(msg_key[:15] in prev_ln for prev_ln in lines_block[max(0, lines_block.index(ln)-10):lines_block.index(ln)])) # Simpler: just check the recent log window for enter/exit markers recent = "\n".join(lines_block[-200:]) vram_enter_seen = _VRAM_ENTER in recent vram_exit_seen = _VRAM_EXIT in recent vram_note = "" if tier == "complex": if vram_enter_seen: vram_note = " [vram:flush✓]" else: vram_note = f" [{WARN}:no vram flush log]" print(f" → [{label}] {note} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s{vram_note}") hard_results.append((question, tier, found["reply_total"], ok)) # Pause to let exit_complex_mode background task complete before next question # (flushes qwen3:8b and pre-warms 4b+router — avoids VRAM conflict on next enter) time.sleep(5) # Summary table print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question (/think ...)'}") print(f" {'─'*4} {'─'*8} {'─'*8} {'─'*55}") for idx, (q, tier, lat, ok) in enumerate(hard_results, 1): 
lat_str = f"{lat:.1f}s" if lat is not None else "timeout" ok_str = "✓" if tier == "complex" else ("~" if tier == "medium" else "✗") short = q[len("/think "):].strip()[:55] print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {short!r}") total_hard = len(hard_results) complex_count = sum(1 for _, t, _, _ in hard_results if t == "complex") medium_fb = sum(1 for _, t, _, _ in hard_results if t == "medium") light_count = sum(1 for _, t, _, _ in hard_results if t == "light") timeout_count = sum(1 for _, t, _, _ in hard_results if t == "timeout") lats = [lat for _, _, lat, _ in hard_results if lat is not None] print(f"\n Breakdown: complex={complex_count} medium(fallback)={medium_fb} light={light_count} timeout={timeout_count}") if medium_fb: print(f" [{WARN}] {medium_fb} question(s) fell back to medium (VRAM eviction timeout)") if light_count: print(f" [{FAIL}] {light_count} question(s) routed to light — /think prefix not detected") if lats: print(f" Avg latency: {sum(lats)/len(lats):.1f}s min={min(lats):.1f}s max={max(lats):.1f}s") no_light = light_count == 0 no_timeout = timeout_count == 0 report( f"Hard questions routed to complex (not light) ({complex_count + medium_fb}/{total_hard})", no_light and no_timeout, f"complex={complex_count} medium_fallback={medium_fb} light={light_count} timeout={timeout_count}", ) # ── 13. 
# ── 13. Memory benchmark — store facts, recall with keyword verification ───────
if _run_memory:
    # Randomize every fact so a stale memory from a previous run cannot
    # accidentally satisfy a recall question.
    _mem_name = random.choice([
        "Alice", "Bruno", "Camille", "Diego", "Elena",
        "Farid", "Greta", "Hiroshi", "Irina", "Jonas",
    ])
    _mem_city = random.choice([
        "Tokyo", "Berlin", "Cairo", "Sydney", "Oslo",
        "Nairobi", "Lisbon", "Seoul", "Montreal", "Bangkok",
    ])
    _mem_allergy = random.choice(["nuts", "gluten", "dairy", "shellfish", "eggs"])
    _mem_job = random.choice([
        ("software engineer", "startup"),
        ("data scientist", "research lab"),
        ("product manager", "tech company"),
        ("DevOps engineer", "cloud provider"),
    ])
    _mem_lang = random.choice(["Python", "Rust", "Go", "TypeScript", "Kotlin"])
    _mem_pet_name = random.choice([
        "Whiskers", "Biscuit", "Mango", "Pebble", "Shadow",
        "Noodle", "Cheddar", "Cosmo", "Pippin", "Ziggy",
    ])
    print(f"\n[{INFO}] 13. Memory benchmark")
    print(f" name={_mem_name} city={_mem_city} allergy={_mem_allergy} "
          f"job={_mem_job[0]}@{_mem_job[1]} lang={_mem_lang} pet={_mem_pet_name}")
    print(f" Storing 5 facts, then querying with 10 recall questions")
    print(f" Chat ID: {CHAT_ID}")
    print()
    # Wipe Qdrant collection and restart openmemory to eliminate stale data interference.
    # Deleting the collection alone causes 404s — openmemory holds a live reference to it.
    try:
        import urllib.request as _ur
        _req = _ur.Request(f"{QDRANT}/collections/adolf_memories", method="DELETE")
        with _ur.urlopen(_req, timeout=5):
            pass
        print(f" [{INFO}] Wiped adolf_memories collection")
    except Exception as e:
        print(f" [{WARN}] Could not wipe collection: {e}")
    try:
        subprocess.run(
            ["docker", "compose", "-f", COMPOSE_FILE, "restart", "openmemory"],
            capture_output=True, timeout=30,
        )
        time.sleep(6)  # wait for openmemory to reinitialize and recreate collection
        print(f" [{INFO}] Restarted openmemory — fresh collection ready")
    except Exception as e:
        print(f" [{WARN}] Could not restart openmemory: {e}")
    MEMORY_FACTS = [
        f"My name is {_mem_name} and I live in {_mem_city}",
        f"I prefer vegetarian food and I'm allergic to {_mem_allergy}",
        f"I work as a {_mem_job[0]} at a {_mem_job[1]}",
        f"My favorite programming language is {_mem_lang}",
        f"I have a cat named {_mem_pet_name}",
    ]
    MEMORY_RECALLS = [
        # (question, [keywords that must appear in reply])
        ("What is my name?", [_mem_name.lower()]),
        ("Where do I live?", [_mem_city.lower()]),
        ("Do I have any food allergies?", [_mem_allergy.lower()]),
        ("What is my job?", [_mem_job[0].split()[0].lower()]),
        ("What programming language do I prefer?", [_mem_lang.lower()]),
        ("Do I have any pets?", [_mem_pet_name.lower()]),
        ("Am I vegetarian or do I eat meat?", ["vegetarian"]),
        ("What city am I in?", [_mem_city.lower()]),
        ("Tell me what you know about me", [_mem_name.lower(), _mem_city.lower()]),
        ("What's the name of my pet?", [_mem_pet_name.lower()]),
    ]
    MEMORY_STORE_TIMEOUT = 180   # seconds per fact
    MEMORY_RECALL_TIMEOUT = 180  # seconds per question

    # ── Store facts ──────────────────────────────────────────────────────────
    print(f" Storing {len(MEMORY_FACTS)} facts...")
    store_ok = 0
    for i, fact in enumerate(MEMORY_FACTS, 1):
        print(f" [mem-store-{i:02d}] {fact!r}")
        try:
            status, _ = post_json(f"{DEEPAGENTS}/chat", {"message": fact, "chat_id": CHAT_ID}, timeout=5)
            if status != 202:
                print(f" → [{FAIL}] POST returned {status}")
                continue
        except Exception as e:
            print(f" → [{FAIL}] POST error: {e}")
            continue
        found = wait_for(f"mem-store-{i:02d}", fact, timeout_s=MEMORY_STORE_TIMEOUT, need_memory=True)
        if found:
            store_ok += 1
            print(f" → [{PASS}] stored tier={found['tier']} mem={found['memory_s']}s")
        else:
            print(f" → [{FAIL}] timeout")
    report(f"All memory facts stored ({store_ok}/{len(MEMORY_FACTS)})", store_ok == len(MEMORY_FACTS))

    # Wait for async memory extraction to settle — poll Qdrant until point count stabilises
    print(f"\n Waiting for memory extraction to settle (up to 60s)...")
    _prev_count = -1
    _stable_ticks = 0
    for _ in range(30):
        time.sleep(2)
        try:
            _, body = get(f"{QDRANT}/collections/adolf_memories")
            _cur_count = json.loads(body).get("result", {}).get("points_count", 0)
        except Exception:
            _cur_count = _prev_count
        if _cur_count == _prev_count:
            _stable_ticks += 1
            if _stable_ticks >= 3:  # stable for 6s
                break
        else:
            _stable_ticks = 0
        _prev_count = _cur_count
    print(f" Memory settled: {_cur_count} points in Qdrant")

    # ── Recall questions ──────────────────────────────────────────────────────
    print(f"\n Querying with {len(MEMORY_RECALLS)} recall questions...")
    recall_results = []  # (question, keywords, reply_text, passed)
    for i, (question, keywords) in enumerate(MEMORY_RECALLS, 1):
        print(f" [mem-recall-{i:02d}] {question!r}")
        try:
            status, _ = post_json(f"{DEEPAGENTS}/chat", {"message": question, "chat_id": CHAT_ID}, timeout=5)
            if status != 202:
                print(f" → [{FAIL}] POST returned {status}")
                recall_results.append((question, keywords, None, False))
                continue
        except Exception as e:
            print(f" → [{FAIL}] POST error: {e}")
            recall_results.append((question, keywords, None, False))
            continue
        t_start = time.monotonic()
        found = None
        while time.monotonic() - t_start < MEMORY_RECALL_TIMEOUT:
            since = int(time.monotonic() - t_start) + 30
            lines = fetch_logs(since_s=since)
            found = parse_run_block(lines, question)
            if found:
                break
            time.sleep(2)
        if not found:
            print(f" → [{FAIL}] timeout")
            recall_results.append((question, keywords, None, False))
            continue
        reply_text = (found.get("reply_text") or "").lower()
        hit_keywords = [kw for kw in keywords if kw.lower() in reply_text]
        passed = len(hit_keywords) == len(keywords)
        # Keyword miss is reported as WARN (not FAIL) per question; the
        # aggregate report() below is what gates the section.
        tag_str = PASS if passed else WARN
        missing = [kw for kw in keywords if kw.lower() not in reply_text]
        detail = f"tier={found['tier']} lat={found['reply_total']:.1f}s"
        if missing:
            detail += f" missing keywords: {missing}"
        print(f" → [{tag_str}] {detail}")
        recall_results.append((question, keywords, found.get("reply_text"), passed))
        time.sleep(1)

    # Summary
    print(f"\n {'#':<4} {'Pass':<5} {'Question':<45} {'Keywords'}")
    print(f" {'─'*4} {'─'*5} {'─'*45} {'─'*30}")
    for idx, (q, kws, reply, ok) in enumerate(recall_results, 1):
        ok_str = "✓" if ok else "✗"
        print(f" {ok_str} {idx:<3} {'yes' if ok else 'no':<5} {q[:45]:<45} {kws}")
    recall_pass = sum(1 for _, _, _, ok in recall_results if ok)
    total_recall = len(recall_results)
    print(f"\n Memory recall score: {recall_pass}/{total_recall}")
    report(f"Memory recall ({recall_pass}/{total_recall} keywords found)",
           recall_pass == total_recall,
           f"{recall_pass}/{total_recall} questions had all expected keywords in reply")

# ── 14. Deduplication test — same fact stored twice must not grow Qdrant by 2 ─
if _run_memory:
    print(f"\n[{INFO}] 14. 
Memory deduplication test") print(f" Sends the same fact twice — Qdrant point count must not increase by 2") print(f" Chat ID: {CHAT_ID}") print() DEDUP_TIMEOUT = 120 _dedup_fact = f"My lucky number is {random.randint(1000, 9999)}" print(f" Fact: {_dedup_fact!r}") def _qdrant_count_dedup(): try: _, body = get(f"{QDRANT}/collections/adolf_memories") return json.loads(body).get("result", {}).get("points_count", 0) except Exception: return 0 pts_before = _qdrant_count_dedup() print(f" Qdrant points before: {pts_before}") # Send fact the first time print(f" [dedup-1] sending fact (first time)") try: status, _ = post_json(f"{DEEPAGENTS}/chat", {"message": _dedup_fact, "chat_id": CHAT_ID}, timeout=5) if status != 202: report("Dedup: first POST accepted", False, f"status={status}") else: found1 = wait_for("dedup-1", _dedup_fact, timeout_s=DEDUP_TIMEOUT, need_memory=True) if found1: print(f" [dedup-1] stored tier={found1['tier']} mem={found1['memory_s']}s") else: print(f" [dedup-1] timeout") except Exception as e: report("Dedup: first POST accepted", False, str(e)) found1 = None pts_after_first = _qdrant_count_dedup() new_first = pts_after_first - pts_before print(f" Qdrant after first send: {pts_before} → {pts_after_first} (+{new_first})") # Send exact same fact again print(f" [dedup-2] sending same fact (second time)") try: status, _ = post_json(f"{DEEPAGENTS}/chat", {"message": _dedup_fact, "chat_id": CHAT_ID}, timeout=5) if status != 202: report("Dedup: second POST accepted", False, f"status={status}") else: found2 = wait_for("dedup-2", _dedup_fact, timeout_s=DEDUP_TIMEOUT, need_memory=True) if found2: print(f" [dedup-2] stored tier={found2['tier']} mem={found2['memory_s']}s") else: print(f" [dedup-2] timeout") except Exception as e: report("Dedup: second POST accepted", False, str(e)) pts_after_second = _qdrant_count_dedup() new_second = pts_after_second - pts_after_first print(f" Qdrant after second send: {pts_after_first} → {pts_after_second} (+{new_second})") # 
Pass: second store added no MORE points than the first (NOOP or UPDATE, not ADD) # If first send stored 0 points (fact too trivial), dedup is vacuously satisfied. dedup_ok = new_second <= new_first report( "Deduplication: second identical fact not added to Qdrant", dedup_ok, f"first send: +{new_first} pts, second send: +{new_second} pts (want second ≤ first)", ) # ── summary ─────────────────────────────────────────────────────────────────── print(f"\n{'─'*55}") total = len(results) passed = sum(1 for _, ok in results if ok) failed = total - passed print(f"Results: {passed}/{total} passed", end="") if failed: print(f" ({failed} failed)\n") print("Failed checks:") for name, ok in results: if not ok: print(f" - {name}") else: print(" — all good") print() # Print benchmark reference print(f"[{INFO}] Benchmark questions reference:") for tier_name, questions in BENCHMARK.items(): print(f"\n {tier_name.upper()} ({len(questions)} questions):") for j, q in enumerate(questions, 1): print(f" {j:2d}. {q}") print()