- Replace SearXNG search with direct open-meteo.com API call (no key needed) - WeatherTool now returns a ready-to-deliver reply string - agent.py: short-circuit router+LLM when fast tools return a result (tier=fast) - router.py: fast tool match no longer triggers light reply generation Weather latency: 105-190s → ~1s Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
189 lines
6.4 KiB
Python
189 lines
6.4 KiB
Python
"""
|
|
Fast Tools — pre-flight tools invoked by a classifier before the main LLM call.
|
|
|
|
Each FastTool has:
|
|
- matches(message) → bool : regex classifier that determines if this tool applies
|
|
- run(message) → str : async fetch that returns enrichment context
|
|
|
|
FastToolRunner holds a list of FastTools. The Router uses any_matches() to force
|
|
the tier to medium before LLM classification. run_agent_task() calls run_matching()
|
|
to build extra context that is injected into the system prompt.
|
|
|
|
To add a new fast tool:
|
|
1. Subclass FastTool, implement name/matches/run
|
|
2. Add an instance to the list passed to FastToolRunner in agent.py
|
|
"""
|
|
|
|
import asyncio
import logging
import re
from abc import ABC, abstractmethod

import httpx
|
|
|
|
|
|
class FastTool(ABC):
    """Abstract interface implemented by every pre-flight fast tool.

    A concrete tool supplies three things: a ``name``, a synchronous
    ``matches`` classifier, and an asynchronous ``run`` that produces text
    for a matching message.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Short identifier of the tool."""

    @abstractmethod
    def matches(self, message: str) -> bool:
        """Tell whether this tool applies to *message*."""

    @abstractmethod
    async def run(self, message: str) -> str:
        """Produce the tool's text output for *message*."""
|
|
|
|
|
|
_WMO_CODES = {
|
|
0: "clear sky", 1: "mainly clear", 2: "partly cloudy", 3: "overcast",
|
|
45: "fog", 48: "icy fog",
|
|
51: "light drizzle", 53: "drizzle", 55: "heavy drizzle",
|
|
61: "light rain", 63: "rain", 65: "heavy rain",
|
|
71: "light snow", 73: "snow", 75: "heavy snow", 77: "snow grains",
|
|
80: "light showers", 81: "showers", 82: "heavy showers",
|
|
85: "snow showers", 86: "heavy snow showers",
|
|
95: "thunderstorm", 96: "thunderstorm with hail", 99: "thunderstorm with heavy hail",
|
|
}
|
|
|
|
|
|
class WeatherTool(FastTool):
    """
    Fetches current weather for Balashikha, Moscow region directly from open-meteo.com.

    No API key required. ``run`` returns a ready-to-deliver one-line reply, or
    a bracketed ``[weather error: ...]`` marker when the request or the
    response parsing fails; it does not raise.
    """

    # Keyword classifier (English and Russian) for weather-related messages.
    _PATTERN = re.compile(
        r"\b(weather|forecast|temperature|rain(ing)?|snow(ing)?|humidity|wind\s*speed"
        r"|холодно|тепло|погода|прогноз погоды"
        r"|how (hot|cold|warm) is it|what.?s the (weather|temp)|dress for the weather)\b",
        re.IGNORECASE,
    )

    # Fixed coordinates; the ``current`` parameter selects the fields that
    # ``_format_current`` reads, and wind speed is requested in m/s.
    _URL = (
        "https://api.open-meteo.com/v1/forecast"
        "?latitude=55.7963&longitude=37.9382"
        "&current=temperature_2m,apparent_temperature,relative_humidity_2m"
        ",wind_speed_10m,weather_code"
        "&wind_speed_unit=ms"
    )

    @property
    def name(self) -> str:
        """Identifier of this tool."""
        return "weather"

    def matches(self, message: str) -> bool:
        """Return True when *message* contains a weather-related keyword."""
        return bool(self._PATTERN.search(message))

    async def run(self, message: str) -> str:
        """Fetch current conditions and format them as a one-line reply.

        *message* is not inspected: the location is fixed by ``_URL``.
        Returns the reply text, or ``[weather error: ...]`` on any failure.
        """
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                r = await client.get(self._URL)
                r.raise_for_status()
                current = r.json()["current"]
        except Exception as e:
            # Best-effort boundary: failures are reported as a bracketed marker.
            return f"[weather error: {e}]"

        try:
            return self._format_current(current)
        except (KeyError, TypeError, ValueError) as e:
            # Missing, null or non-numeric fields in the payload.
            return f"[weather error: malformed response: {e!r}]"

    @staticmethod
    def _signed_degrees(value: float) -> str:
        """Format a temperature as a signed whole number, never as "-0"."""
        return f"{round(value):+d}"

    @classmethod
    def _format_current(cls, current: dict) -> str:
        """Build the reply sentence from the ``current`` object of the response."""
        temp = cls._signed_degrees(current["temperature_2m"])
        feels = cls._signed_degrees(current["apparent_temperature"])
        humidity = current["relative_humidity_2m"]
        wind = current["wind_speed_10m"]
        # A missing code is reported as "unknown" rather than defaulting to
        # code 0, which would claim a clear sky.
        condition = _WMO_CODES.get(current.get("weather_code"), "unknown")

        return (
            f"Balashikha: {condition}, {temp}°C (feels like {feels}°C), "
            f"wind {wind:.1f} m/s, humidity {humidity}%."
        )
|
|
|
|
|
|
class CommuteTool(FastTool):
    """
    Returns real-time driving time from home (Balashikha) to a destination
    using Yandex traffic data via the local routecheck service.

    Triggered by queries about commute time, arrival, or road traffic.
    The routecheck service handles Yandex API auth and the HTTPS proxy.

    ``run`` returns a multi-line summary, or a bracketed ``[commute ...]``
    marker when the token is missing, the request fails, or the response is
    malformed; it does not raise.
    """

    # Keyword classifier (English and Russian) for commute/traffic questions.
    _PATTERN = re.compile(
        r"\b(commute|arrival time|how long.{0,20}(drive|get|travel|reach)"
        r"|сколько.{0,20}(ехать|добираться|минут)"
        r"|пробки|traffic|road.{0,10}now|drive to (work|office|center|москва|moscow)"
        r"|when (will i|do i) (arrive|get there|reach))\b",
        re.IGNORECASE,
    )

    # Home: Balashikha. Default destination: Moscow city center.
    _HOME = "55.7963,37.9382"
    _DEST = "55.7558,37.6173"

    def __init__(self, routecheck_url: str, internal_token: str):
        """Store the service base URL (trailing slashes removed) and its token."""
        self._url = routecheck_url.rstrip("/")
        self._token = internal_token

    @property
    def name(self) -> str:
        """Identifier of this tool."""
        return "commute"

    def matches(self, message: str) -> bool:
        """Return True when *message* contains a commute/traffic keyword."""
        return bool(self._PATTERN.search(message))

    def _redact(self, text: str) -> str:
        """Mask the service token wherever it appears verbatim in *text*."""
        return text.replace(self._token, "***") if self._token else text

    async def run(self, message: str) -> str:
        """Query the routecheck service and summarise the current drive time.

        *message* is not inspected: origin and destination are the fixed
        ``_HOME`` and ``_DEST`` coordinates.
        """
        if not self._token:
            return "[commute: ROUTECHECK_TOKEN not configured]"
        try:
            async with httpx.AsyncClient(timeout=15) as client:
                r = await client.get(
                    f"{self._url}/api/route",
                    params={"from": self._HOME, "to": self._DEST, "token": self._token},
                )
                r.raise_for_status()
                d = r.json()
        except httpx.HTTPStatusError as e:
            # The default message embeds the request URL, whose query string
            # carries the token, so only the status code is reported.
            return f"[commute error: HTTP {e.response.status_code}]"
        except Exception as e:
            # Best-effort boundary: failures are reported as a bracketed marker.
            return f"[commute error: {self._redact(str(e))}]"

        try:
            traffic = d["duration_traffic_min"]
            normal = d["duration_min"]
            dist = d["distance_km"]
            delay = traffic - normal
        except (KeyError, TypeError) as e:
            # Missing or non-numeric fields in the payload.
            return f"[commute error: malformed response: {e!r}]"

        lines = [
            "Current drive time from Balashikha to Moscow center:",
            f" With traffic: {traffic} min",
            f" Without traffic: {normal} min",
            f" Distance: {dist} km",
        ]
        # Delays of five minutes or less are not mentioned.
        if delay > 5:
            lines.append(f" Traffic delay: +{delay} min")
        return "\n".join(lines)
|
|
|
|
|
|
class FastToolRunner:
    """
    Classifier + executor for fast tools.

    Holds an ordered list of tools and offers two services to callers:
    - any_matches() / matching_names(): cheap, synchronous classification
    - run_matching(): concurrent execution of every matching tool
    """

    def __init__(self, tools: "list[FastTool]"):
        """Keep a reference to *tools*; results follow this list's order."""
        self._tools = tools

    def any_matches(self, message: str) -> bool:
        """True if any fast tool applies to this message."""
        return any(t.matches(message) for t in self._tools)

    def matching_names(self, message: str) -> list[str]:
        """Names of tools that match this message (for logging)."""
        return [t.name for t in self._tools if t.matches(message)]

    async def run_matching(self, message: str) -> str:
        """Run all matching tools concurrently and return combined context.

        Results are joined with blank lines. Empty results and bracketed
        error markers (strings starting with ``[``) are left out. A tool that
        raises is logged and skipped, so one failure does not discard the
        other tools' output. Returns ``""`` when nothing usable was produced.
        """
        matching = [t for t in self._tools if t.matches(message)]
        if not matching:
            return ""
        results = await asyncio.gather(
            *(t.run(message) for t in matching),
            return_exceptions=True,
        )
        parts = []
        for tool, result in zip(matching, results):
            if isinstance(result, BaseException):
                logging.getLogger(__name__).warning(
                    "fast tool %r failed: %r", tool.name, result
                )
            elif isinstance(result, str) and result and not result.startswith("["):
                parts.append(result)
        return "\n\n".join(parts)
|