Split monolithic test_pipeline.py into focused integration test scripts

- common.py: shared config, URL constants, benchmark questions, all helpers
  (get, post_json, check_sse, qdrant_count, fetch_logs, parse_run_block, wait_for, etc.)
- test_health.py: service health checks (deepagents, bifrost, GPU/CPU Ollama, Qdrant, SearXNG)
- test_memory.py: name store/recall pipeline, memory benchmark (5 facts + 10 recalls), dedup test
- test_routing.py: easy/medium/hard tier routing benchmarks with --easy/medium/hard-only flags
- Removed test_pipeline.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Alvis
2026-03-12 16:02:57 +00:00
parent 50097d6092
commit 021104f510
6 changed files with 1255 additions and 1304 deletions

View File

@@ -14,19 +14,23 @@ docker compose up --build
python3 cli.py [--url http://localhost:8000] [--session cli-alvis] [--timeout 400] python3 cli.py [--url http://localhost:8000] [--session cli-alvis] [--timeout 400]
``` ```
**Run integration tests:** **Run integration tests** (from `tests/integration/`, require all Docker services running):
```bash ```bash
python3 test_pipeline.py [--chat-id CHAT_ID] python3 test_health.py # service health: deepagents, bifrost, Ollama, Qdrant, SearXNG
# Selective sections: python3 test_memory.py # name store/recall + memory benchmark + dedup
python3 test_pipeline.py --bench-only # routing + memory benchmarks only (sections 10–13) python3 test_memory.py --bench-only # only 5-fact store + 10-question recall
python3 test_pipeline.py --easy-only # light-tier routing benchmark python3 test_memory.py --bench-only # only 5-fact store + 10-question recall
python3 test_pipeline.py --medium-only # medium-tier routing benchmark python3 test_memory.py --dedup-only # only deduplication test
python3 test_pipeline.py --hard-only # complex-tier + VRAM flush benchmark
python3 test_pipeline.py --memory-only # memory store/recall/dedup benchmark python3 test_routing.py # all routing benchmarks (easy + medium + hard)
python3 test_pipeline.py --no-bench # service health + single name store/recall only python3 test_routing.py --easy-only # light-tier routing benchmark
python3 test_routing.py --medium-only # medium-tier routing benchmark
python3 test_routing.py --hard-only # complex-tier + VRAM flush benchmark
``` ```
Shared config and helpers are in `tests/integration/common.py`.
## Architecture ## Architecture
Adolf is a multi-channel personal assistant. All LLM inference is routed through **Bifrost**, an open-source Go-based LLM gateway that adds retry logic, failover, and observability in front of Ollama. Adolf is a multi-channel personal assistant. All LLM inference is routed through **Bifrost**, an open-source Go-based LLM gateway that adds retry logic, failover, and observability in front of Ollama.

273
tests/integration/common.py Normal file
View File

@@ -0,0 +1,273 @@
"""
Shared config, helpers, and utilities for Adolf integration tests.
"""
import http.client
import json
import re
import subprocess
import time
import urllib.request
# ── config ────────────────────────────────────────────────────────────────────
# Service endpoints — ports correspond to the docker-compose port mappings.
DEEPAGENTS = "http://localhost:8000"  # main agent HTTP API (POST /chat, GET /health)
BIFROST = "http://localhost:8080"  # LLM gateway in front of Ollama
OPENMEMORY = "http://localhost:8765"  # memory service (SSE endpoint)
GRAMMY_HOST = "localhost"  # grammy service host (SSE endpoint)
GRAMMY_PORT = 3001
OLLAMA_GPU = "http://localhost:11436"  # GPU-backed Ollama (chat models)
OLLAMA_CPU = "http://localhost:11435"  # CPU-backed Ollama (embedding model)
QDRANT = "http://localhost:6333"  # vector store holding adolf_memories
SEARXNG = "http://localhost:11437"  # meta-search engine
COMPOSE_FILE = "/home/alvis/adolf/docker-compose.yml"  # passed to `docker compose -f` for log access
DEFAULT_CHAT_ID = "346967270"  # default chat/session id used by the tests
# Distinctive names for the name store/recall test — unlikely to appear in a
# model reply by coincidence, so a match means the memory was actually used.
NAMES = [
    "Maximilian", "Cornelius", "Zephyr", "Archibald", "Balthazar",
    "Ignatius", "Lysander", "Octavian", "Reginald", "Sylvester",
]
# Routing-benchmark prompts keyed by the tier each question is expected to
# route to: "easy" → light tier, "medium" → tool-using tier, "hard" (every
# prompt carries the /think prefix) → complex tier.
BENCHMARK = {
    "easy": [
        "hi",
        "what is 2+2?",
        "what is the capital of France?",
        "tell me a short joke",
        "how are you doing today?",
        "thanks!",
        "what day comes after Wednesday?",
        "name the three primary colors",
        "is the sky blue?",
        "what does CPU stand for?",
    ],
    "medium": [
        "what is the current weather in Berlin?",
        "find the latest news about artificial intelligence",
        "what is the current price of Bitcoin?",
        "search for a good pasta carbonara recipe",
        "what movies are in theaters this week?",
        "find Python tutorials for beginners",
        "who won the last FIFA World Cup?",
        "do you remember what we talked about before?",
        "search for the best coffee shops in Tokyo",
        "what is happening in the tech industry this week?",
        "what's the weather like today?",
    ],
    "hard": [
        "/think compare the top 3 Python web frameworks (Django, FastAPI, Flask) and recommend one for a production REST API",
        "/think research the history of artificial intelligence and create a timeline of key milestones",
        "/think plan a 7-day trip to Japan with daily itinerary, accommodation suggestions, and budget breakdown",
        "/think analyze microservices vs monolithic architecture: pros, cons, and when to choose each",
        "/think write a Python script that reads a CSV file, cleans the data, and generates summary statistics",
        "/think research quantum computing: explain the key concepts and how it differs from classical computing",
        "/think compare PostgreSQL, MongoDB, and Redis — when to use each and what are the trade-offs?",
        "/think create a comprehensive Docker deployment guide covering best practices for production",
        "/think research climate change: summarize the latest IPCC findings and key data points",
        "/think design a REST API with authentication, rate limiting, and proper error handling — provide architecture and code outline",
    ],
}
# ── terminal colours ──────────────────────────────────────────────────────────
PASS = "\033[32mPASS\033[0m"
FAIL = "\033[31mFAIL\033[0m"
INFO = "\033[36mINFO\033[0m"
WARN = "\033[33mWARN\033[0m"


# ── result helpers ────────────────────────────────────────────────────────────
def report(results: list, name: str, ok: bool, detail: str = ""):
    """Print one PASS/FAIL check line and record (name, ok) in *results*."""
    status = PASS if ok else FAIL
    suffix = f"{detail}" if detail else ""
    print(f" [{status}] {name}{suffix}")
    results.append((name, ok))
def print_summary(results: list):
    """Print the final pass/fail tally and enumerate any failed checks."""
    print(f"\n{'' * 55}")
    passed = sum(1 for _, ok in results if ok)
    failures = [name for name, ok in results if not ok]
    print(f"Results: {passed}/{len(results)} passed", end="")
    if failures:
        print(f" ({len(failures)} failed)\n")
        print("Failed checks:")
        for name in failures:
            print(f" - {name}")
    else:
        print(" — all good")
    print()
def tf(v):
    """Format a timing value as fixed-width seconds; placeholder for None."""
    if v is None:
        return " n/a"
    return f"{v:6.2f}s"
# ── HTTP helpers ──────────────────────────────────────────────────────────────
def get(url, timeout=5):
    """HTTP GET *url*; return (status_code, decoded response body)."""
    request = urllib.request.Request(url)
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return resp.status, resp.read().decode()
def post_json(url, payload, timeout=10):
    """POST *payload* as JSON to *url*; return (status_code, parsed JSON reply)."""
    body = json.dumps(payload).encode()
    headers = {"Content-Type": "application/json"}
    request = urllib.request.Request(url, data=body, headers=headers, method="POST")
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return resp.status, json.loads(resp.read().decode())
def check_sse(host, port, path):
    """Probe an SSE endpoint; return (ok, detail) without reading the stream."""
    try:
        conn = http.client.HTTPConnection(host, port, timeout=5)
        conn.request("GET", path, headers={"Accept": "text/event-stream"})
        status = conn.getresponse().status
        conn.close()
        return status == 200, f"HTTP {status}"
    except Exception as exc:
        return False, str(exc)
def qdrant_count():
    """Return points_count of the adolf_memories collection, or 0 on any error."""
    try:
        _, body = get(f"{QDRANT}/collections/adolf_memories")
        result = json.loads(body).get("result", {})
        return result.get("points_count", 0)
    except Exception:
        return 0
# ── log helpers ───────────────────────────────────────────────────────────────
def fetch_logs(since_s=600):
    """Return deepagents log lines from the last since_s seconds."""
    cmd = [
        "docker", "compose", "-f", COMPOSE_FILE, "logs", "deepagents",
        f"--since={int(since_s)}s", "--no-log-prefix",
    ]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
    except Exception:
        # docker missing / timeout — treat as "no logs" rather than crash.
        return []
    return proc.stdout.splitlines()
def fetch_bifrost_logs(since_s=120):
    """Return bifrost container log lines from the last since_s seconds."""
    cmd = [
        "docker", "compose", "-f", COMPOSE_FILE, "logs", "bifrost",
        f"--since={int(since_s)}s", "--no-log-prefix",
    ]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
    except Exception:
        # docker missing / timeout — treat as "no logs" rather than crash.
        return []
    return proc.stdout.splitlines()
def parse_run_block(lines, msg_prefix):
    """
    Scan log lines for the LAST '[agent] running: <msg_prefix>' block.
    Extracts reply timing, tier, and memory timing from that block.
    Returns dict or None if the reply has not appeared in logs yet.
    Dict keys:
        reply_total, llm, send, tier, reply_text — from "[agent] replied in ..."
        memory_s — from "[memory] stored in ..."
        memory_error — True if "[memory] error" found
    """
    # Only the first 50 chars are matched — log lines may truncate long prompts.
    search = msg_prefix[:50]
    start_idx = None
    for i, line in enumerate(lines):
        if "[agent] running:" in line and search in line:
            start_idx = i  # keep updating — we want the LAST occurrence
    if start_idx is None:
        return None
    block = lines[start_idx:]
    last_ai_text = None
    reply_data = None
    for j, line in enumerate(block):
        # NOTE(review): '"" not in line' is always False (the empty string is a
        # substring of every string), so this branch can never fire and
        # last_ai_text stays None unless overwritten by the reply_text scan
        # below. Looks like a filter character was lost in transit — confirm
        # the intended exclusion marker against the original file.
        if "AIMessage:" in line and "" not in line:
            txt = line.split("AIMessage:", 1)[-1].strip()
            if txt:
                last_ai_text = txt
        m = re.search(r"replied in ([\d.]+)s \(llm=([\d.]+)s, send=([\d.]+)s\)", line)
        if m:
            tier_m = re.search(r"\btier=(\w+)", line)
            tier = tier_m.group(1) if tier_m else "unknown"
            reply_data = {
                "reply_total": float(m.group(1)),
                "llm": float(m.group(2)),
                "send": float(m.group(3)),
                "tier": tier,
                "reply_text": last_ai_text,
                "memory_s": None,
                "memory_error": False,
                "_j": j,  # index of the "replied in" line, used for follow-up scans
            }
            break
    # A "[agent] reply_text:" line within the next 2 lines overrides the
    # AIMessage-derived text (it carries the full, untruncated reply).
    if reply_data is not None:
        next_lines = block[reply_data["_j"] + 1: reply_data["_j"] + 3]
        for line in next_lines:
            if line.startswith("[agent] reply_text:"):
                reply_data["reply_text"] = line[len("[agent] reply_text:"):].strip()
                break
    if reply_data is None:
        return None
    # After the reply line, the first memory outcome (stored or error) wins;
    # absence of both means the async memory write has not finished yet.
    for line in block[reply_data["_j"] + 1:]:
        mm = re.search(r"\[memory\] stored in ([\d.]+)s", line)
        if mm:
            reply_data["memory_s"] = float(mm.group(1))
            break
        if "[memory] error" in line:
            reply_data["memory_error"] = True
            break
    return reply_data
def wait_for(label, msg_prefix, timeout_s=200, need_memory=True):
    """
    Poll deepagents logs until the message is fully processed.
    Shows a live progress line. Returns timing dict or None on timeout.
    """
    t_start = time.monotonic()
    deadline = t_start + timeout_s
    tick = 0
    last_result = None
    while time.monotonic() < deadline:
        # Fetch logs covering the elapsed window plus a 90s lookback margin so
        # lines emitted just before this call are not missed.
        since = int(time.monotonic() - t_start) + 90
        lines = fetch_logs(since_s=since)
        result = parse_run_block(lines, msg_prefix)
        if result:
            last_result = result
            has_mem = result["memory_s"] is not None or result["memory_error"]
            # When need_memory is set, keep polling until the async memory
            # write has either completed or errored.
            if (not need_memory) or has_mem:
                elapsed = time.monotonic() - t_start
                print(f"\r [{label}] done after {elapsed:.0f}s{' ' * 30}")
                return result
        time.sleep(4)
        tick += 1
        rem = int(deadline - time.monotonic())
        if last_result:
            phase = "waiting for memory..." if need_memory else "done"
        else:
            phase = "waiting for LLM reply..."
        # \r + flush keeps this as a single in-place progress line.
        print(f"\r [{label}] {tick*4}s elapsed, {rem}s left — {phase} ", end="", flush=True)
    print(f"\r [{label}] TIMEOUT after {timeout_s}s{' ' * 30}")
    return None

View File

@@ -0,0 +1,214 @@
#!/usr/bin/env python3
"""
Adolf service health integration tests.
Checks:
1. deepagents /health — agent_ready
1b. openmemory /sse reachable
1c. grammy /sse reachable
2. Bifrost /health, /v1/models, direct inference, deepagents startup log
3. GPU Ollama — reachable, qwen3:8b present
4. CPU Ollama — reachable, nomic-embed-text present
5. Qdrant — reachable, adolf_memories collection, vector dims=768
6. SearXNG — reachable, JSON results, latency < 5s
Usage:
python3 test_health.py
"""
import json
import sys
import time
import urllib.request
from common import (
DEEPAGENTS, BIFROST, GRAMMY_HOST, GRAMMY_PORT,
OLLAMA_GPU, OLLAMA_CPU, QDRANT, SEARXNG, COMPOSE_FILE,
INFO, FAIL,
report, print_summary, tf,
get, post_json, check_sse, fetch_logs,
)
results = []  # accumulated (check name, ok) tuples, filled by report()
timings = {}  # stage label -> seconds, printed/inspected ad hoc
# ── 1. Service health ─────────────────────────────────────────────────────────
print(f"\n[{INFO}] 1. Service health")
t0 = time.monotonic()
try:
    status, body = get(f"{DEEPAGENTS}/health")
    data = json.loads(body)
    # agent_ready must be literally true, not merely truthy.
    ok = status == 200 and data.get("agent_ready") is True
    report(results, "deepagents /health — agent_ready", ok,
           f"agent_ready={data.get('agent_ready')}")
except Exception as e:
    report(results, "deepagents /health", False, str(e))
# SSE endpoints hold the stream open, so only the response status is probed.
# NOTE(review): port 8765 is hardcoded here although common.py defines
# OPENMEMORY with the same port — consider deriving it from the constant.
ok, detail = check_sse("localhost", 8765, "/sse")
report(results, "openmemory /sse reachable", ok, detail)
ok, detail = check_sse(GRAMMY_HOST, GRAMMY_PORT, "/sse")
report(results, "grammy /sse reachable", ok, detail)
timings["health_check"] = time.monotonic() - t0
# ── 2. Bifrost gateway ────────────────────────────────────────────────────────
# Checks the gateway itself, its model list, a real end-to-end completion, and
# that deepagents was configured to route through it.
print(f"\n[{INFO}] 2. Bifrost gateway (port 8080)")
t0 = time.monotonic()
try:
    status, body = get(f"{BIFROST}/health", timeout=5)
    report(results, "Bifrost /health reachable", status == 200, f"HTTP {status}")
except Exception as e:
    report(results, "Bifrost /health reachable", False, str(e))
try:
    status, body = get(f"{BIFROST}/v1/models", timeout=5)
    data = json.loads(body)
    model_ids = [m.get("id", "") for m in data.get("data", [])]
    gpu_models = [m for m in model_ids if m.startswith("ollama/")]
    report(results, "Bifrost lists ollama GPU models", len(gpu_models) > 0,
           f"found: {gpu_models}")
    for expected in ["ollama/qwen3:4b", "ollama/qwen3:8b", "ollama/qwen2.5:1.5b"]:
        report(results, f" model {expected} listed", expected in model_ids)
except Exception as e:
    report(results, "Bifrost /v1/models", False, str(e))
# Direct inference through the gateway with the smallest model — proves the
# Bifrost → Ollama path works independently of the agent.
print(f" [bifrost-infer] POST /v1/chat/completions → ollama/qwen2.5:0.5b ...")
t_infer = time.monotonic()
try:
    infer_payload = {
        "model": "ollama/qwen2.5:0.5b",
        "messages": [{"role": "user", "content": "Reply with exactly one word: pong"}],
        "max_tokens": 16,
    }
    data = json.dumps(infer_payload).encode()
    req = urllib.request.Request(
        f"{BIFROST}/v1/chat/completions",
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=60) as r:
        infer_status = r.status
        infer_body = json.loads(r.read().decode())
    infer_elapsed = time.monotonic() - t_infer
    reply_content = infer_body.get("choices", [{}])[0].get("message", {}).get("content", "")
    used_model = infer_body.get("model", "")
    report(results, "Bifrost → Ollama GPU inference succeeds",
           infer_status == 200 and bool(reply_content),
           f"{infer_elapsed:.1f}s model={used_model!r} reply={reply_content[:60]!r}")
    timings["bifrost_direct_infer"] = infer_elapsed
except Exception as e:
    report(results, "Bifrost → Ollama GPU inference succeeds", False, str(e))
    timings["bifrost_direct_infer"] = None
# Confirm via the startup log that deepagents points at the gateway and uses
# gateway-prefixed (ollama/...) model names.
try:
    import subprocess
    r = subprocess.run(
        ["docker", "compose", "-f", COMPOSE_FILE, "logs", "deepagents",
         "--since=3600s", "--no-log-prefix"],
        capture_output=True, text=True, timeout=10,
    )
    log_lines = r.stdout.splitlines()
    bifrost_line = next(
        (l for l in log_lines if "[agent] bifrost=" in l and "bifrost:8080" in l),
        None,
    )
    report(results, "deepagents startup log confirms bifrost URL",
           bifrost_line is not None,
           bifrost_line.strip() if bifrost_line else "line not found in logs")
    if bifrost_line:
        has_prefix = "router=ollama/" in bifrost_line and "medium=ollama/" in bifrost_line
        report(results, "deepagents model names use ollama/ prefix", has_prefix,
               bifrost_line.strip())
except Exception as e:
    report(results, "deepagents startup log check", False, str(e))
timings["bifrost_check"] = time.monotonic() - t0
# ── 3. GPU Ollama ─────────────────────────────────────────────────────────────
print(f"\n[{INFO}] 3. GPU Ollama (port 11436)")
t0 = time.monotonic()
try:
    status, body = get(f"{OLLAMA_GPU}/api/tags")
    models = [m["name"] for m in json.loads(body).get("models", [])]
    # NOTE(review): matches any "qwen3" substring (e.g. qwen3:4b), not
    # specifically qwen3:8b as the check name claims — confirm intent.
    has_qwen = any("qwen3" in m for m in models)
    report(results, "GPU Ollama reachable", True, f"models: {models}")
    report(results, "qwen3:8b present", has_qwen)
except Exception as e:
    report(results, "GPU Ollama reachable", False, str(e))
    report(results, "qwen3:8b present", False, "skipped")
timings["gpu_ollama_ping"] = time.monotonic() - t0
# ── 4. CPU Ollama ─────────────────────────────────────────────────────────────
# The CPU instance only needs the embedding model used for memory vectors.
print(f"\n[{INFO}] 4. CPU Ollama (port 11435)")
t0 = time.monotonic()
try:
    status, body = get(f"{OLLAMA_CPU}/api/tags")
    models = [m["name"] for m in json.loads(body).get("models", [])]
    has_embed = any("nomic-embed-text" in m for m in models)
    report(results, "CPU Ollama reachable", True, f"models: {models}")
    report(results, "nomic-embed-text present", has_embed)
except Exception as e:
    report(results, "CPU Ollama reachable", False, str(e))
    report(results, "nomic-embed-text present", False, "skipped")
timings["cpu_ollama_ping"] = time.monotonic() - t0
# ── 5. Qdrant ─────────────────────────────────────────────────────────────────
print(f"\n[{INFO}] 5. Qdrant (port 6333)")
t0 = time.monotonic()
try:
    status, body = get(f"{QDRANT}/collections")
    cols = [c["name"] for c in json.loads(body).get("result", {}).get("collections", [])]
    report(results, "Qdrant reachable", True, f"collections: {cols}")
    report(results, "adolf_memories collection exists", "adolf_memories" in cols)
except Exception as e:
    report(results, "Qdrant reachable", False, str(e))
    report(results, "adolf_memories collection exists", False, "skipped")
try:
    status, body = get(f"{QDRANT}/collections/adolf_memories")
    info = json.loads(body).get("result", {})
    # 768 presumably matches the nomic-embed-text embedding size checked in
    # section 4 — confirm against the memory service configuration.
    dims = info.get("config", {}).get("params", {}).get("vectors", {}).get("size")
    report(results, "vector dims = 768", dims == 768, f"got {dims}")
except Exception as e:
    report(results, "adolf_memories collection info", False, str(e))
timings["qdrant_ping"] = time.monotonic() - t0
# ── 6. SearXNG ────────────────────────────────────────────────────────────────
# A real query is issued; JSON output must be enabled in the SearXNG config.
print(f"\n[{INFO}] 6. SearXNG (port 11437)")
t0 = time.monotonic()
try:
    status, body = get(f"{SEARXNG}/search?q=test&format=json", timeout=15)
    elapsed = time.monotonic() - t0
    n = len(json.loads(body).get("results", []))
    report(results, "SearXNG reachable + JSON results", status == 200 and n > 0,
           f"{n} results in {elapsed:.1f}s")
    report(results, "SearXNG response < 5s", elapsed < 5, f"{elapsed:.2f}s")
    timings["searxng_latency"] = elapsed
except Exception as e:
    report(results, "SearXNG reachable", False, str(e))
    report(results, "SearXNG response < 5s", False, "skipped")
    timings["searxng_latency"] = None
timings["searxng_check"] = time.monotonic() - t0
# ── summary ───────────────────────────────────────────────────────────────────
# Exit non-zero when any check failed so automation can gate on this script.
print_summary(results)
sys.exit(0 if all(ok for _, ok in results) else 1)

View File

@@ -0,0 +1,438 @@
#!/usr/bin/env python3
"""
Adolf memory integration tests.
Tests:
1. Name store — POST "remember that your name is <RandomName>"
2. Qdrant point — verifies a new vector was written after store
3. Name recall — POST "what is your name?" → reply must contain <RandomName>
4. Bifrost — verifies store/recall requests passed through Bifrost
5. Timing profile — breakdown of store and recall latencies
6. Memory benchmark — store 5 personal facts, recall with 10 questions
7. Dedup test — same fact stored twice must not grow Qdrant by 2 points
Usage:
python3 test_memory.py [--chat-id CHAT_ID] [--name-only] [--bench-only] [--dedup-only]
"""
import argparse
import json
import random
import subprocess
import sys
import time
import urllib.request
from common import (
DEEPAGENTS, QDRANT, COMPOSE_FILE, DEFAULT_CHAT_ID,
NAMES,
INFO, PASS, FAIL, WARN,
report, print_summary, tf,
get, post_json, qdrant_count, fetch_logs, fetch_bifrost_logs,
parse_run_block, wait_for,
)
# ── args ──────────────────────────────────────────────────────────────────────
parser = argparse.ArgumentParser(description="Adolf memory integration tests")
parser.add_argument("--chat-id", default=DEFAULT_CHAT_ID)
parser.add_argument("--name-only", action="store_true", help="Run only the name store/recall test")
parser.add_argument("--bench-only", action="store_true", help="Run only the memory benchmark")
parser.add_argument("--dedup-only", action="store_true", help="Run only the deduplication test")
args = parser.parse_args()
CHAT_ID = args.chat_id
# With no --*-only flag every section runs; any flag restricts the run to the
# flagged section(s).
_only = args.name_only or args.bench_only or args.dedup_only
_run_name = not _only or args.name_only
_run_bench = not _only or args.bench_only
_run_dedup = not _only or args.dedup_only
results = []  # accumulated (check name, ok) tuples, filled by report()
timings = {}  # stage label -> seconds, printed in the timing profile
# A random distinctive name plus a per-run chat id keep runs independent and
# make a recalled name attributable to this run's stored memory.
random_name = random.choice(NAMES)
TEST_CHAT_ID = f"{CHAT_ID}-{random_name.lower()}"
if _run_name:
    print(f"\n Test name : \033[1m{random_name}\033[0m")
    print(f" Chat ID : {TEST_CHAT_ID}")
# ── 1. Name store / recall pipeline ───────────────────────────────────────────
# (covers docstring items 1-5: store, Qdrant point, recall, Bifrost, timings)
if _run_name:
    print(f"\n[{INFO}] 1. Name store / recall pipeline")
    store_msg = f"remember that your name is {random_name}"
    recall_msg = "what is your name?"
    # Clear memories so each run starts clean
    try:
        post_json(f"{QDRANT}/collections/adolf_memories/points/delete",
                  {"filter": {}}, timeout=5)
    except Exception:
        pass
    pts_before = qdrant_count()
    print(f" Qdrant points before: {pts_before}")
    # ── 1. Store ──────────────────────────────────────────────────────────
    print(f"\n [store] '{store_msg}'")
    t_store = time.monotonic()
    try:
        # /chat is async: the server must ACK with 202 in under a second and
        # do the LLM work in the background.
        status, _ = post_json(f"{DEEPAGENTS}/chat",
                              {"message": store_msg, "chat_id": TEST_CHAT_ID}, timeout=5)
        t_accept = time.monotonic() - t_store
        report(results, "POST /chat (store) returns 202 immediately",
               status == 202 and t_accept < 1, f"status={status}, t={t_accept:.3f}s")
        timings["store_http_accept"] = t_accept
    except Exception as e:
        # Without a successful store nothing downstream can pass — bail out.
        report(results, "POST /chat (store)", False, str(e))
        print_summary(results)
        sys.exit(1)
    store = wait_for("store", store_msg, timeout_s=220, need_memory=True)
    if store:
        timings.update({
            "store_llm": store["llm"],
            "store_send": store["send"],
            "store_reply": store["reply_total"],
            "store_memory": store["memory_s"],
        })
        report(results, "Agent replied to store message", True,
               f"{store['reply_total']:.1f}s total llm={store['llm']:.1f}s "
               f"send={store['send']:.1f}s tier={store['tier']}")
        if store["memory_s"] is not None:
            report(results, "Memory stored without error", True, f"{store['memory_s']:.1f}s")
        elif store["memory_error"]:
            report(results, "Memory stored without error", False, "error in [memory] log")
        else:
            report(results, "Memory stored without error", False, "not found in logs")
        print(f" Store reply: {store['reply_text']!r}")
    else:
        report(results, "Agent replied to store message", False, "timeout")
        report(results, "Memory stored without error", False, "timeout")
        print_summary(results)
        sys.exit(1)
    # ── 2. Qdrant point check ─────────────────────────────────────────────
    pts_after = qdrant_count()
    new_pts = pts_after - pts_before
    report(results, "New memory point(s) added to Qdrant", new_pts > 0,
           f"{pts_before}{pts_after} (+{new_pts})")
    timings["qdrant_new_points"] = new_pts
    # ── 3. Recall ─────────────────────────────────────────────────────────
    print(f"\n [recall] '{recall_msg}'")
    t_recall = time.monotonic()
    try:
        status, _ = post_json(f"{DEEPAGENTS}/chat",
                              {"message": recall_msg, "chat_id": TEST_CHAT_ID}, timeout=5)
        t_accept2 = time.monotonic() - t_recall
        report(results, "POST /chat (recall) returns 202 immediately",
               status == 202 and t_accept2 < 1, f"status={status}, t={t_accept2:.3f}s")
        timings["recall_http_accept"] = t_accept2
    except Exception as e:
        report(results, "POST /chat (recall)", False, str(e))
    # Recall does not need a memory write, only the reply.
    recall = wait_for("recall", recall_msg, timeout_s=160, need_memory=False)
    if recall:
        timings.update({
            "recall_llm": recall["llm"],
            "recall_send": recall["send"],
            "recall_reply": recall["reply_total"],
        })
        report(results, "Agent replied to recall message", True,
               f"{recall['reply_total']:.1f}s total llm={recall['llm']:.1f}s "
               f"send={recall['send']:.1f}s tier={recall['tier']}")
        reply_text = recall["reply_text"] or ""
        # Case-insensitive: the reply only has to mention the stored name.
        name_in_reply = random_name.lower() in reply_text.lower()
        report(results, f"Reply contains '{random_name}'", name_in_reply,
               f"reply: {reply_text[:120]!r}")
    else:
        report(results, "Agent replied to recall message", False, "timeout")
        report(results, f"Reply contains '{random_name}'", False, "no reply")
    # ── 4. Bifrost pass-through check ─────────────────────────────────────
    bifrost_lines = fetch_bifrost_logs(since_s=300)
    report(results, "Bifrost container has log output (requests forwarded)",
           len(bifrost_lines) > 0, f"{len(bifrost_lines)} lines in bifrost logs")
    bifrost_raw = "\n".join(bifrost_lines)
    report(results, " Bifrost log shows AsyncOpenAI agent requests",
           "AsyncOpenAI" in bifrost_raw,
           f"{'found' if 'AsyncOpenAI' in bifrost_raw else 'NOT found'} in bifrost logs")
    # ── 5. Timing profile ─────────────────────────────────────────────────
    # Purely informational table of the latencies gathered above.
    print(f"\n[{INFO}] 5. Timing profile")
    W = 36
    print(f"\n {'Stage':<{W}} {'Time':>8}")
    print(f" {''*W} {''*8}")
    for label, key in [
        ("[GPU] HTTP accept — store turn", "store_http_accept"),
        ("[GPU] qwen3:Xb inference — store turn", "store_llm"),
        ("[GPU] Telegram send — store turn", "store_send"),
        ("[GPU] Total reply latency — store", "store_reply"),
        ("[GPU] qwen2.5:1.5b+embed — async mem", "store_memory"),
    ]:
        print(f" {label:<{W}} {tf(timings.get(key)):>8}")
    print(f" {''*W} {''*8}")
    for label, key in [
        ("[GPU] HTTP accept — recall turn", "recall_http_accept"),
        ("[GPU] qwen3:Xb inference — recall", "recall_llm"),
        ("[GPU] Telegram send — recall turn", "recall_send"),
        ("[GPU] Total reply latency — recall", "recall_reply"),
    ]:
        print(f" {label:<{W}} {tf(timings.get(key)):>8}")
    print(f"\n Bottleneck analysis (each █ ≈ 5s):")
    print(f" {''*(W+12)}")
    candidates = [
        ("[GPU] qwen3:Xb — store reply ", timings.get("store_llm") or 0),
        ("[GPU] qwen3:Xb — recall reply", timings.get("recall_llm") or 0),
        ("[GPU] qwen2.5:1.5b+embed (async)", timings.get("store_memory") or 0),
    ]
    candidates.sort(key=lambda x: x[1], reverse=True)
    for label, t in candidates:
        # NOTE(review): '"" * min(...)' always yields "" — the bar character
        # (presumably █, per the header line) appears lost; confirm upstream.
        bar = "" * min(int(t / 5), 24)
        total_pipeline = (timings.get("store_reply") or 0) + (timings.get("store_memory") or 0)
        pct = f" {t/total_pipeline*100:4.0f}%" if total_pipeline > 0 else ""
        print(f" {label} {t:6.1f}s {bar}{pct}")
    print()
# ── 6. Memory benchmark ───────────────────────────────────────────────────────
if _run_bench:
    # Randomized personal facts so leftovers from a previous run cannot
    # accidentally satisfy the recall questions.
    _mem_name = random.choice(["Alice", "Bruno", "Camille", "Diego", "Elena",
                               "Farid", "Greta", "Hiroshi", "Irina", "Jonas"])
    _mem_city = random.choice(["Tokyo", "Berlin", "Cairo", "Sydney", "Oslo",
                               "Nairobi", "Lisbon", "Seoul", "Montreal", "Bangkok"])
    _mem_allergy = random.choice(["nuts", "gluten", "dairy", "shellfish", "eggs"])
    _mem_job = random.choice([
        ("software engineer", "startup"),
        ("data scientist", "research lab"),
        ("product manager", "tech company"),
        ("DevOps engineer", "cloud provider"),
    ])
    _mem_lang = random.choice(["Python", "Rust", "Go", "TypeScript", "Kotlin"])
    _mem_pet_name = random.choice(["Whiskers", "Biscuit", "Mango", "Pebble", "Shadow",
                                   "Noodle", "Cheddar", "Cosmo", "Pippin", "Ziggy"])
    print(f"\n[{INFO}] 6. Memory benchmark")
    print(f" name={_mem_name} city={_mem_city} allergy={_mem_allergy} "
          f"job={_mem_job[0]}@{_mem_job[1]} lang={_mem_lang} pet={_mem_pet_name}")
    print(f" Storing 5 facts, then querying with 10 recall questions")
    print(f" Chat ID: {CHAT_ID}")
    print()
    # Wipe collection and restart openmemory for a clean slate
    try:
        req = urllib.request.Request(f"{QDRANT}/collections/adolf_memories", method="DELETE")
        with urllib.request.urlopen(req, timeout=5):
            pass
        print(f" [{INFO}] Wiped adolf_memories collection")
    except Exception as e:
        print(f" [{WARN}] Could not wipe collection: {e}")
    try:
        subprocess.run(
            ["docker", "compose", "-f", COMPOSE_FILE, "restart", "openmemory"],
            capture_output=True, timeout=30,
        )
        time.sleep(6)  # give openmemory time to come back up
        print(f" [{INFO}] Restarted openmemory — fresh collection ready")
    except Exception as e:
        print(f" [{WARN}] Could not restart openmemory: {e}")
    MEMORY_FACTS = [
        f"My name is {_mem_name} and I live in {_mem_city}",
        f"I prefer vegetarian food and I'm allergic to {_mem_allergy}",
        f"I work as a {_mem_job[0]} at a {_mem_job[1]}",
        f"My favorite programming language is {_mem_lang}",
        f"I have a cat named {_mem_pet_name}",
    ]
    # (question, [keywords that must all appear in the reply]) pairs.
    MEMORY_RECALLS = [
        ("What is my name?", [_mem_name.lower()]),
        ("Where do I live?", [_mem_city.lower()]),
        ("Do I have any food allergies?", [_mem_allergy.lower()]),
        ("What is my job?", [_mem_job[0].split()[0].lower()]),
        ("What programming language do I prefer?", [_mem_lang.lower()]),
        ("Do I have any pets?", [_mem_pet_name.lower()]),
        ("Am I vegetarian or do I eat meat?", ["vegetarian"]),
        ("What city am I in?", [_mem_city.lower()]),
        ("Tell me what you know about me", [_mem_name.lower(), _mem_city.lower()]),
        ("What's the name of my pet?", [_mem_pet_name.lower()]),
    ]
    STORE_TIMEOUT = 180
    RECALL_TIMEOUT = 180
    print(f" Storing {len(MEMORY_FACTS)} facts...")
    store_ok = 0
    for i, fact in enumerate(MEMORY_FACTS, 1):
        print(f" [mem-store-{i:02d}] {fact!r}")
        try:
            status, _ = post_json(f"{DEEPAGENTS}/chat",
                                  {"message": fact, "chat_id": CHAT_ID}, timeout=5)
            if status != 202:
                print(f" → [{FAIL}] POST returned {status}")
                continue
        except Exception as e:
            print(f" → [{FAIL}] POST error: {e}")
            continue
        found = wait_for(f"mem-store-{i:02d}", fact, timeout_s=STORE_TIMEOUT, need_memory=True)
        if found:
            store_ok += 1
            print(f" → [{PASS}] stored tier={found['tier']} mem={found['memory_s']}s")
        else:
            print(f" → [{FAIL}] timeout")
    report(results, f"All memory facts stored ({store_ok}/{len(MEMORY_FACTS)})",
           store_ok == len(MEMORY_FACTS))
    # Wait for async extraction to settle: poll the point count every 2s and
    # stop once it has been unchanged for 3 consecutive polls (or 60s total).
    print(f"\n Waiting for memory extraction to settle (up to 60s)...")
    _prev_count = -1
    _stable_ticks = 0
    _cur_count = 0
    for _ in range(30):
        time.sleep(2)
        try:
            _, body = get(f"{QDRANT}/collections/adolf_memories")
            _cur_count = json.loads(body).get("result", {}).get("points_count", 0)
        except Exception:
            _cur_count = _prev_count
        if _cur_count == _prev_count:
            _stable_ticks += 1
            if _stable_ticks >= 3:
                break
        else:
            _stable_ticks = 0
        _prev_count = _cur_count
    print(f" Memory settled: {_cur_count} points in Qdrant")
    print(f"\n Querying with {len(MEMORY_RECALLS)} recall questions...")
    recall_results = []  # (question, keywords, reply_text_or_None, passed)
    for i, (question, keywords) in enumerate(MEMORY_RECALLS, 1):
        print(f" [mem-recall-{i:02d}] {question!r}")
        try:
            status, _ = post_json(f"{DEEPAGENTS}/chat",
                                  {"message": question, "chat_id": CHAT_ID}, timeout=5)
            if status != 202:
                print(f" → [{FAIL}] POST returned {status}")
                recall_results.append((question, keywords, None, False))
                continue
        except Exception as e:
            print(f" → [{FAIL}] POST error: {e}")
            recall_results.append((question, keywords, None, False))
            continue
        # Inline poll (unlike wait_for): only the reply is needed, no memory.
        t_start = time.monotonic()
        found = None
        while time.monotonic() - t_start < RECALL_TIMEOUT:
            since = int(time.monotonic() - t_start) + 30
            lines = fetch_logs(since_s=since)
            found = parse_run_block(lines, question)
            if found:
                break
            time.sleep(2)
        if not found:
            print(f" → [{FAIL}] timeout")
            recall_results.append((question, keywords, None, False))
            continue
        reply_text = (found.get("reply_text") or "").lower()
        hit_keywords = [kw for kw in keywords if kw.lower() in reply_text]
        passed = len(hit_keywords) == len(keywords)
        tag_str = PASS if passed else WARN
        missing = [kw for kw in keywords if kw.lower() not in reply_text]
        detail = f"tier={found['tier']} lat={found['reply_total']:.1f}s"
        if missing:
            detail += f" missing keywords: {missing}"
        print(f" → [{tag_str}] {detail}")
        recall_results.append((question, keywords, found.get("reply_text"), passed))
        time.sleep(1)  # small gap so consecutive questions don't interleave in logs
    print(f"\n {'#':<4} {'Pass':<5} {'Question':<45} {'Keywords'}")
    print(f" {''*4} {''*5} {''*45} {''*30}")
    for idx, (q, kws, reply, ok) in enumerate(recall_results, 1):
        # NOTE(review): both branches yield "" — the pass/fail marks
        # (presumably ✓/✗) appear lost in transit; confirm upstream.
        ok_str = "" if ok else ""
        print(f" {ok_str} {idx:<3} {'yes' if ok else 'no':<5} {q[:45]:<45} {kws}")
    recall_pass = sum(1 for _, _, _, ok in recall_results if ok)
    total_recall = len(recall_results)
    print(f"\n Memory recall score: {recall_pass}/{total_recall}")
    report(results, f"Memory recall ({recall_pass}/{total_recall} keywords found)",
           recall_pass == total_recall,
           f"{recall_pass}/{total_recall} questions had all expected keywords in reply")
# ── 7. Deduplication test ─────────────────────────────────────────────────────
if _run_dedup:
    print(f"\n[{INFO}] 7. Memory deduplication test")
    print(f" Sends the same fact twice — Qdrant point count must not increase by 2")
    print(f" Chat ID: {CHAT_ID}")
    print()
    DEDUP_TIMEOUT = 120
    # Random number keeps this fact unique per run, so the first send really
    # is new to the memory store.
    _dedup_fact = f"My lucky number is {random.randint(1000, 9999)}"
    print(f" Fact: {_dedup_fact!r}")
    pts_before = qdrant_count()
    print(f" Qdrant points before: {pts_before}")
    print(f" [dedup-1] sending fact (first time)")
    found1 = None
    try:
        status, _ = post_json(f"{DEEPAGENTS}/chat",
                              {"message": _dedup_fact, "chat_id": CHAT_ID}, timeout=5)
        if status != 202:
            report(results, "Dedup: first POST accepted", False, f"status={status}")
        else:
            found1 = wait_for("dedup-1", _dedup_fact, timeout_s=DEDUP_TIMEOUT, need_memory=True)
            if found1:
                print(f" [dedup-1] stored tier={found1['tier']} mem={found1['memory_s']}s")
            else:
                print(f" [dedup-1] timeout")
    except Exception as e:
        report(results, "Dedup: first POST accepted", False, str(e))
    pts_after_first = qdrant_count()
    new_first = pts_after_first - pts_before
    print(f" Qdrant after first send: {pts_before}{pts_after_first} (+{new_first})")
    print(f" [dedup-2] sending same fact (second time)")
    try:
        status, _ = post_json(f"{DEEPAGENTS}/chat",
                              {"message": _dedup_fact, "chat_id": CHAT_ID}, timeout=5)
        if status != 202:
            report(results, "Dedup: second POST accepted", False, f"status={status}")
        else:
            found2 = wait_for("dedup-2", _dedup_fact, timeout_s=DEDUP_TIMEOUT, need_memory=True)
            if found2:
                print(f" [dedup-2] stored tier={found2['tier']} mem={found2['memory_s']}s")
            else:
                print(f" [dedup-2] timeout")
    except Exception as e:
        report(results, "Dedup: second POST accepted", False, str(e))
    pts_after_second = qdrant_count()
    new_second = pts_after_second - pts_after_first
    print(f" Qdrant after second send: {pts_after_first}{pts_after_second} (+{new_second})")
    # The second identical fact must not add more points than the first did.
    dedup_ok = new_second <= new_first
    report(results, "Deduplication: second identical fact not added to Qdrant", dedup_ok,
           f"first send: +{new_first} pts, second send: +{new_second} pts (want second ≤ first)")
# ── summary ───────────────────────────────────────────────────────────────────
# Exit non-zero when any check failed so automation can gate on this script.
print_summary(results)
sys.exit(0 if all(ok for _, ok in results) else 1)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,317 @@
#!/usr/bin/env python3
"""
Adolf tier routing benchmark.
Tests:
easy — 10 questions that must route to 'light' tier
medium — 11 questions that must route to 'medium' (light acceptable for some; complex = fail)
hard — 10 /think questions that must route to 'complex' (medium fallback acceptable)
Usage:
python3 test_routing.py [--chat-id CHAT_ID]
[--easy-only] # only easy benchmark
[--medium-only] # only medium benchmark
[--hard-only] # only hard benchmark
"""
import argparse
import sys
import time
from common import (
DEEPAGENTS, COMPOSE_FILE, DEFAULT_CHAT_ID,
BENCHMARK,
INFO, PASS, FAIL, WARN,
report, print_summary,
post_json, fetch_logs,
parse_run_block,
)
# ── args ──────────────────────────────────────────────────────────────────────
parser = argparse.ArgumentParser(description="Adolf routing benchmark")
parser.add_argument("--chat-id", default=DEFAULT_CHAT_ID)
# Section-selection flags: with none given, every benchmark runs; with any
# given, only the selected section(s) run.
for _flag in ("--easy-only", "--medium-only", "--hard-only"):
    parser.add_argument(_flag, action="store_true")
args = parser.parse_args()

CHAT_ID = args.chat_id
_only = any((args.easy_only, args.medium_only, args.hard_only))
_run_easy = args.easy_only or not _only
_run_medium = args.medium_only or not _only
_run_hard = args.hard_only or not _only

results = []  # (check name, passed) pairs consumed by print_summary at exit
# ── easy benchmark ────────────────────────────────────────────────────────────
# Ten trivial questions; every one must be answered via the 'light' tier.
if _run_easy:
    print(f"\n[{INFO}] Easy routing benchmark")
    print(f" {len(BENCHMARK['easy'])} questions — all must route to 'light'")
    print(f" Chat ID: {CHAT_ID}")
    print()
    # Each entry: (question, tier, end-to-end latency in seconds or None, passed)
    bench_results = []
    LIGHT_TIMEOUT = 60  # max seconds to wait for the run block to appear in logs
    for i, question in enumerate(BENCHMARK["easy"], 1):
        tag = f"easy-{i:02d}"
        print(f" [{tag}] {question[:55]!r}")
        # Fire-and-forget POST (202 expected); the reply is observed by
        # scraping the service logs below rather than via the HTTP response.
        try:
            status, _ = post_json(f"{DEEPAGENTS}/chat",
                                  {"message": question, "chat_id": CHAT_ID}, timeout=5)
            if status != 202:
                print(f" → [{FAIL}] POST returned {status}")
                bench_results.append((question, "?", None, False))
                continue
        except Exception as e:
            print(f" → [{FAIL}] POST error: {e}")
            bench_results.append((question, "?", None, False))
            continue
        # Poll logs until parse_run_block() finds this question's completed run.
        t_start = time.monotonic()
        found = None
        while time.monotonic() - t_start < LIGHT_TIMEOUT:
            # Fetch 30s more than elapsed so slow log delivery can't hide the run.
            since = int(time.monotonic() - t_start) + 30
            lines = fetch_logs(since_s=since)
            found = parse_run_block(lines, question)
            if found:
                break
            time.sleep(1)
        if not found:
            print(f" → [{FAIL}] no reply within {LIGHT_TIMEOUT}s")
            bench_results.append((question, "timeout", None, False))
            continue
        tier = found.get("tier", "unknown")
        is_light = (tier == "light")
        tag_str = PASS if is_light else FAIL
        print(f" → [{tag_str}] tier={tier} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s")
        bench_results.append((question, tier, found["reply_total"], is_light))
        time.sleep(1)  # brief gap so consecutive runs don't interleave in the logs
    # Result table.
    print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question'}")
    # NOTE(review): ''*N is always the empty string, so this separator row is
    # blank — the rule glyphs (e.g. '─') appear lost in transit; confirm intent.
    print(f" {''*4} {''*8} {''*8} {''*50}")
    for idx, (q, tier, lat, ok) in enumerate(bench_results, 1):
        lat_str = f"{lat:.1f}s" if lat is not None else "timeout"
        # NOTE(review): both branches are empty — the pass/fail marker glyphs
        # appear stripped; confirm the intended characters (e.g. ✓/✗).
        ok_str = "" if ok else ""
        print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {q[:50]!r}")
    light_count = sum(1 for _, _, _, ok in bench_results if ok)
    total_bench = len(bench_results)
    lats = [lat for _, _, lat, ok in bench_results if ok and lat is not None]
    avg_lat = sum(lats) / len(lats) if lats else 0
    print(f"\n Light-path score: {light_count}/{total_bench}")
    if lats:
        print(f" Avg latency (light): {avg_lat:.1f}s min={min(lats):.1f}s max={max(lats):.1f}s")
    report(results, f"All easy questions routed to light ({light_count}/{total_bench})",
           light_count == total_bench,
           f"{light_count}/{total_bench} via light path, avg {avg_lat:.1f}s")
# ── medium benchmark ──────────────────────────────────────────────────────────
# Eleven questions that should land on the 'medium' tier.  A handful of
# search-flavoured ones may legitimately be answered light; any 'complex'
# escalation is a failure.
if _run_medium:
    print(f"\n[{INFO}] Medium routing benchmark")
    print(f" {len(BENCHMARK['medium'])} questions — must route to medium (light ok for some; complex = fail)")
    print(f" Chat ID: {CHAT_ID}")
    print()
    # Questions the router is allowed to answer via the light tier; a light
    # answer to any other question counts as a misroute.
    LIGHT_ACCEPTABLE = {
        "who won the last FIFA World Cup?",
        "search for a good pasta carbonara recipe",
        "find Python tutorials for beginners",
        "search for the best coffee shops in Tokyo",
    }
    # Each entry: (question, tier, latency in seconds or None, correct)
    med_results = []
    MEDIUM_TIMEOUT = 120  # max seconds to wait for a medium-tier reply
    for i, question in enumerate(BENCHMARK["medium"], 1):
        tag = f"med-{i:02d}"
        print(f" [{tag}] {question[:60]!r}")
        try:
            status, _ = post_json(f"{DEEPAGENTS}/chat",
                                  {"message": question, "chat_id": CHAT_ID}, timeout=5)
            if status != 202:
                print(f" → [{FAIL}] POST returned {status}")
                med_results.append((question, "?", None, False))
                continue
        except Exception as e:
            print(f" → [{FAIL}] POST error: {e}")
            med_results.append((question, "?", None, False))
            continue
        # Poll logs until this question's run block shows up.
        t_start = time.monotonic()
        found = None
        while time.monotonic() - t_start < MEDIUM_TIMEOUT:
            # 60s of slack past elapsed time so log lag can't hide the block.
            since = int(time.monotonic() - t_start) + 60
            lines = fetch_logs(since_s=since)
            found = parse_run_block(lines, question)
            if found:
                break
            time.sleep(3)
        if not found:
            print(f" → [{FAIL}] no reply within {MEDIUM_TIMEOUT}s")
            med_results.append((question, "timeout", None, False))
            continue
        tier = found.get("tier", "unknown")
        light_ok = question in LIGHT_ACCEPTABLE
        # Grade the observed tier: medium = pass, light = pass only when
        # whitelisted, complex = always a wrong escalation.
        if tier == "medium":
            correct, label, note = True, PASS, "medium ✓"
        elif tier == "light":
            correct = light_ok
            label = PASS if light_ok else WARN
            note = "light (acceptable)" if light_ok else "light (should be medium)"
        elif tier == "complex":
            correct, label, note = False, FAIL, "complex — wrong escalation"
        else:
            correct, label, note = False, FAIL, f"unknown tier {tier!r}"
        print(f" → [{label}] {note} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s")
        med_results.append((question, tier, found["reply_total"], correct))
        time.sleep(1)
    # Result table.
    print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question'}")
    # NOTE(review): ''*N is always empty — separator glyphs appear lost; confirm.
    print(f" {''*4} {''*8} {''*8} {''*55}")
    for idx, (q, tier, lat, ok) in enumerate(med_results, 1):
        lat_str = f"{lat:.1f}s" if lat is not None else "timeout"
        # NOTE(review): pass/fail marker glyphs appear stripped (empty
        # branches); '~' marks a light-tier answer.
        ok_str = "" if ok else ("~" if tier == "light" else "")
        print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {q[:55]!r}")
    total_med = len(med_results)
    medium_count = sum(1 for _, tier, _, _ in med_results if tier == "medium")
    light_count = sum(1 for _, tier, _, _ in med_results if tier == "light")
    complex_count = sum(1 for _, tier, _, _ in med_results if tier == "complex")
    timeout_count = sum(1 for _, tier, _, _ in med_results if tier == "timeout")
    light_misroute = sum(1 for q, tier, _, _ in med_results
                         if tier == "light" and q not in LIGHT_ACCEPTABLE)
    lats = [lat for _, _, lat, _ in med_results if lat is not None]
    print(f"\n Breakdown: medium={medium_count} light={light_count} "
          f"complex={complex_count} timeout={timeout_count}")
    if light_misroute:
        print(f" [{WARN}] {light_misroute} question(s) answered via light when medium expected")
    if lats:
        print(f" Avg latency: {sum(lats)/len(lats):.1f}s min={min(lats):.1f}s max={max(lats):.1f}s")
    report(results,
           f"Medium questions: no complex escalation ({medium_count + light_count}/{total_med} routed)",
           complex_count == 0,
           f"medium={medium_count} light={light_count} complex={complex_count} timeout={timeout_count}")
    if timeout_count:
        report(results, f"Medium questions: all completed within {MEDIUM_TIMEOUT}s", False,
               f"{timeout_count} question(s) timed out")
# ── hard benchmark ────────────────────────────────────────────────────────────
# Ten /think-prefixed questions that must route to 'complex'.  'medium' is an
# acceptable fallback (VRAM eviction timed out); 'light' or a timeout fails.
if _run_hard:
    print(f"\n[{INFO}] Hard routing benchmark")
    print(f" {len(BENCHMARK['hard'])} /think questions — must route to 'complex'")
    print(f" Acceptable fallback: 'medium' if VRAM eviction timed out")
    print(f" Fail condition: tier=light or timeout")
    print(f" Chat ID: {CHAT_ID}")
    print()
    # Each entry: (question, tier, latency in seconds or None, ok)
    hard_results = []
    COMPLEX_TIMEOUT = 300  # complex runs may need a model swap; allow 5 minutes
    # Log marker written when the service evicts VRAM to enter complex mode.
    _VRAM_ENTER = "[vram] enter_complex_mode"
    for i, question in enumerate(BENCHMARK["hard"], 1):
        tag = f"hard-{i:02d}"
        short_q = question[len("/think "):].strip()[:60]
        print(f" [{tag}] /think {short_q!r}")
        t_send = time.monotonic()  # start of end-to-end clock for this question
        try:
            status, _ = post_json(f"{DEEPAGENTS}/chat",
                                  {"message": question, "chat_id": CHAT_ID}, timeout=5)
            if status != 202:
                print(f" → [{FAIL}] POST returned {status}")
                hard_results.append((question, "?", None, False))
                continue
        except Exception as e:
            print(f" → [{FAIL}] POST error: {e}")
            hard_results.append((question, "?", None, False))
            continue
        # Poll logs; the run block is keyed on the question *without* the
        # /think prefix, so strip it before matching.
        t_start = time.monotonic()
        found = None
        while time.monotonic() - t_start < COMPLEX_TIMEOUT:
            since = int(time.monotonic() - t_start) + 90
            lines = fetch_logs(since_s=since)
            found = parse_run_block(lines, question[len("/think "):].strip())
            if found:
                break
            time.sleep(5)
        elapsed = time.monotonic() - t_send
        if not found:
            print(f" → [{FAIL}] no reply within {COMPLEX_TIMEOUT}s")
            hard_results.append((question, "timeout", None, False))
            continue
        tier = found.get("tier", "unknown")
        if tier == "complex":
            ok, label, note = True, PASS, "complex ✓"
        elif tier == "medium":
            ok, label, note = True, WARN, "medium (VRAM fallback — check [vram] logs)"
        else:
            ok, label, note = False, FAIL, f"tier={tier} — unexpected"
        # The VRAM-flush marker only matters for complex-tier runs, so fetch
        # the recent log tail only in that case (saves a fetch per question).
        vram_note = ""
        if tier == "complex":
            lines_block = fetch_logs(since_s=int(elapsed) + 120)
            recent = "\n".join(lines_block[-200:])
            vram_enter_seen = _VRAM_ENTER in recent
            vram_note = " [vram:flush✓]" if vram_enter_seen else f" [{WARN}:no vram flush log]"
        print(f" → [{label}] {note} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s{vram_note}")
        hard_results.append((question, tier, found["reply_total"], ok))
        time.sleep(5)
    # Result table.
    print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question (/think ...)'}")
    # NOTE(review): ''*N is always empty — separator glyphs appear lost; confirm.
    print(f" {''*4} {''*8} {''*8} {''*55}")
    for idx, (q, tier, lat, ok) in enumerate(hard_results, 1):
        lat_str = f"{lat:.1f}s" if lat is not None else "timeout"
        # NOTE(review): pass/fail marker glyphs appear stripped (empty
        # branches); '~' marks the medium fallback.
        ok_str = "" if tier == "complex" else ("~" if tier == "medium" else "")
        short = q[len("/think "):].strip()[:55]
        print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {short!r}")
    total_hard = len(hard_results)
    complex_count = sum(1 for _, t, _, _ in hard_results if t == "complex")
    medium_fb = sum(1 for _, t, _, _ in hard_results if t == "medium")
    light_count = sum(1 for _, t, _, _ in hard_results if t == "light")
    timeout_count = sum(1 for _, t, _, _ in hard_results if t == "timeout")
    lats = [lat for _, _, lat, _ in hard_results if lat is not None]
    print(f"\n Breakdown: complex={complex_count} medium(fallback)={medium_fb} "
          f"light={light_count} timeout={timeout_count}")
    if medium_fb:
        print(f" [{WARN}] {medium_fb} question(s) fell back to medium (VRAM eviction timeout)")
    if light_count:
        print(f" [{FAIL}] {light_count} question(s) routed to light — /think prefix not detected")
    if lats:
        print(f" Avg latency: {sum(lats)/len(lats):.1f}s min={min(lats):.1f}s max={max(lats):.1f}s")
    report(results,
           f"Hard questions routed to complex (not light) ({complex_count + medium_fb}/{total_hard})",
           light_count == 0 and timeout_count == 0,
           f"complex={complex_count} medium_fallback={medium_fb} light={light_count} timeout={timeout_count}")
# ── summary ───────────────────────────────────────────────────────────────────
print_summary(results)
# Exit non-zero when any recorded check failed, zero otherwise.
_failed = [name for name, ok in results if not ok]
sys.exit(1 if _failed else 0)