routecheck/ — FastAPI service (port 8090): - Image captcha (PIL: arithmetic problem, noise, wave distortion) - POST /api/captcha/new + /api/captcha/solve → short-lived token - GET /api/route?from=lat,lon&to=lat,lon&token=... → Yandex Routing API - Internal bypass via INTERNAL_TOKEN env var (for CommuteTool) - HTTPS proxy forwarded to reach Yandex API from container CommuteTool (fast_tools.py): - Matches commute/traffic/arrival time queries - Calls routecheck /api/route with ROUTECHECK_TOKEN - Hardcoded route: Balashikha home → Moscow center - Returns traffic-adjusted travel time + delay annotation Needs: YANDEX_ROUTING_KEY + ROUTECHECK_TOKEN in .env Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
378 lines
14 KiB
Python
378 lines
14 KiB
Python
"""
|
||
RouteCheck — local routing web service with image captcha.

Endpoints:
  GET  /                     — web UI
  GET  /captcha/image/{id}   — PNG captcha image
  POST /api/captcha/new      — create captcha, return {id}
  POST /api/captcha/solve    — {id, answer} → {token} or 400
  GET  /api/route            — ?from=lat,lon&to=lat,lon&token=...
                               token = solved captcha token OR INTERNAL_TOKEN env var
|
||
"""
|
||
|
||
import hmac
import io
import math
import os
import random
import string
import time
import uuid
from typing import Optional

import httpx
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from PIL import Image, ImageDraw, ImageFilter, ImageFont
from pydantic import BaseModel
|
||
|
||
# ASGI application object; the route handlers below register themselves on it.
app = FastAPI(title="RouteCheck")

# ── Config ─────────────────────────────────────────────────────────────────────
# All settings are read from the environment once, at import time.
YANDEX_KEY = os.getenv("YANDEX_ROUTING_KEY", "")  # sent as `apikey` to the Yandex routing endpoint
INTERNAL_TOKEN = os.getenv("INTERNAL_TOKEN", "")  # token /api/route accepts without a solved captcha
HTTPS_PROXY = os.getenv("HTTPS_PROXY", "")  # optional proxy URL for the outbound Yandex request
CAPTCHA_TTL = 300  # seconds a captcha is valid
TOKEN_TTL = 3600  # seconds a solved token is valid

# ── In-memory captcha store ────────────────────────────────────────────────────
# Both stores live in process memory only; expired entries are dropped by _purge().
_captchas: dict[str, dict] = {}  # id → {problem, answer, expires}
_tokens: dict[str, float] = {}  # token → expires
|
||
|
||
|
||
def _purge():
    """Drop every captcha and token whose expiry timestamp is already in the past.

    Both stores are mutated in place so other references to them stay valid.
    """
    cutoff = time.time()

    # Collect the keys first: deleting while iterating a dict is an error.
    dead_captchas = [cid for cid, entry in _captchas.items() if entry["expires"] < cutoff]
    for cid in dead_captchas:
        del _captchas[cid]

    dead_tokens = [tok for tok, deadline in _tokens.items() if deadline < cutoff]
    for tok in dead_tokens:
        del _tokens[tok]
|
||
|
||
|
||
# ── Captcha image generation ───────────────────────────────────────────────────
|
||
|
||
def _rand_color(dark=False):
    """Return a random RGB 3-tuple.

    Each channel is drawn from 0..100 when *dark* is true, otherwise from
    140..255.
    """
    low, high = (0, 100) if dark else (140, 255)
    return tuple(random.randint(low, high) for _ in range(3))
|
||
|
||
|
||
def _make_captcha_image(text: str) -> bytes:
    """Render *text* as a noisy, wave-distorted PNG and return the encoded bytes.

    The image is 220×80 with a random light background. Dark random lines and
    dots are drawn as noise, then each character is drawn (with a 2px drop
    shadow) at a randomly jittered position. Finally every pixel column is
    shifted vertically along a sine wave and the result is smoothed.

    Args:
        text: The challenge string to draw, one glyph per character.

    Returns:
        The PNG-encoded image.
    """
    W, H = 220, 80
    img = Image.new("RGB", (W, H), color=_rand_color())
    draw = ImageDraw.Draw(img)

    # random.randint is inclusive on both ends, so the last valid pixel
    # coordinate is W - 1 / H - 1 (not W / H).
    max_x, max_y = W - 1, H - 1

    # Background noise: random lines
    for _ in range(8):
        x1, y1 = random.randint(0, max_x), random.randint(0, max_y)
        x2, y2 = random.randint(0, max_x), random.randint(0, max_y)
        draw.line([(x1, y1), (x2, y2)], fill=_rand_color(dark=True), width=2)

    # Background noise: random dots
    for _ in range(300):
        x, y = random.randint(0, max_x), random.randint(0, max_y)
        draw.point((x, y), fill=_rand_color(dark=True))

    # Prefer a large bold TrueType face; fall back to Pillow's built-in font
    # when it is unavailable. Deliberately broad: any font failure should
    # degrade the image, not break the endpoint.
    try:
        font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 36)
    except Exception:
        font = ImageFont.load_default()

    # Draw each character with a slight random offset (no rotation is applied).
    # One slot per character, plus one empty slot of margin on each side.
    char_w = W // (len(text) + 2)
    for i, ch in enumerate(text):
        x = char_w + i * char_w + random.randint(-4, 4)
        y = (H - 40) // 2 + random.randint(-6, 6)
        # Draw shadow
        draw.text((x + 2, y + 2), ch, font=font, fill=_rand_color(dark=True))
        draw.text((x, y), ch, font=font, fill=_rand_color(dark=True))

    # Wavy distortion via pixel manipulation: shift each column vertically by
    # up to ±4px along a sine wave, wrapping around the top/bottom edge.
    pixels = img.load()
    for x in range(W):
        shift = int(4 * math.sin(x / 15.0))
        col = [pixels[x, y] for y in range(H)]  # snapshot before overwriting
        for y in range(H):
            pixels[x, y] = col[(y - shift) % H]

    img = img.filter(ImageFilter.SMOOTH)

    buf = io.BytesIO()
    img.save(buf, format="PNG")
    return buf.getvalue()
|
||
|
||
|
||
def _generate_problem() -> tuple[str, int]:
    """Return (display_text, answer) for a random one-step arithmetic problem.

    The operator is picked from addition, multiplication and subtraction, and
    both operands are drawn from 2..9. Subtraction always puts the larger
    operand first, so the answer is never negative.
    """
    symbol = random.choice(["+", "×", "−"])
    a, b = random.randint(2, 9), random.randint(2, 9)

    if symbol == "+":
        return f"{a} + {b} = ?", a + b
    if symbol == "×":
        return f"{a} × {b} = ?", a * b

    larger, smaller = max(a, b), min(a, b)
    return f"{larger} − {smaller} = ?", larger - smaller
|
||
|
||
|
||
# ── Routes ─────────────────────────────────────────────────────────────────────
|
||
|
||
@app.get("/", response_class=HTMLResponse)
async def index():
    """Serve the single-page web UI held in ``HTML_PAGE``."""
    return HTMLResponse(content=HTML_PAGE)
|
||
|
||
|
||
@app.get("/captcha/image/{captcha_id}")
async def captcha_image(captcha_id: str):
    """Return the PNG for a pending captcha.

    The image is rendered afresh on every request from the stored problem
    text, and is served with ``Cache-Control: no-store``.

    Raises:
        HTTPException: 404 when the captcha id is unknown or has expired.
    """
    _purge()

    if not (entry := _captchas.get(captcha_id)):
        raise HTTPException(404, "Captcha not found or expired")

    body = io.BytesIO(_make_captcha_image(entry["problem"]))
    return StreamingResponse(
        body,
        media_type="image/png",
        headers={"Cache-Control": "no-store"},
    )
|
||
|
||
|
||
class CaptchaNewResponse(BaseModel):
    """Shape of the POST /api/captcha/new response body.

    NOTE(review): no route in the visible source references this model
    (``captcha_new`` returns a plain dict) — confirm whether it is meant to be
    wired in as a ``response_model``.
    """

    # Identifier of the newly created captcha.
    id: str
|
||
|
||
|
||
@app.post("/api/captcha/new")
async def captcha_new():
    """Create a new captcha and return its id as ``{"id": ...}``.

    The problem text, its answer and an expiry ``CAPTCHA_TTL`` seconds from
    now are stored in the in-memory captcha store under a fresh UUID.
    """
    _purge()

    text, solution = _generate_problem()
    captcha_id = str(uuid.uuid4())
    deadline = time.time() + CAPTCHA_TTL
    _captchas[captcha_id] = {"problem": text, "answer": solution, "expires": deadline}

    return {"id": captcha_id}
|
||
|
||
|
||
class SolveRequest(BaseModel):
    """Request body for POST /api/captcha/solve."""

    # Captcha id previously returned by POST /api/captcha/new.
    id: str
    # The caller's answer; compared for equality with the stored answer.
    answer: int
|
||
|
||
|
||
@app.post("/api/captcha/solve")
async def captcha_solve(req: SolveRequest):
    """Check a captcha answer and, if correct, issue a short-lived route token.

    Every captcha is single-attempt: it is removed from the store as soon as
    an answer is submitted, right or wrong. The answer space is tiny (a few
    dozen integers), so allowing retries against the same id would let a
    client simply enumerate it. After a wrong answer the client must request
    a new captcha.

    Args:
        req: The captcha id and the proposed answer.

    Returns:
        ``{"token": ...}`` — a token valid for ``TOKEN_TTL`` seconds.

    Raises:
        HTTPException: 400 when the captcha is unknown/expired/already used,
            or when the answer is wrong.
    """
    _purge()

    # pop() consumes the captcha whether or not the answer turns out correct.
    entry = _captchas.pop(req.id, None)
    if entry is None:
        raise HTTPException(400, "Captcha expired or not found")
    if entry["answer"] != req.answer:
        raise HTTPException(400, "Wrong answer")

    token = str(uuid.uuid4())
    _tokens[token] = time.time() + TOKEN_TTL
    return {"token": token}
|
||
|
||
|
||
def _parse_lat_lon(raw: str) -> tuple[float, float]:
    """Parse ``"lat,lon"`` into floats; raise ValueError if malformed or out of range."""
    lat, lon = map(float, raw.split(","))
    # The negated range test also rejects NaN and ±inf, which float() accepts.
    if not (-90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0):
        raise ValueError("coordinates out of range")
    return lat, lon


@app.get("/api/route")
async def route(
    from_coords: str = Query(..., alias="from", description="lat,lon"),
    to_coords: str = Query(..., alias="to", description="lat,lon"),
    token: str = Query(...),
):
    """Return driving time and distance between two points via the Yandex routing API.

    Args:
        from_coords: Origin as ``"lat,lon"`` (query parameter ``from``).
        to_coords: Destination as ``"lat,lon"`` (query parameter ``to``).
        token: Either a token issued by ``/api/captcha/solve`` or the
            configured ``INTERNAL_TOKEN``.

    Returns:
        ``{"duration_min", "duration_traffic_min", "distance_km"}``.

    Raises:
        HTTPException: 401 for an invalid/expired token; 503 when no Yandex
            key is configured; 400 for malformed coordinates; 502 when the
            upstream API is unreachable, returns a non-200 status, or returns
            a body that cannot be interpreted.
    """
    _purge()

    # Auth: internal service token or valid captcha token.
    # The internal bypass only applies when INTERNAL_TOKEN is actually set;
    # otherwise an empty ``?token=`` would equal the empty default and skip
    # authentication entirely. compare_digest avoids leaking the token through
    # response timing (bytes are used because str input must be ASCII-only).
    is_internal = bool(INTERNAL_TOKEN) and hmac.compare_digest(
        token.encode("utf-8"), INTERNAL_TOKEN.encode("utf-8")
    )
    if not is_internal and token not in _tokens:
        raise HTTPException(401, "Invalid or expired token — solve captcha first")

    if not YANDEX_KEY:
        raise HTTPException(503, "YANDEX_ROUTING_KEY not configured")

    # Parse coords
    try:
        from_lat, from_lon = _parse_lat_lon(from_coords)
        to_lat, to_lon = _parse_lat_lon(to_coords)
    except ValueError:
        raise HTTPException(400, "coords must be lat,lon") from None

    # Yandex Routing API expects lon,lat order
    waypoints = f"{from_lon},{from_lat}|{to_lon},{to_lat}"

    transport = httpx.AsyncHTTPTransport(proxy=HTTPS_PROXY) if HTTPS_PROXY else None
    async with httpx.AsyncClient(timeout=15, transport=transport) as client:
        try:
            r = await client.get(
                "https://api.routing.yandex.net/v2/route",
                params={"apikey": YANDEX_KEY, "waypoints": waypoints, "mode": "driving"},
            )
        except httpx.HTTPError as e:
            raise HTTPException(502, f"Yandex API unreachable: {e}") from e

    if r.status_code != 200:
        raise HTTPException(502, f"Yandex API error {r.status_code}: {r.text[:200]}")

    try:
        data = r.json()
    except ValueError as e:
        # A 200 with a non-JSON body (e.g. a proxy error page) is an upstream fault.
        raise HTTPException(502, f"Unexpected Yandex response: {e} — {r.text[:200]}") from e

    try:
        leg = data["route"]["legs"][0]
        duration_s = leg["duration"]
        duration_traffic_s = leg.get("duration_in_traffic", duration_s)
        distance_m = leg["distance"]
        result = {
            "duration_min": round(duration_s / 60),
            "duration_traffic_min": round(duration_traffic_s / 60),
            "distance_km": round(distance_m / 1000, 1),
        }
    except (KeyError, IndexError, TypeError, AttributeError) as e:
        # TypeError/AttributeError cover bodies that are valid JSON but have
        # the wrong shape (e.g. a list, null fields, non-numeric values).
        raise HTTPException(502, f"Unexpected Yandex response: {e} — {str(data)[:200]}") from e

    return result
|
||
|
||
|
||
# ── HTML ───────────────────────────────────────────────────────────────────────
|
||
|
||
# Single-page UI served at "/". Step 1 solves a captcha to obtain a token;
# step 2 uses that token to query /api/route.
#
# Script notes:
#   * loadCaptcha(keepError) leaves the "wrong answer" message visible when it
#     is called after a failed attempt; otherwise the message would be hidden
#     again as soon as the replacement captcha finished loading.
#   * Buttons are re-enabled in `finally` so a failed fetch cannot leave the
#     form permanently disabled.
HTML_PAGE = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>RouteCheck</title>
<style>
  * { box-sizing: border-box; margin: 0; padding: 0; }
  body { font-family: system-ui, sans-serif; background: #0f172a; color: #e2e8f0; min-height: 100vh;
         display: flex; align-items: center; justify-content: center; }
  .card { background: #1e293b; border-radius: 12px; padding: 2rem; width: 420px;
          box-shadow: 0 20px 60px rgba(0,0,0,.5); }
  h1 { font-size: 1.4rem; font-weight: 700; color: #38bdf8; margin-bottom: .3rem; }
  .sub { color: #94a3b8; font-size: .85rem; margin-bottom: 1.5rem; }
  label { display: block; font-size: .8rem; color: #94a3b8; margin-bottom: .3rem; margin-top: 1rem; }
  input { width: 100%; background: #0f172a; border: 1px solid #334155; border-radius: 6px;
          color: #e2e8f0; padding: .55rem .75rem; font-size: .95rem; outline: none; }
  input:focus { border-color: #38bdf8; }
  button { width: 100%; margin-top: 1.2rem; padding: .7rem; background: #0ea5e9;
           border: none; border-radius: 6px; color: #fff; font-size: 1rem;
           font-weight: 600; cursor: pointer; transition: background .2s; }
  button:hover { background: #0284c7; }
  button:disabled { background: #334155; cursor: default; }
  .captcha-row { display: flex; gap: .75rem; align-items: center; margin-top: 1rem; }
  .captcha-row img { border-radius: 6px; border: 1px solid #334155; cursor: pointer; }
  .captcha-row input { flex: 1; }
  .result { margin-top: 1.2rem; background: #0f172a; border-radius: 8px; padding: 1rem;
            border-left: 3px solid #38bdf8; display: none; }
  .result .big { font-size: 1.6rem; font-weight: 700; color: #38bdf8; }
  .result .label { font-size: .8rem; color: #64748b; margin-top: .2rem; }
  .result .row { display: flex; gap: 1.5rem; margin-top: .8rem; }
  .result .metric { flex: 1; }
  .result .metric .val { font-size: 1.1rem; font-weight: 600; }
  .error { color: #f87171; margin-top: .8rem; font-size: .85rem; display: none; }
  .step { display: none; }
  .step.active { display: block; }
  a.refresh { font-size: .75rem; color: #38bdf8; text-decoration: none; display: block;
              margin-top: .4rem; }
  a.refresh:hover { text-decoration: underline; }
</style>
</head>
<body>
<div class="card">
  <h1>RouteCheck</h1>
  <p class="sub">Real-time driving time with Yandex traffic data</p>

  <!-- Step 1: captcha -->
  <div class="step active" id="step-captcha">
    <label>Prove you are human</label>
    <div class="captcha-row">
      <img id="captcha-img" src="" alt="captcha" width="160" height="60"
           title="Click to refresh" onclick="loadCaptcha()">
      <input id="captcha-ans" type="number" placeholder="Answer" min="0" max="999">
    </div>
    <a class="refresh" href="#" onclick="loadCaptcha();return false;">↻ New challenge</a>
    <div class="error" id="captcha-err">Wrong answer, try again.</div>
    <button id="captcha-btn" onclick="solveCaptcha()">Verify →</button>
  </div>

  <!-- Step 2: route query -->
  <div class="step" id="step-route">
    <label>From (lat, lon)</label>
    <input id="from" type="text" placeholder="55.7963, 37.9382" value="55.7963, 37.9382">
    <label>To (lat, lon)</label>
    <input id="to" type="text" placeholder="55.7558, 37.6173" value="55.7558, 37.6173">
    <button id="route-btn" onclick="queryRoute()">Get travel time</button>
    <div class="error" id="route-err"></div>
    <div class="result" id="result">
      <div class="big" id="res-traffic"></div>
      <div class="label">with current traffic</div>
      <div class="row">
        <div class="metric"><div class="val" id="res-normal"></div>
          <div class="label">without traffic</div></div>
        <div class="metric"><div class="val" id="res-dist"></div>
          <div class="label">distance</div></div>
      </div>
    </div>
  </div>
</div>

<script>
let captchaId = null;
let routeToken = null;

async function loadCaptcha(keepError) {
  const r = await fetch('/api/captcha/new', {method: 'POST'});
  const d = await r.json();
  captchaId = d.id;
  document.getElementById('captcha-img').src = '/captcha/image/' + captchaId + '?t=' + Date.now();
  document.getElementById('captcha-ans').value = '';
  if (!keepError) {
    document.getElementById('captcha-err').style.display = 'none';
  }
}

async function solveCaptcha() {
  const ans = parseInt(document.getElementById('captcha-ans').value);
  if (isNaN(ans)) return;
  const btn = document.getElementById('captcha-btn');
  btn.disabled = true;
  try {
    const r = await fetch('/api/captcha/solve', {
      method: 'POST',
      headers: {'Content-Type': 'application/json'},
      body: JSON.stringify({id: captchaId, answer: ans})
    });
    if (r.ok) {
      const d = await r.json();
      routeToken = d.token;
      document.getElementById('step-captcha').classList.remove('active');
      document.getElementById('step-route').classList.add('active');
    } else {
      document.getElementById('captcha-err').style.display = 'block';
      await loadCaptcha(true);
    }
  } finally {
    btn.disabled = false;
  }
}

async function queryRoute() {
  const from = document.getElementById('from').value.trim();
  const to = document.getElementById('to').value.trim();
  const btn = document.getElementById('route-btn');
  const err = document.getElementById('route-err');
  err.style.display = 'none';
  document.getElementById('result').style.display = 'none';
  btn.disabled = true;
  btn.textContent = 'Fetching…';
  let r;
  try {
    r = await fetch(`/api/route?from=${encodeURIComponent(from)}&to=${encodeURIComponent(to)}&token=${encodeURIComponent(routeToken)}`);
  } finally {
    btn.disabled = false;
    btn.textContent = 'Get travel time';
  }
  if (!r.ok) {
    const d = await r.json();
    err.textContent = d.detail || 'Error';
    err.style.display = 'block';
    return;
  }
  const d = await r.json();
  document.getElementById('res-traffic').textContent = d.duration_traffic_min + ' min';
  document.getElementById('res-normal').textContent = d.duration_min + ' min';
  document.getElementById('res-dist').textContent = d.distance_km + ' km';
  document.getElementById('result').style.display = 'block';
}

loadCaptcha();

document.getElementById('captcha-ans').addEventListener('keydown', e => {
  if (e.key === 'Enter') solveCaptcha();
});
</script>
</body>
</html>
"""
|