diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md
index 7c8871c..c0504a0 100644
--- a/ARCHITECTURE.md
+++ b/ARCHITECTURE.md
@@ -52,23 +52,66 @@ Autonomous personal assistant with a multi-channel gateway. Three-tier model rou
 2. POST /message {text, session_id, channel, user_id}
 3. 202 Accepted immediately
 4. Background: run_agent_task(message, session_id, channel)
-5. Route → run agent tier → get reply text
-6. channels.deliver(session_id, channel, reply_text)
+5. Parallel IO (asyncio.gather):
+   a. _fetch_urls_from_message() — Crawl4AI fetches any URLs in message
+   b. _retrieve_memories() — openmemory semantic search for context
+6. router.route() with enriched history (url_context + memories as system msgs)
+   - if URL content fetched and tier=light → upgrade to medium
+7. Invoke agent for tier with url_context + memories in system prompt
+8. channels.deliver(session_id, channel, reply_text)
    - always puts reply in pending_replies[session_id] queue (for SSE)
    - calls channel-specific send callback
-7. GET /reply/{session_id} SSE clients receive the reply
+9. _store_memory() background task — stores turn in openmemory
+10. GET /reply/{session_id} SSE clients receive the reply
 ```
 
+## Tool Handling
+
+Adolf uses LangChain's tool interface but only the complex agent actually invokes tools at runtime.
+
+**Complex agent (`/think` prefix):** `web_search` and `fetch_url` are defined as `langchain_core.tools.Tool` objects and passed to `create_deep_agent()`. The deepagents library runs an agentic loop (LangGraph `create_react_agent` under the hood) that sends the tool schema to the model via OpenAI function-calling format and handles tool dispatch.
+
+**Medium agent (default):** `_DirectModel` makes a single `model.ainvoke(messages)` call with no tool schema. Context (memories, fetched URL content) is injected via the system prompt instead. This is intentional — `qwen3:4b` behaves unreliably when a tool array is present.
+
+**Memory tools (out-of-loop):** `add_memory` and `search_memory` are LangChain MCP tool objects (via `langchain_mcp_adapters`) but are excluded from both agents' tool lists. They are called directly — `await _memory_add_tool.ainvoke(...)` — outside the agent loop, before and after each turn.
+
 ## Three-Tier Model Routing
 
-| Tier | Model | VRAM | Trigger | Latency |
-|------|-------|------|---------|---------|
-| Light | qwen2.5:1.5b (router answers) | ~1.2 GB | Router classifies as light | ~2–4s |
-| Medium | qwen3:4b | ~2.5 GB | Default | ~20–40s |
-| Complex | qwen3:8b | ~6.0 GB | `/think` prefix | ~60–120s |
+| Tier | Model | Agent | Trigger | Latency |
+|------|-------|-------|---------|---------|
+| Light | `qwen2.5:1.5b` (router answers directly) | — | Regex pre-match or LLM classifies "light" | ~2–4s |
+| Medium | `qwen3:4b` (`DEEPAGENTS_MODEL`) | `_DirectModel` — single LLM call, no tools | Default; also forced when message contains URLs | ~10–20s |
+| Complex | `qwen3:8b` (`DEEPAGENTS_COMPLEX_MODEL`) | `create_deep_agent` — agentic loop with tools | `/think` prefix only | ~60–120s |
 
 **`/think` prefix**: forces complex tier, stripped before sending to agent.
 
+Complex tier is locked out unless the message starts with `/think` — any LLM classification of "complex" is downgraded to medium.
+
+## Crawl4AI Integration
+
+Crawl4AI runs as a Docker service (`crawl4ai:11235`) providing JS-rendered, bot-bypass page fetching.
+
+**Pre-routing fetch (all tiers):**
+- `_URL_RE` detects `https?://` URLs in any incoming message
+- `_crawl4ai_fetch_async()` uses `httpx.AsyncClient` to POST `{urls: [...]}` to `/crawl`
+- Up to 3 URLs fetched concurrently via `asyncio.gather`
+- Fetched content (up to 3000 chars/URL) injected as a system context block into enriched history before routing and into medium/complex system prompts
+- If fetch succeeds and router returns light → tier upgraded to medium
+
+**Complex agent tools:**
+- `web_search`: SearXNG query + Crawl4AI auto-fetch of top 2 result URLs → combined snippet + page text
+- `fetch_url`: Crawl4AI single-URL fetch for any specific URL
+
+## Memory Pipeline
+
+openmemory runs as a FastMCP server (`openmemory:8765`) backed by mem0 + Qdrant + nomic-embed-text.
+
+**Retrieval (before routing):** `_retrieve_memories()` calls `search_memory` MCP tool with the user message as query. Results (threshold ≥ 0.5) are prepended to enriched history so all tiers benefit.
+
+**Storage (after reply):** `_store_memory()` runs as an asyncio background task, calling `add_memory` with `"User: ...\nAssistant: ..."`. The extraction LLM (`qwen2.5:1.5b` on GPU Ollama) pulls facts; dedup is handled by mem0's update prompt.
+
+Memory tools (`add_memory`, `search_memory`, `get_all_memories`) are excluded from agent tool lists — memory management happens outside the agent loop.
+
 ## VRAM Management
 
 GTX 1070 — 8 GB. Ollama must be restarted if CUDA init fails (model loads on CPU).
@@ -76,7 +119,7 @@ GTX 1070 — 8 GB. Ollama must be restarted if CUDA init fails (model loads on C
 1. Flush explicitly before loading qwen3:8b (`keep_alive=0`)
 2. Verify eviction via `/api/ps` poll (15s timeout) before proceeding
 3. Fallback: timeout → run medium agent instead
-4. Post-complex: flush 8b, pre-warm 4b + router
+4. Post-complex: flush 8b, pre-warm medium + router
 
 ## Session ID Convention
 
@@ -89,18 +132,18 @@ Conversation history is keyed by session_id (5-turn buffer).
 
 ```
 adolf/
-├── docker-compose.yml      Services: deepagents, openmemory, grammy
+├── docker-compose.yml      Services: bifrost, deepagents, openmemory, grammy, crawl4ai
 ├── Dockerfile              deepagents container (Python 3.12)
-├── agent.py                FastAPI gateway + three-tier routing
+├── agent.py                FastAPI gateway, run_agent_task, Crawl4AI pre-fetch, memory pipeline
 ├── channels.py             Channel registry + deliver() + pending_replies
-├── router.py               Router class — qwen2.5:1.5b routing
+├── router.py               Router class — regex + LLM tier classification
 ├── vram_manager.py         VRAMManager — flush/prewarm/poll Ollama VRAM
-├── agent_factory.py        build_medium_agent / build_complex_agent
+├── agent_factory.py        _DirectModel (medium) / create_deep_agent (complex)
 ├── cli.py                  Interactive CLI REPL client
 ├── wiki_research.py        Batch wiki research pipeline (uses /message + SSE)
 ├── .env                    TELEGRAM_BOT_TOKEN (not committed)
 ├── openmemory/
-│   ├── server.py           FastMCP + mem0 MCP tools
+│   ├── server.py           FastMCP + mem0: add_memory, search_memory, get_all_memories
 │   └── Dockerfile
 └── grammy/
     ├── bot.mjs             grammY Telegram bot + POST /send HTTP endpoint
@@ -108,11 +151,11 @@ adolf/
     └── Dockerfile
 ```
 
-## External Services (from openai/ stack)
+## External Services (host ports, from openai/ stack)
 
 | Service | Host Port | Role |
 |---------|-----------|------|
-| Ollama GPU | 11436 | All reply inference |
-| Ollama CPU | 11435 | Memory embedding (nomic-embed-text) |
+| Ollama GPU | 11436 | All LLM inference (via Bifrost) + VRAM management (direct) + memory extraction |
+| Ollama CPU | 11435 | nomic-embed-text embeddings for openmemory |
 | Qdrant | 6333 | Vector store for memories |
-| SearXNG | 11437 | Web search |
+| SearXNG | 11437 | Web search (used by `web_search` tool) |
diff --git a/CLAUDE.md b/CLAUDE.md
index a85d22f..5f6b428 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -37,12 +37,18 @@ Adolf is a multi-channel personal assistant.
All LLM inference is routed through
 Channel adapter → POST /message {text, session_id, channel, user_id}
                 → 202 Accepted (immediate)
                 → background: run_agent_task()
+                    → asyncio.gather(
+                        _fetch_urls_from_message()  ← Crawl4AI, concurrent
+                        _retrieve_memories()        ← openmemory search, concurrent
+                      )
                     → router.route() → tier decision (light/medium/complex)
-                    → invoke agent for tier via Bifrost
+                      if URL content fetched → upgrade light→medium
+                    → invoke agent for tier via Bifrost (url_context + memories in system prompt)
                         deepagents:8000 → bifrost:8080/v1 → ollama:11436
                     → channels.deliver(session_id, channel, reply)
                         → pending_replies[session_id] queue (SSE)
                         → channel-specific callback (Telegram POST, CLI no-op)
+                    → _store_memory() background task (openmemory)
 
 CLI/wiki polling → GET /reply/{session_id} (SSE, blocks until reply)
 ```
@@ -59,7 +65,7 @@ Bifrost (`bifrost-config.json`) is configured with the `ollama` provider pointin
 | Tier | Model (env var) | Trigger |
 |------|-----------------|---------|
 | light | `qwen2.5:1.5b` (`DEEPAGENTS_ROUTER_MODEL`) | Regex pre-match or LLM classifies "light" — answered by router model directly, no agent invoked |
-| medium | `qwen2.5:1.5b` (`DEEPAGENTS_MODEL`) | Default for tool-requiring queries |
+| medium | `qwen3:4b` (`DEEPAGENTS_MODEL`) | Default for tool-requiring queries |
 | complex | `qwen3:8b` (`DEEPAGENTS_COMPLEX_MODEL`) | `/think ` prefix only |
 
 The router does regex pre-classification first, then LLM classification. Complex tier is blocked unless the message starts with `/think ` — any LLM classification of "complex" is downgraded to medium.
@@ -107,10 +113,14 @@ External (from `openai/` stack, host ports):
 
 The file is mounted into the bifrost container at `/app/data/config.json`. It declares one Ollama provider key pointing to `host.docker.internal:11436` with 2 retries and 300s timeout. To add fallback providers or adjust weights, edit this file and restart the bifrost container.
 
-### Agent tools
+### Crawl4AI integration
+
+Crawl4AI is embedded at all levels of the pipeline:
+
+- **Pre-routing (all tiers)**: `_fetch_urls_from_message()` detects URLs in any message via `_URL_RE`, fetches up to 3 URLs concurrently with `_crawl4ai_fetch_async()` (async httpx). URL content is injected as a system context block into enriched history before routing, and into the system prompt for medium/complex agents.
+- **Tier upgrade**: if URL content is successfully fetched, light tier is upgraded to medium (light model cannot process page content).
+- **Complex agent tools**: `web_search` (SearXNG + Crawl4AI auto-fetch of top 2 results) and `fetch_url` (single-URL Crawl4AI fetch) remain available for the complex agent's agentic loop. Complex tier also receives the pre-fetched content in system prompt to avoid redundant re-fetching.
 
-`web_search`: SearXNG search + Crawl4AI auto-fetch of top 2 results → combined snippet + full page content.
-`fetch_url`: Crawl4AI single-URL fetch.
 MCP tools from openmemory (`add_memory`, `search_memory`, `get_all_memories`) are **excluded** from agent tools — memory management is handled outside the agent loop.
 
 ### Medium vs Complex agent
@@ -122,12 +132,12 @@ MCP tools from openmemory (`add_memory`, `search_memory`, `get_all_memories`) ar
 
 ### Key files
 
-- `agent.py` — FastAPI app, lifespan wiring, `run_agent_task()`, all endpoints
+- `agent.py` — FastAPI app, lifespan wiring, `run_agent_task()`, Crawl4AI pre-fetch, memory pipeline, all endpoints
 - `bifrost-config.json` — Bifrost provider config (Ollama GPU, retries, timeouts)
 - `channels.py` — channel registry and `deliver()` dispatcher
 - `router.py` — `Router` class: regex + LLM classification, light-tier reply generation
 - `vram_manager.py` — `VRAMManager`: flush/poll/prewarm Ollama VRAM directly
-- `agent_factory.py` — `build_medium_agent` / `build_complex_agent` via `create_deep_agent()`
+- `agent_factory.py` — `build_medium_agent` (`_DirectModel`, single call) / `build_complex_agent` (`create_deep_agent`)
 - `openmemory/server.py` — FastMCP + mem0 config with custom extraction/dedup prompts
 - `wiki_research.py` — batch research pipeline using `/message` + SSE polling
 - `grammy/bot.mjs` — Telegram long-poll + HTTP `/send` endpoint
diff --git a/agent.py b/agent.py
index 4d85675..1cb7ac0 100644
--- a/agent.py
+++ b/agent.py
@@ -10,6 +10,12 @@ from pydantic import BaseModel
 import re as _re
 import httpx as _httpx
 
+_URL_RE = _re.compile(r'https?://[^\s<>"\']+')
+
+
+def _extract_urls(text: str) -> list[str]:
+    return _URL_RE.findall(text)
+
 from langchain_openai import ChatOpenAI
 from langchain_mcp_adapters.client import MultiServerMCPClient
 from langchain_community.utilities import SearxSearchWrapper
@@ -35,6 +41,40 @@ CRAWL4AI_URL = os.getenv("CRAWL4AI_URL", "http://crawl4ai:11235")
 MAX_HISTORY_TURNS = 5
 _conversation_buffers: dict[str, list] = {}
 
+
+async def _crawl4ai_fetch_async(url: str) -> str:
+    """Async fetch via Crawl4AI — JS-rendered, bot-bypass, returns clean markdown."""
+    try:
+        async with _httpx.AsyncClient(timeout=60) as client:
+            r = await client.post(f"{CRAWL4AI_URL}/crawl", json={"urls": [url]})
+            r.raise_for_status()
+            results = r.json().get("results", [])
+            if not results or not results[0].get("success"):
+                return ""
+            md_obj = results[0].get("markdown") or {}
+            md = md_obj.get("raw_markdown") if isinstance(md_obj, dict) else str(md_obj)
+            return (md or "")[:5000]
+    except Exception as e:
+        return f"[fetch error: {e}]"
+
+
+async def _fetch_urls_from_message(message: str) -> str:
+    """If message contains URLs, fetch their content concurrently via Crawl4AI.
+    Returns a formatted context block, or '' if no URLs or all fetches fail."""
+    urls = _extract_urls(message)
+    if not urls:
+        return ""
+    # Fetch up to 3 URLs concurrently
+    results = await asyncio.gather(*[_crawl4ai_fetch_async(u) for u in urls[:3]])
+    parts = []
+    for url, content in zip(urls[:3], results):
+        if content and not content.startswith("[fetch error"):
+            parts.append(f"### {url}\n{content[:3000]}")
+    if not parts:
+        return ""
+    return "User's message contains URLs. Fetched content:\n\n" + "\n\n".join(parts)
+
+
 # /no_think at the start of the system prompt disables qwen3 chain-of-thought.
 # create_deep_agent prepends our system_prompt before BASE_AGENT_PROMPT, so
 # /no_think lands at position 0 and is respected by qwen3 models via Ollama.
@@ -324,13 +364,28 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
     history = _conversation_buffers.get(session_id, [])
     print(f"[agent] running: {clean_message[:80]!r}", flush=True)
 
-    # Retrieve memories once; inject into history so ALL tiers can use them
-    memories = await _retrieve_memories(clean_message, session_id)
-    enriched_history = (
-        [{"role": "system", "content": memories}] + history if memories else history
+    # Fetch URL content and memories concurrently — both are IO-bound, neither needs GPU
+    url_context, memories = await asyncio.gather(
+        _fetch_urls_from_message(clean_message),
+        _retrieve_memories(clean_message, session_id),
     )
+    if url_context:
+        print(f"[agent] crawl4ai: {len(url_context)} chars fetched from message URLs", flush=True)
+
+    # Build enriched history: memories + url_context as system context for ALL tiers
+    enriched_history = list(history)
+    if url_context:
+        enriched_history = [{"role": "system", "content": url_context}] + enriched_history
+    if memories:
+        enriched_history = [{"role": "system", "content": memories}] + enriched_history
 
     tier, light_reply = await router.route(clean_message, enriched_history, force_complex)
+
+    # Messages with URL content must be handled by at least medium tier
+    if url_context and tier == "light":
+        tier = "medium"
+        light_reply = None
+        print("[agent] URL in message → upgraded light→medium", flush=True)
     print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True)
 
     final_text = None
@@ -344,6 +399,8 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
             system_prompt = MEDIUM_SYSTEM_PROMPT
             if memories:
                 system_prompt = system_prompt + "\n\n" + memories
+            if url_context:
+                system_prompt = system_prompt + "\n\n" + url_context
             result = await medium_agent.ainvoke({
                 "messages": [
                     {"role": "system", "content": system_prompt},
@@ -363,6 +420,8 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
                 system_prompt = MEDIUM_SYSTEM_PROMPT
                 if memories:
                     system_prompt = system_prompt + "\n\n" + memories
+                if url_context:
+                    system_prompt = system_prompt + "\n\n" + url_context
                 result = await medium_agent.ainvoke({
                     "messages": [
                         {"role": "system", "content": system_prompt},
@@ -372,6 +431,9 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
                 })
             else:
                 system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id)
+                if url_context:
+                    # Inject pre-fetched content — complex agent can still re-fetch or follow links
+                    system_prompt = system_prompt + "\n\n[Pre-fetched URL content from user's message:]\n" + url_context
                 result = await complex_agent.ainvoke({
                     "messages": [
                         {"role": "system", "content": system_prompt},
diff --git a/docker-compose.yml b/docker-compose.yml
index fbc177d..e9a6ecf 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -25,7 +25,7 @@ services:
       - BIFROST_URL=http://bifrost:8080/v1
       # Direct Ollama GPU URL — used only by VRAMManager for flush/prewarm
       - OLLAMA_BASE_URL=http://host.docker.internal:11436
-      - DEEPAGENTS_MODEL=qwen2.5:1.5b
+      - DEEPAGENTS_MODEL=qwen3:4b
       - DEEPAGENTS_COMPLEX_MODEL=qwen3:8b
       - DEEPAGENTS_ROUTER_MODEL=qwen2.5:1.5b
       - SEARXNG_URL=http://host.docker.internal:11437
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/unit/test_agent_helpers.py b/tests/unit/test_agent_helpers.py
index 9df77d1..65e221c 100644
--- a/tests/unit/test_agent_helpers.py
+++ b/tests/unit/test_agent_helpers.py
@@ -13,7 +13,7 @@ import pytest
 # The FastAPI app is instantiated at module level in agent.py —
 # with the mocked fastapi, that just creates a MagicMock() object
 # and the route decorators are no-ops.
-from agent import _strip_think, _extract_final_text
+from agent import _strip_think, _extract_final_text, _extract_urls
 
 
 # ── _strip_think ───────────────────────────────────────────────────────────────
@@ -159,3 +159,40 @@ class TestExtractFinalText:
             ]
         }
         assert _extract_final_text(result) == "## Report\n\nSome content."
+
+
+# ── _extract_urls ──────────────────────────────────────────────────────────────
+
+class TestExtractUrls:
+    def test_single_url(self):
+        assert _extract_urls("check this out https://example.com please") == ["https://example.com"]
+
+    def test_multiple_urls(self):
+        urls = _extract_urls("see https://foo.com and https://bar.org/path?q=1")
+        assert urls == ["https://foo.com", "https://bar.org/path?q=1"]
+
+    def test_no_urls(self):
+        assert _extract_urls("no links here at all") == []
+
+    def test_http_and_https(self):
+        urls = _extract_urls("http://old.site and https://new.site")
+        assert "http://old.site" in urls
+        assert "https://new.site" in urls
+
+    def test_url_at_start_of_message(self):
+        assert _extract_urls("https://example.com is interesting") == ["https://example.com"]
+
+    def test_url_only(self):
+        assert _extract_urls("https://example.com/page") == ["https://example.com/page"]
+
+    def test_url_with_path_and_query(self):
+        url = "https://example.com/articles/123?ref=home&page=2"
+        assert _extract_urls(url) == [url]
+
+    def test_empty_string(self):
+        assert _extract_urls("") == []
+
+    def test_does_not_include_surrounding_quotes(self):
+        # URLs inside quotes should not include the quote character
+        urls = _extract_urls('visit "https://example.com" today')
+        assert urls == ["https://example.com"]