Add three-tier model routing with VRAM management and benchmark suite
- Three-tier routing: light (router answers directly ~3s), medium (qwen3:4b + tools ~60s), complex (/think prefix → qwen3:8b + subagents ~140s) - Router: qwen2.5:1.5b, temp=0, regex pre-classifier + raw-text LLM classify - VRAMManager: explicit flush/poll/prewarm to prevent Ollama CPU-spill bug - agent_factory: build_medium_agent and build_complex_agent using deepagents (TodoListMiddleware + SubAgentMiddleware with research/memory subagents) - Fix: split Telegram replies >4000 chars into multiple messages - Benchmark: 30 questions (easy/medium/hard) — 10/10/10 verified passing easy→light, medium→medium, hard→complex with VRAM flush confirmed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
142
ARCHITECTURE.md
142
ARCHITECTURE.md
@@ -1,6 +1,6 @@
|
|||||||
# Adolf
|
# Adolf
|
||||||
|
|
||||||
Persistent AI assistant reachable via Telegram. GPU-accelerated inference with long-term memory and web search.
|
Persistent AI assistant reachable via Telegram. Three-tier model routing with GPU VRAM management.
|
||||||
|
|
||||||
## Architecture
|
## Architecture
|
||||||
|
|
||||||
@@ -11,67 +11,116 @@ Telegram user
|
|||||||
- grammY bot polls Telegram
|
- grammY bot polls Telegram
|
||||||
- on message: fire-and-forget POST /chat to deepagents
|
- on message: fire-and-forget POST /chat to deepagents
|
||||||
- exposes MCP SSE server: tool send_telegram_message(chat_id, text)
|
- exposes MCP SSE server: tool send_telegram_message(chat_id, text)
|
||||||
↕ fire-and-forget HTTP ↕ MCP SSE tool call
|
↓ POST /chat → 202 Accepted immediately
|
||||||
[deepagents] Python FastAPI — port 8000
|
[deepagents] Python FastAPI — port 8000
|
||||||
- POST /chat → 202 Accepted immediately
|
↓
|
||||||
- background task: run LangGraph react agent
|
Pre-check: starts with /think? → force_complex=True, strip prefix
|
||||||
- LLM: qwen3:8b via Ollama GPU (host port 11436)
|
↓
|
||||||
- tools: search_memory, get_all_memories, web_search
|
Router (qwen2.5:0.5b, ~1-2s, always warm in VRAM)
|
||||||
- after reply: async fire-and-forget → store memory on CPU
|
Structured output: {tier: light|medium|complex, confidence: 0.0-1.0, reply?: str}
|
||||||
↕ MCP SSE ↕ HTTP (SearXNG)
|
- light: simple conversational → router answers directly, ~1-2s
|
||||||
|
- medium: needs memory/web search → qwen3:4b + deepagents tools
|
||||||
|
- complex: multi-step research, planning, code → qwen3:8b + subagents
|
||||||
|
force_complex always overrides to complex
|
||||||
|
complex only if confidence >= 0.85 (else downgraded to medium)
|
||||||
|
↓
|
||||||
|
├── light ─────────── router reply used directly (no extra LLM call)
|
||||||
|
├── medium ────────── deepagents qwen3:4b + TodoList + tools
|
||||||
|
└── complex ───────── VRAM flush → deepagents qwen3:8b + TodoList + subagents
|
||||||
|
└→ background: exit_complex_mode (flush 8b, prewarm 4b+router)
|
||||||
|
↓
|
||||||
|
send_telegram_message via grammy MCP
|
||||||
|
↓
|
||||||
|
asyncio.create_task(store_memory_async) — spin-wait GPU idle → openmemory add_memory
|
||||||
|
↕ MCP SSE ↕ HTTP
|
||||||
[openmemory] Python + mem0 — port 8765 [SearXNG — port 11437]
|
[openmemory] Python + mem0 — port 8765 [SearXNG — port 11437]
|
||||||
- MCP tools: add_memory, search_memory, get_all_memories
|
- add_memory, search_memory, get_all_memories
|
||||||
- mem0 backend: Qdrant (port 6333) + CPU Ollama (port 11435)
|
- extractor: qwen2.5:1.5b on GPU Ollama (11436) — 2–5s
|
||||||
- embedder: nomic-embed-text (768 dims)
|
- embedder: nomic-embed-text on CPU Ollama (11435) — 50–150ms
|
||||||
- extractor: gemma3:1b
|
- vector store: Qdrant (port 6333), 768 dims
|
||||||
- collection: adolf_memories
|
|
||||||
```
|
```
|
||||||
|
|
||||||
## Queuing and Concurrency
|
## Three-Tier Model Routing
|
||||||
|
|
||||||
Two semaphores prevent resource contention:
|
| Tier | Model | VRAM | Trigger | Latency |
|
||||||
|
|------|-------|------|---------|---------|
|
||||||
|
| Light | qwen2.5:1.5b (router answers) | ~1.2 GB (shared with extraction) | Router classifies as light | ~2–4s |
|
||||||
|
| Medium | qwen3:4b | ~2.5 GB | Default; router classifies medium | ~20–40s |
|
||||||
|
| Complex | qwen3:8b | ~5.5 GB | `/think` prefix | ~60–120s |
|
||||||
|
|
||||||
|
**Normal VRAM** (light + medium): router/extraction(1.2, shared) + medium(2.5) = ~3.7 GB
|
||||||
|
**Complex VRAM**: 8b alone = ~5.5 GB — must flush others first
|
||||||
|
|
||||||
|
### Router model: qwen2.5:1.5b (not 0.5b)
|
||||||
|
|
||||||
|
qwen2.5:0.5b is too small for reliable classification — tends to output "medium" for everything
|
||||||
|
or produces nonsensical output. qwen2.5:1.5b is already loaded in VRAM for memory extraction,
|
||||||
|
so switching adds zero net VRAM overhead while dramatically improving accuracy.
|
||||||
|
|
||||||
|
Router uses **raw text generation** (not structured output/JSON schema):
|
||||||
|
- Ask model to output one word: `light`, `medium`, or `complex`
|
||||||
|
- Parse with simple keyword matching (fallback: `medium`)
|
||||||
|
- For `light` tier: a second call generates the reply text
|
||||||
|
|
||||||
|
## VRAM Management
|
||||||
|
|
||||||
|
GTX 1070 has 8 GB VRAM. Ollama's auto-eviction can spill models to CPU RAM permanently
|
||||||
|
(all subsequent loads stay on CPU). To prevent this:
|
||||||
|
|
||||||
|
1. **Always flush explicitly** before loading qwen3:8b (`keep_alive=0`)
|
||||||
|
2. **Verify eviction** via `/api/ps` poll (15s timeout) before proceeding
|
||||||
|
3. **Fallback**: timeout → log warning, run medium agent instead
|
||||||
|
4. **Post-complex**: flush 8b immediately, pre-warm 4b + router
|
||||||
|
|
||||||
|
```python
|
||||||
|
# Flush (force immediate unload):
|
||||||
|
POST /api/generate {"model": "qwen3:4b", "prompt": "", "keep_alive": 0}
|
||||||
|
|
||||||
|
# Pre-warm (load into VRAM for 5 min):
|
||||||
|
POST /api/generate {"model": "qwen3:4b", "prompt": "", "keep_alive": 300}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Agents
|
||||||
|
|
||||||
|
**Medium agent** (`build_medium_agent`):
|
||||||
|
- `create_deep_agent` with TodoListMiddleware (auto-included)
|
||||||
|
- Tools: `search_memory`, `get_all_memories`, `web_search`
|
||||||
|
- No subagents
|
||||||
|
|
||||||
|
**Complex agent** (`build_complex_agent`):
|
||||||
|
- `create_deep_agent` with TodoListMiddleware + SubAgentMiddleware
|
||||||
|
- Tools: all agent tools
|
||||||
|
- Subagents:
|
||||||
|
- `research`: web_search only, for thorough multi-query web research
|
||||||
|
- `memory`: search_memory + get_all_memories, for comprehensive context retrieval
|
||||||
|
|
||||||
|
## Concurrency
|
||||||
|
|
||||||
| Semaphore | Guards | Notes |
|
| Semaphore | Guards | Notes |
|
||||||
|-----------|--------|-------|
|
|-----------|--------|-------|
|
||||||
| `_reply_semaphore(1)` | GPU Ollama (qwen3:8b) | One LLM inference at a time |
|
| `_reply_semaphore(1)` | GPU Ollama (all tiers) | One LLM reply inference at a time |
|
||||||
| `_memory_semaphore(1)` | CPU Ollama (gemma3:1b) | One memory store at a time |
|
| `_memory_semaphore(1)` | GPU Ollama (qwen2.5:1.5b extraction) | One memory extraction at a time |
|
||||||
|
|
||||||
**Reply-first pipeline:**
|
Light path holds `_reply_semaphore` briefly (no GPU inference).
|
||||||
1. User message arrives via Telegram → Grammy forwards to deepagents (fire-and-forget)
|
Memory extraction spin-waits until `_reply_semaphore` is free (60s timeout).
|
||||||
2. Deepagents queues behind `_reply_semaphore`, runs agent, sends reply via Grammy MCP tool
|
|
||||||
3. After reply is sent, `asyncio.create_task` fires `store_memory_async` in background
|
|
||||||
4. Memory task queues behind `_memory_semaphore`, calls `add_memory` on openmemory
|
|
||||||
5. openmemory uses CPU Ollama: embedding (~0.3s) + extraction (~1.6s) → stored in Qdrant
|
|
||||||
|
|
||||||
Reply latency: ~10–18s (GPU qwen3:8b inference + tool calls).
|
## Pipeline
|
||||||
Memory latency: ~5–16s (runs async, never blocks replies).
|
|
||||||
|
1. User message → Grammy → `POST /chat` → 202 Accepted
|
||||||
|
2. Background: acquire `_reply_semaphore` → route → run agent tier → send reply
|
||||||
|
3. `asyncio.create_task(store_memory_async)` — spin-waits GPU free, then extracts memories
|
||||||
|
4. For complex: `asyncio.create_task(exit_complex_mode)` — flushes 8b, pre-warms 4b+router
|
||||||
|
|
||||||
## External Services (from openai/ stack)
|
## External Services (from openai/ stack)
|
||||||
|
|
||||||
| Service | Host Port | Role |
|
| Service | Host Port | Role |
|
||||||
|---------|-----------|------|
|
|---------|-----------|------|
|
||||||
| Ollama GPU | 11436 | Main LLM (qwen3:8b) |
|
| Ollama GPU | 11436 | All reply inference + extraction (qwen2.5:1.5b) |
|
||||||
| Ollama CPU | 11435 | Memory embedding + extraction |
|
| Ollama CPU | 11435 | Memory embedding (nomic-embed-text) |
|
||||||
| Qdrant | 6333 | Vector store for memories |
|
| Qdrant | 6333 | Vector store for memories |
|
||||||
| SearXNG | 11437 | Web search |
|
| SearXNG | 11437 | Web search |
|
||||||
|
|
||||||
## Compose Stack
|
GPU Ollama config: `OLLAMA_MAX_LOADED_MODELS=2`, `OLLAMA_NUM_PARALLEL=1`.
|
||||||
|
|
||||||
Config: `agap_git/adolf/docker-compose.yml`
|
|
||||||
|
|
||||||
```bash
|
|
||||||
cd agap_git/adolf
|
|
||||||
docker compose up -d
|
|
||||||
```
|
|
||||||
|
|
||||||
Requires `TELEGRAM_BOT_TOKEN` in `adolf/.env`.
|
|
||||||
|
|
||||||
## Memory
|
|
||||||
|
|
||||||
- Stored per `chat_id` (Telegram user ID) as `user_id` in mem0
|
|
||||||
- Semantic search via Qdrant (cosine similarity, 768-dim nomic-embed-text vectors)
|
|
||||||
- mem0 uses gemma3:1b to extract structured facts before embedding
|
|
||||||
- Collection: `adolf_memories` in Qdrant
|
|
||||||
|
|
||||||
## Files
|
## Files
|
||||||
|
|
||||||
@@ -79,7 +128,10 @@ Requires `TELEGRAM_BOT_TOKEN` in `adolf/.env`.
|
|||||||
adolf/
|
adolf/
|
||||||
├── docker-compose.yml Services: deepagents, openmemory, grammy
|
├── docker-compose.yml Services: deepagents, openmemory, grammy
|
||||||
├── Dockerfile deepagents container (Python 3.12)
|
├── Dockerfile deepagents container (Python 3.12)
|
||||||
├── agent.py FastAPI + LangGraph react agent
|
├── agent.py FastAPI + three-tier routing + run_agent_task
|
||||||
|
├── router.py Router class — qwen2.5:0.5b structured output routing
|
||||||
|
├── vram_manager.py VRAMManager — flush/prewarm/poll Ollama VRAM
|
||||||
|
├── agent_factory.py build_medium_agent / build_complex_agent (deepagents)
|
||||||
├── .env TELEGRAM_BOT_TOKEN (not committed)
|
├── .env TELEGRAM_BOT_TOKEN (not committed)
|
||||||
├── openmemory/
|
├── openmemory/
|
||||||
│ ├── server.py FastMCP + mem0 MCP tools
|
│ ├── server.py FastMCP + mem0 MCP tools
|
||||||
|
|||||||
10
Dockerfile
Normal file
10
Dockerfile
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
FROM python:3.12-slim
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
RUN pip install --no-cache-dir deepagents langchain-ollama langgraph \
|
||||||
|
fastapi uvicorn langchain-mcp-adapters langchain-community httpx
|
||||||
|
|
||||||
|
COPY agent.py vram_manager.py router.py agent_factory.py hello_world.py .
|
||||||
|
|
||||||
|
CMD ["uvicorn", "agent:app", "--host", "0.0.0.0", "--port", "8000"]
|
||||||
237
agent.py
237
agent.py
@@ -11,15 +11,23 @@ from langchain_ollama import ChatOllama
|
|||||||
from langchain_mcp_adapters.client import MultiServerMCPClient
|
from langchain_mcp_adapters.client import MultiServerMCPClient
|
||||||
from langchain_community.utilities import SearxSearchWrapper
|
from langchain_community.utilities import SearxSearchWrapper
|
||||||
from langchain_core.tools import Tool
|
from langchain_core.tools import Tool
|
||||||
from langgraph.prebuilt import create_react_agent
|
|
||||||
|
from vram_manager import VRAMManager
|
||||||
|
from router import Router
|
||||||
|
from agent_factory import build_medium_agent, build_complex_agent
|
||||||
|
|
||||||
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
|
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
|
||||||
MODEL = os.getenv("DEEPAGENTS_MODEL", "qwen3:8b")
|
ROUTER_MODEL = os.getenv("DEEPAGENTS_ROUTER_MODEL", "qwen2.5:0.5b")
|
||||||
|
MEDIUM_MODEL = os.getenv("DEEPAGENTS_MODEL", "qwen3:4b")
|
||||||
|
COMPLEX_MODEL = os.getenv("DEEPAGENTS_COMPLEX_MODEL", "qwen3:8b")
|
||||||
SEARXNG_URL = os.getenv("SEARXNG_URL", "http://host.docker.internal:11437")
|
SEARXNG_URL = os.getenv("SEARXNG_URL", "http://host.docker.internal:11437")
|
||||||
OPENMEMORY_URL = os.getenv("OPENMEMORY_URL", "http://openmemory:8765")
|
OPENMEMORY_URL = os.getenv("OPENMEMORY_URL", "http://openmemory:8765")
|
||||||
GRAMMY_URL = os.getenv("GRAMMY_URL", "http://grammy:3001")
|
GRAMMY_URL = os.getenv("GRAMMY_URL", "http://grammy:3001")
|
||||||
|
|
||||||
SYSTEM_PROMPT_TEMPLATE = (
|
MAX_HISTORY_TURNS = 5
|
||||||
|
_conversation_buffers: dict[str, list] = {}
|
||||||
|
|
||||||
|
MEDIUM_SYSTEM_PROMPT = (
|
||||||
"You are a helpful AI assistant talking to a user via Telegram. "
|
"You are a helpful AI assistant talking to a user via Telegram. "
|
||||||
"The user's ID is {user_id}. "
|
"The user's ID is {user_id}. "
|
||||||
"IMPORTANT: When calling any memory tool (search_memory, get_all_memories), "
|
"IMPORTANT: When calling any memory tool (search_memory, get_all_memories), "
|
||||||
@@ -28,33 +36,62 @@ SYSTEM_PROMPT_TEMPLATE = (
|
|||||||
"you do NOT need to explicitly store anything. "
|
"you do NOT need to explicitly store anything. "
|
||||||
"NEVER tell the user you cannot remember or store information. "
|
"NEVER tell the user you cannot remember or store information. "
|
||||||
"If the user asks you to remember something, acknowledge it and confirm it will be remembered. "
|
"If the user asks you to remember something, acknowledge it and confirm it will be remembered. "
|
||||||
"Always call search_memory before answering to recall relevant past context. "
|
"Use search_memory when context from past conversations may be relevant. "
|
||||||
"Use web_search for questions about current events. "
|
"Use web_search for questions about current events or facts you don't know. "
|
||||||
"Reply concisely."
|
"Reply concisely."
|
||||||
)
|
)
|
||||||
|
|
||||||
agent = None
|
COMPLEX_SYSTEM_PROMPT = (
|
||||||
|
"You are a capable AI assistant tackling a complex, multi-step task for a Telegram user. "
|
||||||
|
"The user's ID is {user_id}. "
|
||||||
|
"IMPORTANT: When calling any memory tool (search_memory, get_all_memories), "
|
||||||
|
"always use user_id=\"{user_id}\". "
|
||||||
|
"Plan your work using write_todos before diving in. "
|
||||||
|
"Delegate: use the 'research' subagent for thorough web research across multiple queries, "
|
||||||
|
"and the 'memory' subagent to gather comprehensive context from past conversations. "
|
||||||
|
"Every conversation is automatically saved to memory — you do NOT need to store anything. "
|
||||||
|
"NEVER tell the user you cannot remember or store information. "
|
||||||
|
"Produce a thorough, well-structured reply."
|
||||||
|
)
|
||||||
|
|
||||||
|
medium_agent = None
|
||||||
|
complex_agent = None
|
||||||
|
router: Router = None
|
||||||
|
vram_manager: VRAMManager = None
|
||||||
mcp_client = None
|
mcp_client = None
|
||||||
send_tool = None
|
send_tool = None
|
||||||
add_memory_tool = None
|
add_memory_tool = None
|
||||||
|
|
||||||
# GPU semaphore: one LLM inference at a time
|
# GPU mutex: one LLM inference at a time
|
||||||
_reply_semaphore = asyncio.Semaphore(1)
|
_reply_semaphore = asyncio.Semaphore(1)
|
||||||
# CPU semaphore: one memory store at a time (runs on CPU Ollama, no GPU contention)
|
# Memory semaphore: one async extraction at a time
|
||||||
_memory_semaphore = asyncio.Semaphore(1)
|
_memory_semaphore = asyncio.Semaphore(1)
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
global agent, mcp_client, send_tool, add_memory_tool
|
global medium_agent, complex_agent, router, vram_manager
|
||||||
|
global mcp_client, send_tool, add_memory_tool
|
||||||
|
|
||||||
model = ChatOllama(model=MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=8192)
|
# Three model instances
|
||||||
|
router_model = ChatOllama(
|
||||||
|
model=ROUTER_MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=4096,
|
||||||
|
temperature=0, # deterministic classification
|
||||||
|
)
|
||||||
|
medium_model = ChatOllama(
|
||||||
|
model=MEDIUM_MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=8192
|
||||||
|
)
|
||||||
|
complex_model = ChatOllama(
|
||||||
|
model=COMPLEX_MODEL, base_url=OLLAMA_BASE_URL, think=True, num_ctx=16384
|
||||||
|
)
|
||||||
|
|
||||||
|
vram_manager = VRAMManager(base_url=OLLAMA_BASE_URL)
|
||||||
|
router = Router(model=router_model)
|
||||||
|
|
||||||
mcp_connections = {
|
mcp_connections = {
|
||||||
"openmemory": {"transport": "sse", "url": f"{OPENMEMORY_URL}/sse"},
|
"openmemory": {"transport": "sse", "url": f"{OPENMEMORY_URL}/sse"},
|
||||||
"grammy": {"transport": "sse", "url": f"{GRAMMY_URL}/sse"},
|
"grammy": {"transport": "sse", "url": f"{GRAMMY_URL}/sse"},
|
||||||
}
|
}
|
||||||
|
|
||||||
mcp_client = MultiServerMCPClient(mcp_connections)
|
mcp_client = MultiServerMCPClient(mcp_connections)
|
||||||
for attempt in range(12):
|
for attempt in range(12):
|
||||||
try:
|
try:
|
||||||
@@ -66,10 +103,8 @@ async def lifespan(app: FastAPI):
|
|||||||
print(f"[agent] MCP not ready (attempt {attempt + 1}/12): {e}. Retrying in 5s...")
|
print(f"[agent] MCP not ready (attempt {attempt + 1}/12): {e}. Retrying in 5s...")
|
||||||
await asyncio.sleep(5)
|
await asyncio.sleep(5)
|
||||||
|
|
||||||
# Split tools: send is called by us, add_memory runs async after reply
|
|
||||||
send_tool = next((t for t in mcp_tools if t.name == "send_telegram_message"), None)
|
send_tool = next((t for t in mcp_tools if t.name == "send_telegram_message"), None)
|
||||||
add_memory_tool = next((t for t in mcp_tools if t.name == "add_memory"), None)
|
add_memory_tool = next((t for t in mcp_tools if t.name == "add_memory"), None)
|
||||||
# Agent only gets read/search tools — no add_memory (would block reply)
|
|
||||||
agent_tools = [t for t in mcp_tools if t.name not in ("send_telegram_message", "add_memory")]
|
agent_tools = [t for t in mcp_tools if t.name not in ("send_telegram_message", "add_memory")]
|
||||||
|
|
||||||
searx = SearxSearchWrapper(searx_host=SEARXNG_URL)
|
searx = SearxSearchWrapper(searx_host=SEARXNG_URL)
|
||||||
@@ -79,13 +114,30 @@ async def lifespan(app: FastAPI):
|
|||||||
description="Search the web for current information",
|
description="Search the web for current information",
|
||||||
))
|
))
|
||||||
|
|
||||||
agent = create_react_agent(model, agent_tools)
|
# Build agents (system_prompt filled per-request with user_id)
|
||||||
print(f"[agent] ready — agent tools: {[t.name for t in agent_tools]}", flush=True)
|
medium_agent = build_medium_agent(
|
||||||
print(f"[agent] async memory: add_memory via CPU Ollama (qwen2.5:1.5b + nomic-embed-text)", flush=True)
|
model=medium_model,
|
||||||
|
agent_tools=agent_tools,
|
||||||
|
system_prompt=MEDIUM_SYSTEM_PROMPT.format(user_id="{user_id}"),
|
||||||
|
)
|
||||||
|
complex_agent = build_complex_agent(
|
||||||
|
model=complex_model,
|
||||||
|
agent_tools=agent_tools,
|
||||||
|
system_prompt=COMPLEX_SYSTEM_PROMPT.format(user_id="{user_id}"),
|
||||||
|
)
|
||||||
|
|
||||||
|
print(
|
||||||
|
f"[agent] three-tier: router={ROUTER_MODEL} | medium={MEDIUM_MODEL} | complex={COMPLEX_MODEL}",
|
||||||
|
flush=True,
|
||||||
|
)
|
||||||
|
print(f"[agent] agent tools: {[t.name for t in agent_tools]}", flush=True)
|
||||||
|
|
||||||
yield
|
yield
|
||||||
|
|
||||||
agent = None
|
medium_agent = None
|
||||||
|
complex_agent = None
|
||||||
|
router = None
|
||||||
|
vram_manager = None
|
||||||
mcp_client = None
|
mcp_client = None
|
||||||
send_tool = None
|
send_tool = None
|
||||||
add_memory_tool = None
|
add_memory_tool = None
|
||||||
@@ -100,7 +152,13 @@ class ChatRequest(BaseModel):
|
|||||||
|
|
||||||
|
|
||||||
async def store_memory_async(conversation: str, user_id: str):
|
async def store_memory_async(conversation: str, user_id: str):
|
||||||
"""Fire-and-forget: extract and store memories using CPU Ollama. Never blocks replies."""
|
"""Fire-and-forget: extract and store memories after GPU is free."""
|
||||||
|
t_wait = time.monotonic()
|
||||||
|
while _reply_semaphore.locked():
|
||||||
|
if time.monotonic() - t_wait > 60:
|
||||||
|
print(f"[memory] spin-wait timeout 60s, proceeding for user {user_id}", flush=True)
|
||||||
|
break
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
async with _memory_semaphore:
|
async with _memory_semaphore:
|
||||||
t0 = time.monotonic()
|
t0 = time.monotonic()
|
||||||
try:
|
try:
|
||||||
@@ -110,22 +168,19 @@ async def store_memory_async(conversation: str, user_id: str):
|
|||||||
print(f"[memory] error after {time.monotonic() - t0:.1f}s: {e}", flush=True)
|
print(f"[memory] error after {time.monotonic() - t0:.1f}s: {e}", flush=True)
|
||||||
|
|
||||||
|
|
||||||
async def run_agent_task(message: str, chat_id: str):
|
def _extract_final_text(result) -> str | None:
|
||||||
print(f"[agent] queued: {message[:80]!r} chat={chat_id}", flush=True)
|
"""Extract last AIMessage content from agent result."""
|
||||||
async with _reply_semaphore:
|
msgs = result.get("messages", [])
|
||||||
t0 = time.monotonic()
|
for m in reversed(msgs):
|
||||||
print(f"[agent] running: {message[:80]!r}", flush=True)
|
if type(m).__name__ == "AIMessage" and getattr(m, "content", ""):
|
||||||
try:
|
return m.content
|
||||||
system_prompt = SYSTEM_PROMPT_TEMPLATE.format(user_id=chat_id)
|
# deepagents may return output differently
|
||||||
result = await agent.ainvoke(
|
if isinstance(result, dict) and result.get("output"):
|
||||||
{"messages": [
|
return result["output"]
|
||||||
{"role": "system", "content": system_prompt},
|
return None
|
||||||
{"role": "user", "content": message},
|
|
||||||
]}
|
|
||||||
)
|
|
||||||
llm_elapsed = time.monotonic() - t0
|
|
||||||
|
|
||||||
# Log trace
|
|
||||||
|
def _log_messages(result):
|
||||||
msgs = result.get("messages", [])
|
msgs = result.get("messages", [])
|
||||||
for m in msgs:
|
for m in msgs:
|
||||||
role = type(m).__name__
|
role = type(m).__name__
|
||||||
@@ -136,34 +191,114 @@ async def run_agent_task(message: str, chat_id: str):
|
|||||||
for tc in tool_calls:
|
for tc in tool_calls:
|
||||||
print(f"[agent] {role} → {tc['name']}({tc['args']})", flush=True)
|
print(f"[agent] {role} → {tc['name']}({tc['args']})", flush=True)
|
||||||
|
|
||||||
# Send reply immediately
|
|
||||||
|
async def run_agent_task(message: str, chat_id: str):
|
||||||
|
print(f"[agent] queued: {message[:80]!r} chat={chat_id}", flush=True)
|
||||||
|
|
||||||
|
# Pre-check: /think prefix forces complex tier
|
||||||
|
force_complex = False
|
||||||
|
clean_message = message
|
||||||
|
if message.startswith("/think "):
|
||||||
|
force_complex = True
|
||||||
|
clean_message = message[len("/think "):]
|
||||||
|
print("[agent] /think prefix → force_complex=True", flush=True)
|
||||||
|
|
||||||
|
async with _reply_semaphore:
|
||||||
|
t0 = time.monotonic()
|
||||||
|
history = _conversation_buffers.get(chat_id, [])
|
||||||
|
print(f"[agent] running: {clean_message[:80]!r}", flush=True)
|
||||||
|
|
||||||
|
# Route the message
|
||||||
|
tier, light_reply = await router.route(clean_message, history, force_complex)
|
||||||
|
print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True)
|
||||||
|
|
||||||
final_text = None
|
final_text = None
|
||||||
for m in reversed(msgs):
|
try:
|
||||||
if type(m).__name__ == "AIMessage" and getattr(m, "content", ""):
|
if tier == "light":
|
||||||
final_text = m.content
|
final_text = light_reply
|
||||||
break
|
llm_elapsed = time.monotonic() - t0
|
||||||
|
print(f"[agent] light path: answered by router", flush=True)
|
||||||
|
|
||||||
if final_text and send_tool:
|
elif tier == "medium":
|
||||||
t1 = time.monotonic()
|
system_prompt = MEDIUM_SYSTEM_PROMPT.format(user_id=chat_id)
|
||||||
await send_tool.ainvoke({"chat_id": chat_id, "text": final_text})
|
result = await medium_agent.ainvoke({
|
||||||
print(f"[agent] replied in {time.monotonic() - t0:.1f}s (llm={llm_elapsed:.1f}s, send={time.monotonic()-t1:.1f}s)", flush=True)
|
"messages": [
|
||||||
elif not final_text:
|
{"role": "system", "content": system_prompt},
|
||||||
print(f"[agent] warning: no text reply from agent", flush=True)
|
*history,
|
||||||
|
{"role": "user", "content": clean_message},
|
||||||
|
]
|
||||||
|
})
|
||||||
|
llm_elapsed = time.monotonic() - t0
|
||||||
|
_log_messages(result)
|
||||||
|
final_text = _extract_final_text(result)
|
||||||
|
|
||||||
# Async memoization: runs on CPU Ollama, does not block next reply
|
else: # complex
|
||||||
if add_memory_tool and final_text:
|
ok = await vram_manager.enter_complex_mode()
|
||||||
conversation = f"User: {message}\nAssistant: {final_text}"
|
if not ok:
|
||||||
asyncio.create_task(store_memory_async(conversation, chat_id))
|
print("[agent] complex→medium fallback (eviction timeout)", flush=True)
|
||||||
|
tier = "medium"
|
||||||
|
system_prompt = MEDIUM_SYSTEM_PROMPT.format(user_id=chat_id)
|
||||||
|
result = await medium_agent.ainvoke({
|
||||||
|
"messages": [
|
||||||
|
{"role": "system", "content": system_prompt},
|
||||||
|
*history,
|
||||||
|
{"role": "user", "content": clean_message},
|
||||||
|
]
|
||||||
|
})
|
||||||
|
else:
|
||||||
|
system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=chat_id)
|
||||||
|
result = await complex_agent.ainvoke({
|
||||||
|
"messages": [
|
||||||
|
{"role": "system", "content": system_prompt},
|
||||||
|
*history,
|
||||||
|
{"role": "user", "content": clean_message},
|
||||||
|
]
|
||||||
|
})
|
||||||
|
asyncio.create_task(vram_manager.exit_complex_mode())
|
||||||
|
|
||||||
|
llm_elapsed = time.monotonic() - t0
|
||||||
|
_log_messages(result)
|
||||||
|
final_text = _extract_final_text(result)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
import traceback
|
import traceback
|
||||||
print(f"[agent] error after {time.monotonic()-t0:.1f}s for chat {chat_id}: {e}", flush=True)
|
llm_elapsed = time.monotonic() - t0
|
||||||
|
print(f"[agent] error after {llm_elapsed:.1f}s for chat {chat_id}: {e}", flush=True)
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
|
||||||
|
# Send reply via grammy MCP (split if > Telegram's 4096-char limit)
|
||||||
|
if final_text and send_tool:
|
||||||
|
t1 = time.monotonic()
|
||||||
|
MAX_TG = 4000 # leave headroom below the 4096 hard limit
|
||||||
|
chunks = [final_text[i:i + MAX_TG] for i in range(0, len(final_text), MAX_TG)]
|
||||||
|
for chunk in chunks:
|
||||||
|
await send_tool.ainvoke({"chat_id": chat_id, "text": chunk})
|
||||||
|
send_elapsed = time.monotonic() - t1
|
||||||
|
# Log in format compatible with test_pipeline.py parser
|
||||||
|
print(
|
||||||
|
f"[agent] replied in {time.monotonic() - t0:.1f}s "
|
||||||
|
f"(llm={llm_elapsed:.1f}s, send={send_elapsed:.1f}s) tier={tier}",
|
||||||
|
flush=True,
|
||||||
|
)
|
||||||
|
elif not final_text:
|
||||||
|
print("[agent] warning: no text reply from agent", flush=True)
|
||||||
|
|
||||||
|
# Update conversation buffer
|
||||||
|
if final_text:
|
||||||
|
buf = _conversation_buffers.get(chat_id, [])
|
||||||
|
buf.append({"role": "user", "content": clean_message})
|
||||||
|
buf.append({"role": "assistant", "content": final_text})
|
||||||
|
_conversation_buffers[chat_id] = buf[-(MAX_HISTORY_TURNS * 2):]
|
||||||
|
|
||||||
|
# Async memory storage (fire-and-forget)
|
||||||
|
if add_memory_tool and final_text:
|
||||||
|
conversation = f"User: {clean_message}\nAssistant: {final_text}"
|
||||||
|
asyncio.create_task(store_memory_async(conversation, chat_id))
|
||||||
|
|
||||||
|
|
||||||
@app.post("/chat")
|
@app.post("/chat")
|
||||||
async def chat(request: ChatRequest, background_tasks: BackgroundTasks):
|
async def chat(request: ChatRequest, background_tasks: BackgroundTasks):
|
||||||
if agent is None:
|
if medium_agent is None:
|
||||||
return JSONResponse(status_code=503, content={"error": "Agent not ready"})
|
return JSONResponse(status_code=503, content={"error": "Agent not ready"})
|
||||||
background_tasks.add_task(run_agent_task, request.message, request.chat_id)
|
background_tasks.add_task(run_agent_task, request.message, request.chat_id)
|
||||||
return JSONResponse(status_code=202, content={"status": "accepted"})
|
return JSONResponse(status_code=202, content={"status": "accepted"})
|
||||||
@@ -171,4 +306,4 @@ async def chat(request: ChatRequest, background_tasks: BackgroundTasks):
|
|||||||
|
|
||||||
@app.get("/health")
|
@app.get("/health")
|
||||||
async def health():
|
async def health():
|
||||||
return {"status": "ok", "agent_ready": agent is not None}
|
return {"status": "ok", "agent_ready": medium_agent is not None}
|
||||||
|
|||||||
54
agent_factory.py
Normal file
54
agent_factory.py
Normal file
@@ -0,0 +1,54 @@
|
|||||||
|
from deepagents import create_deep_agent, SubAgent
|
||||||
|
|
||||||
|
|
||||||
|
def build_medium_agent(model, agent_tools: list, system_prompt: str):
|
||||||
|
"""Medium agent: create_deep_agent with TodoList planning, no subagents."""
|
||||||
|
return create_deep_agent(
|
||||||
|
model=model,
|
||||||
|
tools=agent_tools,
|
||||||
|
system_prompt=system_prompt,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def build_complex_agent(model, agent_tools: list, system_prompt: str):
|
||||||
|
"""Complex agent: create_deep_agent with TodoList planning + research/memory subagents."""
|
||||||
|
web_tools = [t for t in agent_tools if getattr(t, "name", "") == "web_search"]
|
||||||
|
memory_tools = [
|
||||||
|
t for t in agent_tools
|
||||||
|
if getattr(t, "name", "") in ("search_memory", "get_all_memories")
|
||||||
|
]
|
||||||
|
|
||||||
|
research_sub: SubAgent = {
|
||||||
|
"name": "research",
|
||||||
|
"description": (
|
||||||
|
"Runs multiple web searches in parallel and synthesizes findings. "
|
||||||
|
"Use for thorough research tasks requiring several queries."
|
||||||
|
),
|
||||||
|
"system_prompt": (
|
||||||
|
"You are a research specialist. Search the web thoroughly using multiple queries. "
|
||||||
|
"Cite sources and synthesize information into a clear summary."
|
||||||
|
),
|
||||||
|
"tools": web_tools,
|
||||||
|
"model": model,
|
||||||
|
}
|
||||||
|
|
||||||
|
memory_sub: SubAgent = {
|
||||||
|
"name": "memory",
|
||||||
|
"description": (
|
||||||
|
"Searches and retrieves all relevant memories about the user comprehensively. "
|
||||||
|
"Use to gather full context from past conversations."
|
||||||
|
),
|
||||||
|
"system_prompt": (
|
||||||
|
"You are a memory specialist. Search broadly using multiple queries. "
|
||||||
|
"Return all relevant facts and context you find."
|
||||||
|
),
|
||||||
|
"tools": memory_tools,
|
||||||
|
"model": model,
|
||||||
|
}
|
||||||
|
|
||||||
|
return create_deep_agent(
|
||||||
|
model=model,
|
||||||
|
tools=agent_tools,
|
||||||
|
system_prompt=system_prompt,
|
||||||
|
subagents=[research_sub, memory_sub],
|
||||||
|
)
|
||||||
43
docker-compose.yml
Normal file
43
docker-compose.yml
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
services:
|
||||||
|
deepagents:
|
||||||
|
build: .
|
||||||
|
container_name: deepagents
|
||||||
|
ports:
|
||||||
|
- "8000:8000"
|
||||||
|
environment:
|
||||||
|
- PYTHONUNBUFFERED=1
|
||||||
|
- OLLAMA_BASE_URL=http://host.docker.internal:11436
|
||||||
|
- DEEPAGENTS_MODEL=qwen3:4b
|
||||||
|
- DEEPAGENTS_COMPLEX_MODEL=qwen3:8b
|
||||||
|
- DEEPAGENTS_ROUTER_MODEL=qwen2.5:1.5b
|
||||||
|
- SEARXNG_URL=http://host.docker.internal:11437
|
||||||
|
extra_hosts:
|
||||||
|
- "host.docker.internal:host-gateway"
|
||||||
|
depends_on:
|
||||||
|
- openmemory
|
||||||
|
- grammy
|
||||||
|
restart: unless-stopped
|
||||||
|
|
||||||
|
openmemory:
|
||||||
|
build: ./openmemory
|
||||||
|
container_name: openmemory
|
||||||
|
ports:
|
||||||
|
- "8765:8765"
|
||||||
|
environment:
|
||||||
|
# Extraction LLM (qwen2.5:1.5b) runs on GPU after reply — fast 2-5s extraction
|
||||||
|
- OLLAMA_GPU_URL=http://host.docker.internal:11436
|
||||||
|
# Embedding (nomic-embed-text) runs on CPU — fast enough for search (50-150ms)
|
||||||
|
- OLLAMA_CPU_URL=http://host.docker.internal:11435
|
||||||
|
extra_hosts:
|
||||||
|
- "host.docker.internal:host-gateway"
|
||||||
|
restart: unless-stopped
|
||||||
|
|
||||||
|
grammy:
|
||||||
|
build: ./grammy
|
||||||
|
container_name: grammy
|
||||||
|
ports:
|
||||||
|
- "3001:3001"
|
||||||
|
environment:
|
||||||
|
- TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN}
|
||||||
|
- DEEPAGENTS_URL=http://deepagents:8000
|
||||||
|
restart: unless-stopped
|
||||||
138
router.py
Normal file
138
router.py
Normal file
@@ -0,0 +1,138 @@
|
|||||||
|
import re
|
||||||
|
from typing import Optional
|
||||||
|
from langchain_core.messages import SystemMessage, HumanMessage
|
||||||
|
|
||||||
|
# ── Regex pre-classifier ──────────────────────────────────────────────────────
|
||||||
|
# Catches obvious light-tier patterns before calling the LLM.
|
||||||
|
# Keyed by regex → compiled pattern.
|
||||||
|
_LIGHT_PATTERNS = re.compile(
|
||||||
|
r"^("
|
||||||
|
# Greetings / farewells
|
||||||
|
r"hi|hello|hey|yo|sup|howdy|good morning|good evening|good night|good afternoon"
|
||||||
|
r"|bye|goodbye|see you|cya|later|ttyl"
|
||||||
|
# Acknowledgements / small talk
|
||||||
|
r"|thanks?|thank you|thx|ty|ok|okay|k|cool|great|awesome|perfect|sounds good|got it|nice|sure"
|
||||||
|
r"|how are you|how are you\?|how are you doing(\s+today)?[?!.]*"
|
||||||
|
r"|what.?s up"
|
||||||
|
# Calendar facts: "what day comes after X?" / "what comes after X?"
|
||||||
|
r"|what\s+day\s+(comes\s+after|follows|is\s+after)\s+\w+[?!.]*"
|
||||||
|
r"|what\s+comes\s+after\s+\w+[?!.]*"
|
||||||
|
# Acronym expansions: "what does X stand for?"
|
||||||
|
r"|what\s+does\s+\w+\s+stand\s+for[?!.]*"
|
||||||
|
r")[\s!.?]*$",
|
||||||
|
re.IGNORECASE,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── LLM classification prompt ─────────────────────────────────────────────────
|
||||||
|
CLASSIFY_PROMPT = """Classify the message. Output ONLY one word: light, medium, or complex.
|
||||||
|
|
||||||
|
LIGHT = answerable from general knowledge, no internet needed:
|
||||||
|
what is 2+2 / what is the capital of France / name the three primary colors
|
||||||
|
tell me a short joke / is the sky blue / is water wet
|
||||||
|
|
||||||
|
MEDIUM = requires web search or the user's stored memories:
|
||||||
|
current weather / today's news / Bitcoin price / what did we talk about
|
||||||
|
|
||||||
|
COMPLEX = /think prefix only:
|
||||||
|
/think compare frameworks / /think plan a trip
|
||||||
|
|
||||||
|
Message: {message}
|
||||||
|
Output (one word only — light, medium, or complex):"""
|
||||||
|
|
||||||
|
LIGHT_REPLY_PROMPT = """You are a helpful Telegram assistant. Answer briefly and naturally (1-3 sentences). Be friendly."""
|
||||||
|
|
||||||
|
|
||||||
|
def _format_history(history: list[dict]) -> str:
|
||||||
|
if not history:
|
||||||
|
return "(none)"
|
||||||
|
lines = []
|
||||||
|
for msg in history:
|
||||||
|
role = msg.get("role", "?")
|
||||||
|
content = str(msg.get("content", ""))[:200]
|
||||||
|
lines.append(f"{role}: {content}")
|
||||||
|
return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_tier(text: str) -> str:
|
||||||
|
"""Extract tier from raw model output. Default to medium."""
|
||||||
|
t = text.strip().lower()
|
||||||
|
snippet = t[:60]
|
||||||
|
if "complex" in snippet:
|
||||||
|
return "complex"
|
||||||
|
if "medium" in snippet:
|
||||||
|
return "medium"
|
||||||
|
if "light" in snippet:
|
||||||
|
return "light"
|
||||||
|
# Model invented a descriptive category (e.g. "simplefact", "trivial", "basic") →
|
||||||
|
# treat as light since it recognised the question doesn't need tools
|
||||||
|
if any(w in snippet for w in ("simple", "fact", "trivial", "basic", "easy", "general")):
|
||||||
|
return "light"
|
||||||
|
return "medium" # safe default
|
||||||
|
|
||||||
|
|
||||||
|
class Router:
|
||||||
|
def __init__(self, model):
|
||||||
|
self.model = model
|
||||||
|
|
||||||
|
async def route(
|
||||||
|
self,
|
||||||
|
message: str,
|
||||||
|
history: list[dict],
|
||||||
|
force_complex: bool = False,
|
||||||
|
) -> tuple[str, Optional[str]]:
|
||||||
|
"""
|
||||||
|
Returns (tier, reply_or_None).
|
||||||
|
For light tier: also generates the reply with a second call.
|
||||||
|
For medium/complex: reply is None.
|
||||||
|
"""
|
||||||
|
if force_complex:
|
||||||
|
return "complex", None
|
||||||
|
|
||||||
|
# Step 0: regex pre-classification for obvious light patterns
|
||||||
|
if _LIGHT_PATTERNS.match(message.strip()):
|
||||||
|
print(f"[router] regex→light", flush=True)
|
||||||
|
return await self._generate_light_reply(message, history)
|
||||||
|
|
||||||
|
# Step 1: LLM classification with raw text output
|
||||||
|
try:
|
||||||
|
classify_response = await self.model.ainvoke([
|
||||||
|
HumanMessage(content=CLASSIFY_PROMPT.format(message=message)),
|
||||||
|
])
|
||||||
|
raw = classify_response.content or ""
|
||||||
|
raw = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
|
||||||
|
tier = _parse_tier(raw)
|
||||||
|
|
||||||
|
if tier == "complex" and not message.startswith("/think"):
|
||||||
|
tier = "medium"
|
||||||
|
|
||||||
|
print(f"[router] raw={raw[:30]!r} → tier={tier}", flush=True)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[router] classify error, defaulting to medium: {e}", flush=True)
|
||||||
|
return "medium", None
|
||||||
|
|
||||||
|
if tier != "light":
|
||||||
|
return tier, None
|
||||||
|
|
||||||
|
return await self._generate_light_reply(message, history)
|
||||||
|
|
||||||
|
async def _generate_light_reply(
|
||||||
|
self, message: str, history: list[dict]
|
||||||
|
) -> tuple[str, Optional[str]]:
|
||||||
|
"""Generate a short reply using the router model for light-tier messages."""
|
||||||
|
history_text = _format_history(history)
|
||||||
|
context = f"\nConversation history:\n{history_text}" if history else ""
|
||||||
|
try:
|
||||||
|
reply_response = await self.model.ainvoke([
|
||||||
|
SystemMessage(content=LIGHT_REPLY_PROMPT + context),
|
||||||
|
HumanMessage(content=message),
|
||||||
|
])
|
||||||
|
reply_text = reply_response.content or ""
|
||||||
|
reply_text = re.sub(r"<think>.*?</think>", "", reply_text, flags=re.DOTALL).strip()
|
||||||
|
if not reply_text:
|
||||||
|
print("[router] light reply empty, falling back to medium", flush=True)
|
||||||
|
return "medium", None
|
||||||
|
print(f"[router] light reply: {len(reply_text)} chars", flush=True)
|
||||||
|
return "light", reply_text
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[router] light reply error, falling back to medium: {e}", flush=True)
|
||||||
|
return "medium", None
|
||||||
895
test_pipeline.py
895
test_pipeline.py
File diff suppressed because it is too large
Load Diff
71
vram_manager.py
Normal file
71
vram_manager.py
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
import asyncio
|
||||||
|
import os
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
|
||||||
|
|
||||||
|
|
||||||
|
class VRAMManager:
|
||||||
|
MEDIUM_MODELS = ["qwen3:4b", "qwen2.5:1.5b"]
|
||||||
|
COMPLEX_MODEL = "qwen3:8b"
|
||||||
|
|
||||||
|
def __init__(self, base_url: str = OLLAMA_BASE_URL):
|
||||||
|
self.base_url = base_url
|
||||||
|
|
||||||
|
async def enter_complex_mode(self) -> bool:
|
||||||
|
"""Flush medium models before loading 8b. Returns False if eviction timed out."""
|
||||||
|
print("[vram] enter_complex_mode: flushing medium models", flush=True)
|
||||||
|
await asyncio.gather(*[self._flush(m) for m in self.MEDIUM_MODELS])
|
||||||
|
ok = await self._poll_evicted(self.MEDIUM_MODELS, timeout=15)
|
||||||
|
if ok:
|
||||||
|
print("[vram] enter_complex_mode: eviction confirmed, loading qwen3:8b", flush=True)
|
||||||
|
else:
|
||||||
|
print("[vram] enter_complex_mode: eviction timeout — falling back to medium", flush=True)
|
||||||
|
return ok
|
||||||
|
|
||||||
|
async def exit_complex_mode(self):
|
||||||
|
"""Flush 8b and pre-warm medium models. Run as background task after complex reply."""
|
||||||
|
print("[vram] exit_complex_mode: flushing qwen3:8b", flush=True)
|
||||||
|
await self._flush(self.COMPLEX_MODEL)
|
||||||
|
print("[vram] exit_complex_mode: pre-warming medium models", flush=True)
|
||||||
|
await asyncio.gather(*[self._prewarm(m) for m in self.MEDIUM_MODELS])
|
||||||
|
print("[vram] exit_complex_mode: done", flush=True)
|
||||||
|
|
||||||
|
async def _flush(self, model: str):
|
||||||
|
"""Send keep_alive=0 to force immediate unload from VRAM."""
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||||
|
await client.post(
|
||||||
|
f"{self.base_url}/api/generate",
|
||||||
|
json={"model": model, "prompt": "", "keep_alive": 0},
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[vram] flush {model} error: {e}", flush=True)
|
||||||
|
|
||||||
|
async def _poll_evicted(self, models: list[str], timeout: float) -> bool:
|
||||||
|
"""Poll /api/ps until none of the given models appear (or timeout)."""
|
||||||
|
deadline = asyncio.get_event_loop().time() + timeout
|
||||||
|
while asyncio.get_event_loop().time() < deadline:
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=5.0) as client:
|
||||||
|
resp = await client.get(f"{self.base_url}/api/ps")
|
||||||
|
data = resp.json()
|
||||||
|
loaded = {m.get("name", "") for m in data.get("models", [])}
|
||||||
|
if not any(m in loaded for m in models):
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[vram] poll_evicted error: {e}", flush=True)
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def _prewarm(self, model: str):
|
||||||
|
"""Load model into VRAM with keep_alive=300 (5 min)."""
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=60.0) as client:
|
||||||
|
await client.post(
|
||||||
|
f"{self.base_url}/api/generate",
|
||||||
|
json={"model": model, "prompt": "", "keep_alive": 300},
|
||||||
|
)
|
||||||
|
print(f"[vram] pre-warmed {model}", flush=True)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[vram] prewarm {model} error: {e}", flush=True)
|
||||||
Reference in New Issue
Block a user