#!/usr/bin/env python3
"""#81.7 — gold-set harness for halacha-extraction quality.

Two modes — the human tagging in between is the only manual step:

  export  — dump a stratified sample of halachot to a CSV with EMPTY label
            columns for חיים/דפנה to fill (is_holding, correct_type,
            quote_complete). Stratified across precedents and rule_types so
            the set isn't dominated by one ruling.

  score   — read the tagged CSV back and measure each pure validator
            (compute_quality_flags / is_fact_dependent / is_quote_truncated /
            is_thin_restatement) against the human labels: precision, recall,
            F1 per validator + a confusion summary. This is the ground-truth
            #81.8 needs to recalibrate the auto-approve threshold.

The validators here are the SAME ones the live extractor runs, imported
directly — so the score reflects production behavior, not a reimplementation.

  cd ~/legal-ai/mcp-server
  .venv/bin/python ../scripts/halacha_goldset.py export --n 150
  # ... חיים/דפנה fill is_holding / correct_type / quote_complete ...
  .venv/bin/python ../scripts/halacha_goldset.py score --in data/audit/halacha-goldset-YYYYMMDDTHHMMSSZ.csv
"""
from __future__ import annotations

import argparse
import asyncio
import csv
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path

from legal_mcp.services import db, halacha_quality as hq

REPO_ROOT = Path(__file__).resolve().parent.parent
AUDIT_DIR = REPO_ROOT / "data" / "audit"

# Columns the human fills. is_holding: 1 if a real generalizable holding, 0 if
# obiter/application/fact-recitation/non-rule. correct_type: binding/interpretive/
# obiter/application. quote_complete: 1 if the quote is a whole, untruncated span.
LABEL_COLS = ["is_holding", "correct_type", "quote_complete"]

# Column order of the exported CSV: context fields first, then the label
# columns, which are written empty and filled in by the human taggers.
EXPORT_COLS = [
    "id",
    "case_number",
    "halacha_index",
    "rule_type",
    "review_status",
    "confidence",
    "rule_statement",
    "supporting_quote",
    *LABEL_COLS,
]

# Spellings accepted as a negative (0) label. Values are compared after
# strip().lower(), so "FALSE" / "False" typed or normalized by an editor
# count as negative too.
_NEGATIVE_LABELS = frozenset({"0", "false"})


def _stratified_sample(rows: list[dict], n: int) -> list[dict]:
    """Pick up to ``n`` rows round-robin across (case_law_id, rule_type) buckets.

    Rows are grouped by ``(row["case_law_id"], row.get("rule_type"))`` in
    first-seen order. Each round takes one row (the last one remaining) from
    every non-empty bucket, so no single precedent/rule_type dominates the
    sample. Exhausted buckets are dropped between rounds, which guarantees
    termination without an iteration cap: the result has exactly
    ``min(n, len(rows))`` rows (none when ``n <= 0``).

    The input sequence itself is not modified.

    Raises:
        KeyError: if a row has no ``case_law_id`` key.
    """
    buckets: dict[tuple, list[dict]] = defaultdict(list)
    for row in rows:
        buckets[(row["case_law_id"], row.get("rule_type"))].append(row)

    sample: list[dict] = []
    pending = [bucket for bucket in buckets.values() if bucket]
    while pending and len(sample) < n:
        for bucket in pending:
            sample.append(bucket.pop())
            if len(sample) >= n:
                break
        # Every bucket visited above lost a row; forget the ones now empty.
        pending = [bucket for bucket in pending if bucket]
    return sample


async def _export(n: int) -> int:
    """Write a stratified sample of up to ``n`` halachot to a CSV for tagging.

    Fetches rows via ``db.list_halachot(limit=5000)``, samples them with
    ``_stratified_sample`` and writes ``halacha-goldset-<UTC timestamp>.csv``
    under ``AUDIT_DIR`` (created if missing). The label columns are always
    written empty, whatever the source row contains.

    Returns:
        0 (process exit code).
    """
    rows = await db.list_halachot(limit=5000)
    sample = _stratified_sample(rows, n)

    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    AUDIT_DIR.mkdir(parents=True, exist_ok=True)
    out = AUDIT_DIR / f"halacha-goldset-{ts}.csv"
    with out.open("w", encoding="utf-8", newline="") as f:
        w = csv.DictWriter(f, fieldnames=EXPORT_COLS, extrasaction="ignore")
        w.writeheader()
        for r in sample:
            record = {k: r.get(k, "") for k in EXPORT_COLS}
            # Blank the label columns last so they win over any same-named
            # field present in the source row.
            record.update(dict.fromkeys(LABEL_COLS, ""))
            w.writerow(record)
    print(f"exported {len(sample)} halachot for tagging → {out}", flush=True)
    print(f"fill columns: {', '.join(LABEL_COLS)} (is_holding/quote_complete = 1/0)", flush=True)
    return 0


def _prf(tp: int, fp: int, fn: int) -> tuple[float, float, float]:
    """Return (precision, recall, F1), each rounded to 3 decimals.

    A ratio whose denominator is zero is reported as 0.0 instead of raising.
    """
    p = tp / (tp + fp) if (tp + fp) else 0.0
    r = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * p * r / (p + r) if (p + r) else 0.0
    return round(p, 3), round(r, 3), round(f1, 3)


def _is_negative_label(value: str | None) -> bool:
    """Return True when a label cell holds a negative value ("0" / "false").

    Surrounding whitespace and letter case are ignored; ``None`` and blank
    cells are not negative.
    """
    return (value or "").strip().lower() in _NEGATIVE_LABELS


def _score(path: Path) -> int:
    """Score the validators against a human-tagged gold-set CSV.

    Only rows with a non-blank ``is_holding`` cell are scored. For each such
    row the flags returned by ``hq.compute_quality_flags`` are tallied against
    the ``is_holding`` label, and ``hq.is_quote_truncated`` is tallied against
    the ``quote_complete`` label. Rows whose ``quote_complete`` cell is blank
    are left out of the ``truncated_quote`` tally, since they carry no ground
    truth for it.

    Prints one precision/recall/F1 line per validator.

    Returns:
        0 after printing the report, 1 when the file has no labeled rows.
    """
    # utf-8-sig also accepts files saved with a leading BOM; newline="" lets
    # the csv module handle line breaks inside quoted fields itself.
    with path.open(encoding="utf-8-sig", newline="") as f:
        rows = [r for r in csv.DictReader(f) if (r.get("is_holding") or "").strip()]
    if not rows:
        print("no labeled rows (is_holding empty everywhere) — nothing to score", flush=True)
        return 1

    # A validator FLAG is a prediction of "NOT a clean holding" (should be
    # rejected/reviewed). Ground truth NOT-holding = is_holding == 0.
    # We score each validator as a detector of not-holding.
    counters: dict[str, dict[str, int]] = defaultdict(
        lambda: {"tp": 0, "fp": 0, "fn": 0, "tn": 0}
    )

    def tally(name: str, predicted_bad: bool, truly_bad: bool) -> None:
        """Add one observation to the confusion counts of validator ``name``."""
        if predicted_bad:
            cell = "tp" if truly_bad else "fp"
        else:
            cell = "fn" if truly_bad else "tn"
        counters[name][cell] += 1

    for r in rows:
        # DictReader yields None for cells missing from a short row, and an
        # empty cell is "" — coalesce both so the validators always get str.
        rule = r.get("rule_statement") or ""
        quote = r.get("supporting_quote") or ""
        rtype = (r.get("rule_type") or "").strip() or "binding"
        quote_label = (r.get("quote_complete") or "").strip()
        # A blank quote_complete is treated as "complete" for the flag
        # computation only; it is not used as ground truth below.
        quote_complete = not _is_negative_label(quote_label)
        truly_not_holding = _is_negative_label(r.get("is_holding"))

        # NOTE(review): the third positional argument is passed as "" here;
        # its meaning is defined in halacha_quality — confirm against it.
        flags = hq.compute_quality_flags(rule, quote, "", quote_complete, rtype)
        tally("any_flag", bool(flags), truly_not_holding)
        tally("application", hq.FLAG_APPLICATION in flags, truly_not_holding)
        tally("non_decision", hq.FLAG_NON_DECISION in flags, truly_not_holding)
        tally("thin_restatement", hq.FLAG_THIN_RESTATEMENT in flags, truly_not_holding)
        # quote-truncation scored against quote_complete label specifically
        if quote_label:
            tally("truncated_quote", hq.is_quote_truncated(quote), not quote_complete)

    print(f"scored {len(rows)} labeled halachot\n", flush=True)
    print(f"{'validator':<18}{'P':>7}{'R':>7}{'F1':>7}   tp/fp/fn/tn", flush=True)
    for name, c in counters.items():
        p, rec, f1 = _prf(c["tp"], c["fp"], c["fn"])
        print(f"{name:<18}{p:>7}{rec:>7}{f1:>7}   "
              f"{c['tp']}/{c['fp']}/{c['fn']}/{c['tn']}", flush=True)
    return 0


async def main(args: argparse.Namespace) -> int:
    """Dispatch to the mode selected on the command line.

    ``args.mode == "export"`` runs ``_export(args.n)``; any other mode runs
    ``_score`` on ``args.infile``. Returns the process exit code.
    """
    if args.mode == "export":
        return await _export(args.n)
    return _score(Path(args.infile))


if __name__ == "__main__":
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    sub = ap.add_subparsers(dest="mode", required=True)
    pe = sub.add_parser("export", help="dump a sample CSV for human tagging")
    pe.add_argument("--n", type=int, default=150, help="sample size (default 150)")
    ps = sub.add_parser("score", help="measure validators against a tagged CSV")
    ps.add_argument("--in", dest="infile", required=True, help="tagged CSV path")
    args = ap.parse_args()
    sys.exit(asyncio.run(main(args)))