Files
adolf/channels.py
Alvis 1f5e272600 Switch from Bifrost to LiteLLM; add Matrix channel; update rules
Infrastructure:
- docker-compose.yml: replace bifrost container with LiteLLM proxy
  (host.docker.internal:4000); complex model → deepseek-r1:free via
  OpenRouter; add Matrix URL env var; mount logs volume
- bifrost-config.json: add auth_config + postgres config_store (archived)

Routing:
- router.py: full semantic 3-tier classifier rewrite — nomic-embed-text
  centroids for light/medium/complex; regex pre-classifiers for all tiers;
  Russian utterance sets expanded
- agent.py: wire LiteLLM URL; add dry_run support; add Matrix channel

Channels:
- channels.py: add Matrix adapter (_matrix_send via mx- session prefix)

Rules / docs:
- agent-pipeline.md: remove /think prefix requirement; document automatic
  complex tier classification
- llm-inference.md: update BIFROST_URL → LITELLM_URL references; add
  remote model note for complex tier
- ARCHITECTURE.md: deleted (superseded by README.md)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 02:14:13 +00:00

91 lines
3.2 KiB
Python

"""
Channel registry and reply delivery for the Adolf multi-channel gateway.
Each channel registers an async send callback:
channels.register("telegram", telegram_send)
channels.register("cli", cli_send)
When the agent is done, it calls:
await channels.deliver(session_id, channel, text)
Replies are also placed into per-session asyncio Queues so the
GET /reply/{session_id} SSE endpoint can stream them to polling clients.
"""
import asyncio
import os
from typing import Awaitable, Callable
import httpx
# ── channel registry ──────────────────────────────────────────────────────────
# channel name (e.g. "telegram", "matrix", "cli") → async send callback.
# Each callback is awaited as callback(session_id, text) by deliver().
_callbacks: dict[str, Callable[[str, str], Awaitable[None]]] = {}
# session_id → Queue that receives the final reply text
# (consumed by the GET /reply/{session_id} SSE endpoint; see deliver()).
pending_replies: dict[str, asyncio.Queue[str]] = {}
def register(channel: str, callback: Callable[[str, str], Awaitable[None]]) -> None:
    """
    Register *callback* as the delivery handler for *channel*.

    The callback is awaited as ``callback(session_id, text)`` whenever
    :func:`deliver` routes a reply to this channel. Registering the same
    channel name again replaces the previous callback.
    """
    _callbacks[channel] = callback
async def deliver(session_id: str, channel: str, text: str) -> None:
    """
    Route a finished reply to its originating channel AND to the
    per-session queue consumed by GET /reply/{session_id} SSE clients.

    The queue is filled first so SSE listeners never miss the reply,
    even when the channel has no registered send callback.
    """
    queue = pending_replies.setdefault(session_id, asyncio.Queue())
    await queue.put(text)

    send = _callbacks.get(channel)
    if send is None:
        # Reply is still retrievable via the SSE endpoint; just note the gap.
        print(f"[channels] no callback for channel={channel!r}, reply queued only", flush=True)
        return
    await send(session_id, text)
# ── built-in channel adapters ─────────────────────────────────────────────────
# Base URLs of the sidecar services that actually talk to each network.
# NOTE(review): defaults look like docker-compose service hostnames/ports —
# confirm against the compose file; override via env vars in other setups.
GRAMMY_URL = os.getenv("GRAMMY_URL", "http://grammy:3001")
MATRIX_URL = os.getenv("MATRIX_URL", "http://matrix:3002")
async def _telegram_send(session_id: str, text: str) -> None:
    """Forward a reply to Telegram through Grammy's POST /send endpoint."""
    # Telegram session ids carry a "tg-" prefix in front of the chat id.
    chat_id = session_id.removeprefix("tg-")
    # Split long replies; 4000 stays safely under Telegram's message cap.
    limit = 4000
    async with httpx.AsyncClient(timeout=15) as client:
        for start in range(0, len(text), limit):
            await client.post(
                f"{GRAMMY_URL}/send",
                json={"chat_id": chat_id, "text": text[start:start + limit]},
            )
async def _matrix_send(session_id: str, text: str) -> None:
    """Forward a reply to Matrix through the matrix adapter's POST /send endpoint."""
    # Matrix session ids carry an "mx-" prefix in front of the room id.
    room_id = session_id.removeprefix("mx-")
    # Split long replies into 4000-char pieces, mirroring the Telegram adapter.
    limit = 4000
    async with httpx.AsyncClient(timeout=15) as client:
        for start in range(0, len(text), limit):
            await client.post(
                f"{MATRIX_URL}/send",
                json={"room_id": room_id, "text": text[start:start + limit]},
            )
async def _cli_send(session_id: str, text: str) -> None:
"""CLI replies are handled entirely through the pending_replies queue — no-op here."""
pass
def register_defaults() -> None:
    """Install the built-in channel adapters: Telegram, Matrix, and CLI."""
    builtin = (
        ("telegram", _telegram_send),
        ("matrix", _matrix_send),
        ("cli", _cli_send),
    )
    for name, sender in builtin:
        register(name, sender)