WeatherTool: fetch open-meteo directly, skip LLM for fast tool replies
- Replace SearXNG search with direct open-meteo.com API call (no key needed) - WeatherTool now returns a ready-to-deliver reply string - agent.py: short-circuit router+LLM when fast tools return a result (tier=fast) - router.py: fast tool match no longer triggers light reply generation Weather latency: 105-190s → ~1s Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
197
agent.py
197
agent.py
@@ -123,7 +123,7 @@ _memory_search_tool = None
|
||||
|
||||
# Fast tools run before the LLM — classifier + context enricher
|
||||
_fast_tool_runner = FastToolRunner([
|
||||
WeatherTool(searxng_url=SEARXNG_URL),
|
||||
WeatherTool(),
|
||||
CommuteTool(routecheck_url=ROUTECHECK_URL, internal_token=ROUTECHECK_TOKEN),
|
||||
])
|
||||
|
||||
@@ -410,110 +410,121 @@ async def run_agent_task(message: str, session_id: str, channel: str = "telegram
|
||||
if memories:
|
||||
enriched_history = [{"role": "system", "content": memories}] + enriched_history
|
||||
|
||||
tier, light_reply = await router.route(clean_message, enriched_history, force_complex)
|
||||
# Short-circuit: fast tool result is already a complete reply — skip router+LLM
|
||||
if fast_context and not force_complex and not url_context:
|
||||
tier = "fast"
|
||||
final_text = fast_context
|
||||
llm_elapsed = time.monotonic() - t0
|
||||
names = _fast_tool_runner.matching_names(clean_message)
|
||||
print(f"[agent] tier=fast tools={names} — delivering directly", flush=True)
|
||||
await _push_stream_chunk(session_id, final_text)
|
||||
await _end_stream(session_id)
|
||||
else:
|
||||
tier, light_reply = await router.route(clean_message, enriched_history, force_complex)
|
||||
|
||||
# Messages with URL content must be handled by at least medium tier
|
||||
if url_context and tier == "light":
|
||||
tier = "medium"
|
||||
light_reply = None
|
||||
print("[agent] URL in message → upgraded light→medium", flush=True)
|
||||
print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True)
|
||||
# Messages with URL content must be handled by at least medium tier
|
||||
if url_context and tier == "light":
|
||||
tier = "medium"
|
||||
light_reply = None
|
||||
print("[agent] URL in message → upgraded light→medium", flush=True)
|
||||
print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True)
|
||||
|
||||
final_text = None
|
||||
try:
|
||||
if tier == "light":
|
||||
final_text = light_reply
|
||||
llm_elapsed = time.monotonic() - t0
|
||||
print(f"[agent] light path: answered by router", flush=True)
|
||||
await _push_stream_chunk(session_id, final_text)
|
||||
await _end_stream(session_id)
|
||||
if tier != "fast":
|
||||
final_text = None
|
||||
try:
|
||||
if tier == "light":
|
||||
final_text = light_reply
|
||||
llm_elapsed = time.monotonic() - t0
|
||||
print(f"[agent] light path: answered by router", flush=True)
|
||||
await _push_stream_chunk(session_id, final_text)
|
||||
await _end_stream(session_id)
|
||||
|
||||
elif tier == "medium":
|
||||
system_prompt = MEDIUM_SYSTEM_PROMPT
|
||||
if memories:
|
||||
system_prompt = system_prompt + "\n\n" + memories
|
||||
if url_context:
|
||||
system_prompt = system_prompt + "\n\n" + url_context
|
||||
if fast_context:
|
||||
system_prompt = system_prompt + "\n\nLive web search results (use these to answer):\n\n" + fast_context
|
||||
|
||||
# Stream tokens directly — filter out qwen3 <think> blocks
|
||||
in_think = False
|
||||
response_parts = []
|
||||
async for chunk in medium_model.astream([
|
||||
{"role": "system", "content": system_prompt},
|
||||
*history,
|
||||
{"role": "user", "content": clean_message},
|
||||
]):
|
||||
token = chunk.content or ""
|
||||
if not token:
|
||||
continue
|
||||
if in_think:
|
||||
if "</think>" in token:
|
||||
in_think = False
|
||||
after = token.split("</think>", 1)[1]
|
||||
if after:
|
||||
await _push_stream_chunk(session_id, after)
|
||||
response_parts.append(after)
|
||||
else:
|
||||
if "<think>" in token:
|
||||
in_think = True
|
||||
before = token.split("<think>", 1)[0]
|
||||
if before:
|
||||
await _push_stream_chunk(session_id, before)
|
||||
response_parts.append(before)
|
||||
else:
|
||||
await _push_stream_chunk(session_id, token)
|
||||
response_parts.append(token)
|
||||
|
||||
await _end_stream(session_id)
|
||||
llm_elapsed = time.monotonic() - t0
|
||||
final_text = "".join(response_parts).strip() or None
|
||||
|
||||
else: # complex
|
||||
ok = await vram_manager.enter_complex_mode()
|
||||
if not ok:
|
||||
print("[agent] complex→medium fallback (eviction timeout)", flush=True)
|
||||
tier = "medium"
|
||||
elif tier == "medium":
|
||||
system_prompt = MEDIUM_SYSTEM_PROMPT
|
||||
if memories:
|
||||
system_prompt = system_prompt + "\n\n" + memories
|
||||
if url_context:
|
||||
system_prompt = system_prompt + "\n\n" + url_context
|
||||
result = await medium_agent.ainvoke({
|
||||
"messages": [
|
||||
{"role": "system", "content": system_prompt},
|
||||
*history,
|
||||
{"role": "user", "content": clean_message},
|
||||
]
|
||||
})
|
||||
else:
|
||||
system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id)
|
||||
if url_context:
|
||||
system_prompt = system_prompt + "\n\n[Pre-fetched URL content from user's message:]\n" + url_context
|
||||
result = await complex_agent.ainvoke({
|
||||
"messages": [
|
||||
{"role": "system", "content": system_prompt},
|
||||
*history,
|
||||
{"role": "user", "content": clean_message},
|
||||
]
|
||||
})
|
||||
asyncio.create_task(vram_manager.exit_complex_mode())
|
||||
if fast_context:
|
||||
system_prompt = system_prompt + "\n\nLive web search results (use these to answer):\n\n" + fast_context
|
||||
|
||||
# Stream tokens directly — filter out qwen3 <think> blocks
|
||||
in_think = False
|
||||
response_parts = []
|
||||
async for chunk in medium_model.astream([
|
||||
{"role": "system", "content": system_prompt},
|
||||
*history,
|
||||
{"role": "user", "content": clean_message},
|
||||
]):
|
||||
token = chunk.content or ""
|
||||
if not token:
|
||||
continue
|
||||
if in_think:
|
||||
if "</think>" in token:
|
||||
in_think = False
|
||||
after = token.split("</think>", 1)[1]
|
||||
if after:
|
||||
await _push_stream_chunk(session_id, after)
|
||||
response_parts.append(after)
|
||||
else:
|
||||
if "<think>" in token:
|
||||
in_think = True
|
||||
before = token.split("<think>", 1)[0]
|
||||
if before:
|
||||
await _push_stream_chunk(session_id, before)
|
||||
response_parts.append(before)
|
||||
else:
|
||||
await _push_stream_chunk(session_id, token)
|
||||
response_parts.append(token)
|
||||
|
||||
await _end_stream(session_id)
|
||||
llm_elapsed = time.monotonic() - t0
|
||||
final_text = "".join(response_parts).strip() or None
|
||||
|
||||
else: # complex
|
||||
ok = await vram_manager.enter_complex_mode()
|
||||
if not ok:
|
||||
print("[agent] complex→medium fallback (eviction timeout)", flush=True)
|
||||
tier = "medium"
|
||||
system_prompt = MEDIUM_SYSTEM_PROMPT
|
||||
if memories:
|
||||
system_prompt = system_prompt + "\n\n" + memories
|
||||
if url_context:
|
||||
system_prompt = system_prompt + "\n\n" + url_context
|
||||
result = await medium_agent.ainvoke({
|
||||
"messages": [
|
||||
{"role": "system", "content": system_prompt},
|
||||
*history,
|
||||
{"role": "user", "content": clean_message},
|
||||
]
|
||||
})
|
||||
else:
|
||||
system_prompt = COMPLEX_SYSTEM_PROMPT.format(user_id=session_id)
|
||||
if url_context:
|
||||
system_prompt = system_prompt + "\n\n[Pre-fetched URL content from user's message:]\n" + url_context
|
||||
result = await complex_agent.ainvoke({
|
||||
"messages": [
|
||||
{"role": "system", "content": system_prompt},
|
||||
*history,
|
||||
{"role": "user", "content": clean_message},
|
||||
]
|
||||
})
|
||||
asyncio.create_task(vram_manager.exit_complex_mode())
|
||||
|
||||
llm_elapsed = time.monotonic() - t0
|
||||
_log_messages(result)
|
||||
final_text = _extract_final_text(result)
|
||||
if final_text:
|
||||
await _push_stream_chunk(session_id, final_text)
|
||||
await _end_stream(session_id)
|
||||
|
||||
except Exception as e:
|
||||
import traceback
|
||||
llm_elapsed = time.monotonic() - t0
|
||||
_log_messages(result)
|
||||
final_text = _extract_final_text(result)
|
||||
if final_text:
|
||||
await _push_stream_chunk(session_id, final_text)
|
||||
print(f"[agent] error after {llm_elapsed:.1f}s for chat {session_id}: {e}", flush=True)
|
||||
traceback.print_exc()
|
||||
await _end_stream(session_id)
|
||||
|
||||
except Exception as e:
|
||||
import traceback
|
||||
llm_elapsed = time.monotonic() - t0
|
||||
print(f"[agent] error after {llm_elapsed:.1f}s for chat {session_id}: {e}", flush=True)
|
||||
traceback.print_exc()
|
||||
await _end_stream(session_id)
|
||||
|
||||
# Deliver reply through the originating channel
|
||||
if final_text:
|
||||
t1 = time.monotonic()
|
||||
|
||||
@@ -35,13 +35,22 @@ class FastTool(ABC):
|
||||
async def run(self, message: str) -> str: ...
|
||||
|
||||
|
||||
_WMO_CODES = {
|
||||
0: "clear sky", 1: "mainly clear", 2: "partly cloudy", 3: "overcast",
|
||||
45: "fog", 48: "icy fog",
|
||||
51: "light drizzle", 53: "drizzle", 55: "heavy drizzle",
|
||||
61: "light rain", 63: "rain", 65: "heavy rain",
|
||||
71: "light snow", 73: "snow", 75: "heavy snow", 77: "snow grains",
|
||||
80: "light showers", 81: "showers", 82: "heavy showers",
|
||||
85: "snow showers", 86: "heavy snow showers",
|
||||
95: "thunderstorm", 96: "thunderstorm with hail", 99: "thunderstorm with heavy hail",
|
||||
}
|
||||
|
||||
|
||||
class WeatherTool(FastTool):
|
||||
"""
|
||||
Fetches current weather for the user's location (Balashikha, Moscow region)
|
||||
by querying SearXNG, which has external internet access.
|
||||
|
||||
Triggered by any weather-related query. The Router also forces medium tier
|
||||
when this tool matches so the richer model handles the injected data.
|
||||
Fetches current weather for Balashikha, Moscow region directly from open-meteo.com.
|
||||
No API key required. Returns a ready-to-deliver reply — no LLM reformatting needed.
|
||||
"""
|
||||
|
||||
_PATTERN = re.compile(
|
||||
@@ -51,11 +60,13 @@ class WeatherTool(FastTool):
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
# Fixed query — always fetch home location weather
|
||||
_SEARCH_QUERY = "погода Балашиха сейчас" # Russian query → Celsius sources
|
||||
|
||||
def __init__(self, searxng_url: str):
|
||||
self._searxng_url = searxng_url
|
||||
_URL = (
|
||||
"https://api.open-meteo.com/v1/forecast"
|
||||
"?latitude=55.7963&longitude=37.9382"
|
||||
"¤t=temperature_2m,apparent_temperature,relative_humidity_2m"
|
||||
",wind_speed_10m,weather_code"
|
||||
"&wind_speed_unit=ms"
|
||||
)
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
@@ -65,31 +76,24 @@ class WeatherTool(FastTool):
|
||||
return bool(self._PATTERN.search(message))
|
||||
|
||||
async def run(self, message: str) -> str:
|
||||
"""Query SearXNG for Balashikha weather and return current conditions snippet."""
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
r = await client.get(
|
||||
f"{self._searxng_url}/search",
|
||||
params={"q": self._SEARCH_QUERY, "format": "json"},
|
||||
)
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
r = await client.get(self._URL)
|
||||
r.raise_for_status()
|
||||
items = r.json().get("results", [])[:5]
|
||||
c = r.json()["current"]
|
||||
except Exception as e:
|
||||
return f"[weather error: {e}]"
|
||||
|
||||
if not items:
|
||||
return ""
|
||||
temp = c["temperature_2m"]
|
||||
feels = c["apparent_temperature"]
|
||||
humidity = c["relative_humidity_2m"]
|
||||
wind = c["wind_speed_10m"]
|
||||
condition = _WMO_CODES.get(c.get("weather_code", 0), "unknown")
|
||||
|
||||
# Prefer results whose snippets contain actual current conditions
|
||||
lines = ["Current weather data for Balashikha, Moscow region (temperatures in °C):\n"]
|
||||
for item in items:
|
||||
snippet = item.get("content", "")
|
||||
title = item.get("title", "")
|
||||
url = item.get("url", "")
|
||||
if snippet:
|
||||
lines.append(f"[{title}]\n{snippet}\nSource: {url}\n")
|
||||
|
||||
return "\n".join(lines) if len(lines) > 1 else ""
|
||||
return (
|
||||
f"Balashikha: {condition}, {temp:+.0f}°C (feels like {feels:+.0f}°C), "
|
||||
f"wind {wind:.1f} m/s, humidity {humidity}%."
|
||||
)
|
||||
|
||||
|
||||
class CommuteTool(FastTool):
|
||||
|
||||
@@ -92,7 +92,8 @@ class Router:
|
||||
if force_complex:
|
||||
return "complex", None
|
||||
|
||||
# Step 0a: force medium if any fast tool matches (live-data queries)
|
||||
# Step 0a: fast tool match — agent.py short-circuits before reaching router
|
||||
# This branch is only hit if force_complex=True with a fast-tool message (rare)
|
||||
if self._fast_tool_runner and self._fast_tool_runner.any_matches(message.strip()):
|
||||
names = self._fast_tool_runner.matching_names(message.strip())
|
||||
print(f"[router] fast_tool_match={names} → medium", flush=True)
|
||||
|
||||
Reference in New Issue
Block a user