#!/usr/bin/env python3 """#82.7 — offline CROSS-precedent halacha dedup (conservative, dry-run reporter). Dedup-on-insert (db.store_halachot_for_chunk) only compares within a single precedent — the 2026-06-03 audit showed cosine ≥0.90 is reliable only within-precedent. Across precedents the same principle legitimately recurs, so this batch job is deliberately STRICTER (cosine ≥0.95) and NON-DESTRUCTIVE: it only reports candidate cross-precedent near-duplicate pairs to a CSV for the chair to review. Nothing is skipped, merged, or deleted. Pairs are found with pgvector's exact cosine (``<=>``) per halacha against halachot in OTHER precedents; a secondary lexical check (Jaccard/Levenshtein) is reported alongside so the reviewer can tell "same rule" from "same topic". cd ~/legal-ai/mcp-server .venv/bin/python ../scripts/halacha_batch_reconcile.py # cosine ≥0.95 .venv/bin/python ../scripts/halacha_batch_reconcile.py --cosine 0.97 """ from __future__ import annotations import argparse import asyncio import csv import sys from datetime import datetime, timezone from pathlib import Path from legal_mcp.services import db, halacha_quality as hq REPO_ROOT = Path(__file__).resolve().parent.parent AUDIT_DIR = REPO_ROOT / "data" / "audit" async def main(args: argparse.Namespace) -> int: cosine = args.cosine max_dist = 1.0 - cosine statuses = ("approved", "published") if not args.include_pending else ( "approved", "published", "pending_review") pool = await db.get_pool() async with pool.acquire() as conn: rows = await conn.fetch( "SELECT h.id, h.case_law_id, cl.case_number, h.rule_statement " "FROM halachot h JOIN case_law cl ON cl.id = h.case_law_id " "WHERE h.embedding IS NOT NULL AND h.review_status = ANY($1::text[]) " "ORDER BY h.case_law_id, h.halacha_index", list(statuses), ) print(f"scanning {len(rows)} halachot for cross-precedent pairs " f"(cosine ≥ {cosine})...", flush=True) seen: set[frozenset] = set() pairs: list[dict] = [] for r in rows: # nearest neighbor in a DIFFERENT precedent nb = await conn.fetchrow( "SELECT h2.id, cl2.case_number, h2.rule_statement, " " (h2.embedding <=> (SELECT embedding FROM halachot WHERE id = $1)) AS dist " "FROM halachot h2 JOIN case_law cl2 ON cl2.id = h2.case_law_id " "WHERE h2.embedding IS NOT NULL AND h2.case_law_id <> $2 " " AND h2.review_status = ANY($3::text[]) " "ORDER BY h2.embedding <=> (SELECT embedding FROM halachot WHERE id = $1) " "LIMIT 1", r["id"], r["case_law_id"], list(statuses), ) if nb is None or float(nb["dist"]) > max_dist: continue key = frozenset({str(r["id"]), str(nb["id"])}) if key in seen: continue seen.add(key) pairs.append({ "case_a": r["case_number"], "id_a": r["id"], "rule_a": r["rule_statement"], "case_b": nb["case_number"], "id_b": nb["id"], "rule_b": nb["rule_statement"], "cosine": round(1.0 - float(nb["dist"]), 4), "jaccard": round(hq.jaccard_shingles(r["rule_statement"], nb["rule_statement"]), 3), "levenshtein": round(hq.normalized_levenshtein(r["rule_statement"], nb["rule_statement"]), 3), }) pairs.sort(key=lambda p: -p["cosine"]) print(f"found {len(pairs)} cross-precedent candidate pair(s)", flush=True) for p in pairs[:30]: print(f" cos={p['cosine']} jac={p['jaccard']} lev={p['levenshtein']} " f"{p['case_a']} ↔ {p['case_b']}: {p['rule_a'][:60]}...", flush=True) if pairs: ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") AUDIT_DIR.mkdir(parents=True, exist_ok=True) out = AUDIT_DIR / f"halacha-cross-precedent-{ts}.csv" with out.open("w", encoding="utf-8", newline="") as f: w = csv.DictWriter(f, fieldnames=list(pairs[0].keys())) w.writeheader() w.writerows(pairs) print(f"\nreport: {out} (review-only — nothing changed)", flush=True) return 0 if __name__ == "__main__": ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) ap.add_argument("--cosine", type=float, default=0.95, help="min cosine for a cross-precedent candidate (default 0.95)") ap.add_argument("--include-pending", action="store_true", help="also scan pending_review halachot (default: approved/published only)") args = ap.parse_args() sys.exit(asyncio.run(main(args)))