#!/usr/bin/env python3
"""
Adolf CLI — interactive REPL with Rich streaming display.

Usage:
    python3 cli.py [--url http://deepagents:8000] [--session cli-alvis]
"""

import argparse
import json
import os
import sys
import urllib.error
import urllib.request

from rich.console import Console
from rich.live import Live
from rich.markdown import Markdown
from rich.text import Text

GATEWAY = "http://deepagents:8000"
console = Console()


def post_message(gateway: str, text: str, session_id: str) -> None:
    """
    POST one user message to ``{gateway}/message`` as JSON.

    The payload carries the text, the session id, the channel name "cli" and
    the current user (``$USER``, falling back to "user").

    Any outcome other than HTTP 202 prints a red error line and terminates
    the process with exit status 1 (``SystemExit``).
    """
    payload = json.dumps({
        "text": text,
        "session_id": session_id,
        "channel": "cli",
        "user_id": os.getenv("USER", "user"),
    }).encode()
    req = urllib.request.Request(
        f"{gateway}/message",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

    try:
        with urllib.request.urlopen(req, timeout=10) as r:
            status = r.status
    except urllib.error.HTTPError as err:
        # urlopen raises for 4xx/5xx instead of returning the response, so
        # the status check below would never see those codes otherwise.
        status = err.code
        err.close()
    except OSError as err:
        # URLError (connection refused, DNS failure, ...) and socket timeouts
        # are OSError subclasses; URLError carries the cause in ``reason``.
        reason = getattr(err, "reason", err)
        console.print(Text(f"[error] cannot reach gateway: {reason}", style="red"))
        sys.exit(1)

    if status != 202:
        # Printed as a styled Text rather than a markup string: Rich markup
        # would parse "[error]" as a style tag instead of literal text.
        console.print(Text(f"[error] gateway returned {status}", style="red"))
        sys.exit(1)


def stream_reply(gateway: str, session_id: str, timeout: int = 400) -> str:
    """
    Open the /stream/{session_id} SSE endpoint and display tokens live with Rich.

    Only ``data:`` lines are consumed; a ``[DONE]`` payload ends the stream.
    Literal ``\\n`` sequences in a payload are turned back into newlines.
    The socket timeout passed to ``urlopen`` is ``timeout + 5`` seconds.

    Returns the full assembled reply text.
    """
    req = urllib.request.Request(
        f"{gateway}/stream/{session_id}",
        headers={"Accept": "text/event-stream"},
    )

    buffer = ""
    with urllib.request.urlopen(req, timeout=timeout + 5) as r:
        with Live(Text(""), console=console, refresh_per_second=20, transient=True) as live:
            for raw_line in r:
                # Undecodable bytes are replaced so one bad byte does not
                # abort the whole reply; SSE lines may end in LF or CRLF.
                line = raw_line.decode("utf-8", errors="replace").rstrip("\r\n")
                if not line.startswith("data:"):
                    continue

                chunk = line[5:]
                # In the SSE format only a single space after the colon is
                # framing. Stripping all surrounding whitespace would glue
                # tokens together ("Hello" + " world" -> "Helloworld").
                # NOTE(review): assumes the gateway emits standard
                # "data: <payload>" lines — confirm against the server.
                if chunk.startswith(" "):
                    chunk = chunk[1:]

                if chunk.strip() == "[DONE]":
                    break

                buffer += chunk.replace("\\n", "\n")
                live.update(Text(buffer))

    # Render the complete reply as Markdown once streaming is done
    console.print(Markdown(buffer))
    return buffer


def main():
    """
    Parse command-line options and run the interactive prompt loop.

    Each non-empty input line is posted to the gateway and the streamed
    reply is displayed. Ctrl+D (EOF) ends the loop silently; Ctrl+C prints
    "bye" and exits.
    """
    parser = argparse.ArgumentParser(description="Adolf CLI")
    parser.add_argument("--url", default=GATEWAY, help="Gateway URL")
    parser.add_argument("--session", default=f"cli-{os.getenv('USER', 'user')}",
                        help="Session ID")
    parser.add_argument("--timeout", type=int, default=400,
                        help="Reply timeout (seconds)")
    args = parser.parse_args()

    console.print(f"[bold]Adolf CLI[/bold] (session=[cyan]{args.session}[/cyan], "
                  f"gateway=[cyan]{args.url}[/cyan])")
    console.print("Type your message and press Enter. Ctrl+C or Ctrl+D to exit.\n")

    try:
        while True:
            try:
                text = input("> ").strip()
            except EOFError:
                break
            if not text:
                continue

            post_message(args.url, text, args.session)
            stream_reply(args.url, args.session, timeout=args.timeout)
            console.print()
    except KeyboardInterrupt:
        console.print("\n[dim]bye[/dim]")


if __name__ == "__main__":
    main()