#!/usr/bin/env python3 """#86.2 / #86.3 — audit the corpus for Nevo editorial-preamble leakage, and benchmark our extracted halachot against Nevo's מיני-רציו (free expert ground-truth). Two modes (read-only — no DB mutation, no re-ingest): leak (#86.2) — scan precedent_chunks AND halachot for Nevo editorial markers (_NEVO_MARKERS from extractor) that may have leaked in for rulings ingested BEFORE the #86.1 strip fix. Distinguishes the HARMFUL vector (markers inside extracted halachot — editorial ratio mistaken for a holding) from the benign one (a citation-list chunk). Writes a CSV report to data/audit/. Does NOT re-ingest: the knowledge layer (halachot) is the only vector that matters, and re-OCR retrofit is counter-indicated (non-deterministic OCR — see memory feedback_no_reocr_retrofit); chunk-level citation-lists are low-harm. benchmark (#86.3) — for rulings whose מיני-רציו was captured (case_law.nevo_ratio), ask the tri-model panel which of the ratio's holdings are covered by OUR extracted halachot → recall (coverage), plus a granularity ratio (our holdings / ratio holdings). Nevo's ratio is an independent expert summary, so this is a free quality signal that complements the #81.7 gold-set. Run locally (benchmark needs claude_session CLI + DeepSeek/Gemini keys): cd ~/legal-ai/mcp-server .venv/bin/python ../scripts/nevo_corpus_audit.py leak .venv/bin/python ../scripts/nevo_corpus_audit.py benchmark --limit 5 """ from __future__ import annotations import argparse import asyncio import csv import json import sys from datetime import datetime, timezone from pathlib import Path import httpx from legal_mcp.services import claude_session, db, extractor sys.path.insert(0, str(Path(__file__).resolve().parent)) from halacha_panel_approve import judge_deepseek, judge_gemini # noqa: E402 AUDIT = Path(__file__).resolve().parent.parent / "data" / "audit" # editorial-holdings markers (the harmful family) vs benign citation lists _EDITORIAL = ("מיני-רציו", "מבזק") def _has_marker(text: str) -> bool: return any(m.rstrip(":") in (text or "") for m in extractor._NEVO_MARKERS) def _has_editorial(text: str) -> bool: return any(m in (text or "") for m in _EDITORIAL) async def run_leak(args) -> int: pool = await db.get_pool() # chunks carrying any Nevo marker chunk_rows = await pool.fetch( "SELECT cl.id, cl.case_number, cl.source_type, pc.content " "FROM precedent_chunks pc JOIN case_law cl ON cl.id = pc.case_law_id" ) # halachot — the ONLY harmful vector (ratio mistaken for a holding) hal_rows = await pool.fetch( "SELECT cl.case_number, h.rule_statement, h.supporting_quote " "FROM halachot h JOIN case_law cl ON cl.id = h.case_law_id" ) per_case: dict = {} for r in chunk_rows: if _has_marker(r["content"]): d = per_case.setdefault(r["case_number"], { "source_type": r["source_type"], "marker_chunks": 0, "editorial_chunks": 0, "marker_halachot": 0}) d["marker_chunks"] += 1 if _has_editorial(r["content"]): d["editorial_chunks"] += 1 contaminated_halachot = 0 for r in hal_rows: if _has_marker(r["rule_statement"]) or _has_marker(r["supporting_quote"]): contaminated_halachot += 1 d = per_case.setdefault(r["case_number"], { "source_type": "?", "marker_chunks": 0, "editorial_chunks": 0, "marker_halachot": 0}) d["marker_halachot"] += 1 ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") AUDIT.mkdir(parents=True, exist_ok=True) out = AUDIT / f"nevo-leak-audit-{ts}.csv" with out.open("w", encoding="utf-8", newline="") as f: w = csv.writer(f) w.writerow(["case_number", "source_type", "marker_chunks", "editorial_chunks", "marker_halachot"]) for cn, d in sorted(per_case.items(), key=lambda kv: -kv[1]["marker_halachot"]): w.writerow([cn, d["source_type"], d["marker_chunks"], d["editorial_chunks"], d["marker_halachot"]]) editorial_cases = sum(1 for d in per_case.values() if d["editorial_chunks"]) print(f"affected rulings (any marker): {len(per_case)}") print(f" with EDITORIAL ratio chunks (מיני-רציו/מבזק): {editorial_cases}") print(f" HALACHOT contaminated (the harmful vector): {contaminated_halachot}") print(f"\nreport → {out}") if contaminated_halachot == 0: print("✓ knowledge layer clean — no halacha carries editorial ratio; " "no purge/re-ingest warranted (chunk citation-lists are benign, " "and re-OCR retrofit is counter-indicated).") else: print("⚠ contaminated halachot found — review the CSV; these need " "targeted re-extraction (NOT bulk re-OCR).") # Safe backfill (--apply): capture the מיני-רציו into case_law.nevo_ratio for # pre-#86.1 rulings that have it in their stored text but never extracted it. # Deterministic — runs extract_nevo_ratio on the STORED full_text (no re-OCR); # writes only the additive nevo_ratio field (the ground-truth for #86.3), # never touches chunks or halachot. "Capture, don't delete" (#86.3). missing = await pool.fetch( "SELECT id, case_number, full_text FROM case_law " "WHERE COALESCE(nevo_ratio,'') = '' AND full_text LIKE '%מיני-רציו%'" ) backfillable = [(r, extractor.extract_nevo_ratio(r["full_text"] or "")) for r in missing] backfillable = [(r, ratio) for r, ratio in backfillable if ratio] print(f"\nnevo_ratio backfill candidates (ratio in text, field empty): {len(backfillable)}") if not args.apply: print("(report-only — pass --apply to capture nevo_ratio for these)") return 0 for r, ratio in backfillable: await db.update_case_law(r["id"], nevo_ratio=ratio) print(f"✓ captured nevo_ratio for {len(backfillable)} rulings (additive; " f"chunks/halachot untouched).") return 0 _BENCH_SYSTEM = ( "אתה בוחן-איכות משפטי. נתון 'מיני-רציו' של נבו (סיכום-העריכה של העקרונות שנקבעו " "בפסק) ורשימת ההלכות שמערכת חילצה מאותו פסק. הכרע, לכל עיקרון ברציו, האם הוא " "מכוסה ע\"י לפחות הלכה אחת שחולצה (אותו עיקרון משפטי, גם אם בניסוח שונה). " 'החזר JSON בלבד: {"ratio_points": <מספר עקרונות ברציו>, "covered": <כמה מהם מכוסים>, ' '"missing": ["<עיקרון שלא כוסה>", ...]}. ללא markdown.' ) def _bench_user(ratio: str, halachot: list[str]) -> str: ours = "\n".join(f"- {h}" for h in halachot) or "(אין)" return f"מיני-רציו של נבו:\n{ratio}\n\nהלכות שחולצו אצלנו:\n{ours}" async def run_benchmark(args) -> int: pool = await db.get_pool() cases = await pool.fetch( "SELECT id, case_number, nevo_ratio FROM case_law " "WHERE COALESCE(nevo_ratio,'') <> '' ORDER BY case_number" ) if args.limit: cases = cases[: args.limit] print(f"rulings with stored nevo_ratio: {len(cases)}\n", flush=True) results = [] async with httpx.AsyncClient() as client: for c in cases: hs = await pool.fetch( "SELECT rule_statement FROM halachot WHERE case_law_id = $1 " "AND review_status IN ('approved','published','pending_review')", c["id"]) ours = [r["rule_statement"] for r in hs] user = _bench_user(c["nevo_ratio"], ours) # panel: claude + deepseek + gemini, take the median 'covered/ratio_points' async def _claude(): try: return await claude_session.query_json(user, system=_BENCH_SYSTEM, effort="low") except Exception: return None cj, dj, gj = await asyncio.gather( _claude(), judge_deepseek(client, _BENCH_SYSTEM, user), judge_gemini(client, _BENCH_SYSTEM, user)) recalls = [] for v in (cj, dj, gj): if isinstance(v, dict) and v.get("ratio_points"): try: recalls.append(int(v["covered"]) / int(v["ratio_points"])) except (ValueError, ZeroDivisionError, TypeError): pass recall = sorted(recalls)[len(recalls) // 2] if recalls else None results.append({"case": c["case_number"], "our_halachot": len(ours), "panel_recall": round(recall, 3) if recall is not None else None}) print(f" {c['case_number']}: ours={len(ours)} recall={results[-1]['panel_recall']}", flush=True) measured = [r["panel_recall"] for r in results if r["panel_recall"] is not None] mean_recall = round(sum(measured) / len(measured), 3) if measured else None print(f"\nmean ratio-coverage recall (n={len(measured)}): {mean_recall}") ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") out = AUDIT / f"nevo-ratio-benchmark-{ts}.json" out.write_text(json.dumps({"mean_recall": mean_recall, "results": results}, ensure_ascii=False, indent=2)) print(f"report → {out}") return 0 if __name__ == "__main__": ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) sub = ap.add_subparsers(dest="mode", required=True) lk = sub.add_parser("leak") lk.add_argument("--apply", action="store_true", help="capture nevo_ratio for pre-fix rulings (additive; no re-OCR)") b = sub.add_parser("benchmark") b.add_argument("--limit", type=int, default=0) args = ap.parse_args() fn = {"leak": run_leak, "benchmark": run_benchmark}[args.mode] raise SystemExit(asyncio.run(fn(args)))