- Tell agent that memory is saved automatically after every reply - Instruct agent to never say it cannot store information - Instruct agent to acknowledge and confirm when user asks to remember something - Fix misleading startup log (gemma3:1b → qwen2.5:1.5b) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
175 lines
6.7 KiB
Python
175 lines
6.7 KiB
Python
import asyncio
|
|
import os
|
|
import time
|
|
from contextlib import asynccontextmanager
|
|
|
|
from fastapi import FastAPI, BackgroundTasks
|
|
from fastapi.responses import JSONResponse
|
|
from pydantic import BaseModel
|
|
|
|
from langchain_ollama import ChatOllama
|
|
from langchain_mcp_adapters.client import MultiServerMCPClient
|
|
from langchain_community.utilities import SearxSearchWrapper
|
|
from langchain_core.tools import Tool
|
|
from langgraph.prebuilt import create_react_agent
|
|
|
|
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
|
|
MODEL = os.getenv("DEEPAGENTS_MODEL", "qwen3:8b")
|
|
SEARXNG_URL = os.getenv("SEARXNG_URL", "http://host.docker.internal:11437")
|
|
OPENMEMORY_URL = os.getenv("OPENMEMORY_URL", "http://openmemory:8765")
|
|
GRAMMY_URL = os.getenv("GRAMMY_URL", "http://grammy:3001")
|
|
|
|
SYSTEM_PROMPT_TEMPLATE = (
|
|
"You are a helpful AI assistant talking to a user via Telegram. "
|
|
"The user's ID is {user_id}. "
|
|
"IMPORTANT: When calling any memory tool (search_memory, get_all_memories), "
|
|
"always use user_id=\"{user_id}\". "
|
|
"Every conversation is automatically saved to memory after you reply — "
|
|
"you do NOT need to explicitly store anything. "
|
|
"NEVER tell the user you cannot remember or store information. "
|
|
"If the user asks you to remember something, acknowledge it and confirm it will be remembered. "
|
|
"Always call search_memory before answering to recall relevant past context. "
|
|
"Use web_search for questions about current events. "
|
|
"Reply concisely."
|
|
)
|
|
|
|
agent = None
|
|
mcp_client = None
|
|
send_tool = None
|
|
add_memory_tool = None
|
|
|
|
# GPU semaphore: one LLM inference at a time
|
|
_reply_semaphore = asyncio.Semaphore(1)
|
|
# CPU semaphore: one memory store at a time (runs on CPU Ollama, no GPU contention)
|
|
_memory_semaphore = asyncio.Semaphore(1)
|
|
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
global agent, mcp_client, send_tool, add_memory_tool
|
|
|
|
model = ChatOllama(model=MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=8192)
|
|
|
|
mcp_connections = {
|
|
"openmemory": {"transport": "sse", "url": f"{OPENMEMORY_URL}/sse"},
|
|
"grammy": {"transport": "sse", "url": f"{GRAMMY_URL}/sse"},
|
|
}
|
|
|
|
mcp_client = MultiServerMCPClient(mcp_connections)
|
|
for attempt in range(12):
|
|
try:
|
|
mcp_tools = await mcp_client.get_tools()
|
|
break
|
|
except Exception as e:
|
|
if attempt == 11:
|
|
raise
|
|
print(f"[agent] MCP not ready (attempt {attempt + 1}/12): {e}. Retrying in 5s...")
|
|
await asyncio.sleep(5)
|
|
|
|
# Split tools: send is called by us, add_memory runs async after reply
|
|
send_tool = next((t for t in mcp_tools if t.name == "send_telegram_message"), None)
|
|
add_memory_tool = next((t for t in mcp_tools if t.name == "add_memory"), None)
|
|
# Agent only gets read/search tools — no add_memory (would block reply)
|
|
agent_tools = [t for t in mcp_tools if t.name not in ("send_telegram_message", "add_memory")]
|
|
|
|
searx = SearxSearchWrapper(searx_host=SEARXNG_URL)
|
|
agent_tools.append(Tool(
|
|
name="web_search",
|
|
func=searx.run,
|
|
description="Search the web for current information",
|
|
))
|
|
|
|
agent = create_react_agent(model, agent_tools)
|
|
print(f"[agent] ready — agent tools: {[t.name for t in agent_tools]}", flush=True)
|
|
print(f"[agent] async memory: add_memory via CPU Ollama (qwen2.5:1.5b + nomic-embed-text)", flush=True)
|
|
|
|
yield
|
|
|
|
agent = None
|
|
mcp_client = None
|
|
send_tool = None
|
|
add_memory_tool = None
|
|
|
|
|
|
app = FastAPI(lifespan=lifespan)
|
|
|
|
|
|
class ChatRequest(BaseModel):
|
|
message: str
|
|
chat_id: str
|
|
|
|
|
|
async def store_memory_async(conversation: str, user_id: str):
|
|
"""Fire-and-forget: extract and store memories using CPU Ollama. Never blocks replies."""
|
|
async with _memory_semaphore:
|
|
t0 = time.monotonic()
|
|
try:
|
|
await add_memory_tool.ainvoke({"text": conversation, "user_id": user_id})
|
|
print(f"[memory] stored in {time.monotonic() - t0:.1f}s for user {user_id}", flush=True)
|
|
except Exception as e:
|
|
print(f"[memory] error after {time.monotonic() - t0:.1f}s: {e}", flush=True)
|
|
|
|
|
|
async def run_agent_task(message: str, chat_id: str):
|
|
print(f"[agent] queued: {message[:80]!r} chat={chat_id}", flush=True)
|
|
async with _reply_semaphore:
|
|
t0 = time.monotonic()
|
|
print(f"[agent] running: {message[:80]!r}", flush=True)
|
|
try:
|
|
system_prompt = SYSTEM_PROMPT_TEMPLATE.format(user_id=chat_id)
|
|
result = await agent.ainvoke(
|
|
{"messages": [
|
|
{"role": "system", "content": system_prompt},
|
|
{"role": "user", "content": message},
|
|
]}
|
|
)
|
|
llm_elapsed = time.monotonic() - t0
|
|
|
|
# Log trace
|
|
msgs = result.get("messages", [])
|
|
for m in msgs:
|
|
role = type(m).__name__
|
|
content = getattr(m, "content", "")
|
|
tool_calls = getattr(m, "tool_calls", [])
|
|
if content:
|
|
print(f"[agent] {role}: {str(content)[:150]}", flush=True)
|
|
for tc in tool_calls:
|
|
print(f"[agent] {role} → {tc['name']}({tc['args']})", flush=True)
|
|
|
|
# Send reply immediately
|
|
final_text = None
|
|
for m in reversed(msgs):
|
|
if type(m).__name__ == "AIMessage" and getattr(m, "content", ""):
|
|
final_text = m.content
|
|
break
|
|
|
|
if final_text and send_tool:
|
|
t1 = time.monotonic()
|
|
await send_tool.ainvoke({"chat_id": chat_id, "text": final_text})
|
|
print(f"[agent] replied in {time.monotonic() - t0:.1f}s (llm={llm_elapsed:.1f}s, send={time.monotonic()-t1:.1f}s)", flush=True)
|
|
elif not final_text:
|
|
print(f"[agent] warning: no text reply from agent", flush=True)
|
|
|
|
# Async memoization: runs on CPU Ollama, does not block next reply
|
|
if add_memory_tool and final_text:
|
|
conversation = f"User: {message}\nAssistant: {final_text}"
|
|
asyncio.create_task(store_memory_async(conversation, chat_id))
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
print(f"[agent] error after {time.monotonic()-t0:.1f}s for chat {chat_id}: {e}", flush=True)
|
|
traceback.print_exc()
|
|
|
|
|
|
@app.post("/chat")
|
|
async def chat(request: ChatRequest, background_tasks: BackgroundTasks):
|
|
if agent is None:
|
|
return JSONResponse(status_code=503, content={"error": "Agent not ready"})
|
|
background_tasks.add_task(run_agent_task, request.message, request.chat_id)
|
|
return JSONResponse(status_code=202, content={"status": "accepted"})
|
|
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
return {"status": "ok", "agent_ready": agent is not None}
|