#!/usr/bin/env python3
"""
Adolf memory integration tests.

Tests:
  1. Name store       — POST "remember that your name is NAME"
  2. Qdrant point     — verifies a new vector was written after store
  3. Name recall      — POST "what is your name?" → reply must contain NAME
  4. LiteLLM          — verifies LiteLLM proxy is reachable (replaced Bifrost)
  5. Timing profile   — breakdown of store and recall latencies
  6. Memory benchmark — store 5 personal facts, recall with 10 questions
  7. Dedup test       — same fact stored twice must not grow Qdrant by 2 points

Usage:
  python3 test_memory.py [--chat-id CHAT_ID] [--name-only] [--bench-only] [--dedup-only]

The process exits with status 0 only when every reported check passed.
"""

import argparse
import json
import random
import subprocess
import sys
import time
import urllib.request

from common import (
    DEEPAGENTS, LITELLM, QDRANT, COMPOSE_FILE, DEFAULT_CHAT_ID,
    NAMES,
    INFO, PASS, FAIL, WARN,
    report, print_summary, tf,
    get, post_json, qdrant_count, fetch_logs,
    parse_run_block, wait_for,
)

# ── args ──────────────────────────────────────────────────────────────────────
parser = argparse.ArgumentParser(description="Adolf memory integration tests")
parser.add_argument("--chat-id", default=DEFAULT_CHAT_ID)
parser.add_argument("--name-only", action="store_true", help="Run only the name store/recall test")
parser.add_argument("--bench-only", action="store_true", help="Run only the memory benchmark")
parser.add_argument("--dedup-only", action="store_true", help="Run only the deduplication test")
args = parser.parse_args()

CHAT_ID = args.chat_id

# With no --*-only flag every section runs; otherwise only the selected ones.
_only = args.name_only or args.bench_only or args.dedup_only
_run_name = not _only or args.name_only
_run_bench = not _only or args.bench_only
_run_dedup = not _only or args.dedup_only

results = []   # filled by report(); each entry is unpacked as (_, ok) at exit
timings = {}   # stage name → seconds, consumed by the timing profile

# The name test talks to its own chat id, derived from a randomly chosen name.
random_name = random.choice(NAMES)
TEST_CHAT_ID = f"{CHAT_ID}-{random_name.lower()}"

if _run_name:
    print(f"\n Test name : \033[1m{random_name}\033[0m")
    print(f" Chat ID : {TEST_CHAT_ID}")


# ── 1–4. Name store / recall pipeline ────────────────────────────────────────
if _run_name:
    print(f"\n[{INFO}] 1. Name store / recall pipeline")

    store_msg = f"remember that your name is {random_name}"
    recall_msg = "what is your name?"

    # Clear memories so each run starts clean.  Best effort: a failure here is
    # surfaced as a warning instead of being silently swallowed.
    try:
        post_json(f"{QDRANT}/collections/adolf_memories/points/delete",
                  {"filter": {}}, timeout=5)
    except Exception as e:
        print(f" [{WARN}] Could not clear adolf_memories points: {e}")

    pts_before = qdrant_count()
    print(f" Qdrant points before: {pts_before}")

    # ── 1. Store ──────────────────────────────────────────────────────────────
    print(f"\n [store] '{store_msg}'")
    t_store = time.monotonic()
    try:
        status, _ = post_json(f"{DEEPAGENTS}/chat",
                              {"message": store_msg, "chat_id": TEST_CHAT_ID},
                              timeout=5)
        t_accept = time.monotonic() - t_store
        report(results, "POST /chat (store) returns 202 immediately",
               status == 202 and t_accept < 1,
               f"status={status}, t={t_accept:.3f}s")
        timings["store_http_accept"] = t_accept
    except Exception as e:
        # Nothing downstream can succeed if the store request was not sent.
        report(results, "POST /chat (store)", False, str(e))
        print_summary(results)
        sys.exit(1)

    # wait_for() yields a dict describing the run, or a falsy value on timeout.
    store = wait_for("store", store_msg, timeout_s=220, need_memory=True)

    if store:
        timings.update({
            "store_llm": store["llm"],
            "store_send": store["send"],
            "store_reply": store["reply_total"],
            "store_memory": store["memory_s"],
        })
        report(results, "Agent replied to store message", True,
               f"{store['reply_total']:.1f}s total llm={store['llm']:.1f}s "
               f"send={store['send']:.1f}s tier={store['tier']}")
        if store["memory_s"] is not None:
            report(results, "Memory stored without error", True,
                   f"{store['memory_s']:.1f}s")
        elif store["memory_error"]:
            report(results, "Memory stored without error", False,
                   "error in [memory] log")
        else:
            report(results, "Memory stored without error", False,
                   "not found in logs")
        print(f" Store reply: {store['reply_text']!r}")
    else:
        # Recall is meaningless without a completed store, so stop here.
        report(results, "Agent replied to store message", False, "timeout")
        report(results, "Memory stored without error", False, "timeout")
        print_summary(results)
        sys.exit(1)

    # ── 2. Qdrant point check ─────────────────────────────────────────────────
    pts_after = qdrant_count()
    new_pts = pts_after - pts_before
    report(results, "New memory point(s) added to Qdrant", new_pts > 0,
           f"{pts_before} → {pts_after} (+{new_pts})")
    timings["qdrant_new_points"] = new_pts

    # ── 3. Recall ─────────────────────────────────────────────────────────────
    print(f"\n [recall] '{recall_msg}'")
    t_recall = time.monotonic()
    try:
        status, _ = post_json(f"{DEEPAGENTS}/chat",
                              {"message": recall_msg, "chat_id": TEST_CHAT_ID},
                              timeout=5)
        t_accept2 = time.monotonic() - t_recall
        report(results, "POST /chat (recall) returns 202 immediately",
               status == 202 and t_accept2 < 1,
               f"status={status}, t={t_accept2:.3f}s")
        timings["recall_http_accept"] = t_accept2
    except Exception as e:
        # Unlike the store step this is not fatal; the wait below still runs.
        report(results, "POST /chat (recall)", False, str(e))

    recall = wait_for("recall", recall_msg, timeout_s=160, need_memory=False)

    if recall:
        timings.update({
            "recall_llm": recall["llm"],
            "recall_send": recall["send"],
            "recall_reply": recall["reply_total"],
        })
        report(results, "Agent replied to recall message", True,
               f"{recall['reply_total']:.1f}s total llm={recall['llm']:.1f}s "
               f"send={recall['send']:.1f}s tier={recall['tier']}")
        reply_text = recall["reply_text"] or ""
        # Case-insensitive substring match of the stored name in the reply.
        name_in_reply = random_name.lower() in reply_text.lower()
        report(results, f"Reply contains '{random_name}'", name_in_reply,
               f"reply: {reply_text[:120]!r}")
    else:
        report(results, "Agent replied to recall message", False, "timeout")
        report(results, f"Reply contains '{random_name}'", False, "no reply")

    # ── 4. LiteLLM proxy reachable (replaced Bifrost) ─────────────────────────
    try:
        status, _ = get(f"{LITELLM}/health", timeout=5)
        litellm_ok = status == 200
    except Exception:
        litellm_ok = False
    report(results, "LiteLLM proxy reachable", litellm_ok)

    # ── 5. Timing profile ─────────────────────────────────────────────────────
    print(f"\n[{INFO}] 5. Timing profile")
    W = 36  # width of the stage-label column
    print(f"\n {'Stage':<{W}} {'Time':>8}")
    print(f" {'─'*W} {'─'*8}")

    for label, key in [
        ("[GPU] HTTP accept — store turn", "store_http_accept"),
        ("[GPU] qwen3:Xb inference — store turn", "store_llm"),
        ("[GPU] Telegram send — store turn", "store_send"),
        ("[GPU] Total reply latency — store", "store_reply"),
        ("[GPU] qwen2.5:1.5b+embed — async mem", "store_memory"),
    ]:
        print(f" {label:<{W}} {tf(timings.get(key)):>8}")

    print(f" {'─'*W} {'─'*8}")

    for label, key in [
        ("[GPU] HTTP accept — recall turn", "recall_http_accept"),
        ("[GPU] qwen3:Xb inference — recall", "recall_llm"),
        ("[GPU] Telegram send — recall turn", "recall_send"),
        ("[GPU] Total reply latency — recall", "recall_reply"),
    ]:
        print(f" {label:<{W}} {tf(timings.get(key)):>8}")

    print(f"\n Bottleneck analysis (each █ ≈ 5s):")
    print(f" {'─'*(W+12)}")
    # Missing or None timings are treated as 0 so sorting and division work.
    candidates = [
        ("[GPU] qwen3:Xb — store reply ", timings.get("store_llm") or 0),
        ("[GPU] qwen3:Xb — recall reply", timings.get("recall_llm") or 0),
        ("[GPU] qwen2.5:1.5b+embed (async)", timings.get("store_memory") or 0),
    ]
    candidates.sort(key=lambda x: x[1], reverse=True)
    # Percentages are relative to store reply + async memory time; the value is
    # the same for every bar, so it is computed once outside the loop.
    total_pipeline = (timings.get("store_reply") or 0) + (timings.get("store_memory") or 0)
    for label, t in candidates:
        bar = "█" * min(int(t / 5), 24)  # cap the bar at 24 cells
        pct = f" {t/total_pipeline*100:4.0f}%" if total_pipeline > 0 else ""
        print(f" {label} {t:6.1f}s {bar}{pct}")
    print()


# ── 6. Memory benchmark ───────────────────────────────────────────────────────
if _run_bench:
    # Randomised personal facts so a run cannot pass on stale memories.
    _mem_name = random.choice(["Alice", "Bruno", "Camille", "Diego", "Elena",
                               "Farid", "Greta", "Hiroshi", "Irina", "Jonas"])
    _mem_city = random.choice(["Tokyo", "Berlin", "Cairo", "Sydney", "Oslo",
                               "Nairobi", "Lisbon", "Seoul", "Montreal", "Bangkok"])
    _mem_allergy = random.choice(["nuts", "gluten", "dairy", "shellfish", "eggs"])
    _mem_job = random.choice([
        ("software engineer", "startup"),
        ("data scientist", "research lab"),
        ("product manager", "tech company"),
        ("DevOps engineer", "cloud provider"),
    ])
    _mem_lang = random.choice(["Python", "Rust", "Go", "TypeScript", "Kotlin"])
    _mem_pet_name = random.choice(["Whiskers", "Biscuit", "Mango", "Pebble", "Shadow",
                                   "Noodle", "Cheddar", "Cosmo", "Pippin", "Ziggy"])

    print(f"\n[{INFO}] 6. Memory benchmark")
    print(f" name={_mem_name} city={_mem_city} allergy={_mem_allergy} "
          f"job={_mem_job[0]}@{_mem_job[1]} lang={_mem_lang} pet={_mem_pet_name}")
    print(f" Storing 5 facts, then querying with 10 recall questions")
    print(f" Chat ID: {CHAT_ID}")
    print()

    # Wipe collection and restart openmemory for a clean slate
    try:
        req = urllib.request.Request(f"{QDRANT}/collections/adolf_memories", method="DELETE")
        with urllib.request.urlopen(req, timeout=5):
            pass
        print(f" [{INFO}] Wiped adolf_memories collection")
    except Exception as e:
        print(f" [{WARN}] Could not wipe collection: {e}")

    try:
        subprocess.run(
            ["docker", "compose", "-f", COMPOSE_FILE, "restart", "openmemory"],
            capture_output=True, timeout=30,
        )
        time.sleep(6)  # fixed pause after the restart before using the service
        print(f" [{INFO}] Restarted openmemory — fresh collection ready")
    except Exception as e:
        print(f" [{WARN}] Could not restart openmemory: {e}")

    MEMORY_FACTS = [
        f"My name is {_mem_name} and I live in {_mem_city}",
        f"I prefer vegetarian food and I'm allergic to {_mem_allergy}",
        f"I work as a {_mem_job[0]} at a {_mem_job[1]}",
        f"My favorite programming language is {_mem_lang}",
        f"I have a cat named {_mem_pet_name}",
    ]

    # (question, keywords): every keyword must appear in the reply to pass.
    MEMORY_RECALLS = [
        ("What is my name?", [_mem_name.lower()]),
        ("Where do I live?", [_mem_city.lower()]),
        ("Do I have any food allergies?", [_mem_allergy.lower()]),
        ("What is my job?", [_mem_job[0].split()[0].lower()]),
        ("What programming language do I prefer?", [_mem_lang.lower()]),
        ("Do I have any pets?", [_mem_pet_name.lower()]),
        ("Am I vegetarian or do I eat meat?", ["vegetarian"]),
        ("What city am I in?", [_mem_city.lower()]),
        ("Tell me what you know about me", [_mem_name.lower(), _mem_city.lower()]),
        ("What's the name of my pet?", [_mem_pet_name.lower()]),
    ]

    STORE_TIMEOUT = 180
    RECALL_TIMEOUT = 180

    print(f" Storing {len(MEMORY_FACTS)} facts...")
    store_ok = 0
    for i, fact in enumerate(MEMORY_FACTS, 1):
        print(f" [mem-store-{i:02d}] {fact!r}")
        try:
            status, _ = post_json(f"{DEEPAGENTS}/chat",
                                  {"message": fact, "chat_id": CHAT_ID}, timeout=5)
            if status != 202:
                print(f" → [{FAIL}] POST returned {status}")
                continue
        except Exception as e:
            print(f" → [{FAIL}] POST error: {e}")
            continue

        found = wait_for(f"mem-store-{i:02d}", fact, timeout_s=STORE_TIMEOUT, need_memory=True)
        if found:
            store_ok += 1
            print(f" → [{PASS}] stored tier={found['tier']} mem={found['memory_s']}s")
        else:
            print(f" → [{FAIL}] timeout")

    report(results, f"All memory facts stored ({store_ok}/{len(MEMORY_FACTS)})",
           store_ok == len(MEMORY_FACTS))

    # Wait for async extraction to settle: poll the point count every 2 s and
    # stop once it has been unchanged for 3 consecutive polls (30 polls max).
    print(f"\n Waiting for memory extraction to settle (up to 60s)...")
    _prev_count = -1
    _stable_ticks = 0
    _cur_count = 0
    for _ in range(30):
        time.sleep(2)
        try:
            _, body = get(f"{QDRANT}/collections/adolf_memories")
            _cur_count = json.loads(body).get("result", {}).get("points_count", 0)
        except Exception:
            # A failed poll reuses the previous value and so counts as "unchanged".
            _cur_count = _prev_count
        if _cur_count == _prev_count:
            _stable_ticks += 1
            if _stable_ticks >= 3:
                break
        else:
            _stable_ticks = 0
        _prev_count = _cur_count
    print(f" Memory settled: {_cur_count} points in Qdrant")

    print(f"\n Querying with {len(MEMORY_RECALLS)} recall questions...")
    recall_results = []  # (question, keywords, reply_text or None, passed)

    for i, (question, keywords) in enumerate(MEMORY_RECALLS, 1):
        print(f" [mem-recall-{i:02d}] {question!r}")

        try:
            status, _ = post_json(f"{DEEPAGENTS}/chat",
                                  {"message": question, "chat_id": CHAT_ID}, timeout=5)
            if status != 202:
                print(f" → [{FAIL}] POST returned {status}")
                recall_results.append((question, keywords, None, False))
                continue
        except Exception as e:
            print(f" → [{FAIL}] POST error: {e}")
            recall_results.append((question, keywords, None, False))
            continue

        # Poll the logs until a run block for this question shows up.
        t_start = time.monotonic()
        found = None
        while time.monotonic() - t_start < RECALL_TIMEOUT:
            # Log window = time elapsed since the POST plus 30 s
            # (presumably a safety margin — confirm against fetch_logs).
            since = int(time.monotonic() - t_start) + 30
            lines = fetch_logs(since_s=since)
            found = parse_run_block(lines, question)
            if found:
                break
            time.sleep(2)

        if not found:
            print(f" → [{FAIL}] timeout")
            recall_results.append((question, keywords, None, False))
            continue

        reply_text = (found.get("reply_text") or "").lower()
        # One pass over the keywords: the question passes when none is missing.
        missing = [kw for kw in keywords if kw.lower() not in reply_text]
        passed = not missing
        tag_str = PASS if passed else WARN
        detail = f"tier={found['tier']} lat={found['reply_total']:.1f}s"
        if missing:
            detail += f" missing keywords: {missing}"
        print(f" → [{tag_str}] {detail}")
        recall_results.append((question, keywords, found.get("reply_text"), passed))

        time.sleep(1)

    print(f"\n {'#':<4} {'Pass':<5} {'Question':<45} {'Keywords'}")
    print(f" {'─'*4} {'─'*5} {'─'*45} {'─'*30}")
    for idx, (q, kws, reply, ok) in enumerate(recall_results, 1):
        ok_str = "✓" if ok else "✗"
        print(f" {ok_str} {idx:<3} {'yes' if ok else 'no':<5} {q[:45]:<45} {kws}")

    recall_pass = sum(1 for _, _, _, ok in recall_results if ok)
    total_recall = len(recall_results)
    print(f"\n Memory recall score: {recall_pass}/{total_recall}")

    report(results, f"Memory recall ({recall_pass}/{total_recall} keywords found)",
           recall_pass == total_recall,
           f"{recall_pass}/{total_recall} questions had all expected keywords in reply")


# ── 7. Deduplication test ─────────────────────────────────────────────────────
if _run_dedup:
    print(f"\n[{INFO}] 7. Memory deduplication test")
    print(f" Sends the same fact twice — Qdrant point count must not increase by 2")
    print(f" Chat ID: {CHAT_ID}")
    print()

    DEDUP_TIMEOUT = 120

    # A random number keeps the fact distinct from one run to the next.
    _dedup_fact = f"My lucky number is {random.randint(1000, 9999)}"
    print(f" Fact: {_dedup_fact!r}")

    pts_before = qdrant_count()
    print(f" Qdrant points before: {pts_before}")

    print(f" [dedup-1] sending fact (first time)")
    found1 = None
    try:
        status, _ = post_json(f"{DEEPAGENTS}/chat",
                              {"message": _dedup_fact, "chat_id": CHAT_ID}, timeout=5)
        if status != 202:
            report(results, "Dedup: first POST accepted", False, f"status={status}")
        else:
            found1 = wait_for("dedup-1", _dedup_fact, timeout_s=DEDUP_TIMEOUT, need_memory=True)
            if found1:
                print(f" [dedup-1] stored tier={found1['tier']} mem={found1['memory_s']}s")
            else:
                print(f" [dedup-1] timeout")
                # Without a completed first store the point-count baseline is
                # meaningless and the comparison below could pass with nothing
                # stored at all, so record the timeout as a failure.
                report(results, "Dedup: first store completed", False, "timeout")
    except Exception as e:
        report(results, "Dedup: first POST accepted", False, str(e))

    pts_after_first = qdrant_count()
    new_first = pts_after_first - pts_before
    print(f" Qdrant after first send: {pts_before} → {pts_after_first} (+{new_first})")

    print(f" [dedup-2] sending same fact (second time)")
    found2 = None
    try:
        status, _ = post_json(f"{DEEPAGENTS}/chat",
                              {"message": _dedup_fact, "chat_id": CHAT_ID}, timeout=5)
        if status != 202:
            report(results, "Dedup: second POST accepted", False, f"status={status}")
        else:
            found2 = wait_for("dedup-2", _dedup_fact, timeout_s=DEDUP_TIMEOUT, need_memory=True)
            if found2:
                print(f" [dedup-2] stored tier={found2['tier']} mem={found2['memory_s']}s")
            else:
                # Only printed, not reported: the verdict comes from the point
                # counts compared below.
                print(f" [dedup-2] timeout")
    except Exception as e:
        report(results, "Dedup: second POST accepted", False, str(e))

    pts_after_second = qdrant_count()
    new_second = pts_after_second - pts_after_first
    print(f" Qdrant after second send: {pts_after_first} → {pts_after_second} (+{new_second})")

    # The second, identical send must not add more points than the first did.
    dedup_ok = new_second <= new_first
    report(results, "Deduplication: second identical fact not added to Qdrant", dedup_ok,
           f"first send: +{new_first} pts, second send: +{new_second} pts (want second ≤ first)")


# ── summary ───────────────────────────────────────────────────────────────────
print_summary(results)
sys.exit(0 if all(ok for _, ok in results) else 1)