"""Agent gateway service: routes inbound messages to light/medium/complex LLM
tiers, enriches them with memories / fast tools / fetched URLs, and exposes
channel endpoints plus an OpenAI-compatible API (for OpenWebUI)."""

import asyncio
import json as _json_module
import os
import time
from contextlib import asynccontextmanager, nullcontext
from pathlib import Path
from typing import AsyncGenerator

from fastapi import FastAPI, BackgroundTasks, Request
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel

import re as _re
import httpx as _httpx

# Matches http(s) URLs up to the first whitespace/quote/angle-bracket.
_URL_RE = _re.compile(r'https?://[^\s<>"\']+')


def _extract_urls(text: str) -> list[str]:
    """Return every http(s) URL found in *text* (possibly empty list)."""
    return _URL_RE.findall(text)


from openai import AsyncOpenAI
from langchain_openai import ChatOpenAI
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_community.utilities import SearxSearchWrapper
from langchain_core.tools import Tool

from vram_manager import VRAMManager
from router import Router
from agent_factory import build_medium_agent, build_complex_agent
from fast_tools import FastToolRunner, WeatherTool, CommuteTool
import channels

# LiteLLM proxy — all LLM inference goes through here
LITELLM_URL = os.getenv("LITELLM_URL", "http://host.docker.internal:4000/v1")
LITELLM_API_KEY = os.getenv("LITELLM_API_KEY", "dummy")
# Direct Ollama URL — used only by VRAMManager for flush/prewarm/poll
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
ROUTER_MODEL = os.getenv("DEEPAGENTS_ROUTER_MODEL", "qwen2.5:1.5b")
MEDIUM_MODEL = os.getenv("DEEPAGENTS_MODEL", "qwen3:4b")
COMPLEX_MODEL = os.getenv("DEEPAGENTS_COMPLEX_MODEL", "qwen3:8b")
SEARXNG_URL = os.getenv("SEARXNG_URL", "http://host.docker.internal:11437")
OPENMEMORY_URL = os.getenv("OPENMEMORY_URL", "http://openmemory:8765")
CRAWL4AI_URL = os.getenv("CRAWL4AI_URL", "http://crawl4ai:11235")
ROUTECHECK_URL = os.getenv("ROUTECHECK_URL", "http://routecheck:8090")
ROUTECHECK_TOKEN = os.getenv("ROUTECHECK_TOKEN", "")

MAX_HISTORY_TURNS = 5
# session_id -> alternating user/assistant message dicts (last N turns only)
_conversation_buffers: dict[str, list] = {}

# ── Interaction logging (RLHF data collection) ─────────────────────────────────
_LOG_DIR = Path(os.getenv("ADOLF_LOG_DIR", "/app/logs"))
_INTERACTIONS_LOG = _LOG_DIR / "interactions.jsonl"


def _ensure_log_dir() -> None:
    """Create the log directory if missing; best-effort, never raises."""
    try:
        _LOG_DIR.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        print(f"[log] cannot create log dir {_LOG_DIR}: {e}", flush=True)


async def _log_interaction(
    session_id: str,
    channel: str,
    tier: str,
    input_text: str,
    response_text: str | None,
    latency_ms: int,
    metadata: dict | None = None,
) -> None:
    """Append one interaction record to the JSONL log for future RLHF/finetuning."""
    record = {
        "ts": time.time(),
        "session_id": session_id,
        "channel": channel,
        "tier": tier,
        "input": input_text,
        "output": response_text or "",
        "latency_ms": latency_ms,
    }
    if metadata:
        record["metadata"] = metadata
    try:
        _ensure_log_dir()
        with open(_INTERACTIONS_LOG, "a", encoding="utf-8") as f:
            f.write(_json_module.dumps(record, ensure_ascii=False) + "\n")
    except Exception as e:
        # Logging is best-effort — never let it break the reply path.
        print(f"[log] write error: {e}", flush=True)


# Per-session streaming queues — filled during inference, read by /stream/{session_id}
_stream_queues: dict[str, asyncio.Queue] = {}


async def _push_stream_chunk(session_id: str, chunk: str) -> None:
    """Enqueue one reply chunk for the session's SSE stream."""
    q = _stream_queues.setdefault(session_id, asyncio.Queue())
    await q.put(chunk)


async def _end_stream(session_id: str) -> None:
    """Signal end-of-reply to the session's SSE stream."""
    q = _stream_queues.setdefault(session_id, asyncio.Queue())
    await q.put("[DONE]")


async def _crawl4ai_fetch_async(url: str) -> str:
    """Async fetch via Crawl4AI — JS-rendered, bot-bypass, returns clean markdown.

    Returns up to 5000 chars of markdown, '' on a failed crawl, or an
    '[fetch error: ...]' marker string on transport errors.
    """
    try:
        async with _httpx.AsyncClient(timeout=60) as client:
            r = await client.post(f"{CRAWL4AI_URL}/crawl", json={"urls": [url]})
            r.raise_for_status()
            results = r.json().get("results", [])
            if not results or not results[0].get("success"):
                return ""
            md_obj = results[0].get("markdown") or {}
            md = md_obj.get("raw_markdown") if isinstance(md_obj, dict) else str(md_obj)
            return (md or "")[:5000]
    except Exception as e:
        return f"[fetch error: {e}]"


async def _fetch_urls_from_message(message: str) -> str:
    """If message contains URLs, fetch their content concurrently via Crawl4AI.

    Returns a formatted context block, or '' if no URLs or all fetches fail."""
    urls = _extract_urls(message)
    if not urls:
        return ""
    # Fetch up to 3 URLs concurrently
    results = await asyncio.gather(*[_crawl4ai_fetch_async(u) for u in urls[:3]])
    parts = []
    for url, content in zip(urls[:3], results):
        if content and not content.startswith("[fetch error"):
            parts.append(f"### {url}\n{content[:3000]}")
    if not parts:
        return ""
    return "User's message contains URLs. Fetched content:\n\n" + "\n\n".join(parts)


# /no_think at the start of the system prompt disables qwen3 chain-of-thought.
# create_deep_agent prepends our system_prompt before BASE_AGENT_PROMPT, so
# /no_think lands at position 0 and is respected by qwen3 models via Ollama.
MEDIUM_SYSTEM_PROMPT = (
    "You are a helpful AI assistant. Reply concisely. "
    "If asked to remember a fact or name, simply confirm: 'Got it, I'll remember that.'"
)

COMPLEX_SYSTEM_PROMPT = (
    "You are a deep research assistant. "
    "web_search automatically fetches full page content from top results — use it 6+ times with different queries. "
    "Also call fetch_url on any specific URL you want to read in full.\n\n"
    "Run searches in English AND Russian/Latvian. "
    "After getting results, run follow-up searches based on new facts found.\n\n"
    "Write a structured markdown report with sections: "
    "Overview, Education, Career, Publications, Online Presence, Interesting Findings.\n"
    "Every fact must link to the real URL it came from: [fact](url). "
    "NEVER invent URLs. End with: **Sources checked: N**"
)

# Populated in lifespan(); None means "not ready yet".
medium_model = None
medium_agent = None
complex_agent = None
router: Router = None
vram_manager: VRAMManager = None
mcp_client = None
_memory_add_tool = None
_memory_search_tool = None

# Fast tools run before the LLM — classifier + context enricher
_fast_tool_runner = FastToolRunner([
    WeatherTool(),
    CommuteTool(routecheck_url=ROUTECHECK_URL, internal_token=ROUTECHECK_TOKEN),
])

# GPU mutex: one LLM inference at a time
_reply_semaphore = asyncio.Semaphore(1)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start-up: build models, router, MCP tools, and agents; tear down on exit."""
    global medium_model, medium_agent, complex_agent, router, vram_manager, mcp_client, \
        _memory_add_tool, _memory_search_tool

    # Register channel adapters
    channels.register_defaults()

    # All three models route through Bifrost → Ollama GPU.
    router_model = ChatOpenAI(
        model=f"ollama/{ROUTER_MODEL}",
        base_url=LITELLM_URL,
        api_key=LITELLM_API_KEY,
        temperature=0,
        timeout=30,
    )
    embedder = AsyncOpenAI(base_url=LITELLM_URL, api_key=LITELLM_API_KEY)
    medium_model = ChatOpenAI(
        model=f"ollama/{MEDIUM_MODEL}",
        base_url=LITELLM_URL,
        api_key=LITELLM_API_KEY,
        timeout=180,
    )
    complex_model = ChatOpenAI(
        model=COMPLEX_MODEL,  # full model name — may be remote (OpenRouter) or local ollama/*
        base_url=LITELLM_URL,
        api_key=LITELLM_API_KEY,
        timeout=600,
    )
    vram_manager = VRAMManager(base_url=OLLAMA_BASE_URL)
    router = Router(model=router_model, embedder=embedder, fast_tool_runner=_fast_tool_runner)
    await router.initialize()

    mcp_connections = {
        "openmemory": {"transport": "sse", "url": f"{OPENMEMORY_URL}/sse"},
    }
    mcp_client = MultiServerMCPClient(mcp_connections)
    # openmemory may still be booting — retry up to 12 times, 5s apart.
    for attempt in range(12):
        try:
            mcp_tools = await mcp_client.get_tools()
            break
        except Exception as e:
            if attempt == 11:
                raise
            print(f"[agent] MCP not ready (attempt {attempt + 1}/12): {e}. Retrying in 5s...")
            await asyncio.sleep(5)

    # Memory tools are called directly by the pipeline, not by the agent loop.
    agent_tools = [
        t for t in mcp_tools
        if t.name not in ("add_memory", "search_memory", "get_all_memories")
    ]
    # Expose memory tools directly so run_agent_task can call them outside the agent loop
    for t in mcp_tools:
        if t.name == "add_memory":
            _memory_add_tool = t
        elif t.name == "search_memory":
            _memory_search_tool = t

    # NOTE(review): constructed but not referenced below — _search_and_read
    # talks to SearXNG via httpx directly. Kept to preserve behavior.
    searx = SearxSearchWrapper(searx_host=SEARXNG_URL)

    def _crawl4ai_fetch(url: str) -> str:
        """Fetch a URL via Crawl4AI (JS-rendered, bot-bypass) and return clean markdown."""
        try:
            r = _httpx.post(f"{CRAWL4AI_URL}/crawl", json={"urls": [url]}, timeout=60)
            r.raise_for_status()
            results = r.json().get("results", [])
            if not results or not results[0].get("success"):
                return ""
            md_obj = results[0].get("markdown") or {}
            md = md_obj.get("raw_markdown") if isinstance(md_obj, dict) else str(md_obj)
            return (md or "")[:5000]
        except Exception as e:
            return f"[fetch error: {e}]"

    def _search_and_read(query: str) -> str:
        """Search the web and automatically fetch full content of top results.

        Returns snippets + full page content from the top URLs."""
        # Get structured results from SearXNG
        try:
            r = _httpx.get(
                f"{SEARXNG_URL}/search",
                params={"q": query, "format": "json"},
                timeout=15,
            )
            data = r.json()
            items = data.get("results", [])[:5]
        except Exception as e:
            return f"[search error: {e}]"
        if not items:
            return "No results found."
        out = [f"Search: {query}\n"]
        for i, item in enumerate(items, 1):
            url = item.get("url", "")
            title = item.get("title", "")
            snippet = item.get("content", "")[:300]
            out.append(f"\n[{i}] {title}\nURL: {url}\nSnippet: {snippet}")
        # Auto-fetch top 2 URLs for full content
        out.append("\n\n--- Full page content ---")
        for item in items[:2]:
            url = item.get("url", "")
            if not url:
                continue
            content = _crawl4ai_fetch(url)
            if content and not content.startswith("[fetch error"):
                out.append(f"\n### {url}\n{content[:3000]}")
        return "\n".join(out)

    agent_tools.append(Tool(
        name="web_search",
        func=_search_and_read,
        description=(
            "Search the web and read full content of top results. "
            "Returns search snippets AND full page text from the top URLs. "
            "Use multiple different queries to research a topic thoroughly."
        ),
    ))

    def _fetch_url(url: str) -> str:
        """Fetch and read the full text content of a URL."""
        content = _crawl4ai_fetch(url)
        return content if content else "[fetch_url: empty or blocked]"

    agent_tools.append(Tool(
        name="fetch_url",
        func=_fetch_url,
        description=(
            "Fetch and read the full text content of a specific URL. "
            "Use for URLs not covered by web_search. Input: a single URL string."
        ),
    ))

    medium_agent = build_medium_agent(
        model=medium_model,
        agent_tools=agent_tools,
        system_prompt=MEDIUM_SYSTEM_PROMPT,
    )
    complex_agent = build_complex_agent(
        model=complex_model,
        agent_tools=agent_tools,
        system_prompt=COMPLEX_SYSTEM_PROMPT.format(user_id="{user_id}"),
    )
    print(
        f"[agent] litellm={LITELLM_URL} | router=semantic(ollama/{ROUTER_MODEL}+nomic-embed-text) | "
        f"medium=ollama/{MEDIUM_MODEL} | complex={COMPLEX_MODEL}",
        flush=True,
    )
    print(f"[agent] agent tools: {[t.name for t in agent_tools]}", flush=True)

    yield

    # Shutdown: release globals so /health reports not-ready.
    medium_model = None
    medium_agent = None
    complex_agent = None
    router = None
    vram_manager = None
    mcp_client = None


app = FastAPI(lifespan=lifespan)


# ── request models ─────────────────────────────────────────────────────────────
class InboundMessage(BaseModel):
    """Unified inbound message from any channel adapter."""
    text: str
    session_id: str  # e.g. "tg-346967270", "cli-alvis"
    channel: str  # "telegram" | "cli"
    user_id: str = ""  # human identity; defaults to session_id if empty
    metadata: dict = {}


class ChatRequest(BaseModel):
    """Legacy model — kept for test_pipeline.py compatibility."""
    message: str
    chat_id: str


# ── helpers ────────────────────────────────────────────────────────────────────
def _strip_think(text: str) -> str:
    """Strip qwen3 chain-of-thought <think>...</think> blocks that appear inline
    in content when using Ollama's OpenAI-compatible endpoint (/v1/chat/completions)."""
    return _re.sub(r"<think>.*?</think>", "", text, flags=_re.DOTALL).strip()


def _extract_final_text(result) -> str | None:
    """Pull the final assistant text out of an agent invocation result."""
    msgs = result.get("messages", [])
    for m in reversed(msgs):
        if type(m).__name__ == "AIMessage" and getattr(m, "content", ""):
            return _strip_think(m.content)
    if isinstance(result, dict) and result.get("output"):
        return _strip_think(result["output"])
    return None


def _log_messages(result):
    """Debug-print each agent message and tool call (truncated)."""
    msgs = result.get("messages", [])
    for m in msgs:
        role = type(m).__name__
        content = getattr(m, "content", "")
        tool_calls = getattr(m, "tool_calls", [])
        if content:
            print(f"[agent] {role}: {str(content)[:150]}", flush=True)
        for tc in tool_calls:
            print(f"[agent] {role} → {tc['name']}({tc['args']})", flush=True)


# ── memory helpers ─────────────────────────────────────────────────────────────
def _resolve_user_id(session_id: str) -> str:
    """Map any session_id to a canonical user identity for openmemory.

    All channels share the same memory pool for the single user."""
    return "alvis"


async def _store_memory(session_id: str, user_msg: str, assistant_reply: str) -> None:
    """Store a conversation turn in openmemory (runs as a background task)."""
    if _memory_add_tool is None:
        return
    t0 = time.monotonic()
    try:
        text = f"User: {user_msg}\nAssistant: {assistant_reply}"
        user_id = _resolve_user_id(session_id)
        await _memory_add_tool.ainvoke({"text": text, "user_id": user_id})
        print(f"[memory] stored in {time.monotonic() - t0:.1f}s", flush=True)
    except Exception as e:
        print(f"[memory] error: {e}", flush=True)


async def _retrieve_memories(message: str, session_id: str) -> str:
    """Search openmemory for relevant context. Returns formatted string or ''."""
    if _memory_search_tool is None:
        return ""
    try:
        user_id = _resolve_user_id(session_id)
        result = await _memory_search_tool.ainvoke({"query": message, "user_id": user_id})
        if result and result.strip() and result.strip() != "[]":
            return f"Relevant memories:\n{result}"
    except Exception:
        # Best-effort enrichment: a memory outage must not block the reply.
        pass
    return ""


# ── core pipeline ──────────────────────────────────────────────────────────────
async def _run_agent_pipeline(
    message: str,
    history: list[dict],
    session_id: str,
    tier_override: str | None = None,
    no_inference: bool = False,
    tier_capture: list | None = None,
) -> AsyncGenerator[str, None]:
    """Core pipeline: pre-flight → routing → inference. Yields text chunks.

    tier_override: "light" | "medium" | "complex" | None (auto-route)
    no_inference: if True, routing decision is still made but inference
        is skipped — yields "I don't know" immediately
    tier_capture: optional list the chosen tier name is appended to.

    Caller is responsible for scheduling _store_memory after consuming all chunks.
    """
    # no_inference bypasses the GPU mutex — no LLM call will be made.
    async with (nullcontext() if no_inference else _reply_semaphore):
        t0 = time.monotonic()
        clean_message = message
        print(f"[agent] running: {clean_message[:80]!r}", flush=True)

        # Fetch URL content, memories, and fast-tool context concurrently.
        # Skip preflight IO in no_inference mode — only routing decision needed
        if no_inference:
            url_context = memories = fast_context = None
        else:
            url_context, memories, fast_context = await asyncio.gather(
                _fetch_urls_from_message(clean_message),
                _retrieve_memories(clean_message, session_id),
                _fast_tool_runner.run_matching(clean_message),
            )
        if url_context:
            print(f"[agent] crawl4ai: {len(url_context)} chars fetched", flush=True)
        if fast_context:
            names = _fast_tool_runner.matching_names(clean_message)
            print(f"[agent] fast_tools={names}: {len(fast_context)} chars injected", flush=True)

        # Build enriched history (system context blocks prepended, newest last)
        enriched_history = list(history)
        if url_context:
            enriched_history = [{"role": "system", "content": url_context}] + enriched_history
        if fast_context:
            enriched_history = [{"role": "system", "content": fast_context}] + enriched_history
        if memories:
            enriched_history = [{"role": "system", "content": memories}] + enriched_history

        final_text = None
        llm_elapsed = 0.0
        try:
            # Short-circuit: fast tool already has the answer
            if fast_context and tier_override is None and not url_context and not no_inference:
                tier = "fast"
                final_text = fast_context
                llm_elapsed = time.monotonic() - t0
                names = _fast_tool_runner.matching_names(clean_message)
                print(f"[agent] tier=fast tools={names} — delivering directly", flush=True)
                yield final_text
            else:
                # Determine tier
                if tier_override in ("light", "medium", "complex"):
                    tier = tier_override
                    light_reply = None
                    if tier_override == "light":
                        # Forced light still needs the router to produce the reply text.
                        tier, light_reply = await router.route(
                            clean_message, enriched_history, no_inference=no_inference)
                        tier = "light"
                else:
                    tier, light_reply = await router.route(
                        clean_message, enriched_history, no_inference=no_inference)
                    if url_context and tier == "light":
                        tier = "medium"
                        light_reply = None
                        print("[agent] URL in message → upgraded light→medium", flush=True)

                print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True)
                if tier_capture is not None:
                    tier_capture.append(tier)

                if no_inference:
                    yield "I don't know"
                    return

                if tier == "light":
                    final_text = light_reply
                    llm_elapsed = time.monotonic() - t0
                    print("[agent] light path: answered by router", flush=True)
                    yield final_text
                elif tier == "medium":
                    system_prompt = MEDIUM_SYSTEM_PROMPT
                    if memories:
                        system_prompt += "\n\n" + memories
                    if url_context:
                        system_prompt += "\n\n" + url_context
                    if fast_context:
                        system_prompt += "\n\nLive web search results (use these to answer):\n\n" + fast_context

                    # Stream tokens, filtering out qwen3 <think>...</think> blocks
                    # (assumes the open/close tags arrive whole within one token).
                    in_think = False
                    response_parts = []
                    async for chunk in medium_model.astream([
                        {"role": "system", "content": system_prompt},
                        *history,
                        {"role": "user", "content": clean_message},
                    ]):
                        token = chunk.content or ""
                        if not token:
                            continue
                        if in_think:
                            if "</think>" in token:
                                in_think = False
                                after = token.split("</think>", 1)[1]
                                if after:
                                    yield after
                                    response_parts.append(after)
                        else:
                            if "<think>" in token:
                                in_think = True
                                before = token.split("<think>", 1)[0]
                                if before:
                                    yield before
                                    response_parts.append(before)
                            else:
                                yield token
                                response_parts.append(token)
                    llm_elapsed = time.monotonic() - t0
                    final_text = "".join(response_parts).strip() or None
                else:
                    # complex — remote model, no VRAM management needed
                    system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id)
                    if url_context:
                        system_prompt += "\n\n[Pre-fetched URL content from user's message:]\n" + url_context
                    result = await complex_agent.ainvoke({
                        "messages": [
                            {"role": "system", "content": system_prompt},
                            *history,
                            {"role": "user", "content": clean_message},
                        ]
                    })
                    llm_elapsed = time.monotonic() - t0
                    _log_messages(result)
                    final_text = _extract_final_text(result)
                    if final_text:
                        yield final_text
        except Exception as e:
            import traceback
            llm_elapsed = time.monotonic() - t0
            print(f"[agent] error after {llm_elapsed:.1f}s for {session_id}: {e}", flush=True)
            traceback.print_exc()

        # tier is unbound if routing itself raised — report '?' in that case.
        print(f"[agent] pipeline done in {time.monotonic() - t0:.1f}s "
              f"tier={tier if 'tier' in locals() else '?'}", flush=True)

        # Store memory as side-effect (non-blocking, best-effort)
        if final_text:
            asyncio.create_task(_store_memory(session_id, clean_message, final_text))


# ── core task (Telegram / Matrix / CLI wrapper) ─────────────────────────────────
async def run_agent_task(
    message: str,
    session_id: str,
    channel: str = "telegram",
    metadata: dict | None = None,
):
    """Run the pipeline for one inbound message: stream chunks, deliver the
    final reply to the channel, update history, and log the interaction."""
    print(f"[agent] queued: {message[:80]!r} chat={session_id}", flush=True)
    t0 = time.monotonic()
    meta = metadata or {}
    no_inference = bool(meta.get("no_inference", False))
    is_benchmark = bool(meta.get("benchmark", False))
    history = _conversation_buffers.get(session_id, [])
    final_text = None
    actual_tier = "unknown"
    tier_capture: list = []

    async for chunk in _run_agent_pipeline(
        message, history, session_id,
        no_inference=no_inference, tier_capture=tier_capture,
    ):
        await _push_stream_chunk(session_id, chunk)
        if final_text is None:
            final_text = chunk
        else:
            final_text += chunk
    await _end_stream(session_id)

    actual_tier = tier_capture[0] if tier_capture else "unknown"
    elapsed_ms = int((time.monotonic() - t0) * 1000)

    if final_text:
        final_text = final_text.strip()
        # Skip channel delivery for benchmark sessions (no Telegram spam)
        if not is_benchmark:
            try:
                await channels.deliver(session_id, channel, final_text)
            except Exception as e:
                print(f"[agent] delivery error (non-fatal): {e}", flush=True)
        print(f"[agent] replied in {elapsed_ms / 1000:.1f}s tier={actual_tier}", flush=True)
        print(f"[agent] reply_text: {final_text}", flush=True)

        # Update conversation buffer
        buf = _conversation_buffers.get(session_id, [])
        buf.append({"role": "user", "content": message})
        buf.append({"role": "assistant", "content": final_text})
        _conversation_buffers[session_id] = buf[-(MAX_HISTORY_TURNS * 2):]

        # Log interaction for RLHF data collection (skip benchmark sessions to avoid noise)
        if not is_benchmark:
            asyncio.create_task(_log_interaction(
                session_id=session_id,
                channel=channel,
                tier=actual_tier,
                input_text=message,
                response_text=final_text,
                latency_ms=elapsed_ms,
                metadata=meta if meta else None,
            ))
    else:
        print("[agent] warning: no text reply from agent", flush=True)


# ── endpoints ──────────────────────────────────────────────────────────────────
@app.post("/message")
async def message(request: InboundMessage, background_tasks: BackgroundTasks):
    """Unified inbound endpoint for all channels."""
    if medium_agent is None:
        return JSONResponse(status_code=503, content={"error": "Agent not ready"})
    session_id = request.session_id
    channel = request.channel
    background_tasks.add_task(run_agent_task, request.text, session_id, channel, request.metadata)
    return JSONResponse(status_code=202, content={"status": "accepted"})


@app.post("/chat")
async def chat(request: ChatRequest, background_tasks: BackgroundTasks):
    """Legacy endpoint — maps chat_id to tg- session for backward compatibility."""
    if medium_agent is None:
        return JSONResponse(status_code=503, content={"error": "Agent not ready"})
    session_id = f"tg-{request.chat_id}"
    background_tasks.add_task(run_agent_task, request.message, session_id, "telegram")
    return JSONResponse(status_code=202, content={"status": "accepted"})


@app.get("/reply/{session_id}")
async def reply_stream(session_id: str):
    """
    SSE endpoint — streams the reply for a session once available, then closes.
    Used by CLI client and wiki_research.py instead of log polling.
    """
    q = channels.pending_replies.setdefault(session_id, asyncio.Queue())

    async def event_generator():
        try:
            text = await asyncio.wait_for(q.get(), timeout=900)
            # Escape newlines so entire reply fits in one SSE data line
            escaped = text.replace("\n", "\\n").replace("\r", "")
            yield f"data: {escaped}\n\n"
        except asyncio.TimeoutError:
            yield "data: [timeout]\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")


@app.get("/stream/{session_id}")
async def stream_reply(session_id: str):
    """
    SSE endpoint — streams reply tokens as they are generated.
    Each chunk: data: <token>\\n\\n
    Signals completion: data: [DONE]\\n\\n

    Medium tier: real token-by-token streaming (think blocks filtered out).
    Light and complex tiers: full reply delivered as one chunk then [DONE].
    """
    q = _stream_queues.setdefault(session_id, asyncio.Queue())

    async def event_generator():
        try:
            while True:
                chunk = await asyncio.wait_for(q.get(), timeout=900)
                escaped = chunk.replace("\n", "\\n").replace("\r", "")
                yield f"data: {escaped}\n\n"
                if chunk == "[DONE]":
                    break
        except asyncio.TimeoutError:
            yield "data: [DONE]\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")


@app.get("/health")
async def health():
    """Liveness/readiness probe."""
    return {"status": "ok", "agent_ready": medium_agent is not None}


# ── OpenAI-compatible API (for OpenWebUI and other clients) ────────────────────
# Pseudo-model name → tier override (None = auto-route).
_TIER_MAP = {
    "adolf": None,
    "adolf-light": "light",
    "adolf-medium": "medium",
    "adolf-deep": "complex",
}


@app.get("/v1/models")
async def list_models():
    """OpenAI-compatible model listing for the pseudo-models above."""
    return {
        "object": "list",
        "data": [
            {"id": "adolf", "object": "model", "owned_by": "adolf"},
            {"id": "adolf-light", "object": "model", "owned_by": "adolf"},
            {"id": "adolf-medium", "object": "model", "owned_by": "adolf"},
            {"id": "adolf-deep", "object": "model", "owned_by": "adolf"},
        ],
    }


@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    """OpenAI-compatible chat completions (streaming and non-streaming)."""
    if medium_agent is None:
        return JSONResponse(
            status_code=503,
            content={"error": {"message": "Agent not ready", "type": "server_error"}})
    body = await request.json()
    model = body.get("model", "adolf")
    messages = body.get("messages", [])
    stream = body.get("stream", True)

    # Extract current user message and history
    user_messages = [m for m in messages if m.get("role") == "user"]
    if not user_messages:
        return JSONResponse(
            status_code=400,
            content={"error": {"message": "No user message", "type": "invalid_request_error"}})
    current_message = user_messages[-1]["content"]

    # History = everything before the last user message (excluding system messages from OpenWebUI)
    last_user_idx = len(messages) - 1 - next(
        i for i, m in enumerate(reversed(messages)) if m.get("role") == "user"
    )
    history = [m for m in messages[:last_user_idx] if m.get("role") in ("user", "assistant")]

    session_id = request.headers.get("X-Session-Id", "owui-default")
    tier_override = _TIER_MAP.get(model)

    import json as _json
    import uuid as _uuid
    response_id = f"chatcmpl-{_uuid.uuid4().hex[:12]}"

    if stream:
        async def event_stream():
            # Opening chunk with role
            opening = {
                "id": response_id,
                "object": "chat.completion.chunk",
                "choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}],
            }
            yield f"data: {_json.dumps(opening)}\n\n"
            async for chunk in _run_agent_pipeline(current_message, history, session_id, tier_override):
                data = {
                    "id": response_id,
                    "object": "chat.completion.chunk",
                    "choices": [{"index": 0, "delta": {"content": chunk}, "finish_reason": None}],
                }
                yield f"data: {_json.dumps(data)}\n\n"
            # Final chunk
            final = {
                "id": response_id,
                "object": "chat.completion.chunk",
                "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
            }
            yield f"data: {_json.dumps(final)}\n\n"
            yield "data: [DONE]\n\n"

        return StreamingResponse(event_stream(), media_type="text/event-stream")
    else:
        # Non-streaming: collect all chunks
        parts = []
        async for chunk in _run_agent_pipeline(current_message, history, session_id, tier_override):
            if chunk:
                parts.append(chunk)
        full_text = "".join(parts).strip()
        return {
            "id": response_id,
            "object": "chat.completion",
            "choices": [{"index": 0,
                         "message": {"role": "assistant", "content": full_text},
                         "finish_reason": "stop"}],
            "model": model,
        }