#!/usr/bin/env python3 """#86.2 — backfill: strip leaked Nevo preamble/ratio from already-ingested rulings. Court rulings ingested BEFORE the #86.1 fix kept their Nevo preamble (bibliography + מיני-רציו) because the old ``_DECISION_START`` regex only matched ועדת-ערר openings, not ``פסק-דין``/judge openings. For those rows the preamble is baked into the stored ``full_text`` AND into the chunks — and the מיני-רציו (Nevo's editorial answer-key) may have leaked into extracted halachot, contaminating the corpus. This script finds every case_law row whose stored ``full_text`` would still be shortened by the CURRENT ``strip_nevo_preamble`` (i.e. a pre-fix leak), and: 1. captures the מיני-רציו into ``case_law.nevo_ratio`` (gold-set for #86.3), unless that column is already populated; 2. rewrites ``full_text`` to the stripped body + recomputes ``content_hash``; 3. re-chunks + re-embeds via ``ingest.reindex_case_law`` (no re-OCR, no LLM); 4. flags — never deletes — halachot whose supporting_quote lives entirely in the removed preamble region: review_status -> 'pending_review' plus a 'nevo_preamble_leak' quality_flag, so the chair can re-judge them (#84). DRY-RUN BY DEFAULT. ``--apply`` performs the migration and first writes a JSON backup + CSV manifest to ``data/audit/`` (per the code-protocol data-migration rule). Idempotent: a re-run finds nothing because stripped rows no longer match. Run with the MCP server venv (config loads ~/.env / Infisical for POSTGRES + VOYAGE, same as the live MCP tools): cd ~/legal-ai/mcp-server .venv/bin/python ../scripts/backfill_nevo_preamble.py # dry-run .venv/bin/python ../scripts/backfill_nevo_preamble.py --apply # migrate .venv/bin/python ../scripts/backfill_nevo_preamble.py --limit 3 # smoke """ from __future__ import annotations import argparse import asyncio import csv import json import sys from datetime import datetime, timezone from pathlib import Path from legal_mcp.services import db, ingest from legal_mcp.services.extractor import extract_nevo_ratio, strip_nevo_preamble from legal_mcp.services.halacha_quality import normalize_text REPO_ROOT = Path(__file__).resolve().parent.parent AUDIT_DIR = REPO_ROOT / "data" / "audit" # Safety: a clean strip removes only the Nevo preamble (a small head). If the # strip would discard more than this fraction of the document, treat it as a # suspected over-strip (a citation/heading false-match) and DO NOT auto-apply # — surface it for manual review instead. Destroying real decision body is # far worse than leaving a preamble in place. DEFAULT_MIN_KEEP_PCT = 60 async def _scan(conn, limit: int | None) -> list[dict]: """Return rows whose stored full_text still carries a Nevo preamble.""" rows = await conn.fetch( "SELECT id, case_number, full_text, nevo_ratio " "FROM case_law WHERE full_text <> '' ORDER BY case_number" ) hits: list[dict] = [] for r in rows: full = r["full_text"] or "" stripped = strip_nevo_preamble(full) if stripped == full: continue # no leak (already clean, or never had a preamble) removed = full[: len(full) - len(stripped)] ratio = extract_nevo_ratio(full) keep_pct = round(100 * len(stripped) / len(full)) if full else 0 hits.append({ "id": r["id"], "case_number": r["case_number"], "full_text": full, "stripped": stripped, "removed": removed, "ratio": ratio, "keep_pct": keep_pct, "had_ratio_stored": bool((r["nevo_ratio"] or "").strip()), }) if limit and len(hits) >= limit: break return hits async def _contaminated_halachot(conn, case_law_id, removed: str) -> list[dict]: """Halachot whose supporting_quote sits entirely inside the removed preamble.""" norm_removed = normalize_text(removed) if not norm_removed: return [] rows = await conn.fetch( "SELECT id, halacha_index, supporting_quote, review_status, quality_flags " "FROM halachot WHERE case_law_id = $1", case_law_id, ) bad = [] for r in rows: q = normalize_text(r["supporting_quote"] or "") if len(q) >= 20 and q in norm_removed: bad.append(dict(r)) return bad async def main(args: argparse.Namespace) -> int: ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") pool = await db.get_pool() async with pool.acquire() as conn: hits = await _scan(conn, args.limit) for h in hits: h["contaminated"] = await _contaminated_halachot(conn, h["id"], h["removed"]) # Partition into safe (auto-appliable) vs suspicious (manual review). for h in hits: h["suspicious"] = h["keep_pct"] < args.min_keep safe = [h for h in hits if not h["suspicious"]] suspicious = [h for h in hits if h["suspicious"]] n = len(hits) total_contam = sum(len(h["contaminated"]) for h in hits) print(f"leaked rulings found: {n} (contaminated halachot: {total_contam}; " f"safe: {len(safe)}, suspicious<{args.min_keep}%: {len(suspicious)})", flush=True) for h in hits: print( f" {'⚠ ' if h['suspicious'] else ' '}{h['case_number']}: " f"keep {h['keep_pct']}%, -{len(h['removed']):,} preamble chars, " f"ratio={len(h['ratio'])} chars, " f"{len(h['contaminated'])} contaminated halachot" + ("" if h["ratio"] else " [no mini-ratio]") + (" [ratio already stored]" if h["had_ratio_stored"] else ""), flush=True, ) if suspicious: print(f"\n⚠ {len(suspicious)} ruling(s) below {args.min_keep}% keep — " "EXCLUDED from --apply (suspected over-strip). Review manually or " "pass --include-suspicious to force.", flush=True) if not hits: print("nothing to backfill — corpus clean ✓", flush=True) return 0 apply_set = hits if args.include_suspicious else safe # Always write a manifest (dry-run included) for the audit trail. AUDIT_DIR.mkdir(parents=True, exist_ok=True) manifest = AUDIT_DIR / f"nevo-backfill-manifest-{ts}.csv" with manifest.open("w", encoding="utf-8", newline="") as f: w = csv.writer(f) w.writerow(["case_law_id", "case_number", "keep_pct", "preamble_chars", "ratio_chars", "contaminated_halachot", "suspicious", "applied"]) for h in hits: will_apply = args.apply and (not h["suspicious"] or args.include_suspicious) w.writerow([h["id"], h["case_number"], h["keep_pct"], len(h["removed"]), len(h["ratio"]), len(h["contaminated"]), h["suspicious"], will_apply]) print(f"manifest: {manifest}", flush=True) if not args.apply: print("\nDRY-RUN — no changes written. Re-run with --apply to migrate.", flush=True) return 0 # Backup the BEFORE state before mutating anything. backup = AUDIT_DIR / f"nevo-backfill-backup-{ts}.json" with backup.open("w", encoding="utf-8") as f: json.dump([ { "id": str(h["id"]), "case_number": h["case_number"], "full_text": h["full_text"], "ratio": h["ratio"], "contaminated": [ {"id": str(c["id"]), "halacha_index": c["halacha_index"], "review_status": c["review_status"], "quality_flags": list(c["quality_flags"] or [])} for c in h["contaminated"] ], } for h in apply_set ], f, ensure_ascii=False, indent=2) print(f"backup: {backup}", flush=True) n_apply = len(apply_set) ok, failed = 0, [] for i, h in enumerate(apply_set, 1): cid, cn = h["id"], h["case_number"] try: async with pool.acquire() as conn: async with conn.transaction(): # 1+2: rewrite full_text + content_hash; store ratio if absent. await conn.execute( "UPDATE case_law SET full_text = $2, content_hash = $3 WHERE id = $1", cid, h["stripped"], db._content_hash(h["stripped"]), ) if h["ratio"] and not h["had_ratio_stored"]: await conn.execute( "UPDATE case_law SET nevo_ratio = $2 WHERE id = $1", cid, h["ratio"], ) # 4: flag (never delete) contaminated halachot. for c in h["contaminated"]: flags = list(c["quality_flags"] or []) if "nevo_preamble_leak" not in flags: flags.append("nevo_preamble_leak") await conn.execute( "UPDATE halachot SET review_status = 'pending_review', " "quality_flags = $2 WHERE id = $1", c["id"], flags, ) # 3: reindex outside the txn (its own DELETE-then-INSERT + embeddings). res = await ingest.reindex_case_law(cid) ok += 1 print(f"[{i}/{n_apply}] OK {cn}: -> {res['chunks']} chunks, " f"{len(h['contaminated'])} halachot flagged", flush=True) except Exception as e: # noqa: BLE001 — per-row, keep going failed.append((cn, str(e))) print(f"[{i}/{n_apply}] FAIL {cn}: {e}", flush=True) print(f"\nDONE — {ok}/{n_apply} migrated, {len(failed)} failed" + (f", {len(suspicious)} suspicious skipped" if suspicious and not args.include_suspicious else ""), flush=True) for cn, e in failed: print(f" FAILED {cn}: {e}", flush=True) return 0 if not failed else 1 if __name__ == "__main__": ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) ap.add_argument("--apply", action="store_true", help="perform the migration (default: dry-run)") ap.add_argument("--limit", type=int, default=None, help="process only the first N leaked rulings") ap.add_argument("--min-keep", type=int, default=DEFAULT_MIN_KEEP_PCT, help=f"min%% of doc that must remain after strip to auto-apply " f"(default {DEFAULT_MIN_KEEP_PCT}); lower = suspected over-strip") ap.add_argument("--include-suspicious", action="store_true", help="force --apply on rows below --min-keep (use with care)") args = ap.parse_args() sys.exit(asyncio.run(main(args)))