From 87eb4fb765b4cffee0e40413515e40f02df34965 Mon Sep 17 00:00:00 2001 From: Alvis Date: Sun, 8 Mar 2026 07:06:07 +0000 Subject: [PATCH] =?UTF-8?q?Remove=20adolf=20=E2=80=94=20moved=20to=20separ?= =?UTF-8?q?ate=20repo=20(alvis/adolf)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- adolf/ARCHITECTURE.md | 118 --- adolf/Dockerfile | 10 - adolf/agent.py | 386 ---------- adolf/agent_factory.py | 19 - adolf/channels.py | 75 -- adolf/cli.py | 80 -- adolf/docker-compose.yml | 56 -- adolf/grammy/Dockerfile | 6 - adolf/grammy/bot.mjs | 56 -- adolf/grammy/package.json | 7 - adolf/hello_world.py | 21 - adolf/langgraph.md | 247 ------ adolf/openmemory/Dockerfile | 6 - adolf/openmemory/requirements.txt | 6 - adolf/openmemory/server.py | 155 ---- adolf/potential-directions.md | 13 - adolf/reasoning.md | 287 ------- adolf/router.py | 140 ---- adolf/test_pipeline.py | 1172 ----------------------------- adolf/vram_manager.py | 71 -- adolf/wiki_research.py | 278 ------- 21 files changed, 3209 deletions(-) delete mode 100644 adolf/ARCHITECTURE.md delete mode 100644 adolf/Dockerfile delete mode 100644 adolf/agent.py delete mode 100644 adolf/agent_factory.py delete mode 100644 adolf/channels.py delete mode 100644 adolf/cli.py delete mode 100644 adolf/docker-compose.yml delete mode 100644 adolf/grammy/Dockerfile delete mode 100644 adolf/grammy/bot.mjs delete mode 100644 adolf/grammy/package.json delete mode 100644 adolf/hello_world.py delete mode 100644 adolf/langgraph.md delete mode 100644 adolf/openmemory/Dockerfile delete mode 100644 adolf/openmemory/requirements.txt delete mode 100644 adolf/openmemory/server.py delete mode 100644 adolf/potential-directions.md delete mode 100644 adolf/reasoning.md delete mode 100644 adolf/router.py delete mode 100644 adolf/test_pipeline.py delete mode 100644 adolf/vram_manager.py delete mode 100644 adolf/wiki_research.py diff --git a/adolf/ARCHITECTURE.md b/adolf/ARCHITECTURE.md deleted file mode 100644 index 7c8871c..0000000 --- a/adolf/ARCHITECTURE.md +++ /dev/null @@ -1,118 +0,0 @@ -# Adolf - -Autonomous personal assistant with a multi-channel gateway. Three-tier model routing with GPU VRAM management. - -## Architecture - -``` -┌─────────────────────────────────────────────────────┐ -│ CHANNEL ADAPTERS │ -│ │ -│ [Telegram/Grammy] [CLI] [Voice — future] │ -│ ↕ ↕ ↕ │ -│ └────────────────┴────────────┘ │ -│ ↕ │ -│ ┌─────────────────────────┐ │ -│ │ GATEWAY (agent.py) │ │ -│ │ FastAPI :8000 │ │ -│ │ │ │ -│ │ POST /message │ ← all inbound │ -│ │ POST /chat (legacy) │ │ -│ │ GET /reply/{id} SSE │ ← CLI polling │ -│ │ GET /health │ │ -│ │ │ │ -│ │ channels.py registry │ │ -│ │ conversation buffers │ │ -│ └──────────┬──────────────┘ │ -│ ↓ │ -│ ┌──────────────────────┐ │ -│ │ AGENT CORE │ │ -│ │ three-tier routing │ │ -│ │ VRAM management │ │ -│ └──────────────────────┘ │ -│ ↓ │ -│ channels.deliver(session_id, channel, text)│ -│ ↓ ↓ │ -│ telegram → POST grammy/send cli → SSE queue │ -└─────────────────────────────────────────────────────┘ -``` - -## Channel Adapters - -| Channel | session_id | Inbound | Outbound | -|---------|-----------|---------|---------| -| Telegram | `tg-` | Grammy long-poll → POST /message | channels.py → POST grammy:3001/send | -| CLI | `cli-` | POST /message directly | GET /reply/{id} SSE stream | -| Voice | `voice-` | (future) | (future) | - -## Unified Message Flow - -``` -1. Channel adapter receives message -2. POST /message {text, session_id, channel, user_id} -3. 202 Accepted immediately -4. Background: run_agent_task(message, session_id, channel) -5. Route → run agent tier → get reply text -6. channels.deliver(session_id, channel, reply_text) - - always puts reply in pending_replies[session_id] queue (for SSE) - - calls channel-specific send callback -7. GET /reply/{session_id} SSE clients receive the reply -``` - -## Three-Tier Model Routing - -| Tier | Model | VRAM | Trigger | Latency | -|------|-------|------|---------|---------| -| Light | qwen2.5:1.5b (router answers) | ~1.2 GB | Router classifies as light | ~2–4s | -| Medium | qwen3:4b | ~2.5 GB | Default | ~20–40s | -| Complex | qwen3:8b | ~6.0 GB | `/think` prefix | ~60–120s | - -**`/think` prefix**: forces complex tier, stripped before sending to agent. - -## VRAM Management - -GTX 1070 — 8 GB. Ollama must be restarted if CUDA init fails (model loads on CPU). - -1. Flush explicitly before loading qwen3:8b (`keep_alive=0`) -2. Verify eviction via `/api/ps` poll (15s timeout) before proceeding -3. Fallback: timeout → run medium agent instead -4. Post-complex: flush 8b, pre-warm 4b + router - -## Session ID Convention - -- Telegram: `tg-` (e.g. `tg-346967270`) -- CLI: `cli-` (e.g. `cli-alvis`) - -Conversation history is keyed by session_id (5-turn buffer). - -## Files - -``` -adolf/ -├── docker-compose.yml Services: deepagents, openmemory, grammy -├── Dockerfile deepagents container (Python 3.12) -├── agent.py FastAPI gateway + three-tier routing -├── channels.py Channel registry + deliver() + pending_replies -├── router.py Router class — qwen2.5:1.5b routing -├── vram_manager.py VRAMManager — flush/prewarm/poll Ollama VRAM -├── agent_factory.py build_medium_agent / build_complex_agent -├── cli.py Interactive CLI REPL client -├── wiki_research.py Batch wiki research pipeline (uses /message + SSE) -├── .env TELEGRAM_BOT_TOKEN (not committed) -├── openmemory/ -│ ├── server.py FastMCP + mem0 MCP tools -│ └── Dockerfile -└── grammy/ - ├── bot.mjs grammY Telegram bot + POST /send HTTP endpoint - ├── package.json - └── Dockerfile -``` - -## External Services (from openai/ stack) - -| Service | Host Port | Role | -|---------|-----------|------| -| Ollama GPU | 11436 | All reply inference | -| Ollama CPU | 11435 | Memory embedding (nomic-embed-text) | -| Qdrant | 6333 | Vector store for memories | -| SearXNG | 11437 | Web search | diff --git a/adolf/Dockerfile b/adolf/Dockerfile deleted file mode 100644 index d81ee0c..0000000 --- a/adolf/Dockerfile +++ /dev/null @@ -1,10 +0,0 @@ -FROM python:3.12-slim - -WORKDIR /app - -RUN pip install --no-cache-dir deepagents langchain-ollama langgraph \ - fastapi uvicorn langchain-mcp-adapters langchain-community httpx - -COPY agent.py channels.py vram_manager.py router.py agent_factory.py hello_world.py . - -CMD ["uvicorn", "agent:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/adolf/agent.py b/adolf/agent.py deleted file mode 100644 index 8d415f9..0000000 --- a/adolf/agent.py +++ /dev/null @@ -1,386 +0,0 @@ -import asyncio -import os -import time -from contextlib import asynccontextmanager - -from fastapi import FastAPI, BackgroundTasks, Request -from fastapi.responses import JSONResponse, StreamingResponse -from pydantic import BaseModel - -import re as _re -import httpx as _httpx - -from langchain_ollama import ChatOllama -from langchain_mcp_adapters.client import MultiServerMCPClient -from langchain_community.utilities import SearxSearchWrapper -from langchain_core.tools import Tool - -from vram_manager import VRAMManager -from router import Router -from agent_factory import build_medium_agent, build_complex_agent -import channels - -OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434") -ROUTER_MODEL = os.getenv("DEEPAGENTS_ROUTER_MODEL", "qwen2.5:0.5b") -MEDIUM_MODEL = os.getenv("DEEPAGENTS_MODEL", "qwen3:4b") -COMPLEX_MODEL = os.getenv("DEEPAGENTS_COMPLEX_MODEL", "qwen3:8b") -SEARXNG_URL = os.getenv("SEARXNG_URL", "http://host.docker.internal:11437") -OPENMEMORY_URL = os.getenv("OPENMEMORY_URL", "http://openmemory:8765") -CRAWL4AI_URL = os.getenv("CRAWL4AI_URL", "http://crawl4ai:11235") - -MAX_HISTORY_TURNS = 5 -_conversation_buffers: dict[str, list] = {} - -MEDIUM_SYSTEM_PROMPT = ( - "You are a helpful AI assistant. " - "Use web_search for questions about current events or facts you don't know. " - "Reply concisely." -) - -COMPLEX_SYSTEM_PROMPT = ( - "You are a deep research assistant. " - "web_search automatically fetches full page content from top results — use it 6+ times with different queries. " - "Also call fetch_url on any specific URL you want to read in full.\n\n" - "Run searches in English AND Russian/Latvian. " - "After getting results, run follow-up searches based on new facts found.\n\n" - "Write a structured markdown report with sections: " - "Overview, Education, Career, Publications, Online Presence, Interesting Findings.\n" - "Every fact must link to the real URL it came from: [fact](url). " - "NEVER invent URLs. End with: **Sources checked: N**" -) - -medium_agent = None -complex_agent = None -router: Router = None -vram_manager: VRAMManager = None -mcp_client = None - -# GPU mutex: one LLM inference at a time -_reply_semaphore = asyncio.Semaphore(1) - - -@asynccontextmanager -async def lifespan(app: FastAPI): - global medium_agent, complex_agent, router, vram_manager, mcp_client - - # Register channel adapters - channels.register_defaults() - - # Three model instances - router_model = ChatOllama( - model=ROUTER_MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=4096, - temperature=0, - ) - medium_model = ChatOllama( - model=MEDIUM_MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=8192 - ) - complex_model = ChatOllama( - model=COMPLEX_MODEL, base_url=OLLAMA_BASE_URL, think=True, num_ctx=16384 - ) - - vram_manager = VRAMManager(base_url=OLLAMA_BASE_URL) - router = Router(model=router_model) - - mcp_connections = { - "openmemory": {"transport": "sse", "url": f"{OPENMEMORY_URL}/sse"}, - } - mcp_client = MultiServerMCPClient(mcp_connections) - for attempt in range(12): - try: - mcp_tools = await mcp_client.get_tools() - break - except Exception as e: - if attempt == 11: - raise - print(f"[agent] MCP not ready (attempt {attempt + 1}/12): {e}. Retrying in 5s...") - await asyncio.sleep(5) - - agent_tools = [t for t in mcp_tools if t.name not in ("add_memory", "search_memory", "get_all_memories")] - - searx = SearxSearchWrapper(searx_host=SEARXNG_URL) - - def _crawl4ai_fetch(url: str) -> str: - """Fetch a URL via Crawl4AI (JS-rendered, bot-bypass) and return clean markdown.""" - try: - r = _httpx.post(f"{CRAWL4AI_URL}/crawl", json={"urls": [url]}, timeout=60) - r.raise_for_status() - results = r.json().get("results", []) - if not results or not results[0].get("success"): - return "" - md_obj = results[0].get("markdown") or {} - md = md_obj.get("raw_markdown") if isinstance(md_obj, dict) else str(md_obj) - return (md or "")[:5000] - except Exception as e: - return f"[fetch error: {e}]" - - def _search_and_read(query: str) -> str: - """Search the web and automatically fetch full content of top results. - Returns snippets + full page content from the top URLs.""" - import json as _json - # Get structured results from SearXNG - try: - r = _httpx.get( - f"{SEARXNG_URL}/search", - params={"q": query, "format": "json"}, - timeout=15, - ) - data = r.json() - items = data.get("results", [])[:5] - except Exception as e: - return f"[search error: {e}]" - - if not items: - return "No results found." - - out = [f"Search: {query}\n"] - for i, item in enumerate(items, 1): - url = item.get("url", "") - title = item.get("title", "") - snippet = item.get("content", "")[:300] - out.append(f"\n[{i}] {title}\nURL: {url}\nSnippet: {snippet}") - - # Auto-fetch top 2 URLs for full content - out.append("\n\n--- Full page content ---") - for item in items[:2]: - url = item.get("url", "") - if not url: - continue - content = _crawl4ai_fetch(url) - if content and not content.startswith("[fetch error"): - out.append(f"\n### {url}\n{content[:3000]}") - - return "\n".join(out) - - agent_tools.append(Tool( - name="web_search", - func=_search_and_read, - description=( - "Search the web and read full content of top results. " - "Returns search snippets AND full page text from the top URLs. " - "Use multiple different queries to research a topic thoroughly." - ), - )) - - def _fetch_url(url: str) -> str: - """Fetch and read the full text content of a URL.""" - content = _crawl4ai_fetch(url) - return content if content else "[fetch_url: empty or blocked]" - - agent_tools.append(Tool( - name="fetch_url", - func=_fetch_url, - description=( - "Fetch and read the full text content of a specific URL. " - "Use for URLs not covered by web_search. Input: a single URL string." - ), - )) - - medium_agent = build_medium_agent( - model=medium_model, - agent_tools=agent_tools, - system_prompt=MEDIUM_SYSTEM_PROMPT, - ) - complex_agent = build_complex_agent( - model=complex_model, - agent_tools=agent_tools, - system_prompt=COMPLEX_SYSTEM_PROMPT.format(user_id="{user_id}"), - ) - - print( - f"[agent] three-tier: router={ROUTER_MODEL} | medium={MEDIUM_MODEL} | complex={COMPLEX_MODEL}", - flush=True, - ) - print(f"[agent] agent tools: {[t.name for t in agent_tools]}", flush=True) - - yield - - medium_agent = None - complex_agent = None - router = None - vram_manager = None - mcp_client = None - - -app = FastAPI(lifespan=lifespan) - - -# ── request models ───────────────────────────────────────────────────────────── - -class InboundMessage(BaseModel): - text: str - session_id: str # e.g. "tg-346967270", "cli-alvis" - channel: str # "telegram" | "cli" - user_id: str = "" # human identity; defaults to session_id if empty - metadata: dict = {} - - -class ChatRequest(BaseModel): - """Legacy model — kept for test_pipeline.py compatibility.""" - message: str - chat_id: str - - -# ── helpers ──────────────────────────────────────────────────────────────────── - -def _extract_final_text(result) -> str | None: - msgs = result.get("messages", []) - for m in reversed(msgs): - if type(m).__name__ == "AIMessage" and getattr(m, "content", ""): - return m.content - if isinstance(result, dict) and result.get("output"): - return result["output"] - return None - - -def _log_messages(result): - msgs = result.get("messages", []) - for m in msgs: - role = type(m).__name__ - content = getattr(m, "content", "") - tool_calls = getattr(m, "tool_calls", []) - if content: - print(f"[agent] {role}: {str(content)[:150]}", flush=True) - for tc in tool_calls: - print(f"[agent] {role} → {tc['name']}({tc['args']})", flush=True) - - -# ── core task ────────────────────────────────────────────────────────────────── - -async def run_agent_task(message: str, session_id: str, channel: str = "telegram"): - print(f"[agent] queued: {message[:80]!r} chat={session_id}", flush=True) - - force_complex = False - clean_message = message - if message.startswith("/think "): - force_complex = True - clean_message = message[len("/think "):] - print("[agent] /think prefix → force_complex=True", flush=True) - - async with _reply_semaphore: - t0 = time.monotonic() - history = _conversation_buffers.get(session_id, []) - print(f"[agent] running: {clean_message[:80]!r}", flush=True) - - tier, light_reply = await router.route(clean_message, history, force_complex) - print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True) - - final_text = None - try: - if tier == "light": - final_text = light_reply - llm_elapsed = time.monotonic() - t0 - print(f"[agent] light path: answered by router", flush=True) - - elif tier == "medium": - system_prompt = MEDIUM_SYSTEM_PROMPT - result = await medium_agent.ainvoke({ - "messages": [ - {"role": "system", "content": system_prompt}, - *history, - {"role": "user", "content": clean_message}, - ] - }) - llm_elapsed = time.monotonic() - t0 - _log_messages(result) - final_text = _extract_final_text(result) - - else: # complex - ok = await vram_manager.enter_complex_mode() - if not ok: - print("[agent] complex→medium fallback (eviction timeout)", flush=True) - tier = "medium" - result = await medium_agent.ainvoke({ - "messages": [ - {"role": "system", "content": MEDIUM_SYSTEM_PROMPT}, - *history, - {"role": "user", "content": clean_message}, - ] - }) - else: - system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id) - result = await complex_agent.ainvoke({ - "messages": [ - {"role": "system", "content": system_prompt}, - *history, - {"role": "user", "content": clean_message}, - ] - }) - asyncio.create_task(vram_manager.exit_complex_mode()) - - llm_elapsed = time.monotonic() - t0 - _log_messages(result) - final_text = _extract_final_text(result) - - except Exception as e: - import traceback - llm_elapsed = time.monotonic() - t0 - print(f"[agent] error after {llm_elapsed:.1f}s for chat {session_id}: {e}", flush=True) - traceback.print_exc() - - # Deliver reply through the originating channel - if final_text: - t1 = time.monotonic() - await channels.deliver(session_id, channel, final_text) - send_elapsed = time.monotonic() - t1 - print( - f"[agent] replied in {time.monotonic() - t0:.1f}s " - f"(llm={llm_elapsed:.1f}s, send={send_elapsed:.1f}s) tier={tier}", - flush=True, - ) - print(f"[agent] reply_text: {final_text}", flush=True) - else: - print("[agent] warning: no text reply from agent", flush=True) - - # Update conversation buffer - if final_text: - buf = _conversation_buffers.get(session_id, []) - buf.append({"role": "user", "content": clean_message}) - buf.append({"role": "assistant", "content": final_text}) - _conversation_buffers[session_id] = buf[-(MAX_HISTORY_TURNS * 2):] - - -# ── endpoints ────────────────────────────────────────────────────────────────── - -@app.post("/message") -async def message(request: InboundMessage, background_tasks: BackgroundTasks): - """Unified inbound endpoint for all channels.""" - if medium_agent is None: - return JSONResponse(status_code=503, content={"error": "Agent not ready"}) - session_id = request.session_id - channel = request.channel - background_tasks.add_task(run_agent_task, request.text, session_id, channel) - return JSONResponse(status_code=202, content={"status": "accepted"}) - - -@app.post("/chat") -async def chat(request: ChatRequest, background_tasks: BackgroundTasks): - """Legacy endpoint — maps chat_id to tg- session for backward compatibility.""" - if medium_agent is None: - return JSONResponse(status_code=503, content={"error": "Agent not ready"}) - session_id = f"tg-{request.chat_id}" - background_tasks.add_task(run_agent_task, request.message, session_id, "telegram") - return JSONResponse(status_code=202, content={"status": "accepted"}) - - -@app.get("/reply/{session_id}") -async def reply_stream(session_id: str): - """ - SSE endpoint — streams the reply for a session once available, then closes. - Used by CLI client and wiki_research.py instead of log polling. - """ - q = channels.pending_replies.setdefault(session_id, asyncio.Queue()) - - async def event_generator(): - try: - text = await asyncio.wait_for(q.get(), timeout=900) - # Escape newlines so entire reply fits in one SSE data line - yield f"data: {text.replace(chr(10), '\\n').replace(chr(13), '')}\n\n" - except asyncio.TimeoutError: - yield "data: [timeout]\n\n" - - return StreamingResponse(event_generator(), media_type="text/event-stream") - - -@app.get("/health") -async def health(): - return {"status": "ok", "agent_ready": medium_agent is not None} diff --git a/adolf/agent_factory.py b/adolf/agent_factory.py deleted file mode 100644 index 6182ff4..0000000 --- a/adolf/agent_factory.py +++ /dev/null @@ -1,19 +0,0 @@ -from deepagents import create_deep_agent - - -def build_medium_agent(model, agent_tools: list, system_prompt: str): - """Medium agent: create_deep_agent with TodoList planning, no subagents.""" - return create_deep_agent( - model=model, - tools=agent_tools, - system_prompt=system_prompt, - ) - - -def build_complex_agent(model, agent_tools: list, system_prompt: str): - """Complex agent: direct agentic loop — calls web_search/fetch_url itself, no subagent indirection.""" - return create_deep_agent( - model=model, - tools=agent_tools, - system_prompt=system_prompt, - ) diff --git a/adolf/channels.py b/adolf/channels.py deleted file mode 100644 index 97c50c5..0000000 --- a/adolf/channels.py +++ /dev/null @@ -1,75 +0,0 @@ -""" -Channel registry and reply delivery for the Adolf multi-channel gateway. - -Each channel registers an async send callback: - channels.register("telegram", telegram_send) - channels.register("cli", cli_send) - -When the agent is done, it calls: - await channels.deliver(session_id, channel, text) - -Replies are also placed into per-session asyncio Queues so the -GET /reply/{session_id} SSE endpoint can stream them to polling clients. -""" - -import asyncio -import os -from typing import Awaitable, Callable - -import httpx - -# ── channel registry ────────────────────────────────────────────────────────── - -_callbacks: dict[str, Callable[[str, str], Awaitable[None]]] = {} -# session_id → Queue that receives the final reply text -pending_replies: dict[str, asyncio.Queue] = {} - - -def register(channel: str, callback: Callable[[str, str], Awaitable[None]]) -> None: - """Register an async send callback for a channel name.""" - _callbacks[channel] = callback - - -async def deliver(session_id: str, channel: str, text: str) -> None: - """ - Deliver a reply to the originating channel AND put it in the pending - queue so SSE /reply/{session_id} clients get it. - """ - # Always enqueue first so SSE listeners aren't missed - q = pending_replies.setdefault(session_id, asyncio.Queue()) - await q.put(text) - - cb = _callbacks.get(channel) - if cb: - await cb(session_id, text) - else: - print(f"[channels] no callback for channel={channel!r}, reply queued only", flush=True) - - -# ── built-in channel adapters ───────────────────────────────────────────────── - -GRAMMY_URL = os.getenv("GRAMMY_URL", "http://grammy:3001") - - -async def _telegram_send(session_id: str, text: str) -> None: - """Send reply to Telegram via Grammy's POST /send endpoint.""" - chat_id = session_id.removeprefix("tg-") - MAX_TG = 4000 - chunks = [text[i:i + MAX_TG] for i in range(0, len(text), MAX_TG)] - async with httpx.AsyncClient(timeout=15) as client: - for chunk in chunks: - await client.post( - f"{GRAMMY_URL}/send", - json={"chat_id": chat_id, "text": chunk}, - ) - - -async def _cli_send(session_id: str, text: str) -> None: - """CLI replies are handled entirely through the pending_replies queue — no-op here.""" - pass - - -def register_defaults() -> None: - """Register the built-in Telegram and CLI channel adapters.""" - register("telegram", _telegram_send) - register("cli", _cli_send) diff --git a/adolf/cli.py b/adolf/cli.py deleted file mode 100644 index 4cb909a..0000000 --- a/adolf/cli.py +++ /dev/null @@ -1,80 +0,0 @@ -#!/usr/bin/env python3 -""" -Adolf CLI — interactive REPL for the multi-channel gateway. - -Usage: - python3 cli.py [--url http://localhost:8000] [--session cli-alvis] -""" - -import argparse -import json -import os -import sys -import urllib.request - -GATEWAY = "http://localhost:8000" - - -def post_message(gateway: str, text: str, session_id: str) -> None: - payload = json.dumps({ - "text": text, - "session_id": session_id, - "channel": "cli", - "user_id": os.getlogin(), - }).encode() - req = urllib.request.Request( - f"{gateway}/message", - data=payload, - headers={"Content-Type": "application/json"}, - method="POST", - ) - with urllib.request.urlopen(req, timeout=10) as r: - if r.status != 202: - print(f"[error] gateway returned {r.status}", file=sys.stderr) - sys.exit(1) - - -def wait_for_reply(gateway: str, session_id: str, timeout: int = 400) -> str: - """Open SSE stream and return first data event.""" - req = urllib.request.Request( - f"{gateway}/reply/{session_id}", - headers={"Accept": "text/event-stream"}, - ) - with urllib.request.urlopen(req, timeout=timeout + 5) as r: - for raw_line in r: - line = raw_line.decode("utf-8").rstrip("\n") - if line.startswith("data:"): - return line[5:].strip().replace("\\n", "\n") - return "" - - -def main(): - parser = argparse.ArgumentParser(description="Adolf CLI") - parser.add_argument("--url", default=GATEWAY, help="Gateway URL") - parser.add_argument("--session", default=f"cli-{os.getlogin()}", help="Session ID") - parser.add_argument("--timeout", type=int, default=400, help="Reply timeout (seconds)") - args = parser.parse_args() - - print(f"Adolf CLI (session={args.session}, gateway={args.url})") - print("Type your message and press Enter. Ctrl+C or Ctrl+D to exit.\n") - - try: - while True: - try: - text = input("> ").strip() - except EOFError: - break - if not text: - continue - - post_message(args.url, text, args.session) - print("...", end="", flush=True) - reply = wait_for_reply(args.url, args.session, timeout=args.timeout) - print(f"\r{reply}\n") - - except KeyboardInterrupt: - print("\nbye") - - -if __name__ == "__main__": - main() diff --git a/adolf/docker-compose.yml b/adolf/docker-compose.yml deleted file mode 100644 index 6851e19..0000000 --- a/adolf/docker-compose.yml +++ /dev/null @@ -1,56 +0,0 @@ -services: - deepagents: - build: . - container_name: deepagents - ports: - - "8000:8000" - environment: - - PYTHONUNBUFFERED=1 - - OLLAMA_BASE_URL=http://host.docker.internal:11436 - - DEEPAGENTS_MODEL=qwen3:4b - - DEEPAGENTS_COMPLEX_MODEL=qwen3:8b - - DEEPAGENTS_ROUTER_MODEL=qwen2.5:1.5b - - SEARXNG_URL=http://host.docker.internal:11437 - - GRAMMY_URL=http://grammy:3001 - - CRAWL4AI_URL=http://crawl4ai:11235 - extra_hosts: - - "host.docker.internal:host-gateway" - depends_on: - - openmemory - - grammy - - crawl4ai - restart: unless-stopped - - openmemory: - build: ./openmemory - container_name: openmemory - ports: - - "8765:8765" - environment: - # Extraction LLM (qwen2.5:1.5b) runs on GPU after reply — fast 2-5s extraction - - OLLAMA_GPU_URL=http://host.docker.internal:11436 - # Embedding (nomic-embed-text) runs on CPU — fast enough for search (50-150ms) - - OLLAMA_CPU_URL=http://host.docker.internal:11435 - extra_hosts: - - "host.docker.internal:host-gateway" - restart: unless-stopped - - grammy: - build: ./grammy - container_name: grammy - ports: - - "3001:3001" - environment: - - TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN} - - DEEPAGENTS_URL=http://deepagents:8000 - restart: unless-stopped - - crawl4ai: - image: unclecode/crawl4ai:latest - container_name: crawl4ai - ports: - - "11235:11235" - environment: - - CRAWL4AI_LOG_LEVEL=WARNING - shm_size: "1g" - restart: unless-stopped diff --git a/adolf/grammy/Dockerfile b/adolf/grammy/Dockerfile deleted file mode 100644 index c55d6a5..0000000 --- a/adolf/grammy/Dockerfile +++ /dev/null @@ -1,6 +0,0 @@ -FROM node:22-alpine -WORKDIR /app -COPY package.json . -RUN npm install -COPY bot.mjs . -CMD ["node", "bot.mjs"] diff --git a/adolf/grammy/bot.mjs b/adolf/grammy/bot.mjs deleted file mode 100644 index 51238fb..0000000 --- a/adolf/grammy/bot.mjs +++ /dev/null @@ -1,56 +0,0 @@ -import { Bot } from "grammy"; -import express from "express"; - -const TELEGRAM_BOT_TOKEN = process.env.TELEGRAM_BOT_TOKEN; -const DEEPAGENTS_URL = process.env.DEEPAGENTS_URL || "http://deepagents:8000"; - -const bot = new Bot(TELEGRAM_BOT_TOKEN); - -// Forward all text messages to the unified gateway /message endpoint -bot.on("message:text", (ctx) => { - const text = ctx.message.text; - const chat_id = String(ctx.chat.id); - - console.log(`[grammy] message from ${chat_id}: ${text.slice(0, 80)}`); - - fetch(`${DEEPAGENTS_URL}/message`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - text, - session_id: `tg-${chat_id}`, - channel: "telegram", - user_id: chat_id, - }), - }).catch((err) => console.error("[grammy] error forwarding to deepagents:", err)); -}); - -// HTTP server — delivers replies from the gateway back to Telegram -const app = express(); -app.use(express.json()); - -app.post("/send", async (req, res) => { - const { chat_id, text } = req.body; - if (!chat_id || !text) { - res.status(400).json({ error: "chat_id and text required" }); - return; - } - try { - await bot.api.sendMessage(chat_id, text); - console.log(`[grammy] sent to ${chat_id}: ${text.slice(0, 60)}`); - res.json({ ok: true }); - } catch (err) { - console.error(`[grammy] send error to ${chat_id}:`, err.message); - res.status(500).json({ error: err.message }); - } -}); - -app.get("/health", (_req, res) => res.json({ ok: true })); - -app.listen(3001, () => { - console.log("[grammy] HTTP server listening on port 3001"); -}); - -bot.start({ - onStart: (info) => console.log(`[grammy] bot @${info.username} started`), -}); diff --git a/adolf/grammy/package.json b/adolf/grammy/package.json deleted file mode 100644 index ca1ab35..0000000 --- a/adolf/grammy/package.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "type": "module", - "dependencies": { - "grammy": "^1.36.0", -"express": "^4.21.0" - } -} diff --git a/adolf/hello_world.py b/adolf/hello_world.py deleted file mode 100644 index 630136e..0000000 --- a/adolf/hello_world.py +++ /dev/null @@ -1,21 +0,0 @@ -import os -from langchain_ollama import ChatOllama -from deepagents import create_deep_agent - -OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434") -MODEL = os.getenv("DEEPAGENTS_MODEL", "gemma3:4b") - -print(f"Connecting to Ollama at {OLLAMA_BASE_URL} with model {MODEL}") - -model = ChatOllama(model=MODEL, base_url=OLLAMA_BASE_URL) - -agent = create_deep_agent(model=model) - -result = agent.invoke({ - "messages": [{"role": "user", "content": "Say hello world in one sentence."}] -}) - -print("\n--- Agent Response ---") -for msg in result.get("messages", []): - if hasattr(msg, "content") and msg.content: - print(f"[{msg.__class__.__name__}]: {msg.content}") diff --git a/adolf/langgraph.md b/adolf/langgraph.md deleted file mode 100644 index 8a9d3b5..0000000 --- a/adolf/langgraph.md +++ /dev/null @@ -1,247 +0,0 @@ -# LangGraph: Multi-Model Routing Architecture - -## Problem - -`create_react_agent` uses one model for all steps in the ReAct loop: - -``` -qwen3:4b → decide to call tool ~37s -→ run tool ~1s -qwen3:4b → final answer ~37s -───────────────────────────────────── -Total ~75s -``` - -The routing step is classification + argument extraction — low entropy, constrained output. -It does not need the same model as answer generation. - ---- - -## Is the Pattern Established? (2025 Research) - -Yes. The 2025 consensus from multiple papers treats heterogeneous model architectures -(small for routing, large for generation) as **settled production engineering**, not research: - -- SLMs (1–12B) match or exceed LLMs on schema-constrained tasks (tool calls, JSON, function - calling) at 10×–100× lower compute cost (arXiv 2510.03847, arXiv 2506.02153) -- MasRouter (ACL 2025): routing in multi-agent graphs reduces costs 2× without quality loss -- Cascade routing (ICLR 2025): 4% accuracy improvement, 30–92% cost reduction vs naive routing -- NVIDIA research (2025): "Small Language Models are the Future of Agentic AI" - -**Limitations acknowledged in literature:** -- Bad router defeats the purpose — classifier quality is critical -- Cascade (try small, escalate if uncertain) adds latency on queries that escalate -- Pre-trained routers (RouteLLM, etc.) are calibrated for specific model pairs; local model - pairs need independent validation - ---- - -## Three-Tier Architecture (small → medium → large) - -### Concept - -``` -Incoming query - ↓ -[Router: tiny model or embedding classifier] ~1-2s - ├── simple/conversational → [Medium: qwen3:4b] ~20s - ├── needs tool call → [Medium: qwen3:4b + tools] ~20-40s - └── complex/multi-step → [Large: qwen3:8b + sub-agents] ~60s+ -``` - -### When to route to large - -Signals that justify loading a larger model: -- Multi-step reasoning required (math, code, planning) -- Sub-agent orchestration (the agent needs to call other agents) -- Explicit reasoning request ("think through", "analyze carefully") -- Low confidence from medium model (cascade pattern) - -### Trade-offs of three-tier vs two-tier - -| | Two-tier | Three-tier | -|--|---------|-----------| -| Simple queries | small router + medium answer | small router + medium answer (same) | -| Complex queries | medium (may struggle) | swap to large (better quality) | -| GPU constraint | manageable | hard — see below | -| Routing error cost | low | high (wrong tier = much slower) | - ---- - -## The 8GB GPU Constraint — Core Problem - -This is the central issue. Research numbers on model swapping (2025): - -**Cold swap from disk (no optimization)** -- TTFT exceeds 140s for 7B-class models on HDD; 5–15s on NVMe SSD -- Not viable for interactive use at any tier - -**vLLM Sleep Mode (offload to CPU RAM, not disk)** -- 18–200× faster than cold start; TTFT 2–3s per switch -- vLLM-specific — not available in Ollama - -**Ollama behavior on 8GB VRAM** -- Default `keep_alive`: 5 minutes — model stays warm after use -- Two models simultaneously: qwen3:4b (~2.5GB) + qwen2.5:1.5b (~1.2GB) = ~3.7GB — fits -- qwen3:4b + qwen3:8b = ~8GB — does not reliably fit; eviction required -- Sequential swap in Ollama: Ollama evicts old model, loads new one from SSD (~5–15s on NVMe) -- Known Ollama bug: model spills from VRAM to RAM → all subsequent loads stay on CPU until restart - -**Conclusion for three-tier on single 8GB GPU:** - -| Tier switch | Cost | Viable? | -|------------|------|---------| -| tiny router → medium (qwen3:4b) | model swap ~5-15s if router is separate | borderline | -| medium → large (qwen3:8b) | evict qwen3:4b, load qwen3:8b = ~5-15s additional | no, for interactive | -| Keep medium always warm, route to large on demand | 5-15s swap overhead per complex query | acceptable if complex queries are rare | - -**Honest verdict: three-tier with model swapping is not viable for interactive per-turn latency -on 8GB VRAM with Ollama.** vLLM with Sleep Mode would make it viable (2–3s switch) but -requires replacing Ollama. - ---- - -## Practical Architecture for 8GB GPU (Ollama) - -### Option 1: Two-tier, both models always in VRAM (recommended) - -Keep two small models loaded simultaneously: - -``` -qwen2.5:0.5b (~0.4GB) — router: tool call decision + arg extraction -qwen3:4b (~2.5GB) — answer: all generation -nomic-embed-text (CPU) — embedding: search and store -qwen2.5:1.5b (~1.2GB) — extraction: mem0 fact extraction (GPU) -───────────────────────────────────────────────────────── -Total VRAM: ~4.1GB — well within 8GB -``` - -No swapping needed. Router runs first (~1-2s), answer model runs after (~20s). - -``` -Router → tool call JSON or "no tool" ~1-2s -→ tool runs (if needed) ~1s -→ Answer model generates reply ~20s -───────────────────────────────────────────── -Total ~22-23s -``` - -vs current two-call approach: ~75s. - -### Option 2: Semantic routing (encoder-only, free) - -Use nomic-embed-text (already running on CPU) as the router: - -```python -query_vec = embed(query) -sims = { - "search_memory": cosine(query_vec, memory_topic_vec), - "web_search": cosine(query_vec, web_topic_vec), -} -# If max sim > threshold → call that tool directly -# Then pass result + original query to answer model -``` - -Zero VRAM overhead. ~50ms routing. Can't extract tool args from embedding alone — -needs hardcoded arg construction (e.g. query = original user message). - -### Option 3: Three-tier with rare large-model escalation - -Keep qwen3:4b warm. Route to qwen3:8b only for explicitly complex tasks. -Accept ~10s swap overhead for those queries. qwen3:8b gets unloaded after. - -``` -Router → simple → qwen3:4b ~20s (no swap) -Router → complex → evict 4b, load 8b → ~30s (10s swap + 20s inference) -``` - -Works if <20% of queries are "complex" and users accept occasional slow responses. -Best implemented with explicit user trigger ("think about this carefully") rather than -automatic classification, to avoid swap overhead on misclassified queries. - ---- - -## LangGraph Implementation - -`create_react_agent` locks to one model. Explicit graph supports per-node models: - -```python -from langgraph.graph import StateGraph, MessagesState -from langgraph.prebuilt import ToolNode -from langchain_ollama import ChatOllama - -router_model = ChatOllama(model="qwen2.5:0.5b", base_url=OLLAMA_GPU_URL) -answer_model = ChatOllama(model="qwen3:4b", base_url=OLLAMA_GPU_URL) -# For Option 3: large_model = ChatOllama(model="qwen3:8b", ...) - -def router_node(state): - # Small model only outputs tool call JSON or nothing - return {"messages": [router_model.bind_tools(tools).invoke(state["messages"])]} - -def answer_node(state): - # Large(r) model generates human reply — no tools bound - return {"messages": [answer_model.invoke(state["messages"])]} - -def route(state) -> str: - last = state["messages"][-1] - return "tools" if getattr(last, "tool_calls", []) else "answer" - -graph = StateGraph(MessagesState) -graph.add_node("router", router_node) -graph.add_node("tools", ToolNode(tools)) -graph.add_node("answer", answer_node) -graph.set_entry_point("router") -graph.add_conditional_edges("router", route) -graph.add_edge("tools", "answer") -graph.add_edge("answer", END) -agent = graph.compile() -``` - -For three-tier, add a complexity classifier node before the router that selects -`answer_model = medium_model or large_model` based on query signals. - ---- - -## Open Source Routing Tools - -| Tool | Ollama support | Status | Notes | -|------|---------------|--------|-------| -| LiteLLM | First-class | Active 2025 | Proxy with tiered routing, fallbacks, load balancing | -| RouteLLM (LMSYS) | Yes (documented) | Stale (last commit Aug 2024) | Calibrated for GPT-4 vs Mixtral pair | -| Router-R1 | No | Active (NeurIPS 2025) | RL-based, open-sourced on HuggingFace | -| LLMRouter (ulab) | No | Research 2025 | 16+ routing methods, fair comparison framework | -| FrugalGPT | No direct | Algorithm only | Portkey.ai has implementation guide | - -**Most practical for Ollama**: LiteLLM proxy with tiered model config. Handles routing, -fallbacks, and load balancing without changing agent code. - ---- - -## Summary: What to Do for Adolf - -| | Recommendation | -|--|---------------| -| Quick win (zero risk) | Remove "always call search_memory" from system prompt — history buffer covers conversational recall, saves ~37s | -| Best architecture for 8GB | Two-tier: qwen2.5:0.5b router + qwen3:4b answer, both in VRAM, ~22s total | -| Three-tier feasibility | Not viable for interactive use with Ollama model swapping; viable with vLLM Sleep Mode (~3s swap) if Ollama is replaced | -| Complex task routing | Use explicit user trigger or keyword detection rather than automatic classifier — avoids swap penalty on misclassification | - ---- - -## References - -- arXiv 2510.03847 — Small Language Models for Agentic Systems: A Survey -- arXiv 2506.02153 — Small Language Models are the Future of Agentic AI -- arXiv 2406.04692 — Mixture-of-Agents Enhances LLM Capabilities (original MoA paper) -- arXiv 2410.10347 — A Unified Approach to Routing and Cascading for LLMs (ICLR 2025) -- MasRouter — ACL 2025: https://aclanthology.org/2025.acl-long.757.pdf -- Router-R1 — NeurIPS 2025: https://github.com/ulab-uiuc/Router-R1 -- vLLM Sleep Mode: https://blog.vllm.ai/2025/10/26/sleep-mode.html -- NVIDIA GPU Memory Swap: https://developer.nvidia.com/blog/cut-model-deployment-costs-while-keeping-performance-with-gpu-memory-swap/ -- LangGraph multi-agent: https://langchain-ai.github.io/langgraph/tutorials/multi_agent/ -- LangGraph custom ReAct: https://langchain-ai.github.io/langgraph/how-tos/react-agent-from-scratch/ -- LiteLLM Ollama docs: https://docs.litellm.ai/docs/providers/ollama -- RouteLLM + Ollama example: https://github.com/lm-sys/RouteLLM/blob/main/examples/routing_to_local_models.md -- LLMRouter framework: https://ulab-uiuc.github.io/LLMRouter/ -- Functionary (tool-call fine-tuned): https://github.com/MeetKai/functionary -- Constrained generation (outlines): https://github.com/dottxt-ai/outlines diff --git a/adolf/openmemory/Dockerfile b/adolf/openmemory/Dockerfile deleted file mode 100644 index 038abf3..0000000 --- a/adolf/openmemory/Dockerfile +++ /dev/null @@ -1,6 +0,0 @@ -FROM python:3.12-slim -WORKDIR /app -COPY requirements.txt . -RUN pip install --no-cache-dir -r requirements.txt -COPY server.py . -CMD ["python", "server.py"] diff --git a/adolf/openmemory/requirements.txt b/adolf/openmemory/requirements.txt deleted file mode 100644 index 6ce37d4..0000000 --- a/adolf/openmemory/requirements.txt +++ /dev/null @@ -1,6 +0,0 @@ -mem0ai -ollama -fastapi -uvicorn -mcp[cli] -qdrant-client diff --git a/adolf/openmemory/server.py b/adolf/openmemory/server.py deleted file mode 100644 index 73fd93c..0000000 --- a/adolf/openmemory/server.py +++ /dev/null @@ -1,155 +0,0 @@ -import json -import os -from mcp.server.fastmcp import FastMCP -from mem0 import Memory - -# Extraction LLM — GPU Ollama (qwen3:4b, same model as medium agent) -# Runs after reply when GPU is idle; spin-wait in agent.py prevents contention -OLLAMA_GPU_URL = os.getenv("OLLAMA_GPU_URL", "http://host.docker.internal:11436") - -# Embedding — CPU Ollama (nomic-embed-text, 137 MB RAM) -# Used for both search (50-150ms, acceptable) and store-time embedding -OLLAMA_CPU_URL = os.getenv("OLLAMA_CPU_URL", "http://host.docker.internal:11435") - -QDRANT_HOST = os.getenv("QDRANT_HOST", "host.docker.internal") -QDRANT_PORT = int(os.getenv("QDRANT_PORT", "6333")) - -# Change 2: Custom extraction prompt -# /no_think disables qwen3 thinking tokens so output is clean JSON -EXTRACTION_PROMPT = """/no_think -You are a memory extraction assistant. Extract factual statements from a conversation that are worth remembering long-term. - -Extract facts from BOTH user AND assistant messages, including: -- User details, preferences, and personal information -- User's plans, goals, and intentions -- The assistant's name or persona (if set by the user or stated by the assistant) -- Any commitments or agreements made -- Key facts stated as true - -Return ONLY valid JSON in this exact format: -{"facts": ["fact 1", "fact 2"]} - -If there are no facts worth storing, return: {"facts": []} - -IMPORTANT rules: -- Extract the EXACT concrete values mentioned. Never say "not known" or "unspecified". -- If the user states their name, job, pet, city, allergy, or preference — store the exact value. -- A single message may contain multiple facts — extract ALL of them. -- Do NOT extract vague summaries. Extract specific facts with real values. - -Examples: - -Input: "User: I live in Berlin\nAssistant: Got it, you're in Berlin!" -Output: {"facts": ["User lives in Berlin"]} - -Input: "User: My name is Alice and I live in Tokyo\nAssistant: Nice to meet you Alice!" -Output: {"facts": ["User's name is Alice", "User lives in Tokyo"]} - -Input: "User: I work as a software engineer at a startup\nAssistant: Cool!" -Output: {"facts": ["User works as a software engineer at a startup"]} - -Input: "User: I have a cat named Whiskers\nAssistant: Whiskers is a cute name!" -Output: {"facts": ["User has a cat named Whiskers"]} - -Input: "User: I'm allergic to nuts\nAssistant: I'll remember that." -Output: {"facts": ["User is allergic to nuts"]} - -Input: "User: remember that your name is Adolf\nAssistant: My name is Adolf!" -Output: {"facts": ["Assistant's name is Adolf"]} - -Input: "User: what time is it?\nAssistant: I don't have access to real-time data." -Output: {"facts": []} - -Input: "User: I prefer dark mode\nAssistant: Noted, I'll keep that in mind." -Output: {"facts": ["User prefers dark mode"]} - -Now extract facts from this conversation:""" - -# Update/dedup decision prompt — overrides mem0's default. -# qwen2.5:1.5b struggles with the default multi-step reasoning; this version is -# more explicit: list existing, list new, decide ADD/NONE per item. -UPDATE_PROMPT = """/no_think -You manage a memory store. Given EXISTING memories and NEW facts: -- For each EXISTING memory: output NONE (no change) or UPDATE (if a new fact replaces it) or DELETE. -- For each NEW fact: output ADD if it is not already covered by existing memories. Output NONE if it is already covered. -- IMPORTANT: You MUST include ALL new facts in your output — either as ADD or NONE. -- Output ONLY valid JSON, no explanation. - -Example A — new fact is genuinely new: -Existing: [{"id": "0", "text": "User lives in Berlin"}] -New facts: ["User is allergic to nuts"] -Output: {"memory": [{"id": "0", "text": "User lives in Berlin", "event": "NONE"}, {"id": "1", "text": "User is allergic to nuts", "event": "ADD"}]} - -Example B — new fact updates an existing one: -Existing: [{"id": "0", "text": "User lives in Berlin"}] -New facts: ["User lives in Paris"] -Output: {"memory": [{"id": "0", "text": "User lives in Paris", "event": "UPDATE", "old_memory": "User lives in Berlin"}]} - -Example C — new fact already covered: -Existing: [{"id": "0", "text": "User is allergic to nuts"}] -New facts: ["User has a nut allergy"] -Output: {"memory": [{"id": "0", "text": "User is allergic to nuts", "event": "NONE"}]}""" - -config = { - "llm": { - "provider": "ollama", - "config": { - "model": "qwen3:4b", - "ollama_base_url": OLLAMA_GPU_URL, - "temperature": 0.1, # consistent JSON output - }, - }, - "embedder": { - "provider": "ollama", - "config": { - "model": "nomic-embed-text", - "ollama_base_url": OLLAMA_CPU_URL, # CPU: 50-150ms per query, no GPU needed - }, - }, - "vector_store": { - "provider": "qdrant", - "config": { - "collection_name": "adolf_memories", - "embedding_model_dims": 768, - "host": QDRANT_HOST, - "port": QDRANT_PORT, - }, - }, - "custom_fact_extraction_prompt": EXTRACTION_PROMPT, - "custom_update_memory_prompt": UPDATE_PROMPT, -} - -memory = Memory.from_config(config) - -mcp = FastMCP("openmemory", host="0.0.0.0", port=8765) - - -@mcp.tool() -def add_memory(text: str, user_id: str = "default") -> str: - """Store a memory for a user.""" - result = memory.add(text, user_id=user_id) - # Change 3: return clean JSON instead of Python repr - return json.dumps(result, default=str) - - -@mcp.tool() -def search_memory(query: str, user_id: str = "default") -> str: - """Search memories for a user using semantic similarity.""" - results = memory.search(query, user_id=user_id, limit=10, threshold=0.3) - # Filter to only return results with score >= 0.5 to avoid irrelevant noise - if isinstance(results, dict) and "results" in results: - results["results"] = [r for r in results["results"] if r.get("score", 0) >= 0.5] - return json.dumps(results, default=str) - - -@mcp.tool() -def get_all_memories(user_id: str = "default", limit: int = 50) -> str: - """Get stored memories for a user (up to limit).""" - # Change 5: cap results to avoid flooding context - results = memory.get_all(user_id=user_id, limit=limit) - # Change 3: return clean JSON instead of Python repr - return json.dumps(results, default=str) - - -if __name__ == "__main__": - mcp.run(transport="sse") diff --git a/adolf/potential-directions.md b/adolf/potential-directions.md deleted file mode 100644 index b3ae04e..0000000 --- a/adolf/potential-directions.md +++ /dev/null @@ -1,13 +0,0 @@ -# Potential Directions - -## CPU Extraction Model Candidates (mem0 / openmemory) - -Replacing `gemma3:1b` — documented JSON/structured output failures make it unreliable for mem0's extraction pipeline. - -| Rank | Model | Size | CPU speed | JSON reliability | Notes | -|------|-------|------|-----------|-----------------|-------| -| 1 | `qwen2.5:1.5b` | ~934 MB | 25–40 tok/s | Excellent | Best fit: fast + structured output, 18T token training | -| 2 | `qwen2.5:3b` | ~1.9 GB | 15–25 tok/s | Excellent | Quality upgrade, same family | -| 3 | `llama3.2:3b` | ~2 GB | 15–25 tok/s | Good | Highest IFEval score (77.4) in class | -| 4 | `smollm2:1.7b` | ~1.1 GB | 25–35 tok/s | Moderate | Use temp=0; NuExtract-1.5-smol is fine-tuned variant | -| 5 | `phi4-mini` | ~2.5 GB | 10–17 tok/s | Good | Function calling support, borderline CPU speed | diff --git a/adolf/reasoning.md b/adolf/reasoning.md deleted file mode 100644 index 414c26b..0000000 --- a/adolf/reasoning.md +++ /dev/null @@ -1,287 +0,0 @@ -# Reasoning & Self-Reflection in Local LLM Agents - -Research-backed notes on implementing multi-stage reasoning for local 4-8B models (2025). - ---- - -## TL;DR - -For local 4-8B models, **programmatic self-critique loops rarely justify their cost**. -Native thinking tokens (Qwen3 `enable_thinking=True`) or external verifiers -give better results at lower complexity. See bottom of this file for recommendations. - ---- - -## Reasoning Patterns - -### Chain-of-Thought (CoT) - -Single forward pass, model thinks step-by-step before answering. -Zero implementation cost — just a prompt change. -Typical gain: +5-10pp on multi-step tasks vs no CoT. -No latency overhead beyond the extra output tokens. - -### Reflexion (Shinn et al., NeurIPS 2023) - -Multiple complete attempts. After each attempt, the model writes a textual critique -of what went wrong and stores it in episodic memory. Next attempt is conditioned on -that memory. - -``` -attempt 1 → fail → write critique → attempt 2 (reads critique) → ... -``` - -Key results (GPT-4): HumanEval 80% → 91% pass@1. -Cost: N complete task executions. At 30s/attempt, 5 trials = 2.5 minutes. -Implementation: https://github.com/noahshinn/reflexion - -### Reflection Loop (in-turn revision) - -Within a single turn: generate → critique → revise → [repeat]. -Simpler than Reflexion. More common in practice. - -``` -Generate → Critique → Revise → [stop condition] -``` - -Stop condition options: max iterations, score threshold, external verifier passes. - -### ReAct + Reflect - -Standard ReAct (Reason + Act) with an added Reflect step after failed tool calls. -Most common production pattern. Adds 1-3 extra LLM calls per failed action. - -### Tree of Thoughts (ToT) - -Explore N reasoning branches simultaneously, evaluate each node, BFS/DFS search. -Branching factor 3, depth 3 = 54 LLM calls per problem. Prohibitive for local models. -Works only if the model has strong self-evaluation capability (typically ≥32B). -ToTRL-trained Qwen3-8B achieved 0.633 on AIME 2025 — but required training-time RL, not -a prompt trick. - -### Graph of Thoughts (GoT) - -Generalizes ToT to arbitrary DAGs: thoughts can merge, split, or loop. -62% improvement in sorting vs ToT, 31% cost reduction. -Implementation: https://github.com/spcl/graph-of-thoughts -Higher complexity than ToT; graph structure is problem-specific. - ---- - -## Native Thinking Tokens (vs Programmatic Reflection) - -Open models with built-in reasoning scratchpads: - -| Model | Size | Ollama | Toggle | Notes | -|-------|------|--------|--------|-------| -| Qwen3 | 0.6B–235B | Yes | enable_thinking / think=True/False | Best option for local use | -| Qwen3-4B-Thinking-2507 | 4B | Yes | Always on | Dedicated thinking variant | -| QwQ-32B | 32B | Yes | Always on | Strong reasoning, needs VRAM | -| DeepSeek-R1 distills | 1.5B–70B | Yes | Always on | Llama/Qwen base | - -### Qwen3 thinking toggle in Ollama / LangChain - -```python -# LangChain -model = ChatOllama(model="qwen3:4b", think=True, num_ctx=8192) - -# Prompt-level (Ollama API) -# /think — enable per-request -# /no_think — disable per-request -``` - -Latency: thinking mode is 2-3x slower in wall-clock time (model generates internal -`...` tokens before answering). Qwen3-VL 8B Thinking: 262s vs 65s on a -complex visual reasoning task — but meaningfully better output. - -### Native thinking vs programmatic loop - -| | Native thinking | Programmatic multi-stage | -|--|----------------|------------------------| -| API calls | 1 | N (rounds × 2) | -| Implementation | Zero | Significant | -| Quality on 4-8B | Good (capability in weights) | Poor (weak model critiques itself) | -| Transparency | Opaque (one streamed block) | Inspectable per stage | -| Controllability | thinking_budget only | Full control | -| Latency | 2-3x tokens, 1 call | N × base latency | - -**For local 4-8B: native thinking almost always beats a hand-coded reflection loop.** - ---- - -## Does Programmatic Reflection Work on Small Models? - -Short answer: **mostly no without external verification**. - -From the research (2024-2025): - -- **"When Hindsight is Not 20/20" (arXiv 2404.09129)**: Self-reflection often makes - small models worse. A model that generated an error also lacks the capability to - identify it. It confidently accepts flawed reasoning on re-reading. - -- **THINKSLM (EMNLP 2025)**: Inference-time self-critique on Llama-3.1-8B is unreliable. - Training-time distilled reasoning traces help; prompt-based self-critique does not. - -- **Nature 2025 study**: Large gains (GPT-4: +18.5pp) diminish sharply for smaller models. - -- **Latency cost**: Each reflection round on a local 8B adds 5-30s. A 3-round loop - = 3x latency for 0-5% gain (or regression) on most tasks. - -### When it actually helps on small models - -1. **External verifier**: model doesn't self-evaluate — it reads objective pass/fail - feedback (unit tests, JSON schema checker, math verifier, search result grader). - Most reliable pattern. No self-evaluation capability required. - -2. **Stronger critic**: generate with 4B, critique with 32B or API model. Hybrid approach. - -3. **Native thinking weights**: reflection happens in a single forward pass with - trained weights. Far more reliable than prompt-based self-critique. - -4. **Structured error types**: code syntax, JSON validity, regex match — computable - error signal, not linguistic self-assessment. - ---- - -## LangGraph Reflection Loop Implementation - -LangGraph is suited for this because it supports cyclic graphs with state. - -### Minimal reflection graph - -```python -from langgraph.graph import StateGraph, START, END, MessagesState -from langchain_ollama import ChatOllama - -llm = ChatOllama(model="qwen3:4b", think=False) -critic_llm = ChatOllama(model="qwen3:4b", think=True) # or stronger model - -MAX_REFLECTIONS = 2 - -def generate(state): - response = llm.invoke(state["messages"]) - return {"messages": [response], "iterations": state.get("iterations", 0)} - -def reflect(state): - critique = critic_llm.invoke( - [{"role": "system", "content": "Critique this response. Be specific about errors."}] - + state["messages"] - ) - return { - "messages": [{"role": "user", "content": critique.content}], - "iterations": state["iterations"] + 1, - } - -def should_reflect(state) -> str: - if state.get("iterations", 0) >= MAX_REFLECTIONS: - return END - # Optionally: check external verifier here - return "reflect" - -graph = StateGraph(MessagesState) -graph.add_node("generate", generate) -graph.add_node("reflect", reflect) -graph.add_edge(START, "generate") -graph.add_conditional_edges("generate", should_reflect) -graph.add_edge("reflect", "generate") -agent = graph.compile() -``` - -### Self-Correcting RAG (CRAG pattern) - -``` -Retrieve → Grade documents → [rewrite query if bad] → Generate → Grade answer → [loop or END] -``` - -The document grader and answer grader are the "external verifiers" — they do -objective quality checks rather than linguistic self-critique. -LangChain tutorial: https://learnopencv.com/langgraph-self-correcting-agent-code-generation/ - ---- - -## Alternative Tooling - -### DSPy (recommended for pipeline optimization) - -DSPy treats prompts as learnable parameters. Define input/output signatures, run an -optimizer on examples, and DSPy auto-tunes prompts for your specific model. - -```python -import dspy -lm = dspy.LM('ollama_chat/qwen3:4b', api_base='http://localhost:11434', api_key='') -dspy.configure(lm=lm) - -class Reflect(dspy.Module): - def __init__(self): - self.gen = dspy.ChainOfThought("question -> answer") - self.critique = dspy.ChainOfThought("question, answer -> critique, improved_answer") - def forward(self, question): - first = self.gen(question=question) - return self.critique(question=question, answer=first.answer).improved_answer -``` - -Works with Ollama. Optimizer (BootstrapFewShot, MIPRO) tunes prompts automatically -but requires multiple LLM calls per training example — slow on local hardware. - -### Outlines (structured output) - -Constrained decoding — guarantees valid JSON/regex output from any model. -Use this inside a reflection loop to ensure the critic always returns structured feedback. -Works with Ollama via OpenAI-compatible API. -https://dottxt-ai.github.io/outlines/ - -### SGLang - -High-performance GPU serving runtime (replaces Ollama for GPU inference). -Natively understands `...` tokens, caches KV-prefix across reflection -rounds (RadixAttention). If you replace Ollama with SGLang: reflection loops become -significantly cheaper because repeated prompt prefixes are cache-hit. -https://github.com/sgl-project/sglang - ---- - -## Benchmarks Summary - -| Setup | Task | Quality Gain | Latency Cost | -|-------|------|-------------|-------------| -| GPT-4 + Reflexion | HumanEval | +11pp (80→91%) | ~5x | -| GPT-4 + reflection | Problem solving | +18.5pp | ~3x | -| Llama-7B + programmatic self-critique | Math | +7.1% | ~3x | -| Local 8B + same-model critique (typical) | General | 0-5% (often regression) | 2-3x | -| Qwen3-8B + native thinking | AIME 2025 | Matches models 10x larger | 2-3x tokens | -| Any model + external verifier (tests) | Code | +15-26pp | 1.5-2x | - ---- - -## Practical Recommendations for Adolf (local qwen3:4b / 8b) - -| Goal | Approach | Cost | -|------|----------|------| -| Better reasoning on hard questions | `think=True` in ChatOllama | 2-3x latency, zero code | -| Code/JSON correctness | External verifier (schema check, exec) + retry loop | +1 LLM call on failure | -| Complex multi-step tasks | Route to qwen3:8b with `think=True` | model swap + 2-3x tokens | -| Full reflection loop | Only if using stronger critic model or external verifier | significant complexity | -| Avoid | Programmatic self-critique using same 4-8B model as critic | adds latency, no gain | - ---- - -## References - -- Reflexion (Shinn et al., NeurIPS 2023): https://arxiv.org/abs/2303.11366 -- Tree of Thoughts: https://arxiv.org/abs/2305.10601 -- ToTRL (Qwen3 RL training): https://arxiv.org/html/2505.12717v1 -- Graph of Thoughts: https://arxiv.org/abs/2308.09687 -- Adaptive GoT (2025): https://arxiv.org/pdf/2502.05078 -- When Hindsight is Not 20/20: https://arxiv.org/html/2404.09129v1 -- THINKSLM (EMNLP 2025): https://aclanthology.org/2025.emnlp-main.1659.pdf -- MAR — Multi-Agent Reflexion: https://arxiv.org/html/2512.20845 -- Qwen3 technical report: https://arxiv.org/pdf/2505.09388 -- Qwen3 thinking cost measurement: https://medium.com/@frankmorales_91352/the-computational-cost-of-cognitive-depth-qwen3-vl-8b-instruct-vs-thinking-2517b677ba29 -- DeepSeek-R1: https://arxiv.org/abs/2501.12948 -- LangChain reflection blog: https://blog.langchain.com/reflection-agents/ -- LangGraph CRAG: https://learnopencv.com/langgraph-self-correcting-agent-code-generation/ -- DSPy: https://dspy.ai/ -- Outlines: https://dottxt-ai.github.io/outlines/ -- SGLang: https://github.com/sgl-project/sglang -- graph-of-thoughts: https://github.com/spcl/graph-of-thoughts -- reflexion (original code): https://github.com/noahshinn/reflexion diff --git a/adolf/router.py b/adolf/router.py deleted file mode 100644 index 85b0012..0000000 --- a/adolf/router.py +++ /dev/null @@ -1,140 +0,0 @@ -import re -from typing import Optional -from langchain_core.messages import SystemMessage, HumanMessage - -# ── Regex pre-classifier ────────────────────────────────────────────────────── -# Catches obvious light-tier patterns before calling the LLM. -# Keyed by regex → compiled pattern. -_LIGHT_PATTERNS = re.compile( - r"^(" - # Greetings / farewells - r"hi|hello|hey|yo|sup|howdy|good morning|good evening|good night|good afternoon" - r"|bye|goodbye|see you|cya|later|ttyl" - # Acknowledgements / small talk - r"|thanks?|thank you|thx|ty|ok|okay|k|cool|great|awesome|perfect|sounds good|got it|nice|sure" - r"|how are you|how are you\?|how are you doing(\s+today)?[?!.]*" - r"|what.?s up" - # Calendar facts: "what day comes after X?" / "what comes after X?" - r"|what\s+day\s+(comes\s+after|follows|is\s+after)\s+\w+[?!.]*" - r"|what\s+comes\s+after\s+\w+[?!.]*" - # Acronym expansions: "what does X stand for?" - r"|what\s+does\s+\w+\s+stand\s+for[?!.]*" - r")[\s!.?]*$", - re.IGNORECASE, -) - -# ── LLM classification prompt ───────────────────────────────────────────────── -CLASSIFY_PROMPT = """Classify the message. Output ONLY one word: light, medium, or complex. - -LIGHT = answerable from general knowledge, no internet needed: - what is 2+2 / what is the capital of France / name the three primary colors - tell me a short joke / is the sky blue / is water wet - -MEDIUM = requires web search or the user's stored memories: - current weather / today's news / Bitcoin price / what did we talk about - what is my name / where do I live / what is my job / do I have any pets - what do you know about me / what are my preferences / what did I tell you - -COMPLEX = /think prefix only: - /think compare frameworks / /think plan a trip - -Message: {message} -Output (one word only — light, medium, or complex):""" - -LIGHT_REPLY_PROMPT = """You are a helpful Telegram assistant. Answer briefly and naturally (1-3 sentences). Be friendly.""" - - -def _format_history(history: list[dict]) -> str: - if not history: - return "(none)" - lines = [] - for msg in history: - role = msg.get("role", "?") - content = str(msg.get("content", ""))[:200] - lines.append(f"{role}: {content}") - return "\n".join(lines) - - -def _parse_tier(text: str) -> str: - """Extract tier from raw model output. Default to medium.""" - t = text.strip().lower() - snippet = t[:60] - if "complex" in snippet: - return "complex" - if "medium" in snippet: - return "medium" - if "light" in snippet: - return "light" - # Model invented a descriptive category (e.g. "simplefact", "trivial", "basic") → - # treat as light since it recognised the question doesn't need tools - if any(w in snippet for w in ("simple", "fact", "trivial", "basic", "easy", "general")): - return "light" - return "medium" # safe default - - -class Router: - def __init__(self, model): - self.model = model - - async def route( - self, - message: str, - history: list[dict], - force_complex: bool = False, - ) -> tuple[str, Optional[str]]: - """ - Returns (tier, reply_or_None). - For light tier: also generates the reply with a second call. - For medium/complex: reply is None. - """ - if force_complex: - return "complex", None - - # Step 0: regex pre-classification for obvious light patterns - if _LIGHT_PATTERNS.match(message.strip()): - print(f"[router] regex→light", flush=True) - return await self._generate_light_reply(message, history) - - # Step 1: LLM classification with raw text output - try: - classify_response = await self.model.ainvoke([ - HumanMessage(content=CLASSIFY_PROMPT.format(message=message)), - ]) - raw = classify_response.content or "" - raw = re.sub(r".*?", "", raw, flags=re.DOTALL).strip() - tier = _parse_tier(raw) - - if tier == "complex" and not message.startswith("/think"): - tier = "medium" - - print(f"[router] raw={raw[:30]!r} → tier={tier}", flush=True) - except Exception as e: - print(f"[router] classify error, defaulting to medium: {e}", flush=True) - return "medium", None - - if tier != "light": - return tier, None - - return await self._generate_light_reply(message, history) - - async def _generate_light_reply( - self, message: str, history: list[dict] - ) -> tuple[str, Optional[str]]: - """Generate a short reply using the router model for light-tier messages.""" - history_text = _format_history(history) - context = f"\nConversation history:\n{history_text}" if history else "" - try: - reply_response = await self.model.ainvoke([ - SystemMessage(content=LIGHT_REPLY_PROMPT + context), - HumanMessage(content=message), - ]) - reply_text = reply_response.content or "" - reply_text = re.sub(r".*?", "", reply_text, flags=re.DOTALL).strip() - if not reply_text: - print("[router] light reply empty, falling back to medium", flush=True) - return "medium", None - print(f"[router] light reply: {len(reply_text)} chars", flush=True) - return "light", reply_text - except Exception as e: - print(f"[router] light reply error, falling back to medium: {e}", flush=True) - return "medium", None diff --git a/adolf/test_pipeline.py b/adolf/test_pipeline.py deleted file mode 100644 index 034720d..0000000 --- a/adolf/test_pipeline.py +++ /dev/null @@ -1,1172 +0,0 @@ -#!/usr/bin/env python3 -""" -Adolf pipeline integration test with end-to-end timing profiling. - -Tests: - 1. Service health (deepagents, openmemory, grammy MCP SSE) - 2. GPU Ollama models - 3. CPU Ollama models - 4. Qdrant collection + vector dims - 5. SearXNG - 6. Name store — "remember that your name is " - 7. Qdrant point added after store - 8. Name recall — "what is your name?" → reply contains - 9. Timing profile + bottleneck report - 10. Easy benchmark — 10 easy questions → all must route to light - 11. Medium benchmark — 11 medium questions → must route to medium (or light, never complex) - 12. Hard benchmark — 10 /think questions → all must route to complex; VRAM flush verified - 13. Memory benchmark — store 5 facts, recall with 10 questions → verify keyword presence - 14. Dedup test — same fact sent twice → Qdrant must not grow by 2 - -Usage: - python3 test_pipeline.py [--chat-id CHAT_ID] - [--bench-only] skip sections 1-9, run 10+11+12+13 - [--easy-only] skip 1-9 and 11+12+13, run only section 10 - [--medium-only] skip 1-9 and 10+12+13, run only section 11 - [--hard-only] skip 1-9 and 10+11+13, run only section 12 - [--memory-only] skip 1-9 and 10+11+12, run only section 13 - [--no-bench] skip sections 10-13 - -Timing is extracted from deepagents container logs, not estimated from sleeps. -""" - -import argparse -import http.client -import json -import random -import re -import subprocess -import sys -import time -import urllib.request - -# ── config ──────────────────────────────────────────────────────────────────── -DEEPAGENTS = "http://localhost:8000" -OPENMEMORY = "http://localhost:8765" -GRAMMY_HOST = "localhost" -GRAMMY_PORT = 3001 -OLLAMA_GPU = "http://localhost:11436" -OLLAMA_CPU = "http://localhost:11435" -QDRANT = "http://localhost:6333" -SEARXNG = "http://localhost:11437" -COMPOSE_FILE = "/home/alvis/agap_git/adolf/docker-compose.yml" -DEFAULT_CHAT_ID = "346967270" - -NAMES = [ - "Maximilian", "Cornelius", "Zephyr", "Archibald", "Balthazar", - "Ignatius", "Lysander", "Octavian", "Reginald", "Sylvester", -] - -# ── benchmark questions ─────────────────────────────────────────────────────── -BENCHMARK = { - "easy": [ - "hi", - "what is 2+2?", - "what is the capital of France?", - "tell me a short joke", - "how are you doing today?", - "thanks!", - "what day comes after Wednesday?", - "name the three primary colors", - "is the sky blue?", - "what does CPU stand for?", - ], - "medium": [ - "what is the current weather in Berlin?", - "find the latest news about artificial intelligence", - "what is the current price of Bitcoin?", - "search for a good pasta carbonara recipe", - "what movies are in theaters this week?", - "find Python tutorials for beginners", - "who won the last FIFA World Cup?", - "do you remember what we talked about before?", - "search for the best coffee shops in Tokyo", - "what is happening in the tech industry this week?", - "what's the weather like today?", - ], - "hard": [ - "/think compare the top 3 Python web frameworks (Django, FastAPI, Flask) and recommend one for a production REST API", - "/think research the history of artificial intelligence and create a timeline of key milestones", - "/think plan a 7-day trip to Japan with daily itinerary, accommodation suggestions, and budget breakdown", - "/think analyze microservices vs monolithic architecture: pros, cons, and when to choose each", - "/think write a Python script that reads a CSV file, cleans the data, and generates summary statistics", - "/think research quantum computing: explain the key concepts and how it differs from classical computing", - "/think compare PostgreSQL, MongoDB, and Redis — when to use each and what are the trade-offs?", - "/think create a comprehensive Docker deployment guide covering best practices for production", - "/think research climate change: summarize the latest IPCC findings and key data points", - "/think design a REST API with authentication, rate limiting, and proper error handling — provide architecture and code outline", - ], -} - -PASS = "\033[32mPASS\033[0m" -FAIL = "\033[31mFAIL\033[0m" -INFO = "\033[36mINFO\033[0m" -WARN = "\033[33mWARN\033[0m" - -results = [] -timings = {} # label → float seconds | None - - -# ── helpers ─────────────────────────────────────────────────────────────────── - -def report(name, ok, detail=""): - tag = PASS if ok else FAIL - print(f" [{tag}] {name}" + (f" — {detail}" if detail else "")) - results.append((name, ok)) - - -def tf(v): - """Format timing value.""" - return f"{v:6.2f}s" if v is not None else " n/a" - - -def get(url, timeout=5): - with urllib.request.urlopen(urllib.request.Request(url), timeout=timeout) as r: - return r.status, r.read().decode() - - -def post_json(url, payload, timeout=10): - data = json.dumps(payload).encode() - req = urllib.request.Request(url, data=data, - headers={"Content-Type": "application/json"}, - method="POST") - with urllib.request.urlopen(req, timeout=timeout) as r: - return r.status, json.loads(r.read().decode()) - - -def check_sse(host, port, path): - try: - conn = http.client.HTTPConnection(host, port, timeout=5) - conn.request("GET", path, headers={"Accept": "text/event-stream"}) - r = conn.getresponse() - conn.close() - return r.status == 200, f"HTTP {r.status}" - except Exception as e: - return False, str(e) - - -def qdrant_count(): - try: - _, body = get(f"{QDRANT}/collections/adolf_memories") - return json.loads(body).get("result", {}).get("points_count", 0) - except Exception: - return 0 - - -def fetch_logs(since_s=600): - """Return deepagents log lines from the last since_s seconds.""" - try: - r = subprocess.run( - ["docker", "compose", "-f", COMPOSE_FILE, "logs", "deepagents", - f"--since={int(since_s)}s", "--no-log-prefix"], - capture_output=True, text=True, timeout=15, - ) - return r.stdout.splitlines() - except Exception: - return [] - - -def parse_run_block(lines, msg_prefix): - """ - Scan log lines for the LAST '[agent] running: ' block. - Extracts reply timing, tier, and memory timing from that block. - - Returns dict or None if the reply has not appeared in logs yet. - Dict keys: - reply_total, llm, send, tier, reply_text — from "[agent] replied in ..." - memory_s — from "[memory] stored in ..." - memory_error — True if "[memory] error" found - """ - search = msg_prefix[:50] - start_idx = None - for i, line in enumerate(lines): - if "[agent] running:" in line and search in line: - start_idx = i # keep updating — we want the LAST occurrence - - if start_idx is None: - return None - - block = lines[start_idx:] - last_ai_text = None - reply_data = None - - for j, line in enumerate(block): - # Track last non-tool AIMessage (the final reply) — truncated at 150 chars in logs, - # used only as fallback if reply_text line is absent (older server versions) - if "AIMessage:" in line and "→" not in line: - txt = line.split("AIMessage:", 1)[-1].strip() - if txt: - last_ai_text = txt - - m = re.search(r"replied in ([\d.]+)s \(llm=([\d.]+)s, send=([\d.]+)s\)", line) - if m: - # Extract optional tier tag at end of line - tier_m = re.search(r"\btier=(\w+)", line) - tier = tier_m.group(1) if tier_m else "unknown" - reply_data = { - "reply_total": float(m.group(1)), - "llm": float(m.group(2)), - "send": float(m.group(3)), - "tier": tier, - "reply_text": last_ai_text, # may be overwritten by reply_text line below - "memory_s": None, - "memory_error": False, - "_j": j, - } - break - - # Read full reply_text from dedicated log line (written immediately after replied-in line) - if reply_data is not None: - next_lines = block[reply_data["_j"] + 1: reply_data["_j"] + 3] - for line in next_lines: - if line.startswith("[agent] reply_text:"): - reply_data["reply_text"] = line[len("[agent] reply_text:"):].strip() - break - - if reply_data is None: - return None # reply not in logs yet - - # Memory line can appear after the next "[agent] running:" — no stop condition - for line in block[reply_data["_j"] + 1:]: - mm = re.search(r"\[memory\] stored in ([\d.]+)s", line) - if mm: - reply_data["memory_s"] = float(mm.group(1)) - break - if "[memory] error" in line: - reply_data["memory_error"] = True - break - - return reply_data - - -def wait_for(label, msg_prefix, timeout_s=200, need_memory=True): - """ - Poll deepagents logs until the message is fully processed. - Shows a live progress line. - Returns timing dict or None on timeout. - """ - t_start = time.monotonic() - deadline = t_start + timeout_s - tick = 0 - last_result = None - - while time.monotonic() < deadline: - # Window grows with elapsed time — never miss a line that appeared late - since = int(time.monotonic() - t_start) + 90 - lines = fetch_logs(since_s=since) - result = parse_run_block(lines, msg_prefix) - - if result: - last_result = result - has_mem = result["memory_s"] is not None or result["memory_error"] - if (not need_memory) or has_mem: - elapsed = time.monotonic() - t_start - print(f"\r [{label}] done after {elapsed:.0f}s{' ' * 30}") - return result - - time.sleep(4) - tick += 1 - rem = int(deadline - time.monotonic()) - if last_result: - phase = "waiting for memory..." if need_memory else "done" - else: - phase = "waiting for LLM reply..." - print(f"\r [{label}] {tick*4}s elapsed, {rem}s left — {phase} ", end="", flush=True) - - print(f"\r [{label}] TIMEOUT after {timeout_s}s{' ' * 30}") - return None - - -# ── args ────────────────────────────────────────────────────────────────────── -parser = argparse.ArgumentParser(description="Adolf pipeline test") -parser.add_argument("--chat-id", default=DEFAULT_CHAT_ID) -parser.add_argument("--bench-only", action="store_true", - help="Skip sections 1-9, run sections 10+11 (both benchmarks)") -parser.add_argument("--easy-only", action="store_true", - help="Skip sections 1-9 and 11, run only section 10 (easy benchmark)") -parser.add_argument("--medium-only", action="store_true", - help="Skip sections 1-9 and 10, run only section 11 (medium benchmark)") -parser.add_argument("--hard-only", action="store_true", - help="Skip sections 1-9 and 10+11, run only section 12 (hard benchmark)") -parser.add_argument("--memory-only", action="store_true", - help="Skip sections 1-9 and 10+11+12, run only section 13 (memory benchmark)") -parser.add_argument("--no-bench", action="store_true", - help="Skip sections 10-13 (all benchmarks)") -args = parser.parse_args() -CHAT_ID = args.chat_id - -# Derived flags for readability -_skip_pipeline = args.bench_only or args.easy_only or args.medium_only or args.hard_only or args.memory_only -_run_easy = not args.no_bench and not args.medium_only and not args.hard_only and not args.memory_only -_run_medium = not args.no_bench and not args.easy_only and not args.hard_only and not args.memory_only -_run_hard = not args.no_bench and not args.easy_only and not args.medium_only and not args.memory_only -_run_memory = not args.no_bench and not args.easy_only and not args.medium_only and not args.hard_only - -random_name = random.choice(NAMES) - -if not _skip_pipeline: - print(f"\n Test name : \033[1m{random_name}\033[0m") - print(f" Chat ID : {CHAT_ID}") - - -# ── 1. service health ───────────────────────────────────────────────────────── -if not _skip_pipeline: - print(f"\n[{INFO}] 1. Service health") - t0 = time.monotonic() - - try: - status, body = get(f"{DEEPAGENTS}/health") - data = json.loads(body) - ok = status == 200 and data.get("agent_ready") is True - report("deepagents /health — agent_ready", ok, f"agent_ready={data.get('agent_ready')}") - except Exception as e: - report("deepagents /health", False, str(e)) - - ok, detail = check_sse("localhost", 8765, "/sse") - report("openmemory /sse reachable", ok, detail) - - ok, detail = check_sse(GRAMMY_HOST, GRAMMY_PORT, "/sse") - report("grammy /sse reachable", ok, detail) - - timings["health_check"] = time.monotonic() - t0 - - -# ── 2. GPU Ollama ───────────────────────────────────────────────────────────── -if not _skip_pipeline: - print(f"\n[{INFO}] 2. GPU Ollama (port 11436)") - t0 = time.monotonic() - - try: - status, body = get(f"{OLLAMA_GPU}/api/tags") - models = [m["name"] for m in json.loads(body).get("models", [])] - has_qwen = any("qwen3" in m for m in models) - report("GPU Ollama reachable", True, f"models: {models}") - report("qwen3:8b present", has_qwen) - except Exception as e: - report("GPU Ollama reachable", False, str(e)) - report("qwen3:8b present", False, "skipped") - - timings["gpu_ollama_ping"] = time.monotonic() - t0 - - -# ── 3. CPU Ollama ───────────────────────────────────────────────────────────── -if not _skip_pipeline: - print(f"\n[{INFO}] 3. CPU Ollama (port 11435)") - t0 = time.monotonic() - - try: - status, body = get(f"{OLLAMA_CPU}/api/tags") - models = [m["name"] for m in json.loads(body).get("models", [])] - has_embed = any("nomic-embed-text" in m for m in models) - report("CPU Ollama reachable", True, f"models: {models}") - report("nomic-embed-text present", has_embed) - except Exception as e: - report("CPU Ollama reachable", False, str(e)) - report("nomic-embed-text present", False, "skipped") - - timings["cpu_ollama_ping"] = time.monotonic() - t0 - - -# ── 4. Qdrant ───────────────────────────────────────────────────────────────── -if not _skip_pipeline: - print(f"\n[{INFO}] 4. Qdrant (port 6333)") - t0 = time.monotonic() - - try: - status, body = get(f"{QDRANT}/collections") - cols = [c["name"] for c in json.loads(body).get("result", {}).get("collections", [])] - report("Qdrant reachable", True, f"collections: {cols}") - report("adolf_memories collection exists", "adolf_memories" in cols) - except Exception as e: - report("Qdrant reachable", False, str(e)) - report("adolf_memories collection exists", False, "skipped") - - try: - status, body = get(f"{QDRANT}/collections/adolf_memories") - info = json.loads(body).get("result", {}) - dims = info.get("config", {}).get("params", {}).get("vectors", {}).get("size") - report("vector dims = 768", dims == 768, f"got {dims}") - except Exception as e: - report("adolf_memories collection info", False, str(e)) - - timings["qdrant_ping"] = time.monotonic() - t0 - - -# ── 5. SearXNG ──────────────────────────────────────────────────────────────── -if not _skip_pipeline: - print(f"\n[{INFO}] 5. SearXNG (port 11437)") - t0 = time.monotonic() - - try: - status, body = get(f"{SEARXNG}/search?q=test&format=json", timeout=15) - elapsed = time.monotonic() - t0 - n = len(json.loads(body).get("results", [])) - report("SearXNG reachable + JSON results", status == 200 and n > 0, f"{n} results in {elapsed:.1f}s") - report("SearXNG response < 5s", elapsed < 5, f"{elapsed:.2f}s") - timings["searxng_latency"] = elapsed - except Exception as e: - report("SearXNG reachable", False, str(e)) - report("SearXNG response < 5s", False, "skipped") - timings["searxng_latency"] = None - - timings["searxng_check"] = time.monotonic() - t0 - - -# ── 6–8. Name memory pipeline ───────────────────────────────────────────────── -if not _skip_pipeline: - print(f"\n[{INFO}] 6–8. Name memory pipeline") - print(f" chat_id={CHAT_ID} name={random_name}") - - store_msg = f"remember that your name is {random_name}" - recall_msg = "what is your name?" - - pts_before = qdrant_count() - print(f" Qdrant points before: {pts_before}") - - # ── 6. Send store message ───────────────────────────────────────────────────── - print(f"\n [store] '{store_msg}'") - t_store = time.monotonic() - - try: - status, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": store_msg, "chat_id": CHAT_ID}, timeout=5) - t_accept = time.monotonic() - t_store - report("POST /chat (store) returns 202 immediately", - status == 202 and t_accept < 1, f"status={status}, t={t_accept:.3f}s") - timings["store_http_accept"] = t_accept - except Exception as e: - report("POST /chat (store)", False, str(e)) - sys.exit(1) - - store = wait_for("store", store_msg, timeout_s=220, need_memory=True) - - if store: - timings["store_llm"] = store["llm"] - timings["store_send"] = store["send"] - timings["store_reply"] = store["reply_total"] - timings["store_memory"] = store["memory_s"] - report("Agent replied to store message", True, - f"{store['reply_total']:.1f}s total llm={store['llm']:.1f}s send={store['send']:.1f}s tier={store['tier']}") - if store["memory_s"] is not None: - report("Memory stored without error", True, f"{store['memory_s']:.1f}s") - elif store["memory_error"]: - report("Memory stored without error", False, "error in [memory] log") - else: - report("Memory stored without error", False, "not found in logs (still running?)") - print(f" Store reply: {store['reply_text']!r}") - else: - report("Agent replied to store message", False, "timeout") - report("Memory stored without error", False, "timeout") - sys.exit(1) - - # ── 7. Verify Qdrant ────────────────────────────────────────────────────────── - pts_after = qdrant_count() - new_pts = pts_after - pts_before - report("New memory point(s) added to Qdrant", new_pts > 0, - f"{pts_before} → {pts_after} (+{new_pts})") - timings["qdrant_new_points"] = new_pts - - # ── 8. Send recall message ──────────────────────────────────────────────────── - print(f"\n [recall] '{recall_msg}'") - t_recall = time.monotonic() - - try: - status, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": recall_msg, "chat_id": CHAT_ID}, timeout=5) - t_accept2 = time.monotonic() - t_recall - report("POST /chat (recall) returns 202 immediately", - status == 202 and t_accept2 < 1, f"status={status}, t={t_accept2:.3f}s") - timings["recall_http_accept"] = t_accept2 - except Exception as e: - report("POST /chat (recall)", False, str(e)) - - recall = wait_for("recall", recall_msg, timeout_s=160, need_memory=False) - - if recall: - timings["recall_llm"] = recall["llm"] - timings["recall_send"] = recall["send"] - timings["recall_reply"] = recall["reply_total"] - report("Agent replied to recall message", True, - f"{recall['reply_total']:.1f}s total llm={recall['llm']:.1f}s send={recall['send']:.1f}s tier={recall['tier']}") - reply_text = recall["reply_text"] or "" - name_in_reply = random_name.lower() in reply_text.lower() - report(f"Reply contains '{random_name}'", name_in_reply, - f"reply: {reply_text[:120]!r}") - else: - report("Agent replied to recall message", False, "timeout") - report(f"Reply contains '{random_name}'", False, "no reply") - - -# ── 9. Timing profile ───────────────────────────────────────────────────────── -if not _skip_pipeline: - print(f"\n[{INFO}] 9. Timing profile") - - W = 36 - - print(f"\n {'Stage':<{W}} {'Time':>8}") - print(f" {'─'*W} {'─'*8}") - - rows_store = [ - ("[GPU] HTTP accept — store turn", "store_http_accept"), - ("[GPU] qwen3:Xb inference — store turn","store_llm"), - ("[GPU] Telegram send — store turn", "store_send"), - ("[GPU] Total reply latency — store", "store_reply"), - ("[GPU] qwen2.5:1.5b+embed — async mem", "store_memory"), - ] - rows_recall = [ - ("[GPU] HTTP accept — recall turn", "recall_http_accept"), - ("[GPU] qwen3:Xb inference — recall", "recall_llm"), - ("[GPU] Telegram send — recall turn", "recall_send"), - ("[GPU] Total reply latency — recall", "recall_reply"), - ] - - for label, key in rows_store: - v = timings.get(key) - print(f" {label:<{W}} {tf(v):>8}") - - print(f" {'─'*W} {'─'*8}") - - for label, key in rows_recall: - v = timings.get(key) - print(f" {label:<{W}} {tf(v):>8}") - - # Bottleneck bar chart - print(f"\n Bottleneck analysis (each █ ≈ 5s):") - print(f" {'─'*(W+12)}") - - candidates = [ - ("[GPU] qwen3:Xb — store reply ", timings.get("store_llm") or 0), - ("[GPU] qwen3:Xb — recall reply", timings.get("recall_llm") or 0), - ("[GPU] qwen2.5:1.5b+embed (async)", timings.get("store_memory") or 0), - ("[net] SearXNG ", timings.get("searxng_latency") or 0), - ] - candidates.sort(key=lambda x: x[1], reverse=True) - - for label, t in candidates: - bar = "█" * min(int(t / 5), 24) - pct = "" - total_pipeline = (timings.get("store_reply") or 0) + (timings.get("store_memory") or 0) - if total_pipeline > 0: - pct = f" {t/total_pipeline*100:4.0f}%" - print(f" {label} {t:6.1f}s {bar}{pct}") - - print() - - -# ── 10. Tier routing benchmark — easy questions → light path ────────────────── -if _run_easy: - print(f"\n[{INFO}] 10. Tier routing benchmark") - print(f" Sending {len(BENCHMARK['easy'])} easy questions — all must route to 'light'") - print(f" Chat ID: {CHAT_ID}") - print() - - bench_results = [] # list of (question, tier, latency_s, ok) - LIGHT_TIMEOUT = 60 # seconds — light is fast but may queue behind prior messages - - for i, question in enumerate(BENCHMARK["easy"], 1): - tag = f"easy-{i:02d}" - short_q = question[:55] - print(f" [{tag}] {short_q!r}") - - # Send - t_send = time.monotonic() - try: - status, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": question, "chat_id": CHAT_ID}, timeout=5) - if status != 202: - print(f" → [{FAIL}] POST returned {status}") - bench_results.append((question, "?", None, False)) - continue - except Exception as e: - print(f" → [{FAIL}] POST error: {e}") - bench_results.append((question, "?", None, False)) - continue - - # Poll for reply - t_start = time.monotonic() - found = None - while time.monotonic() - t_start < LIGHT_TIMEOUT: - since = int(time.monotonic() - t_start) + 30 - lines = fetch_logs(since_s=since) - found = parse_run_block(lines, question) - if found: - break - time.sleep(1) - - elapsed = time.monotonic() - t_send - - if not found: - print(f" → [{FAIL}] no reply within {LIGHT_TIMEOUT}s") - bench_results.append((question, "timeout", None, False)) - continue - - tier = found.get("tier", "unknown") - is_light = (tier == "light") - tag_str = PASS if is_light else FAIL - print(f" → [{tag_str}] tier={tier} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s") - bench_results.append((question, tier, found["reply_total"], is_light)) - - # Brief pause between questions to keep logs clean - time.sleep(1) - - # Summary table - print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question'}") - print(f" {'─'*4} {'─'*8} {'─'*8} {'─'*50}") - for idx, (q, tier, lat, ok) in enumerate(bench_results, 1): - lat_str = f"{lat:.1f}s" if lat is not None else "timeout" - ok_str = "✓" if ok else "✗" - print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {q[:50]!r}") - - light_count = sum(1 for _, _, _, ok in bench_results if ok) - total_bench = len(bench_results) - lats = [lat for _, _, lat, ok in bench_results if ok and lat is not None] - avg_lat = sum(lats) / len(lats) if lats else 0 - - print(f"\n Light-path score: {light_count}/{total_bench}") - if lats: - print(f" Avg latency (light): {avg_lat:.1f}s " - f"min={min(lats):.1f}s max={max(lats):.1f}s") - - report(f"All easy questions routed to light ({light_count}/{total_bench})", - light_count == total_bench, - f"{light_count}/{total_bench} via light path, avg {avg_lat:.1f}s") - - -# ── 11. Medium benchmark — medium questions → medium or light, never complex ── -if _run_medium: - print(f"\n[{INFO}] 11. Medium routing benchmark") - print(f" Sending {len(BENCHMARK['medium'])} medium questions") - print(f" Expected: tier=medium (needs tools). Light is acceptable for factual questions.") - print(f" Fail condition: tier=complex or timeout.") - print(f" Chat ID: {CHAT_ID}") - print() - - # Questions where light is a valid alternative (model may know from training data) - LIGHT_ACCEPTABLE = { - "who won the last FIFA World Cup?", - "search for a good pasta carbonara recipe", - "find Python tutorials for beginners", - "search for the best coffee shops in Tokyo", - } - - med_results = [] # list of (question, tier, latency_s, correct) - MEDIUM_TIMEOUT = 120 # seconds — medium takes 20-100s, allow for queue buildup - - for i, question in enumerate(BENCHMARK["medium"], 1): - tag = f"med-{i:02d}" - short_q = question[:60] - print(f" [{tag}] {short_q!r}") - - # Send - t_send = time.monotonic() - try: - status, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": question, "chat_id": CHAT_ID}, timeout=5) - if status != 202: - print(f" → [{FAIL}] POST returned {status}") - med_results.append((question, "?", None, False)) - continue - except Exception as e: - print(f" → [{FAIL}] POST error: {e}") - med_results.append((question, "?", None, False)) - continue - - # Poll for reply - t_start = time.monotonic() - found = None - while time.monotonic() - t_start < MEDIUM_TIMEOUT: - since = int(time.monotonic() - t_start) + 60 - lines = fetch_logs(since_s=since) - found = parse_run_block(lines, question) - if found: - break - time.sleep(3) - - elapsed = time.monotonic() - t_send - - if not found: - print(f" → [{FAIL}] no reply within {MEDIUM_TIMEOUT}s") - med_results.append((question, "timeout", None, False)) - continue - - tier = found.get("tier", "unknown") - light_ok = question in LIGHT_ACCEPTABLE - - if tier == "medium": - correct = True - label = PASS - note = "medium ✓" - elif tier == "light": - correct = light_ok # light is only acceptable for certain questions - label = WARN if not light_ok else PASS - note = "light (acceptable)" if light_ok else "light (should be medium)" - elif tier == "complex": - correct = False - label = FAIL - note = "complex — wrong escalation" - else: - correct = False - label = FAIL - note = f"unknown tier {tier!r}" - - print(f" → [{label}] {note} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s") - med_results.append((question, tier, found["reply_total"], correct)) - - # Brief pause between questions - time.sleep(1) - - # Summary table - print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question'}") - print(f" {'─'*4} {'─'*8} {'─'*8} {'─'*55}") - for idx, (q, tier, lat, ok) in enumerate(med_results, 1): - lat_str = f"{lat:.1f}s" if lat is not None else "timeout" - ok_str = "✓" if ok else ("~" if tier == "light" else "✗") - print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {q[:55]!r}") - - total_med = len(med_results) - medium_count = sum(1 for _, tier, _, _ in med_results if tier == "medium") - light_count = sum(1 for _, tier, _, _ in med_results if tier == "light") - complex_count = sum(1 for _, tier, _, _ in med_results if tier == "complex") - timeout_count = sum(1 for _, tier, _, _ in med_results if tier == "timeout") - light_misroute = sum( - 1 for q, tier, _, _ in med_results - if tier == "light" and q not in LIGHT_ACCEPTABLE - ) - lats = [lat for _, _, lat, _ in med_results if lat is not None] - correct_count = medium_count + (light_count - light_misroute) - - print(f"\n Breakdown: medium={medium_count} light={light_count} complex={complex_count} timeout={timeout_count}") - if light_misroute: - print(f" [{WARN}] {light_misroute} question(s) answered via light when medium expected (check reply quality)") - if lats: - print(f" Avg latency: {sum(lats)/len(lats):.1f}s min={min(lats):.1f}s max={max(lats):.1f}s") - - no_complex = complex_count == 0 - no_timeout = timeout_count == 0 - all_ok = no_complex and no_timeout - - report( - f"Medium questions: no complex escalation ({medium_count + light_count}/{total_med} routed)", - no_complex, - f"medium={medium_count} light={light_count} complex={complex_count} timeout={timeout_count}", - ) - if not no_timeout: - report( - f"Medium questions: all completed within {MEDIUM_TIMEOUT}s", - False, - f"{timeout_count} question(s) timed out (increase MEDIUM_TIMEOUT or check agent logs)", - ) - - -# ── 12. Hard benchmark — /think questions → complex tier + VRAM flush verified ─ -if _run_hard: - print(f"\n[{INFO}] 12. Hard routing benchmark") - print(f" Sending {len(BENCHMARK['hard'])} /think questions — all must route to 'complex'") - print(f" Verifies: /think prefix → force_complex=True → VRAM flush → qwen3:8b inference") - print(f" Acceptable fallback: 'medium' if VRAM eviction timed out (logged warning)") - print(f" Fail condition: tier=light or timeout") - print(f" Chat ID: {CHAT_ID}") - print() - - hard_results = [] # list of (question, tier, latency_s, ok) - COMPLEX_TIMEOUT = 300 # seconds — complex takes 60-180s + VRAM flush overhead - - # Log markers we expect to see for complex path - _VRAM_ENTER = "[vram] enter_complex_mode" - _VRAM_EXIT = "[vram] exit_complex_mode" - - for i, question in enumerate(BENCHMARK["hard"], 1): - tag = f"hard-{i:02d}" - # Strip /think prefix for display - short_q = question[len("/think "):].strip()[:60] - print(f" [{tag}] /think {short_q!r}") - - # Snapshot log window start time - t_send = time.monotonic() - try: - status, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": question, "chat_id": CHAT_ID}, timeout=5) - if status != 202: - print(f" → [{FAIL}] POST returned {status}") - hard_results.append((question, "?", None, False)) - continue - except Exception as e: - print(f" → [{FAIL}] POST error: {e}") - hard_results.append((question, "?", None, False)) - continue - - # Poll for reply - t_start = time.monotonic() - found = None - while time.monotonic() - t_start < COMPLEX_TIMEOUT: - since = int(time.monotonic() - t_start) + 90 - lines = fetch_logs(since_s=since) - found = parse_run_block(lines, question[len("/think "):].strip()) - if found: - break - time.sleep(5) - - elapsed = time.monotonic() - t_send - - if not found: - print(f" → [{FAIL}] no reply within {COMPLEX_TIMEOUT}s") - hard_results.append((question, "timeout", None, False)) - continue - - tier = found.get("tier", "unknown") - - if tier == "complex": - ok = True - label = PASS - note = "complex ✓" - elif tier == "medium": - # Acceptable fallback if VRAM eviction timed out - ok = True - label = WARN - note = "medium (VRAM fallback — check [vram] logs)" - else: - ok = False - label = FAIL - note = f"tier={tier} — unexpected" - - # Check if VRAM enter/exit were logged for this block - lines_block = fetch_logs(since_s=int(elapsed) + 120) - msg_key = question[len("/think "):].strip()[:40] - vram_enter_seen = any(_VRAM_ENTER in ln for ln in lines_block - if msg_key in ln or - any(msg_key[:15] in prev_ln - for prev_ln in lines_block[max(0, lines_block.index(ln)-10):lines_block.index(ln)])) - - # Simpler: just check the recent log window for enter/exit markers - recent = "\n".join(lines_block[-200:]) - vram_enter_seen = _VRAM_ENTER in recent - vram_exit_seen = _VRAM_EXIT in recent - - vram_note = "" - if tier == "complex": - if vram_enter_seen: - vram_note = " [vram:flush✓]" - else: - vram_note = f" [{WARN}:no vram flush log]" - - print(f" → [{label}] {note} latency={found['reply_total']:.1f}s llm={found['llm']:.1f}s{vram_note}") - hard_results.append((question, tier, found["reply_total"], ok)) - - # Pause to let exit_complex_mode background task complete before next question - # (flushes qwen3:8b and pre-warms 4b+router — avoids VRAM conflict on next enter) - time.sleep(5) - - # Summary table - print(f"\n {'#':<4} {'Tier':<8} {'Latency':>8} {'Question (/think ...)'}") - print(f" {'─'*4} {'─'*8} {'─'*8} {'─'*55}") - for idx, (q, tier, lat, ok) in enumerate(hard_results, 1): - lat_str = f"{lat:.1f}s" if lat is not None else "timeout" - ok_str = "✓" if tier == "complex" else ("~" if tier == "medium" else "✗") - short = q[len("/think "):].strip()[:55] - print(f" {ok_str} {idx:<3} {tier:<8} {lat_str:>8} {short!r}") - - total_hard = len(hard_results) - complex_count = sum(1 for _, t, _, _ in hard_results if t == "complex") - medium_fb = sum(1 for _, t, _, _ in hard_results if t == "medium") - light_count = sum(1 for _, t, _, _ in hard_results if t == "light") - timeout_count = sum(1 for _, t, _, _ in hard_results if t == "timeout") - lats = [lat for _, _, lat, _ in hard_results if lat is not None] - - print(f"\n Breakdown: complex={complex_count} medium(fallback)={medium_fb} light={light_count} timeout={timeout_count}") - if medium_fb: - print(f" [{WARN}] {medium_fb} question(s) fell back to medium (VRAM eviction timeout)") - if light_count: - print(f" [{FAIL}] {light_count} question(s) routed to light — /think prefix not detected") - if lats: - print(f" Avg latency: {sum(lats)/len(lats):.1f}s min={min(lats):.1f}s max={max(lats):.1f}s") - - no_light = light_count == 0 - no_timeout = timeout_count == 0 - - report( - f"Hard questions routed to complex (not light) ({complex_count + medium_fb}/{total_hard})", - no_light and no_timeout, - f"complex={complex_count} medium_fallback={medium_fb} light={light_count} timeout={timeout_count}", - ) - - -# ── 13. Memory benchmark — store facts, recall with keyword verification ─────── -if _run_memory: - _mem_name = random.choice([ - "Alice", "Bruno", "Camille", "Diego", "Elena", - "Farid", "Greta", "Hiroshi", "Irina", "Jonas", - ]) - _mem_city = random.choice([ - "Tokyo", "Berlin", "Cairo", "Sydney", "Oslo", - "Nairobi", "Lisbon", "Seoul", "Montreal", "Bangkok", - ]) - _mem_allergy = random.choice(["nuts", "gluten", "dairy", "shellfish", "eggs"]) - _mem_job = random.choice([ - ("software engineer", "startup"), - ("data scientist", "research lab"), - ("product manager", "tech company"), - ("DevOps engineer", "cloud provider"), - ]) - _mem_lang = random.choice(["Python", "Rust", "Go", "TypeScript", "Kotlin"]) - _mem_pet_name = random.choice([ - "Whiskers", "Biscuit", "Mango", "Pebble", "Shadow", - "Noodle", "Cheddar", "Cosmo", "Pippin", "Ziggy", - ]) - - print(f"\n[{INFO}] 13. Memory benchmark") - print(f" name={_mem_name} city={_mem_city} allergy={_mem_allergy} " - f"job={_mem_job[0]}@{_mem_job[1]} lang={_mem_lang} pet={_mem_pet_name}") - print(f" Storing 5 facts, then querying with 10 recall questions") - print(f" Chat ID: {CHAT_ID}") - print() - - # Wipe Qdrant collection and restart openmemory to eliminate stale data interference. - # Deleting the collection alone causes 404s — openmemory holds a live reference to it. - try: - import urllib.request as _ur - _req = _ur.Request(f"{QDRANT}/collections/adolf_memories", method="DELETE") - with _ur.urlopen(_req, timeout=5): - pass - print(f" [{INFO}] Wiped adolf_memories collection") - except Exception as e: - print(f" [{WARN}] Could not wipe collection: {e}") - - try: - subprocess.run( - ["docker", "compose", "-f", COMPOSE_FILE, "restart", "openmemory"], - capture_output=True, timeout=30, - ) - time.sleep(6) # wait for openmemory to reinitialize and recreate collection - print(f" [{INFO}] Restarted openmemory — fresh collection ready") - except Exception as e: - print(f" [{WARN}] Could not restart openmemory: {e}") - - MEMORY_FACTS = [ - f"My name is {_mem_name} and I live in {_mem_city}", - f"I prefer vegetarian food and I'm allergic to {_mem_allergy}", - f"I work as a {_mem_job[0]} at a {_mem_job[1]}", - f"My favorite programming language is {_mem_lang}", - f"I have a cat named {_mem_pet_name}", - ] - - MEMORY_RECALLS = [ - # (question, [keywords that must appear in reply]) - ("What is my name?", [_mem_name.lower()]), - ("Where do I live?", [_mem_city.lower()]), - ("Do I have any food allergies?", [_mem_allergy.lower()]), - ("What is my job?", [_mem_job[0].split()[0].lower()]), - ("What programming language do I prefer?", [_mem_lang.lower()]), - ("Do I have any pets?", [_mem_pet_name.lower()]), - ("Am I vegetarian or do I eat meat?", ["vegetarian"]), - ("What city am I in?", [_mem_city.lower()]), - ("Tell me what you know about me", [_mem_name.lower(), _mem_city.lower()]), - ("What's the name of my pet?", [_mem_pet_name.lower()]), - ] - - MEMORY_STORE_TIMEOUT = 180 # seconds per fact - MEMORY_RECALL_TIMEOUT = 180 # seconds per question - - # ── Store facts ────────────────────────────────────────────────────────── - print(f" Storing {len(MEMORY_FACTS)} facts...") - store_ok = 0 - for i, fact in enumerate(MEMORY_FACTS, 1): - print(f" [mem-store-{i:02d}] {fact!r}") - try: - status, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": fact, "chat_id": CHAT_ID}, timeout=5) - if status != 202: - print(f" → [{FAIL}] POST returned {status}") - continue - except Exception as e: - print(f" → [{FAIL}] POST error: {e}") - continue - - found = wait_for(f"mem-store-{i:02d}", fact, timeout_s=MEMORY_STORE_TIMEOUT, need_memory=True) - if found: - store_ok += 1 - print(f" → [{PASS}] stored tier={found['tier']} mem={found['memory_s']}s") - else: - print(f" → [{FAIL}] timeout") - - report(f"All memory facts stored ({store_ok}/{len(MEMORY_FACTS)})", - store_ok == len(MEMORY_FACTS)) - - # Wait for async memory extraction to settle — poll Qdrant until point count stabilises - print(f"\n Waiting for memory extraction to settle (up to 60s)...") - _prev_count = -1 - _stable_ticks = 0 - for _ in range(30): - time.sleep(2) - try: - _, body = get(f"{QDRANT}/collections/adolf_memories") - _cur_count = json.loads(body).get("result", {}).get("points_count", 0) - except Exception: - _cur_count = _prev_count - if _cur_count == _prev_count: - _stable_ticks += 1 - if _stable_ticks >= 3: # stable for 6s - break - else: - _stable_ticks = 0 - _prev_count = _cur_count - print(f" Memory settled: {_cur_count} points in Qdrant") - - # ── Recall questions ───────────────────────────────────────────────────── - print(f"\n Querying with {len(MEMORY_RECALLS)} recall questions...") - recall_results = [] # (question, keywords, reply_text, passed) - - for i, (question, keywords) in enumerate(MEMORY_RECALLS, 1): - print(f" [mem-recall-{i:02d}] {question!r}") - - try: - status, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": question, "chat_id": CHAT_ID}, timeout=5) - if status != 202: - print(f" → [{FAIL}] POST returned {status}") - recall_results.append((question, keywords, None, False)) - continue - except Exception as e: - print(f" → [{FAIL}] POST error: {e}") - recall_results.append((question, keywords, None, False)) - continue - - t_start = time.monotonic() - found = None - while time.monotonic() - t_start < MEMORY_RECALL_TIMEOUT: - since = int(time.monotonic() - t_start) + 30 - lines = fetch_logs(since_s=since) - found = parse_run_block(lines, question) - if found: - break - time.sleep(2) - - if not found: - print(f" → [{FAIL}] timeout") - recall_results.append((question, keywords, None, False)) - continue - - reply_text = (found.get("reply_text") or "").lower() - hit_keywords = [kw for kw in keywords if kw.lower() in reply_text] - passed = len(hit_keywords) == len(keywords) - tag_str = PASS if passed else WARN - missing = [kw for kw in keywords if kw.lower() not in reply_text] - detail = f"tier={found['tier']} lat={found['reply_total']:.1f}s" - if missing: - detail += f" missing keywords: {missing}" - print(f" → [{tag_str}] {detail}") - recall_results.append((question, keywords, found.get("reply_text"), passed)) - - time.sleep(1) - - # Summary - print(f"\n {'#':<4} {'Pass':<5} {'Question':<45} {'Keywords'}") - print(f" {'─'*4} {'─'*5} {'─'*45} {'─'*30}") - for idx, (q, kws, reply, ok) in enumerate(recall_results, 1): - ok_str = "✓" if ok else "✗" - print(f" {ok_str} {idx:<3} {'yes' if ok else 'no':<5} {q[:45]:<45} {kws}") - - recall_pass = sum(1 for _, _, _, ok in recall_results if ok) - total_recall = len(recall_results) - print(f"\n Memory recall score: {recall_pass}/{total_recall}") - - report(f"Memory recall ({recall_pass}/{total_recall} keywords found)", - recall_pass == total_recall, - f"{recall_pass}/{total_recall} questions had all expected keywords in reply") - - -# ── 14. Deduplication test — same fact stored twice must not grow Qdrant by 2 ─ -if _run_memory: - print(f"\n[{INFO}] 14. Memory deduplication test") - print(f" Sends the same fact twice — Qdrant point count must not increase by 2") - print(f" Chat ID: {CHAT_ID}") - print() - - DEDUP_TIMEOUT = 120 - - _dedup_fact = f"My lucky number is {random.randint(1000, 9999)}" - print(f" Fact: {_dedup_fact!r}") - - def _qdrant_count_dedup(): - try: - _, body = get(f"{QDRANT}/collections/adolf_memories") - return json.loads(body).get("result", {}).get("points_count", 0) - except Exception: - return 0 - - pts_before = _qdrant_count_dedup() - print(f" Qdrant points before: {pts_before}") - - # Send fact the first time - print(f" [dedup-1] sending fact (first time)") - try: - status, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": _dedup_fact, "chat_id": CHAT_ID}, timeout=5) - if status != 202: - report("Dedup: first POST accepted", False, f"status={status}") - else: - found1 = wait_for("dedup-1", _dedup_fact, timeout_s=DEDUP_TIMEOUT, need_memory=True) - if found1: - print(f" [dedup-1] stored tier={found1['tier']} mem={found1['memory_s']}s") - else: - print(f" [dedup-1] timeout") - except Exception as e: - report("Dedup: first POST accepted", False, str(e)) - found1 = None - - pts_after_first = _qdrant_count_dedup() - new_first = pts_after_first - pts_before - print(f" Qdrant after first send: {pts_before} → {pts_after_first} (+{new_first})") - - # Send exact same fact again - print(f" [dedup-2] sending same fact (second time)") - try: - status, _ = post_json(f"{DEEPAGENTS}/chat", - {"message": _dedup_fact, "chat_id": CHAT_ID}, timeout=5) - if status != 202: - report("Dedup: second POST accepted", False, f"status={status}") - else: - found2 = wait_for("dedup-2", _dedup_fact, timeout_s=DEDUP_TIMEOUT, need_memory=True) - if found2: - print(f" [dedup-2] stored tier={found2['tier']} mem={found2['memory_s']}s") - else: - print(f" [dedup-2] timeout") - except Exception as e: - report("Dedup: second POST accepted", False, str(e)) - - pts_after_second = _qdrant_count_dedup() - new_second = pts_after_second - pts_after_first - print(f" Qdrant after second send: {pts_after_first} → {pts_after_second} (+{new_second})") - - # Pass: second store added no MORE points than the first (NOOP or UPDATE, not ADD) - # If first send stored 0 points (fact too trivial), dedup is vacuously satisfied. - dedup_ok = new_second <= new_first - report( - "Deduplication: second identical fact not added to Qdrant", - dedup_ok, - f"first send: +{new_first} pts, second send: +{new_second} pts (want second ≤ first)", - ) - - -# ── summary ─────────────────────────────────────────────────────────────────── -print(f"\n{'─'*55}") -total = len(results) -passed = sum(1 for _, ok in results if ok) -failed = total - passed -print(f"Results: {passed}/{total} passed", end="") -if failed: - print(f" ({failed} failed)\n") - print("Failed checks:") - for name, ok in results: - if not ok: - print(f" - {name}") -else: - print(" — all good") -print() - -# Print benchmark reference -print(f"[{INFO}] Benchmark questions reference:") -for tier_name, questions in BENCHMARK.items(): - print(f"\n {tier_name.upper()} ({len(questions)} questions):") - for j, q in enumerate(questions, 1): - print(f" {j:2d}. {q}") -print() diff --git a/adolf/vram_manager.py b/adolf/vram_manager.py deleted file mode 100644 index fbf0083..0000000 --- a/adolf/vram_manager.py +++ /dev/null @@ -1,71 +0,0 @@ -import asyncio -import os -import httpx - -OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434") - - -class VRAMManager: - MEDIUM_MODELS = ["qwen3:4b", "qwen2.5:1.5b"] - COMPLEX_MODEL = "qwen3:8b" - - def __init__(self, base_url: str = OLLAMA_BASE_URL): - self.base_url = base_url - - async def enter_complex_mode(self) -> bool: - """Flush medium models before loading 8b. Returns False if eviction timed out.""" - print("[vram] enter_complex_mode: flushing medium models", flush=True) - await asyncio.gather(*[self._flush(m) for m in self.MEDIUM_MODELS]) - ok = await self._poll_evicted(self.MEDIUM_MODELS, timeout=15) - if ok: - print("[vram] enter_complex_mode: eviction confirmed, loading qwen3:8b", flush=True) - else: - print("[vram] enter_complex_mode: eviction timeout — falling back to medium", flush=True) - return ok - - async def exit_complex_mode(self): - """Flush 8b and pre-warm medium models. Run as background task after complex reply.""" - print("[vram] exit_complex_mode: flushing qwen3:8b", flush=True) - await self._flush(self.COMPLEX_MODEL) - print("[vram] exit_complex_mode: pre-warming medium models", flush=True) - await asyncio.gather(*[self._prewarm(m) for m in self.MEDIUM_MODELS]) - print("[vram] exit_complex_mode: done", flush=True) - - async def _flush(self, model: str): - """Send keep_alive=0 to force immediate unload from VRAM.""" - try: - async with httpx.AsyncClient(timeout=10.0) as client: - await client.post( - f"{self.base_url}/api/generate", - json={"model": model, "prompt": "", "keep_alive": 0}, - ) - except Exception as e: - print(f"[vram] flush {model} error: {e}", flush=True) - - async def _poll_evicted(self, models: list[str], timeout: float) -> bool: - """Poll /api/ps until none of the given models appear (or timeout).""" - deadline = asyncio.get_event_loop().time() + timeout - while asyncio.get_event_loop().time() < deadline: - try: - async with httpx.AsyncClient(timeout=5.0) as client: - resp = await client.get(f"{self.base_url}/api/ps") - data = resp.json() - loaded = {m.get("name", "") for m in data.get("models", [])} - if not any(m in loaded for m in models): - return True - except Exception as e: - print(f"[vram] poll_evicted error: {e}", flush=True) - await asyncio.sleep(0.5) - return False - - async def _prewarm(self, model: str): - """Load model into VRAM with keep_alive=300 (5 min).""" - try: - async with httpx.AsyncClient(timeout=60.0) as client: - await client.post( - f"{self.base_url}/api/generate", - json={"model": model, "prompt": "", "keep_alive": 300}, - ) - print(f"[vram] pre-warmed {model}", flush=True) - except Exception as e: - print(f"[vram] prewarm {model} error: {e}", flush=True) diff --git a/adolf/wiki_research.py b/adolf/wiki_research.py deleted file mode 100644 index f2e2def..0000000 --- a/adolf/wiki_research.py +++ /dev/null @@ -1,278 +0,0 @@ -#!/usr/bin/env python3 -""" -Wiki Research Pipeline — searches the web for each person/place in the family wiki. - -Uses Adolf's complex agent (/think prefix → qwen3:8b + web_search) to research -each subject and aggregates findings into research.md. - -Usage: - python3 wiki_research.py [--subject "Name"] [--dry-run] [--timeout 300] - [--output PATH] -""" - -import argparse -import json -import re -import sys -import time -import urllib.parse -import urllib.request -from datetime import datetime -from pathlib import Path - -# ── config ───────────────────────────────────────────────────────────────────── -GATEWAY = "http://localhost:8000" -WIKI_ROOT = Path("/mnt/ssd/dbs/otter/app-data/repository") -DEFAULT_OUTPUT = WIKI_ROOT / "research.md" - -PASS = "\033[32mPASS\033[0m" -FAIL = "\033[31mFAIL\033[0m" -INFO = "\033[36mINFO\033[0m" - - -# ── helpers ──────────────────────────────────────────────────────────────────── - -def post_message(text: str, session_id: str, timeout: int = 10) -> int: - payload = json.dumps({ - "text": text, - "session_id": session_id, - "channel": "cli", - "user_id": "wiki-pipeline", - }).encode() - req = urllib.request.Request( - f"{GATEWAY}/message", - data=payload, - headers={"Content-Type": "application/json"}, - method="POST", - ) - with urllib.request.urlopen(req, timeout=timeout) as r: - return r.status - - -def wait_for_reply(label: str, session_id: str, timeout_s: int = 300) -> str | None: - """Open SSE stream on /reply/{session_id} and return reply text, or None on timeout.""" - req = urllib.request.Request( - f"{GATEWAY}/reply/{urllib.parse.quote(session_id, safe='')}", - headers={"Accept": "text/event-stream"}, - ) - t0 = time.monotonic() - tick = 0 - deadline = t0 + timeout_s - - # Show progress while waiting (SSE blocks until reply is ready) - print(f"\r [{label}] waiting... ", end="", flush=True) - - try: - with urllib.request.urlopen(req, timeout=timeout_s + 30) as r: - for raw_line in r: - elapsed = time.monotonic() - t0 - line = raw_line.decode("utf-8").rstrip("\n") - if line.startswith("data:"): - text = line[5:].strip().replace("\\n", "\n") - print(f"\r [{label}] done after {elapsed:.0f}s{' ' * 30}") - if text == "[timeout]": - return None - return text - tick += 1 - rem = int(deadline - time.monotonic()) - print(f"\r [{label}] {elapsed:.0f}s elapsed, {rem}s left — waiting... ", - end="", flush=True) - except Exception as e: - print(f"\r [{label}] SSE error: {e}{' ' * 30}") - - print(f"\r [{label}] TIMEOUT after {timeout_s}s{' ' * 30}") - return None - - -# ── wiki parsing ─────────────────────────────────────────────────────────────── - -def slugify(name: str) -> str: - s = name.lower() - s = re.sub(r"[^\w\s-]", "", s) - s = re.sub(r"\s+", "-", s.strip()) - return s[:60] - - -def parse_wiki_file(path: Path): - try: - text = path.read_text(encoding="utf-8") - except Exception: - return None - - lines = text.splitlines() - name = None - context_parts = [] - - for line in lines[:50]: - stripped = line.strip() - if not name and stripped.startswith("# "): - name = stripped[2:].strip() - continue - if name: - if stripped.startswith("[![") or stripped.startswith("!["): - continue - if stripped: - context_parts.append(stripped) - if len(context_parts) >= 20: - break - - if not name: - return None - return name, "\n".join(context_parts) - - -def discover_subjects(wiki_root: Path): - subjects = [] - for subdir in ["люди", "места"]: - folder = wiki_root / subdir - if not folder.exists(): - continue - for md_file in sorted(folder.glob("*.md")): - result = parse_wiki_file(md_file) - if result: - name, context = result - subjects.append((name, context, subdir)) - return subjects - - -# ── output ───────────────────────────────────────────────────────────────────── - -def load_existing_names(output_path: Path) -> set: - if not output_path.exists(): - return set() - return set(re.findall(r"^## (.+)$", output_path.read_text(encoding="utf-8"), re.MULTILINE)) - - -def init_output(output_path: Path, total: int): - if not output_path.exists(): - output_path.write_text( - f"# Wiki Research Results\n\n" - f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n" - f"Subjects: {total}\n\n---\n\n", - encoding="utf-8", - ) - - -def append_result(output_path: Path, name: str, elapsed: float, reply_text: str): - date_str = datetime.now().strftime("%Y-%m-%d") - block = ( - f"## {name}\n\n" - f"**Searched**: {date_str} **Elapsed**: {elapsed:.0f}s\n\n" - f"{reply_text or '_No reply captured._'}\n\n---\n\n" - ) - with open(output_path, "a", encoding="utf-8") as f: - f.write(block) - - -# ── research prompt ──────────────────────────────────────────────────────────── - -def build_prompt(name: str, context: str, subdir: str) -> str: - kind = "person" if subdir == "люди" else "place" - return ( - f"/think You are researching a {kind} for a private family wiki. " - f"Find everything publicly available. Be thorough and specific.\n\n" - f"**Subject**: {name}\n" - f"**Known context** (from the family wiki — do NOT just repeat this):\n{context}\n\n" - f"**Research instructions** (MUST follow exactly):\n" - f"1. Call web_search, then IMMEDIATELY call fetch_url on every URL found in results.\n" - f"2. You MUST call fetch_url at least 5 times — do not write the report until you have.\n" - f"3. Priority URLs to fetch: Google Scholar profile, ResearchGate, IEEE Xplore, LinkedIn, employer page.\n" - f"4. Run searches in English AND Russian/Latvian.\n" - f"5. After fetching pages, derive follow-up searches from what you find.\n\n" - f"**Output format** (required):\n" - f"- Use markdown with sections: Overview, Education, Career, Publications, " - f"Online Presence, Interesting Findings, Not Found\n" - f"- Every fact must have a source link: [fact](url)\n" - f"- Include actual URLs to profiles, papers, articles found\n" - f"- 'Interesting Findings': non-trivial facts not in the wiki context above\n" - f"- Last line must be: **Sources checked: N** (count of URLs you fetched with fetch_url)\n\n" - f'If truly nothing is found publicly, say "No public information found." ' - f"but only after exhausting all search angles." - ) - - -# ── main ─────────────────────────────────────────────────────────────────────── - -def main(): - parser = argparse.ArgumentParser(description="Wiki research pipeline") - parser.add_argument("--subject", help="Single subject (substring match)") - parser.add_argument("--dry-run", action="store_true", help="Print prompts, don't send") - parser.add_argument("--timeout", type=int, default=300, help="Per-subject timeout (s)") - parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT, help="Output file") - args = parser.parse_args() - - subjects = discover_subjects(WIKI_ROOT) - if not subjects: - print(f"[{FAIL}] No subjects found in {WIKI_ROOT}") - sys.exit(1) - - print(f"[{INFO}] Discovered {len(subjects)} subjects") - - if args.subject: - needle = args.subject.lower() - subjects = [(n, c, s) for n, c, s in subjects if needle in n.lower()] - if not subjects: - print(f"[{FAIL}] No subject matching '{args.subject}'") - sys.exit(1) - print(f"[{INFO}] Filtered to {len(subjects)} subject(s)") - - if args.dry_run: - for name, context, subdir in subjects: - print(f"\n{'='*60}\nSUBJECT: {name} ({subdir})") - print(f"PROMPT:\n{build_prompt(name, context, subdir)}") - return - - init_output(args.output, len(subjects)) - existing = load_existing_names(args.output) - print(f"[{INFO}] Output: {args.output} ({len(existing)} already done)") - - total = len(subjects) - done = 0 - failed = [] - - for idx, (name, context, subdir) in enumerate(subjects, 1): - if name in existing: - print(f"[{idx}/{total}] SKIP {name} (already in output)") - done += 1 - continue - - prompt = build_prompt(name, context, subdir) - session_id = f"wiki-{slugify(name)}" - label = f"{idx}/{total}" - - print(f"\n[{label}] {name}") - - try: - status = post_message(prompt, session_id, timeout=10) - if status != 202: - print(f" [{FAIL}] Unexpected status {status}") - failed.append(name) - continue - except Exception as e: - print(f" [{FAIL}] POST failed: {e}") - failed.append(name) - continue - - t0 = time.monotonic() - reply_text = wait_for_reply(label, session_id, timeout_s=args.timeout) - elapsed = time.monotonic() - t0 - - if reply_text is None: - print(f" [{FAIL}] Timeout") - failed.append(name) - append_result(args.output, name, elapsed, "_Research timed out._") - continue - - print(f" [{PASS}] {elapsed:.0f}s — {len(reply_text)} chars") - append_result(args.output, name, elapsed, reply_text) - done += 1 - - print(f"\n{'='*60}") - print(f"Done: {done}/{total}") - if failed: - print(f"Failed ({len(failed)}): {', '.join(failed)}") - print(f"Output: {args.output}") - - -if __name__ == "__main__": - main()