import asyncio import os import time from contextlib import asynccontextmanager from fastapi import FastAPI, BackgroundTasks from fastapi.responses import JSONResponse from pydantic import BaseModel from langchain_ollama import ChatOllama from langchain_mcp_adapters.client import MultiServerMCPClient from langchain_community.utilities import SearxSearchWrapper from langchain_core.tools import Tool from langgraph.prebuilt import create_react_agent OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434") MODEL = os.getenv("DEEPAGENTS_MODEL", "qwen3:8b") SEARXNG_URL = os.getenv("SEARXNG_URL", "http://host.docker.internal:11437") OPENMEMORY_URL = os.getenv("OPENMEMORY_URL", "http://openmemory:8765") GRAMMY_URL = os.getenv("GRAMMY_URL", "http://grammy:3001") SYSTEM_PROMPT_TEMPLATE = ( "You are a helpful AI assistant talking to a user via Telegram. " "The user's ID is {user_id}. " "IMPORTANT: When calling any memory tool (search_memory, get_all_memories), " "always use user_id=\"{user_id}\". " "Every conversation is automatically saved to memory after you reply — " "you do NOT need to explicitly store anything. " "NEVER tell the user you cannot remember or store information. " "If the user asks you to remember something, acknowledge it and confirm it will be remembered. " "Always call search_memory before answering to recall relevant past context. " "Use web_search for questions about current events. " "Reply concisely." ) agent = None mcp_client = None send_tool = None add_memory_tool = None # GPU semaphore: one LLM inference at a time _reply_semaphore = asyncio.Semaphore(1) # CPU semaphore: one memory store at a time (runs on CPU Ollama, no GPU contention) _memory_semaphore = asyncio.Semaphore(1) @asynccontextmanager async def lifespan(app: FastAPI): global agent, mcp_client, send_tool, add_memory_tool model = ChatOllama(model=MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=8192) mcp_connections = { "openmemory": {"transport": "sse", "url": f"{OPENMEMORY_URL}/sse"}, "grammy": {"transport": "sse", "url": f"{GRAMMY_URL}/sse"}, } mcp_client = MultiServerMCPClient(mcp_connections) for attempt in range(12): try: mcp_tools = await mcp_client.get_tools() break except Exception as e: if attempt == 11: raise print(f"[agent] MCP not ready (attempt {attempt + 1}/12): {e}. Retrying in 5s...") await asyncio.sleep(5) # Split tools: send is called by us, add_memory runs async after reply send_tool = next((t for t in mcp_tools if t.name == "send_telegram_message"), None) add_memory_tool = next((t for t in mcp_tools if t.name == "add_memory"), None) # Agent only gets read/search tools — no add_memory (would block reply) agent_tools = [t for t in mcp_tools if t.name not in ("send_telegram_message", "add_memory")] searx = SearxSearchWrapper(searx_host=SEARXNG_URL) agent_tools.append(Tool( name="web_search", func=searx.run, description="Search the web for current information", )) agent = create_react_agent(model, agent_tools) print(f"[agent] ready — agent tools: {[t.name for t in agent_tools]}", flush=True) print(f"[agent] async memory: add_memory via CPU Ollama (qwen2.5:1.5b + nomic-embed-text)", flush=True) yield agent = None mcp_client = None send_tool = None add_memory_tool = None app = FastAPI(lifespan=lifespan) class ChatRequest(BaseModel): message: str chat_id: str async def store_memory_async(conversation: str, user_id: str): """Fire-and-forget: extract and store memories using CPU Ollama. Never blocks replies.""" async with _memory_semaphore: t0 = time.monotonic() try: await add_memory_tool.ainvoke({"text": conversation, "user_id": user_id}) print(f"[memory] stored in {time.monotonic() - t0:.1f}s for user {user_id}", flush=True) except Exception as e: print(f"[memory] error after {time.monotonic() - t0:.1f}s: {e}", flush=True) async def run_agent_task(message: str, chat_id: str): print(f"[agent] queued: {message[:80]!r} chat={chat_id}", flush=True) async with _reply_semaphore: t0 = time.monotonic() print(f"[agent] running: {message[:80]!r}", flush=True) try: system_prompt = SYSTEM_PROMPT_TEMPLATE.format(user_id=chat_id) result = await agent.ainvoke( {"messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": message}, ]} ) llm_elapsed = time.monotonic() - t0 # Log trace msgs = result.get("messages", []) for m in msgs: role = type(m).__name__ content = getattr(m, "content", "") tool_calls = getattr(m, "tool_calls", []) if content: print(f"[agent] {role}: {str(content)[:150]}", flush=True) for tc in tool_calls: print(f"[agent] {role} → {tc['name']}({tc['args']})", flush=True) # Send reply immediately final_text = None for m in reversed(msgs): if type(m).__name__ == "AIMessage" and getattr(m, "content", ""): final_text = m.content break if final_text and send_tool: t1 = time.monotonic() await send_tool.ainvoke({"chat_id": chat_id, "text": final_text}) print(f"[agent] replied in {time.monotonic() - t0:.1f}s (llm={llm_elapsed:.1f}s, send={time.monotonic()-t1:.1f}s)", flush=True) elif not final_text: print(f"[agent] warning: no text reply from agent", flush=True) # Async memoization: runs on CPU Ollama, does not block next reply if add_memory_tool and final_text: conversation = f"User: {message}\nAssistant: {final_text}" asyncio.create_task(store_memory_async(conversation, chat_id)) except Exception as e: import traceback print(f"[agent] error after {time.monotonic()-t0:.1f}s for chat {chat_id}: {e}", flush=True) traceback.print_exc() @app.post("/chat") async def chat(request: ChatRequest, background_tasks: BackgroundTasks): if agent is None: return JSONResponse(status_code=503, content={"error": "Agent not ready"}) background_tasks.add_task(run_agent_task, request.message, request.chat_id) return JSONResponse(status_code=202, content={"status": "accepted"}) @app.get("/health") async def health(): return {"status": "ok", "agent_ready": agent is not None}