Remove adolf — moved to separate repo (alvis/adolf)
This commit is contained in:
@@ -1,118 +0,0 @@
|
||||
# Adolf
|
||||
|
||||
Autonomous personal assistant with a multi-channel gateway. Three-tier model routing with GPU VRAM management.
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────┐
|
||||
│ CHANNEL ADAPTERS │
|
||||
│ │
|
||||
│ [Telegram/Grammy] [CLI] [Voice — future] │
|
||||
│ ↕ ↕ ↕ │
|
||||
│ └────────────────┴────────────┘ │
|
||||
│ ↕ │
|
||||
│ ┌─────────────────────────┐ │
|
||||
│ │ GATEWAY (agent.py) │ │
|
||||
│ │ FastAPI :8000 │ │
|
||||
│ │ │ │
|
||||
│ │ POST /message │ ← all inbound │
|
||||
│ │ POST /chat (legacy) │ │
|
||||
│ │ GET /reply/{id} SSE │ ← CLI polling │
|
||||
│ │ GET /health │ │
|
||||
│ │ │ │
|
||||
│ │ channels.py registry │ │
|
||||
│ │ conversation buffers │ │
|
||||
│ └──────────┬──────────────┘ │
|
||||
│ ↓ │
|
||||
│ ┌──────────────────────┐ │
|
||||
│ │ AGENT CORE │ │
|
||||
│ │ three-tier routing │ │
|
||||
│ │ VRAM management │ │
|
||||
│ └──────────────────────┘ │
|
||||
│ ↓ │
|
||||
│ channels.deliver(session_id, channel, text)│
|
||||
│ ↓ ↓ │
|
||||
│ telegram → POST grammy/send cli → SSE queue │
|
||||
└─────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## Channel Adapters
|
||||
|
||||
| Channel | session_id | Inbound | Outbound |
|
||||
|---------|-----------|---------|---------|
|
||||
| Telegram | `tg-<chat_id>` | Grammy long-poll → POST /message | channels.py → POST grammy:3001/send |
|
||||
| CLI | `cli-<user>` | POST /message directly | GET /reply/{id} SSE stream |
|
||||
| Voice | `voice-<device>` | (future) | (future) |
|
||||
|
||||
## Unified Message Flow
|
||||
|
||||
```
|
||||
1. Channel adapter receives message
|
||||
2. POST /message {text, session_id, channel, user_id}
|
||||
3. 202 Accepted immediately
|
||||
4. Background: run_agent_task(message, session_id, channel)
|
||||
5. Route → run agent tier → get reply text
|
||||
6. channels.deliver(session_id, channel, reply_text)
|
||||
- always puts reply in pending_replies[session_id] queue (for SSE)
|
||||
- calls channel-specific send callback
|
||||
7. GET /reply/{session_id} SSE clients receive the reply
|
||||
```
|
||||
|
||||
## Three-Tier Model Routing
|
||||
|
||||
| Tier | Model | VRAM | Trigger | Latency |
|
||||
|------|-------|------|---------|---------|
|
||||
| Light | qwen2.5:1.5b (router answers) | ~1.2 GB | Router classifies as light | ~2–4s |
|
||||
| Medium | qwen3:4b | ~2.5 GB | Default | ~20–40s |
|
||||
| Complex | qwen3:8b | ~6.0 GB | `/think` prefix | ~60–120s |
|
||||
|
||||
**`/think` prefix**: forces complex tier, stripped before sending to agent.
|
||||
|
||||
## VRAM Management
|
||||
|
||||
GTX 1070 — 8 GB. Ollama must be restarted if CUDA init fails (model loads on CPU).
|
||||
|
||||
1. Flush explicitly before loading qwen3:8b (`keep_alive=0`)
|
||||
2. Verify eviction via `/api/ps` poll (15s timeout) before proceeding
|
||||
3. Fallback: timeout → run medium agent instead
|
||||
4. Post-complex: flush 8b, pre-warm 4b + router
|
||||
|
||||
## Session ID Convention
|
||||
|
||||
- Telegram: `tg-<chat_id>` (e.g. `tg-346967270`)
|
||||
- CLI: `cli-<username>` (e.g. `cli-alvis`)
|
||||
|
||||
Conversation history is keyed by session_id (5-turn buffer).
|
||||
|
||||
## Files
|
||||
|
||||
```
|
||||
adolf/
|
||||
├── docker-compose.yml Services: deepagents, openmemory, grammy
|
||||
├── Dockerfile deepagents container (Python 3.12)
|
||||
├── agent.py FastAPI gateway + three-tier routing
|
||||
├── channels.py Channel registry + deliver() + pending_replies
|
||||
├── router.py Router class — qwen2.5:1.5b routing
|
||||
├── vram_manager.py VRAMManager — flush/prewarm/poll Ollama VRAM
|
||||
├── agent_factory.py build_medium_agent / build_complex_agent
|
||||
├── cli.py Interactive CLI REPL client
|
||||
├── wiki_research.py Batch wiki research pipeline (uses /message + SSE)
|
||||
├── .env TELEGRAM_BOT_TOKEN (not committed)
|
||||
├── openmemory/
|
||||
│ ├── server.py FastMCP + mem0 MCP tools
|
||||
│ └── Dockerfile
|
||||
└── grammy/
|
||||
├── bot.mjs grammY Telegram bot + POST /send HTTP endpoint
|
||||
├── package.json
|
||||
└── Dockerfile
|
||||
```
|
||||
|
||||
## External Services (from openai/ stack)
|
||||
|
||||
| Service | Host Port | Role |
|
||||
|---------|-----------|------|
|
||||
| Ollama GPU | 11436 | All reply inference |
|
||||
| Ollama CPU | 11435 | Memory embedding (nomic-embed-text) |
|
||||
| Qdrant | 6333 | Vector store for memories |
|
||||
| SearXNG | 11437 | Web search |
|
||||
@@ -1,10 +0,0 @@
|
||||
FROM python:3.12-slim
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
RUN pip install --no-cache-dir deepagents langchain-ollama langgraph \
|
||||
fastapi uvicorn langchain-mcp-adapters langchain-community httpx
|
||||
|
||||
COPY agent.py channels.py vram_manager.py router.py agent_factory.py hello_world.py .
|
||||
|
||||
CMD ["uvicorn", "agent:app", "--host", "0.0.0.0", "--port", "8000"]
|
||||
386
adolf/agent.py
386
adolf/agent.py
@@ -1,386 +0,0 @@
|
||||
import asyncio
|
||||
import os
|
||||
import time
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from fastapi import FastAPI, BackgroundTasks, Request
|
||||
from fastapi.responses import JSONResponse, StreamingResponse
|
||||
from pydantic import BaseModel
|
||||
|
||||
import re as _re
|
||||
import httpx as _httpx
|
||||
|
||||
from langchain_ollama import ChatOllama
|
||||
from langchain_mcp_adapters.client import MultiServerMCPClient
|
||||
from langchain_community.utilities import SearxSearchWrapper
|
||||
from langchain_core.tools import Tool
|
||||
|
||||
from vram_manager import VRAMManager
|
||||
from router import Router
|
||||
from agent_factory import build_medium_agent, build_complex_agent
|
||||
import channels
|
||||
|
||||
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
|
||||
ROUTER_MODEL = os.getenv("DEEPAGENTS_ROUTER_MODEL", "qwen2.5:0.5b")
|
||||
MEDIUM_MODEL = os.getenv("DEEPAGENTS_MODEL", "qwen3:4b")
|
||||
COMPLEX_MODEL = os.getenv("DEEPAGENTS_COMPLEX_MODEL", "qwen3:8b")
|
||||
SEARXNG_URL = os.getenv("SEARXNG_URL", "http://host.docker.internal:11437")
|
||||
OPENMEMORY_URL = os.getenv("OPENMEMORY_URL", "http://openmemory:8765")
|
||||
CRAWL4AI_URL = os.getenv("CRAWL4AI_URL", "http://crawl4ai:11235")
|
||||
|
||||
MAX_HISTORY_TURNS = 5
|
||||
_conversation_buffers: dict[str, list] = {}
|
||||
|
||||
MEDIUM_SYSTEM_PROMPT = (
|
||||
"You are a helpful AI assistant. "
|
||||
"Use web_search for questions about current events or facts you don't know. "
|
||||
"Reply concisely."
|
||||
)
|
||||
|
||||
COMPLEX_SYSTEM_PROMPT = (
|
||||
"You are a deep research assistant. "
|
||||
"web_search automatically fetches full page content from top results — use it 6+ times with different queries. "
|
||||
"Also call fetch_url on any specific URL you want to read in full.\n\n"
|
||||
"Run searches in English AND Russian/Latvian. "
|
||||
"After getting results, run follow-up searches based on new facts found.\n\n"
|
||||
"Write a structured markdown report with sections: "
|
||||
"Overview, Education, Career, Publications, Online Presence, Interesting Findings.\n"
|
||||
"Every fact must link to the real URL it came from: [fact](url). "
|
||||
"NEVER invent URLs. End with: **Sources checked: N**"
|
||||
)
|
||||
|
||||
medium_agent = None
|
||||
complex_agent = None
|
||||
router: Router = None
|
||||
vram_manager: VRAMManager = None
|
||||
mcp_client = None
|
||||
|
||||
# GPU mutex: one LLM inference at a time
|
||||
_reply_semaphore = asyncio.Semaphore(1)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
global medium_agent, complex_agent, router, vram_manager, mcp_client
|
||||
|
||||
# Register channel adapters
|
||||
channels.register_defaults()
|
||||
|
||||
# Three model instances
|
||||
router_model = ChatOllama(
|
||||
model=ROUTER_MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=4096,
|
||||
temperature=0,
|
||||
)
|
||||
medium_model = ChatOllama(
|
||||
model=MEDIUM_MODEL, base_url=OLLAMA_BASE_URL, think=False, num_ctx=8192
|
||||
)
|
||||
complex_model = ChatOllama(
|
||||
model=COMPLEX_MODEL, base_url=OLLAMA_BASE_URL, think=True, num_ctx=16384
|
||||
)
|
||||
|
||||
vram_manager = VRAMManager(base_url=OLLAMA_BASE_URL)
|
||||
router = Router(model=router_model)
|
||||
|
||||
mcp_connections = {
|
||||
"openmemory": {"transport": "sse", "url": f"{OPENMEMORY_URL}/sse"},
|
||||
}
|
||||
mcp_client = MultiServerMCPClient(mcp_connections)
|
||||
for attempt in range(12):
|
||||
try:
|
||||
mcp_tools = await mcp_client.get_tools()
|
||||
break
|
||||
except Exception as e:
|
||||
if attempt == 11:
|
||||
raise
|
||||
print(f"[agent] MCP not ready (attempt {attempt + 1}/12): {e}. Retrying in 5s...")
|
||||
await asyncio.sleep(5)
|
||||
|
||||
agent_tools = [t for t in mcp_tools if t.name not in ("add_memory", "search_memory", "get_all_memories")]
|
||||
|
||||
searx = SearxSearchWrapper(searx_host=SEARXNG_URL)
|
||||
|
||||
def _crawl4ai_fetch(url: str) -> str:
|
||||
"""Fetch a URL via Crawl4AI (JS-rendered, bot-bypass) and return clean markdown."""
|
||||
try:
|
||||
r = _httpx.post(f"{CRAWL4AI_URL}/crawl", json={"urls": [url]}, timeout=60)
|
||||
r.raise_for_status()
|
||||
results = r.json().get("results", [])
|
||||
if not results or not results[0].get("success"):
|
||||
return ""
|
||||
md_obj = results[0].get("markdown") or {}
|
||||
md = md_obj.get("raw_markdown") if isinstance(md_obj, dict) else str(md_obj)
|
||||
return (md or "")[:5000]
|
||||
except Exception as e:
|
||||
return f"[fetch error: {e}]"
|
||||
|
||||
def _search_and_read(query: str) -> str:
|
||||
"""Search the web and automatically fetch full content of top results.
|
||||
Returns snippets + full page content from the top URLs."""
|
||||
import json as _json
|
||||
# Get structured results from SearXNG
|
||||
try:
|
||||
r = _httpx.get(
|
||||
f"{SEARXNG_URL}/search",
|
||||
params={"q": query, "format": "json"},
|
||||
timeout=15,
|
||||
)
|
||||
data = r.json()
|
||||
items = data.get("results", [])[:5]
|
||||
except Exception as e:
|
||||
return f"[search error: {e}]"
|
||||
|
||||
if not items:
|
||||
return "No results found."
|
||||
|
||||
out = [f"Search: {query}\n"]
|
||||
for i, item in enumerate(items, 1):
|
||||
url = item.get("url", "")
|
||||
title = item.get("title", "")
|
||||
snippet = item.get("content", "")[:300]
|
||||
out.append(f"\n[{i}] {title}\nURL: {url}\nSnippet: {snippet}")
|
||||
|
||||
# Auto-fetch top 2 URLs for full content
|
||||
out.append("\n\n--- Full page content ---")
|
||||
for item in items[:2]:
|
||||
url = item.get("url", "")
|
||||
if not url:
|
||||
continue
|
||||
content = _crawl4ai_fetch(url)
|
||||
if content and not content.startswith("[fetch error"):
|
||||
out.append(f"\n### {url}\n{content[:3000]}")
|
||||
|
||||
return "\n".join(out)
|
||||
|
||||
agent_tools.append(Tool(
|
||||
name="web_search",
|
||||
func=_search_and_read,
|
||||
description=(
|
||||
"Search the web and read full content of top results. "
|
||||
"Returns search snippets AND full page text from the top URLs. "
|
||||
"Use multiple different queries to research a topic thoroughly."
|
||||
),
|
||||
))
|
||||
|
||||
def _fetch_url(url: str) -> str:
|
||||
"""Fetch and read the full text content of a URL."""
|
||||
content = _crawl4ai_fetch(url)
|
||||
return content if content else "[fetch_url: empty or blocked]"
|
||||
|
||||
agent_tools.append(Tool(
|
||||
name="fetch_url",
|
||||
func=_fetch_url,
|
||||
description=(
|
||||
"Fetch and read the full text content of a specific URL. "
|
||||
"Use for URLs not covered by web_search. Input: a single URL string."
|
||||
),
|
||||
))
|
||||
|
||||
medium_agent = build_medium_agent(
|
||||
model=medium_model,
|
||||
agent_tools=agent_tools,
|
||||
system_prompt=MEDIUM_SYSTEM_PROMPT,
|
||||
)
|
||||
complex_agent = build_complex_agent(
|
||||
model=complex_model,
|
||||
agent_tools=agent_tools,
|
||||
system_prompt=COMPLEX_SYSTEM_PROMPT.format(user_id="{user_id}"),
|
||||
)
|
||||
|
||||
print(
|
||||
f"[agent] three-tier: router={ROUTER_MODEL} | medium={MEDIUM_MODEL} | complex={COMPLEX_MODEL}",
|
||||
flush=True,
|
||||
)
|
||||
print(f"[agent] agent tools: {[t.name for t in agent_tools]}", flush=True)
|
||||
|
||||
yield
|
||||
|
||||
medium_agent = None
|
||||
complex_agent = None
|
||||
router = None
|
||||
vram_manager = None
|
||||
mcp_client = None
|
||||
|
||||
|
||||
app = FastAPI(lifespan=lifespan)
|
||||
|
||||
|
||||
# ── request models ─────────────────────────────────────────────────────────────
|
||||
|
||||
class InboundMessage(BaseModel):
|
||||
text: str
|
||||
session_id: str # e.g. "tg-346967270", "cli-alvis"
|
||||
channel: str # "telegram" | "cli"
|
||||
user_id: str = "" # human identity; defaults to session_id if empty
|
||||
metadata: dict = {}
|
||||
|
||||
|
||||
class ChatRequest(BaseModel):
|
||||
"""Legacy model — kept for test_pipeline.py compatibility."""
|
||||
message: str
|
||||
chat_id: str
|
||||
|
||||
|
||||
# ── helpers ────────────────────────────────────────────────────────────────────
|
||||
|
||||
def _extract_final_text(result) -> str | None:
|
||||
msgs = result.get("messages", [])
|
||||
for m in reversed(msgs):
|
||||
if type(m).__name__ == "AIMessage" and getattr(m, "content", ""):
|
||||
return m.content
|
||||
if isinstance(result, dict) and result.get("output"):
|
||||
return result["output"]
|
||||
return None
|
||||
|
||||
|
||||
def _log_messages(result):
|
||||
msgs = result.get("messages", [])
|
||||
for m in msgs:
|
||||
role = type(m).__name__
|
||||
content = getattr(m, "content", "")
|
||||
tool_calls = getattr(m, "tool_calls", [])
|
||||
if content:
|
||||
print(f"[agent] {role}: {str(content)[:150]}", flush=True)
|
||||
for tc in tool_calls:
|
||||
print(f"[agent] {role} → {tc['name']}({tc['args']})", flush=True)
|
||||
|
||||
|
||||
# ── core task ──────────────────────────────────────────────────────────────────
|
||||
|
||||
async def run_agent_task(message: str, session_id: str, channel: str = "telegram"):
|
||||
print(f"[agent] queued: {message[:80]!r} chat={session_id}", flush=True)
|
||||
|
||||
force_complex = False
|
||||
clean_message = message
|
||||
if message.startswith("/think "):
|
||||
force_complex = True
|
||||
clean_message = message[len("/think "):]
|
||||
print("[agent] /think prefix → force_complex=True", flush=True)
|
||||
|
||||
async with _reply_semaphore:
|
||||
t0 = time.monotonic()
|
||||
history = _conversation_buffers.get(session_id, [])
|
||||
print(f"[agent] running: {clean_message[:80]!r}", flush=True)
|
||||
|
||||
tier, light_reply = await router.route(clean_message, history, force_complex)
|
||||
print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True)
|
||||
|
||||
final_text = None
|
||||
try:
|
||||
if tier == "light":
|
||||
final_text = light_reply
|
||||
llm_elapsed = time.monotonic() - t0
|
||||
print(f"[agent] light path: answered by router", flush=True)
|
||||
|
||||
elif tier == "medium":
|
||||
system_prompt = MEDIUM_SYSTEM_PROMPT
|
||||
result = await medium_agent.ainvoke({
|
||||
"messages": [
|
||||
{"role": "system", "content": system_prompt},
|
||||
*history,
|
||||
{"role": "user", "content": clean_message},
|
||||
]
|
||||
})
|
||||
llm_elapsed = time.monotonic() - t0
|
||||
_log_messages(result)
|
||||
final_text = _extract_final_text(result)
|
||||
|
||||
else: # complex
|
||||
ok = await vram_manager.enter_complex_mode()
|
||||
if not ok:
|
||||
print("[agent] complex→medium fallback (eviction timeout)", flush=True)
|
||||
tier = "medium"
|
||||
result = await medium_agent.ainvoke({
|
||||
"messages": [
|
||||
{"role": "system", "content": MEDIUM_SYSTEM_PROMPT},
|
||||
*history,
|
||||
{"role": "user", "content": clean_message},
|
||||
]
|
||||
})
|
||||
else:
|
||||
system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id)
|
||||
result = await complex_agent.ainvoke({
|
||||
"messages": [
|
||||
{"role": "system", "content": system_prompt},
|
||||
*history,
|
||||
{"role": "user", "content": clean_message},
|
||||
]
|
||||
})
|
||||
asyncio.create_task(vram_manager.exit_complex_mode())
|
||||
|
||||
llm_elapsed = time.monotonic() - t0
|
||||
_log_messages(result)
|
||||
final_text = _extract_final_text(result)
|
||||
|
||||
except Exception as e:
|
||||
import traceback
|
||||
llm_elapsed = time.monotonic() - t0
|
||||
print(f"[agent] error after {llm_elapsed:.1f}s for chat {session_id}: {e}", flush=True)
|
||||
traceback.print_exc()
|
||||
|
||||
# Deliver reply through the originating channel
|
||||
if final_text:
|
||||
t1 = time.monotonic()
|
||||
await channels.deliver(session_id, channel, final_text)
|
||||
send_elapsed = time.monotonic() - t1
|
||||
print(
|
||||
f"[agent] replied in {time.monotonic() - t0:.1f}s "
|
||||
f"(llm={llm_elapsed:.1f}s, send={send_elapsed:.1f}s) tier={tier}",
|
||||
flush=True,
|
||||
)
|
||||
print(f"[agent] reply_text: {final_text}", flush=True)
|
||||
else:
|
||||
print("[agent] warning: no text reply from agent", flush=True)
|
||||
|
||||
# Update conversation buffer
|
||||
if final_text:
|
||||
buf = _conversation_buffers.get(session_id, [])
|
||||
buf.append({"role": "user", "content": clean_message})
|
||||
buf.append({"role": "assistant", "content": final_text})
|
||||
_conversation_buffers[session_id] = buf[-(MAX_HISTORY_TURNS * 2):]
|
||||
|
||||
|
||||
# ── endpoints ──────────────────────────────────────────────────────────────────
|
||||
|
||||
@app.post("/message")
|
||||
async def message(request: InboundMessage, background_tasks: BackgroundTasks):
|
||||
"""Unified inbound endpoint for all channels."""
|
||||
if medium_agent is None:
|
||||
return JSONResponse(status_code=503, content={"error": "Agent not ready"})
|
||||
session_id = request.session_id
|
||||
channel = request.channel
|
||||
background_tasks.add_task(run_agent_task, request.text, session_id, channel)
|
||||
return JSONResponse(status_code=202, content={"status": "accepted"})
|
||||
|
||||
|
||||
@app.post("/chat")
|
||||
async def chat(request: ChatRequest, background_tasks: BackgroundTasks):
|
||||
"""Legacy endpoint — maps chat_id to tg-<chat_id> session for backward compatibility."""
|
||||
if medium_agent is None:
|
||||
return JSONResponse(status_code=503, content={"error": "Agent not ready"})
|
||||
session_id = f"tg-{request.chat_id}"
|
||||
background_tasks.add_task(run_agent_task, request.message, session_id, "telegram")
|
||||
return JSONResponse(status_code=202, content={"status": "accepted"})
|
||||
|
||||
|
||||
@app.get("/reply/{session_id}")
|
||||
async def reply_stream(session_id: str):
|
||||
"""
|
||||
SSE endpoint — streams the reply for a session once available, then closes.
|
||||
Used by CLI client and wiki_research.py instead of log polling.
|
||||
"""
|
||||
q = channels.pending_replies.setdefault(session_id, asyncio.Queue())
|
||||
|
||||
async def event_generator():
|
||||
try:
|
||||
text = await asyncio.wait_for(q.get(), timeout=900)
|
||||
# Escape newlines so entire reply fits in one SSE data line
|
||||
yield f"data: {text.replace(chr(10), '\\n').replace(chr(13), '')}\n\n"
|
||||
except asyncio.TimeoutError:
|
||||
yield "data: [timeout]\n\n"
|
||||
|
||||
return StreamingResponse(event_generator(), media_type="text/event-stream")
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
async def health():
|
||||
return {"status": "ok", "agent_ready": medium_agent is not None}
|
||||
@@ -1,19 +0,0 @@
|
||||
from deepagents import create_deep_agent
|
||||
|
||||
|
||||
def build_medium_agent(model, agent_tools: list, system_prompt: str):
|
||||
"""Medium agent: create_deep_agent with TodoList planning, no subagents."""
|
||||
return create_deep_agent(
|
||||
model=model,
|
||||
tools=agent_tools,
|
||||
system_prompt=system_prompt,
|
||||
)
|
||||
|
||||
|
||||
def build_complex_agent(model, agent_tools: list, system_prompt: str):
|
||||
"""Complex agent: direct agentic loop — calls web_search/fetch_url itself, no subagent indirection."""
|
||||
return create_deep_agent(
|
||||
model=model,
|
||||
tools=agent_tools,
|
||||
system_prompt=system_prompt,
|
||||
)
|
||||
@@ -1,75 +0,0 @@
|
||||
"""
|
||||
Channel registry and reply delivery for the Adolf multi-channel gateway.
|
||||
|
||||
Each channel registers an async send callback:
|
||||
channels.register("telegram", telegram_send)
|
||||
channels.register("cli", cli_send)
|
||||
|
||||
When the agent is done, it calls:
|
||||
await channels.deliver(session_id, channel, text)
|
||||
|
||||
Replies are also placed into per-session asyncio Queues so the
|
||||
GET /reply/{session_id} SSE endpoint can stream them to polling clients.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from typing import Awaitable, Callable
|
||||
|
||||
import httpx
|
||||
|
||||
# ── channel registry ──────────────────────────────────────────────────────────
|
||||
|
||||
_callbacks: dict[str, Callable[[str, str], Awaitable[None]]] = {}
|
||||
# session_id → Queue that receives the final reply text
|
||||
pending_replies: dict[str, asyncio.Queue] = {}
|
||||
|
||||
|
||||
def register(channel: str, callback: Callable[[str, str], Awaitable[None]]) -> None:
|
||||
"""Register an async send callback for a channel name."""
|
||||
_callbacks[channel] = callback
|
||||
|
||||
|
||||
async def deliver(session_id: str, channel: str, text: str) -> None:
|
||||
"""
|
||||
Deliver a reply to the originating channel AND put it in the pending
|
||||
queue so SSE /reply/{session_id} clients get it.
|
||||
"""
|
||||
# Always enqueue first so SSE listeners aren't missed
|
||||
q = pending_replies.setdefault(session_id, asyncio.Queue())
|
||||
await q.put(text)
|
||||
|
||||
cb = _callbacks.get(channel)
|
||||
if cb:
|
||||
await cb(session_id, text)
|
||||
else:
|
||||
print(f"[channels] no callback for channel={channel!r}, reply queued only", flush=True)
|
||||
|
||||
|
||||
# ── built-in channel adapters ─────────────────────────────────────────────────
|
||||
|
||||
GRAMMY_URL = os.getenv("GRAMMY_URL", "http://grammy:3001")
|
||||
|
||||
|
||||
async def _telegram_send(session_id: str, text: str) -> None:
|
||||
"""Send reply to Telegram via Grammy's POST /send endpoint."""
|
||||
chat_id = session_id.removeprefix("tg-")
|
||||
MAX_TG = 4000
|
||||
chunks = [text[i:i + MAX_TG] for i in range(0, len(text), MAX_TG)]
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
for chunk in chunks:
|
||||
await client.post(
|
||||
f"{GRAMMY_URL}/send",
|
||||
json={"chat_id": chat_id, "text": chunk},
|
||||
)
|
||||
|
||||
|
||||
async def _cli_send(session_id: str, text: str) -> None:
|
||||
"""CLI replies are handled entirely through the pending_replies queue — no-op here."""
|
||||
pass
|
||||
|
||||
|
||||
def register_defaults() -> None:
|
||||
"""Register the built-in Telegram and CLI channel adapters."""
|
||||
register("telegram", _telegram_send)
|
||||
register("cli", _cli_send)
|
||||
80
adolf/cli.py
80
adolf/cli.py
@@ -1,80 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Adolf CLI — interactive REPL for the multi-channel gateway.
|
||||
|
||||
Usage:
|
||||
python3 cli.py [--url http://localhost:8000] [--session cli-alvis]
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import urllib.request
|
||||
|
||||
GATEWAY = "http://localhost:8000"
|
||||
|
||||
|
||||
def post_message(gateway: str, text: str, session_id: str) -> None:
|
||||
payload = json.dumps({
|
||||
"text": text,
|
||||
"session_id": session_id,
|
||||
"channel": "cli",
|
||||
"user_id": os.getlogin(),
|
||||
}).encode()
|
||||
req = urllib.request.Request(
|
||||
f"{gateway}/message",
|
||||
data=payload,
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=10) as r:
|
||||
if r.status != 202:
|
||||
print(f"[error] gateway returned {r.status}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def wait_for_reply(gateway: str, session_id: str, timeout: int = 400) -> str:
|
||||
"""Open SSE stream and return first data event."""
|
||||
req = urllib.request.Request(
|
||||
f"{gateway}/reply/{session_id}",
|
||||
headers={"Accept": "text/event-stream"},
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=timeout + 5) as r:
|
||||
for raw_line in r:
|
||||
line = raw_line.decode("utf-8").rstrip("\n")
|
||||
if line.startswith("data:"):
|
||||
return line[5:].strip().replace("\\n", "\n")
|
||||
return ""
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Adolf CLI")
|
||||
parser.add_argument("--url", default=GATEWAY, help="Gateway URL")
|
||||
parser.add_argument("--session", default=f"cli-{os.getlogin()}", help="Session ID")
|
||||
parser.add_argument("--timeout", type=int, default=400, help="Reply timeout (seconds)")
|
||||
args = parser.parse_args()
|
||||
|
||||
print(f"Adolf CLI (session={args.session}, gateway={args.url})")
|
||||
print("Type your message and press Enter. Ctrl+C or Ctrl+D to exit.\n")
|
||||
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
text = input("> ").strip()
|
||||
except EOFError:
|
||||
break
|
||||
if not text:
|
||||
continue
|
||||
|
||||
post_message(args.url, text, args.session)
|
||||
print("...", end="", flush=True)
|
||||
reply = wait_for_reply(args.url, args.session, timeout=args.timeout)
|
||||
print(f"\r{reply}\n")
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\nbye")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,56 +0,0 @@
|
||||
services:
|
||||
deepagents:
|
||||
build: .
|
||||
container_name: deepagents
|
||||
ports:
|
||||
- "8000:8000"
|
||||
environment:
|
||||
- PYTHONUNBUFFERED=1
|
||||
- OLLAMA_BASE_URL=http://host.docker.internal:11436
|
||||
- DEEPAGENTS_MODEL=qwen3:4b
|
||||
- DEEPAGENTS_COMPLEX_MODEL=qwen3:8b
|
||||
- DEEPAGENTS_ROUTER_MODEL=qwen2.5:1.5b
|
||||
- SEARXNG_URL=http://host.docker.internal:11437
|
||||
- GRAMMY_URL=http://grammy:3001
|
||||
- CRAWL4AI_URL=http://crawl4ai:11235
|
||||
extra_hosts:
|
||||
- "host.docker.internal:host-gateway"
|
||||
depends_on:
|
||||
- openmemory
|
||||
- grammy
|
||||
- crawl4ai
|
||||
restart: unless-stopped
|
||||
|
||||
openmemory:
|
||||
build: ./openmemory
|
||||
container_name: openmemory
|
||||
ports:
|
||||
- "8765:8765"
|
||||
environment:
|
||||
# Extraction LLM (qwen2.5:1.5b) runs on GPU after reply — fast 2-5s extraction
|
||||
- OLLAMA_GPU_URL=http://host.docker.internal:11436
|
||||
# Embedding (nomic-embed-text) runs on CPU — fast enough for search (50-150ms)
|
||||
- OLLAMA_CPU_URL=http://host.docker.internal:11435
|
||||
extra_hosts:
|
||||
- "host.docker.internal:host-gateway"
|
||||
restart: unless-stopped
|
||||
|
||||
grammy:
|
||||
build: ./grammy
|
||||
container_name: grammy
|
||||
ports:
|
||||
- "3001:3001"
|
||||
environment:
|
||||
- TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN}
|
||||
- DEEPAGENTS_URL=http://deepagents:8000
|
||||
restart: unless-stopped
|
||||
|
||||
crawl4ai:
|
||||
image: unclecode/crawl4ai:latest
|
||||
container_name: crawl4ai
|
||||
ports:
|
||||
- "11235:11235"
|
||||
environment:
|
||||
- CRAWL4AI_LOG_LEVEL=WARNING
|
||||
shm_size: "1g"
|
||||
restart: unless-stopped
|
||||
@@ -1,6 +0,0 @@
|
||||
FROM node:22-alpine
|
||||
WORKDIR /app
|
||||
COPY package.json .
|
||||
RUN npm install
|
||||
COPY bot.mjs .
|
||||
CMD ["node", "bot.mjs"]
|
||||
@@ -1,56 +0,0 @@
|
||||
import { Bot } from "grammy";
|
||||
import express from "express";
|
||||
|
||||
const TELEGRAM_BOT_TOKEN = process.env.TELEGRAM_BOT_TOKEN;
|
||||
const DEEPAGENTS_URL = process.env.DEEPAGENTS_URL || "http://deepagents:8000";
|
||||
|
||||
const bot = new Bot(TELEGRAM_BOT_TOKEN);
|
||||
|
||||
// Forward all text messages to the unified gateway /message endpoint
|
||||
bot.on("message:text", (ctx) => {
|
||||
const text = ctx.message.text;
|
||||
const chat_id = String(ctx.chat.id);
|
||||
|
||||
console.log(`[grammy] message from ${chat_id}: ${text.slice(0, 80)}`);
|
||||
|
||||
fetch(`${DEEPAGENTS_URL}/message`, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({
|
||||
text,
|
||||
session_id: `tg-${chat_id}`,
|
||||
channel: "telegram",
|
||||
user_id: chat_id,
|
||||
}),
|
||||
}).catch((err) => console.error("[grammy] error forwarding to deepagents:", err));
|
||||
});
|
||||
|
||||
// HTTP server — delivers replies from the gateway back to Telegram
|
||||
const app = express();
|
||||
app.use(express.json());
|
||||
|
||||
app.post("/send", async (req, res) => {
|
||||
const { chat_id, text } = req.body;
|
||||
if (!chat_id || !text) {
|
||||
res.status(400).json({ error: "chat_id and text required" });
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await bot.api.sendMessage(chat_id, text);
|
||||
console.log(`[grammy] sent to ${chat_id}: ${text.slice(0, 60)}`);
|
||||
res.json({ ok: true });
|
||||
} catch (err) {
|
||||
console.error(`[grammy] send error to ${chat_id}:`, err.message);
|
||||
res.status(500).json({ error: err.message });
|
||||
}
|
||||
});
|
||||
|
||||
app.get("/health", (_req, res) => res.json({ ok: true }));
|
||||
|
||||
app.listen(3001, () => {
|
||||
console.log("[grammy] HTTP server listening on port 3001");
|
||||
});
|
||||
|
||||
bot.start({
|
||||
onStart: (info) => console.log(`[grammy] bot @${info.username} started`),
|
||||
});
|
||||
@@ -1,7 +0,0 @@
|
||||
{
|
||||
"type": "module",
|
||||
"dependencies": {
|
||||
"grammy": "^1.36.0",
|
||||
"express": "^4.21.0"
|
||||
}
|
||||
}
|
||||
@@ -1,21 +0,0 @@
|
||||
import os
|
||||
from langchain_ollama import ChatOllama
|
||||
from deepagents import create_deep_agent
|
||||
|
||||
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
|
||||
MODEL = os.getenv("DEEPAGENTS_MODEL", "gemma3:4b")
|
||||
|
||||
print(f"Connecting to Ollama at {OLLAMA_BASE_URL} with model {MODEL}")
|
||||
|
||||
model = ChatOllama(model=MODEL, base_url=OLLAMA_BASE_URL)
|
||||
|
||||
agent = create_deep_agent(model=model)
|
||||
|
||||
result = agent.invoke({
|
||||
"messages": [{"role": "user", "content": "Say hello world in one sentence."}]
|
||||
})
|
||||
|
||||
print("\n--- Agent Response ---")
|
||||
for msg in result.get("messages", []):
|
||||
if hasattr(msg, "content") and msg.content:
|
||||
print(f"[{msg.__class__.__name__}]: {msg.content}")
|
||||
@@ -1,247 +0,0 @@
|
||||
# LangGraph: Multi-Model Routing Architecture
|
||||
|
||||
## Problem
|
||||
|
||||
`create_react_agent` uses one model for all steps in the ReAct loop:
|
||||
|
||||
```
|
||||
qwen3:4b → decide to call tool ~37s
|
||||
→ run tool ~1s
|
||||
qwen3:4b → final answer ~37s
|
||||
─────────────────────────────────────
|
||||
Total ~75s
|
||||
```
|
||||
|
||||
The routing step is classification + argument extraction — low entropy, constrained output.
|
||||
It does not need the same model as answer generation.
|
||||
|
||||
---
|
||||
|
||||
## Is the Pattern Established? (2025 Research)
|
||||
|
||||
Yes. The 2025 consensus from multiple papers treats heterogeneous model architectures
|
||||
(small for routing, large for generation) as **settled production engineering**, not research:
|
||||
|
||||
- SLMs (1–12B) match or exceed LLMs on schema-constrained tasks (tool calls, JSON, function
|
||||
calling) at 10×–100× lower compute cost (arXiv 2510.03847, arXiv 2506.02153)
|
||||
- MasRouter (ACL 2025): routing in multi-agent graphs reduces costs 2× without quality loss
|
||||
- Cascade routing (ICLR 2025): 4% accuracy improvement, 30–92% cost reduction vs naive routing
|
||||
- NVIDIA research (2025): "Small Language Models are the Future of Agentic AI"
|
||||
|
||||
**Limitations acknowledged in literature:**
|
||||
- Bad router defeats the purpose — classifier quality is critical
|
||||
- Cascade (try small, escalate if uncertain) adds latency on queries that escalate
|
||||
- Pre-trained routers (RouteLLM, etc.) are calibrated for specific model pairs; local model
|
||||
pairs need independent validation
|
||||
|
||||
---
|
||||
|
||||
## Three-Tier Architecture (small → medium → large)
|
||||
|
||||
### Concept
|
||||
|
||||
```
|
||||
Incoming query
|
||||
↓
|
||||
[Router: tiny model or embedding classifier] ~1-2s
|
||||
├── simple/conversational → [Medium: qwen3:4b] ~20s
|
||||
├── needs tool call → [Medium: qwen3:4b + tools] ~20-40s
|
||||
└── complex/multi-step → [Large: qwen3:8b + sub-agents] ~60s+
|
||||
```
|
||||
|
||||
### When to route to large
|
||||
|
||||
Signals that justify loading a larger model:
|
||||
- Multi-step reasoning required (math, code, planning)
|
||||
- Sub-agent orchestration (the agent needs to call other agents)
|
||||
- Explicit reasoning request ("think through", "analyze carefully")
|
||||
- Low confidence from medium model (cascade pattern)
|
||||
|
||||
### Trade-offs of three-tier vs two-tier
|
||||
|
||||
| | Two-tier | Three-tier |
|
||||
|--|---------|-----------|
|
||||
| Simple queries | small router + medium answer | small router + medium answer (same) |
|
||||
| Complex queries | medium (may struggle) | swap to large (better quality) |
|
||||
| GPU constraint | manageable | hard — see below |
|
||||
| Routing error cost | low | high (wrong tier = much slower) |
|
||||
|
||||
---
|
||||
|
||||
## The 8GB GPU Constraint — Core Problem
|
||||
|
||||
This is the central issue. Research numbers on model swapping (2025):
|
||||
|
||||
**Cold swap from disk (no optimization)**
|
||||
- TTFT exceeds 140s for 7B-class models on HDD; 5–15s on NVMe SSD
|
||||
- Not viable for interactive use at any tier
|
||||
|
||||
**vLLM Sleep Mode (offload to CPU RAM, not disk)**
|
||||
- 18–200× faster than cold start; TTFT 2–3s per switch
|
||||
- vLLM-specific — not available in Ollama
|
||||
|
||||
**Ollama behavior on 8GB VRAM**
|
||||
- Default `keep_alive`: 5 minutes — model stays warm after use
|
||||
- Two models simultaneously: qwen3:4b (~2.5GB) + qwen2.5:1.5b (~1.2GB) = ~3.7GB — fits
|
||||
- qwen3:4b + qwen3:8b = ~8GB — does not reliably fit; eviction required
|
||||
- Sequential swap in Ollama: Ollama evicts old model, loads new one from SSD (~5–15s on NVMe)
|
||||
- Known Ollama bug: model spills from VRAM to RAM → all subsequent loads stay on CPU until restart
|
||||
|
||||
**Conclusion for three-tier on single 8GB GPU:**
|
||||
|
||||
| Tier switch | Cost | Viable? |
|
||||
|------------|------|---------|
|
||||
| tiny router → medium (qwen3:4b) | model swap ~5-15s if router is separate | borderline |
|
||||
| medium → large (qwen3:8b) | evict qwen3:4b, load qwen3:8b = ~5-15s additional | no, for interactive |
|
||||
| Keep medium always warm, route to large on demand | 5-15s swap overhead per complex query | acceptable if complex queries are rare |
|
||||
|
||||
**Honest verdict: three-tier with model swapping is not viable for interactive per-turn latency
|
||||
on 8GB VRAM with Ollama.** vLLM with Sleep Mode would make it viable (2–3s switch) but
|
||||
requires replacing Ollama.
|
||||
|
||||
---
|
||||
|
||||
## Practical Architecture for 8GB GPU (Ollama)
|
||||
|
||||
### Option 1: Two-tier, both models always in VRAM (recommended)
|
||||
|
||||
Keep two small models loaded simultaneously:
|
||||
|
||||
```
|
||||
qwen2.5:0.5b (~0.4GB) — router: tool call decision + arg extraction
|
||||
qwen3:4b (~2.5GB) — answer: all generation
|
||||
nomic-embed-text (CPU) — embedding: search and store
|
||||
qwen2.5:1.5b (~1.2GB) — extraction: mem0 fact extraction (GPU)
|
||||
─────────────────────────────────────────────────────────
|
||||
Total VRAM: ~4.1GB — well within 8GB
|
||||
```
|
||||
|
||||
No swapping needed. Router runs first (~1-2s), answer model runs after (~20s).
|
||||
|
||||
```
|
||||
Router → tool call JSON or "no tool" ~1-2s
|
||||
→ tool runs (if needed) ~1s
|
||||
→ Answer model generates reply ~20s
|
||||
─────────────────────────────────────────────
|
||||
Total ~22-23s
|
||||
```
|
||||
|
||||
vs current two-call approach: ~75s.
|
||||
|
||||
### Option 2: Semantic routing (encoder-only, free)
|
||||
|
||||
Use nomic-embed-text (already running on CPU) as the router:
|
||||
|
||||
```python
|
||||
query_vec = embed(query)
|
||||
sims = {
|
||||
"search_memory": cosine(query_vec, memory_topic_vec),
|
||||
"web_search": cosine(query_vec, web_topic_vec),
|
||||
}
|
||||
# If max sim > threshold → call that tool directly
|
||||
# Then pass result + original query to answer model
|
||||
```
|
||||
|
||||
Zero VRAM overhead. ~50ms routing. Can't extract tool args from embedding alone —
|
||||
needs hardcoded arg construction (e.g. query = original user message).
|
||||
|
||||
### Option 3: Three-tier with rare large-model escalation
|
||||
|
||||
Keep qwen3:4b warm. Route to qwen3:8b only for explicitly complex tasks.
|
||||
Accept ~10s swap overhead for those queries. qwen3:8b gets unloaded after.
|
||||
|
||||
```
|
||||
Router → simple → qwen3:4b ~20s (no swap)
|
||||
Router → complex → evict 4b, load 8b → ~30s (10s swap + 20s inference)
|
||||
```
|
||||
|
||||
Works if <20% of queries are "complex" and users accept occasional slow responses.
|
||||
Best implemented with explicit user trigger ("think about this carefully") rather than
|
||||
automatic classification, to avoid swap overhead on misclassified queries.
|
||||
|
||||
---
|
||||
|
||||
## LangGraph Implementation
|
||||
|
||||
`create_react_agent` locks to one model. Explicit graph supports per-node models:
|
||||
|
||||
```python
|
||||
from langgraph.graph import StateGraph, MessagesState
|
||||
from langgraph.prebuilt import ToolNode
|
||||
from langchain_ollama import ChatOllama
|
||||
|
||||
router_model = ChatOllama(model="qwen2.5:0.5b", base_url=OLLAMA_GPU_URL)
|
||||
answer_model = ChatOllama(model="qwen3:4b", base_url=OLLAMA_GPU_URL)
|
||||
# For Option 3: large_model = ChatOllama(model="qwen3:8b", ...)
|
||||
|
||||
def router_node(state):
|
||||
# Small model only outputs tool call JSON or nothing
|
||||
return {"messages": [router_model.bind_tools(tools).invoke(state["messages"])]}
|
||||
|
||||
def answer_node(state):
|
||||
# Large(r) model generates human reply — no tools bound
|
||||
return {"messages": [answer_model.invoke(state["messages"])]}
|
||||
|
||||
def route(state) -> str:
|
||||
last = state["messages"][-1]
|
||||
return "tools" if getattr(last, "tool_calls", []) else "answer"
|
||||
|
||||
graph = StateGraph(MessagesState)
|
||||
graph.add_node("router", router_node)
|
||||
graph.add_node("tools", ToolNode(tools))
|
||||
graph.add_node("answer", answer_node)
|
||||
graph.set_entry_point("router")
|
||||
graph.add_conditional_edges("router", route)
|
||||
graph.add_edge("tools", "answer")
|
||||
graph.add_edge("answer", END)
|
||||
agent = graph.compile()
|
||||
```
|
||||
|
||||
For three-tier, add a complexity classifier node before the router that selects
|
||||
`answer_model = medium_model or large_model` based on query signals.
|
||||
|
||||
---
|
||||
|
||||
## Open Source Routing Tools
|
||||
|
||||
| Tool | Ollama support | Status | Notes |
|
||||
|------|---------------|--------|-------|
|
||||
| LiteLLM | First-class | Active 2025 | Proxy with tiered routing, fallbacks, load balancing |
|
||||
| RouteLLM (LMSYS) | Yes (documented) | Stale (last commit Aug 2024) | Calibrated for GPT-4 vs Mixtral pair |
|
||||
| Router-R1 | No | Active (NeurIPS 2025) | RL-based, open-sourced on HuggingFace |
|
||||
| LLMRouter (ulab) | No | Research 2025 | 16+ routing methods, fair comparison framework |
|
||||
| FrugalGPT | No direct | Algorithm only | Portkey.ai has implementation guide |
|
||||
|
||||
**Most practical for Ollama**: LiteLLM proxy with tiered model config. Handles routing,
|
||||
fallbacks, and load balancing without changing agent code.
|
||||
|
||||
---
|
||||
|
||||
## Summary: What to Do for Adolf
|
||||
|
||||
| | Recommendation |
|
||||
|--|---------------|
|
||||
| Quick win (zero risk) | Remove "always call search_memory" from system prompt — history buffer covers conversational recall, saves ~37s |
|
||||
| Best architecture for 8GB | Two-tier: qwen2.5:0.5b router + qwen3:4b answer, both in VRAM, ~22s total |
|
||||
| Three-tier feasibility | Not viable for interactive use with Ollama model swapping; viable with vLLM Sleep Mode (~3s swap) if Ollama is replaced |
|
||||
| Complex task routing | Use explicit user trigger or keyword detection rather than automatic classifier — avoids swap penalty on misclassification |
|
||||
|
||||
---
|
||||
|
||||
## References
|
||||
|
||||
- arXiv 2510.03847 — Small Language Models for Agentic Systems: A Survey
|
||||
- arXiv 2506.02153 — Small Language Models are the Future of Agentic AI
|
||||
- arXiv 2406.04692 — Mixture-of-Agents Enhances LLM Capabilities (original MoA paper)
|
||||
- arXiv 2410.10347 — A Unified Approach to Routing and Cascading for LLMs (ICLR 2025)
|
||||
- MasRouter — ACL 2025: https://aclanthology.org/2025.acl-long.757.pdf
|
||||
- Router-R1 — NeurIPS 2025: https://github.com/ulab-uiuc/Router-R1
|
||||
- vLLM Sleep Mode: https://blog.vllm.ai/2025/10/26/sleep-mode.html
|
||||
- NVIDIA GPU Memory Swap: https://developer.nvidia.com/blog/cut-model-deployment-costs-while-keeping-performance-with-gpu-memory-swap/
|
||||
- LangGraph multi-agent: https://langchain-ai.github.io/langgraph/tutorials/multi_agent/
|
||||
- LangGraph custom ReAct: https://langchain-ai.github.io/langgraph/how-tos/react-agent-from-scratch/
|
||||
- LiteLLM Ollama docs: https://docs.litellm.ai/docs/providers/ollama
|
||||
- RouteLLM + Ollama example: https://github.com/lm-sys/RouteLLM/blob/main/examples/routing_to_local_models.md
|
||||
- LLMRouter framework: https://ulab-uiuc.github.io/LLMRouter/
|
||||
- Functionary (tool-call fine-tuned): https://github.com/MeetKai/functionary
|
||||
- Constrained generation (outlines): https://github.com/dottxt-ai/outlines
|
||||
@@ -1,6 +0,0 @@
|
||||
FROM python:3.12-slim
|
||||
WORKDIR /app
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
COPY server.py .
|
||||
CMD ["python", "server.py"]
|
||||
@@ -1,6 +0,0 @@
|
||||
mem0ai
|
||||
ollama
|
||||
fastapi
|
||||
uvicorn
|
||||
mcp[cli]
|
||||
qdrant-client
|
||||
@@ -1,155 +0,0 @@
|
||||
import json
|
||||
import os
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
from mem0 import Memory
|
||||
|
||||
# Extraction LLM — GPU Ollama (qwen3:4b, same model as medium agent)
|
||||
# Runs after reply when GPU is idle; spin-wait in agent.py prevents contention
|
||||
OLLAMA_GPU_URL = os.getenv("OLLAMA_GPU_URL", "http://host.docker.internal:11436")
|
||||
|
||||
# Embedding — CPU Ollama (nomic-embed-text, 137 MB RAM)
|
||||
# Used for both search (50-150ms, acceptable) and store-time embedding
|
||||
OLLAMA_CPU_URL = os.getenv("OLLAMA_CPU_URL", "http://host.docker.internal:11435")
|
||||
|
||||
QDRANT_HOST = os.getenv("QDRANT_HOST", "host.docker.internal")
|
||||
QDRANT_PORT = int(os.getenv("QDRANT_PORT", "6333"))
|
||||
|
||||
# Change 2: Custom extraction prompt
|
||||
# /no_think disables qwen3 thinking tokens so output is clean JSON
|
||||
EXTRACTION_PROMPT = """/no_think
|
||||
You are a memory extraction assistant. Extract factual statements from a conversation that are worth remembering long-term.
|
||||
|
||||
Extract facts from BOTH user AND assistant messages, including:
|
||||
- User details, preferences, and personal information
|
||||
- User's plans, goals, and intentions
|
||||
- The assistant's name or persona (if set by the user or stated by the assistant)
|
||||
- Any commitments or agreements made
|
||||
- Key facts stated as true
|
||||
|
||||
Return ONLY valid JSON in this exact format:
|
||||
{"facts": ["fact 1", "fact 2"]}
|
||||
|
||||
If there are no facts worth storing, return: {"facts": []}
|
||||
|
||||
IMPORTANT rules:
|
||||
- Extract the EXACT concrete values mentioned. Never say "not known" or "unspecified".
|
||||
- If the user states their name, job, pet, city, allergy, or preference — store the exact value.
|
||||
- A single message may contain multiple facts — extract ALL of them.
|
||||
- Do NOT extract vague summaries. Extract specific facts with real values.
|
||||
|
||||
Examples:
|
||||
|
||||
Input: "User: I live in Berlin\nAssistant: Got it, you're in Berlin!"
|
||||
Output: {"facts": ["User lives in Berlin"]}
|
||||
|
||||
Input: "User: My name is Alice and I live in Tokyo\nAssistant: Nice to meet you Alice!"
|
||||
Output: {"facts": ["User's name is Alice", "User lives in Tokyo"]}
|
||||
|
||||
Input: "User: I work as a software engineer at a startup\nAssistant: Cool!"
|
||||
Output: {"facts": ["User works as a software engineer at a startup"]}
|
||||
|
||||
Input: "User: I have a cat named Whiskers\nAssistant: Whiskers is a cute name!"
|
||||
Output: {"facts": ["User has a cat named Whiskers"]}
|
||||
|
||||
Input: "User: I'm allergic to nuts\nAssistant: I'll remember that."
|
||||
Output: {"facts": ["User is allergic to nuts"]}
|
||||
|
||||
Input: "User: remember that your name is Adolf\nAssistant: My name is Adolf!"
|
||||
Output: {"facts": ["Assistant's name is Adolf"]}
|
||||
|
||||
Input: "User: what time is it?\nAssistant: I don't have access to real-time data."
|
||||
Output: {"facts": []}
|
||||
|
||||
Input: "User: I prefer dark mode\nAssistant: Noted, I'll keep that in mind."
|
||||
Output: {"facts": ["User prefers dark mode"]}
|
||||
|
||||
Now extract facts from this conversation:"""
|
||||
|
||||
# Update/dedup decision prompt — overrides mem0's default.
|
||||
# qwen2.5:1.5b struggles with the default multi-step reasoning; this version is
|
||||
# more explicit: list existing, list new, decide ADD/NONE per item.
|
||||
UPDATE_PROMPT = """/no_think
|
||||
You manage a memory store. Given EXISTING memories and NEW facts:
|
||||
- For each EXISTING memory: output NONE (no change) or UPDATE (if a new fact replaces it) or DELETE.
|
||||
- For each NEW fact: output ADD if it is not already covered by existing memories. Output NONE if it is already covered.
|
||||
- IMPORTANT: You MUST include ALL new facts in your output — either as ADD or NONE.
|
||||
- Output ONLY valid JSON, no explanation.
|
||||
|
||||
Example A — new fact is genuinely new:
|
||||
Existing: [{"id": "0", "text": "User lives in Berlin"}]
|
||||
New facts: ["User is allergic to nuts"]
|
||||
Output: {"memory": [{"id": "0", "text": "User lives in Berlin", "event": "NONE"}, {"id": "1", "text": "User is allergic to nuts", "event": "ADD"}]}
|
||||
|
||||
Example B — new fact updates an existing one:
|
||||
Existing: [{"id": "0", "text": "User lives in Berlin"}]
|
||||
New facts: ["User lives in Paris"]
|
||||
Output: {"memory": [{"id": "0", "text": "User lives in Paris", "event": "UPDATE", "old_memory": "User lives in Berlin"}]}
|
||||
|
||||
Example C — new fact already covered:
|
||||
Existing: [{"id": "0", "text": "User is allergic to nuts"}]
|
||||
New facts: ["User has a nut allergy"]
|
||||
Output: {"memory": [{"id": "0", "text": "User is allergic to nuts", "event": "NONE"}]}"""
|
||||
|
||||
config = {
|
||||
"llm": {
|
||||
"provider": "ollama",
|
||||
"config": {
|
||||
"model": "qwen3:4b",
|
||||
"ollama_base_url": OLLAMA_GPU_URL,
|
||||
"temperature": 0.1, # consistent JSON output
|
||||
},
|
||||
},
|
||||
"embedder": {
|
||||
"provider": "ollama",
|
||||
"config": {
|
||||
"model": "nomic-embed-text",
|
||||
"ollama_base_url": OLLAMA_CPU_URL, # CPU: 50-150ms per query, no GPU needed
|
||||
},
|
||||
},
|
||||
"vector_store": {
|
||||
"provider": "qdrant",
|
||||
"config": {
|
||||
"collection_name": "adolf_memories",
|
||||
"embedding_model_dims": 768,
|
||||
"host": QDRANT_HOST,
|
||||
"port": QDRANT_PORT,
|
||||
},
|
||||
},
|
||||
"custom_fact_extraction_prompt": EXTRACTION_PROMPT,
|
||||
"custom_update_memory_prompt": UPDATE_PROMPT,
|
||||
}
|
||||
|
||||
memory = Memory.from_config(config)
|
||||
|
||||
mcp = FastMCP("openmemory", host="0.0.0.0", port=8765)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def add_memory(text: str, user_id: str = "default") -> str:
|
||||
"""Store a memory for a user."""
|
||||
result = memory.add(text, user_id=user_id)
|
||||
# Change 3: return clean JSON instead of Python repr
|
||||
return json.dumps(result, default=str)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def search_memory(query: str, user_id: str = "default") -> str:
|
||||
"""Search memories for a user using semantic similarity."""
|
||||
results = memory.search(query, user_id=user_id, limit=10, threshold=0.3)
|
||||
# Filter to only return results with score >= 0.5 to avoid irrelevant noise
|
||||
if isinstance(results, dict) and "results" in results:
|
||||
results["results"] = [r for r in results["results"] if r.get("score", 0) >= 0.5]
|
||||
return json.dumps(results, default=str)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def get_all_memories(user_id: str = "default", limit: int = 50) -> str:
|
||||
"""Get stored memories for a user (up to limit)."""
|
||||
# Change 5: cap results to avoid flooding context
|
||||
results = memory.get_all(user_id=user_id, limit=limit)
|
||||
# Change 3: return clean JSON instead of Python repr
|
||||
return json.dumps(results, default=str)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
mcp.run(transport="sse")
|
||||
@@ -1,13 +0,0 @@
|
||||
# Potential Directions
|
||||
|
||||
## CPU Extraction Model Candidates (mem0 / openmemory)
|
||||
|
||||
Replacing `gemma3:1b` — documented JSON/structured output failures make it unreliable for mem0's extraction pipeline.
|
||||
|
||||
| Rank | Model | Size | CPU speed | JSON reliability | Notes |
|
||||
|------|-------|------|-----------|-----------------|-------|
|
||||
| 1 | `qwen2.5:1.5b` | ~934 MB | 25–40 tok/s | Excellent | Best fit: fast + structured output, 18T token training |
|
||||
| 2 | `qwen2.5:3b` | ~1.9 GB | 15–25 tok/s | Excellent | Quality upgrade, same family |
|
||||
| 3 | `llama3.2:3b` | ~2 GB | 15–25 tok/s | Good | Highest IFEval score (77.4) in class |
|
||||
| 4 | `smollm2:1.7b` | ~1.1 GB | 25–35 tok/s | Moderate | Use temp=0; NuExtract-1.5-smol is fine-tuned variant |
|
||||
| 5 | `phi4-mini` | ~2.5 GB | 10–17 tok/s | Good | Function calling support, borderline CPU speed |
|
||||
@@ -1,287 +0,0 @@
|
||||
# Reasoning & Self-Reflection in Local LLM Agents
|
||||
|
||||
Research-backed notes on implementing multi-stage reasoning for local 4-8B models (2025).
|
||||
|
||||
---
|
||||
|
||||
## TL;DR
|
||||
|
||||
For local 4-8B models, **programmatic self-critique loops rarely justify their cost**.
|
||||
Native thinking tokens (Qwen3 `enable_thinking=True`) or external verifiers
|
||||
give better results at lower complexity. See bottom of this file for recommendations.
|
||||
|
||||
---
|
||||
|
||||
## Reasoning Patterns
|
||||
|
||||
### Chain-of-Thought (CoT)
|
||||
|
||||
Single forward pass, model thinks step-by-step before answering.
|
||||
Zero implementation cost — just a prompt change.
|
||||
Typical gain: +5-10pp on multi-step tasks vs no CoT.
|
||||
No latency overhead beyond the extra output tokens.
|
||||
|
||||
### Reflexion (Shinn et al., NeurIPS 2023)
|
||||
|
||||
Multiple complete attempts. After each attempt, the model writes a textual critique
|
||||
of what went wrong and stores it in episodic memory. Next attempt is conditioned on
|
||||
that memory.
|
||||
|
||||
```
|
||||
attempt 1 → fail → write critique → attempt 2 (reads critique) → ...
|
||||
```
|
||||
|
||||
Key results (GPT-4): HumanEval 80% → 91% pass@1.
|
||||
Cost: N complete task executions. At 30s/attempt, 5 trials = 2.5 minutes.
|
||||
Implementation: https://github.com/noahshinn/reflexion
|
||||
|
||||
### Reflection Loop (in-turn revision)
|
||||
|
||||
Within a single turn: generate → critique → revise → [repeat].
|
||||
Simpler than Reflexion. More common in practice.
|
||||
|
||||
```
|
||||
Generate → Critique → Revise → [stop condition]
|
||||
```
|
||||
|
||||
Stop condition options: max iterations, score threshold, external verifier passes.
|
||||
|
||||
### ReAct + Reflect
|
||||
|
||||
Standard ReAct (Reason + Act) with an added Reflect step after failed tool calls.
|
||||
Most common production pattern. Adds 1-3 extra LLM calls per failed action.
|
||||
|
||||
### Tree of Thoughts (ToT)
|
||||
|
||||
Explore N reasoning branches simultaneously, evaluate each node, BFS/DFS search.
|
||||
Branching factor 3, depth 3 = 54 LLM calls per problem. Prohibitive for local models.
|
||||
Works only if the model has strong self-evaluation capability (typically ≥32B).
|
||||
ToTRL-trained Qwen3-8B achieved 0.633 on AIME 2025 — but required training-time RL, not
|
||||
a prompt trick.
|
||||
|
||||
### Graph of Thoughts (GoT)
|
||||
|
||||
Generalizes ToT to arbitrary DAGs: thoughts can merge, split, or loop.
|
||||
62% improvement in sorting vs ToT, 31% cost reduction.
|
||||
Implementation: https://github.com/spcl/graph-of-thoughts
|
||||
Higher complexity than ToT; graph structure is problem-specific.
|
||||
|
||||
---
|
||||
|
||||
## Native Thinking Tokens (vs Programmatic Reflection)
|
||||
|
||||
Open models with built-in reasoning scratchpads:
|
||||
|
||||
| Model | Size | Ollama | Toggle | Notes |
|
||||
|-------|------|--------|--------|-------|
|
||||
| Qwen3 | 0.6B–235B | Yes | enable_thinking / think=True/False | Best option for local use |
|
||||
| Qwen3-4B-Thinking-2507 | 4B | Yes | Always on | Dedicated thinking variant |
|
||||
| QwQ-32B | 32B | Yes | Always on | Strong reasoning, needs VRAM |
|
||||
| DeepSeek-R1 distills | 1.5B–70B | Yes | Always on | Llama/Qwen base |
|
||||
|
||||
### Qwen3 thinking toggle in Ollama / LangChain
|
||||
|
||||
```python
|
||||
# LangChain
|
||||
model = ChatOllama(model="qwen3:4b", think=True, num_ctx=8192)
|
||||
|
||||
# Prompt-level (Ollama API)
|
||||
# /think — enable per-request
|
||||
# /no_think — disable per-request
|
||||
```
|
||||
|
||||
Latency: thinking mode is 2-3x slower in wall-clock time (model generates internal
|
||||
`<think>...</think>` tokens before answering). Qwen3-VL 8B Thinking: 262s vs 65s on a
|
||||
complex visual reasoning task — but meaningfully better output.
|
||||
|
||||
### Native thinking vs programmatic loop
|
||||
|
||||
| | Native thinking | Programmatic multi-stage |
|
||||
|--|----------------|------------------------|
|
||||
| API calls | 1 | N (rounds × 2) |
|
||||
| Implementation | Zero | Significant |
|
||||
| Quality on 4-8B | Good (capability in weights) | Poor (weak model critiques itself) |
|
||||
| Transparency | Opaque (one streamed block) | Inspectable per stage |
|
||||
| Controllability | thinking_budget only | Full control |
|
||||
| Latency | 2-3x tokens, 1 call | N × base latency |
|
||||
|
||||
**For local 4-8B: native thinking almost always beats a hand-coded reflection loop.**
|
||||
|
||||
---
|
||||
|
||||
## Does Programmatic Reflection Work on Small Models?
|
||||
|
||||
Short answer: **mostly no without external verification**.
|
||||
|
||||
From the research (2024-2025):
|
||||
|
||||
- **"When Hindsight is Not 20/20" (arXiv 2404.09129)**: Self-reflection often makes
|
||||
small models worse. A model that generated an error also lacks the capability to
|
||||
identify it. It confidently accepts flawed reasoning on re-reading.
|
||||
|
||||
- **THINKSLM (EMNLP 2025)**: Inference-time self-critique on Llama-3.1-8B is unreliable.
|
||||
Training-time distilled reasoning traces help; prompt-based self-critique does not.
|
||||
|
||||
- **Nature 2025 study**: Large gains (GPT-4: +18.5pp) diminish sharply for smaller models.
|
||||
|
||||
- **Latency cost**: Each reflection round on a local 8B adds 5-30s. A 3-round loop
|
||||
= 3x latency for 0-5% gain (or regression) on most tasks.
|
||||
|
||||
### When it actually helps on small models
|
||||
|
||||
1. **External verifier**: model doesn't self-evaluate — it reads objective pass/fail
|
||||
feedback (unit tests, JSON schema checker, math verifier, search result grader).
|
||||
Most reliable pattern. No self-evaluation capability required.
|
||||
|
||||
2. **Stronger critic**: generate with 4B, critique with 32B or API model. Hybrid approach.
|
||||
|
||||
3. **Native thinking weights**: reflection happens in a single forward pass with
|
||||
trained weights. Far more reliable than prompt-based self-critique.
|
||||
|
||||
4. **Structured error types**: code syntax, JSON validity, regex match — computable
|
||||
error signal, not linguistic self-assessment.
|
||||
|
||||
---
|
||||
|
||||
## LangGraph Reflection Loop Implementation
|
||||
|
||||
LangGraph is suited for this because it supports cyclic graphs with state.
|
||||
|
||||
### Minimal reflection graph
|
||||
|
||||
```python
|
||||
from langgraph.graph import StateGraph, START, END, MessagesState
|
||||
from langchain_ollama import ChatOllama
|
||||
|
||||
llm = ChatOllama(model="qwen3:4b", think=False)
|
||||
critic_llm = ChatOllama(model="qwen3:4b", think=True) # or stronger model
|
||||
|
||||
MAX_REFLECTIONS = 2
|
||||
|
||||
def generate(state):
|
||||
response = llm.invoke(state["messages"])
|
||||
return {"messages": [response], "iterations": state.get("iterations", 0)}
|
||||
|
||||
def reflect(state):
|
||||
critique = critic_llm.invoke(
|
||||
[{"role": "system", "content": "Critique this response. Be specific about errors."}]
|
||||
+ state["messages"]
|
||||
)
|
||||
return {
|
||||
"messages": [{"role": "user", "content": critique.content}],
|
||||
"iterations": state["iterations"] + 1,
|
||||
}
|
||||
|
||||
def should_reflect(state) -> str:
|
||||
if state.get("iterations", 0) >= MAX_REFLECTIONS:
|
||||
return END
|
||||
# Optionally: check external verifier here
|
||||
return "reflect"
|
||||
|
||||
graph = StateGraph(MessagesState)
|
||||
graph.add_node("generate", generate)
|
||||
graph.add_node("reflect", reflect)
|
||||
graph.add_edge(START, "generate")
|
||||
graph.add_conditional_edges("generate", should_reflect)
|
||||
graph.add_edge("reflect", "generate")
|
||||
agent = graph.compile()
|
||||
```
|
||||
|
||||
### Self-Correcting RAG (CRAG pattern)
|
||||
|
||||
```
|
||||
Retrieve → Grade documents → [rewrite query if bad] → Generate → Grade answer → [loop or END]
|
||||
```
|
||||
|
||||
The document grader and answer grader are the "external verifiers" — they do
|
||||
objective quality checks rather than linguistic self-critique.
|
||||
LangChain tutorial: https://learnopencv.com/langgraph-self-correcting-agent-code-generation/
|
||||
|
||||
---
|
||||
|
||||
## Alternative Tooling
|
||||
|
||||
### DSPy (recommended for pipeline optimization)
|
||||
|
||||
DSPy treats prompts as learnable parameters. Define input/output signatures, run an
|
||||
optimizer on examples, and DSPy auto-tunes prompts for your specific model.
|
||||
|
||||
```python
|
||||
import dspy
|
||||
lm = dspy.LM('ollama_chat/qwen3:4b', api_base='http://localhost:11434', api_key='')
|
||||
dspy.configure(lm=lm)
|
||||
|
||||
class Reflect(dspy.Module):
|
||||
def __init__(self):
|
||||
self.gen = dspy.ChainOfThought("question -> answer")
|
||||
self.critique = dspy.ChainOfThought("question, answer -> critique, improved_answer")
|
||||
def forward(self, question):
|
||||
first = self.gen(question=question)
|
||||
return self.critique(question=question, answer=first.answer).improved_answer
|
||||
```
|
||||
|
||||
Works with Ollama. Optimizer (BootstrapFewShot, MIPRO) tunes prompts automatically
|
||||
but requires multiple LLM calls per training example — slow on local hardware.
|
||||
|
||||
### Outlines (structured output)
|
||||
|
||||
Constrained decoding — guarantees valid JSON/regex output from any model.
|
||||
Use this inside a reflection loop to ensure the critic always returns structured feedback.
|
||||
Works with Ollama via OpenAI-compatible API.
|
||||
https://dottxt-ai.github.io/outlines/
|
||||
|
||||
### SGLang
|
||||
|
||||
High-performance GPU serving runtime (replaces Ollama for GPU inference).
|
||||
Natively understands `<think>...</think>` tokens, caches KV-prefix across reflection
|
||||
rounds (RadixAttention). If you replace Ollama with SGLang: reflection loops become
|
||||
significantly cheaper because repeated prompt prefixes are cache-hit.
|
||||
https://github.com/sgl-project/sglang
|
||||
|
||||
---
|
||||
|
||||
## Benchmarks Summary
|
||||
|
||||
| Setup | Task | Quality Gain | Latency Cost |
|
||||
|-------|------|-------------|-------------|
|
||||
| GPT-4 + Reflexion | HumanEval | +11pp (80→91%) | ~5x |
|
||||
| GPT-4 + reflection | Problem solving | +18.5pp | ~3x |
|
||||
| Llama-7B + programmatic self-critique | Math | +7.1% | ~3x |
|
||||
| Local 8B + same-model critique (typical) | General | 0-5% (often regression) | 2-3x |
|
||||
| Qwen3-8B + native thinking | AIME 2025 | Matches models 10x larger | 2-3x tokens |
|
||||
| Any model + external verifier (tests) | Code | +15-26pp | 1.5-2x |
|
||||
|
||||
---
|
||||
|
||||
## Practical Recommendations for Adolf (local qwen3:4b / 8b)
|
||||
|
||||
| Goal | Approach | Cost |
|
||||
|------|----------|------|
|
||||
| Better reasoning on hard questions | `think=True` in ChatOllama | 2-3x latency, zero code |
|
||||
| Code/JSON correctness | External verifier (schema check, exec) + retry loop | +1 LLM call on failure |
|
||||
| Complex multi-step tasks | Route to qwen3:8b with `think=True` | model swap + 2-3x tokens |
|
||||
| Full reflection loop | Only if using stronger critic model or external verifier | significant complexity |
|
||||
| Avoid | Programmatic self-critique using same 4-8B model as critic | adds latency, no gain |
|
||||
|
||||
---
|
||||
|
||||
## References
|
||||
|
||||
- Reflexion (Shinn et al., NeurIPS 2023): https://arxiv.org/abs/2303.11366
|
||||
- Tree of Thoughts: https://arxiv.org/abs/2305.10601
|
||||
- ToTRL (Qwen3 RL training): https://arxiv.org/html/2505.12717v1
|
||||
- Graph of Thoughts: https://arxiv.org/abs/2308.09687
|
||||
- Adaptive GoT (2025): https://arxiv.org/pdf/2502.05078
|
||||
- When Hindsight is Not 20/20: https://arxiv.org/html/2404.09129v1
|
||||
- THINKSLM (EMNLP 2025): https://aclanthology.org/2025.emnlp-main.1659.pdf
|
||||
- MAR — Multi-Agent Reflexion: https://arxiv.org/html/2512.20845
|
||||
- Qwen3 technical report: https://arxiv.org/pdf/2505.09388
|
||||
- Qwen3 thinking cost measurement: https://medium.com/@frankmorales_91352/the-computational-cost-of-cognitive-depth-qwen3-vl-8b-instruct-vs-thinking-2517b677ba29
|
||||
- DeepSeek-R1: https://arxiv.org/abs/2501.12948
|
||||
- LangChain reflection blog: https://blog.langchain.com/reflection-agents/
|
||||
- LangGraph CRAG: https://learnopencv.com/langgraph-self-correcting-agent-code-generation/
|
||||
- DSPy: https://dspy.ai/
|
||||
- Outlines: https://dottxt-ai.github.io/outlines/
|
||||
- SGLang: https://github.com/sgl-project/sglang
|
||||
- graph-of-thoughts: https://github.com/spcl/graph-of-thoughts
|
||||
- reflexion (original code): https://github.com/noahshinn/reflexion
|
||||
140
adolf/router.py
140
adolf/router.py
@@ -1,140 +0,0 @@
|
||||
import re
|
||||
from typing import Optional
|
||||
from langchain_core.messages import SystemMessage, HumanMessage
|
||||
|
||||
# ── Regex pre-classifier ──────────────────────────────────────────────────────
|
||||
# Catches obvious light-tier patterns before calling the LLM.
|
||||
# Keyed by regex → compiled pattern.
|
||||
_LIGHT_PATTERNS = re.compile(
|
||||
r"^("
|
||||
# Greetings / farewells
|
||||
r"hi|hello|hey|yo|sup|howdy|good morning|good evening|good night|good afternoon"
|
||||
r"|bye|goodbye|see you|cya|later|ttyl"
|
||||
# Acknowledgements / small talk
|
||||
r"|thanks?|thank you|thx|ty|ok|okay|k|cool|great|awesome|perfect|sounds good|got it|nice|sure"
|
||||
r"|how are you|how are you\?|how are you doing(\s+today)?[?!.]*"
|
||||
r"|what.?s up"
|
||||
# Calendar facts: "what day comes after X?" / "what comes after X?"
|
||||
r"|what\s+day\s+(comes\s+after|follows|is\s+after)\s+\w+[?!.]*"
|
||||
r"|what\s+comes\s+after\s+\w+[?!.]*"
|
||||
# Acronym expansions: "what does X stand for?"
|
||||
r"|what\s+does\s+\w+\s+stand\s+for[?!.]*"
|
||||
r")[\s!.?]*$",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
# ── LLM classification prompt ─────────────────────────────────────────────────
|
||||
CLASSIFY_PROMPT = """Classify the message. Output ONLY one word: light, medium, or complex.
|
||||
|
||||
LIGHT = answerable from general knowledge, no internet needed:
|
||||
what is 2+2 / what is the capital of France / name the three primary colors
|
||||
tell me a short joke / is the sky blue / is water wet
|
||||
|
||||
MEDIUM = requires web search or the user's stored memories:
|
||||
current weather / today's news / Bitcoin price / what did we talk about
|
||||
what is my name / where do I live / what is my job / do I have any pets
|
||||
what do you know about me / what are my preferences / what did I tell you
|
||||
|
||||
COMPLEX = /think prefix only:
|
||||
/think compare frameworks / /think plan a trip
|
||||
|
||||
Message: {message}
|
||||
Output (one word only — light, medium, or complex):"""
|
||||
|
||||
LIGHT_REPLY_PROMPT = """You are a helpful Telegram assistant. Answer briefly and naturally (1-3 sentences). Be friendly."""
|
||||
|
||||
|
||||
def _format_history(history: list[dict]) -> str:
|
||||
if not history:
|
||||
return "(none)"
|
||||
lines = []
|
||||
for msg in history:
|
||||
role = msg.get("role", "?")
|
||||
content = str(msg.get("content", ""))[:200]
|
||||
lines.append(f"{role}: {content}")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _parse_tier(text: str) -> str:
|
||||
"""Extract tier from raw model output. Default to medium."""
|
||||
t = text.strip().lower()
|
||||
snippet = t[:60]
|
||||
if "complex" in snippet:
|
||||
return "complex"
|
||||
if "medium" in snippet:
|
||||
return "medium"
|
||||
if "light" in snippet:
|
||||
return "light"
|
||||
# Model invented a descriptive category (e.g. "simplefact", "trivial", "basic") →
|
||||
# treat as light since it recognised the question doesn't need tools
|
||||
if any(w in snippet for w in ("simple", "fact", "trivial", "basic", "easy", "general")):
|
||||
return "light"
|
||||
return "medium" # safe default
|
||||
|
||||
|
||||
class Router:
|
||||
def __init__(self, model):
|
||||
self.model = model
|
||||
|
||||
async def route(
|
||||
self,
|
||||
message: str,
|
||||
history: list[dict],
|
||||
force_complex: bool = False,
|
||||
) -> tuple[str, Optional[str]]:
|
||||
"""
|
||||
Returns (tier, reply_or_None).
|
||||
For light tier: also generates the reply with a second call.
|
||||
For medium/complex: reply is None.
|
||||
"""
|
||||
if force_complex:
|
||||
return "complex", None
|
||||
|
||||
# Step 0: regex pre-classification for obvious light patterns
|
||||
if _LIGHT_PATTERNS.match(message.strip()):
|
||||
print(f"[router] regex→light", flush=True)
|
||||
return await self._generate_light_reply(message, history)
|
||||
|
||||
# Step 1: LLM classification with raw text output
|
||||
try:
|
||||
classify_response = await self.model.ainvoke([
|
||||
HumanMessage(content=CLASSIFY_PROMPT.format(message=message)),
|
||||
])
|
||||
raw = classify_response.content or ""
|
||||
raw = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
|
||||
tier = _parse_tier(raw)
|
||||
|
||||
if tier == "complex" and not message.startswith("/think"):
|
||||
tier = "medium"
|
||||
|
||||
print(f"[router] raw={raw[:30]!r} → tier={tier}", flush=True)
|
||||
except Exception as e:
|
||||
print(f"[router] classify error, defaulting to medium: {e}", flush=True)
|
||||
return "medium", None
|
||||
|
||||
if tier != "light":
|
||||
return tier, None
|
||||
|
||||
return await self._generate_light_reply(message, history)
|
||||
|
||||
async def _generate_light_reply(
|
||||
self, message: str, history: list[dict]
|
||||
) -> tuple[str, Optional[str]]:
|
||||
"""Generate a short reply using the router model for light-tier messages."""
|
||||
history_text = _format_history(history)
|
||||
context = f"\nConversation history:\n{history_text}" if history else ""
|
||||
try:
|
||||
reply_response = await self.model.ainvoke([
|
||||
SystemMessage(content=LIGHT_REPLY_PROMPT + context),
|
||||
HumanMessage(content=message),
|
||||
])
|
||||
reply_text = reply_response.content or ""
|
||||
reply_text = re.sub(r"<think>.*?</think>", "", reply_text, flags=re.DOTALL).strip()
|
||||
if not reply_text:
|
||||
print("[router] light reply empty, falling back to medium", flush=True)
|
||||
return "medium", None
|
||||
print(f"[router] light reply: {len(reply_text)} chars", flush=True)
|
||||
return "light", reply_text
|
||||
except Exception as e:
|
||||
print(f"[router] light reply error, falling back to medium: {e}", flush=True)
|
||||
return "medium", None
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,71 +0,0 @@
|
||||
import asyncio
|
||||
import os
|
||||
import httpx
|
||||
|
||||
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
|
||||
|
||||
|
||||
class VRAMManager:
|
||||
MEDIUM_MODELS = ["qwen3:4b", "qwen2.5:1.5b"]
|
||||
COMPLEX_MODEL = "qwen3:8b"
|
||||
|
||||
def __init__(self, base_url: str = OLLAMA_BASE_URL):
|
||||
self.base_url = base_url
|
||||
|
||||
async def enter_complex_mode(self) -> bool:
|
||||
"""Flush medium models before loading 8b. Returns False if eviction timed out."""
|
||||
print("[vram] enter_complex_mode: flushing medium models", flush=True)
|
||||
await asyncio.gather(*[self._flush(m) for m in self.MEDIUM_MODELS])
|
||||
ok = await self._poll_evicted(self.MEDIUM_MODELS, timeout=15)
|
||||
if ok:
|
||||
print("[vram] enter_complex_mode: eviction confirmed, loading qwen3:8b", flush=True)
|
||||
else:
|
||||
print("[vram] enter_complex_mode: eviction timeout — falling back to medium", flush=True)
|
||||
return ok
|
||||
|
||||
async def exit_complex_mode(self):
|
||||
"""Flush 8b and pre-warm medium models. Run as background task after complex reply."""
|
||||
print("[vram] exit_complex_mode: flushing qwen3:8b", flush=True)
|
||||
await self._flush(self.COMPLEX_MODEL)
|
||||
print("[vram] exit_complex_mode: pre-warming medium models", flush=True)
|
||||
await asyncio.gather(*[self._prewarm(m) for m in self.MEDIUM_MODELS])
|
||||
print("[vram] exit_complex_mode: done", flush=True)
|
||||
|
||||
async def _flush(self, model: str):
|
||||
"""Send keep_alive=0 to force immediate unload from VRAM."""
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
await client.post(
|
||||
f"{self.base_url}/api/generate",
|
||||
json={"model": model, "prompt": "", "keep_alive": 0},
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"[vram] flush {model} error: {e}", flush=True)
|
||||
|
||||
async def _poll_evicted(self, models: list[str], timeout: float) -> bool:
|
||||
"""Poll /api/ps until none of the given models appear (or timeout)."""
|
||||
deadline = asyncio.get_event_loop().time() + timeout
|
||||
while asyncio.get_event_loop().time() < deadline:
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=5.0) as client:
|
||||
resp = await client.get(f"{self.base_url}/api/ps")
|
||||
data = resp.json()
|
||||
loaded = {m.get("name", "") for m in data.get("models", [])}
|
||||
if not any(m in loaded for m in models):
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"[vram] poll_evicted error: {e}", flush=True)
|
||||
await asyncio.sleep(0.5)
|
||||
return False
|
||||
|
||||
async def _prewarm(self, model: str):
|
||||
"""Load model into VRAM with keep_alive=300 (5 min)."""
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=60.0) as client:
|
||||
await client.post(
|
||||
f"{self.base_url}/api/generate",
|
||||
json={"model": model, "prompt": "", "keep_alive": 300},
|
||||
)
|
||||
print(f"[vram] pre-warmed {model}", flush=True)
|
||||
except Exception as e:
|
||||
print(f"[vram] prewarm {model} error: {e}", flush=True)
|
||||
@@ -1,278 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Wiki Research Pipeline — searches the web for each person/place in the family wiki.
|
||||
|
||||
Uses Adolf's complex agent (/think prefix → qwen3:8b + web_search) to research
|
||||
each subject and aggregates findings into research.md.
|
||||
|
||||
Usage:
|
||||
python3 wiki_research.py [--subject "Name"] [--dry-run] [--timeout 300]
|
||||
[--output PATH]
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
# ── config ─────────────────────────────────────────────────────────────────────
|
||||
GATEWAY = "http://localhost:8000"
|
||||
WIKI_ROOT = Path("/mnt/ssd/dbs/otter/app-data/repository")
|
||||
DEFAULT_OUTPUT = WIKI_ROOT / "research.md"
|
||||
|
||||
PASS = "\033[32mPASS\033[0m"
|
||||
FAIL = "\033[31mFAIL\033[0m"
|
||||
INFO = "\033[36mINFO\033[0m"
|
||||
|
||||
|
||||
# ── helpers ────────────────────────────────────────────────────────────────────
|
||||
|
||||
def post_message(text: str, session_id: str, timeout: int = 10) -> int:
|
||||
payload = json.dumps({
|
||||
"text": text,
|
||||
"session_id": session_id,
|
||||
"channel": "cli",
|
||||
"user_id": "wiki-pipeline",
|
||||
}).encode()
|
||||
req = urllib.request.Request(
|
||||
f"{GATEWAY}/message",
|
||||
data=payload,
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=timeout) as r:
|
||||
return r.status
|
||||
|
||||
|
||||
def wait_for_reply(label: str, session_id: str, timeout_s: int = 300) -> str | None:
|
||||
"""Open SSE stream on /reply/{session_id} and return reply text, or None on timeout."""
|
||||
req = urllib.request.Request(
|
||||
f"{GATEWAY}/reply/{urllib.parse.quote(session_id, safe='')}",
|
||||
headers={"Accept": "text/event-stream"},
|
||||
)
|
||||
t0 = time.monotonic()
|
||||
tick = 0
|
||||
deadline = t0 + timeout_s
|
||||
|
||||
# Show progress while waiting (SSE blocks until reply is ready)
|
||||
print(f"\r [{label}] waiting... ", end="", flush=True)
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=timeout_s + 30) as r:
|
||||
for raw_line in r:
|
||||
elapsed = time.monotonic() - t0
|
||||
line = raw_line.decode("utf-8").rstrip("\n")
|
||||
if line.startswith("data:"):
|
||||
text = line[5:].strip().replace("\\n", "\n")
|
||||
print(f"\r [{label}] done after {elapsed:.0f}s{' ' * 30}")
|
||||
if text == "[timeout]":
|
||||
return None
|
||||
return text
|
||||
tick += 1
|
||||
rem = int(deadline - time.monotonic())
|
||||
print(f"\r [{label}] {elapsed:.0f}s elapsed, {rem}s left — waiting... ",
|
||||
end="", flush=True)
|
||||
except Exception as e:
|
||||
print(f"\r [{label}] SSE error: {e}{' ' * 30}")
|
||||
|
||||
print(f"\r [{label}] TIMEOUT after {timeout_s}s{' ' * 30}")
|
||||
return None
|
||||
|
||||
|
||||
# ── wiki parsing ───────────────────────────────────────────────────────────────
|
||||
|
||||
def slugify(name: str) -> str:
|
||||
s = name.lower()
|
||||
s = re.sub(r"[^\w\s-]", "", s)
|
||||
s = re.sub(r"\s+", "-", s.strip())
|
||||
return s[:60]
|
||||
|
||||
|
||||
def parse_wiki_file(path: Path):
|
||||
try:
|
||||
text = path.read_text(encoding="utf-8")
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
lines = text.splitlines()
|
||||
name = None
|
||||
context_parts = []
|
||||
|
||||
for line in lines[:50]:
|
||||
stripped = line.strip()
|
||||
if not name and stripped.startswith("# "):
|
||||
name = stripped[2:].strip()
|
||||
continue
|
||||
if name:
|
||||
if stripped.startswith("[![") or stripped.startswith("!["):
|
||||
continue
|
||||
if stripped:
|
||||
context_parts.append(stripped)
|
||||
if len(context_parts) >= 20:
|
||||
break
|
||||
|
||||
if not name:
|
||||
return None
|
||||
return name, "\n".join(context_parts)
|
||||
|
||||
|
||||
def discover_subjects(wiki_root: Path):
|
||||
subjects = []
|
||||
for subdir in ["люди", "места"]:
|
||||
folder = wiki_root / subdir
|
||||
if not folder.exists():
|
||||
continue
|
||||
for md_file in sorted(folder.glob("*.md")):
|
||||
result = parse_wiki_file(md_file)
|
||||
if result:
|
||||
name, context = result
|
||||
subjects.append((name, context, subdir))
|
||||
return subjects
|
||||
|
||||
|
||||
# ── output ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def load_existing_names(output_path: Path) -> set:
|
||||
if not output_path.exists():
|
||||
return set()
|
||||
return set(re.findall(r"^## (.+)$", output_path.read_text(encoding="utf-8"), re.MULTILINE))
|
||||
|
||||
|
||||
def init_output(output_path: Path, total: int):
|
||||
if not output_path.exists():
|
||||
output_path.write_text(
|
||||
f"# Wiki Research Results\n\n"
|
||||
f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
|
||||
f"Subjects: {total}\n\n---\n\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
|
||||
def append_result(output_path: Path, name: str, elapsed: float, reply_text: str):
|
||||
date_str = datetime.now().strftime("%Y-%m-%d")
|
||||
block = (
|
||||
f"## {name}\n\n"
|
||||
f"**Searched**: {date_str} **Elapsed**: {elapsed:.0f}s\n\n"
|
||||
f"{reply_text or '_No reply captured._'}\n\n---\n\n"
|
||||
)
|
||||
with open(output_path, "a", encoding="utf-8") as f:
|
||||
f.write(block)
|
||||
|
||||
|
||||
# ── research prompt ────────────────────────────────────────────────────────────
|
||||
|
||||
def build_prompt(name: str, context: str, subdir: str) -> str:
|
||||
kind = "person" if subdir == "люди" else "place"
|
||||
return (
|
||||
f"/think You are researching a {kind} for a private family wiki. "
|
||||
f"Find everything publicly available. Be thorough and specific.\n\n"
|
||||
f"**Subject**: {name}\n"
|
||||
f"**Known context** (from the family wiki — do NOT just repeat this):\n{context}\n\n"
|
||||
f"**Research instructions** (MUST follow exactly):\n"
|
||||
f"1. Call web_search, then IMMEDIATELY call fetch_url on every URL found in results.\n"
|
||||
f"2. You MUST call fetch_url at least 5 times — do not write the report until you have.\n"
|
||||
f"3. Priority URLs to fetch: Google Scholar profile, ResearchGate, IEEE Xplore, LinkedIn, employer page.\n"
|
||||
f"4. Run searches in English AND Russian/Latvian.\n"
|
||||
f"5. After fetching pages, derive follow-up searches from what you find.\n\n"
|
||||
f"**Output format** (required):\n"
|
||||
f"- Use markdown with sections: Overview, Education, Career, Publications, "
|
||||
f"Online Presence, Interesting Findings, Not Found\n"
|
||||
f"- Every fact must have a source link: [fact](url)\n"
|
||||
f"- Include actual URLs to profiles, papers, articles found\n"
|
||||
f"- 'Interesting Findings': non-trivial facts not in the wiki context above\n"
|
||||
f"- Last line must be: **Sources checked: N** (count of URLs you fetched with fetch_url)\n\n"
|
||||
f'If truly nothing is found publicly, say "No public information found." '
|
||||
f"but only after exhausting all search angles."
|
||||
)
|
||||
|
||||
|
||||
# ── main ───────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Wiki research pipeline")
|
||||
parser.add_argument("--subject", help="Single subject (substring match)")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Print prompts, don't send")
|
||||
parser.add_argument("--timeout", type=int, default=300, help="Per-subject timeout (s)")
|
||||
parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT, help="Output file")
|
||||
args = parser.parse_args()
|
||||
|
||||
subjects = discover_subjects(WIKI_ROOT)
|
||||
if not subjects:
|
||||
print(f"[{FAIL}] No subjects found in {WIKI_ROOT}")
|
||||
sys.exit(1)
|
||||
|
||||
print(f"[{INFO}] Discovered {len(subjects)} subjects")
|
||||
|
||||
if args.subject:
|
||||
needle = args.subject.lower()
|
||||
subjects = [(n, c, s) for n, c, s in subjects if needle in n.lower()]
|
||||
if not subjects:
|
||||
print(f"[{FAIL}] No subject matching '{args.subject}'")
|
||||
sys.exit(1)
|
||||
print(f"[{INFO}] Filtered to {len(subjects)} subject(s)")
|
||||
|
||||
if args.dry_run:
|
||||
for name, context, subdir in subjects:
|
||||
print(f"\n{'='*60}\nSUBJECT: {name} ({subdir})")
|
||||
print(f"PROMPT:\n{build_prompt(name, context, subdir)}")
|
||||
return
|
||||
|
||||
init_output(args.output, len(subjects))
|
||||
existing = load_existing_names(args.output)
|
||||
print(f"[{INFO}] Output: {args.output} ({len(existing)} already done)")
|
||||
|
||||
total = len(subjects)
|
||||
done = 0
|
||||
failed = []
|
||||
|
||||
for idx, (name, context, subdir) in enumerate(subjects, 1):
|
||||
if name in existing:
|
||||
print(f"[{idx}/{total}] SKIP {name} (already in output)")
|
||||
done += 1
|
||||
continue
|
||||
|
||||
prompt = build_prompt(name, context, subdir)
|
||||
session_id = f"wiki-{slugify(name)}"
|
||||
label = f"{idx}/{total}"
|
||||
|
||||
print(f"\n[{label}] {name}")
|
||||
|
||||
try:
|
||||
status = post_message(prompt, session_id, timeout=10)
|
||||
if status != 202:
|
||||
print(f" [{FAIL}] Unexpected status {status}")
|
||||
failed.append(name)
|
||||
continue
|
||||
except Exception as e:
|
||||
print(f" [{FAIL}] POST failed: {e}")
|
||||
failed.append(name)
|
||||
continue
|
||||
|
||||
t0 = time.monotonic()
|
||||
reply_text = wait_for_reply(label, session_id, timeout_s=args.timeout)
|
||||
elapsed = time.monotonic() - t0
|
||||
|
||||
if reply_text is None:
|
||||
print(f" [{FAIL}] Timeout")
|
||||
failed.append(name)
|
||||
append_result(args.output, name, elapsed, "_Research timed out._")
|
||||
continue
|
||||
|
||||
print(f" [{PASS}] {elapsed:.0f}s — {len(reply_text)} chars")
|
||||
append_result(args.output, name, elapsed, reply_text)
|
||||
done += 1
|
||||
|
||||
print(f"\n{'='*60}")
|
||||
print(f"Done: {done}/{total}")
|
||||
if failed:
|
||||
print(f"Failed ({len(failed)}): {', '.join(failed)}")
|
||||
print(f"Output: {args.output}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user