"""FastAPI agent service: routes inbound messages to light/medium/complex LLM
tiers through a Bifrost gateway, with MCP-backed memory, SearXNG web search,
and Crawl4AI page fetching. Replies are delivered asynchronously via channel
adapters and an SSE endpoint."""

import asyncio
import os
import time
from contextlib import asynccontextmanager

from fastapi import FastAPI, BackgroundTasks, Request
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
import re as _re
import httpx as _httpx
from langchain_openai import ChatOpenAI
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_community.utilities import SearxSearchWrapper
from langchain_core.tools import Tool

from vram_manager import VRAMManager
from router import Router
from agent_factory import build_medium_agent, build_complex_agent
import channels

# Bifrost gateway — all LLM inference goes through here
BIFROST_URL = os.getenv("BIFROST_URL", "http://bifrost:8080/v1")
# Direct Ollama URL — used only by VRAMManager for flush/prewarm/poll
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")

ROUTER_MODEL = os.getenv("DEEPAGENTS_ROUTER_MODEL", "qwen2.5:1.5b")
MEDIUM_MODEL = os.getenv("DEEPAGENTS_MODEL", "qwen3:4b")
COMPLEX_MODEL = os.getenv("DEEPAGENTS_COMPLEX_MODEL", "qwen3:8b")

SEARXNG_URL = os.getenv("SEARXNG_URL", "http://host.docker.internal:11437")
OPENMEMORY_URL = os.getenv("OPENMEMORY_URL", "http://openmemory:8765")
CRAWL4AI_URL = os.getenv("CRAWL4AI_URL", "http://crawl4ai:11235")

# Number of user/assistant turn pairs kept per session for short-term context.
MAX_HISTORY_TURNS = 5
# session_id -> list of {"role": ..., "content": ...} dicts (bounded below).
_conversation_buffers: dict[str, list] = {}

# /no_think at the start of the system prompt disables qwen3 chain-of-thought.
# create_deep_agent prepends our system_prompt before BASE_AGENT_PROMPT, so
# /no_think lands at position 0 and is respected by qwen3 models via Ollama.
MEDIUM_SYSTEM_PROMPT = (
    "You are a helpful AI assistant. Reply concisely. "
    "If asked to remember a fact or name, simply confirm: 'Got it, I'll remember that.'"
)

COMPLEX_SYSTEM_PROMPT = (
    "You are a deep research assistant. "
    "web_search automatically fetches full page content from top results — use it 6+ times with different queries. "
    "Also call fetch_url on any specific URL you want to read in full.\n\n"
    "Run searches in English AND Russian/Latvian. "
    "After getting results, run follow-up searches based on new facts found.\n\n"
    "Write a structured markdown report with sections: "
    "Overview, Education, Career, Publications, Online Presence, Interesting Findings.\n"
    "Every fact must link to the real URL it came from: [fact](url). "
    "NEVER invent URLs. End with: **Sources checked: N**"
)

# Module-level singletons, populated by lifespan() and torn down on shutdown.
medium_agent = None
complex_agent = None
router: Router | None = None
vram_manager: VRAMManager | None = None
mcp_client = None
_memory_add_tool = None
_memory_search_tool = None

# GPU mutex: one LLM inference at a time
_reply_semaphore = asyncio.Semaphore(1)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application startup/shutdown: build models, connect MCP, wire tools,
    construct the medium/complex agents, and clear globals on shutdown."""
    global medium_agent, complex_agent, router, vram_manager, mcp_client, \
        _memory_add_tool, _memory_search_tool

    # Register channel adapters
    channels.register_defaults()

    # All three models route through Bifrost → Ollama GPU.
    # Bifrost adds retry logic, observability, and failover.
    # Model names use provider/model format: Bifrost strips the "ollama/" prefix
    # before forwarding to Ollama's /v1/chat/completions endpoint.
    router_model = ChatOpenAI(
        model=f"ollama/{ROUTER_MODEL}",
        base_url=BIFROST_URL,
        api_key="dummy",
        temperature=0,
        timeout=30,
    )
    medium_model = ChatOpenAI(
        model=f"ollama/{MEDIUM_MODEL}",
        base_url=BIFROST_URL,
        api_key="dummy",
        timeout=180,
    )
    complex_model = ChatOpenAI(
        model=f"ollama/{COMPLEX_MODEL}",
        base_url=BIFROST_URL,
        api_key="dummy",
        timeout=600,
    )

    vram_manager = VRAMManager(base_url=OLLAMA_BASE_URL)
    router = Router(model=router_model)

    mcp_connections = {
        "openmemory": {"transport": "sse", "url": f"{OPENMEMORY_URL}/sse"},
    }
    mcp_client = MultiServerMCPClient(mcp_connections)

    # openmemory may still be starting — retry up to 12 times (~60s total)
    # before giving up and letting the exception propagate.
    for attempt in range(12):
        try:
            mcp_tools = await mcp_client.get_tools()
            break
        except Exception as e:
            if attempt == 11:
                raise
            print(f"[agent] MCP not ready (attempt {attempt + 1}/12): {e}. Retrying in 5s...")
            await asyncio.sleep(5)

    # Memory tools are called explicitly by run_agent_task, not by the agent
    # loop — keep them out of the agent's tool list.
    agent_tools = [t for t in mcp_tools
                   if t.name not in ("add_memory", "search_memory", "get_all_memories")]

    # Expose memory tools directly so run_agent_task can call them outside the agent loop
    for t in mcp_tools:
        if t.name == "add_memory":
            _memory_add_tool = t
        elif t.name == "search_memory":
            _memory_search_tool = t

    searx = SearxSearchWrapper(searx_host=SEARXNG_URL)

    def _crawl4ai_fetch(url: str) -> str:
        """Fetch a URL via Crawl4AI (JS-rendered, bot-bypass) and return clean markdown."""
        try:
            r = _httpx.post(f"{CRAWL4AI_URL}/crawl", json={"urls": [url]}, timeout=60)
            r.raise_for_status()
            results = r.json().get("results", [])
            if not results or not results[0].get("success"):
                return ""
            # "markdown" may be a dict with raw_markdown, or a plain string.
            md_obj = results[0].get("markdown") or {}
            md = md_obj.get("raw_markdown") if isinstance(md_obj, dict) else str(md_obj)
            return (md or "")[:5000]
        except Exception as e:
            return f"[fetch error: {e}]"

    def _search_and_read(query: str) -> str:
        """Search the web and automatically fetch full content of top results.
        Returns snippets + full page content from the top URLs."""
        import json as _json

        # Get structured results from SearXNG
        try:
            r = _httpx.get(
                f"{SEARXNG_URL}/search",
                params={"q": query, "format": "json"},
                timeout=15,
            )
            data = r.json()
            items = data.get("results", [])[:5]
        except Exception as e:
            return f"[search error: {e}]"

        if not items:
            return "No results found."

        out = [f"Search: {query}\n"]
        for i, item in enumerate(items, 1):
            url = item.get("url", "")
            title = item.get("title", "")
            snippet = item.get("content", "")[:300]
            out.append(f"\n[{i}] {title}\nURL: {url}\nSnippet: {snippet}")

        # Auto-fetch top 2 URLs for full content
        out.append("\n\n--- Full page content ---")
        for item in items[:2]:
            url = item.get("url", "")
            if not url:
                continue
            content = _crawl4ai_fetch(url)
            if content and not content.startswith("[fetch error"):
                out.append(f"\n### {url}\n{content[:3000]}")

        return "\n".join(out)

    agent_tools.append(Tool(
        name="web_search",
        func=_search_and_read,
        description=(
            "Search the web and read full content of top results. "
            "Returns search snippets AND full page text from the top URLs. "
            "Use multiple different queries to research a topic thoroughly."
        ),
    ))

    def _fetch_url(url: str) -> str:
        """Fetch and read the full text content of a URL."""
        content = _crawl4ai_fetch(url)
        return content if content else "[fetch_url: empty or blocked]"

    agent_tools.append(Tool(
        name="fetch_url",
        func=_fetch_url,
        description=(
            "Fetch and read the full text content of a specific URL. "
            "Use for URLs not covered by web_search. Input: a single URL string."
        ),
    ))

    medium_agent = build_medium_agent(
        model=medium_model,
        agent_tools=agent_tools,
        system_prompt=MEDIUM_SYSTEM_PROMPT,
    )
    complex_agent = build_complex_agent(
        model=complex_model,
        agent_tools=agent_tools,
        # Keep the {user_id} placeholder intact; it is filled per-request.
        system_prompt=COMPLEX_SYSTEM_PROMPT.format(user_id="{user_id}"),
    )

    print(
        f"[agent] bifrost={BIFROST_URL} | router=ollama/{ROUTER_MODEL} | "
        f"medium=ollama/{MEDIUM_MODEL} | complex=ollama/{COMPLEX_MODEL}",
        flush=True,
    )
    print(f"[agent] agent tools: {[t.name for t in agent_tools]}", flush=True)

    yield

    # Shutdown: drop references so /health reports not-ready.
    medium_agent = None
    complex_agent = None
    router = None
    vram_manager = None
    mcp_client = None


app = FastAPI(lifespan=lifespan)


# ── request models ─────────────────────────────────────────────────────────────

class InboundMessage(BaseModel):
    text: str
    session_id: str  # e.g. "tg-346967270", "cli-alvis"
    channel: str  # "telegram" | "cli"
    user_id: str = ""  # human identity; defaults to session_id if empty
    metadata: dict = {}


class ChatRequest(BaseModel):
    """Legacy model — kept for test_pipeline.py compatibility."""
    message: str
    chat_id: str


# ── helpers ────────────────────────────────────────────────────────────────────

def _strip_think(text: str) -> str:
    """Strip qwen3 chain-of-thought blocks that appear inline in content when
    using Ollama's OpenAI-compatible endpoint (/v1/chat/completions)."""
    # BUGFIX: the pattern must match the literal <think>...</think> tags;
    # a bare ".*?" only matches empty strings and stripped nothing.
    return _re.sub(r"<think>.*?</think>", "", text, flags=_re.DOTALL).strip()


def _extract_final_text(result) -> str | None:
    """Return the last non-empty AIMessage content from an agent result,
    falling back to a legacy "output" key; None if nothing usable."""
    msgs = result.get("messages", [])
    for m in reversed(msgs):
        if type(m).__name__ == "AIMessage" and getattr(m, "content", ""):
            return _strip_think(m.content)
    if isinstance(result, dict) and result.get("output"):
        return _strip_think(result["output"])
    return None


def _log_messages(result):
    """Print a truncated trace of every message and tool call in the result."""
    msgs = result.get("messages", [])
    for m in msgs:
        role = type(m).__name__
        content = getattr(m, "content", "")
        tool_calls = getattr(m, "tool_calls", [])
        if content:
            print(f"[agent] {role}: {str(content)[:150]}", flush=True)
        for tc in tool_calls:
            print(f"[agent] {role} → {tc['name']}({tc['args']})", flush=True)


# ── memory helpers ─────────────────────────────────────────────────────────────

async def _store_memory(session_id: str, user_msg: str, assistant_reply: str) -> None:
    """Store a conversation turn in openmemory (runs as a background task)."""
    if _memory_add_tool is None:
        return
    t0 = time.monotonic()
    try:
        text = f"User: {user_msg}\nAssistant: {assistant_reply}"
        await _memory_add_tool.ainvoke({"text": text, "user_id": session_id})
        print(f"[memory] stored in {time.monotonic() - t0:.1f}s", flush=True)
    except Exception as e:
        # Best-effort: memory failures must never break reply delivery.
        print(f"[memory] error: {e}", flush=True)


async def _retrieve_memories(message: str, session_id: str) -> str:
    """Search openmemory for relevant context. Returns formatted string or ''."""
    if _memory_search_tool is None:
        return ""
    try:
        result = await _memory_search_tool.ainvoke({"query": message, "user_id": session_id})
        # "[]" means the search ran but found nothing.
        if result and result.strip() and result.strip() != "[]":
            return f"Relevant memories:\n{result}"
    except Exception:
        pass
    return ""


# ── core task ──────────────────────────────────────────────────────────────────

async def run_agent_task(message: str, session_id: str, channel: str = "telegram"):
    """Process one inbound message end-to-end: route to a tier, run the agent
    under the GPU semaphore, deliver the reply, and update history/memory."""
    print(f"[agent] queued: {message[:80]!r} chat={session_id}", flush=True)

    # A leading "/think " forces the complex (deep research) tier.
    force_complex = False
    clean_message = message
    if message.startswith("/think "):
        force_complex = True
        clean_message = message[len("/think "):]
        print("[agent] /think prefix → force_complex=True", flush=True)

    async with _reply_semaphore:
        t0 = time.monotonic()
        history = _conversation_buffers.get(session_id, [])
        print(f"[agent] running: {clean_message[:80]!r}", flush=True)

        # Retrieve memories once; inject into history so ALL tiers can use them
        memories = await _retrieve_memories(clean_message, session_id)
        enriched_history = (
            [{"role": "system", "content": memories}] + history if memories else history
        )

        tier, light_reply = await router.route(clean_message, enriched_history, force_complex)
        print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True)

        final_text = None
        try:
            if tier == "light":
                # Router already produced the answer — no agent call needed.
                final_text = light_reply
                llm_elapsed = time.monotonic() - t0
                print(f"[agent] light path: answered by router", flush=True)

            elif tier == "medium":
                system_prompt = MEDIUM_SYSTEM_PROMPT
                if memories:
                    system_prompt = system_prompt + "\n\n" + memories
                result = await medium_agent.ainvoke({
                    "messages": [
                        {"role": "system", "content": system_prompt},
                        *history,
                        {"role": "user", "content": clean_message},
                    ]
                })
                llm_elapsed = time.monotonic() - t0
                _log_messages(result)
                final_text = _extract_final_text(result)

            else:  # complex
                ok = await vram_manager.enter_complex_mode()
                if not ok:
                    # Could not evict enough VRAM for the big model — degrade
                    # gracefully to the medium tier.
                    print("[agent] complex→medium fallback (eviction timeout)", flush=True)
                    tier = "medium"
                    system_prompt = MEDIUM_SYSTEM_PROMPT
                    if memories:
                        system_prompt = system_prompt + "\n\n" + memories
                    result = await medium_agent.ainvoke({
                        "messages": [
                            {"role": "system", "content": system_prompt},
                            *history,
                            {"role": "user", "content": clean_message},
                        ]
                    })
                else:
                    system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id)
                    # BUGFIX: always leave complex mode, even if the agent
                    # raises — otherwise a failed run leaks the VRAM claim.
                    try:
                        result = await complex_agent.ainvoke({
                            "messages": [
                                {"role": "system", "content": system_prompt},
                                *history,
                                {"role": "user", "content": clean_message},
                            ]
                        })
                    finally:
                        asyncio.create_task(vram_manager.exit_complex_mode())
                llm_elapsed = time.monotonic() - t0
                _log_messages(result)
                final_text = _extract_final_text(result)

        except Exception as e:
            import traceback
            llm_elapsed = time.monotonic() - t0
            print(f"[agent] error after {llm_elapsed:.1f}s for chat {session_id}: {e}", flush=True)
            traceback.print_exc()

        # Deliver reply through the originating channel
        if final_text:
            t1 = time.monotonic()
            try:
                await channels.deliver(session_id, channel, final_text)
            except Exception as e:
                print(f"[agent] delivery error (non-fatal): {e}", flush=True)
            send_elapsed = time.monotonic() - t1
            print(
                f"[agent] replied in {time.monotonic() - t0:.1f}s "
                f"(llm={llm_elapsed:.1f}s, send={send_elapsed:.1f}s) tier={tier}",
                flush=True,
            )
            print(f"[agent] reply_text: {final_text}", flush=True)
        else:
            print("[agent] warning: no text reply from agent", flush=True)

        # Update conversation buffer and schedule memory storage
        if final_text:
            buf = _conversation_buffers.get(session_id, [])
            buf.append({"role": "user", "content": clean_message})
            buf.append({"role": "assistant", "content": final_text})
            # Keep at most MAX_HISTORY_TURNS user+assistant pairs.
            _conversation_buffers[session_id] = buf[-(MAX_HISTORY_TURNS * 2):]
            asyncio.create_task(_store_memory(session_id, clean_message, final_text))


# ── endpoints ──────────────────────────────────────────────────────────────────

@app.post("/message")
async def message(request: InboundMessage, background_tasks: BackgroundTasks):
    """Unified inbound endpoint for all channels."""
    if medium_agent is None:
        return JSONResponse(status_code=503, content={"error": "Agent not ready"})
    session_id = request.session_id
    channel = request.channel
    background_tasks.add_task(run_agent_task, request.text, session_id, channel)
    return JSONResponse(status_code=202, content={"status": "accepted"})


@app.post("/chat")
async def chat(request: ChatRequest, background_tasks: BackgroundTasks):
    """Legacy endpoint — maps chat_id to tg- session for backward compatibility."""
    if medium_agent is None:
        return JSONResponse(status_code=503, content={"error": "Agent not ready"})
    session_id = f"tg-{request.chat_id}"
    background_tasks.add_task(run_agent_task, request.message, session_id, "telegram")
    return JSONResponse(status_code=202, content={"status": "accepted"})


@app.get("/reply/{session_id}")
async def reply_stream(session_id: str):
    """
    SSE endpoint — streams the reply for a session once available, then closes.
    Used by CLI client and wiki_research.py instead of log polling.
    """
    q = channels.pending_replies.setdefault(session_id, asyncio.Queue())

    async def event_generator():
        try:
            text = await asyncio.wait_for(q.get(), timeout=900)
            # Escape newlines so entire reply fits in one SSE data line
            yield f"data: {text.replace(chr(10), chr(92) + 'n').replace(chr(13), '')}\n\n"
        except asyncio.TimeoutError:
            yield "data: [timeout]\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")


@app.get("/health")
async def health():
    return {"status": "ok", "agent_ready": medium_agent is not None}