Wiki people-search pipeline: tested end to end

This commit is contained in:
Alvis
2026-03-05 11:22:34 +00:00
parent ea77b2308b
commit ec45d255f0
19 changed files with 1717 additions and 257 deletions

241
agent.py
View File

@@ -3,10 +3,13 @@ import os
import time
from contextlib import asynccontextmanager
from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import JSONResponse
from fastapi import FastAPI, BackgroundTasks, Request
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
import re as _re
import httpx as _httpx
from langchain_ollama import ChatOllama
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_community.utilities import SearxSearchWrapper
@@ -15,6 +18,7 @@ from langchain_core.tools import Tool
from vram_manager import VRAMManager
from router import Router
from agent_factory import build_medium_agent, build_complex_agent
import channels
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
ROUTER_MODEL = os.getenv("DEEPAGENTS_ROUTER_MODEL", "qwen2.5:0.5b")
@@ -22,36 +26,27 @@ MEDIUM_MODEL = os.getenv("DEEPAGENTS_MODEL", "qwen3:4b")
COMPLEX_MODEL = os.getenv("DEEPAGENTS_COMPLEX_MODEL", "qwen3:8b")
SEARXNG_URL = os.getenv("SEARXNG_URL", "http://host.docker.internal:11437")
OPENMEMORY_URL = os.getenv("OPENMEMORY_URL", "http://openmemory:8765")
GRAMMY_URL = os.getenv("GRAMMY_URL", "http://grammy:3001")
CRAWL4AI_URL = os.getenv("CRAWL4AI_URL", "http://crawl4ai:11235")
MAX_HISTORY_TURNS = 5
_conversation_buffers: dict[str, list] = {}
MEDIUM_SYSTEM_PROMPT = (
"You are a helpful AI assistant talking to a user via Telegram. "
"The user's ID is {user_id}. "
"IMPORTANT: When calling any memory tool (search_memory, get_all_memories), "
"always use user_id=\"{user_id}\". "
"Every conversation is automatically saved to memory after you reply — "
"you do NOT need to explicitly store anything. "
"NEVER tell the user you cannot remember or store information. "
"If the user asks you to remember something, acknowledge it and confirm it will be remembered. "
"Use search_memory when context from past conversations may be relevant. "
"You are a helpful AI assistant. "
"Use web_search for questions about current events or facts you don't know. "
"Reply concisely."
)
COMPLEX_SYSTEM_PROMPT = (
"You are a capable AI assistant tackling a complex, multi-step task for a Telegram user. "
"The user's ID is {user_id}. "
"IMPORTANT: When calling any memory tool (search_memory, get_all_memories), "
"always use user_id=\"{user_id}\". "
"Plan your work using write_todos before diving in. "
"Delegate: use the 'research' subagent for thorough web research across multiple queries, "
"and the 'memory' subagent to gather comprehensive context from past conversations. "
"Every conversation is automatically saved to memory — you do NOT need to store anything. "
"NEVER tell the user you cannot remember or store information. "
"Produce a thorough, well-structured reply."
"You are a deep research assistant. "
"web_search automatically fetches full page content from top results — use it 6+ times with different queries. "
"Also call fetch_url on any specific URL you want to read in full.\n\n"
"Run searches in English AND Russian/Latvian. "
"After getting results, run follow-up searches based on new facts found.\n\n"
"Write a structured markdown report with sections: "
"Overview, Education, Career, Publications, Online Presence, Interesting Findings.\n"
"Every fact must link to the real URL it came from: [fact](url). "
"NEVER invent URLs. End with: **Sources checked: N**"
)
medium_agent = None
@@ -59,24 +54,22 @@ complex_agent = None
router: Router = None
vram_manager: VRAMManager = None
mcp_client = None
send_tool = None
add_memory_tool = None
# GPU mutex: one LLM inference at a time
_reply_semaphore = asyncio.Semaphore(1)
# Memory semaphore: one async extraction at a time
_memory_semaphore = asyncio.Semaphore(1)
@asynccontextmanager
async def lifespan(app: FastAPI):
global medium_agent, complex_agent, router, vram_manager
global mcp_client, send_tool, add_memory_tool
global medium_agent, complex_agent, router, vram_manager, mcp_client
# Register channel adapters
channels.register_defaults()
# Three model instances
router_model = ChatOllama(
model=ROUTER_MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=4096,
temperature=0, # deterministic classification
temperature=0,
)
medium_model = ChatOllama(
model=MEDIUM_MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=8192
@@ -90,7 +83,6 @@ async def lifespan(app: FastAPI):
mcp_connections = {
"openmemory": {"transport": "sse", "url": f"{OPENMEMORY_URL}/sse"},
"grammy": {"transport": "sse", "url": f"{GRAMMY_URL}/sse"},
}
mcp_client = MultiServerMCPClient(mcp_connections)
for attempt in range(12):
@@ -103,22 +95,90 @@ async def lifespan(app: FastAPI):
print(f"[agent] MCP not ready (attempt {attempt + 1}/12): {e}. Retrying in 5s...")
await asyncio.sleep(5)
send_tool = next((t for t in mcp_tools if t.name == "send_telegram_message"), None)
add_memory_tool = next((t for t in mcp_tools if t.name == "add_memory"), None)
agent_tools = [t for t in mcp_tools if t.name not in ("send_telegram_message", "add_memory")]
agent_tools = [t for t in mcp_tools if t.name not in ("add_memory", "search_memory", "get_all_memories")]
searx = SearxSearchWrapper(searx_host=SEARXNG_URL)
def _crawl4ai_fetch(url: str) -> str:
    """Fetch *url* through the Crawl4AI service and return page markdown.

    Crawl4AI renders JavaScript and bypasses basic bot checks. The text is
    truncated to 5000 characters. Returns "" when the crawl reports failure
    and an "[fetch error: ...]" marker string on any exception.
    """
    try:
        resp = _httpx.post(f"{CRAWL4AI_URL}/crawl", json={"urls": [url]}, timeout=60)
        resp.raise_for_status()
        hits = resp.json().get("results", [])
        if not hits or not hits[0].get("success"):
            return ""
        # The "markdown" field may be a dict with raw_markdown or a bare string.
        markdown = hits[0].get("markdown") or {}
        if isinstance(markdown, dict):
            text = markdown.get("raw_markdown")
        else:
            text = str(markdown)
        return (text or "")[:5000]
    except Exception as e:
        return f"[fetch error: {e}]"
def _search_and_read(query: str) -> str:
    """Search the web via SearXNG and automatically read the top results.

    Queries SearXNG's JSON API, lists up to five results (title, URL and a
    300-char snippet), then appends full page content fetched via Crawl4AI
    for the top two URLs. Returns one formatted string; on failure returns
    an "[search error: ...]" marker or "No results found.".
    """
    # Get structured results from SearXNG.
    try:
        r = _httpx.get(
            f"{SEARXNG_URL}/search",
            params={"q": query, "format": "json"},
            timeout=15,
        )
        items = r.json().get("results", [])[:5]
    except Exception as e:
        return f"[search error: {e}]"
    if not items:
        return "No results found."
    out = [f"Search: {query}\n"]
    for i, item in enumerate(items, 1):
        # `or ""` guards against explicit JSON nulls — .get(k, "") still
        # returns None when the key is present with a null value.
        url = item.get("url") or ""
        title = item.get("title") or ""
        snippet = (item.get("content") or "")[:300]
        out.append(f"\n[{i}] {title}\nURL: {url}\nSnippet: {snippet}")
    # Auto-fetch top 2 URLs so the agent sees full page text, not just snippets.
    out.append("\n\n--- Full page content ---")
    for item in items[:2]:
        url = item.get("url") or ""
        if not url:
            continue
        content = _crawl4ai_fetch(url)
        if content and not content.startswith("[fetch error"):
            out.append(f"\n### {url}\n{content[:3000]}")
    return "\n".join(out)
agent_tools.append(Tool(
name="web_search",
func=searx.run,
description="Search the web for current information",
func=_search_and_read,
description=(
"Search the web and read full content of top results. "
"Returns search snippets AND full page text from the top URLs. "
"Use multiple different queries to research a topic thoroughly."
),
))
def _fetch_url(url: str) -> str:
    """Read the full text content of a single URL via Crawl4AI."""
    text = _crawl4ai_fetch(url)
    if text:
        return text
    return "[fetch_url: empty or blocked]"
agent_tools.append(Tool(
name="fetch_url",
func=_fetch_url,
description=(
"Fetch and read the full text content of a specific URL. "
"Use for URLs not covered by web_search. Input: a single URL string."
),
))
# Build agents (system_prompt filled per-request with user_id)
medium_agent = build_medium_agent(
model=medium_model,
agent_tools=agent_tools,
system_prompt=MEDIUM_SYSTEM_PROMPT.format(user_id="{user_id}"),
system_prompt=MEDIUM_SYSTEM_PROMPT,
)
complex_agent = build_complex_agent(
model=complex_model,
@@ -139,42 +199,34 @@ async def lifespan(app: FastAPI):
router = None
vram_manager = None
mcp_client = None
send_tool = None
add_memory_tool = None
app = FastAPI(lifespan=lifespan)
# ── request models ─────────────────────────────────────────────────────────────
class InboundMessage(BaseModel):
    """Unified inbound payload accepted by the /message endpoint."""

    text: str  # raw user message text
    session_id: str  # e.g. "tg-346967270", "cli-alvis"
    channel: str  # "telegram" | "cli"
    user_id: str = ""  # human identity; defaults to session_id if empty
    metadata: dict = {}  # channel-specific extras (not read by the visible handler)
class ChatRequest(BaseModel):
    """Legacy model — kept for test_pipeline.py compatibility."""

    message: str  # user message text
    chat_id: str  # Telegram chat id; /chat maps it to session "tg-<chat_id>"
async def store_memory_async(conversation: str, user_id: str):
    """Fire-and-forget memory extraction, deferred until the GPU is idle.

    Spin-waits (at most 60s) while a reply is being generated, then stores
    the conversation under the memory semaphore so only one extraction runs
    at a time.
    """
    deadline = time.monotonic() + 60
    while _reply_semaphore.locked():
        if time.monotonic() > deadline:
            print(f"[memory] spin-wait timeout 60s, proceeding for user {user_id}", flush=True)
            break
        await asyncio.sleep(0.5)
    async with _memory_semaphore:
        started = time.monotonic()
        try:
            await add_memory_tool.ainvoke({"text": conversation, "user_id": user_id})
        except Exception as e:
            print(f"[memory] error after {time.monotonic() - started:.1f}s: {e}", flush=True)
        else:
            print(f"[memory] stored in {time.monotonic() - started:.1f}s for user {user_id}", flush=True)
# ── helpers ────────────────────────────────────────────────────────────────────
def _extract_final_text(result) -> str | None:
"""Extract last AIMessage content from agent result."""
msgs = result.get("messages", [])
for m in reversed(msgs):
if type(m).__name__ == "AIMessage" and getattr(m, "content", ""):
return m.content
# deepagents may return output differently
if isinstance(result, dict) and result.get("output"):
return result["output"]
return None
@@ -192,10 +244,11 @@ def _log_messages(result):
print(f"[agent] {role}{tc['name']}({tc['args']})", flush=True)
async def run_agent_task(message: str, chat_id: str):
print(f"[agent] queued: {message[:80]!r} chat={chat_id}", flush=True)
# ── core task ──────────────────────────────────────────────────────────────────
async def run_agent_task(message: str, session_id: str, channel: str = "telegram"):
print(f"[agent] queued: {message[:80]!r} chat={session_id}", flush=True)
# Pre-check: /think prefix forces complex tier
force_complex = False
clean_message = message
if message.startswith("/think "):
@@ -205,10 +258,9 @@ async def run_agent_task(message: str, chat_id: str):
async with _reply_semaphore:
t0 = time.monotonic()
history = _conversation_buffers.get(chat_id, [])
history = _conversation_buffers.get(session_id, [])
print(f"[agent] running: {clean_message[:80]!r}", flush=True)
# Route the message
tier, light_reply = await router.route(clean_message, history, force_complex)
print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True)
@@ -220,7 +272,7 @@ async def run_agent_task(message: str, chat_id: str):
print(f"[agent] light path: answered by router", flush=True)
elif tier == "medium":
system_prompt = MEDIUM_SYSTEM_PROMPT.format(user_id=chat_id)
system_prompt = MEDIUM_SYSTEM_PROMPT
result = await medium_agent.ainvoke({
"messages": [
{"role": "system", "content": system_prompt},
@@ -237,16 +289,15 @@ async def run_agent_task(message: str, chat_id: str):
if not ok:
print("[agent] complex→medium fallback (eviction timeout)", flush=True)
tier = "medium"
system_prompt = MEDIUM_SYSTEM_PROMPT.format(user_id=chat_id)
result = await medium_agent.ainvoke({
"messages": [
{"role": "system", "content": system_prompt},
{"role": "system", "content": MEDIUM_SYSTEM_PROMPT},
*history,
{"role": "user", "content": clean_message},
]
})
else:
system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=chat_id)
system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id)
result = await complex_agent.ainvoke({
"messages": [
{"role": "system", "content": system_prompt},
@@ -263,47 +314,73 @@ async def run_agent_task(message: str, chat_id: str):
except Exception as e:
import traceback
llm_elapsed = time.monotonic() - t0
print(f"[agent] error after {llm_elapsed:.1f}s for chat {chat_id}: {e}", flush=True)
print(f"[agent] error after {llm_elapsed:.1f}s for chat {session_id}: {e}", flush=True)
traceback.print_exc()
# Send reply via grammy MCP (split if > Telegram's 4096-char limit)
if final_text and send_tool:
# Deliver reply through the originating channel
if final_text:
t1 = time.monotonic()
MAX_TG = 4000 # leave headroom below the 4096 hard limit
chunks = [final_text[i:i + MAX_TG] for i in range(0, len(final_text), MAX_TG)]
for chunk in chunks:
await send_tool.ainvoke({"chat_id": chat_id, "text": chunk})
await channels.deliver(session_id, channel, final_text)
send_elapsed = time.monotonic() - t1
# Log in format compatible with test_pipeline.py parser
print(
f"[agent] replied in {time.monotonic() - t0:.1f}s "
f"(llm={llm_elapsed:.1f}s, send={send_elapsed:.1f}s) tier={tier}",
flush=True,
)
elif not final_text:
print(f"[agent] reply_text: {final_text}", flush=True)
else:
print("[agent] warning: no text reply from agent", flush=True)
# Update conversation buffer
if final_text:
buf = _conversation_buffers.get(chat_id, [])
buf = _conversation_buffers.get(session_id, [])
buf.append({"role": "user", "content": clean_message})
buf.append({"role": "assistant", "content": final_text})
_conversation_buffers[chat_id] = buf[-(MAX_HISTORY_TURNS * 2):]
_conversation_buffers[session_id] = buf[-(MAX_HISTORY_TURNS * 2):]
# Async memory storage (fire-and-forget)
if add_memory_tool and final_text:
conversation = f"User: {clean_message}\nAssistant: {final_text}"
asyncio.create_task(store_memory_async(conversation, chat_id))
# ── endpoints ──────────────────────────────────────────────────────────────────
@app.post("/message")
async def message(request: InboundMessage, background_tasks: BackgroundTasks):
    """Unified inbound endpoint for all channels.

    Accepts the message immediately (202) and processes it in a background
    task; the reply is delivered through the originating channel.
    """
    if medium_agent is None:
        return JSONResponse(status_code=503, content={"error": "Agent not ready"})
    background_tasks.add_task(
        run_agent_task, request.text, request.session_id, request.channel
    )
    return JSONResponse(status_code=202, content={"status": "accepted"})
@app.post("/chat")
async def chat(request: ChatRequest, background_tasks: BackgroundTasks):
    """Legacy endpoint — maps chat_id to tg-<chat_id> session for backward compatibility."""
    if medium_agent is None:
        return JSONResponse(status_code=503, content={"error": "Agent not ready"})
    # Old clients address Telegram chats directly; fold them into the
    # unified session namespace used by /message.
    tg_session = f"tg-{request.chat_id}"
    background_tasks.add_task(run_agent_task, request.message, tg_session, "telegram")
    return JSONResponse(status_code=202, content={"status": "accepted"})
@app.get("/reply/{session_id}")
async def reply_stream(session_id: str):
    """
    SSE endpoint — streams the reply for a session once available, then closes.
    Used by CLI client and wiki_research.py instead of log polling.

    Waits up to 900s on the session's pending-reply queue, emits a single
    SSE "data:" event (or "[timeout]") and ends the stream.
    NOTE(review): the queue created here is never removed from
    pending_replies — presumably channels.deliver reuses it; confirm no leak.
    """
    q = channels.pending_replies.setdefault(session_id, asyncio.Queue())

    async def event_generator():
        try:
            text = await asyncio.wait_for(q.get(), timeout=900)
            # Escape newlines so the entire reply fits in one SSE data line.
            # Escaping is done OUTSIDE the f-string: a backslash inside an
            # f-string expression is a SyntaxError before Python 3.12.
            escaped = text.replace("\r", "").replace("\n", "\\n")
            yield f"data: {escaped}\n\n"
        except asyncio.TimeoutError:
            yield "data: [timeout]\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")
@app.get("/health")
async def health():
    """Liveness probe: reports whether agent startup has completed."""
    ready = medium_agent is not None
    return {"status": "ok", "agent_ready": ready}