From 1718d70203aa9c4f157a7277c19b915bab82014d Mon Sep 17 00:00:00 2001 From: Alvis Date: Mon, 23 Feb 2026 05:22:08 +0000 Subject: [PATCH] Fix system prompt: agent now correctly handles memory requests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Tell agent that memory is saved automatically after every reply - Instruct agent to never say it cannot store information - Instruct agent to acknowledge and confirm when user asks to remember something - Fix misleading startup log (gemma3:1b → qwen2.5:1.5b) Co-Authored-By: Claude Sonnet 4.6 --- agent.py | 174 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 agent.py diff --git a/agent.py b/agent.py new file mode 100644 index 0000000..c54952e --- /dev/null +++ b/agent.py @@ -0,0 +1,174 @@ +import asyncio +import os +import time +from contextlib import asynccontextmanager + +from fastapi import FastAPI, BackgroundTasks +from fastapi.responses import JSONResponse +from pydantic import BaseModel + +from langchain_ollama import ChatOllama +from langchain_mcp_adapters.client import MultiServerMCPClient +from langchain_community.utilities import SearxSearchWrapper +from langchain_core.tools import Tool +from langgraph.prebuilt import create_react_agent + +OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434") +MODEL = os.getenv("DEEPAGENTS_MODEL", "qwen3:8b") +SEARXNG_URL = os.getenv("SEARXNG_URL", "http://host.docker.internal:11437") +OPENMEMORY_URL = os.getenv("OPENMEMORY_URL", "http://openmemory:8765") +GRAMMY_URL = os.getenv("GRAMMY_URL", "http://grammy:3001") + +SYSTEM_PROMPT_TEMPLATE = ( + "You are a helpful AI assistant talking to a user via Telegram. " + "The user's ID is {user_id}. " + "IMPORTANT: When calling any memory tool (search_memory, get_all_memories), " + "always use user_id=\"{user_id}\". " + "Every conversation is automatically saved to memory after you reply — " + "you do NOT need to explicitly store anything. " + "NEVER tell the user you cannot remember or store information. " + "If the user asks you to remember something, acknowledge it and confirm it will be remembered. " + "Always call search_memory before answering to recall relevant past context. " + "Use web_search for questions about current events. " + "Reply concisely." +) + +agent = None +mcp_client = None +send_tool = None +add_memory_tool = None + +# GPU semaphore: one LLM inference at a time +_reply_semaphore = asyncio.Semaphore(1) +# CPU semaphore: one memory store at a time (runs on CPU Ollama, no GPU contention) +_memory_semaphore = asyncio.Semaphore(1) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + global agent, mcp_client, send_tool, add_memory_tool + + model = ChatOllama(model=MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=8192) + + mcp_connections = { + "openmemory": {"transport": "sse", "url": f"{OPENMEMORY_URL}/sse"}, + "grammy": {"transport": "sse", "url": f"{GRAMMY_URL}/sse"}, + } + + mcp_client = MultiServerMCPClient(mcp_connections) + for attempt in range(12): + try: + mcp_tools = await mcp_client.get_tools() + break + except Exception as e: + if attempt == 11: + raise + print(f"[agent] MCP not ready (attempt {attempt + 1}/12): {e}. Retrying in 5s...") + await asyncio.sleep(5) + + # Split tools: send is called by us, add_memory runs async after reply + send_tool = next((t for t in mcp_tools if t.name == "send_telegram_message"), None) + add_memory_tool = next((t for t in mcp_tools if t.name == "add_memory"), None) + # Agent only gets read/search tools — no add_memory (would block reply) + agent_tools = [t for t in mcp_tools if t.name not in ("send_telegram_message", "add_memory")] + + searx = SearxSearchWrapper(searx_host=SEARXNG_URL) + agent_tools.append(Tool( + name="web_search", + func=searx.run, + description="Search the web for current information", + )) + + agent = create_react_agent(model, agent_tools) + print(f"[agent] ready — agent tools: {[t.name for t in agent_tools]}", flush=True) + print(f"[agent] async memory: add_memory via CPU Ollama (qwen2.5:1.5b + nomic-embed-text)", flush=True) + + yield + + agent = None + mcp_client = None + send_tool = None + add_memory_tool = None + + +app = FastAPI(lifespan=lifespan) + + +class ChatRequest(BaseModel): + message: str + chat_id: str + + +async def store_memory_async(conversation: str, user_id: str): + """Fire-and-forget: extract and store memories using CPU Ollama. Never blocks replies.""" + async with _memory_semaphore: + t0 = time.monotonic() + try: + await add_memory_tool.ainvoke({"text": conversation, "user_id": user_id}) + print(f"[memory] stored in {time.monotonic() - t0:.1f}s for user {user_id}", flush=True) + except Exception as e: + print(f"[memory] error after {time.monotonic() - t0:.1f}s: {e}", flush=True) + + +async def run_agent_task(message: str, chat_id: str): + print(f"[agent] queued: {message[:80]!r} chat={chat_id}", flush=True) + async with _reply_semaphore: + t0 = time.monotonic() + print(f"[agent] running: {message[:80]!r}", flush=True) + try: + system_prompt = SYSTEM_PROMPT_TEMPLATE.format(user_id=chat_id) + result = await agent.ainvoke( + {"messages": [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": message}, + ]} + ) + llm_elapsed = time.monotonic() - t0 + + # Log trace + msgs = result.get("messages", []) + for m in msgs: + role = type(m).__name__ + content = getattr(m, "content", "") + tool_calls = getattr(m, "tool_calls", []) + if content: + print(f"[agent] {role}: {str(content)[:150]}", flush=True) + for tc in tool_calls: + print(f"[agent] {role} → {tc['name']}({tc['args']})", flush=True) + + # Send reply immediately + final_text = None + for m in reversed(msgs): + if type(m).__name__ == "AIMessage" and getattr(m, "content", ""): + final_text = m.content + break + + if final_text and send_tool: + t1 = time.monotonic() + await send_tool.ainvoke({"chat_id": chat_id, "text": final_text}) + print(f"[agent] replied in {time.monotonic() - t0:.1f}s (llm={llm_elapsed:.1f}s, send={time.monotonic()-t1:.1f}s)", flush=True) + elif not final_text: + print(f"[agent] warning: no text reply from agent", flush=True) + + # Async memoization: runs on CPU Ollama, does not block next reply + if add_memory_tool and final_text: + conversation = f"User: {message}\nAssistant: {final_text}" + asyncio.create_task(store_memory_async(conversation, chat_id)) + + except Exception as e: + import traceback + print(f"[agent] error after {time.monotonic()-t0:.1f}s for chat {chat_id}: {e}", flush=True) + traceback.print_exc() + + +@app.post("/chat") +async def chat(request: ChatRequest, background_tasks: BackgroundTasks): + if agent is None: + return JSONResponse(status_code=503, content={"error": "Agent not ready"}) + background_tasks.add_task(run_agent_task, request.message, request.chat_id) + return JSONResponse(status_code=202, content={"status": "accepted"}) + + +@app.get("/health") +async def health(): + return {"status": "ok", "agent_ready": agent is not None}