#!/usr/bin/env python3
"""#82.1 — calibrate the lexical dedup thresholds against the cleanup gold-set.

The 2026-06-03 cleanup manifest (data/audit/halacha-cleanup-manifest-*.csv)
records, for each removed halacha, a ``reason`` and a ``survivor_id`` — i.e. a
human-labeled set of TRUE duplicate pairs (deleted rule ↔ its survivor). This
script uses them to validate the lexical near-duplicate thresholds introduced
in #82.3 (``HALACHA`` Jaccard/Levenshtein), so the numbers in
``halacha_quality.lexical_near_duplicate`` are calibrated, not guessed.

It sweeps (jaccard_min × levenshtein_min) and reports precision/recall against:
  * positives — duplicate-labeled pairs (deleted rule ↔ survivor rule)
  * negatives — random non-paired rules from the same manifest (≈all distinct)
and marks the currently-configured operating point.

    cd ~/legal-ai/mcp-server
    .venv/bin/python ../scripts/calibrate_halacha_dedup.py \
        --manifest ../data/audit/halacha-cleanup-manifest-20260603T101747Z.csv
"""
from __future__ import annotations

import argparse
import asyncio
import csv
import sys
from pathlib import Path
from uuid import UUID

from legal_mcp.services import db, halacha_quality as hq

# Threshold grids swept by ``main`` (jaccard_min × levenshtein_min).
_JACCARD_GRID = (0.40, 0.45, 0.50, 0.55, 0.60, 0.65, 0.70)
_LEVENSHTEIN_GRID = (0.60, 0.65, 0.70, 0.75, 0.80, 0.85)


async def _survivor_text(survivor_id: str, manifest_map: dict) -> str:
    """Return the rule statement of the survivor rule, or ``""`` if unavailable.

    Lookup order:
      1. ``manifest_map`` (id -> rule_statement built from the manifest itself);
      2. ``db.get_halacha`` when the ``db`` module exposes it;
      3. a direct ``SELECT rule_statement FROM halachot`` through ``db.get_pool()``.

    The DB lookups are best-effort: any failure there yields ``""`` so that a
    single unreachable survivor only drops that pair instead of aborting the
    calibration run.

    Args:
        survivor_id: survivor id exactly as written in the manifest.
        manifest_map: mapping of manifest row id to its rule statement.

    Returns:
        The survivor's rule statement, or ``""`` when it cannot be resolved.
    """
    text = manifest_map.get(survivor_id)
    if text:
        return text

    # Both DB paths need a UUID; a malformed id can never match, so stop early.
    try:
        survivor_uuid = UUID(survivor_id)
    except ValueError:
        return ""

    row = None
    get_halacha = getattr(db, "get_halacha", None)
    if get_halacha is not None:
        try:
            row = await get_halacha(survivor_uuid)
        except Exception:  # best-effort: fall through to the direct query
            row = None
    if row:
        return row.get("rule_statement") or ""

    # fallback: direct query
    try:
        pool = await db.get_pool()
        record = await pool.fetchrow(
            "SELECT rule_statement FROM halachot WHERE id = $1", survivor_uuid
        )
        return (record["rule_statement"] or "") if record else ""
    except Exception:  # best-effort: an unresolved survivor is simply skipped
        return ""


def _load_manifest(path: Path) -> list[dict]:
    """Read the cleanup manifest CSV into a list of row dicts."""
    # newline="" lets the csv module handle newlines embedded in quoted fields.
    with path.open(encoding="utf-8", newline="") as f:
        return list(csv.DictReader(f))


async def _labeled_positives(rows: list[dict], by_id: dict) -> list[tuple[str, str]]:
    """Collect (deleted text, survivor text) pairs for duplicate-labeled rows."""
    positives: list[tuple[str, str]] = []
    for row in rows:
        survivor_id = row.get("survivor_id")
        if not survivor_id or "duplicate" not in (row.get("reason") or "").lower():
            continue
        deleted_text = row.get("rule_statement") or ""
        if not deleted_text:
            continue  # pair would be discarded anyway; skip the lookup
        survivor_text = await _survivor_text(survivor_id, by_id)
        if survivor_text:
            positives.append((deleted_text, survivor_text))
    return positives


def _negative_pairs(
    rules: list[str], positives: list[tuple[str, str]]
) -> list[tuple[str, str]]:
    """Build up to ``len(positives)`` presumed-distinct pairs from ``rules``.

    Pairing is deterministic (index ``i`` with index ``7*i + 3`` modulo the
    rule count, no RNG). Pairs of identical text and pairs that are themselves
    labeled duplicates (in either order) are excluded so that known positives
    never count as negatives.
    """
    if not rules:
        return []
    labeled = {frozenset(pair) for pair in positives}
    total = len(rules)
    negatives: list[tuple[str, str]] = []
    for i in range(len(positives)):
        first = rules[i % total]
        second = rules[(i * 7 + 3) % total]  # deterministic spread, no RNG
        if first != second and frozenset((first, second)) not in labeled:
            negatives.append((first, second))
    return negatives


def _lexical_scores(pairs: list[tuple[str, str]]) -> list[tuple[float, float]]:
    """Return ``(jaccard_shingles, normalized_levenshtein)`` for every pair."""
    return [
        (hq.jaccard_shingles(a, b), hq.normalized_levenshtein(a, b))
        for a, b in pairs
    ]


def _count_hits(scored: list[tuple[float, float]], jac_min: float, lev_min: float) -> int:
    """Count pairs flagged as duplicates: either score reaches its threshold."""
    return sum(1 for jac, lev in scored if jac >= jac_min or lev >= lev_min)


async def main(args: argparse.Namespace) -> int:
    """Sweep the threshold grid against the manifest and print the results.

    Args:
        args: parsed CLI arguments; ``args.manifest`` is the manifest CSV path
            (relative paths are resolved against the current directory).

    Returns:
        Process exit code: ``0`` on success, ``1`` when the manifest yields no
        labeled duplicate pairs to calibrate against.
    """
    path = Path(args.manifest)
    if not path.is_absolute():
        path = (Path.cwd() / path).resolve()

    rows = _load_manifest(path)
    by_id = {
        row["id"]: row.get("rule_statement") or ""
        for row in rows
        if row.get("id")
    }

    positives = await _labeled_positives(rows, by_id)

    # negatives: pair each deleted rule with a different, non-survivor rule.
    rules = [row["rule_statement"] for row in rows if row.get("rule_statement")]
    negatives = _negative_pairs(rules, positives)

    print(f"positives (labeled dup pairs): {len(positives)} "
          f"negatives: {len(negatives)}", flush=True)
    if not positives:
        print("no labeled duplicate pairs found in manifest — cannot calibrate", flush=True)
        return 1
    if not negatives:
        print("warning: no negative pairs could be built — precision is vacuous", flush=True)

    # precompute lexical scores per pair
    pos_s, neg_s = _lexical_scores(positives), _lexical_scores(negatives)

    print(f"\n{'jac_min':>8}{'lev_min':>8}{'P':>8}{'R':>8}{'F1':>8}", flush=True)
    best = None
    for jm in _JACCARD_GRID:
        for lm in _LEVENSHTEIN_GRID:
            tp = _count_hits(pos_s, jm, lm)
            fp = _count_hits(neg_s, jm, lm)
            fn = len(pos_s) - tp
            precision = tp / (tp + fp) if (tp + fp) else 0.0
            recall = tp / (tp + fn) if (tp + fn) else 0.0
            f1 = (2 * precision * recall / (precision + recall)
                  if (precision + recall) else 0.0)
            is_configured = (abs(jm - hq._LEX_JACCARD_MIN) < 1e-9
                             and abs(lm - hq._LEX_LEVENSHTEIN_MIN) < 1e-9)
            mark = " <- configured" if is_configured else ""
            # NOTE(review): only the configured operating point is printed as a
            # table row; the other grid points surface only via the "best"
            # summary below. Confirm this is intended before printing all rows.
            if mark:
                print(f"{jm:>8.2f}{lm:>8.2f}{precision:>8.3f}{recall:>8.3f}{f1:>8.3f}{mark}",
                      flush=True)
            # Strict ">" keeps the first grid point reaching the top F1.
            if best is None or f1 > best[0]:
                best = (f1, jm, lm, precision, recall)

    print(f"\nbest F1={best[0]:.3f} at jaccard_min={best[1]}, levenshtein_min={best[2]} "
          f"(P={best[3]:.3f}, R={best[4]:.3f})", flush=True)
    print("note: positives may include obiter/application cuts (not pure dups); "
          "use precision as the guard against false-merges.", flush=True)
    return 0


if __name__ == "__main__":
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--manifest", required=True,
                    help="path to halacha-cleanup-manifest-*.csv")
    args = ap.parse_args()
    sys.exit(asyncio.run(main(args)))