diff --git a/Dockerfile.cli b/Dockerfile.cli
new file mode 100644
index 0000000..7c6e049
--- /dev/null
+++ b/Dockerfile.cli
@@ -0,0 +1,9 @@
+FROM python:3.12-slim
+
+WORKDIR /app
+
+RUN pip install --no-cache-dir rich
+
+COPY cli.py .
+
+CMD ["python3", "cli.py"]
diff --git a/agent.py b/agent.py
index 1cb7ac0..d8e51fd 100644
--- a/agent.py
+++ b/agent.py
@@ -41,6 +41,19 @@ CRAWL4AI_URL = os.getenv("CRAWL4AI_URL", "http://crawl4ai:11235")
MAX_HISTORY_TURNS = 5
_conversation_buffers: dict[str, list] = {}
+# Per-session streaming queues — filled during inference, read by /stream/{session_id}
+_stream_queues: dict[str, asyncio.Queue] = {}
+
+
+async def _push_stream_chunk(session_id: str, chunk: str) -> None:
+ q = _stream_queues.setdefault(session_id, asyncio.Queue())
+ await q.put(chunk)
+
+
+async def _end_stream(session_id: str) -> None:
+ q = _stream_queues.setdefault(session_id, asyncio.Queue())
+ await q.put("[DONE]")
+
async def _crawl4ai_fetch_async(url: str) -> str:
"""Async fetch via Crawl4AI — JS-rendered, bot-bypass, returns clean markdown."""
@@ -95,6 +108,7 @@ COMPLEX_SYSTEM_PROMPT = (
"NEVER invent URLs. End with: **Sources checked: N**"
)
+medium_model = None
medium_agent = None
complex_agent = None
router: Router = None
@@ -109,7 +123,7 @@ _reply_semaphore = asyncio.Semaphore(1)
@asynccontextmanager
async def lifespan(app: FastAPI):
- global medium_agent, complex_agent, router, vram_manager, mcp_client, \
+ global medium_model, medium_agent, complex_agent, router, vram_manager, mcp_client, \
_memory_add_tool, _memory_search_tool
# Register channel adapters
@@ -263,6 +277,7 @@ async def lifespan(app: FastAPI):
yield
+ medium_model = None
medium_agent = None
complex_agent = None
router = None
@@ -394,6 +409,8 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
final_text = light_reply
llm_elapsed = time.monotonic() - t0
print(f"[agent] light path: answered by router", flush=True)
+ await _push_stream_chunk(session_id, final_text)
+ await _end_stream(session_id)
elif tier == "medium":
system_prompt = MEDIUM_SYSTEM_PROMPT
@@ -401,16 +418,39 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
system_prompt = system_prompt + "\n\n" + memories
if url_context:
system_prompt = system_prompt + "\n\n" + url_context
- result = await medium_agent.ainvoke({
- "messages": [
- {"role": "system", "content": system_prompt},
- *history,
- {"role": "user", "content": clean_message},
- ]
- })
+
+ # Stream tokens directly — filter out qwen3 <think> blocks
+ in_think = False
+ response_parts = []
+ async for chunk in medium_model.astream([
+ {"role": "system", "content": system_prompt},
+ *history,
+ {"role": "user", "content": clean_message},
+ ]):
+ token = chunk.content or ""
+ if not token:
+ continue
+ if in_think:
+ if "</think>" in token:
+ in_think = False
+ after = token.split("</think>", 1)[1]
+ if after:
+ await _push_stream_chunk(session_id, after)
+ response_parts.append(after)
+ else:
+ if "<think>" in token:
+ in_think = True
+ before = token.split("<think>", 1)[0]
+ if before:
+ await _push_stream_chunk(session_id, before)
+ response_parts.append(before)
+ else:
+ await _push_stream_chunk(session_id, token)
+ response_parts.append(token)
+
+ await _end_stream(session_id)
llm_elapsed = time.monotonic() - t0
- _log_messages(result)
- final_text = _extract_final_text(result)
+ final_text = "".join(response_parts).strip() or None
else: # complex
ok = await vram_manager.enter_complex_mode()
@@ -432,7 +472,6 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
else:
system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id)
if url_context:
- # Inject pre-fetched content — complex agent can still re-fetch or follow links
system_prompt = system_prompt + "\n\n[Pre-fetched URL content from user's message:]\n" + url_context
result = await complex_agent.ainvoke({
"messages": [
@@ -446,12 +485,16 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
llm_elapsed = time.monotonic() - t0
_log_messages(result)
final_text = _extract_final_text(result)
+ if final_text:
+ await _push_stream_chunk(session_id, final_text)
+ await _end_stream(session_id)
except Exception as e:
import traceback
llm_elapsed = time.monotonic() - t0
print(f"[agent] error after {llm_elapsed:.1f}s for chat {session_id}: {e}", flush=True)
traceback.print_exc()
+ await _end_stream(session_id)
# Deliver reply through the originating channel
if final_text:
@@ -521,6 +564,32 @@ async def reply_stream(session_id: str):
return StreamingResponse(event_generator(), media_type="text/event-stream")
+@app.get("/stream/{session_id}")
+async def stream_reply(session_id: str):
+ """
+ SSE endpoint — streams reply tokens as they are generated.
+ Each chunk: data: <chunk>\\n\\n
+ Signals completion: data: [DONE]\\n\\n
+
+ Medium tier: real token-by-token streaming (<think> blocks filtered out).
+ Light and complex tiers: full reply delivered as one chunk then [DONE].
+ """
+ q = _stream_queues.setdefault(session_id, asyncio.Queue())
+
+ async def event_generator():
+ try:
+ while True:
+ chunk = await asyncio.wait_for(q.get(), timeout=900)
+ escaped = chunk.replace("\n", "\\n").replace("\r", "")
+ yield f"data: {escaped}\n\n"
+ if chunk == "[DONE]":
+ break
+ except asyncio.TimeoutError:
+ yield "data: [DONE]\n\n"
+
+ return StreamingResponse(event_generator(), media_type="text/event-stream")
+
+
@app.get("/health")
async def health():
return {"status": "ok", "agent_ready": medium_agent is not None}
diff --git a/cli.py b/cli.py
index 4cb909a..7d76f1c 100644
--- a/cli.py
+++ b/cli.py
@@ -1,9 +1,9 @@
#!/usr/bin/env python3
"""
-Adolf CLI — interactive REPL for the multi-channel gateway.
+Adolf CLI — interactive REPL with Rich streaming display.
Usage:
- python3 cli.py [--url http://localhost:8000] [--session cli-alvis]
+ python3 cli.py [--url http://deepagents:8000] [--session cli-alvis]
"""
import argparse
@@ -12,7 +12,13 @@ import os
import sys
import urllib.request
-GATEWAY = "http://localhost:8000"
+from rich.console import Console
+from rich.live import Live
+from rich.markdown import Markdown
+from rich.text import Text
+
+GATEWAY = "http://deepagents:8000"
+console = Console()
def post_message(gateway: str, text: str, session_id: str) -> None:
@@ -20,7 +26,7 @@ def post_message(gateway: str, text: str, session_id: str) -> None:
"text": text,
"session_id": session_id,
"channel": "cli",
- "user_id": os.getlogin(),
+ "user_id": os.getenv("USER", "user"),
}).encode()
req = urllib.request.Request(
f"{gateway}/message",
@@ -30,33 +36,49 @@ def post_message(gateway: str, text: str, session_id: str) -> None:
)
with urllib.request.urlopen(req, timeout=10) as r:
if r.status != 202:
- print(f"[error] gateway returned {r.status}", file=sys.stderr)
+ console.print(f"[red][error] gateway returned {r.status}[/red]")
sys.exit(1)
-def wait_for_reply(gateway: str, session_id: str, timeout: int = 400) -> str:
- """Open SSE stream and return first data event."""
+def stream_reply(gateway: str, session_id: str, timeout: int = 400) -> str:
+ """
+ Open the /stream/{session_id} SSE endpoint and display tokens live with
+ Rich. Returns the full assembled reply text.
+ """
req = urllib.request.Request(
- f"{gateway}/reply/{session_id}",
+ f"{gateway}/stream/{session_id}",
headers={"Accept": "text/event-stream"},
)
+ buffer = ""
with urllib.request.urlopen(req, timeout=timeout + 5) as r:
- for raw_line in r:
- line = raw_line.decode("utf-8").rstrip("\n")
- if line.startswith("data:"):
- return line[5:].strip().replace("\\n", "\n")
- return ""
+ with Live(Text(""), console=console, refresh_per_second=20, transient=True) as live:
+ for raw_line in r:
+ line = raw_line.decode("utf-8").rstrip("\n")
+ if not line.startswith("data:"):
+ continue
+ chunk = line[5:].strip()
+ if chunk == "[DONE]":
+ break
+ chunk = chunk.replace("\\n", "\n")
+ buffer += chunk
+ live.update(Text(buffer))
+
+ # Render the complete reply as Markdown once streaming is done
+ console.print(Markdown(buffer))
+ return buffer
def main():
parser = argparse.ArgumentParser(description="Adolf CLI")
parser.add_argument("--url", default=GATEWAY, help="Gateway URL")
- parser.add_argument("--session", default=f"cli-{os.getlogin()}", help="Session ID")
+ parser.add_argument("--session", default=f"cli-{os.getenv('USER', 'user')}",
+ help="Session ID")
parser.add_argument("--timeout", type=int, default=400, help="Reply timeout (seconds)")
args = parser.parse_args()
- print(f"Adolf CLI (session={args.session}, gateway={args.url})")
- print("Type your message and press Enter. Ctrl+C or Ctrl+D to exit.\n")
+ console.print(f"[bold]Adolf CLI[/bold] (session=[cyan]{args.session}[/cyan], "
+ f"gateway=[cyan]{args.url}[/cyan])")
+ console.print("Type your message and press Enter. Ctrl+C or Ctrl+D to exit.\n")
try:
while True:
@@ -68,12 +90,11 @@ def main():
continue
post_message(args.url, text, args.session)
- print("...", end="", flush=True)
- reply = wait_for_reply(args.url, args.session, timeout=args.timeout)
- print(f"\r{reply}\n")
+ stream_reply(args.url, args.session, timeout=args.timeout)
+ console.print()
except KeyboardInterrupt:
- print("\nbye")
+ console.print("\n[dim]bye[/dim]")
if __name__ == "__main__":
diff --git a/docker-compose.yml b/docker-compose.yml
index e9a6ecf..5f20dbe 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -65,6 +65,20 @@ services:
- DEEPAGENTS_URL=http://deepagents:8000
restart: unless-stopped
+ cli:
+ build:
+ context: .
+ dockerfile: Dockerfile.cli
+ container_name: cli
+ environment:
+ - DEEPAGENTS_URL=http://deepagents:8000
+ depends_on:
+ - deepagents
+ stdin_open: true
+ tty: true
+ profiles:
+ - tools
+
crawl4ai:
image: unclecode/crawl4ai:latest
container_name: crawl4ai
diff --git a/tests/use_cases/apple_pie_research.md b/tests/use_cases/apple_pie_research.md
index 5456f7c..b6818bb 100644
--- a/tests/use_cases/apple_pie_research.md
+++ b/tests/use_cases/apple_pie_research.md
@@ -13,17 +13,17 @@ curl -s -X POST http://localhost:8000/message \
-d '{"text": "/think what is the best recipe for an apple pie?", "session_id": "use-case-apple-pie", "channel": "cli", "user_id": "claude"}'
```
-**2. Wait for the reply** via SSE (complex tier can take up to 5 minutes):
+**2. Wait for the streaming reply** (complex tier can take up to 5 minutes):
```bash
-curl -s -N --max-time 300 "http://localhost:8000/reply/use-case-apple-pie"
+curl -s -N --max-time 300 "http://localhost:8000/stream/use-case-apple-pie"
```
**3. Confirm tier and tool usage in agent logs:**
```bash
docker compose -f /home/alvis/adolf/docker-compose.yml logs deepagents \
- --since=600s --no-log-prefix | grep -E "tier=complex|web_search|fetch_url|crawl4ai"
+ --since=600s | grep -E "tier=complex|web_search|fetch_url|crawl4ai"
```
## Evaluate (use your judgment)
diff --git a/tests/use_cases/cli_startup.md b/tests/use_cases/cli_startup.md
index 3acac9d..8ecfe5e 100644
--- a/tests/use_cases/cli_startup.md
+++ b/tests/use_cases/cli_startup.md
@@ -1,13 +1,13 @@
# Use Case: CLI Startup
-Verify the Adolf CLI starts cleanly and exits without error when the user closes input.
+Verify the Adolf CLI container starts cleanly, shows the welcome banner,
+and exits without error when the user closes input.
## Steps
-Run the CLI with empty stdin (simulates user pressing Ctrl+D immediately):
-
```bash
-echo "" | python3 /home/alvis/adolf/cli.py --session use-case-cli-startup
+echo "" | docker compose --profile tools run --rm -T cli \
+ python3 cli.py --url http://deepagents:8000 --session use-case-cli-startup
echo "exit code: $?"
```