Add Adolf architecture doc and integration test script
- ARCHITECTURE.md: comprehensive pipeline description (copied from Gitea wiki) - test_pipeline.py: tests all services, memory, async timing, and recall Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
316
adolf/test_pipeline.py
Normal file
316
adolf/test_pipeline.py
Normal file
@@ -0,0 +1,316 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Adolf pipeline integration test.
|
||||
|
||||
Tests:
|
||||
1. Service health (deepagents, openmemory, grammy MCP SSE)
|
||||
2. GPU Ollama reachability and model availability
|
||||
3. CPU Ollama reachability and model availability
|
||||
4. Qdrant reachability and adolf_memories collection
|
||||
5. SearXNG reachability and JSON results
|
||||
6. Full chat pipeline — POST /chat returns 202 immediately
|
||||
7. Async memory storage — memories appear in Qdrant after reply
|
||||
8. Memory recall — agent retrieves stored facts on next query
|
||||
9. Async timing — reply logged before memory stored (from deepagents logs)
|
||||
|
||||
Usage:
|
||||
python3 test_pipeline.py [--chat-id CHAT_ID]
|
||||
|
||||
Does NOT send real Telegram messages — calls deepagents /chat directly and
|
||||
reads Qdrant to verify memory. Grammy delivery is confirmed via MCP tool
|
||||
visible in deepagents logs ('[agent] replied in Xs ... send=Ys').
|
||||
|
||||
Known limitation: gemma3:1b (CPU extraction model) may abstract or
|
||||
deduplicate memories rather than storing raw text verbatim.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import http.client
|
||||
import json
|
||||
import random
|
||||
import sys
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
|
||||
# ── config ────────────────────────────────────────────────────────────────────
# Base URLs/hosts for every service the pipeline test probes.
# All are assumed to be published on localhost ports (presumably by the
# adolf docker-compose stack — confirm against the compose file).
DEEPAGENTS = "http://localhost:8000"    # agent HTTP API (GET /health, POST /chat)
OPENMEMORY = "http://localhost:8765"    # openmemory MCP service (SSE endpoint)
GRAMMY_HOST = "localhost"               # grammy MCP host, probed via raw http.client
GRAMMY_PORT = 3001
OLLAMA_GPU = "http://localhost:11436"   # GPU Ollama — reply model (qwen3)
OLLAMA_CPU = "http://localhost:11435"   # CPU Ollama — embeddings + memory extraction
QDRANT = "http://localhost:6333"        # vector store holding the adolf_memories collection
SEARXNG = "http://localhost:11437"      # web search backend

# Telegram chat id used for the live chat/recall checks.
# NOTE(review): the docstring advertises --chat-id and argparse is imported,
# but no parser is visible in this chunk — confirm the flag is actually wired up.
DEFAULT_CHAT_ID = "346967270"
||||
# ANSI-colored status tags used by report().
PASS = "\033[32mPASS\033[0m"
FAIL = "\033[31mFAIL\033[0m"
INFO = "\033[36mINFO\033[0m"

# Accumulated (check name, passed?) tuples; consumed by the summary section.
results = []


def report(name, ok, detail=""):
    """Print one colored PASS/FAIL line for *name* and record the outcome.

    Appends ``(name, ok)`` to the module-level ``results`` list so the
    final summary can count and list failures. *detail*, when non-empty,
    is appended after an em dash.
    """
    status_tag = FAIL
    if ok:
        status_tag = PASS
    msg = f" [{status_tag}] {name}"
    if detail:
        msg = msg + f" — {detail}"
    print(msg)
    results.append((name, ok))
||||
def get(url, timeout=5):
    """HTTP GET *url*; return ``(status, body)`` with the body decoded as text."""
    request = urllib.request.Request(url)
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        body = resp.read().decode()
        return resp.status, body
||||
def post_json(url, payload, timeout=30):
    """POST *payload* as JSON to *url*; return ``(status, parsed JSON body)``."""
    encoded = json.dumps(payload).encode()
    headers = {"Content-Type": "application/json"}
    request = urllib.request.Request(url, data=encoded, headers=headers, method="POST")
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        parsed = json.loads(resp.read().decode())
        return resp.status, parsed
||||
def check_sse(host, port, path):
    """
    Probe an SSE endpoint without consuming its (endless) body.

    SSE endpoints stream indefinitely — urlopen would hang waiting for body.
    Use http.client directly to read just the response status line and headers.

    Returns (ok, detail): ok is True iff the endpoint answered HTTP 200;
    detail is "HTTP <status>" on any response, or the stringified exception
    on connection failure.
    """
    conn = None
    try:
        conn = http.client.HTTPConnection(host, port, timeout=5)
        conn.request("GET", path, headers={"Accept": "text/event-stream"})
        r = conn.getresponse()
        return r.status == 200, f"HTTP {r.status}"
    except Exception as e:
        return False, str(e)
    finally:
        # Fix: the original only closed on the success path, leaking the
        # socket if request()/getresponse() raised. Always release it here.
        if conn is not None:
            conn.close()
||||
# ── 1. service health ─────────────────────────────────────────────────────────
print(f"\n[{INFO}] 1. Service health")

try:
    status, body = get(f"{DEEPAGENTS}/health")
    data = json.loads(body)
    # Healthy only when the endpoint answers 200 AND reports agent_ready == True.
    ok = status == 200 and data.get("agent_ready") is True
    report("deepagents /health — agent_ready", ok, f"agent_ready={data.get('agent_ready')}")
except Exception as e:
    report("deepagents /health", False, str(e))

# NOTE(review): port 8765 is duplicated here rather than derived from
# OPENMEMORY above — confirm intentional (they must stay in sync).
ok, detail = check_sse("localhost", 8765, "/sse")
report("openmemory /sse reachable (HTTP 200)", ok, detail)

ok, detail = check_sse(GRAMMY_HOST, GRAMMY_PORT, "/sse")
report("grammy /sse reachable (HTTP 200)", ok, detail)
||||
# ── 2. GPU Ollama ─────────────────────────────────────────────────────────────
print(f"\n[{INFO}] 2. GPU Ollama (port 11436)")

try:
    status, body = get(f"{OLLAMA_GPU}/api/tags")
    models = [m["name"] for m in json.loads(body).get("models", [])]
    # Substring match: any qwen3 tag (e.g. "qwen3:8b") counts as present.
    has_qwen = any("qwen3" in m for m in models)
    report("GPU Ollama reachable", True, f"models: {models}")
    report("qwen3:8b present on GPU Ollama", has_qwen)
except Exception as e:
    # Unreachable server: mark both checks failed, second as skipped.
    report("GPU Ollama reachable", False, str(e))
    report("qwen3:8b present on GPU Ollama", False, "skipped")
||||
# ── 3. CPU Ollama ─────────────────────────────────────────────────────────────
print(f"\n[{INFO}] 3. CPU Ollama (port 11435)")

try:
    status, body = get(f"{OLLAMA_CPU}/api/tags")
    models = [m["name"] for m in json.loads(body).get("models", [])]
    # Embedding model (for Qdrant vectors) and extraction model (memory writes).
    has_embed = any("nomic-embed-text" in m for m in models)
    has_gemma = any("gemma3:1b" in m for m in models)
    report("CPU Ollama reachable", True, f"models: {models}")
    report("nomic-embed-text present on CPU Ollama", has_embed)
    report("gemma3:1b present on CPU Ollama", has_gemma)
except Exception as e:
    # Unreachable server: mark all three checks failed, model checks as skipped.
    report("CPU Ollama reachable", False, str(e))
    report("nomic-embed-text present on CPU Ollama", False, "skipped")
    report("gemma3:1b present on CPU Ollama", False, "skipped")
||||
# ── 4. Qdrant ─────────────────────────────────────────────────────────────────
print(f"\n[{INFO}] 4. Qdrant (port 6333)")

try:
    status, body = get(f"{QDRANT}/collections")
    collections = [c["name"] for c in json.loads(body).get("result", {}).get("collections", [])]
    has_col = "adolf_memories" in collections
    report("Qdrant reachable", True, f"collections: {collections}")
    report("adolf_memories collection exists", has_col)
except Exception as e:
    report("Qdrant reachable", False, str(e))
    report("adolf_memories collection exists", False, "skipped")

try:
    status, body = get(f"{QDRANT}/collections/adolf_memories")
    info = json.loads(body).get("result", {})
    # NOTE(review): this path assumes a single unnamed vector config; a
    # named-vectors collection would make .get("size") return None here.
    dims = info.get("config", {}).get("params", {}).get("vectors", {}).get("size")
    # 768 presumably matches nomic-embed-text's output dimension — confirm.
    report("adolf_memories vector dims = 768", dims == 768, f"got {dims}")
except Exception as e:
    report("adolf_memories collection info", False, str(e))
||||
# ── 5. SearXNG ────────────────────────────────────────────────────────────────
print(f"\n[{INFO}] 5. SearXNG (port 11437)")

try:
    t0 = time.monotonic()
    # format=json requires the JSON format to be enabled in SearXNG settings;
    # a disabled format typically errors rather than returning results.
    status, body = get(f"{SEARXNG}/search?q=test&format=json", timeout=15)
    elapsed = time.monotonic() - t0
    data = json.loads(body)
    n_results = len(data.get("results", []))
    report("SearXNG reachable + JSON format enabled", status == 200 and n_results > 0,
           f"{n_results} results in {elapsed:.1f}s")
    # Latency budget check is separate from reachability so both are visible.
    report("SearXNG response < 5s", elapsed < 5, f"{elapsed:.2f}s")
except Exception as e:
    report("SearXNG reachable", False, str(e))
    report("SearXNG response < 5s", False, "skipped")
||||
# ── 6. POST /chat returns 202 immediately ─────────────────────────────────────
print(f"\n[{INFO}] 6–8. Full pipeline (chat → reply → memory → recall)")
print(f" Using chat_id={DEFAULT_CHAT_ID}")

# Random marker word lets us identify this run's message in stored memories.
marker_word = f"testword{random.randint(1000, 9999)}"
marker_msg = f"My test marker for this run is: {marker_word}. Please acknowledge."

# Record point count before test so we can verify new points are added
try:
    _, col_body = get(f"{QDRANT}/collections/adolf_memories")
    points_before = json.loads(col_body).get("result", {}).get("points_count", 0)
except Exception:
    # Best-effort baseline: if Qdrant is down the earlier section already
    # reported it; default to 0 so the delta check still runs.
    points_before = 0

print(f"\n [send] '{marker_msg}'")
print(f" Qdrant points before: {points_before}")

t_send = time.monotonic()
try:
    status, resp = post_json(f"{DEEPAGENTS}/chat",
                             {"message": marker_msg, "chat_id": DEFAULT_CHAT_ID},
                             timeout=5)
    t_accepted = time.monotonic() - t_send
    # Fire-and-forget contract: server must accept (202) in under a second,
    # i.e. before the agent actually generates a reply.
    report("POST /chat returns 202 immediately (< 1s)", status == 202 and t_accepted < 1,
           f"status={status}, t={t_accepted:.3f}s")
except Exception as e:
    # Without an accepted chat message, sections 7–9 are meaningless — abort.
    report("POST /chat returns 202 immediately", False, str(e))
    print(" Cannot continue pipeline tests.")
    sys.exit(1)
|
||||
# ── 7. Async memory storage ───────────────────────────────────────────────────
# Wait long enough for: GPU reply (~20s) + async CPU memory store (~20s) = ~40s
print(f" Waiting 50s for agent reply + async memory store…")
for i in range(10):
    time.sleep(5)
    # Carriage return keeps the countdown on one line.
    print(f" …{(i+1)*5}s", end="\r")
print()

try:
    _, col_body = get(f"{QDRANT}/collections/adolf_memories")
    points_after = json.loads(col_body).get("result", {}).get("points_count", 0)
    # Compare against the pre-send baseline captured in section 6.
    new_points = points_after - points_before
    report("New memory point(s) added to Qdrant after reply", new_points > 0,
           f"{points_before} → {points_after} (+{new_points})")
except Exception as e:
    report("Qdrant points after reply", False, str(e))

# Inspect Qdrant payloads — the `data` field holds what mem0 stored
# Note: gemma3:1b may abstract/rewrite facts; raw marker_word may or may not appear
try:
    _, scroll_body = post_json(
        f"{QDRANT}/collections/adolf_memories/points/scroll",
        {"limit": 50, "with_payload": True, "with_vector": False},
        timeout=10
    )
    points = scroll_body.get("result", {}).get("points", [])
    all_data = [str(p.get("payload", {}).get("data", "")) for p in points]
    marker_in_data = any(marker_word in d for d in all_data)
    report(
        f"Marker '{marker_word}' found verbatim in Qdrant payloads",
        marker_in_data,
        "(gemma3:1b may abstract facts — check logs if FAIL)" if not marker_in_data else "found"
    )
    if not marker_in_data and all_data:
        # NOTE(review): scroll order is by point id, not insertion time, and
        # only the first 50 points are fetched — "most recent" is approximate.
        print(f" Most recent stored data: {all_data[-1][:120]!r}")
except Exception as e:
    report("Qdrant payload inspection", False, str(e))
||||
# ── 8. Memory recall ──────────────────────────────────────────────────────────
# Ask the agent to retrieve the marker stored in section 6/7. Only acceptance
# (202) is machine-checked; the actual answer must be verified in Telegram.
recall_msg = f"What is the test marker word I just told you? (hint: it starts with 'testword')"
print(f"\n [recall] '{recall_msg}'")

try:
    status, _ = post_json(f"{DEEPAGENTS}/chat",
                          {"message": recall_msg, "chat_id": DEFAULT_CHAT_ID},
                          timeout=5)
    report("Recall query accepted (202)", status == 202)
except Exception as e:
    report("Recall query accepted", False, str(e))

print(f" Waiting 35s for recall reply (check Telegram for actual answer)…")
for i in range(7):
    time.sleep(5)
    print(f" …{(i+1)*5}s", end="\r")
print()
print(f" NOTE: Check Telegram — the bot should reply with '{marker_word}'.")
print(f" Check deepagents logs for: search_memory tool call and correct result.")
||||
# ── 9. Async timing verification ──────────────────────────────────────────────
print(f"\n[{INFO}] 9. Async pipeline timing")

# Verify two rapid POSTs both return 202 quickly (queuing, not blocking)
t0 = time.monotonic()
try:
    s1, _ = post_json(f"{DEEPAGENTS}/chat",
                      {"message": "async timing check one", "chat_id": DEFAULT_CHAT_ID},
                      timeout=3)
    s2, _ = post_json(f"{DEEPAGENTS}/chat",
                      {"message": "async timing check two", "chat_id": DEFAULT_CHAT_ID},
                      timeout=3)
    # If /chat blocked on generation, the second POST would push total well
    # past 1s — sub-second total proves both were merely queued.
    t_both = time.monotonic() - t0
    report("Two consecutive POSTs both 202, total < 1s (fire-and-forget queue)",
           s1 == 202 and s2 == 202 and t_both < 1, f"{t_both:.3f}s")
except Exception as e:
    report("Consecutive POST queueing", False, str(e))

# Reply-before-memory ordering cannot be observed from here; print the log
# command for a human to verify instead.
print()
print(f" To confirm reply-before-memory async ordering, run:")
print(f" docker compose -f adolf/docker-compose.yml logs deepagents | grep -E 'replied|stored'")
print(f" Expected pattern per message:")
print(f" [agent] replied in Xs ← GPU reply first")
print(f" [memory] stored in Ys ← CPU memory after (Y > X - reply_time)")
||||
# ── summary ───────────────────────────────────────────────────────────────────
# Tally the (name, ok) tuples accumulated by report() across all sections.
print(f"\n{'─'*55}")
total = len(results)
passed = sum(1 for _, ok in results if ok)
failed = total - passed
print(f"Results: {passed}/{total} passed", end="")
if failed:
    print(f" ({failed} failed)\n")
    print("Failed checks:")
    for name, ok in results:
        if not ok:
            print(f" - {name}")
else:
    print(" — all good")
print()
# NOTE(review): the script always exits 0 — consider sys.exit(1) when failed
# so CI can gate on the result.
|
||||
Reference in New Issue
Block a user