#!/usr/bin/env python3 """#81.7 — label the halacha gold-set by TRI-MODEL CONSENSUS (no man-in-the-loop). Chair directive (2026-06-11): replace manual chair/Dafna tagging of the gold-set with the agreement of three INDEPENDENT model lineages. This is the same panel the live approval triage uses (``halacha_panel_approve.py``), proven to agree on the coarse "is this a real, keepable rule?" axis across models (92%): - claude (Opus via claude_session — local CLI, zero marginal cost) [Anthropic] - deepseek (api.deepseek.com) [DeepSeek] - gemini (gemini-2.5-flash) [Google] Why this is NOT circular: the validators measured downstream (#81.8 — compute_quality_flags / is_fact_dependent / is_quote_truncated / is_thin_restatement) are RULE-BASED heuristics, a different method family from the LLM judges. Two honesty guards: 1. SPLIT vote (no 2/3 agreement) writes NO ground-truth label — the item stays NULL and escalates to the chair (INV-G10). 2. ANONYMIZATION probe — every item is re-judged with the case identifier masked/faked; if the consensus flips, the verdict was keying on the identifier (memorization), not the legal reasoning. Reported as a stability rate (arXiv:2505.02172). Reuses the model callers from halacha_panel_approve and the rich is_holding+type prompt from goldset_ai_recommend — single source, no parallel path (G2). Run locally (claude_session needs the CLI; DeepSeek/Gemini keys from ~/.env): cd ~/legal-ai/mcp-server .venv/bin/python ../scripts/goldset_panel_label.py --limit 8 # smoke .venv/bin/python ../scripts/goldset_panel_label.py # full, with anon .venv/bin/python ../scripts/goldset_panel_label.py --no-anon # skip the anon probe """ from __future__ import annotations import argparse import asyncio import json import re import sys from collections import Counter from datetime import datetime, timezone from pathlib import Path from uuid import UUID import httpx from legal_mcp.services import claude_session, db # Reuse the model callers (DeepSeek/Gemini HTTP) and the rich gold-set prompt — # importing them keeps ONE source of truth for each (G2). sys.path.insert(0, str(Path(__file__).resolve().parent)) from halacha_panel_approve import judge_deepseek, judge_gemini # noqa: E402 from goldset_ai_recommend import SYSTEM, VALID_TYPES, _prompt # noqa: E402 # ── consensus aggregation (pure — unit-tested) ──────────────────────────────── def consensus(votes: list[bool | None]) -> tuple[bool | None, str]: """Majority of the (up to three) is_holding votes. Returns ``(consensus_bool_or_None, agreement_tag)`` where the tag is one of '3/3' / '2/3' / 'split' / 'incomplete'. A consensus is returned only on a real majority; 'split' and 'incomplete' return None (→ chair). """ valid = [v for v in votes if v is not None] if len(valid) < 2: return None, "incomplete" yes = sum(1 for v in valid if v) no = len(valid) - yes if yes == no: return None, "split" decision = yes > no if len(valid) == 3 and (yes == 3 or no == 3): return decision, "3/3" return decision, "2/3" def consensus_type(per_model: list[dict | None], decided: bool | None) -> str: """Most-common rule_type among the models, constrained to be consistent with the is_holding consensus (holding/interpretive/procedural ↔ True; application/obiter ↔ False). '' when undecided or no agreement.""" if decided is None: return "" consistent = ( {"holding", "interpretive", "procedural"} if decided else {"application", "obiter"} ) types = [ str(d.get("type") or "") for d in per_model if isinstance(d, dict) and str(d.get("type") or "") in consistent ] if not types: return "" return Counter(types).most_common(1)[0][0] def fleiss_kappa(rows: list[tuple[int, int]]) -> float | None: """Fleiss' kappa for binary ratings (yes_count, no_count) per item. Only items rated by ALL raters (here: 3) should be passed. Returns None if there isn't enough data. Standard formula (Fleiss 1971).""" rows = [(y, n) for (y, n) in rows if (y + n) > 0] N = len(rows) if N == 0: return None n = rows[0][0] + rows[0][1] if n < 2 or any((y + n_) != n for (y, n_) in rows): return None # ragged rater counts — not well-defined # P_i: agreement within item i P = [(y * (y - 1) + nn * (nn - 1)) / (n * (n - 1)) for (y, nn) in rows] Pbar = sum(P) / N # p_j: marginal proportion per category p_yes = sum(y for (y, _) in rows) / (N * n) p_no = sum(nn for (_, nn) in rows) / (N * n) Pe = p_yes ** 2 + p_no ** 2 if Pe >= 1.0: return 1.0 # degenerate (all one category) → perfect by convention return (Pbar - Pe) / (1 - Pe) def gwet_ac1(rows: list[tuple[int, int]]) -> float | None: """Gwet's AC1 for binary ratings (yes_count, no_count) per item. Reported ALONGSIDE Fleiss' κ because κ suffers the "kappa paradox": under a skewed marginal (here almost every halacha is is_holding=True) κ collapses toward 0 even at very high observed agreement, since chance-agreement Pe is also near 1. AC1 estimates chance agreement under maximum uncertainty, so it is robust to prevalence and tracks the true observed agreement (Gwet 2008; Feinstein & Cicchetti 1990, "high agreement, low kappa"). Same Pa as Fleiss; only the chance term differs.""" rows = [(y, n) for (y, n) in rows if (y + n) > 0] N = len(rows) if N == 0: return None n = rows[0][0] + rows[0][1] if n < 2 or any((y + n_) != n for (y, n_) in rows): return None Pa = sum((y * (y - 1) + nn * (nn - 1)) / (n * (n - 1)) for (y, nn) in rows) / N p_yes = sum(y for (y, _) in rows) / (N * n) p_no = 1.0 - p_yes Pe = 2 * p_yes * p_no # q=2 categories: (1/(q-1))·Σ π_k(1-π_k) = 2·p·(1-p) if Pe >= 1.0: return 1.0 return (Pa - Pe) / (1 - Pe) # ── anonymization probe (pure — unit-tested) ────────────────────────────────── _FAKE_CASE = "12345-67-89" _FAKE_NAME = "פלוני נ' אלמוני" def anonymize(text: str, case_number: str | None, case_name: str | None) -> str: """Mask the case identifier so the model can't key on a memorized case. Replaces the literal case_number and case_name (if they appear) with fake plausible tokens. Legal substance (the rule + quote) is untouched — only the identifiers that enable memorization are swapped (arXiv:2505.02172).""" out = text if case_number: out = out.replace(case_number, _FAKE_CASE) # also catch a bare nnnn-nn-nn / nnnn/nn pattern of the same case out = re.sub(re.escape(case_number).replace(r"\-", r"[-/]"), _FAKE_CASE, out) if case_name: out = out.replace(case_name, _FAKE_NAME) return out # ── one panel pass over a single item ───────────────────────────────────────── def _parse(d: dict | None) -> dict | None: if not isinstance(d, dict) or "is_holding" not in d: return None t = str(d.get("type") or "").strip() return { "is_holding": bool(d["is_holding"]), "type": t if t in VALID_TYPES else "", "rationale": str(d.get("rationale") or "")[:300], } async def _judge_claude(user: str) -> dict | None: try: return await claude_session.query_json(user, system=SYSTEM, effort="low") except Exception: # noqa: BLE001 return None async def panel_pass(client: httpx.AsyncClient, user: str) -> tuple[list[dict | None], bool | None, str]: """Run the three judges on one prompt; return (per_model, consensus, tag).""" c, ds, gm = await asyncio.gather( _judge_claude(user), judge_deepseek(client, SYSTEM, user), judge_gemini(client, SYSTEM, user), ) per = [_parse(c), _parse(ds), _parse(gm)] decided, tag = consensus([m["is_holding"] if m else None for m in per]) return per, decided, tag async def main(args: argparse.Namespace) -> int: print(f"keys — deepseek:{bool(db and True)} (see panel) · claude:local · anon:{not args.no_anon}\n", flush=True) items = await db.goldset_list(args.batch) todo = [it for it in items if args.force or not it.get("panel_generated_at")] if args.limit: todo = todo[: args.limit] print(f"gold-set '{args.batch}': {len(items)} items, {len(todo)} to label by panel", flush=True) sem = asyncio.Semaphore(args.concurrency) tags: Counter = Counter() kappa_rows: list[tuple[int, int]] = [] anon_checked = anon_stable = 0 chair_overlap = chair_agree = 0 # consensus vs existing human label (external validity) async with httpx.AsyncClient() as client: async def run(i: int, it: dict) -> None: nonlocal anon_checked, anon_stable, chair_overlap, chair_agree async with sem: user = _prompt(it) per, decided, tag = await panel_pass(client, user) anon_hold = anon_st = None if not args.no_anon and decided is not None: anon_user = anonymize(user, it.get("case_number"), it.get("case_name")) _, anon_decided, _ = await panel_pass(client, anon_user) if anon_decided is not None: anon_hold = anon_decided anon_st = (anon_decided == decided) anon_checked += 1 anon_stable += int(anon_st) ctype = consensus_type(per, decided) await db.goldset_set_panel_label( UUID(str(it["id"])), claude=per[0], deepseek=per[1], gemini=per[2], consensus_is_holding=decided, consensus_type=ctype, agreement=tag, anon_is_holding=anon_hold, anon_stable=anon_st, ) tags[tag] += 1 # κ counts only items all three judged nv = [m for m in per if m is not None] if len(nv) == 3: y = sum(1 for m in nv if m["is_holding"]) kappa_rows.append((y, 3 - y)) # external validity: where a human already labeled, does consensus match? if (it.get("tagged_by") == "chair" and it.get("is_holding") is not None and decided is not None): chair_overlap += 1 chair_agree += int(decided == it["is_holding"]) mark = {"3/3": "✓✓✓", "2/3": "✓✓", "split": "⚖", "incomplete": "…"}[tag] astr = "" if anon_st is None else (" anon✓" if anon_st else " anon✗FLIP") print(f"[{i}/{len(todo)}] {it.get('case_number')}: {mark} {tag} " f"→ {decided}/{ctype}{astr}", flush=True) tasks = [run(i, it) for i, it in enumerate(todo, 1)] for j in range(0, len(tasks), args.concurrency): await asyncio.gather(*tasks[j : j + args.concurrency]) kappa = fleiss_kappa(kappa_rows) ac1 = gwet_ac1(kappa_rows) raw_agree = (sum(1 for (y, n) in kappa_rows if y == 0 or n == 0) / len(kappa_rows) if kappa_rows else None) # share of items with unanimous 3/3 decided_n = tags["3/3"] + tags["2/3"] def interp(k: float) -> str: return ("almost-perfect" if k >= 0.8 else "substantial" if k >= 0.6 else "moderate" if k >= 0.4 else "fair/poor") print("\n" + "=" * 60) print(f"PANEL LABELING — gold-set '{args.batch}'") print("=" * 60) print(f" 3/3 unanimous : {tags['3/3']}") print(f" 2/3 majority : {tags['2/3']}") print(f" ⚖ split→chair : {tags['split']}") print(f" … incomplete : {tags['incomplete']}") print(f" DECIDED (labels written): {decided_n}/{len(todo)}") if kappa_rows: # Report AC1 as the headline agreement metric: the is_holding marginal is # heavily skewed (≈all True), where Fleiss κ hits the "kappa paradox" # (high agreement, near-zero κ). AC1 is prevalence-robust. print(f" inter-model agreement (n={len(kappa_rows)}, 3 raters, is_holding):") print(f" Gwet AC1 : {ac1:.3f} ({interp(ac1)}) ← headline (skew-robust)") print(f" Fleiss κ : {kappa:.3f} ({interp(kappa)}) [paradox under skew — see code]") print(f" raw 3/3 : {raw_agree:.1%} unanimous") if chair_overlap: print(f" consensus vs HUMAN labels (external validity): " f"{chair_agree}/{chair_overlap} = {chair_agree / chair_overlap:.1%}") if anon_checked: rate = anon_stable / anon_checked print(f" anonymization stability: {anon_stable}/{anon_checked} = {rate:.1%} " f"({'robust' if rate >= 0.9 else 'CHECK memorization'})") ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") report = Path(__file__).resolve().parent.parent / "data" / "audit" / f"goldset-panel-{args.batch}-{ts}.json" report.parent.mkdir(parents=True, exist_ok=True) report.write_text(json.dumps({ "batch": args.batch, "labeled": len(todo), "agreement": dict(tags), "decided": decided_n, "fleiss_kappa": kappa, "gwet_ac1": ac1, "raw_unanimous": raw_agree, "consensus_vs_human": {"agree": chair_agree, "overlap": chair_overlap}, "anon_checked": anon_checked, "anon_stable": anon_stable, }, ensure_ascii=False, indent=2)) print(f"\nreport → {report}") print("next: .venv/bin/python ../scripts/halacha_goldset.py score " "(measures validators vs the consensus labels — #81.8)") return 0 if __name__ == "__main__": ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) ap.add_argument("--batch", default="default") ap.add_argument("--force", action="store_true", help="re-label even if already paneled") ap.add_argument("--limit", type=int, default=0) ap.add_argument("--concurrency", type=int, default=4) ap.add_argument("--no-anon", action="store_true", help="skip the anonymization probe") raise SystemExit(asyncio.run(main(ap.parse_args())))