- Remove BIFROST constant and fetch_bifrost_logs() from common.py - Add LITELLM constant (localhost:4000) - Replace test_memory.py test 4 (Bifrost pass-through) with LiteLLM health check Fixes #5 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
438 lines
19 KiB
Python
438 lines
19 KiB
Python
#!/usr/bin/env python3
"""
Adolf memory integration tests.

Tests:
  1. Name store — POST "remember that your name is <RandomName>"
  2. Qdrant point — verifies a new vector was written after store
  3. Name recall — POST "what is your name?" → reply must contain <RandomName>
  4. LiteLLM — verifies LiteLLM proxy is reachable (replaced Bifrost)
  5. Timing profile — breakdown of store and recall latencies
  6. Memory benchmark — store 5 personal facts, recall with 10 questions
  7. Dedup test — same fact stored twice must not grow Qdrant by 2 points

Usage:
  python3 test_memory.py [--chat-id CHAT_ID] [--name-only] [--bench-only] [--dedup-only]
"""
|
||
|
||
import argparse
|
||
import json
|
||
import random
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
import urllib.request
|
||
|
||
from common import (
|
||
DEEPAGENTS, LITELLM, QDRANT, COMPOSE_FILE, DEFAULT_CHAT_ID,
|
||
NAMES,
|
||
INFO, PASS, FAIL, WARN,
|
||
report, print_summary, tf,
|
||
get, post_json, qdrant_count, fetch_logs,
|
||
parse_run_block, wait_for,
|
||
)
|
||
|
||
# ── command-line options ──────────────────────────────────────────────────────
# Three optional "--*-only" switches narrow the run to the chosen suites; with
# none of them given, every suite runs.
parser = argparse.ArgumentParser(description="Adolf memory integration tests")
parser.add_argument("--chat-id", default=DEFAULT_CHAT_ID)
parser.add_argument(
    "--name-only",
    action="store_true",
    help="Run only the name store/recall test",
)
parser.add_argument(
    "--bench-only",
    action="store_true",
    help="Run only the memory benchmark",
)
parser.add_argument(
    "--dedup-only",
    action="store_true",
    help="Run only the deduplication test",
)
args = parser.parse_args()

CHAT_ID = args.chat_id

# True as soon as at least one "--*-only" switch was passed.
_only = any((args.name_only, args.bench_only, args.dedup_only))

# A suite runs when it was explicitly selected or when nothing was selected.
_run_name = args.name_only or not _only
_run_bench = args.bench_only or not _only
_run_dedup = args.dedup_only or not _only
|
||
|
||
# Accumulators shared by all sections: `results` is handed to report() and
# print_summary(); `timings` collects the latencies shown in the timing profile.
timings = {}
results = []

# One random name per run; its lowercase form is appended to the chat id that
# the name store/recall test posts with.
random_name = random.choice(NAMES)
TEST_CHAT_ID = "-".join((CHAT_ID, random_name.lower()))

if _run_name:
    print(
        f"\n Test name : \033[1m{random_name}\033[0m",
        f" Chat ID : {TEST_CHAT_ID}",
        sep="\n",
    )
|
||
|
||
|
||
# ── 1–4. Name store / recall pipeline ────────────────────────────────────────
# Sends a "remember your name" message, checks that Qdrant gained a point, asks
# for the name back, probes the LiteLLM proxy, then prints a timing profile.
if _run_name:
    print(f"\n[{INFO}] 1. Name store / recall pipeline")

    store_msg = f"remember that your name is {random_name}"
    recall_msg = "what is your name?"

    # Clear memories so each run starts clean. This stays best-effort, but a
    # failure is now reported: a silent failure here leaves stale points behind
    # and makes the before/after point counts below hard to interpret.
    try:
        post_json(f"{QDRANT}/collections/adolf_memories/points/delete",
                  {"filter": {}}, timeout=5)
    except Exception as e:
        print(f" [{WARN}] Could not clear adolf_memories: {e}")

    pts_before = qdrant_count()
    print(f" Qdrant points before: {pts_before}")

    # ── 1. Store ──────────────────────────────────────────────────────────────
    print(f"\n [store] '{store_msg}'")
    t_store = time.monotonic()

    try:
        status, _ = post_json(f"{DEEPAGENTS}/chat",
                              {"message": store_msg, "chat_id": TEST_CHAT_ID}, timeout=5)
        t_accept = time.monotonic() - t_store
        # The endpoint is expected to acknowledge with 202 in under a second.
        report(results, "POST /chat (store) returns 202 immediately",
               status == 202 and t_accept < 1, f"status={status}, t={t_accept:.3f}s")
        timings["store_http_accept"] = t_accept
    except Exception as e:
        # Nothing else in this section can work without the store request.
        report(results, "POST /chat (store)", False, str(e))
        print_summary(results)
        sys.exit(1)

    store = wait_for("store", store_msg, timeout_s=220, need_memory=True)

    if store:
        timings.update({
            "store_llm": store["llm"],
            "store_send": store["send"],
            "store_reply": store["reply_total"],
            "store_memory": store["memory_s"],
        })
        report(results, "Agent replied to store message", True,
               f"{store['reply_total']:.1f}s total llm={store['llm']:.1f}s "
               f"send={store['send']:.1f}s tier={store['tier']}")
        if store["memory_s"] is not None:
            report(results, "Memory stored without error", True, f"{store['memory_s']:.1f}s")
        elif store["memory_error"]:
            report(results, "Memory stored without error", False, "error in [memory] log")
        else:
            report(results, "Memory stored without error", False, "not found in logs")
        print(f" Store reply: {store['reply_text']!r}")
    else:
        # Without a stored memory the recall checks are meaningless: stop here.
        report(results, "Agent replied to store message", False, "timeout")
        report(results, "Memory stored without error", False, "timeout")
        print_summary(results)
        sys.exit(1)

    # ── 2. Qdrant point check ─────────────────────────────────────────────────
    pts_after = qdrant_count()
    new_pts = pts_after - pts_before
    report(results, "New memory point(s) added to Qdrant", new_pts > 0,
           f"{pts_before} → {pts_after} (+{new_pts})")
    timings["qdrant_new_points"] = new_pts

    # ── 3. Recall ─────────────────────────────────────────────────────────────
    print(f"\n [recall] '{recall_msg}'")
    t_recall = time.monotonic()

    try:
        status, _ = post_json(f"{DEEPAGENTS}/chat",
                              {"message": recall_msg, "chat_id": TEST_CHAT_ID}, timeout=5)
        t_accept2 = time.monotonic() - t_recall
        report(results, "POST /chat (recall) returns 202 immediately",
               status == 202 and t_accept2 < 1, f"status={status}, t={t_accept2:.3f}s")
        timings["recall_http_accept"] = t_accept2
    except Exception as e:
        report(results, "POST /chat (recall)", False, str(e))

    recall = wait_for("recall", recall_msg, timeout_s=160, need_memory=False)

    if recall:
        timings.update({
            "recall_llm": recall["llm"],
            "recall_send": recall["send"],
            "recall_reply": recall["reply_total"],
        })
        report(results, "Agent replied to recall message", True,
               f"{recall['reply_total']:.1f}s total llm={recall['llm']:.1f}s "
               f"send={recall['send']:.1f}s tier={recall['tier']}")
        reply_text = recall["reply_text"] or ""
        # Case-insensitive substring match on the stored name.
        name_in_reply = random_name.lower() in reply_text.lower()
        report(results, f"Reply contains '{random_name}'", name_in_reply,
               f"reply: {reply_text[:120]!r}")
    else:
        report(results, "Agent replied to recall message", False, "timeout")
        report(results, f"Reply contains '{random_name}'", False, "no reply")

    # ── 4. LiteLLM proxy reachable (replaced Bifrost) ─────────────────────────
    # Keep the reason for a failed probe (HTTP status or exception text) so the
    # report line is actionable instead of a bare FAIL.
    litellm_detail = ""
    try:
        status, _ = get(f"{LITELLM}/health", timeout=5)
        litellm_ok = status == 200
        if not litellm_ok:
            litellm_detail = f"status={status}"
    except Exception as e:
        litellm_ok = False
        litellm_detail = str(e)
    if litellm_detail:
        report(results, "LiteLLM proxy reachable", litellm_ok, litellm_detail)
    else:
        report(results, "LiteLLM proxy reachable", litellm_ok)

    # ── 5. Timing profile ─────────────────────────────────────────────────────
    print(f"\n[{INFO}] 5. Timing profile")
    W = 36  # width of the label column
    print(f"\n {'Stage':<{W}} {'Time':>8}")
    print(f" {'─'*W} {'─'*8}")

    for label, key in [
        ("[GPU] HTTP accept — store turn", "store_http_accept"),
        ("[GPU] qwen3:Xb inference — store turn", "store_llm"),
        ("[GPU] Telegram send — store turn", "store_send"),
        ("[GPU] Total reply latency — store", "store_reply"),
        ("[GPU] qwen2.5:1.5b+embed — async mem", "store_memory"),
    ]:
        print(f" {label:<{W}} {tf(timings.get(key)):>8}")

    print(f" {'─'*W} {'─'*8}")

    for label, key in [
        ("[GPU] HTTP accept — recall turn", "recall_http_accept"),
        ("[GPU] qwen3:Xb inference — recall", "recall_llm"),
        ("[GPU] Telegram send — recall turn", "recall_send"),
        ("[GPU] Total reply latency — recall", "recall_reply"),
    ]:
        print(f" {label:<{W}} {tf(timings.get(key)):>8}")

    print(f"\n Bottleneck analysis (each █ ≈ 5s):")
    print(f" {'─'*(W+12)}")
    candidates = [
        ("[GPU] qwen3:Xb — store reply ", timings.get("store_llm") or 0),
        ("[GPU] qwen3:Xb — recall reply", timings.get("recall_llm") or 0),
        ("[GPU] qwen2.5:1.5b+embed (async)", timings.get("store_memory") or 0),
    ]
    candidates.sort(key=lambda x: x[1], reverse=True)

    # Denominator for the percentage column: store reply time plus async memory
    # time. It does not depend on the candidate, so compute it once.
    total_pipeline = (timings.get("store_reply") or 0) + (timings.get("store_memory") or 0)
    for label, t in candidates:
        bar = "█" * min(int(t / 5), 24)  # one block per 5 s, capped at 24 blocks
        pct = f" {t/total_pipeline*100:4.0f}%" if total_pipeline > 0 else ""
        print(f" {label} {t:6.1f}s {bar}{pct}")
    print()
|
||
|
||
|
||
# ── 6. Memory benchmark ───────────────────────────────────────────────────────
# Stores five randomly parameterised personal facts through the agent, waits for
# the Qdrant point count to stop changing, then asks ten recall questions and
# checks each reply for the expected keywords.
if _run_bench:
    _mem_name = random.choice(["Alice", "Bruno", "Camille", "Diego", "Elena",
                               "Farid", "Greta", "Hiroshi", "Irina", "Jonas"])
    _mem_city = random.choice(["Tokyo", "Berlin", "Cairo", "Sydney", "Oslo",
                               "Nairobi", "Lisbon", "Seoul", "Montreal", "Bangkok"])
    _mem_allergy = random.choice(["nuts", "gluten", "dairy", "shellfish", "eggs"])
    _mem_job = random.choice([
        ("software engineer", "startup"),
        ("data scientist", "research lab"),
        ("product manager", "tech company"),
        ("DevOps engineer", "cloud provider"),
    ])
    _mem_lang = random.choice(["Python", "Rust", "Go", "TypeScript", "Kotlin"])
    _mem_pet_name = random.choice(["Whiskers", "Biscuit", "Mango", "Pebble", "Shadow",
                                   "Noodle", "Cheddar", "Cosmo", "Pippin", "Ziggy"])

    print(f"\n[{INFO}] 6. Memory benchmark")
    print(f" name={_mem_name} city={_mem_city} allergy={_mem_allergy} "
          f"job={_mem_job[0]}@{_mem_job[1]} lang={_mem_lang} pet={_mem_pet_name}")
    print(f" Storing 5 facts, then querying with 10 recall questions")
    print(f" Chat ID: {CHAT_ID}")
    print()

    # Wipe collection and restart openmemory for a clean slate
    try:
        req = urllib.request.Request(f"{QDRANT}/collections/adolf_memories", method="DELETE")
        with urllib.request.urlopen(req, timeout=5):
            pass
        print(f" [{INFO}] Wiped adolf_memories collection")
    except Exception as e:
        print(f" [{WARN}] Could not wipe collection: {e}")

    try:
        restart = subprocess.run(
            ["docker", "compose", "-f", COMPOSE_FILE, "restart", "openmemory"],
            capture_output=True, timeout=30,
        )
        if restart.returncode != 0:
            # subprocess.run() does not raise on a non-zero exit unless
            # check=True, so the exit status must be inspected explicitly;
            # otherwise a failed restart would be announced as a success.
            stderr_text = restart.stderr.decode(errors="replace").strip()
            print(f" [{WARN}] Could not restart openmemory: "
                  f"exit code {restart.returncode}: {stderr_text}")
        else:
            time.sleep(6)  # give the restarted service time to come up
            print(f" [{INFO}] Restarted openmemory — fresh collection ready")
    except Exception as e:
        print(f" [{WARN}] Could not restart openmemory: {e}")

    MEMORY_FACTS = [
        f"My name is {_mem_name} and I live in {_mem_city}",
        f"I prefer vegetarian food and I'm allergic to {_mem_allergy}",
        f"I work as a {_mem_job[0]} at a {_mem_job[1]}",
        f"My favorite programming language is {_mem_lang}",
        f"I have a cat named {_mem_pet_name}",
    ]

    # (question, keywords that must ALL appear in the lower-cased reply)
    MEMORY_RECALLS = [
        ("What is my name?", [_mem_name.lower()]),
        ("Where do I live?", [_mem_city.lower()]),
        ("Do I have any food allergies?", [_mem_allergy.lower()]),
        ("What is my job?", [_mem_job[0].split()[0].lower()]),
        ("What programming language do I prefer?", [_mem_lang.lower()]),
        ("Do I have any pets?", [_mem_pet_name.lower()]),
        ("Am I vegetarian or do I eat meat?", ["vegetarian"]),
        ("What city am I in?", [_mem_city.lower()]),
        ("Tell me what you know about me", [_mem_name.lower(), _mem_city.lower()]),
        ("What's the name of my pet?", [_mem_pet_name.lower()]),
    ]

    STORE_TIMEOUT = 180
    RECALL_TIMEOUT = 180

    print(f" Storing {len(MEMORY_FACTS)} facts...")
    store_ok = 0
    for i, fact in enumerate(MEMORY_FACTS, 1):
        print(f" [mem-store-{i:02d}] {fact!r}")
        try:
            status, _ = post_json(f"{DEEPAGENTS}/chat",
                                  {"message": fact, "chat_id": CHAT_ID}, timeout=5)
            if status != 202:
                print(f" → [{FAIL}] POST returned {status}")
                continue
        except Exception as e:
            print(f" → [{FAIL}] POST error: {e}")
            continue

        found = wait_for(f"mem-store-{i:02d}", fact, timeout_s=STORE_TIMEOUT, need_memory=True)
        if found:
            store_ok += 1
            print(f" → [{PASS}] stored tier={found['tier']} mem={found['memory_s']}s")
        else:
            print(f" → [{FAIL}] timeout")

    report(results, f"All memory facts stored ({store_ok}/{len(MEMORY_FACTS)})",
           store_ok == len(MEMORY_FACTS))

    # Wait for async extraction to settle: poll the point count every 2 s (at
    # most 30 polls) and stop once it has been unchanged for 3 polls in a row.
    print(f"\n Waiting for memory extraction to settle (up to 60s)...")
    _prev_count = None  # last point count seen; None until a poll succeeds
    _stable_ticks = 0
    for _ in range(30):
        time.sleep(2)
        try:
            _, body = get(f"{QDRANT}/collections/adolf_memories")
            _cur_count = json.loads(body).get("result", {}).get("points_count", 0) or 0
        except Exception:
            # An unreadable sample counts as "no change", so repeated failures
            # end the wait after three polls instead of the full minute.
            _cur_count = _prev_count
        if _cur_count == _prev_count:
            _stable_ticks += 1
            if _stable_ticks >= 3:
                break
        else:
            _stable_ticks = 0
            _prev_count = _cur_count
    if _prev_count is None:
        # Never obtained a count: say so rather than printing a sentinel value.
        print(f" [{WARN}] Could not read the point count from Qdrant")
    else:
        print(f" Memory settled: {_prev_count} points in Qdrant")

    print(f"\n Querying with {len(MEMORY_RECALLS)} recall questions...")
    recall_results = []  # (question, keywords, reply text or None, passed)

    for i, (question, keywords) in enumerate(MEMORY_RECALLS, 1):
        print(f" [mem-recall-{i:02d}] {question!r}")
        try:
            status, _ = post_json(f"{DEEPAGENTS}/chat",
                                  {"message": question, "chat_id": CHAT_ID}, timeout=5)
            if status != 202:
                print(f" → [{FAIL}] POST returned {status}")
                recall_results.append((question, keywords, None, False))
                continue
        except Exception as e:
            print(f" → [{FAIL}] POST error: {e}")
            recall_results.append((question, keywords, None, False))
            continue

        # Poll the logs until a run block for this question shows up. The log
        # window grows with the elapsed time plus a 30 s margin.
        t_start = time.monotonic()
        found = None
        while time.monotonic() - t_start < RECALL_TIMEOUT:
            since = int(time.monotonic() - t_start) + 30
            lines = fetch_logs(since_s=since)
            found = parse_run_block(lines, question)
            if found:
                break
            time.sleep(2)

        if not found:
            print(f" → [{FAIL}] timeout")
            recall_results.append((question, keywords, None, False))
            continue

        reply_text = (found.get("reply_text") or "").lower()
        # A question passes only when no expected keyword is missing.
        missing = [kw for kw in keywords if kw.lower() not in reply_text]
        passed = not missing
        tag_str = PASS if passed else WARN
        detail = f"tier={found['tier']} lat={found['reply_total']:.1f}s"
        if missing:
            detail += f" missing keywords: {missing}"
        print(f" → [{tag_str}] {detail}")
        recall_results.append((question, keywords, found.get("reply_text"), passed))
        time.sleep(1)  # brief pause before the next question

    print(f"\n {'#':<4} {'Pass':<5} {'Question':<45} {'Keywords'}")
    print(f" {'─'*4} {'─'*5} {'─'*45} {'─'*30}")
    for idx, (q, kws, _reply, ok) in enumerate(recall_results, 1):
        ok_str = "✓" if ok else "✗"
        print(f" {ok_str} {idx:<3} {'yes' if ok else 'no':<5} {q[:45]:<45} {kws}")

    recall_pass = sum(1 for _, _, _, ok in recall_results if ok)
    total_recall = len(recall_results)
    print(f"\n Memory recall score: {recall_pass}/{total_recall}")
    report(results, f"Memory recall ({recall_pass}/{total_recall} keywords found)",
           recall_pass == total_recall,
           f"{recall_pass}/{total_recall} questions had all expected keywords in reply")
|
||
|
||
|
||
# ── 7. Deduplication test ─────────────────────────────────────────────────────
# Sends one randomly generated fact twice and compares how many Qdrant points
# each send added. The verdict is only meaningful when both sends were actually
# processed, so a rejected or timed-out send makes the check fail rather than
# pass on a vacuous "0 ≤ 0" comparison.
if _run_dedup:
    print(f"\n[{INFO}] 7. Memory deduplication test")
    print(f" Sends the same fact twice — Qdrant point count must not increase by 2")
    print(f" Chat ID: {CHAT_ID}")
    print()

    DEDUP_TIMEOUT = 120
    _dedup_fact = f"My lucky number is {random.randint(1000, 9999)}"
    print(f" Fact: {_dedup_fact!r}")

    pts_before = qdrant_count()
    print(f" Qdrant points before: {pts_before}")

    # ── first send ────────────────────────────────────────────────────────────
    print(f" [dedup-1] sending fact (first time)")
    found1 = None  # stays None when the POST is rejected, errors, or times out
    try:
        status, _ = post_json(f"{DEEPAGENTS}/chat",
                              {"message": _dedup_fact, "chat_id": CHAT_ID}, timeout=5)
        if status != 202:
            report(results, "Dedup: first POST accepted", False, f"status={status}")
        else:
            found1 = wait_for("dedup-1", _dedup_fact, timeout_s=DEDUP_TIMEOUT, need_memory=True)
            if found1:
                print(f" [dedup-1] stored tier={found1['tier']} mem={found1['memory_s']}s")
            else:
                print(f" [dedup-1] timeout")
    except Exception as e:
        report(results, "Dedup: first POST accepted", False, str(e))

    pts_after_first = qdrant_count()
    new_first = pts_after_first - pts_before
    print(f" Qdrant after first send: {pts_before} → {pts_after_first} (+{new_first})")

    # ── second send (identical text) ──────────────────────────────────────────
    print(f" [dedup-2] sending same fact (second time)")
    found2 = None  # same convention as found1
    try:
        status, _ = post_json(f"{DEEPAGENTS}/chat",
                              {"message": _dedup_fact, "chat_id": CHAT_ID}, timeout=5)
        if status != 202:
            report(results, "Dedup: second POST accepted", False, f"status={status}")
        else:
            found2 = wait_for("dedup-2", _dedup_fact, timeout_s=DEDUP_TIMEOUT, need_memory=True)
            if found2:
                print(f" [dedup-2] stored tier={found2['tier']} mem={found2['memory_s']}s")
            else:
                print(f" [dedup-2] timeout")
    except Exception as e:
        report(results, "Dedup: second POST accepted", False, str(e))

    pts_after_second = qdrant_count()
    new_second = pts_after_second - pts_after_first
    print(f" Qdrant after second send: {pts_after_first} → {pts_after_second} (+{new_second})")

    # ── verdict ───────────────────────────────────────────────────────────────
    both_processed = bool(found1) and bool(found2)
    dedup_ok = both_processed and new_second <= new_first
    dedup_detail = (f"first send: +{new_first} pts, second send: +{new_second} pts "
                    f"(want second ≤ first)")
    if not both_processed:
        unprocessed = [tag for tag, hit in (("first", found1), ("second", found2)) if not hit]
        dedup_detail += f" — inconclusive: {' and '.join(unprocessed)} send not processed"
    report(results, "Deduplication: second identical fact not added to Qdrant", dedup_ok,
           dedup_detail)
|
||
|
||
|
||
# ── final report and exit status ──────────────────────────────────────────────
# Exit status is 0 only when every recorded check passed, 1 otherwise.
print_summary(results)
_all_passed = all(passed for _label, passed in results)
sys.exit(0 if _all_passed else 1)
|