Embed Crawl4AI at all tiers, restore qwen3:4b medium, update docs

- Pre-routing URL fetch: any message with URLs gets content fetched
  async (httpx.AsyncClient) before routing via _fetch_urls_from_message()
- URL context and memories gathered concurrently with asyncio.gather
- Light tier upgraded to medium when URL content is present
- url_context injected into system prompt for medium and complex agents
- Complex agent retains web_search/fetch_url tools + receives pre-fetched content
- Medium model restored to qwen3:4b (was temporarily qwen2.5:1.5b)
- Unit tests added for _extract_urls
- ARCHITECTURE.md: added Tool Handling, Crawl4AI Integration, Memory Pipeline sections
- CLAUDE.md: updated request flow and Crawl4AI integration docs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Alvis
2026-03-12 15:49:34 +00:00
parent f9618a9bbf
commit 50097d6092
8 changed files with 183 additions and 31 deletions

View File

@@ -10,6 +10,12 @@ from pydantic import BaseModel
import re as _re
import httpx as _httpx
_URL_RE = _re.compile(r'https?://[^\s<>"\']+')
def _extract_urls(text: str) -> list[str]:
return _URL_RE.findall(text)
from langchain_openai import ChatOpenAI
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_community.utilities import SearxSearchWrapper
@@ -35,6 +41,40 @@ CRAWL4AI_URL = os.getenv("CRAWL4AI_URL", "http://crawl4ai:11235")
# Cap on how many prior conversation turns are kept per session buffer.
MAX_HISTORY_TURNS = 5
# session_id -> rolling conversation history; process-local, lost on restart.
# NOTE(review): element type is untyped `list` — presumably message dicts; confirm at call sites.
_conversation_buffers: dict[str, list] = {}
async def _crawl4ai_fetch_async(url: str) -> str:
    """Async fetch via Crawl4AI — JS-rendered, bot-bypass, returns clean markdown.

    Returns up to 5000 chars of markdown on success, '' when the crawl reports
    failure, or an '[fetch error: ...]' sentinel string when the request itself
    raises (callers filter on that prefix rather than catching exceptions).
    """
    try:
        async with _httpx.AsyncClient(timeout=60) as client:
            resp = await client.post(f"{CRAWL4AI_URL}/crawl", json={"urls": [url]})
            resp.raise_for_status()
            payload = resp.json().get("results", [])
            first = payload[0] if payload else None
            if not first or not first.get("success"):
                return ""
            raw = first.get("markdown") or {}
            # Crawl4AI may return markdown as either a dict or a bare string.
            text = raw.get("raw_markdown") if isinstance(raw, dict) else str(raw)
            return (text or "")[:5000]
    except Exception as e:  # best-effort: network/JSON errors become a sentinel, never raise
        return f"[fetch error: {e}]"
async def _fetch_urls_from_message(message: str) -> str:
    """If message contains URLs, fetch their content concurrently via Crawl4AI.

    De-duplicates URLs (preserving first-seen order) so a link repeated in the
    message cannot waste one of the three fetch slots, then fetches up to 3
    concurrently with asyncio.gather.

    Returns a formatted context block, or '' if no URLs or all fetches fail.
    """
    # dict.fromkeys keeps insertion order — an order-preserving dedupe.
    urls = list(dict.fromkeys(_extract_urls(message)))[:3]
    if not urls:
        return ""
    results = await asyncio.gather(*(_crawl4ai_fetch_async(u) for u in urls))
    # Drop empty results and the '[fetch error: ...]' sentinels from the fetcher.
    parts = [
        f"### {url}\n{content[:3000]}"
        for url, content in zip(urls, results)
        if content and not content.startswith("[fetch error")
    ]
    if not parts:
        return ""
    return "User's message contains URLs. Fetched content:\n\n" + "\n\n".join(parts)
# /no_think at the start of the system prompt disables qwen3 chain-of-thought.
# create_deep_agent prepends our system_prompt before BASE_AGENT_PROMPT, so
# /no_think lands at position 0 and is respected by qwen3 models via Ollama.
@@ -324,13 +364,28 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
history = _conversation_buffers.get(session_id, [])
print(f"[agent] running: {clean_message[:80]!r}", flush=True)
# Retrieve memories once; inject into history so ALL tiers can use them
memories = await _retrieve_memories(clean_message, session_id)
enriched_history = (
[{"role": "system", "content": memories}] + history if memories else history
# Fetch URL content and memories concurrently — both are IO-bound, neither needs GPU
url_context, memories = await asyncio.gather(
_fetch_urls_from_message(clean_message),
_retrieve_memories(clean_message, session_id),
)
if url_context:
print(f"[agent] crawl4ai: {len(url_context)} chars fetched from message URLs", flush=True)
# Build enriched history: memories + url_context as system context for ALL tiers
enriched_history = list(history)
if url_context:
enriched_history = [{"role": "system", "content": url_context}] + enriched_history
if memories:
enriched_history = [{"role": "system", "content": memories}] + enriched_history
tier, light_reply = await router.route(clean_message, enriched_history, force_complex)
# Messages with URL content must be handled by at least medium tier
if url_context and tier == "light":
tier = "medium"
light_reply = None
print("[agent] URL in message → upgraded light→medium", flush=True)
print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True)
final_text = None
@@ -344,6 +399,8 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
system_prompt = MEDIUM_SYSTEM_PROMPT
if memories:
system_prompt = system_prompt + "\n\n" + memories
if url_context:
system_prompt = system_prompt + "\n\n" + url_context
result = await medium_agent.ainvoke({
"messages": [
{"role": "system", "content": system_prompt},
@@ -363,6 +420,8 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
system_prompt = MEDIUM_SYSTEM_PROMPT
if memories:
system_prompt = system_prompt + "\n\n" + memories
if url_context:
system_prompt = system_prompt + "\n\n" + url_context
result = await medium_agent.ainvoke({
"messages": [
{"role": "system", "content": system_prompt},
@@ -372,6 +431,9 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
})
else:
system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id)
if url_context:
# Inject pre-fetched content — complex agent can still re-fetch or follow links
system_prompt = system_prompt + "\n\n[Pre-fetched URL content from user's message:]\n" + url_context
result = await complex_agent.ainvoke({
"messages": [
{"role": "system", "content": system_prompt},