From b04e8a092567c53d855dbc2ae92b5d0d0fcb0190 Mon Sep 17 00:00:00 2001 From: Alvis Date: Thu, 12 Mar 2026 17:26:52 +0000 Subject: [PATCH] Add Rich token streaming: server SSE + CLI live display + CLI container MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Server (agent.py): - _stream_queues: per-session asyncio.Queue for token chunks - _push_stream_chunk() / _end_stream() helpers - Medium tier: astream() with <think> block filtering — real token streaming - Light tier: full reply pushed as single chunk then [DONE] - Complex tier: full reply pushed after agent completes then [DONE] - GET /stream/{session_id} SSE endpoint (data: <chunk>\n\n, data: [DONE]\n\n) - medium_model promoted to module-level global for astream() access CLI (cli.py): - stream_reply(): reads /stream/ SSE, renders tokens live with Rich Live (transient) - Final reply rendered as Markdown after stream completes - os.getlogin() replaced with os.getenv("USER") for container compatibility Dockerfile.cli + docker-compose cli service (profiles: tools): - Run: docker compose --profile tools run --rm -it cli Co-Authored-By: Claude Sonnet 4.6 --- Dockerfile.cli | 9 +++ agent.py | 91 +++++++++++++++++++++++---- cli.py | 61 ++++++++++++------ docker-compose.yml | 14 +++++ tests/use_cases/apple_pie_research.md | 6 +- tests/use_cases/cli_startup.md | 8 +-- 6 files changed, 151 insertions(+), 38 deletions(-) create mode 100644 Dockerfile.cli diff --git a/Dockerfile.cli b/Dockerfile.cli new file mode 100644 index 0000000..7c6e049 --- /dev/null +++ b/Dockerfile.cli @@ -0,0 +1,9 @@ +FROM python:3.12-slim + +WORKDIR /app + +RUN pip install --no-cache-dir rich + +COPY cli.py . 
+ +CMD ["python3", "cli.py"] diff --git a/agent.py b/agent.py index 1cb7ac0..d8e51fd 100644 --- a/agent.py +++ b/agent.py @@ -41,6 +41,19 @@ CRAWL4AI_URL = os.getenv("CRAWL4AI_URL", "http://crawl4ai:11235") MAX_HISTORY_TURNS = 5 _conversation_buffers: dict[str, list] = {} +# Per-session streaming queues — filled during inference, read by /stream/{session_id} +_stream_queues: dict[str, asyncio.Queue] = {} + + +async def _push_stream_chunk(session_id: str, chunk: str) -> None: + q = _stream_queues.setdefault(session_id, asyncio.Queue()) + await q.put(chunk) + + +async def _end_stream(session_id: str) -> None: + q = _stream_queues.setdefault(session_id, asyncio.Queue()) + await q.put("[DONE]") + async def _crawl4ai_fetch_async(url: str) -> str: """Async fetch via Crawl4AI — JS-rendered, bot-bypass, returns clean markdown.""" @@ -95,6 +108,7 @@ COMPLEX_SYSTEM_PROMPT = ( "NEVER invent URLs. End with: **Sources checked: N**" ) +medium_model = None medium_agent = None complex_agent = None router: Router = None @@ -109,7 +123,7 @@ _reply_semaphore = asyncio.Semaphore(1) @asynccontextmanager async def lifespan(app: FastAPI): - global medium_agent, complex_agent, router, vram_manager, mcp_client, \ + global medium_model, medium_agent, complex_agent, router, vram_manager, mcp_client, \ _memory_add_tool, _memory_search_tool # Register channel adapters @@ -263,6 +277,7 @@ async def lifespan(app: FastAPI): yield + medium_model = None medium_agent = None complex_agent = None router = None @@ -394,6 +409,8 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram final_text = light_reply llm_elapsed = time.monotonic() - t0 print(f"[agent] light path: answered by router", flush=True) + await _push_stream_chunk(session_id, final_text) + await _end_stream(session_id) elif tier == "medium": system_prompt = MEDIUM_SYSTEM_PROMPT @@ -401,16 +418,39 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram system_prompt = system_prompt + 
"\n\n" + memories if url_context: system_prompt = system_prompt + "\n\n" + url_context - result = await medium_agent.ainvoke({ - "messages": [ - {"role": "system", "content": system_prompt}, - *history, - {"role": "user", "content": clean_message}, - ] - }) + + # Stream tokens directly — filter out qwen3 blocks + in_think = False + response_parts = [] + async for chunk in medium_model.astream([ + {"role": "system", "content": system_prompt}, + *history, + {"role": "user", "content": clean_message}, + ]): + token = chunk.content or "" + if not token: + continue + if in_think: + if "" in token: + in_think = False + after = token.split("", 1)[1] + if after: + await _push_stream_chunk(session_id, after) + response_parts.append(after) + else: + if "" in token: + in_think = True + before = token.split("", 1)[0] + if before: + await _push_stream_chunk(session_id, before) + response_parts.append(before) + else: + await _push_stream_chunk(session_id, token) + response_parts.append(token) + + await _end_stream(session_id) llm_elapsed = time.monotonic() - t0 - _log_messages(result) - final_text = _extract_final_text(result) + final_text = "".join(response_parts).strip() or None else: # complex ok = await vram_manager.enter_complex_mode() @@ -432,7 +472,6 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram else: system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id) if url_context: - # Inject pre-fetched content — complex agent can still re-fetch or follow links system_prompt = system_prompt + "\n\n[Pre-fetched URL content from user's message:]\n" + url_context result = await complex_agent.ainvoke({ "messages": [ @@ -446,12 +485,16 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram llm_elapsed = time.monotonic() - t0 _log_messages(result) final_text = _extract_final_text(result) + if final_text: + await _push_stream_chunk(session_id, final_text) + await _end_stream(session_id) except Exception as e: 
import traceback llm_elapsed = time.monotonic() - t0 print(f"[agent] error after {llm_elapsed:.1f}s for chat {session_id}: {e}", flush=True) traceback.print_exc() + await _end_stream(session_id) # Deliver reply through the originating channel if final_text: @@ -521,6 +564,32 @@ async def reply_stream(session_id: str): return StreamingResponse(event_generator(), media_type="text/event-stream") +@app.get("/stream/{session_id}") +async def stream_reply(session_id: str): + """ + SSE endpoint — streams reply tokens as they are generated. + Each chunk: data: <chunk>\\n\\n + Signals completion: data: [DONE]\\n\\n + + Medium tier: real token-by-token streaming (think blocks filtered out). + Light and complex tiers: full reply delivered as one chunk then [DONE]. + """ + q = _stream_queues.setdefault(session_id, asyncio.Queue()) + + async def event_generator(): + try: + while True: + chunk = await asyncio.wait_for(q.get(), timeout=900) + escaped = chunk.replace("\n", "\\n").replace("\r", "") + yield f"data: {escaped}\n\n" + if chunk == "[DONE]": + break + except asyncio.TimeoutError: + yield "data: [DONE]\n\n" + + return StreamingResponse(event_generator(), media_type="text/event-stream") + + @app.get("/health") async def health(): return {"status": "ok", "agent_ready": medium_agent is not None} diff --git a/cli.py b/cli.py index 4cb909a..7d76f1c 100644 --- a/cli.py +++ b/cli.py @@ -1,9 +1,9 @@ #!/usr/bin/env python3 """ -Adolf CLI — interactive REPL for the multi-channel gateway. +Adolf CLI — interactive REPL with Rich streaming display. 
Usage: - python3 cli.py [--url http://localhost:8000] [--session cli-alvis] + python3 cli.py [--url http://deepagents:8000] [--session cli-alvis] """ import argparse @@ -12,7 +12,13 @@ import os import sys import urllib.request -GATEWAY = "http://localhost:8000" +from rich.console import Console +from rich.live import Live +from rich.markdown import Markdown +from rich.text import Text + +GATEWAY = "http://deepagents:8000" +console = Console() def post_message(gateway: str, text: str, session_id: str) -> None: @@ -20,7 +26,7 @@ def post_message(gateway: str, text: str, session_id: str) -> None: "text": text, "session_id": session_id, "channel": "cli", - "user_id": os.getlogin(), + "user_id": os.getenv("USER", "user"), }).encode() req = urllib.request.Request( f"{gateway}/message", @@ -30,33 +36,49 @@ def post_message(gateway: str, text: str, session_id: str) -> None: ) with urllib.request.urlopen(req, timeout=10) as r: if r.status != 202: - print(f"[error] gateway returned {r.status}", file=sys.stderr) + console.print(f"[red][error] gateway returned {r.status}[/red]") sys.exit(1) -def wait_for_reply(gateway: str, session_id: str, timeout: int = 400) -> str: - """Open SSE stream and return first data event.""" +def stream_reply(gateway: str, session_id: str, timeout: int = 400) -> str: + """ + Open the /stream/{session_id} SSE endpoint and display tokens live with + Rich. Returns the full assembled reply text. 
+ """ req = urllib.request.Request( - f"{gateway}/reply/{session_id}", + f"{gateway}/stream/{session_id}", headers={"Accept": "text/event-stream"}, ) + buffer = "" with urllib.request.urlopen(req, timeout=timeout + 5) as r: - for raw_line in r: - line = raw_line.decode("utf-8").rstrip("\n") - if line.startswith("data:"): - return line[5:].strip().replace("\\n", "\n") - return "" + with Live(Text(""), console=console, refresh_per_second=20, transient=True) as live: + for raw_line in r: + line = raw_line.decode("utf-8").rstrip("\n") + if not line.startswith("data:"): + continue + chunk = line[5:].strip() + if chunk == "[DONE]": + break + chunk = chunk.replace("\\n", "\n") + buffer += chunk + live.update(Text(buffer)) + + # Render the complete reply as Markdown once streaming is done + console.print(Markdown(buffer)) + return buffer def main(): parser = argparse.ArgumentParser(description="Adolf CLI") parser.add_argument("--url", default=GATEWAY, help="Gateway URL") - parser.add_argument("--session", default=f"cli-{os.getlogin()}", help="Session ID") + parser.add_argument("--session", default=f"cli-{os.getenv('USER', 'user')}", + help="Session ID") parser.add_argument("--timeout", type=int, default=400, help="Reply timeout (seconds)") args = parser.parse_args() - print(f"Adolf CLI (session={args.session}, gateway={args.url})") - print("Type your message and press Enter. Ctrl+C or Ctrl+D to exit.\n") + console.print(f"[bold]Adolf CLI[/bold] (session=[cyan]{args.session}[/cyan], " + f"gateway=[cyan]{args.url}[/cyan])") + console.print("Type your message and press Enter. 
Ctrl+C or Ctrl+D to exit.\n") try: while True: @@ -68,12 +90,11 @@ def main(): continue post_message(args.url, text, args.session) - print("...", end="", flush=True) - reply = wait_for_reply(args.url, args.session, timeout=args.timeout) - print(f"\r{reply}\n") + stream_reply(args.url, args.session, timeout=args.timeout) + console.print() except KeyboardInterrupt: - print("\nbye") + console.print("\n[dim]bye[/dim]") if __name__ == "__main__": diff --git a/docker-compose.yml b/docker-compose.yml index e9a6ecf..5f20dbe 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -65,6 +65,20 @@ services: - DEEPAGENTS_URL=http://deepagents:8000 restart: unless-stopped + cli: + build: + context: . + dockerfile: Dockerfile.cli + container_name: cli + environment: + - DEEPAGENTS_URL=http://deepagents:8000 + depends_on: + - deepagents + stdin_open: true + tty: true + profiles: + - tools + crawl4ai: image: unclecode/crawl4ai:latest container_name: crawl4ai diff --git a/tests/use_cases/apple_pie_research.md b/tests/use_cases/apple_pie_research.md index 5456f7c..b6818bb 100644 --- a/tests/use_cases/apple_pie_research.md +++ b/tests/use_cases/apple_pie_research.md @@ -13,17 +13,17 @@ curl -s -X POST http://localhost:8000/message \ -d '{"text": "/think what is the best recipe for an apple pie?", "session_id": "use-case-apple-pie", "channel": "cli", "user_id": "claude"}' ``` -**2. Wait for the reply** via SSE (complex tier can take up to 5 minutes): +**2. Wait for the streaming reply** (complex tier can take up to 5 minutes): ```bash -curl -s -N --max-time 300 "http://localhost:8000/reply/use-case-apple-pie" +curl -s -N --max-time 300 "http://localhost:8000/stream/use-case-apple-pie" ``` **3. 
Confirm tier and tool usage in agent logs:** ```bash docker compose -f /home/alvis/adolf/docker-compose.yml logs deepagents \ - --since=600s --no-log-prefix | grep -E "tier=complex|web_search|fetch_url|crawl4ai" + --since=600s | grep -E "tier=complex|web_search|fetch_url|crawl4ai" ``` ## Evaluate (use your judgment) diff --git a/tests/use_cases/cli_startup.md b/tests/use_cases/cli_startup.md index 3acac9d..8ecfe5e 100644 --- a/tests/use_cases/cli_startup.md +++ b/tests/use_cases/cli_startup.md @@ -1,13 +1,13 @@ # Use Case: CLI Startup -Verify the Adolf CLI starts cleanly and exits without error when the user closes input. +Verify the Adolf CLI container starts cleanly, shows the welcome banner, +and exits without error when the user closes input. ## Steps -Run the CLI with empty stdin (simulates user pressing Ctrl+D immediately): - ```bash -echo "" | python3 /home/alvis/adolf/cli.py --session use-case-cli-startup +echo "" | docker compose --profile tools run --rm -T cli \ + python3 cli.py --url http://deepagents:8000 --session use-case-cli-startup echo "exit code: $?" ```