diff --git a/agent.py b/agent.py index a90332f..c26f351 100644 --- a/agent.py +++ b/agent.py @@ -123,7 +123,7 @@ _memory_search_tool = None # Fast tools run before the LLM — classifier + context enricher _fast_tool_runner = FastToolRunner([ - WeatherTool(searxng_url=SEARXNG_URL), + WeatherTool(), CommuteTool(routecheck_url=ROUTECHECK_URL, internal_token=ROUTECHECK_TOKEN), ]) @@ -410,110 +410,121 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram if memories: enriched_history = [{"role": "system", "content": memories}] + enriched_history - tier, light_reply = await router.route(clean_message, enriched_history, force_complex) + # Short-circuit: fast tool result is already a complete reply — skip router+LLM + if fast_context and not force_complex and not url_context: + tier = "fast" + final_text = fast_context + llm_elapsed = time.monotonic() - t0 + names = _fast_tool_runner.matching_names(clean_message) + print(f"[agent] tier=fast tools={names} — delivering directly", flush=True) + await _push_stream_chunk(session_id, final_text) + await _end_stream(session_id) + else: + tier, light_reply = await router.route(clean_message, enriched_history, force_complex) - # Messages with URL content must be handled by at least medium tier - if url_context and tier == "light": - tier = "medium" - light_reply = None - print("[agent] URL in message → upgraded light→medium", flush=True) - print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True) + # Messages with URL content must be handled by at least medium tier + if url_context and tier == "light": + tier = "medium" + light_reply = None + print("[agent] URL in message → upgraded light→medium", flush=True) + print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True) - final_text = None - try: - if tier == "light": - final_text = light_reply - llm_elapsed = time.monotonic() - t0 - print(f"[agent] light path: answered by router", flush=True) - await 
_push_stream_chunk(session_id, final_text) - await _end_stream(session_id) + if tier != "fast": + final_text = None + try: + if tier == "light": + final_text = light_reply + llm_elapsed = time.monotonic() - t0 + print(f"[agent] light path: answered by router", flush=True) + await _push_stream_chunk(session_id, final_text) + await _end_stream(session_id) - elif tier == "medium": - system_prompt = MEDIUM_SYSTEM_PROMPT - if memories: - system_prompt = system_prompt + "\n\n" + memories - if url_context: - system_prompt = system_prompt + "\n\n" + url_context - if fast_context: - system_prompt = system_prompt + "\n\nLive web search results (use these to answer):\n\n" + fast_context - - # Stream tokens directly — filter out qwen3 <think> blocks - in_think = False - response_parts = [] - async for chunk in medium_model.astream([ - {"role": "system", "content": system_prompt}, - *history, - {"role": "user", "content": clean_message}, - ]): - token = chunk.content or "" - if not token: - continue - if in_think: - if "</think>" in token: - in_think = False - after = token.split("</think>", 1)[1] - if after: - await _push_stream_chunk(session_id, after) - response_parts.append(after) - else: - if "<think>" in token: - in_think = True - before = token.split("<think>", 1)[0] - if before: - await _push_stream_chunk(session_id, before) - response_parts.append(before) - else: - await _push_stream_chunk(session_id, token) - response_parts.append(token) - - await _end_stream(session_id) - llm_elapsed = time.monotonic() - t0 - final_text = "".join(response_parts).strip() or None - - else: # complex - ok = await vram_manager.enter_complex_mode() - if not ok: - print("[agent] complex→medium fallback (eviction timeout)", flush=True) - tier = "medium" + elif tier == "medium": system_prompt = MEDIUM_SYSTEM_PROMPT if memories: system_prompt = system_prompt + "\n\n" + memories if url_context: system_prompt = system_prompt + "\n\n" + url_context - result = await medium_agent.ainvoke({ - "messages": [ - {"role": "system", 
"content": system_prompt}, - *history, - {"role": "user", "content": clean_message}, - ] - }) - else: - system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id) - if url_context: - system_prompt = system_prompt + "\n\n[Pre-fetched URL content from user's message:]\n" + url_context - result = await complex_agent.ainvoke({ - "messages": [ - {"role": "system", "content": system_prompt}, - *history, - {"role": "user", "content": clean_message}, - ] - }) - asyncio.create_task(vram_manager.exit_complex_mode()) + if fast_context: + system_prompt = system_prompt + "\n\nLive web search results (use these to answer):\n\n" + fast_context + # Stream tokens directly — filter out qwen3 blocks + in_think = False + response_parts = [] + async for chunk in medium_model.astream([ + {"role": "system", "content": system_prompt}, + *history, + {"role": "user", "content": clean_message}, + ]): + token = chunk.content or "" + if not token: + continue + if in_think: + if "" in token: + in_think = False + after = token.split("", 1)[1] + if after: + await _push_stream_chunk(session_id, after) + response_parts.append(after) + else: + if "" in token: + in_think = True + before = token.split("", 1)[0] + if before: + await _push_stream_chunk(session_id, before) + response_parts.append(before) + else: + await _push_stream_chunk(session_id, token) + response_parts.append(token) + + await _end_stream(session_id) + llm_elapsed = time.monotonic() - t0 + final_text = "".join(response_parts).strip() or None + + else: # complex + ok = await vram_manager.enter_complex_mode() + if not ok: + print("[agent] complex→medium fallback (eviction timeout)", flush=True) + tier = "medium" + system_prompt = MEDIUM_SYSTEM_PROMPT + if memories: + system_prompt = system_prompt + "\n\n" + memories + if url_context: + system_prompt = system_prompt + "\n\n" + url_context + result = await medium_agent.ainvoke({ + "messages": [ + {"role": "system", "content": system_prompt}, + *history, + {"role": "user", 
"content": clean_message}, + ] + }) + else: + system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id) + if url_context: + system_prompt = system_prompt + "\n\n[Pre-fetched URL content from user's message:]\n" + url_context + result = await complex_agent.ainvoke({ + "messages": [ + {"role": "system", "content": system_prompt}, + *history, + {"role": "user", "content": clean_message}, + ] + }) + asyncio.create_task(vram_manager.exit_complex_mode()) + + llm_elapsed = time.monotonic() - t0 + _log_messages(result) + final_text = _extract_final_text(result) + if final_text: + await _push_stream_chunk(session_id, final_text) + await _end_stream(session_id) + + except Exception as e: + import traceback llm_elapsed = time.monotonic() - t0 - _log_messages(result) - final_text = _extract_final_text(result) - if final_text: - await _push_stream_chunk(session_id, final_text) + print(f"[agent] error after {llm_elapsed:.1f}s for chat {session_id}: {e}", flush=True) + traceback.print_exc() await _end_stream(session_id) - except Exception as e: - import traceback - llm_elapsed = time.monotonic() - t0 - print(f"[agent] error after {llm_elapsed:.1f}s for chat {session_id}: {e}", flush=True) - traceback.print_exc() - await _end_stream(session_id) - # Deliver reply through the originating channel if final_text: t1 = time.monotonic() diff --git a/fast_tools.py b/fast_tools.py index ef3991c..704ddcf 100644 --- a/fast_tools.py +++ b/fast_tools.py @@ -35,13 +35,22 @@ class FastTool(ABC): async def run(self, message: str) -> str: ... 
+_WMO_CODES = { + 0: "clear sky", 1: "mainly clear", 2: "partly cloudy", 3: "overcast", + 45: "fog", 48: "icy fog", + 51: "light drizzle", 53: "drizzle", 55: "heavy drizzle", + 61: "light rain", 63: "rain", 65: "heavy rain", + 71: "light snow", 73: "snow", 75: "heavy snow", 77: "snow grains", + 80: "light showers", 81: "showers", 82: "heavy showers", + 85: "snow showers", 86: "heavy snow showers", + 95: "thunderstorm", 96: "thunderstorm with hail", 99: "thunderstorm with heavy hail", +} + + class WeatherTool(FastTool): """ - Fetches current weather for the user's location (Balashikha, Moscow region) - by querying SearXNG, which has external internet access. - - Triggered by any weather-related query. The Router also forces medium tier - when this tool matches so the richer model handles the injected data. + Fetches current weather for Balashikha, Moscow region directly from open-meteo.com. + No API key required. Returns a ready-to-deliver reply — no LLM reformatting needed. """ _PATTERN = re.compile( @@ -51,11 +60,13 @@ class WeatherTool(FastTool): re.IGNORECASE, ) - # Fixed query — always fetch home location weather - _SEARCH_QUERY = "погода Балашиха сейчас" # Russian query → Celsius sources - - def __init__(self, searxng_url: str): - self._searxng_url = searxng_url + _URL = ( + "https://api.open-meteo.com/v1/forecast" + "?latitude=55.7963&longitude=37.9382" + "&current=temperature_2m,apparent_temperature,relative_humidity_2m" + ",wind_speed_10m,weather_code" + "&wind_speed_unit=ms" + ) @property def name(self) -> str: @@ -65,31 +76,24 @@ class WeatherTool(FastTool): return bool(self._PATTERN.search(message)) async def run(self, message: str) -> str: - """Query SearXNG for Balashikha weather and return current conditions snippet.""" try: - async with httpx.AsyncClient(timeout=15) as client: - r = await client.get( - f"{self._searxng_url}/search", - params={"q": self._SEARCH_QUERY, "format": "json"}, - ) + async with httpx.AsyncClient(timeout=10) as client: + r = await 
client.get(self._URL) r.raise_for_status() - items = r.json().get("results", [])[:5] + c = r.json()["current"] except Exception as e: return f"[weather error: {e}]" - if not items: - return "" + temp = c["temperature_2m"] + feels = c["apparent_temperature"] + humidity = c["relative_humidity_2m"] + wind = c["wind_speed_10m"] + condition = _WMO_CODES.get(c.get("weather_code", 0), "unknown") - # Prefer results whose snippets contain actual current conditions - lines = ["Current weather data for Balashikha, Moscow region (temperatures in °C):\n"] - for item in items: - snippet = item.get("content", "") - title = item.get("title", "") - url = item.get("url", "") - if snippet: - lines.append(f"[{title}]\n{snippet}\nSource: {url}\n") - - return "\n".join(lines) if len(lines) > 1 else "" + return ( + f"Balashikha: {condition}, {temp:+.0f}°C (feels like {feels:+.0f}°C), " + f"wind {wind:.1f} m/s, humidity {humidity}%." + ) class CommuteTool(FastTool): diff --git a/router.py b/router.py index a68e7c3..e3dde5f 100644 --- a/router.py +++ b/router.py @@ -92,7 +92,8 @@ class Router: if force_complex: return "complex", None - # Step 0a: force medium if any fast tool matches (live-data queries) + # Step 0a: fast tool match — agent.py short-circuits before reaching router + # This branch is only hit if force_complex=True with a fast-tool message (rare) if self._fast_tool_runner and self._fast_tool_runner.any_matches(message.strip()): names = self._fast_tool_runner.matching_names(message.strip()) print(f"[router] fast_tool_match={names} → medium", flush=True)