""" Channel registry and reply delivery for the Adolf multi-channel gateway. Each channel registers an async send callback: channels.register("telegram", telegram_send) channels.register("cli", cli_send) When the agent is done, it calls: await channels.deliver(session_id, channel, text) Replies are also placed into per-session asyncio Queues so the GET /reply/{session_id} SSE endpoint can stream them to polling clients. """ import asyncio import os from typing import Awaitable, Callable import httpx # ── channel registry ────────────────────────────────────────────────────────── _callbacks: dict[str, Callable[[str, str], Awaitable[None]]] = {} # session_id → Queue that receives the final reply text pending_replies: dict[str, asyncio.Queue] = {} def register(channel: str, callback: Callable[[str, str], Awaitable[None]]) -> None: """Register an async send callback for a channel name.""" _callbacks[channel] = callback async def deliver(session_id: str, channel: str, text: str) -> None: """ Deliver a reply to the originating channel AND put it in the pending queue so SSE /reply/{session_id} clients get it. """ # Always enqueue first so SSE listeners aren't missed q = pending_replies.setdefault(session_id, asyncio.Queue()) await q.put(text) cb = _callbacks.get(channel) if cb: await cb(session_id, text) else: print(f"[channels] no callback for channel={channel!r}, reply queued only", flush=True) # ── built-in channel adapters ───────────────────────────────────────────────── GRAMMY_URL = os.getenv("GRAMMY_URL", "http://grammy:3001") async def _telegram_send(session_id: str, text: str) -> None: """Send reply to Telegram via Grammy's POST /send endpoint.""" chat_id = session_id.removeprefix("tg-") MAX_TG = 4000 chunks = [text[i:i + MAX_TG] for i in range(0, len(text), MAX_TG)] async with httpx.AsyncClient(timeout=15) as client: for chunk in chunks: await client.post( f"{GRAMMY_URL}/send", json={"chat_id": chat_id, "text": chunk}, ) async def _cli_send(session_id: str, text: str) -> None: """CLI replies are handled entirely through the pending_replies queue — no-op here.""" pass def register_defaults() -> None: """Register the built-in Telegram and CLI channel adapters.""" register("telegram", _telegram_send) register("cli", _cli_send)