diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 5c98ec7..7c8871c 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -1,126 +1,89 @@ # Adolf -Persistent AI assistant reachable via Telegram. Three-tier model routing with GPU VRAM management. +Autonomous personal assistant with a multi-channel gateway. Three-tier model routing with GPU VRAM management. ## Architecture ``` -Telegram user - ↕ (long-polling) -[grammy] Node.js — port 3001 - - grammY bot polls Telegram - - on message: fire-and-forget POST /chat to deepagents - - exposes MCP SSE server: tool send_telegram_message(chat_id, text) - ↓ POST /chat → 202 Accepted immediately -[deepagents] Python FastAPI — port 8000 - ↓ -Pre-check: starts with /think? → force_complex=True, strip prefix - ↓ -Router (qwen2.5:0.5b, ~1-2s, always warm in VRAM) - Structured output: {tier: light|medium|complex, confidence: 0.0-1.0, reply?: str} - - light: simple conversational → router answers directly, ~1-2s - - medium: needs memory/web search → qwen3:4b + deepagents tools - - complex: multi-step research, planning, code → qwen3:8b + subagents - force_complex always overrides to complex - complex only if confidence >= 0.85 (else downgraded to medium) - ↓ - ├── light ─────────── router reply used directly (no extra LLM call) - ├── medium ────────── deepagents qwen3:4b + TodoList + tools - └── complex ───────── VRAM flush → deepagents qwen3:8b + TodoList + subagents - └→ background: exit_complex_mode (flush 8b, prewarm 4b+router) - ↓ -send_telegram_message via grammy MCP - ↓ -asyncio.create_task(store_memory_async) — spin-wait GPU idle → openmemory add_memory - ↕ MCP SSE ↕ HTTP -[openmemory] Python + mem0 — port 8765 [SearXNG — port 11437] - - add_memory, search_memory, get_all_memories - - extractor: qwen2.5:1.5b on GPU Ollama (11436) — 2–5s - - embedder: nomic-embed-text on CPU Ollama (11435) — 50–150ms - - vector store: Qdrant (port 6333), 768 dims +┌─────────────────────────────────────────────────────┐ +│ CHANNEL ADAPTERS │ 
+│ │ +│ [Telegram/Grammy] [CLI] [Voice — future] │ +│ ↕ ↕ ↕ │ +│ └────────────────┴────────────┘ │ +│ ↕ │ +│ ┌─────────────────────────┐ │ +│ │ GATEWAY (agent.py) │ │ +│ │ FastAPI :8000 │ │ +│ │ │ │ +│ │ POST /message │ ← all inbound │ +│ │ POST /chat (legacy) │ │ +│ │ GET /reply/{id} SSE │ ← CLI polling │ +│ │ GET /health │ │ +│ │ │ │ +│ │ channels.py registry │ │ +│ │ conversation buffers │ │ +│ └──────────┬──────────────┘ │ +│ ↓ │ +│ ┌──────────────────────┐ │ +│ │ AGENT CORE │ │ +│ │ three-tier routing │ │ +│ │ VRAM management │ │ +│ └──────────────────────┘ │ +│ ↓ │ +│ channels.deliver(session_id, channel, text)│ +│ ↓ ↓ │ +│ telegram → POST grammy/send cli → SSE queue │ +└─────────────────────────────────────────────────────┘ +``` + +## Channel Adapters + +| Channel | session_id | Inbound | Outbound | +|---------|-----------|---------|---------| +| Telegram | `tg-` | Grammy long-poll → POST /message | channels.py → POST grammy:3001/send | +| CLI | `cli-` | POST /message directly | GET /reply/{id} SSE stream | +| Voice | `voice-` | (future) | (future) | + +## Unified Message Flow + +``` +1. Channel adapter receives message +2. POST /message {text, session_id, channel, user_id} +3. 202 Accepted immediately +4. Background: run_agent_task(message, session_id, channel) +5. Route → run agent tier → get reply text +6. channels.deliver(session_id, channel, reply_text) + - always puts reply in pending_replies[session_id] queue (for SSE) + - calls channel-specific send callback +7. 
GET /reply/{session_id} SSE clients receive the reply
 ```
 
 ## Three-Tier Model Routing
 
 | Tier | Model | VRAM | Trigger | Latency |
 |------|-------|------|---------|---------|
-| Light | qwen2.5:1.5b (router answers) | ~1.2 GB (shared with extraction) | Router classifies as light | ~2–4s |
-| Medium | qwen3:4b | ~2.5 GB | Default; router classifies medium | ~20–40s |
-| Complex | qwen3:8b | ~5.5 GB | `/think` prefix | ~60–120s |
+| Light | qwen2.5:1.5b (router answers) | ~1.2 GB | Router classifies as light | ~2–4s |
+| Medium | qwen3:4b | ~2.5 GB | Default | ~20–40s |
+| Complex | qwen3:8b | ~6.0 GB | `/think` prefix | ~60–120s |
 
-**Normal VRAM** (light + medium): router/extraction(1.2, shared) + medium(2.5) = ~3.7 GB
-**Complex VRAM**: 8b alone = ~5.5 GB — must flush others first
-
-### Router model: qwen2.5:1.5b (not 0.5b)
-
-qwen2.5:0.5b is too small for reliable classification — tends to output "medium" for everything
-or produces nonsensical output. qwen2.5:1.5b is already loaded in VRAM for memory extraction,
-so switching adds zero net VRAM overhead while dramatically improving accuracy.
-
-Router uses **raw text generation** (not structured output/JSON schema):
-- Ask model to output one word: `light`, `medium`, or `complex`
-- Parse with simple keyword matching (fallback: `medium`)
-- For `light` tier: a second call generates the reply text
+**`/think` prefix**: forces complex tier, stripped before sending to agent.
 
 ## VRAM Management
 
-GTX 1070 has 8 GB VRAM. Ollama's auto-eviction can spill models to CPU RAM permanently
-(all subsequent loads stay on CPU). To prevent this:
+GTX 1070 — 8 GB. If CUDA initialization fails, models silently load on CPU and stay there; restart Ollama to recover.
 
-1. **Always flush explicitly** before loading qwen3:8b (`keep_alive=0`)
-2. **Verify eviction** via `/api/ps` poll (15s timeout) before proceeding
-3. **Fallback**: timeout → log warning, run medium agent instead
-4. **Post-complex**: flush 8b immediately, pre-warm 4b + router
+1. 
Flush explicitly before loading qwen3:8b (`keep_alive=0`) +2. Verify eviction via `/api/ps` poll (15s timeout) before proceeding +3. Fallback: timeout → run medium agent instead +4. Post-complex: flush 8b, pre-warm 4b + router -```python -# Flush (force immediate unload): -POST /api/generate {"model": "qwen3:4b", "prompt": "", "keep_alive": 0} +## Session ID Convention -# Pre-warm (load into VRAM for 5 min): -POST /api/generate {"model": "qwen3:4b", "prompt": "", "keep_alive": 300} -``` +- Telegram: `tg-` (e.g. `tg-346967270`) +- CLI: `cli-` (e.g. `cli-alvis`) -## Agents - -**Medium agent** (`build_medium_agent`): -- `create_deep_agent` with TodoListMiddleware (auto-included) -- Tools: `search_memory`, `get_all_memories`, `web_search` -- No subagents - -**Complex agent** (`build_complex_agent`): -- `create_deep_agent` with TodoListMiddleware + SubAgentMiddleware -- Tools: all agent tools -- Subagents: - - `research`: web_search only, for thorough multi-query web research - - `memory`: search_memory + get_all_memories, for comprehensive context retrieval - -## Concurrency - -| Semaphore | Guards | Notes | -|-----------|--------|-------| -| `_reply_semaphore(1)` | GPU Ollama (all tiers) | One LLM reply inference at a time | -| `_memory_semaphore(1)` | GPU Ollama (qwen2.5:1.5b extraction) | One memory extraction at a time | - -Light path holds `_reply_semaphore` briefly (no GPU inference). -Memory extraction spin-waits until `_reply_semaphore` is free (60s timeout). - -## Pipeline - -1. User message → Grammy → `POST /chat` → 202 Accepted -2. Background: acquire `_reply_semaphore` → route → run agent tier → send reply -3. `asyncio.create_task(store_memory_async)` — spin-waits GPU free, then extracts memories -4. 
For complex: `asyncio.create_task(exit_complex_mode)` — flushes 8b, pre-warms 4b+router - -## External Services (from openai/ stack) - -| Service | Host Port | Role | -|---------|-----------|------| -| Ollama GPU | 11436 | All reply inference + extraction (qwen2.5:1.5b) | -| Ollama CPU | 11435 | Memory embedding (nomic-embed-text) | -| Qdrant | 6333 | Vector store for memories | -| SearXNG | 11437 | Web search | - -GPU Ollama config: `OLLAMA_MAX_LOADED_MODELS=2`, `OLLAMA_NUM_PARALLEL=1`. +Conversation history is keyed by session_id (5-turn buffer). ## Files @@ -128,17 +91,28 @@ GPU Ollama config: `OLLAMA_MAX_LOADED_MODELS=2`, `OLLAMA_NUM_PARALLEL=1`. adolf/ ├── docker-compose.yml Services: deepagents, openmemory, grammy ├── Dockerfile deepagents container (Python 3.12) -├── agent.py FastAPI + three-tier routing + run_agent_task -├── router.py Router class — qwen2.5:0.5b structured output routing +├── agent.py FastAPI gateway + three-tier routing +├── channels.py Channel registry + deliver() + pending_replies +├── router.py Router class — qwen2.5:1.5b routing ├── vram_manager.py VRAMManager — flush/prewarm/poll Ollama VRAM -├── agent_factory.py build_medium_agent / build_complex_agent (deepagents) +├── agent_factory.py build_medium_agent / build_complex_agent +├── cli.py Interactive CLI REPL client +├── wiki_research.py Batch wiki research pipeline (uses /message + SSE) ├── .env TELEGRAM_BOT_TOKEN (not committed) ├── openmemory/ │ ├── server.py FastMCP + mem0 MCP tools -│ ├── requirements.txt │ └── Dockerfile └── grammy/ - ├── bot.mjs grammY bot + MCP SSE server + ├── bot.mjs grammY Telegram bot + POST /send HTTP endpoint ├── package.json └── Dockerfile ``` + +## External Services (from openai/ stack) + +| Service | Host Port | Role | +|---------|-----------|------| +| Ollama GPU | 11436 | All reply inference | +| Ollama CPU | 11435 | Memory embedding (nomic-embed-text) | +| Qdrant | 6333 | Vector store for memories | +| SearXNG | 11437 | Web search | diff --git 
a/Dockerfile b/Dockerfile index d4cd94f..d81ee0c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,6 +5,6 @@ WORKDIR /app RUN pip install --no-cache-dir deepagents langchain-ollama langgraph \ fastapi uvicorn langchain-mcp-adapters langchain-community httpx -COPY agent.py vram_manager.py router.py agent_factory.py hello_world.py . +COPY agent.py channels.py vram_manager.py router.py agent_factory.py hello_world.py . CMD ["uvicorn", "agent:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/agent.py b/agent.py index 2ab6351..8d415f9 100644 --- a/agent.py +++ b/agent.py @@ -3,10 +3,13 @@ import os import time from contextlib import asynccontextmanager -from fastapi import FastAPI, BackgroundTasks -from fastapi.responses import JSONResponse +from fastapi import FastAPI, BackgroundTasks, Request +from fastapi.responses import JSONResponse, StreamingResponse from pydantic import BaseModel +import re as _re +import httpx as _httpx + from langchain_ollama import ChatOllama from langchain_mcp_adapters.client import MultiServerMCPClient from langchain_community.utilities import SearxSearchWrapper @@ -15,6 +18,7 @@ from langchain_core.tools import Tool from vram_manager import VRAMManager from router import Router from agent_factory import build_medium_agent, build_complex_agent +import channels OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434") ROUTER_MODEL = os.getenv("DEEPAGENTS_ROUTER_MODEL", "qwen2.5:0.5b") @@ -22,36 +26,27 @@ MEDIUM_MODEL = os.getenv("DEEPAGENTS_MODEL", "qwen3:4b") COMPLEX_MODEL = os.getenv("DEEPAGENTS_COMPLEX_MODEL", "qwen3:8b") SEARXNG_URL = os.getenv("SEARXNG_URL", "http://host.docker.internal:11437") OPENMEMORY_URL = os.getenv("OPENMEMORY_URL", "http://openmemory:8765") -GRAMMY_URL = os.getenv("GRAMMY_URL", "http://grammy:3001") +CRAWL4AI_URL = os.getenv("CRAWL4AI_URL", "http://crawl4ai:11235") MAX_HISTORY_TURNS = 5 _conversation_buffers: dict[str, list] = {} MEDIUM_SYSTEM_PROMPT = ( - "You are a helpful AI assistant 
talking to a user via Telegram. " - "The user's ID is {user_id}. " - "IMPORTANT: When calling any memory tool (search_memory, get_all_memories), " - "always use user_id=\"{user_id}\". " - "Every conversation is automatically saved to memory after you reply — " - "you do NOT need to explicitly store anything. " - "NEVER tell the user you cannot remember or store information. " - "If the user asks you to remember something, acknowledge it and confirm it will be remembered. " - "Use search_memory when context from past conversations may be relevant. " + "You are a helpful AI assistant. " "Use web_search for questions about current events or facts you don't know. " "Reply concisely." ) COMPLEX_SYSTEM_PROMPT = ( - "You are a capable AI assistant tackling a complex, multi-step task for a Telegram user. " - "The user's ID is {user_id}. " - "IMPORTANT: When calling any memory tool (search_memory, get_all_memories), " - "always use user_id=\"{user_id}\". " - "Plan your work using write_todos before diving in. " - "Delegate: use the 'research' subagent for thorough web research across multiple queries, " - "and the 'memory' subagent to gather comprehensive context from past conversations. " - "Every conversation is automatically saved to memory — you do NOT need to store anything. " - "NEVER tell the user you cannot remember or store information. " - "Produce a thorough, well-structured reply." + "You are a deep research assistant. " + "web_search automatically fetches full page content from top results — use it 6+ times with different queries. " + "Also call fetch_url on any specific URL you want to read in full.\n\n" + "Run searches in English AND Russian/Latvian. " + "After getting results, run follow-up searches based on new facts found.\n\n" + "Write a structured markdown report with sections: " + "Overview, Education, Career, Publications, Online Presence, Interesting Findings.\n" + "Every fact must link to the real URL it came from: [fact](url). 
" + "NEVER invent URLs. End with: **Sources checked: N**" ) medium_agent = None @@ -59,24 +54,22 @@ complex_agent = None router: Router = None vram_manager: VRAMManager = None mcp_client = None -send_tool = None -add_memory_tool = None # GPU mutex: one LLM inference at a time _reply_semaphore = asyncio.Semaphore(1) -# Memory semaphore: one async extraction at a time -_memory_semaphore = asyncio.Semaphore(1) @asynccontextmanager async def lifespan(app: FastAPI): - global medium_agent, complex_agent, router, vram_manager - global mcp_client, send_tool, add_memory_tool + global medium_agent, complex_agent, router, vram_manager, mcp_client + + # Register channel adapters + channels.register_defaults() # Three model instances router_model = ChatOllama( model=ROUTER_MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=4096, - temperature=0, # deterministic classification + temperature=0, ) medium_model = ChatOllama( model=MEDIUM_MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=8192 @@ -90,7 +83,6 @@ async def lifespan(app: FastAPI): mcp_connections = { "openmemory": {"transport": "sse", "url": f"{OPENMEMORY_URL}/sse"}, - "grammy": {"transport": "sse", "url": f"{GRAMMY_URL}/sse"}, } mcp_client = MultiServerMCPClient(mcp_connections) for attempt in range(12): @@ -103,22 +95,90 @@ async def lifespan(app: FastAPI): print(f"[agent] MCP not ready (attempt {attempt + 1}/12): {e}. 
Retrying in 5s...") await asyncio.sleep(5) - send_tool = next((t for t in mcp_tools if t.name == "send_telegram_message"), None) - add_memory_tool = next((t for t in mcp_tools if t.name == "add_memory"), None) - agent_tools = [t for t in mcp_tools if t.name not in ("send_telegram_message", "add_memory")] + agent_tools = [t for t in mcp_tools if t.name not in ("add_memory", "search_memory", "get_all_memories")] searx = SearxSearchWrapper(searx_host=SEARXNG_URL) + + def _crawl4ai_fetch(url: str) -> str: + """Fetch a URL via Crawl4AI (JS-rendered, bot-bypass) and return clean markdown.""" + try: + r = _httpx.post(f"{CRAWL4AI_URL}/crawl", json={"urls": [url]}, timeout=60) + r.raise_for_status() + results = r.json().get("results", []) + if not results or not results[0].get("success"): + return "" + md_obj = results[0].get("markdown") or {} + md = md_obj.get("raw_markdown") if isinstance(md_obj, dict) else str(md_obj) + return (md or "")[:5000] + except Exception as e: + return f"[fetch error: {e}]" + + def _search_and_read(query: str) -> str: + """Search the web and automatically fetch full content of top results. + Returns snippets + full page content from the top URLs.""" + import json as _json + # Get structured results from SearXNG + try: + r = _httpx.get( + f"{SEARXNG_URL}/search", + params={"q": query, "format": "json"}, + timeout=15, + ) + data = r.json() + items = data.get("results", [])[:5] + except Exception as e: + return f"[search error: {e}]" + + if not items: + return "No results found." 
+ + out = [f"Search: {query}\n"] + for i, item in enumerate(items, 1): + url = item.get("url", "") + title = item.get("title", "") + snippet = item.get("content", "")[:300] + out.append(f"\n[{i}] {title}\nURL: {url}\nSnippet: {snippet}") + + # Auto-fetch top 2 URLs for full content + out.append("\n\n--- Full page content ---") + for item in items[:2]: + url = item.get("url", "") + if not url: + continue + content = _crawl4ai_fetch(url) + if content and not content.startswith("[fetch error"): + out.append(f"\n### {url}\n{content[:3000]}") + + return "\n".join(out) + agent_tools.append(Tool( name="web_search", - func=searx.run, - description="Search the web for current information", + func=_search_and_read, + description=( + "Search the web and read full content of top results. " + "Returns search snippets AND full page text from the top URLs. " + "Use multiple different queries to research a topic thoroughly." + ), + )) + + def _fetch_url(url: str) -> str: + """Fetch and read the full text content of a URL.""" + content = _crawl4ai_fetch(url) + return content if content else "[fetch_url: empty or blocked]" + + agent_tools.append(Tool( + name="fetch_url", + func=_fetch_url, + description=( + "Fetch and read the full text content of a specific URL. " + "Use for URLs not covered by web_search. Input: a single URL string." + ), )) - # Build agents (system_prompt filled per-request with user_id) medium_agent = build_medium_agent( model=medium_model, agent_tools=agent_tools, - system_prompt=MEDIUM_SYSTEM_PROMPT.format(user_id="{user_id}"), + system_prompt=MEDIUM_SYSTEM_PROMPT, ) complex_agent = build_complex_agent( model=complex_model, @@ -139,42 +199,34 @@ async def lifespan(app: FastAPI): router = None vram_manager = None mcp_client = None - send_tool = None - add_memory_tool = None app = FastAPI(lifespan=lifespan) +# ── request models ───────────────────────────────────────────────────────────── + +class InboundMessage(BaseModel): + text: str + session_id: str # e.g. 
"tg-346967270", "cli-alvis" + channel: str # "telegram" | "cli" + user_id: str = "" # human identity; defaults to session_id if empty + metadata: dict = {} + + class ChatRequest(BaseModel): + """Legacy model — kept for test_pipeline.py compatibility.""" message: str chat_id: str -async def store_memory_async(conversation: str, user_id: str): - """Fire-and-forget: extract and store memories after GPU is free.""" - t_wait = time.monotonic() - while _reply_semaphore.locked(): - if time.monotonic() - t_wait > 60: - print(f"[memory] spin-wait timeout 60s, proceeding for user {user_id}", flush=True) - break - await asyncio.sleep(0.5) - async with _memory_semaphore: - t0 = time.monotonic() - try: - await add_memory_tool.ainvoke({"text": conversation, "user_id": user_id}) - print(f"[memory] stored in {time.monotonic() - t0:.1f}s for user {user_id}", flush=True) - except Exception as e: - print(f"[memory] error after {time.monotonic() - t0:.1f}s: {e}", flush=True) - +# ── helpers ──────────────────────────────────────────────────────────────────── def _extract_final_text(result) -> str | None: - """Extract last AIMessage content from agent result.""" msgs = result.get("messages", []) for m in reversed(msgs): if type(m).__name__ == "AIMessage" and getattr(m, "content", ""): return m.content - # deepagents may return output differently if isinstance(result, dict) and result.get("output"): return result["output"] return None @@ -192,10 +244,11 @@ def _log_messages(result): print(f"[agent] {role} → {tc['name']}({tc['args']})", flush=True) -async def run_agent_task(message: str, chat_id: str): - print(f"[agent] queued: {message[:80]!r} chat={chat_id}", flush=True) +# ── core task ────────────────────────────────────────────────────────────────── + +async def run_agent_task(message: str, session_id: str, channel: str = "telegram"): + print(f"[agent] queued: {message[:80]!r} chat={session_id}", flush=True) - # Pre-check: /think prefix forces complex tier force_complex = False 
clean_message = message
     if message.startswith("/think "):
@@ -205,10 +258,9 @@ async def run_agent_task(message: str, chat_id: str):
 
     async with _reply_semaphore:
         t0 = time.monotonic()
-        history = _conversation_buffers.get(chat_id, [])
+        history = _conversation_buffers.get(session_id, [])
         print(f"[agent] running: {clean_message[:80]!r}", flush=True)
 
-        # Route the message
         tier, light_reply = await router.route(clean_message, history, force_complex)
         print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True)
@@ -220,7 +272,7 @@ async def run_agent_task(message: str, chat_id: str):
             print(f"[agent] light path: answered by router", flush=True)
 
         elif tier == "medium":
-            system_prompt = MEDIUM_SYSTEM_PROMPT.format(user_id=chat_id)
+            system_prompt = MEDIUM_SYSTEM_PROMPT
             result = await medium_agent.ainvoke({
                 "messages": [
                     {"role": "system", "content": system_prompt},
@@ -237,16 +289,15 @@ async def run_agent_task(message: str, chat_id: str):
             if not ok:
                 print("[agent] complex→medium fallback (eviction timeout)", flush=True)
                 tier = "medium"
-                system_prompt = MEDIUM_SYSTEM_PROMPT.format(user_id=chat_id)
                 result = await medium_agent.ainvoke({
                     "messages": [
-                        {"role": "system", "content": system_prompt},
+                        {"role": "system", "content": MEDIUM_SYSTEM_PROMPT},
                         *history,
                         {"role": "user", "content": clean_message},
                     ]
                 })
             else:
-                system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=chat_id)
+                system_prompt = COMPLEX_SYSTEM_PROMPT
                 result = await complex_agent.ainvoke({
                     "messages": [
                         {"role": "system", "content": system_prompt},
@@ -263,47 +314,73 @@ async def run_agent_task(message: str, chat_id: str):
         except Exception as e:
             import traceback
             llm_elapsed = time.monotonic() - t0
-            print(f"[agent] error after {llm_elapsed:.1f}s for chat {chat_id}: {e}", flush=True)
+            print(f"[agent] error after {llm_elapsed:.1f}s for chat {session_id}: {e}", flush=True)
             traceback.print_exc()
 
-        # Send reply via grammy MCP (split if > Telegram's 4096-char limit)
-        if 
final_text and send_tool: + # Deliver reply through the originating channel + if final_text: t1 = time.monotonic() - MAX_TG = 4000 # leave headroom below the 4096 hard limit - chunks = [final_text[i:i + MAX_TG] for i in range(0, len(final_text), MAX_TG)] - for chunk in chunks: - await send_tool.ainvoke({"chat_id": chat_id, "text": chunk}) + await channels.deliver(session_id, channel, final_text) send_elapsed = time.monotonic() - t1 - # Log in format compatible with test_pipeline.py parser print( f"[agent] replied in {time.monotonic() - t0:.1f}s " f"(llm={llm_elapsed:.1f}s, send={send_elapsed:.1f}s) tier={tier}", flush=True, ) - elif not final_text: + print(f"[agent] reply_text: {final_text}", flush=True) + else: print("[agent] warning: no text reply from agent", flush=True) # Update conversation buffer if final_text: - buf = _conversation_buffers.get(chat_id, []) + buf = _conversation_buffers.get(session_id, []) buf.append({"role": "user", "content": clean_message}) buf.append({"role": "assistant", "content": final_text}) - _conversation_buffers[chat_id] = buf[-(MAX_HISTORY_TURNS * 2):] + _conversation_buffers[session_id] = buf[-(MAX_HISTORY_TURNS * 2):] - # Async memory storage (fire-and-forget) - if add_memory_tool and final_text: - conversation = f"User: {clean_message}\nAssistant: {final_text}" - asyncio.create_task(store_memory_async(conversation, chat_id)) + +# ── endpoints ────────────────────────────────────────────────────────────────── + +@app.post("/message") +async def message(request: InboundMessage, background_tasks: BackgroundTasks): + """Unified inbound endpoint for all channels.""" + if medium_agent is None: + return JSONResponse(status_code=503, content={"error": "Agent not ready"}) + session_id = request.session_id + channel = request.channel + background_tasks.add_task(run_agent_task, request.text, session_id, channel) + return JSONResponse(status_code=202, content={"status": "accepted"}) @app.post("/chat") async def chat(request: ChatRequest, 
background_tasks: BackgroundTasks): + """Legacy endpoint — maps chat_id to tg- session for backward compatibility.""" if medium_agent is None: return JSONResponse(status_code=503, content={"error": "Agent not ready"}) - background_tasks.add_task(run_agent_task, request.message, request.chat_id) + session_id = f"tg-{request.chat_id}" + background_tasks.add_task(run_agent_task, request.message, session_id, "telegram") return JSONResponse(status_code=202, content={"status": "accepted"}) +@app.get("/reply/{session_id}") +async def reply_stream(session_id: str): + """ + SSE endpoint — streams the reply for a session once available, then closes. + Used by CLI client and wiki_research.py instead of log polling. + """ + q = channels.pending_replies.setdefault(session_id, asyncio.Queue()) + + async def event_generator(): + try: + text = await asyncio.wait_for(q.get(), timeout=900) + # Escape newlines so entire reply fits in one SSE data line + yield f"data: {text.replace(chr(10), '\\n').replace(chr(13), '')}\n\n" + except asyncio.TimeoutError: + yield "data: [timeout]\n\n" + + return StreamingResponse(event_generator(), media_type="text/event-stream") + + @app.get("/health") async def health(): return {"status": "ok", "agent_ready": medium_agent is not None} diff --git a/agent_factory.py b/agent_factory.py index 5926504..6182ff4 100644 --- a/agent_factory.py +++ b/agent_factory.py @@ -1,4 +1,4 @@ -from deepagents import create_deep_agent, SubAgent +from deepagents import create_deep_agent def build_medium_agent(model, agent_tools: list, system_prompt: str): @@ -11,44 +11,9 @@ def build_medium_agent(model, agent_tools: list, system_prompt: str): def build_complex_agent(model, agent_tools: list, system_prompt: str): - """Complex agent: create_deep_agent with TodoList planning + research/memory subagents.""" - web_tools = [t for t in agent_tools if getattr(t, "name", "") == "web_search"] - memory_tools = [ - t for t in agent_tools - if getattr(t, "name", "") in 
("search_memory", "get_all_memories") - ] - - research_sub: SubAgent = { - "name": "research", - "description": ( - "Runs multiple web searches in parallel and synthesizes findings. " - "Use for thorough research tasks requiring several queries." - ), - "system_prompt": ( - "You are a research specialist. Search the web thoroughly using multiple queries. " - "Cite sources and synthesize information into a clear summary." - ), - "tools": web_tools, - "model": model, - } - - memory_sub: SubAgent = { - "name": "memory", - "description": ( - "Searches and retrieves all relevant memories about the user comprehensively. " - "Use to gather full context from past conversations." - ), - "system_prompt": ( - "You are a memory specialist. Search broadly using multiple queries. " - "Return all relevant facts and context you find." - ), - "tools": memory_tools, - "model": model, - } - + """Complex agent: direct agentic loop — calls web_search/fetch_url itself, no subagent indirection.""" return create_deep_agent( model=model, tools=agent_tools, system_prompt=system_prompt, - subagents=[research_sub, memory_sub], ) diff --git a/channels.py b/channels.py new file mode 100644 index 0000000..97c50c5 --- /dev/null +++ b/channels.py @@ -0,0 +1,75 @@ +""" +Channel registry and reply delivery for the Adolf multi-channel gateway. + +Each channel registers an async send callback: + channels.register("telegram", telegram_send) + channels.register("cli", cli_send) + +When the agent is done, it calls: + await channels.deliver(session_id, channel, text) + +Replies are also placed into per-session asyncio Queues so the +GET /reply/{session_id} SSE endpoint can stream them to polling clients. 
+""" + +import asyncio +import os +from typing import Awaitable, Callable + +import httpx + +# ── channel registry ────────────────────────────────────────────────────────── + +_callbacks: dict[str, Callable[[str, str], Awaitable[None]]] = {} +# session_id → Queue that receives the final reply text +pending_replies: dict[str, asyncio.Queue] = {} + + +def register(channel: str, callback: Callable[[str, str], Awaitable[None]]) -> None: + """Register an async send callback for a channel name.""" + _callbacks[channel] = callback + + +async def deliver(session_id: str, channel: str, text: str) -> None: + """ + Deliver a reply to the originating channel AND put it in the pending + queue so SSE /reply/{session_id} clients get it. + """ + # Always enqueue first so SSE listeners aren't missed + q = pending_replies.setdefault(session_id, asyncio.Queue()) + await q.put(text) + + cb = _callbacks.get(channel) + if cb: + await cb(session_id, text) + else: + print(f"[channels] no callback for channel={channel!r}, reply queued only", flush=True) + + +# ── built-in channel adapters ───────────────────────────────────────────────── + +GRAMMY_URL = os.getenv("GRAMMY_URL", "http://grammy:3001") + + +async def _telegram_send(session_id: str, text: str) -> None: + """Send reply to Telegram via Grammy's POST /send endpoint.""" + chat_id = session_id.removeprefix("tg-") + MAX_TG = 4000 + chunks = [text[i:i + MAX_TG] for i in range(0, len(text), MAX_TG)] + async with httpx.AsyncClient(timeout=15) as client: + for chunk in chunks: + await client.post( + f"{GRAMMY_URL}/send", + json={"chat_id": chat_id, "text": chunk}, + ) + + +async def _cli_send(session_id: str, text: str) -> None: + """CLI replies are handled entirely through the pending_replies queue — no-op here.""" + pass + + +def register_defaults() -> None: + """Register the built-in Telegram and CLI channel adapters.""" + register("telegram", _telegram_send) + register("cli", _cli_send) diff --git a/cli.py b/cli.py new file mode 
100644 index 0000000..4cb909a --- /dev/null +++ b/cli.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 +""" +Adolf CLI — interactive REPL for the multi-channel gateway. + +Usage: + python3 cli.py [--url http://localhost:8000] [--session cli-alvis] +""" + +import argparse +import json +import os +import sys +import urllib.request + +GATEWAY = "http://localhost:8000" + + +def post_message(gateway: str, text: str, session_id: str) -> None: + payload = json.dumps({ + "text": text, + "session_id": session_id, + "channel": "cli", + "user_id": os.getlogin(), + }).encode() + req = urllib.request.Request( + f"{gateway}/message", + data=payload, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=10) as r: + if r.status != 202: + print(f"[error] gateway returned {r.status}", file=sys.stderr) + sys.exit(1) + + +def wait_for_reply(gateway: str, session_id: str, timeout: int = 400) -> str: + """Open SSE stream and return first data event.""" + req = urllib.request.Request( + f"{gateway}/reply/{session_id}", + headers={"Accept": "text/event-stream"}, + ) + with urllib.request.urlopen(req, timeout=timeout + 5) as r: + for raw_line in r: + line = raw_line.decode("utf-8").rstrip("\n") + if line.startswith("data:"): + return line[5:].strip().replace("\\n", "\n") + return "" + + +def main(): + parser = argparse.ArgumentParser(description="Adolf CLI") + parser.add_argument("--url", default=GATEWAY, help="Gateway URL") + parser.add_argument("--session", default=f"cli-{os.getlogin()}", help="Session ID") + parser.add_argument("--timeout", type=int, default=400, help="Reply timeout (seconds)") + args = parser.parse_args() + + print(f"Adolf CLI (session={args.session}, gateway={args.url})") + print("Type your message and press Enter. 
Ctrl+C or Ctrl+D to exit.\n") + + try: + while True: + try: + text = input("> ").strip() + except EOFError: + break + if not text: + continue + + post_message(args.url, text, args.session) + print("...", end="", flush=True) + reply = wait_for_reply(args.url, args.session, timeout=args.timeout) + print(f"\r{reply}\n") + + except KeyboardInterrupt: + print("\nbye") + + +if __name__ == "__main__": + main() diff --git a/docker-compose.yml b/docker-compose.yml index 2469c37..6851e19 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,11 +11,14 @@ services: - DEEPAGENTS_COMPLEX_MODEL=qwen3:8b - DEEPAGENTS_ROUTER_MODEL=qwen2.5:1.5b - SEARXNG_URL=http://host.docker.internal:11437 + - GRAMMY_URL=http://grammy:3001 + - CRAWL4AI_URL=http://crawl4ai:11235 extra_hosts: - "host.docker.internal:host-gateway" depends_on: - openmemory - grammy + - crawl4ai restart: unless-stopped openmemory: @@ -41,3 +44,13 @@ services: - TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN} - DEEPAGENTS_URL=http://deepagents:8000 restart: unless-stopped + + crawl4ai: + image: unclecode/crawl4ai:latest + container_name: crawl4ai + ports: + - "11235:11235" + environment: + - CRAWL4AI_LOG_LEVEL=WARNING + shm_size: "1g" + restart: unless-stopped diff --git a/grammy/Dockerfile b/grammy/Dockerfile new file mode 100644 index 0000000..c55d6a5 --- /dev/null +++ b/grammy/Dockerfile @@ -0,0 +1,6 @@ +FROM node:22-alpine +WORKDIR /app +COPY package.json . +RUN npm install +COPY bot.mjs . 
+CMD ["node", "bot.mjs"] diff --git a/grammy/bot.mjs b/grammy/bot.mjs new file mode 100644 index 0000000..51238fb --- /dev/null +++ b/grammy/bot.mjs @@ -0,0 +1,56 @@ +import { Bot } from "grammy"; +import express from "express"; + +const TELEGRAM_BOT_TOKEN = process.env.TELEGRAM_BOT_TOKEN; +const DEEPAGENTS_URL = process.env.DEEPAGENTS_URL || "http://deepagents:8000"; + +const bot = new Bot(TELEGRAM_BOT_TOKEN); + +// Forward all text messages to the unified gateway /message endpoint +bot.on("message:text", (ctx) => { + const text = ctx.message.text; + const chat_id = String(ctx.chat.id); + + console.log(`[grammy] message from ${chat_id}: ${text.slice(0, 80)}`); + + fetch(`${DEEPAGENTS_URL}/message`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + text, + session_id: `tg-${chat_id}`, + channel: "telegram", + user_id: chat_id, + }), + }).catch((err) => console.error("[grammy] error forwarding to deepagents:", err)); +}); + +// HTTP server — delivers replies from the gateway back to Telegram +const app = express(); +app.use(express.json()); + +app.post("/send", async (req, res) => { + const { chat_id, text } = req.body; + if (!chat_id || !text) { + res.status(400).json({ error: "chat_id and text required" }); + return; + } + try { + await bot.api.sendMessage(chat_id, text); + console.log(`[grammy] sent to ${chat_id}: ${text.slice(0, 60)}`); + res.json({ ok: true }); + } catch (err) { + console.error(`[grammy] send error to ${chat_id}:`, err.message); + res.status(500).json({ error: err.message }); + } +}); + +app.get("/health", (_req, res) => res.json({ ok: true })); + +app.listen(3001, () => { + console.log("[grammy] HTTP server listening on port 3001"); +}); + +bot.start({ + onStart: (info) => console.log(`[grammy] bot @${info.username} started`), +}); diff --git a/grammy/package.json b/grammy/package.json new file mode 100644 index 0000000..ca1ab35 --- /dev/null +++ b/grammy/package.json @@ -0,0 +1,7 @@ +{ + "type": 
"module", + "dependencies": { + "grammy": "^1.36.0", +"express": "^4.21.0" + } +} diff --git a/hello_world.py b/hello_world.py new file mode 100644 index 0000000..630136e --- /dev/null +++ b/hello_world.py @@ -0,0 +1,21 @@ +import os +from langchain_ollama import ChatOllama +from deepagents import create_deep_agent + +OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434") +MODEL = os.getenv("DEEPAGENTS_MODEL", "gemma3:4b") + +print(f"Connecting to Ollama at {OLLAMA_BASE_URL} with model {MODEL}") + +model = ChatOllama(model=MODEL, base_url=OLLAMA_BASE_URL) + +agent = create_deep_agent(model=model) + +result = agent.invoke({ + "messages": [{"role": "user", "content": "Say hello world in one sentence."}] +}) + +print("\n--- Agent Response ---") +for msg in result.get("messages", []): + if hasattr(msg, "content") and msg.content: + print(f"[{msg.__class__.__name__}]: {msg.content}") diff --git a/langgraph.md b/langgraph.md new file mode 100644 index 0000000..8a9d3b5 --- /dev/null +++ b/langgraph.md @@ -0,0 +1,247 @@ +# LangGraph: Multi-Model Routing Architecture + +## Problem + +`create_react_agent` uses one model for all steps in the ReAct loop: + +``` +qwen3:4b → decide to call tool ~37s +→ run tool ~1s +qwen3:4b → final answer ~37s +───────────────────────────────────── +Total ~75s +``` + +The routing step is classification + argument extraction — low entropy, constrained output. +It does not need the same model as answer generation. + +--- + +## Is the Pattern Established? (2025 Research) + +Yes. 
The 2025 consensus from multiple papers treats heterogeneous model architectures +(small for routing, large for generation) as **settled production engineering**, not research: + +- SLMs (1–12B) match or exceed LLMs on schema-constrained tasks (tool calls, JSON, function + calling) at 10×–100× lower compute cost (arXiv 2510.03847, arXiv 2506.02153) +- MasRouter (ACL 2025): routing in multi-agent graphs reduces costs 2× without quality loss +- Cascade routing (ICLR 2025): 4% accuracy improvement, 30–92% cost reduction vs naive routing +- NVIDIA research (2025): "Small Language Models are the Future of Agentic AI" + +**Limitations acknowledged in literature:** +- Bad router defeats the purpose — classifier quality is critical +- Cascade (try small, escalate if uncertain) adds latency on queries that escalate +- Pre-trained routers (RouteLLM, etc.) are calibrated for specific model pairs; local model + pairs need independent validation + +--- + +## Three-Tier Architecture (small → medium → large) + +### Concept + +``` +Incoming query + ↓ +[Router: tiny model or embedding classifier] ~1-2s + ├── simple/conversational → [Medium: qwen3:4b] ~20s + ├── needs tool call → [Medium: qwen3:4b + tools] ~20-40s + └── complex/multi-step → [Large: qwen3:8b + sub-agents] ~60s+ +``` + +### When to route to large + +Signals that justify loading a larger model: +- Multi-step reasoning required (math, code, planning) +- Sub-agent orchestration (the agent needs to call other agents) +- Explicit reasoning request ("think through", "analyze carefully") +- Low confidence from medium model (cascade pattern) + +### Trade-offs of three-tier vs two-tier + +| | Two-tier | Three-tier | +|--|---------|-----------| +| Simple queries | small router + medium answer | small router + medium answer (same) | +| Complex queries | medium (may struggle) | swap to large (better quality) | +| GPU constraint | manageable | hard — see below | +| Routing error cost | low | high (wrong tier = much slower) | + 
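The escalation rules described above can be sketched as a thin post-processing layer over the router's structured output. A hypothetical sketch — `RouterDecision`, `resolve_tier`, and the 0.85 cutoff mirror the description in this repo's architecture notes, not actual Adolf code:

```python
from dataclasses import dataclass

@dataclass
class RouterDecision:
    tier: str          # "light" | "medium" | "complex"
    confidence: float  # 0.0 - 1.0

def resolve_tier(text: str, d: RouterDecision, min_complex_conf: float = 0.85) -> str:
    """Combine the explicit /think trigger with confidence-gated escalation."""
    if text.startswith("/think"):               # explicit user trigger always wins
        return "complex"
    if d.tier == "complex" and d.confidence < min_complex_conf:
        return "medium"                         # don't pay the swap on a shaky call
    return d.tier
```

The confidence gate directly addresses the "routing error cost" row in the table: a misclassified complex query costs a model swap, so low-confidence escalations are downgraded.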
+--- + +## The 8GB GPU Constraint — Core Problem + +This is the central issue. Research numbers on model swapping (2025): + +**Cold swap from disk (no optimization)** +- TTFT exceeds 140s for 7B-class models on HDD; 5–15s on NVMe SSD +- Not viable for interactive use at any tier + +**vLLM Sleep Mode (offload to CPU RAM, not disk)** +- 18–200× faster than cold start; TTFT 2–3s per switch +- vLLM-specific — not available in Ollama + +**Ollama behavior on 8GB VRAM** +- Default `keep_alive`: 5 minutes — model stays warm after use +- Two models simultaneously: qwen3:4b (~2.5GB) + qwen2.5:1.5b (~1.2GB) = ~3.7GB — fits +- qwen3:4b + qwen3:8b = ~8GB — does not reliably fit; eviction required +- Sequential swap in Ollama: Ollama evicts old model, loads new one from SSD (~5–15s on NVMe) +- Known Ollama bug: model spills from VRAM to RAM → all subsequent loads stay on CPU until restart + +**Conclusion for three-tier on single 8GB GPU:** + +| Tier switch | Cost | Viable? | +|------------|------|---------| +| tiny router → medium (qwen3:4b) | model swap ~5-15s if router is separate | borderline | +| medium → large (qwen3:8b) | evict qwen3:4b, load qwen3:8b = ~5-15s additional | no, for interactive | +| Keep medium always warm, route to large on demand | 5-15s swap overhead per complex query | acceptable if complex queries are rare | + +**Honest verdict: three-tier with model swapping is not viable for interactive per-turn latency +on 8GB VRAM with Ollama.** vLLM with Sleep Mode would make it viable (2–3s switch) but +requires replacing Ollama. 
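If swapping is accepted at all, it can at least be made explicit rather than left to Ollama's 5-minute `keep_alive` default: a prompt-less `/api/generate` call with `keep_alive: 0` evicts a model, and a negative `keep_alive` pins one indefinitely. A sketch — the port-11436 GPU instance is an assumption carried over from this setup; verify against your deployment:

```python
import json
import urllib.request

OLLAMA_GPU_URL = "http://localhost:11436"  # assumed GPU Ollama instance

def residency_payload(model: str, keep_alive: int) -> dict:
    # keep_alive=0 → unload immediately; keep_alive=-1 → keep loaded indefinitely
    return {"model": model, "keep_alive": keep_alive}

def set_residency(model: str, keep_alive: int) -> None:
    """POST a prompt-less generate call: (un)loads the model without inference."""
    req = urllib.request.Request(
        f"{OLLAMA_GPU_URL}/api/generate",
        data=json.dumps(residency_payload(model, keep_alive)).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    urllib.request.urlopen(req, timeout=120).read()

# Escalation: evict the medium model, then pin the large one
# set_residency("qwen3:4b", 0)
# set_residency("qwen3:8b", -1)
```

Driving eviction explicitly keeps the swap cost where it is visible, instead of paying it implicitly on the first token of the next request.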
+ +--- + +## Practical Architecture for 8GB GPU (Ollama) + +### Option 1: Two-tier, both models always in VRAM (recommended) + +Keep two small models loaded simultaneously: + +``` +qwen2.5:0.5b (~0.4GB) — router: tool call decision + arg extraction +qwen3:4b (~2.5GB) — answer: all generation +nomic-embed-text (CPU) — embedding: search and store +qwen2.5:1.5b (~1.2GB) — extraction: mem0 fact extraction (GPU) +───────────────────────────────────────────────────────── +Total VRAM: ~4.1GB — well within 8GB +``` + +No swapping needed. Router runs first (~1-2s), answer model runs after (~20s). + +``` +Router → tool call JSON or "no tool" ~1-2s +→ tool runs (if needed) ~1s +→ Answer model generates reply ~20s +───────────────────────────────────────────── +Total ~22-23s +``` + +vs current two-call approach: ~75s. + +### Option 2: Semantic routing (encoder-only, free) + +Use nomic-embed-text (already running on CPU) as the router: + +```python +query_vec = embed(query) +sims = { + "search_memory": cosine(query_vec, memory_topic_vec), + "web_search": cosine(query_vec, web_topic_vec), +} +# If max sim > threshold → call that tool directly +# Then pass result + original query to answer model +``` + +Zero VRAM overhead. ~50ms routing. Can't extract tool args from embedding alone — +needs hardcoded arg construction (e.g. query = original user message). + +### Option 3: Three-tier with rare large-model escalation + +Keep qwen3:4b warm. Route to qwen3:8b only for explicitly complex tasks. +Accept ~10s swap overhead for those queries. qwen3:8b gets unloaded after. + +``` +Router → simple → qwen3:4b ~20s (no swap) +Router → complex → evict 4b, load 8b → ~30s (10s swap + 20s inference) +``` + +Works if <20% of queries are "complex" and users accept occasional slow responses. +Best implemented with explicit user trigger ("think about this carefully") rather than +automatic classification, to avoid swap overhead on misclassified queries. 
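The Option 2 pseudocode reduces to cosine similarity against precomputed topic centroids. A minimal self-contained sketch — in practice the topic vectors would come from averaging nomic-embed-text embeddings of a few representative queries; the toy 2-d vectors and the 0.6 threshold below are illustrative only:

```python
import math

def cosine(a, b):
    """Plain cosine similarity; assumes non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b)))

def semantic_route(query_vec, topic_vecs, threshold=0.6):
    """Pick the closest tool topic, or None when nothing clears the threshold."""
    best_tool, best_sim = None, threshold
    for tool, vec in topic_vecs.items():
        sim = cosine(query_vec, vec)
        if sim > best_sim:
            best_tool, best_sim = tool, sim
    return best_tool
```

Returning `None` is the "no tool" path: the query goes straight to the answer model, which is exactly the cheap default you want when routing is uncertain.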
+
+---
+
+## LangGraph Implementation
+
+`create_react_agent` locks to one model. Explicit graph supports per-node models:
+
+```python
+from langgraph.graph import StateGraph, MessagesState, END
+from langgraph.prebuilt import ToolNode
+from langchain_ollama import ChatOllama
+
+router_model = ChatOllama(model="qwen2.5:0.5b", base_url=OLLAMA_GPU_URL)
+answer_model = ChatOllama(model="qwen3:4b", base_url=OLLAMA_GPU_URL)
+# For Option 3: large_model = ChatOllama(model="qwen3:8b", ...)
+
+def router_node(state):
+    # Small model only outputs tool call JSON or nothing
+    return {"messages": [router_model.bind_tools(tools).invoke(state["messages"])]}
+
+def answer_node(state):
+    # Large(r) model generates human reply — no tools bound
+    return {"messages": [answer_model.invoke(state["messages"])]}
+
+def route(state) -> str:
+    last = state["messages"][-1]
+    return "tools" if getattr(last, "tool_calls", []) else "answer"
+
+graph = StateGraph(MessagesState)
+graph.add_node("router", router_node)
+graph.add_node("tools", ToolNode(tools))  # `tools` = your list of tool objects
+graph.add_node("answer", answer_node)
+graph.set_entry_point("router")
+graph.add_conditional_edges("router", route)
+graph.add_edge("tools", "answer")
+graph.add_edge("answer", END)
+agent = graph.compile()
+```
+
+For three-tier, add a complexity classifier node before the router that selects
+`answer_model = medium_model or large_model` based on query signals.
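That classifier node can stay keyword-based, matching the explicit-trigger recommendation from Option 3 above. A hypothetical sketch — the marker list is illustrative, not a tuned set:

```python
COMPLEX_MARKERS = ("/think", "step by step", "think through", "analyze carefully")

def pick_answer_tier(query: str) -> str:
    """Select the answer-model tier before routing; no extra LLM call needed."""
    q = query.lower()
    return "large" if any(m in q for m in COMPLEX_MARKERS) else "medium"
```

Because the check is a plain substring match, it adds no latency and never misroutes a simple query to the large model — the failure mode that makes automatic classifiers expensive on 8GB VRAM.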
+ +--- + +## Open Source Routing Tools + +| Tool | Ollama support | Status | Notes | +|------|---------------|--------|-------| +| LiteLLM | First-class | Active 2025 | Proxy with tiered routing, fallbacks, load balancing | +| RouteLLM (LMSYS) | Yes (documented) | Stale (last commit Aug 2024) | Calibrated for GPT-4 vs Mixtral pair | +| Router-R1 | No | Active (NeurIPS 2025) | RL-based, open-sourced on HuggingFace | +| LLMRouter (ulab) | No | Research 2025 | 16+ routing methods, fair comparison framework | +| FrugalGPT | No direct | Algorithm only | Portkey.ai has implementation guide | + +**Most practical for Ollama**: LiteLLM proxy with tiered model config. Handles routing, +fallbacks, and load balancing without changing agent code. + +--- + +## Summary: What to Do for Adolf + +| | Recommendation | +|--|---------------| +| Quick win (zero risk) | Remove "always call search_memory" from system prompt — history buffer covers conversational recall, saves ~37s | +| Best architecture for 8GB | Two-tier: qwen2.5:0.5b router + qwen3:4b answer, both in VRAM, ~22s total | +| Three-tier feasibility | Not viable for interactive use with Ollama model swapping; viable with vLLM Sleep Mode (~3s swap) if Ollama is replaced | +| Complex task routing | Use explicit user trigger or keyword detection rather than automatic classifier — avoids swap penalty on misclassification | + +--- + +## References + +- arXiv 2510.03847 — Small Language Models for Agentic Systems: A Survey +- arXiv 2506.02153 — Small Language Models are the Future of Agentic AI +- arXiv 2406.04692 — Mixture-of-Agents Enhances LLM Capabilities (original MoA paper) +- arXiv 2410.10347 — A Unified Approach to Routing and Cascading for LLMs (ICLR 2025) +- MasRouter — ACL 2025: https://aclanthology.org/2025.acl-long.757.pdf +- Router-R1 — NeurIPS 2025: https://github.com/ulab-uiuc/Router-R1 +- vLLM Sleep Mode: https://blog.vllm.ai/2025/10/26/sleep-mode.html +- NVIDIA GPU Memory Swap: 
https://developer.nvidia.com/blog/cut-model-deployment-costs-while-keeping-performance-with-gpu-memory-swap/ +- LangGraph multi-agent: https://langchain-ai.github.io/langgraph/tutorials/multi_agent/ +- LangGraph custom ReAct: https://langchain-ai.github.io/langgraph/how-tos/react-agent-from-scratch/ +- LiteLLM Ollama docs: https://docs.litellm.ai/docs/providers/ollama +- RouteLLM + Ollama example: https://github.com/lm-sys/RouteLLM/blob/main/examples/routing_to_local_models.md +- LLMRouter framework: https://ulab-uiuc.github.io/LLMRouter/ +- Functionary (tool-call fine-tuned): https://github.com/MeetKai/functionary +- Constrained generation (outlines): https://github.com/dottxt-ai/outlines diff --git a/openmemory/Dockerfile b/openmemory/Dockerfile new file mode 100644 index 0000000..038abf3 --- /dev/null +++ b/openmemory/Dockerfile @@ -0,0 +1,6 @@ +FROM python:3.12-slim +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY server.py . +CMD ["python", "server.py"] diff --git a/openmemory/requirements.txt b/openmemory/requirements.txt new file mode 100644 index 0000000..6ce37d4 --- /dev/null +++ b/openmemory/requirements.txt @@ -0,0 +1,6 @@ +mem0ai +ollama +fastapi +uvicorn +mcp[cli] +qdrant-client diff --git a/openmemory/server.py b/openmemory/server.py index fd85b37..73fd93c 100644 --- a/openmemory/server.py +++ b/openmemory/server.py @@ -1,24 +1,109 @@ +import json import os from mcp.server.fastmcp import FastMCP from mem0 import Memory +# Extraction LLM — GPU Ollama (qwen3:4b, same model as medium agent) +# Runs after reply when GPU is idle; spin-wait in agent.py prevents contention +OLLAMA_GPU_URL = os.getenv("OLLAMA_GPU_URL", "http://host.docker.internal:11436") + +# Embedding — CPU Ollama (nomic-embed-text, 137 MB RAM) +# Used for both search (50-150ms, acceptable) and store-time embedding OLLAMA_CPU_URL = os.getenv("OLLAMA_CPU_URL", "http://host.docker.internal:11435") + QDRANT_HOST = os.getenv("QDRANT_HOST", 
"host.docker.internal") QDRANT_PORT = int(os.getenv("QDRANT_PORT", "6333")) +# Change 2: Custom extraction prompt +# /no_think disables qwen3 thinking tokens so output is clean JSON +EXTRACTION_PROMPT = """/no_think +You are a memory extraction assistant. Extract factual statements from a conversation that are worth remembering long-term. + +Extract facts from BOTH user AND assistant messages, including: +- User details, preferences, and personal information +- User's plans, goals, and intentions +- The assistant's name or persona (if set by the user or stated by the assistant) +- Any commitments or agreements made +- Key facts stated as true + +Return ONLY valid JSON in this exact format: +{"facts": ["fact 1", "fact 2"]} + +If there are no facts worth storing, return: {"facts": []} + +IMPORTANT rules: +- Extract the EXACT concrete values mentioned. Never say "not known" or "unspecified". +- If the user states their name, job, pet, city, allergy, or preference — store the exact value. +- A single message may contain multiple facts — extract ALL of them. +- Do NOT extract vague summaries. Extract specific facts with real values. + +Examples: + +Input: "User: I live in Berlin\nAssistant: Got it, you're in Berlin!" +Output: {"facts": ["User lives in Berlin"]} + +Input: "User: My name is Alice and I live in Tokyo\nAssistant: Nice to meet you Alice!" +Output: {"facts": ["User's name is Alice", "User lives in Tokyo"]} + +Input: "User: I work as a software engineer at a startup\nAssistant: Cool!" +Output: {"facts": ["User works as a software engineer at a startup"]} + +Input: "User: I have a cat named Whiskers\nAssistant: Whiskers is a cute name!" +Output: {"facts": ["User has a cat named Whiskers"]} + +Input: "User: I'm allergic to nuts\nAssistant: I'll remember that." +Output: {"facts": ["User is allergic to nuts"]} + +Input: "User: remember that your name is Adolf\nAssistant: My name is Adolf!" 
+Output: {"facts": ["Assistant's name is Adolf"]} + +Input: "User: what time is it?\nAssistant: I don't have access to real-time data." +Output: {"facts": []} + +Input: "User: I prefer dark mode\nAssistant: Noted, I'll keep that in mind." +Output: {"facts": ["User prefers dark mode"]} + +Now extract facts from this conversation:""" + +# Update/dedup decision prompt — overrides mem0's default. +# qwen2.5:1.5b struggles with the default multi-step reasoning; this version is +# more explicit: list existing, list new, decide ADD/NONE per item. +UPDATE_PROMPT = """/no_think +You manage a memory store. Given EXISTING memories and NEW facts: +- For each EXISTING memory: output NONE (no change) or UPDATE (if a new fact replaces it) or DELETE. +- For each NEW fact: output ADD if it is not already covered by existing memories. Output NONE if it is already covered. +- IMPORTANT: You MUST include ALL new facts in your output — either as ADD or NONE. +- Output ONLY valid JSON, no explanation. + +Example A — new fact is genuinely new: +Existing: [{"id": "0", "text": "User lives in Berlin"}] +New facts: ["User is allergic to nuts"] +Output: {"memory": [{"id": "0", "text": "User lives in Berlin", "event": "NONE"}, {"id": "1", "text": "User is allergic to nuts", "event": "ADD"}]} + +Example B — new fact updates an existing one: +Existing: [{"id": "0", "text": "User lives in Berlin"}] +New facts: ["User lives in Paris"] +Output: {"memory": [{"id": "0", "text": "User lives in Paris", "event": "UPDATE", "old_memory": "User lives in Berlin"}]} + +Example C — new fact already covered: +Existing: [{"id": "0", "text": "User is allergic to nuts"}] +New facts: ["User has a nut allergy"] +Output: {"memory": [{"id": "0", "text": "User is allergic to nuts", "event": "NONE"}]}""" + config = { "llm": { "provider": "ollama", "config": { - "model": "qwen2.5:1.5b", - "ollama_base_url": OLLAMA_CPU_URL, + "model": "qwen3:4b", + "ollama_base_url": OLLAMA_GPU_URL, + "temperature": 0.1, # consistent 
JSON output }, }, "embedder": { "provider": "ollama", "config": { "model": "nomic-embed-text", - "ollama_base_url": OLLAMA_CPU_URL, + "ollama_base_url": OLLAMA_CPU_URL, # CPU: 50-150ms per query, no GPU needed }, }, "vector_store": { @@ -30,6 +115,8 @@ config = { "port": QDRANT_PORT, }, }, + "custom_fact_extraction_prompt": EXTRACTION_PROMPT, + "custom_update_memory_prompt": UPDATE_PROMPT, } memory = Memory.from_config(config) @@ -41,21 +128,27 @@ mcp = FastMCP("openmemory", host="0.0.0.0", port=8765) def add_memory(text: str, user_id: str = "default") -> str: """Store a memory for a user.""" result = memory.add(text, user_id=user_id) - return str(result) + # Change 3: return clean JSON instead of Python repr + return json.dumps(result, default=str) @mcp.tool() def search_memory(query: str, user_id: str = "default") -> str: """Search memories for a user using semantic similarity.""" - results = memory.search(query, user_id=user_id) - return str(results) + results = memory.search(query, user_id=user_id, limit=10, threshold=0.3) + # Filter to only return results with score >= 0.5 to avoid irrelevant noise + if isinstance(results, dict) and "results" in results: + results["results"] = [r for r in results["results"] if r.get("score", 0) >= 0.5] + return json.dumps(results, default=str) @mcp.tool() -def get_all_memories(user_id: str = "default") -> str: - """Get all stored memories for a user.""" - results = memory.get_all(user_id=user_id) - return str(results) +def get_all_memories(user_id: str = "default", limit: int = 50) -> str: + """Get stored memories for a user (up to limit).""" + # Change 5: cap results to avoid flooding context + results = memory.get_all(user_id=user_id, limit=limit) + # Change 3: return clean JSON instead of Python repr + return json.dumps(results, default=str) if __name__ == "__main__": diff --git a/reasoning.md b/reasoning.md new file mode 100644 index 0000000..414c26b --- /dev/null +++ b/reasoning.md @@ -0,0 +1,287 @@ +# Reasoning & 
Self-Reflection in Local LLM Agents + +Research-backed notes on implementing multi-stage reasoning for local 4-8B models (2025). + +--- + +## TL;DR + +For local 4-8B models, **programmatic self-critique loops rarely justify their cost**. +Native thinking tokens (Qwen3 `enable_thinking=True`) or external verifiers +give better results at lower complexity. See bottom of this file for recommendations. + +--- + +## Reasoning Patterns + +### Chain-of-Thought (CoT) + +Single forward pass, model thinks step-by-step before answering. +Zero implementation cost — just a prompt change. +Typical gain: +5-10pp on multi-step tasks vs no CoT. +No latency overhead beyond the extra output tokens. + +### Reflexion (Shinn et al., NeurIPS 2023) + +Multiple complete attempts. After each attempt, the model writes a textual critique +of what went wrong and stores it in episodic memory. Next attempt is conditioned on +that memory. + +``` +attempt 1 → fail → write critique → attempt 2 (reads critique) → ... +``` + +Key results (GPT-4): HumanEval 80% → 91% pass@1. +Cost: N complete task executions. At 30s/attempt, 5 trials = 2.5 minutes. +Implementation: https://github.com/noahshinn/reflexion + +### Reflection Loop (in-turn revision) + +Within a single turn: generate → critique → revise → [repeat]. +Simpler than Reflexion. More common in practice. + +``` +Generate → Critique → Revise → [stop condition] +``` + +Stop condition options: max iterations, score threshold, external verifier passes. + +### ReAct + Reflect + +Standard ReAct (Reason + Act) with an added Reflect step after failed tool calls. +Most common production pattern. Adds 1-3 extra LLM calls per failed action. + +### Tree of Thoughts (ToT) + +Explore N reasoning branches simultaneously, evaluate each node, BFS/DFS search. +Branching factor 3, depth 3 = 54 LLM calls per problem. Prohibitive for local models. +Works only if the model has strong self-evaluation capability (typically ≥32B). 
+ToTRL-trained Qwen3-8B achieved 0.633 on AIME 2025 — but required training-time RL, not
+a prompt trick.
+
+### Graph of Thoughts (GoT)
+
+Generalizes ToT to arbitrary DAGs: thoughts can merge, split, or loop.
+62% improvement in sorting vs ToT, 31% cost reduction.
+Implementation: https://github.com/spcl/graph-of-thoughts
+Higher complexity than ToT; graph structure is problem-specific.
+
+---
+
+## Native Thinking Tokens (vs Programmatic Reflection)
+
+Open models with built-in reasoning scratchpads:
+
+| Model | Size | Ollama | Toggle | Notes |
+|-------|------|--------|--------|-------|
+| Qwen3 | 0.6B–235B | Yes | enable_thinking / think=True/False | Best option for local use |
+| Qwen3-4B-Thinking-2507 | 4B | Yes | Always on | Dedicated thinking variant |
+| QwQ-32B | 32B | Yes | Always on | Strong reasoning, needs VRAM |
+| DeepSeek-R1 distills | 1.5B–70B | Yes | Always on | Llama/Qwen base |
+
+### Qwen3 thinking toggle in Ollama / LangChain
+
+```python
+# LangChain
+model = ChatOllama(model="qwen3:4b", think=True, num_ctx=8192)
+
+# Prompt-level (Ollama API)
+# /think — enable per-request
+# /no_think — disable per-request
+```
+
+Latency: thinking mode is 2-3x slower in wall-clock time (model generates internal
+`<think>…</think>` tokens before answering). Qwen3-VL 8B Thinking: 262s vs 65s on a
+complex visual reasoning task — but meaningfully better output.
+ +### Native thinking vs programmatic loop + +| | Native thinking | Programmatic multi-stage | +|--|----------------|------------------------| +| API calls | 1 | N (rounds × 2) | +| Implementation | Zero | Significant | +| Quality on 4-8B | Good (capability in weights) | Poor (weak model critiques itself) | +| Transparency | Opaque (one streamed block) | Inspectable per stage | +| Controllability | thinking_budget only | Full control | +| Latency | 2-3x tokens, 1 call | N × base latency | + +**For local 4-8B: native thinking almost always beats a hand-coded reflection loop.** + +--- + +## Does Programmatic Reflection Work on Small Models? + +Short answer: **mostly no without external verification**. + +From the research (2024-2025): + +- **"When Hindsight is Not 20/20" (arXiv 2404.09129)**: Self-reflection often makes + small models worse. A model that generated an error also lacks the capability to + identify it. It confidently accepts flawed reasoning on re-reading. + +- **THINKSLM (EMNLP 2025)**: Inference-time self-critique on Llama-3.1-8B is unreliable. + Training-time distilled reasoning traces help; prompt-based self-critique does not. + +- **Nature 2025 study**: Large gains (GPT-4: +18.5pp) diminish sharply for smaller models. + +- **Latency cost**: Each reflection round on a local 8B adds 5-30s. A 3-round loop + = 3x latency for 0-5% gain (or regression) on most tasks. + +### When it actually helps on small models + +1. **External verifier**: model doesn't self-evaluate — it reads objective pass/fail + feedback (unit tests, JSON schema checker, math verifier, search result grader). + Most reliable pattern. No self-evaluation capability required. + +2. **Stronger critic**: generate with 4B, critique with 32B or API model. Hybrid approach. + +3. **Native thinking weights**: reflection happens in a single forward pass with + trained weights. Far more reliable than prompt-based self-critique. + +4. 
**Structured error types**: code syntax, JSON validity, regex match — computable
+   error signal, not linguistic self-assessment.
+
+---
+
+## LangGraph Reflection Loop Implementation
+
+LangGraph is suited for this because it supports cyclic graphs with state.
+
+### Minimal reflection graph
+
+```python
+from langgraph.graph import StateGraph, START, END, MessagesState
+from langchain_ollama import ChatOllama
+
+llm = ChatOllama(model="qwen3:4b", think=False)
+critic_llm = ChatOllama(model="qwen3:4b", think=True)  # or stronger model
+
+MAX_REFLECTIONS = 2
+
+class ReflectState(MessagesState):
+    # MessagesState alone has no "iterations" channel; writing an undeclared
+    # key errors at runtime, so the counter must be part of the state schema.
+    iterations: int
+
+def generate(state):
+    response = llm.invoke(state["messages"])
+    return {"messages": [response], "iterations": state.get("iterations", 0)}
+
+def reflect(state):
+    critique = critic_llm.invoke(
+        [{"role": "system", "content": "Critique this response. Be specific about errors."}]
+        + state["messages"]
+    )
+    return {
+        "messages": [{"role": "user", "content": critique.content}],
+        "iterations": state["iterations"] + 1,
+    }
+
+def should_reflect(state) -> str:
+    if state.get("iterations", 0) >= MAX_REFLECTIONS:
+        return END
+    # Optionally: check external verifier here
+    return "reflect"
+
+graph = StateGraph(ReflectState)
+graph.add_node("generate", generate)
+graph.add_node("reflect", reflect)
+graph.add_edge(START, "generate")
+graph.add_conditional_edges("generate", should_reflect)
+graph.add_edge("reflect", "generate")
+agent = graph.compile()
+```
+
+### Self-Correcting RAG (CRAG pattern)
+
+```
+Retrieve → Grade documents → [rewrite query if bad] → Generate → Grade answer → [loop or END]
+```
+
+The document grader and answer grader are the "external verifiers" — they do
+objective quality checks rather than linguistic self-critique.
+LangChain tutorial: https://learnopencv.com/langgraph-self-correcting-agent-code-generation/
+
+---
+
+## Alternative Tooling
+
+### DSPy (recommended for pipeline optimization)
+
+DSPy treats prompts as learnable parameters.
Define input/output signatures, run an
+optimizer on examples, and DSPy auto-tunes prompts for your specific model.
+
+```python
+import dspy
+lm = dspy.LM('ollama_chat/qwen3:4b', api_base='http://localhost:11434', api_key='')
+dspy.configure(lm=lm)
+
+class Reflect(dspy.Module):
+    def __init__(self):
+        super().__init__()
+        self.gen = dspy.ChainOfThought("question -> answer")
+        self.critique = dspy.ChainOfThought("question, answer -> critique, improved_answer")
+    def forward(self, question):
+        first = self.gen(question=question)
+        return self.critique(question=question, answer=first.answer).improved_answer
+```
+
+Works with Ollama. Optimizer (BootstrapFewShot, MIPRO) tunes prompts automatically
+but requires multiple LLM calls per training example — slow on local hardware.
+
+### Outlines (structured output)
+
+Constrained decoding — guarantees valid JSON/regex output from any model.
+Use this inside a reflection loop to ensure the critic always returns structured feedback.
+Works with Ollama via OpenAI-compatible API.
+https://dottxt-ai.github.io/outlines/
+
+### SGLang
+
+High-performance GPU serving runtime (replaces Ollama for GPU inference).
+Natively understands `<think>…</think>` tokens, caches KV-prefix across reflection
+rounds (RadixAttention). If you replace Ollama with SGLang: reflection loops become
+significantly cheaper because repeated prompt prefixes are cache-hit.
+https://github.com/sgl-project/sglang + +--- + +## Benchmarks Summary + +| Setup | Task | Quality Gain | Latency Cost | +|-------|------|-------------|-------------| +| GPT-4 + Reflexion | HumanEval | +11pp (80→91%) | ~5x | +| GPT-4 + reflection | Problem solving | +18.5pp | ~3x | +| Llama-7B + programmatic self-critique | Math | +7.1% | ~3x | +| Local 8B + same-model critique (typical) | General | 0-5% (often regression) | 2-3x | +| Qwen3-8B + native thinking | AIME 2025 | Matches models 10x larger | 2-3x tokens | +| Any model + external verifier (tests) | Code | +15-26pp | 1.5-2x | + +--- + +## Practical Recommendations for Adolf (local qwen3:4b / 8b) + +| Goal | Approach | Cost | +|------|----------|------| +| Better reasoning on hard questions | `think=True` in ChatOllama | 2-3x latency, zero code | +| Code/JSON correctness | External verifier (schema check, exec) + retry loop | +1 LLM call on failure | +| Complex multi-step tasks | Route to qwen3:8b with `think=True` | model swap + 2-3x tokens | +| Full reflection loop | Only if using stronger critic model or external verifier | significant complexity | +| Avoid | Programmatic self-critique using same 4-8B model as critic | adds latency, no gain | + +--- + +## References + +- Reflexion (Shinn et al., NeurIPS 2023): https://arxiv.org/abs/2303.11366 +- Tree of Thoughts: https://arxiv.org/abs/2305.10601 +- ToTRL (Qwen3 RL training): https://arxiv.org/html/2505.12717v1 +- Graph of Thoughts: https://arxiv.org/abs/2308.09687 +- Adaptive GoT (2025): https://arxiv.org/pdf/2502.05078 +- When Hindsight is Not 20/20: https://arxiv.org/html/2404.09129v1 +- THINKSLM (EMNLP 2025): https://aclanthology.org/2025.emnlp-main.1659.pdf +- MAR — Multi-Agent Reflexion: https://arxiv.org/html/2512.20845 +- Qwen3 technical report: https://arxiv.org/pdf/2505.09388 +- Qwen3 thinking cost measurement: https://medium.com/@frankmorales_91352/the-computational-cost-of-cognitive-depth-qwen3-vl-8b-instruct-vs-thinking-2517b677ba29 +- 
DeepSeek-R1: https://arxiv.org/abs/2501.12948 +- LangChain reflection blog: https://blog.langchain.com/reflection-agents/ +- LangGraph CRAG: https://learnopencv.com/langgraph-self-correcting-agent-code-generation/ +- DSPy: https://dspy.ai/ +- Outlines: https://dottxt-ai.github.io/outlines/ +- SGLang: https://github.com/sgl-project/sglang +- graph-of-thoughts: https://github.com/spcl/graph-of-thoughts +- reflexion (original code): https://github.com/noahshinn/reflexion diff --git a/router.py b/router.py index 31bce20..85b0012 100644 --- a/router.py +++ b/router.py @@ -32,6 +32,8 @@ LIGHT = answerable from general knowledge, no internet needed: MEDIUM = requires web search or the user's stored memories: current weather / today's news / Bitcoin price / what did we talk about + what is my name / where do I live / what is my job / do I have any pets + what do you know about me / what are my preferences / what did I tell you COMPLEX = /think prefix only: /think compare frameworks / /think plan a trip diff --git a/test_pipeline.py b/test_pipeline.py index c696a41..034720d 100644 --- a/test_pipeline.py +++ b/test_pipeline.py @@ -13,16 +13,19 @@ Tests: 8. Name recall — "what is your name?" → reply contains 9. Timing profile + bottleneck report 10. Easy benchmark — 10 easy questions → all must route to light - 11. Medium benchmark — 10 medium questions → must route to medium (or light, never complex) + 11. Medium benchmark — 11 medium questions → must route to medium (or light, never complex) 12. Hard benchmark — 10 /think questions → all must route to complex; VRAM flush verified + 13. Memory benchmark — store 5 facts, recall with 10 questions → verify keyword presence + 14. 
Dedup test — same fact sent twice → Qdrant must not grow by 2 Usage: python3 test_pipeline.py [--chat-id CHAT_ID] - [--bench-only] skip sections 1-9, run 10+11+12 - [--easy-only] skip 1-9 and 11+12, run only section 10 - [--medium-only] skip 1-9 and 10+12, run only section 11 - [--hard-only] skip 1-9 and 10+11, run only section 12 - [--no-bench] skip sections 10-12 + [--bench-only] skip sections 1-9, run 10+11+12+13 + [--easy-only] skip 1-9 and 11+12+13, run only section 10 + [--medium-only] skip 1-9 and 10+12+13, run only section 11 + [--hard-only] skip 1-9 and 10+11+13, run only section 12 + [--memory-only] skip 1-9 and 10+11+12, run only section 13 + [--no-bench] skip sections 10-13 Timing is extracted from deepagents container logs, not estimated from sleeps. """ @@ -79,6 +82,7 @@ BENCHMARK = { "do you remember what we talked about before?", "search for the best coffee shops in Tokyo", "what is happening in the tech industry this week?", + "what's the weather like today?", ], "hard": [ "/think compare the top 3 Python web frameworks (Django, FastAPI, Flask) and recommend one for a production REST API", @@ -187,18 +191,13 @@ def parse_run_block(lines, msg_prefix): reply_data = None for j, line in enumerate(block): - # Track last non-tool AIMessage (the final reply) + # Track last non-tool AIMessage (the final reply) — truncated at 150 chars in logs, + # used only as fallback if reply_text line is absent (older server versions) if "AIMessage:" in line and "→" not in line: txt = line.split("AIMessage:", 1)[-1].strip() if txt: last_ai_text = txt - # For light tier: router reply is stored in _conversation_buffers directly - # so there may be no AIMessage log — grab from tier=light line - if "[agent] tier=light" in line and "message=" in line: - # Extract preview text logged elsewhere if available - pass - m = re.search(r"replied in ([\d.]+)s \(llm=([\d.]+)s, send=([\d.]+)s\)", line) if m: # Extract optional tier tag at end of line @@ -209,13 +208,21 @@ def 
parse_run_block(lines, msg_prefix): "llm": float(m.group(2)), "send": float(m.group(3)), "tier": tier, - "reply_text": last_ai_text, + "reply_text": last_ai_text, # may be overwritten by reply_text line below "memory_s": None, "memory_error": False, "_j": j, } break + # Read full reply_text from dedicated log line (written immediately after replied-in line) + if reply_data is not None: + next_lines = block[reply_data["_j"] + 1: reply_data["_j"] + 3] + for line in next_lines: + if line.startswith("[agent] reply_text:"): + reply_data["reply_text"] = line[len("[agent] reply_text:"):].strip() + break + if reply_data is None: return None # reply not in logs yet @@ -281,16 +288,19 @@ parser.add_argument("--medium-only", action="store_true", help="Skip sections 1-9 and 10, run only section 11 (medium benchmark)") parser.add_argument("--hard-only", action="store_true", help="Skip sections 1-9 and 10+11, run only section 12 (hard benchmark)") +parser.add_argument("--memory-only", action="store_true", + help="Skip sections 1-9 and 10+11+12, run only section 13 (memory benchmark)") parser.add_argument("--no-bench", action="store_true", - help="Skip sections 10-12 (all benchmarks)") + help="Skip sections 10-13 (all benchmarks)") args = parser.parse_args() CHAT_ID = args.chat_id # Derived flags for readability -_skip_pipeline = args.bench_only or args.easy_only or args.medium_only or args.hard_only -_run_easy = not args.no_bench and not args.medium_only and not args.hard_only -_run_medium = not args.no_bench and not args.easy_only and not args.hard_only -_run_hard = not args.no_bench and not args.easy_only and not args.medium_only +_skip_pipeline = args.bench_only or args.easy_only or args.medium_only or args.hard_only or args.memory_only +_run_easy = not args.no_bench and not args.medium_only and not args.hard_only and not args.memory_only +_run_medium = not args.no_bench and not args.easy_only and not args.hard_only and not args.memory_only +_run_hard = not args.no_bench and 
not args.easy_only and not args.medium_only and not args.memory_only +_run_memory = not args.no_bench and not args.easy_only and not args.medium_only and not args.hard_only random_name = random.choice(NAMES) @@ -880,6 +890,263 @@ if _run_hard: ) +# ── 13. Memory benchmark — store facts, recall with keyword verification ─────── +if _run_memory: + _mem_name = random.choice([ + "Alice", "Bruno", "Camille", "Diego", "Elena", + "Farid", "Greta", "Hiroshi", "Irina", "Jonas", + ]) + _mem_city = random.choice([ + "Tokyo", "Berlin", "Cairo", "Sydney", "Oslo", + "Nairobi", "Lisbon", "Seoul", "Montreal", "Bangkok", + ]) + _mem_allergy = random.choice(["nuts", "gluten", "dairy", "shellfish", "eggs"]) + _mem_job = random.choice([ + ("software engineer", "startup"), + ("data scientist", "research lab"), + ("product manager", "tech company"), + ("DevOps engineer", "cloud provider"), + ]) + _mem_lang = random.choice(["Python", "Rust", "Go", "TypeScript", "Kotlin"]) + _mem_pet_name = random.choice([ + "Whiskers", "Biscuit", "Mango", "Pebble", "Shadow", + "Noodle", "Cheddar", "Cosmo", "Pippin", "Ziggy", + ]) + + print(f"\n[{INFO}] 13. Memory benchmark") + print(f" name={_mem_name} city={_mem_city} allergy={_mem_allergy} " + f"job={_mem_job[0]}@{_mem_job[1]} lang={_mem_lang} pet={_mem_pet_name}") + print(f" Storing 5 facts, then querying with 10 recall questions") + print(f" Chat ID: {CHAT_ID}") + print() + + # Wipe Qdrant collection and restart openmemory to eliminate stale data interference. + # Deleting the collection alone causes 404s — openmemory holds a live reference to it. 
+ try: + import urllib.request as _ur + _req = _ur.Request(f"{QDRANT}/collections/adolf_memories", method="DELETE") + with _ur.urlopen(_req, timeout=5): + pass + print(f" [{INFO}] Wiped adolf_memories collection") + except Exception as e: + print(f" [{WARN}] Could not wipe collection: {e}") + + try: + subprocess.run( + ["docker", "compose", "-f", COMPOSE_FILE, "restart", "openmemory"], + capture_output=True, timeout=30, + ) + time.sleep(6) # wait for openmemory to reinitialize and recreate collection + print(f" [{INFO}] Restarted openmemory — fresh collection ready") + except Exception as e: + print(f" [{WARN}] Could not restart openmemory: {e}") + + MEMORY_FACTS = [ + f"My name is {_mem_name} and I live in {_mem_city}", + f"I prefer vegetarian food and I'm allergic to {_mem_allergy}", + f"I work as a {_mem_job[0]} at a {_mem_job[1]}", + f"My favorite programming language is {_mem_lang}", + f"I have a cat named {_mem_pet_name}", + ] + + MEMORY_RECALLS = [ + # (question, [keywords that must appear in reply]) + ("What is my name?", [_mem_name.lower()]), + ("Where do I live?", [_mem_city.lower()]), + ("Do I have any food allergies?", [_mem_allergy.lower()]), + ("What is my job?", [_mem_job[0].split()[0].lower()]), + ("What programming language do I prefer?", [_mem_lang.lower()]), + ("Do I have any pets?", [_mem_pet_name.lower()]), + ("Am I vegetarian or do I eat meat?", ["vegetarian"]), + ("What city am I in?", [_mem_city.lower()]), + ("Tell me what you know about me", [_mem_name.lower(), _mem_city.lower()]), + ("What's the name of my pet?", [_mem_pet_name.lower()]), + ] + + MEMORY_STORE_TIMEOUT = 180 # seconds per fact + MEMORY_RECALL_TIMEOUT = 180 # seconds per question + + # ── Store facts ────────────────────────────────────────────────────────── + print(f" Storing {len(MEMORY_FACTS)} facts...") + store_ok = 0 + for i, fact in enumerate(MEMORY_FACTS, 1): + print(f" [mem-store-{i:02d}] {fact!r}") + try: + status, _ = post_json(f"{DEEPAGENTS}/chat", + {"message": 
fact, "chat_id": CHAT_ID}, timeout=5) + if status != 202: + print(f" → [{FAIL}] POST returned {status}") + continue + except Exception as e: + print(f" → [{FAIL}] POST error: {e}") + continue + + found = wait_for(f"mem-store-{i:02d}", fact, timeout_s=MEMORY_STORE_TIMEOUT, need_memory=True) + if found: + store_ok += 1 + print(f" → [{PASS}] stored tier={found['tier']} mem={found['memory_s']}s") + else: + print(f" → [{FAIL}] timeout") + + report(f"All memory facts stored ({store_ok}/{len(MEMORY_FACTS)})", + store_ok == len(MEMORY_FACTS)) + + # Wait for async memory extraction to settle — poll Qdrant until point count stabilises + print(f"\n Waiting for memory extraction to settle (up to 60s)...") + _prev_count = -1 + _stable_ticks = 0 + for _ in range(30): + time.sleep(2) + try: + _, body = get(f"{QDRANT}/collections/adolf_memories") + _cur_count = json.loads(body).get("result", {}).get("points_count", 0) + except Exception: + _cur_count = _prev_count + if _cur_count == _prev_count: + _stable_ticks += 1 + if _stable_ticks >= 3: # stable for 6s + break + else: + _stable_ticks = 0 + _prev_count = _cur_count + print(f" Memory settled: {_cur_count} points in Qdrant") + + # ── Recall questions ───────────────────────────────────────────────────── + print(f"\n Querying with {len(MEMORY_RECALLS)} recall questions...") + recall_results = [] # (question, keywords, reply_text, passed) + + for i, (question, keywords) in enumerate(MEMORY_RECALLS, 1): + print(f" [mem-recall-{i:02d}] {question!r}") + + try: + status, _ = post_json(f"{DEEPAGENTS}/chat", + {"message": question, "chat_id": CHAT_ID}, timeout=5) + if status != 202: + print(f" → [{FAIL}] POST returned {status}") + recall_results.append((question, keywords, None, False)) + continue + except Exception as e: + print(f" → [{FAIL}] POST error: {e}") + recall_results.append((question, keywords, None, False)) + continue + + t_start = time.monotonic() + found = None + while time.monotonic() - t_start < MEMORY_RECALL_TIMEOUT: + 
since = int(time.monotonic() - t_start) + 30 + lines = fetch_logs(since_s=since) + found = parse_run_block(lines, question) + if found: + break + time.sleep(2) + + if not found: + print(f" → [{FAIL}] timeout") + recall_results.append((question, keywords, None, False)) + continue + + reply_text = (found.get("reply_text") or "").lower() + hit_keywords = [kw for kw in keywords if kw.lower() in reply_text] + passed = len(hit_keywords) == len(keywords) + tag_str = PASS if passed else WARN + missing = [kw for kw in keywords if kw.lower() not in reply_text] + detail = f"tier={found['tier']} lat={found['reply_total']:.1f}s" + if missing: + detail += f" missing keywords: {missing}" + print(f" → [{tag_str}] {detail}") + recall_results.append((question, keywords, found.get("reply_text"), passed)) + + time.sleep(1) + + # Summary + print(f"\n {'#':<4} {'Pass':<5} {'Question':<45} {'Keywords'}") + print(f" {'─'*4} {'─'*5} {'─'*45} {'─'*30}") + for idx, (q, kws, reply, ok) in enumerate(recall_results, 1): + ok_str = "✓" if ok else "✗" + print(f" {ok_str} {idx:<3} {'yes' if ok else 'no':<5} {q[:45]:<45} {kws}") + + recall_pass = sum(1 for _, _, _, ok in recall_results if ok) + total_recall = len(recall_results) + print(f"\n Memory recall score: {recall_pass}/{total_recall}") + + report(f"Memory recall ({recall_pass}/{total_recall} keywords found)", + recall_pass == total_recall, + f"{recall_pass}/{total_recall} questions had all expected keywords in reply") + + +# ── 14. Deduplication test — same fact stored twice must not grow Qdrant by 2 ─ +if _run_memory: + print(f"\n[{INFO}] 14. 
Memory deduplication test") + print(f" Sends the same fact twice — Qdrant point count must not increase by 2") + print(f" Chat ID: {CHAT_ID}") + print() + + DEDUP_TIMEOUT = 120 + + _dedup_fact = f"My lucky number is {random.randint(1000, 9999)}" + print(f" Fact: {_dedup_fact!r}") + + def _qdrant_count_dedup(): + try: + _, body = get(f"{QDRANT}/collections/adolf_memories") + return json.loads(body).get("result", {}).get("points_count", 0) + except Exception: + return 0 + + pts_before = _qdrant_count_dedup() + print(f" Qdrant points before: {pts_before}") + + # Send fact the first time + print(f" [dedup-1] sending fact (first time)") + try: + status, _ = post_json(f"{DEEPAGENTS}/chat", + {"message": _dedup_fact, "chat_id": CHAT_ID}, timeout=5) + if status != 202: + report("Dedup: first POST accepted", False, f"status={status}") + else: + found1 = wait_for("dedup-1", _dedup_fact, timeout_s=DEDUP_TIMEOUT, need_memory=True) + if found1: + print(f" [dedup-1] stored tier={found1['tier']} mem={found1['memory_s']}s") + else: + print(f" [dedup-1] timeout") + except Exception as e: + report("Dedup: first POST accepted", False, str(e)) + found1 = None + + pts_after_first = _qdrant_count_dedup() + new_first = pts_after_first - pts_before + print(f" Qdrant after first send: {pts_before} → {pts_after_first} (+{new_first})") + + # Send exact same fact again + print(f" [dedup-2] sending same fact (second time)") + try: + status, _ = post_json(f"{DEEPAGENTS}/chat", + {"message": _dedup_fact, "chat_id": CHAT_ID}, timeout=5) + if status != 202: + report("Dedup: second POST accepted", False, f"status={status}") + else: + found2 = wait_for("dedup-2", _dedup_fact, timeout_s=DEDUP_TIMEOUT, need_memory=True) + if found2: + print(f" [dedup-2] stored tier={found2['tier']} mem={found2['memory_s']}s") + else: + print(f" [dedup-2] timeout") + except Exception as e: + report("Dedup: second POST accepted", False, str(e)) + + pts_after_second = _qdrant_count_dedup() + new_second = pts_after_second 
- pts_after_first
+    print(f"    Qdrant after second send: {pts_after_first} → {pts_after_second} (+{new_second})")
+
+    # Pass: the second, identical send must add NO new points (mem0 NOOP/UPDATE, not ADD).
+    # `new_second <= new_first` would wrongly pass the +1/+1 case the test title forbids.
+    # If the first send stored 0 points (fact too trivial), dedup is vacuously satisfied.
+    dedup_ok = new_second == 0 or new_first == 0
+    report(
+        "Deduplication: second identical fact not added to Qdrant",
+        dedup_ok,
+        f"first send: +{new_first} pts, second send: +{new_second} pts (want second = 0)",
+    )
+
+
 # ── summary ───────────────────────────────────────────────────────────────────
 print(f"\n{'─'*55}")
 total = len(results)
diff --git a/wiki_research.py b/wiki_research.py
new file mode 100644
index 0000000..f2e2def
--- /dev/null
+++ b/wiki_research.py
@@ -0,0 +1,278 @@
+#!/usr/bin/env python3
+"""
+Wiki Research Pipeline — searches the web for each person/place in the family wiki.
+
+Uses Adolf's complex agent (/think prefix → qwen3:8b + web_search) to research
+each subject and aggregates findings into research.md.
+ +Usage: + python3 wiki_research.py [--subject "Name"] [--dry-run] [--timeout 300] + [--output PATH] +""" + +import argparse +import json +import re +import sys +import time +import urllib.parse +import urllib.request +from datetime import datetime +from pathlib import Path + +# ── config ───────────────────────────────────────────────────────────────────── +GATEWAY = "http://localhost:8000" +WIKI_ROOT = Path("/mnt/ssd/dbs/otter/app-data/repository") +DEFAULT_OUTPUT = WIKI_ROOT / "research.md" + +PASS = "\033[32mPASS\033[0m" +FAIL = "\033[31mFAIL\033[0m" +INFO = "\033[36mINFO\033[0m" + + +# ── helpers ──────────────────────────────────────────────────────────────────── + +def post_message(text: str, session_id: str, timeout: int = 10) -> int: + payload = json.dumps({ + "text": text, + "session_id": session_id, + "channel": "cli", + "user_id": "wiki-pipeline", + }).encode() + req = urllib.request.Request( + f"{GATEWAY}/message", + data=payload, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=timeout) as r: + return r.status + + +def wait_for_reply(label: str, session_id: str, timeout_s: int = 300) -> str | None: + """Open SSE stream on /reply/{session_id} and return reply text, or None on timeout.""" + req = urllib.request.Request( + f"{GATEWAY}/reply/{urllib.parse.quote(session_id, safe='')}", + headers={"Accept": "text/event-stream"}, + ) + t0 = time.monotonic() + tick = 0 + deadline = t0 + timeout_s + + # Show progress while waiting (SSE blocks until reply is ready) + print(f"\r [{label}] waiting... 
", end="", flush=True) + + try: + with urllib.request.urlopen(req, timeout=timeout_s + 30) as r: + for raw_line in r: + elapsed = time.monotonic() - t0 + line = raw_line.decode("utf-8").rstrip("\n") + if line.startswith("data:"): + text = line[5:].strip().replace("\\n", "\n") + print(f"\r [{label}] done after {elapsed:.0f}s{' ' * 30}") + if text == "[timeout]": + return None + return text + tick += 1 + rem = int(deadline - time.monotonic()) + print(f"\r [{label}] {elapsed:.0f}s elapsed, {rem}s left — waiting... ", + end="", flush=True) + except Exception as e: + print(f"\r [{label}] SSE error: {e}{' ' * 30}") + + print(f"\r [{label}] TIMEOUT after {timeout_s}s{' ' * 30}") + return None + + +# ── wiki parsing ─────────────────────────────────────────────────────────────── + +def slugify(name: str) -> str: + s = name.lower() + s = re.sub(r"[^\w\s-]", "", s) + s = re.sub(r"\s+", "-", s.strip()) + return s[:60] + + +def parse_wiki_file(path: Path): + try: + text = path.read_text(encoding="utf-8") + except Exception: + return None + + lines = text.splitlines() + name = None + context_parts = [] + + for line in lines[:50]: + stripped = line.strip() + if not name and stripped.startswith("# "): + name = stripped[2:].strip() + continue + if name: + if stripped.startswith("[![") or stripped.startswith("!["): + continue + if stripped: + context_parts.append(stripped) + if len(context_parts) >= 20: + break + + if not name: + return None + return name, "\n".join(context_parts) + + +def discover_subjects(wiki_root: Path): + subjects = [] + for subdir in ["люди", "места"]: + folder = wiki_root / subdir + if not folder.exists(): + continue + for md_file in sorted(folder.glob("*.md")): + result = parse_wiki_file(md_file) + if result: + name, context = result + subjects.append((name, context, subdir)) + return subjects + + +# ── output ───────────────────────────────────────────────────────────────────── + +def load_existing_names(output_path: Path) -> set: + if not 
output_path.exists(): + return set() + return set(re.findall(r"^## (.+)$", output_path.read_text(encoding="utf-8"), re.MULTILINE)) + + +def init_output(output_path: Path, total: int): + if not output_path.exists(): + output_path.write_text( + f"# Wiki Research Results\n\n" + f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n" + f"Subjects: {total}\n\n---\n\n", + encoding="utf-8", + ) + + +def append_result(output_path: Path, name: str, elapsed: float, reply_text: str): + date_str = datetime.now().strftime("%Y-%m-%d") + block = ( + f"## {name}\n\n" + f"**Searched**: {date_str} **Elapsed**: {elapsed:.0f}s\n\n" + f"{reply_text or '_No reply captured._'}\n\n---\n\n" + ) + with open(output_path, "a", encoding="utf-8") as f: + f.write(block) + + +# ── research prompt ──────────────────────────────────────────────────────────── + +def build_prompt(name: str, context: str, subdir: str) -> str: + kind = "person" if subdir == "люди" else "place" + return ( + f"/think You are researching a {kind} for a private family wiki. " + f"Find everything publicly available. Be thorough and specific.\n\n" + f"**Subject**: {name}\n" + f"**Known context** (from the family wiki — do NOT just repeat this):\n{context}\n\n" + f"**Research instructions** (MUST follow exactly):\n" + f"1. Call web_search, then IMMEDIATELY call fetch_url on every URL found in results.\n" + f"2. You MUST call fetch_url at least 5 times — do not write the report until you have.\n" + f"3. Priority URLs to fetch: Google Scholar profile, ResearchGate, IEEE Xplore, LinkedIn, employer page.\n" + f"4. Run searches in English AND Russian/Latvian.\n" + f"5. 
After fetching pages, derive follow-up searches from what you find.\n\n" + f"**Output format** (required):\n" + f"- Use markdown with sections: Overview, Education, Career, Publications, " + f"Online Presence, Interesting Findings, Not Found\n" + f"- Every fact must have a source link: [fact](url)\n" + f"- Include actual URLs to profiles, papers, articles found\n" + f"- 'Interesting Findings': non-trivial facts not in the wiki context above\n" + f"- Last line must be: **Sources checked: N** (count of URLs you fetched with fetch_url)\n\n" + f'If truly nothing is found publicly, say "No public information found." ' + f"but only after exhausting all search angles." + ) + + +# ── main ─────────────────────────────────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser(description="Wiki research pipeline") + parser.add_argument("--subject", help="Single subject (substring match)") + parser.add_argument("--dry-run", action="store_true", help="Print prompts, don't send") + parser.add_argument("--timeout", type=int, default=300, help="Per-subject timeout (s)") + parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT, help="Output file") + args = parser.parse_args() + + subjects = discover_subjects(WIKI_ROOT) + if not subjects: + print(f"[{FAIL}] No subjects found in {WIKI_ROOT}") + sys.exit(1) + + print(f"[{INFO}] Discovered {len(subjects)} subjects") + + if args.subject: + needle = args.subject.lower() + subjects = [(n, c, s) for n, c, s in subjects if needle in n.lower()] + if not subjects: + print(f"[{FAIL}] No subject matching '{args.subject}'") + sys.exit(1) + print(f"[{INFO}] Filtered to {len(subjects)} subject(s)") + + if args.dry_run: + for name, context, subdir in subjects: + print(f"\n{'='*60}\nSUBJECT: {name} ({subdir})") + print(f"PROMPT:\n{build_prompt(name, context, subdir)}") + return + + init_output(args.output, len(subjects)) + existing = load_existing_names(args.output) + print(f"[{INFO}] Output: 
{args.output} ({len(existing)} already done)") + + total = len(subjects) + done = 0 + failed = [] + + for idx, (name, context, subdir) in enumerate(subjects, 1): + if name in existing: + print(f"[{idx}/{total}] SKIP {name} (already in output)") + done += 1 + continue + + prompt = build_prompt(name, context, subdir) + session_id = f"wiki-{slugify(name)}" + label = f"{idx}/{total}" + + print(f"\n[{label}] {name}") + + try: + status = post_message(prompt, session_id, timeout=10) + if status != 202: + print(f" [{FAIL}] Unexpected status {status}") + failed.append(name) + continue + except Exception as e: + print(f" [{FAIL}] POST failed: {e}") + failed.append(name) + continue + + t0 = time.monotonic() + reply_text = wait_for_reply(label, session_id, timeout_s=args.timeout) + elapsed = time.monotonic() - t0 + + if reply_text is None: + print(f" [{FAIL}] Timeout") + failed.append(name) + append_result(args.output, name, elapsed, "_Research timed out._") + continue + + print(f" [{PASS}] {elapsed:.0f}s — {len(reply_text)} chars") + append_result(args.output, name, elapsed, reply_text) + done += 1 + + print(f"\n{'='*60}") + print(f"Done: {done}/{total}") + if failed: + print(f"Failed ({len(failed)}): {', '.join(failed)}") + print(f"Output: {args.output}") + + +if __name__ == "__main__": + main()