""" Fast Tools — pre-flight tools invoked by a classifier before the main LLM call. Each FastTool has: - matches(message) → bool : regex classifier that determines if this tool applies - run(message) → str : async fetch that returns enrichment context FastToolRunner holds a list of FastTools. The Router uses any_matches() to force the tier to medium before LLM classification. run_agent_task() calls run_matching() to build extra context that is injected into the system prompt. To add a new fast tool: 1. Subclass FastTool, implement name/matches/run 2. Add an instance to the list passed to FastToolRunner in agent.py """ import asyncio import re from abc import ABC, abstractmethod import httpx class FastTool(ABC): """Base class for all pre-flight fast tools.""" @property @abstractmethod def name(self) -> str: ... @abstractmethod def matches(self, message: str) -> bool: ... @abstractmethod async def run(self, message: str) -> str: ... class WeatherTool(FastTool): """ Fetches current weather for the user's location (Balashikha, Moscow region) by querying SearXNG, which has external internet access. Triggered by any weather-related query. The Router also forces medium tier when this tool matches so the richer model handles the injected data. """ _PATTERN = re.compile( r"\b(weather|forecast|temperature|rain(ing)?|snow(ing)?|humidity|wind\s*speed" r"|холодно|тепло|погода|прогноз погоды" r"|how (hot|cold|warm) is it|what.?s the (weather|temp)|dress for the weather)\b", re.IGNORECASE, ) # Fixed query — always fetch home location weather _SEARCH_QUERY = "погода Балашиха сейчас" # Russian query → Celsius sources def __init__(self, searxng_url: str): self._searxng_url = searxng_url @property def name(self) -> str: return "weather" def matches(self, message: str) -> bool: return bool(self._PATTERN.search(message)) async def run(self, message: str) -> str: """Query SearXNG for Balashikha weather and return current conditions snippet.""" try: async with httpx.AsyncClient(timeout=15) as client: r = await client.get( f"{self._searxng_url}/search", params={"q": self._SEARCH_QUERY, "format": "json"}, ) r.raise_for_status() items = r.json().get("results", [])[:5] except Exception as e: return f"[weather error: {e}]" if not items: return "" # Prefer results whose snippets contain actual current conditions lines = ["Current weather data for Balashikha, Moscow region (temperatures in °C):\n"] for item in items: snippet = item.get("content", "") title = item.get("title", "") url = item.get("url", "") if snippet: lines.append(f"[{title}]\n{snippet}\nSource: {url}\n") return "\n".join(lines) if len(lines) > 1 else "" class CommuteTool(FastTool): """ Returns real-time driving time from home (Balashikha) to a destination using Yandex traffic data via the local routecheck service. Triggered by queries about commute time, arrival, or road traffic. The routecheck service handles Yandex API auth and the HTTPS proxy. """ _PATTERN = re.compile( r"\b(commute|arrival time|how long.{0,20}(drive|get|travel|reach)" r"|сколько.{0,20}(ехать|добираться|минут)" r"|пробки|traffic|road.{0,10}now|drive to (work|office|center|москва|moscow)" r"|when (will i|do i) (arrive|get there|reach))\b", re.IGNORECASE, ) # Home: Balashikha. Default destination: Moscow city center. _HOME = "55.7963,37.9382" _DEST = "55.7558,37.6173" def __init__(self, routecheck_url: str, internal_token: str): self._url = routecheck_url.rstrip("/") self._token = internal_token @property def name(self) -> str: return "commute" def matches(self, message: str) -> bool: return bool(self._PATTERN.search(message)) async def run(self, message: str) -> str: if not self._token: return "[commute: ROUTECHECK_TOKEN not configured]" try: async with httpx.AsyncClient(timeout=15) as client: r = await client.get( f"{self._url}/api/route", params={"from": self._HOME, "to": self._DEST, "token": self._token}, ) r.raise_for_status() d = r.json() except Exception as e: return f"[commute error: {e}]" traffic = d["duration_traffic_min"] normal = d["duration_min"] dist = d["distance_km"] delay = traffic - normal lines = [ f"Current drive time from Balashikha to Moscow center:", f" With traffic: {traffic} min", f" Without traffic: {normal} min", f" Distance: {dist} km", ] if delay > 5: lines.append(f" Traffic delay: +{delay} min") return "\n".join(lines) class FastToolRunner: """ Classifier + executor for fast tools. Used in two places: - Router.route(): any_matches() forces medium tier before LLM classification - run_agent_task(): run_matching() builds enrichment context in the pre-flight gather """ def __init__(self, tools: list[FastTool]): self._tools = tools def any_matches(self, message: str) -> bool: """True if any fast tool applies to this message.""" return any(t.matches(message) for t in self._tools) def matching_names(self, message: str) -> list[str]: """Names of tools that match this message (for logging).""" return [t.name for t in self._tools if t.matches(message)] async def run_matching(self, message: str) -> str: """Run all matching tools concurrently and return combined context.""" matching = [t for t in self._tools if t.matches(message)] if not matching: return "" results = await asyncio.gather(*[t.run(message) for t in matching]) parts = [r for r in results if r and not r.startswith("[")] return "\n\n".join(parts)