Compare commits
1 Commits
fix/benchm
...
fix/tier-l
| Author | SHA1 | Date | |
|---|---|---|---|
| 8ef4897869 |
13
agent.py
13
agent.py
@@ -432,6 +432,7 @@ async def _run_agent_pipeline(
|
|||||||
session_id: str,
|
session_id: str,
|
||||||
tier_override: str | None = None,
|
tier_override: str | None = None,
|
||||||
dry_run: bool = False,
|
dry_run: bool = False,
|
||||||
|
tier_capture: list | None = None,
|
||||||
) -> AsyncGenerator[str, None]:
|
) -> AsyncGenerator[str, None]:
|
||||||
"""Core pipeline: pre-flight → routing → inference. Yields text chunks.
|
"""Core pipeline: pre-flight → routing → inference. Yields text chunks.
|
||||||
|
|
||||||
@@ -501,6 +502,8 @@ async def _run_agent_pipeline(
|
|||||||
else:
|
else:
|
||||||
print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True)
|
print(f"[agent] tier={tier} message={clean_message[:60]!r}", flush=True)
|
||||||
tier = effective_tier
|
tier = effective_tier
|
||||||
|
if tier_capture is not None:
|
||||||
|
tier_capture.append(tier)
|
||||||
|
|
||||||
if tier == "light":
|
if tier == "light":
|
||||||
final_text = light_reply
|
final_text = light_reply
|
||||||
@@ -597,10 +600,9 @@ async def run_agent_task(
|
|||||||
history = _conversation_buffers.get(session_id, [])
|
history = _conversation_buffers.get(session_id, [])
|
||||||
final_text = None
|
final_text = None
|
||||||
actual_tier = "unknown"
|
actual_tier = "unknown"
|
||||||
|
tier_capture: list = []
|
||||||
|
|
||||||
# Patch pipeline to capture tier for logging
|
async for chunk in _run_agent_pipeline(message, history, session_id, dry_run=dry_run, tier_capture=tier_capture):
|
||||||
# We read it from logs post-hoc; capture via a wrapper
|
|
||||||
async for chunk in _run_agent_pipeline(message, history, session_id, dry_run=dry_run):
|
|
||||||
await _push_stream_chunk(session_id, chunk)
|
await _push_stream_chunk(session_id, chunk)
|
||||||
if final_text is None:
|
if final_text is None:
|
||||||
final_text = chunk
|
final_text = chunk
|
||||||
@@ -608,6 +610,7 @@ async def run_agent_task(
|
|||||||
final_text += chunk
|
final_text += chunk
|
||||||
|
|
||||||
await _end_stream(session_id)
|
await _end_stream(session_id)
|
||||||
|
actual_tier = tier_capture[0] if tier_capture else "unknown"
|
||||||
|
|
||||||
elapsed_ms = int((time.monotonic() - t0) * 1000)
|
elapsed_ms = int((time.monotonic() - t0) * 1000)
|
||||||
|
|
||||||
@@ -621,8 +624,8 @@ async def run_agent_task(
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[agent] delivery error (non-fatal): {e}", flush=True)
|
print(f"[agent] delivery error (non-fatal): {e}", flush=True)
|
||||||
|
|
||||||
print(f"[agent] replied in {elapsed_ms / 1000:.1f}s", flush=True)
|
print(f"[agent] replied in {elapsed_ms / 1000:.1f}s tier={actual_tier}", flush=True)
|
||||||
print(f"[agent] reply_text: {final_text[:200]}", flush=True)
|
print(f"[agent] reply_text: {final_text}", flush=True)
|
||||||
|
|
||||||
# Update conversation buffer
|
# Update conversation buffer
|
||||||
buf = _conversation_buffers.get(session_id, [])
|
buf = _conversation_buffers.get(session_id, [])
|
||||||
|
|||||||
@@ -120,7 +120,7 @@ def extract_tier_from_logs(logs_before: str, logs_after: str) -> str | None:
|
|||||||
"""Find new tier= lines that appeared after we sent the query."""
|
"""Find new tier= lines that appeared after we sent the query."""
|
||||||
before_lines = set(logs_before.splitlines())
|
before_lines = set(logs_before.splitlines())
|
||||||
new_lines = [l for l in logs_after.splitlines() if l not in before_lines]
|
new_lines = [l for l in logs_after.splitlines() if l not in before_lines]
|
||||||
for line in new_lines:
|
for line in reversed(new_lines):
|
||||||
m = re.search(r"tier=(\w+(?:\s*\(dry-run\))?)", line)
|
m = re.search(r"tier=(\w+(?:\s*\(dry-run\))?)", line)
|
||||||
if m:
|
if m:
|
||||||
tier_raw = m.group(1)
|
tier_raw = m.group(1)
|
||||||
@@ -203,7 +203,7 @@ async def run(queries: list[dict], dry_run: bool = False) -> list[dict]:
|
|||||||
|
|
||||||
print(f"{qid:>3} {expected:8} ", end="", flush=True)
|
print(f"{qid:>3} {expected:8} ", end="", flush=True)
|
||||||
|
|
||||||
logs_before = get_log_tail(300)
|
logs_before = get_log_tail(80)
|
||||||
t0 = time.monotonic()
|
t0 = time.monotonic()
|
||||||
|
|
||||||
ok_post = await post_message(client, qid, query_text, dry_run=send_dry)
|
ok_post = await post_message(client, qid, query_text, dry_run=send_dry)
|
||||||
@@ -225,7 +225,7 @@ async def run(queries: list[dict], dry_run: bool = False) -> list[dict]:
|
|||||||
|
|
||||||
# Now the query is done — check logs for tier
|
# Now the query is done — check logs for tier
|
||||||
await asyncio.sleep(0.3)
|
await asyncio.sleep(0.3)
|
||||||
logs_after = get_log_tail(300)
|
logs_after = get_log_tail(80)
|
||||||
actual = extract_tier_from_logs(logs_before, logs_after)
|
actual = extract_tier_from_logs(logs_before, logs_after)
|
||||||
|
|
||||||
elapsed = time.monotonic() - t0
|
elapsed = time.monotonic() - t0
|
||||||
|
|||||||
@@ -199,14 +199,13 @@ def parse_run_block(lines, msg_prefix):
|
|||||||
if txt:
|
if txt:
|
||||||
last_ai_text = txt
|
last_ai_text = txt
|
||||||
|
|
||||||
m = re.search(r"replied in ([\d.]+)s \(llm=([\d.]+)s, send=([\d.]+)s\)", line)
|
m = re.search(r"replied in ([\d.]+)s(?:\s+tier=(\w+))?", line)
|
||||||
if m:
|
if m:
|
||||||
tier_m = re.search(r"\btier=(\w+)", line)
|
tier = m.group(2) if m.group(2) else "unknown"
|
||||||
tier = tier_m.group(1) if tier_m else "unknown"
|
|
||||||
reply_data = {
|
reply_data = {
|
||||||
"reply_total": float(m.group(1)),
|
"reply_total": float(m.group(1)),
|
||||||
"llm": float(m.group(2)),
|
"llm": None,
|
||||||
"send": float(m.group(3)),
|
"send": None,
|
||||||
"tier": tier,
|
"tier": tier,
|
||||||
"reply_text": last_ai_text,
|
"reply_text": last_ai_text,
|
||||||
"memory_s": None,
|
"memory_s": None,
|
||||||
|
|||||||
Reference in New Issue
Block a user