From f6714f9392c8df610a97654718d86ae15d77de43 Mon Sep 17 00:00:00 2001 From: Alvis Date: Mon, 23 Feb 2026 04:52:40 +0000 Subject: [PATCH] Add Adolf architecture doc and integration test script - ARCHITECTURE.md: comprehensive pipeline description (copied from Gitea wiki) - test_pipeline.py: tests all services, memory, async timing, and recall Co-Authored-By: Claude Sonnet 4.6 --- adolf/ARCHITECTURE.md | 92 ++++++++++++ adolf/test_pipeline.py | 316 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 408 insertions(+) create mode 100644 adolf/ARCHITECTURE.md create mode 100644 adolf/test_pipeline.py diff --git a/adolf/ARCHITECTURE.md b/adolf/ARCHITECTURE.md new file mode 100644 index 0000000..43f31cc --- /dev/null +++ b/adolf/ARCHITECTURE.md @@ -0,0 +1,92 @@ +# Adolf + +Persistent AI assistant reachable via Telegram. GPU-accelerated inference with long-term memory and web search. + +## Architecture + +``` +Telegram user + ↕ (long-polling) +[grammy] Node.js — port 3001 + - grammY bot polls Telegram + - on message: fire-and-forget POST /chat to deepagents + - exposes MCP SSE server: tool send_telegram_message(chat_id, text) + ↕ fire-and-forget HTTP ↕ MCP SSE tool call +[deepagents] Python FastAPI — port 8000 + - POST /chat → 202 Accepted immediately + - background task: run LangGraph react agent + - LLM: qwen3:8b via Ollama GPU (host port 11436) + - tools: search_memory, get_all_memories, web_search + - after reply: async fire-and-forget → store memory on CPU + ↕ MCP SSE ↕ HTTP (SearXNG) +[openmemory] Python + mem0 — port 8765 [SearXNG — port 11437] + - MCP tools: add_memory, search_memory, get_all_memories + - mem0 backend: Qdrant (port 6333) + CPU Ollama (port 11435) + - embedder: nomic-embed-text (768 dims) + - extractor: gemma3:1b + - collection: adolf_memories +``` + +## Queuing and Concurrency + +Two semaphores prevent resource contention: + +| Semaphore | Guards | Notes | +|-----------|--------|-------| +| `_reply_semaphore(1)` | GPU Ollama (qwen3:8b) | 
One LLM inference at a time | +| `_memory_semaphore(1)` | CPU Ollama (gemma3:1b) | One memory store at a time | + +**Reply-first pipeline:** +1. User message arrives via Telegram → Grammy forwards to deepagents (fire-and-forget) +2. Deepagents queues behind `_reply_semaphore`, runs agent, sends reply via Grammy MCP tool +3. After reply is sent, `asyncio.create_task` fires `store_memory_async` in background +4. Memory task queues behind `_memory_semaphore`, calls `add_memory` on openmemory +5. openmemory uses CPU Ollama: embedding (~0.3s) + extraction (~1.6s) → stored in Qdrant + +Reply latency: ~10–18s (GPU qwen3:8b inference + tool calls). +Memory latency: ~5–16s (runs async, never blocks replies). + +## External Services (from openai/ stack) + +| Service | Host Port | Role | +|---------|-----------|------| +| Ollama GPU | 11436 | Main LLM (qwen3:8b) | +| Ollama CPU | 11435 | Memory embedding + extraction | +| Qdrant | 6333 | Vector store for memories | +| SearXNG | 11437 | Web search | + +## Compose Stack + +Config: `agap_git/adolf/docker-compose.yml` + +```bash +cd agap_git/adolf +docker compose up -d +``` + +Requires `TELEGRAM_BOT_TOKEN` in `adolf/.env`. 
+
+## Memory
+
+- Stored per `chat_id` (Telegram user ID) as `user_id` in mem0
+- Semantic search via Qdrant (cosine similarity, 768-dim nomic-embed-text vectors)
+- mem0 uses gemma3:1b to extract structured facts before embedding
+- Collection: `adolf_memories` in Qdrant
+
+## Files
+
+```
+adolf/
+├── docker-compose.yml     Services: deepagents, openmemory, grammy
+├── Dockerfile             deepagents container (Python 3.12)
+├── agent.py               FastAPI + LangGraph react agent
+├── .env                   TELEGRAM_BOT_TOKEN (not committed)
+├── openmemory/
+│   ├── server.py          FastMCP + mem0 MCP tools
+│   ├── requirements.txt
+│   └── Dockerfile
+└── grammy/
+    ├── bot.mjs            grammY bot + MCP SSE server
+    ├── package.json
+    └── Dockerfile
+```
diff --git a/adolf/test_pipeline.py b/adolf/test_pipeline.py
new file mode 100644
index 0000000..b541c65
--- /dev/null
+++ b/adolf/test_pipeline.py
@@ -0,0 +1,316 @@
+#!/usr/bin/env python3
+"""
+Adolf pipeline integration test.
+
+Tests:
+  1. Service health (deepagents, openmemory, grammy MCP SSE)
+  2. GPU Ollama reachability and model availability
+  3. CPU Ollama reachability and model availability
+  4. Qdrant reachability and adolf_memories collection
+  5. SearXNG reachability and JSON results
+  6. Full chat pipeline — POST /chat returns 202 immediately
+  7. Async memory storage — memories appear in Qdrant after reply
+  8. Memory recall — agent retrieves stored facts on next query
+  9. Async timing — reply logged before memory stored (from deepagents logs)
+
+Usage:
+    python3 test_pipeline.py [--chat-id CHAT_ID]
+
+Does not simulate inbound Telegram messages — it calls deepagents /chat
+directly and reads Qdrant to verify memory. Note that the agent's replies
+ARE delivered to the real chat_id via the Grammy MCP tool; delivery is
+confirmed in the deepagents logs ('[agent] replied in Xs ... send=Ys').
+
+Known limitation: gemma3:1b (CPU extraction model) may abstract or
+deduplicate memories rather than storing raw text verbatim.
+""" + +import argparse +import http.client +import json +import random +import sys +import time +import urllib.error +import urllib.request + +# ── config ──────────────────────────────────────────────────────────────────── +DEEPAGENTS = "http://localhost:8000" +OPENMEMORY = "http://localhost:8765" +GRAMMY_HOST = "localhost" +GRAMMY_PORT = 3001 +OLLAMA_GPU = "http://localhost:11436" +OLLAMA_CPU = "http://localhost:11435" +QDRANT = "http://localhost:6333" +SEARXNG = "http://localhost:11437" + +DEFAULT_CHAT_ID = "346967270" + +PASS = "\033[32mPASS\033[0m" +FAIL = "\033[31mFAIL\033[0m" +INFO = "\033[36mINFO\033[0m" + +results = [] + + +def report(name, ok, detail=""): + tag = PASS if ok else FAIL + line = f" [{tag}] {name}" + if detail: + line += f" — {detail}" + print(line) + results.append((name, ok)) + + +def get(url, timeout=5): + req = urllib.request.Request(url) + with urllib.request.urlopen(req, timeout=timeout) as r: + return r.status, r.read().decode() + + +def post_json(url, payload, timeout=30): + data = json.dumps(payload).encode() + req = urllib.request.Request( + url, data=data, + headers={"Content-Type": "application/json"}, + method="POST" + ) + with urllib.request.urlopen(req, timeout=timeout) as r: + return r.status, json.loads(r.read().decode()) + + +def check_sse(host, port, path): + """ + SSE endpoints stream indefinitely — urlopen would hang waiting for body. + Use http.client directly to read just the response status line and headers. + """ + try: + conn = http.client.HTTPConnection(host, port, timeout=5) + conn.request("GET", path, headers={"Accept": "text/event-stream"}) + r = conn.getresponse() + ok = r.status == 200 + conn.close() + return ok, f"HTTP {r.status}" + except Exception as e: + return False, str(e) + + +# ── 1. service health ───────────────────────────────────────────────────────── +print(f"\n[{INFO}] 1. 
Service health") + +try: + status, body = get(f"{DEEPAGENTS}/health") + data = json.loads(body) + ok = status == 200 and data.get("agent_ready") is True + report("deepagents /health — agent_ready", ok, f"agent_ready={data.get('agent_ready')}") +except Exception as e: + report("deepagents /health", False, str(e)) + +ok, detail = check_sse("localhost", 8765, "/sse") +report("openmemory /sse reachable (HTTP 200)", ok, detail) + +ok, detail = check_sse(GRAMMY_HOST, GRAMMY_PORT, "/sse") +report("grammy /sse reachable (HTTP 200)", ok, detail) + + +# ── 2. GPU Ollama ───────────────────────────────────────────────────────────── +print(f"\n[{INFO}] 2. GPU Ollama (port 11436)") + +try: + status, body = get(f"{OLLAMA_GPU}/api/tags") + models = [m["name"] for m in json.loads(body).get("models", [])] + has_qwen = any("qwen3" in m for m in models) + report("GPU Ollama reachable", True, f"models: {models}") + report("qwen3:8b present on GPU Ollama", has_qwen) +except Exception as e: + report("GPU Ollama reachable", False, str(e)) + report("qwen3:8b present on GPU Ollama", False, "skipped") + + +# ── 3. CPU Ollama ───────────────────────────────────────────────────────────── +print(f"\n[{INFO}] 3. CPU Ollama (port 11435)") + +try: + status, body = get(f"{OLLAMA_CPU}/api/tags") + models = [m["name"] for m in json.loads(body).get("models", [])] + has_embed = any("nomic-embed-text" in m for m in models) + has_gemma = any("gemma3:1b" in m for m in models) + report("CPU Ollama reachable", True, f"models: {models}") + report("nomic-embed-text present on CPU Ollama", has_embed) + report("gemma3:1b present on CPU Ollama", has_gemma) +except Exception as e: + report("CPU Ollama reachable", False, str(e)) + report("nomic-embed-text present on CPU Ollama", False, "skipped") + report("gemma3:1b present on CPU Ollama", False, "skipped") + + +# ── 4. Qdrant ───────────────────────────────────────────────────────────────── +print(f"\n[{INFO}] 4. 
Qdrant (port 6333)") + +try: + status, body = get(f"{QDRANT}/collections") + collections = [c["name"] for c in json.loads(body).get("result", {}).get("collections", [])] + has_col = "adolf_memories" in collections + report("Qdrant reachable", True, f"collections: {collections}") + report("adolf_memories collection exists", has_col) +except Exception as e: + report("Qdrant reachable", False, str(e)) + report("adolf_memories collection exists", False, "skipped") + +try: + status, body = get(f"{QDRANT}/collections/adolf_memories") + info = json.loads(body).get("result", {}) + dims = info.get("config", {}).get("params", {}).get("vectors", {}).get("size") + report("adolf_memories vector dims = 768", dims == 768, f"got {dims}") +except Exception as e: + report("adolf_memories collection info", False, str(e)) + + +# ── 5. SearXNG ──────────────────────────────────────────────────────────────── +print(f"\n[{INFO}] 5. SearXNG (port 11437)") + +try: + t0 = time.monotonic() + status, body = get(f"{SEARXNG}/search?q=test&format=json", timeout=15) + elapsed = time.monotonic() - t0 + data = json.loads(body) + n_results = len(data.get("results", [])) + report("SearXNG reachable + JSON format enabled", status == 200 and n_results > 0, + f"{n_results} results in {elapsed:.1f}s") + report("SearXNG response < 5s", elapsed < 5, f"{elapsed:.2f}s") +except Exception as e: + report("SearXNG reachable", False, str(e)) + report("SearXNG response < 5s", False, "skipped") + + +# ── 6. POST /chat returns 202 immediately ───────────────────────────────────── +print(f"\n[{INFO}] 6–8. Full pipeline (chat → reply → memory → recall)") +print(f" Using chat_id={DEFAULT_CHAT_ID}") + +marker_word = f"testword{random.randint(1000, 9999)}" +marker_msg = f"My test marker for this run is: {marker_word}. Please acknowledge." 
# Record point count before test so we can verify new points are added
try:
    _, col_body = get(f"{QDRANT}/collections/adolf_memories")
    points_before = json.loads(col_body).get("result", {}).get("points_count", 0)
except Exception:
    points_before = 0

print(f"\n  [send] '{marker_msg}'")
print(f"  Qdrant points before: {points_before}")

t_send = time.monotonic()
try:
    status, resp = post_json(f"{DEEPAGENTS}/chat",
                             {"message": marker_msg, "chat_id": DEFAULT_CHAT_ID},
                             timeout=5)
    t_accepted = time.monotonic() - t_send
    report("POST /chat returns 202 immediately (< 1s)", status == 202 and t_accepted < 1,
           f"status={status}, t={t_accepted:.3f}s")
except Exception as e:
    report("POST /chat returns 202 immediately", False, str(e))
    print("  Cannot continue pipeline tests.")
    sys.exit(1)


# ── 7. Async memory storage ───────────────────────────────────────────────────
# Wait long enough for: GPU reply (~20s) + async CPU memory store (~20s) = ~40s
print("  Waiting 50s for agent reply + async memory store…")
for i in range(10):
    time.sleep(5)
    print(f"    …{(i + 1) * 5}s", end="\r")
print()

try:
    _, col_body = get(f"{QDRANT}/collections/adolf_memories")
    points_after = json.loads(col_body).get("result", {}).get("points_count", 0)
    new_points = points_after - points_before
    report("New memory point(s) added to Qdrant after reply", new_points > 0,
           f"{points_before} → {points_after} (+{new_points})")
except Exception as e:
    report("Qdrant points after reply", False, str(e))

# Inspect Qdrant payloads — the `data` field holds what mem0 stored.
# Note: gemma3:1b may abstract/rewrite facts; raw marker_word may or may not appear
try:
    _, scroll_body = post_json(
        f"{QDRANT}/collections/adolf_memories/points/scroll",
        {"limit": 50, "with_payload": True, "with_vector": False},
        timeout=10,
    )
    points = scroll_body.get("result", {}).get("points", [])
    all_data = [str(p.get("payload", {}).get("data", "")) for p in points]
    marker_in_data = any(marker_word in d for d in all_data)
    report(
        f"Marker '{marker_word}' found verbatim in Qdrant payloads",
        marker_in_data,
        "(gemma3:1b may abstract facts — check logs if FAIL)" if not marker_in_data else "found",
    )
    if not marker_in_data and all_data:
        print(f"    Most recent stored data: {all_data[-1][:120]!r}")
except Exception as e:
    report("Qdrant payload inspection", False, str(e))


# ── 8. Memory recall ──────────────────────────────────────────────────────────
# FIX: was an f-string with no placeholders.
recall_msg = "What is the test marker word I just told you? (hint: it starts with 'testword')"
print(f"\n  [recall] '{recall_msg}'")

try:
    status, _ = post_json(f"{DEEPAGENTS}/chat",
                          {"message": recall_msg, "chat_id": DEFAULT_CHAT_ID},
                          timeout=5)
    report("Recall query accepted (202)", status == 202)
except Exception as e:
    report("Recall query accepted", False, str(e))

print("  Waiting 35s for recall reply (check Telegram for actual answer)…")
for i in range(7):
    time.sleep(5)
    print(f"    …{(i + 1) * 5}s", end="\r")
print()
print(f"  NOTE: Check Telegram — the bot should reply with '{marker_word}'.")
print("  Check deepagents logs for: search_memory tool call and correct result.")


# ── 9. Async timing verification ──────────────────────────────────────────────
print(f"\n[{INFO}] 9. Async pipeline timing")

# Verify two rapid POSTs both return 202 quickly (queuing, not blocking)
t0 = time.monotonic()
try:
    s1, _ = post_json(f"{DEEPAGENTS}/chat",
                      {"message": "async timing check one", "chat_id": DEFAULT_CHAT_ID},
                      timeout=3)
    s2, _ = post_json(f"{DEEPAGENTS}/chat",
                      {"message": "async timing check two", "chat_id": DEFAULT_CHAT_ID},
                      timeout=3)
    t_both = time.monotonic() - t0
    report("Two consecutive POSTs both 202, total < 1s (fire-and-forget queue)",
           s1 == 202 and s2 == 202 and t_both < 1, f"{t_both:.3f}s")
except Exception as e:
    report("Consecutive POST queueing", False, str(e))

print()
print("  To confirm reply-before-memory async ordering, run:")
print("    docker compose -f adolf/docker-compose.yml logs deepagents | grep -E 'replied|stored'")
print("  Expected pattern per message:")
print("    [agent] replied in Xs   ← GPU reply first")
print("    [memory] stored in Ys   ← CPU memory after (Y > X - reply_time)")


# ── summary ───────────────────────────────────────────────────────────────────
print(f"\n{'─' * 55}")
total = len(results)
passed = sum(1 for _, ok in results if ok)
failed = total - passed
print(f"Results: {passed}/{total} passed", end="")
if failed:
    print(f" ({failed} failed)\n")
    print("Failed checks:")
    for name, ok in results:
        if not ok:
            print(f"  - {name}")
else:
    print(" — all good")
print()

# FIX: exit non-zero when any check failed so CI / shell callers can gate on
# this script; the original always exited 0 regardless of results.
sys.exit(1 if failed else 0)