#!/usr/bin/env python3 """ Wiki Research Pipeline — searches the web for each person/place in the family wiki. Uses Adolf's complex agent (/think prefix → qwen3:8b + web_search) to research each subject and aggregates findings into research.md. Usage: python3 wiki_research.py [--subject "Name"] [--dry-run] [--timeout 300] [--output PATH] """ import argparse import json import re import sys import time import urllib.parse import urllib.request from datetime import datetime from pathlib import Path # ── config ───────────────────────────────────────────────────────────────────── GATEWAY = "http://localhost:8000" WIKI_ROOT = Path("/mnt/ssd/dbs/otter/app-data/repository") DEFAULT_OUTPUT = WIKI_ROOT / "research.md" PASS = "\033[32mPASS\033[0m" FAIL = "\033[31mFAIL\033[0m" INFO = "\033[36mINFO\033[0m" # ── helpers ──────────────────────────────────────────────────────────────────── def post_message(text: str, session_id: str, timeout: int = 10) -> int: payload = json.dumps({ "text": text, "session_id": session_id, "channel": "cli", "user_id": "wiki-pipeline", }).encode() req = urllib.request.Request( f"{GATEWAY}/message", data=payload, headers={"Content-Type": "application/json"}, method="POST", ) with urllib.request.urlopen(req, timeout=timeout) as r: return r.status def wait_for_reply(label: str, session_id: str, timeout_s: int = 300) -> str | None: """Open SSE stream on /reply/{session_id} and return reply text, or None on timeout.""" req = urllib.request.Request( f"{GATEWAY}/reply/{urllib.parse.quote(session_id, safe='')}", headers={"Accept": "text/event-stream"}, ) t0 = time.monotonic() tick = 0 deadline = t0 + timeout_s # Show progress while waiting (SSE blocks until reply is ready) print(f"\r [{label}] waiting... ", end="", flush=True) try: with urllib.request.urlopen(req, timeout=timeout_s + 30) as r: for raw_line in r: elapsed = time.monotonic() - t0 line = raw_line.decode("utf-8").rstrip("\n") if line.startswith("data:"): text = line[5:].strip().replace("\\n", "\n") print(f"\r [{label}] done after {elapsed:.0f}s{' ' * 30}") if text == "[timeout]": return None return text tick += 1 rem = int(deadline - time.monotonic()) print(f"\r [{label}] {elapsed:.0f}s elapsed, {rem}s left — waiting... ", end="", flush=True) except Exception as e: print(f"\r [{label}] SSE error: {e}{' ' * 30}") print(f"\r [{label}] TIMEOUT after {timeout_s}s{' ' * 30}") return None # ── wiki parsing ─────────────────────────────────────────────────────────────── def slugify(name: str) -> str: s = name.lower() s = re.sub(r"[^\w\s-]", "", s) s = re.sub(r"\s+", "-", s.strip()) return s[:60] def parse_wiki_file(path: Path): try: text = path.read_text(encoding="utf-8") except Exception: return None lines = text.splitlines() name = None context_parts = [] for line in lines[:50]: stripped = line.strip() if not name and stripped.startswith("# "): name = stripped[2:].strip() continue if name: if stripped.startswith("[![") or stripped.startswith("!["): continue if stripped: context_parts.append(stripped) if len(context_parts) >= 20: break if not name: return None return name, "\n".join(context_parts) def discover_subjects(wiki_root: Path): subjects = [] for subdir in ["люди", "места"]: folder = wiki_root / subdir if not folder.exists(): continue for md_file in sorted(folder.glob("*.md")): result = parse_wiki_file(md_file) if result: name, context = result subjects.append((name, context, subdir)) return subjects # ── output ───────────────────────────────────────────────────────────────────── def load_existing_names(output_path: Path) -> set: if not output_path.exists(): return set() return set(re.findall(r"^## (.+)$", output_path.read_text(encoding="utf-8"), re.MULTILINE)) def init_output(output_path: Path, total: int): if not output_path.exists(): output_path.write_text( f"# Wiki Research Results\n\n" f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n" f"Subjects: {total}\n\n---\n\n", encoding="utf-8", ) def append_result(output_path: Path, name: str, elapsed: float, reply_text: str): date_str = datetime.now().strftime("%Y-%m-%d") block = ( f"## {name}\n\n" f"**Searched**: {date_str} **Elapsed**: {elapsed:.0f}s\n\n" f"{reply_text or '_No reply captured._'}\n\n---\n\n" ) with open(output_path, "a", encoding="utf-8") as f: f.write(block) # ── research prompt ──────────────────────────────────────────────────────────── def build_prompt(name: str, context: str, subdir: str) -> str: kind = "person" if subdir == "люди" else "place" return ( f"/think You are researching a {kind} for a private family wiki. " f"Find everything publicly available. Be thorough and specific.\n\n" f"**Subject**: {name}\n" f"**Known context** (from the family wiki — do NOT just repeat this):\n{context}\n\n" f"**Research instructions** (MUST follow exactly):\n" f"1. Call web_search, then IMMEDIATELY call fetch_url on every URL found in results.\n" f"2. You MUST call fetch_url at least 5 times — do not write the report until you have.\n" f"3. Priority URLs to fetch: Google Scholar profile, ResearchGate, IEEE Xplore, LinkedIn, employer page.\n" f"4. Run searches in English AND Russian/Latvian.\n" f"5. After fetching pages, derive follow-up searches from what you find.\n\n" f"**Output format** (required):\n" f"- Use markdown with sections: Overview, Education, Career, Publications, " f"Online Presence, Interesting Findings, Not Found\n" f"- Every fact must have a source link: [fact](url)\n" f"- Include actual URLs to profiles, papers, articles found\n" f"- 'Interesting Findings': non-trivial facts not in the wiki context above\n" f"- Last line must be: **Sources checked: N** (count of URLs you fetched with fetch_url)\n\n" f'If truly nothing is found publicly, say "No public information found." ' f"but only after exhausting all search angles." ) # ── main ─────────────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser(description="Wiki research pipeline") parser.add_argument("--subject", help="Single subject (substring match)") parser.add_argument("--dry-run", action="store_true", help="Print prompts, don't send") parser.add_argument("--timeout", type=int, default=300, help="Per-subject timeout (s)") parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT, help="Output file") args = parser.parse_args() subjects = discover_subjects(WIKI_ROOT) if not subjects: print(f"[{FAIL}] No subjects found in {WIKI_ROOT}") sys.exit(1) print(f"[{INFO}] Discovered {len(subjects)} subjects") if args.subject: needle = args.subject.lower() subjects = [(n, c, s) for n, c, s in subjects if needle in n.lower()] if not subjects: print(f"[{FAIL}] No subject matching '{args.subject}'") sys.exit(1) print(f"[{INFO}] Filtered to {len(subjects)} subject(s)") if args.dry_run: for name, context, subdir in subjects: print(f"\n{'='*60}\nSUBJECT: {name} ({subdir})") print(f"PROMPT:\n{build_prompt(name, context, subdir)}") return init_output(args.output, len(subjects)) existing = load_existing_names(args.output) print(f"[{INFO}] Output: {args.output} ({len(existing)} already done)") total = len(subjects) done = 0 failed = [] for idx, (name, context, subdir) in enumerate(subjects, 1): if name in existing: print(f"[{idx}/{total}] SKIP {name} (already in output)") done += 1 continue prompt = build_prompt(name, context, subdir) session_id = f"wiki-{slugify(name)}" label = f"{idx}/{total}" print(f"\n[{label}] {name}") try: status = post_message(prompt, session_id, timeout=10) if status != 202: print(f" [{FAIL}] Unexpected status {status}") failed.append(name) continue except Exception as e: print(f" [{FAIL}] POST failed: {e}") failed.append(name) continue t0 = time.monotonic() reply_text = wait_for_reply(label, session_id, timeout_s=args.timeout) elapsed = time.monotonic() - t0 if reply_text is None: print(f" [{FAIL}] Timeout") failed.append(name) append_result(args.output, name, elapsed, "_Research timed out._") continue print(f" [{PASS}] {elapsed:.0f}s — {len(reply_text)} chars") append_result(args.output, name, elapsed, reply_text) done += 1 print(f"\n{'='*60}") print(f"Done: {done}/{total}") if failed: print(f"Failed ({len(failed)}): {', '.join(failed)}") print(f"Output: {args.output}") if __name__ == "__main__": main()