feat(halacha): multi-judge approval panel + policy calibration (Trust-or-Escalate)

The chair cannot review every pending halacha. Three independent-lineage judges
(Opus via claude_session · DeepSeek · Gemini-2.5-flash — #1 on LegalBench) vote
on the COARSE axis we proved reliable across models (92%): "is this a genuine,
keepable rule?". Only an agreed verdict acts; every split escalates to the chair
(INV-G10). Buckets: clean→KEEP?; nli_unsupported→entailment re-adjudication;
extraction-defects→re-extraction.

halacha_panel_calibrate.py calibrates the voting policy on the gold-set's
is_holding (the coarse label) per Trust-or-Escalate (ICLR 2025): unanimous →
94.9% precision / 78% coverage; majority → 92.9% precision / 99% coverage; ZERO
false-drops under either policy (on the gold set the panel never rejected a good
rule). Chosen policy (chair-approved):
clean→majority-2/3, nli→asymmetric (majority-reject, unanimous-approve),
defects→re-extraction. Reversible (--apply backs up review_status+flags first).
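
The calibration script itself is not shown in this view, so the following is only
a minimal sketch of how precision, coverage and false-drops could be measured for
a voting policy. The names `unanimous`, `majority` and `calibrate`, the row shape
(votes, gold is_holding), and the metric definitions are assumptions made for
illustration, not the contents of halacha_panel_calibrate.py.

```python
from typing import Callable, Iterable, Optional, Sequence, Tuple

Votes = Sequence[Optional[bool]]            # one entry per judge; None = judge failed
Policy = Callable[[Votes], Optional[bool]]  # None = escalate to the chair


def unanimous(votes: Votes) -> Optional[bool]:
    """Decide only when every judge answered and all answers agree."""
    if any(v is None for v in votes) or len(set(votes)) != 1:
        return None
    return votes[0]


def majority(votes: Votes) -> Optional[bool]:
    """Decide on a strict majority of valid votes; ties and <2 votes escalate."""
    valid = [v for v in votes if v is not None]
    yes = sum(valid)
    no = len(valid) - yes
    if len(valid) < 2 or yes == no:
        return None
    return yes > no


def calibrate(rows: Iterable[Tuple[Votes, bool]], policy: Policy) -> dict:
    """Score a policy against gold labels (assumed metric definitions).

    coverage    = share of items the policy decides without the chair
    precision   = share of decided items whose decision matches the gold label
    false_drops = decided-as-reject items whose gold label says keep
    """
    total = decided = correct = false_drops = 0
    for votes, gold in rows:
        total += 1
        decision = policy(votes)
        if decision is None:
            continue  # escalated, so it counts against coverage only
        decided += 1
        correct += decision == gold
        false_drops += decision is False and gold is True
    return {
        "coverage": decided / total if total else 0.0,
        "precision": correct / decided if decided else 0.0,
        "false_drops": false_drops,
    }
```

Running `calibrate(rows, unanimous)` and `calibrate(rows, majority)` on the same
gold rows gives the kind of precision/coverage trade-off quoted above; which
policy to adopt per bucket remains the chair's call.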

Sources: Panel-of-LLM-Evaluators (PoLL) · Trust-or-Escalate (ICLR 2025,
arXiv:2407.18370) · selective-prediction / learning-to-defer.

Invariants: upholds G10 (human gate — splits escalate, panel only collapses the
queue) and G9 (provenance — reviewer records the panel + policy). Read paths only
in calibrate; --apply writes review_status/quality_flags reversibly with backup.
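
No restore helper is part of this commit, so the following is a hedged sketch of
how the backup CSV (columns id, review_status, quality_flags joined with "|")
could be replayed to undo an --apply run. `read_backup` and `restore` are
hypothetical names, and `pool` is assumed to expose an asyncpg-style
`execute(sql, *args)` as used in the script. The backup does not record
reviewer/reviewed_at, so this sketch leaves those columns untouched.

```python
import csv
from pathlib import Path


def read_backup(path):
    """Parse the backup CSV into (id, review_status, quality_flags) tuples."""
    rows = []
    with Path(path).open(encoding="utf-8", newline="") as f:
        for rec in csv.DictReader(f):
            # Flags were written joined with "|"; an empty cell means no flags.
            flags = rec["quality_flags"].split("|") if rec["quality_flags"] else []
            rows.append((rec["id"], rec["review_status"], flags))
    return rows


async def restore(pool, rows):
    """Write the backed-up status and flags back; returns the number of rows replayed.

    Assumption: `pool.execute` accepts $n placeholders (asyncpg style).
    """
    for hid, status, flags in rows:
        await pool.execute(
            "UPDATE halachot SET review_status=$2, quality_flags=$3, "
            "updated_at=now() WHERE id=$1",
            hid, status, flags,
        )
    return len(rows)
```

A restore would then be `await restore(await db.get_pool(), read_backup(path))`
with the path printed by the --apply run.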

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 21:11:30 +00:00
parent ecd9e46bb9
commit dba2a131e0
3 changed files with 455 additions and 0 deletions

@@ -0,0 +1,336 @@
#!/usr/bin/env python3
"""Multi-judge panel to triage the halacha approval queue. DRY-RUN by default.

The chair cannot review every pending halacha. We showed
(goldset_independent_judge.py) that the COARSE axis, "is this a genuine,
generalizable rule worth keeping as a citable precedent?", is reliable ACROSS
independent models (92% cross-model agreement), while the fine sub-type is not.
This script turns that into a triage: THREE independent-lineage judges vote on
the coarse question. The dry-run report counts only UNANIMOUS verdicts as
automatic; --apply uses the chair-approved policy listed below. Anything the
policy does not decide goes to the chair. That collapses the queue without
removing the human gate (INV-G10).

Three judges, three lineages (diversity is the point):
  - claude   (Opus via claude_session; local CLI, zero marginal cost)   [Anthropic]
  - deepseek (api.deepseek.com)                                         [DeepSeek]
  - gemini   (generativelanguage; gemini-2.5-flash, #1 on LegalBench)   [Google]

Three buckets of pending_review:
  1. clean, below confidence threshold → panel votes KEEP?
  2. nli_unsupported (rule may over-reach its quote) → panel RE-ADJUDICATES
     entailment.
  3. defect flags (quote_unverified / truncated_quote / thin_restatement /
     near_duplicate) → genuine extraction defects, flagged for re-extraction,
     never auto-approved.

DRY-RUN writes nothing to the DB (per-item verdicts go to
/tmp/halacha_panel_dryrun.json). --apply first backs up review_status and
quality_flags to data/audit/, then writes the chair-approved policy:
  clean → majority 2/3 (keep → approved, drop → rejected, tie → chair)
  nli   → asymmetric: unanimous entailed clears the flag (and approves if no
          other flag remains); majority not-entailed → rejected; else → chair
Local-only (claude_session needs the CLI).

    cd ~/legal-ai/mcp-server
    .venv/bin/python ../scripts/halacha_panel_approve.py --limit 12   # smoke
    .venv/bin/python ../scripts/halacha_panel_approve.py              # full dry-run
"""
from __future__ import annotations

import argparse
import asyncio
import csv
import json
import os
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path

import httpx

from legal_mcp.services import claude_session, db


# ── keys (local files, same pattern as the other local judges) ──
def _env_key(name: str, *files: str) -> str:
    """Return NAME's value from the first file that defines it, else from the environment."""
    for f in files:
        p = Path(f).expanduser()
        if p.exists():
            for line in p.read_text().splitlines():
                if line.startswith(name + "="):
                    return line.split("=", 1)[1].strip()
    return os.environ.get(name, "")


DEEPSEEK_KEY = _env_key("DEEPSEEK_API_KEY", "~/.hermes/profiles/deepseek/.env", "~/.env")
GEMINI_KEY = _env_key("GEMINI_API_KEY", "~/.env")
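

# ── the two coarse questions (the reliable axis, NOT the fuzzy sub-type) ──
# The prompts are sent in Hebrew. English gloss of KEEP_SYSTEM:
#   "You are a senior jurist on a planning-and-building appeals committee. Decide
#   whether a 'halacha' extracted from a ruling deserves to be kept as a citable
#   precedent. Worthy (keep=true) = a legal principle that can be generalized and
#   relied upon (holding / interpretation / procedural rule). Not worthy
#   (keep=false) = a fact-dependent application to the specific case, an issue
#   left undecided (obiter dictum), or a verbatim restatement of the quote with no
#   abstraction. Return JSON only: {"keep": true/false, "reason": "<short
#   sentence>"}. No markdown."
KEEP_SYSTEM = (
    "אתה משפטן בכיר בוועדת ערר לתכנון ובנייה. הוכרע אם 'הלכה' שחולצה מפסיקה ראויה "
    "להישמר כתקדים בר-ציטוט. ראויה (keep=true) = עיקרון משפטי בר-הכללה והסתמכות "
    "(holding/פרשנות/כלל-פרוצדורלי). לא-ראויה (keep=false) = החלה תלוית-עובדות על "
    "התיק הספציפי, סוגיה שלא הוכרעה (אמרת-אגב), או חזרה מילולית על הציטוט ללא הפשטה. "
    'החזר JSON בלבד: {"keep": true/false, "reason": "<משפט קצר>"}. ללא markdown.'
)

# English gloss of NLI_SYSTEM:
#   "You check legal inference. Given a rule and a supporting quote, decide whether
#   the quote really supports the rule without the rule extending beyond what the
#   quote says (entailed=true), or whether the rule extends past the quote
#   (entailed=false). Return JSON only: {"entailed": true/false}. No markdown, no
#   explanation."
NLI_SYSTEM = (
    "אתה בודק היסק משפטי. בהינתן כלל וציטוט-תומך, הכרע האם הציטוט באמת תומך בכלל "
    "ואינו מרחיב מעבר למה שכתוב בו (entailed=true), או שהכלל מרחיב/חורג מהציטוט "
    '(entailed=false). החזר JSON בלבד: {"entailed": true/false}. ללא markdown, ללא הסבר.'
)


def _keep_user(h: dict) -> str:
    # Hebrew section labels: "Rule statement", "Reasoning", "Supporting quote".
    return (
        f"ניסוח הכלל:\n{h.get('rule_statement') or ''}\n\n"
        f"היגיון:\n{h.get('reasoning_summary') or ''}\n\n"
        f"ציטוט תומך:\n{h.get('supporting_quote') or ''}"
    )


def _nli_user(h: dict) -> str:
    # Hebrew section labels: "Rule", "Quote".
    return f"כלל:\n{h.get('rule_statement') or ''}\n\nציטוט:\n{h.get('supporting_quote') or ''}"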


# ── three judges, one signature: (system, user) -> dict|None ──
# Any failure (missing key, HTTP error, unparsable JSON) returns None, which
# panel_vote treats as a missing vote.
async def judge_claude(system: str, user: str) -> dict | None:
    try:
        return await claude_session.query_json(user, system=system)
    except Exception:
        return None


async def judge_deepseek(client: httpx.AsyncClient, system: str, user: str) -> dict | None:
    if not DEEPSEEK_KEY:
        return None
    try:
        r = await client.post(
            "https://api.deepseek.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {DEEPSEEK_KEY}", "Content-Type": "application/json"},
            json={"model": "deepseek-chat", "temperature": 0, "max_tokens": 120,
                  "response_format": {"type": "json_object"},
                  "messages": [{"role": "system", "content": system},
                               {"role": "user", "content": user}]},
            timeout=90,
        )
        r.raise_for_status()
        return json.loads(r.json()["choices"][0]["message"]["content"])
    except Exception:
        return None


async def judge_gemini(client: httpx.AsyncClient, system: str, user: str) -> dict | None:
    if not GEMINI_KEY:
        return None
    try:
        r = await client.post(
            f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent?key={GEMINI_KEY}",
            headers={"Content-Type": "application/json"},
            json={"system_instruction": {"parts": [{"text": system}]},
                  "contents": [{"parts": [{"text": user}]}],
                  "generationConfig": {"temperature": 0, "maxOutputTokens": 4000,
                                       "responseMimeType": "application/json"}},
            timeout=90,
        )
        r.raise_for_status()
        return json.loads(r.json()["candidates"][0]["content"]["parts"][0]["text"])
    except Exception:
        return None
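

_TRUE = {"true", "1", "yes", "כן"}
_FALSE = {"false", "0", "no", "לא"}


def _bool(d: dict | None, key: str) -> bool | None:
    """Coerce a judge's answer to bool; None if missing or unrecognized.

    An unrecognized value is treated as a missing vote rather than as a "no",
    so a garbled answer cannot count toward a rejection.
    """
    if not isinstance(d, dict) or key not in d:
        return None
    v = d[key]
    if isinstance(v, bool):
        return v
    s = str(v).strip().lower()
    if s in _TRUE:
        return True
    if s in _FALSE:
        return False
    return None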


async def panel_vote(client, system, user, key) -> dict:
    """Run all three judges; return per-judge bools + the verdict."""
    c, ds, gm = await asyncio.gather(
        judge_claude(system, user),
        judge_deepseek(client, system, user),
        judge_gemini(client, system, user),
    )
    votes = {"claude": _bool(c, key), "deepseek": _bool(ds, key), "gemini": _bool(gm, key)}
    valid = [v for v in votes.values() if v is not None]
    # Unanimity requires all three judges to have answered.
    unanimous_yes = len(valid) == 3 and all(valid)
    unanimous_no = len(valid) == 3 and not any(valid)
    votes["_verdict"] = ("unanimous_yes" if unanimous_yes else
                         "unanimous_no" if unanimous_no else
                         "split" if len(valid) >= 2 else "incomplete")
    return votes


async def main(args: argparse.Namespace) -> int:
    print(f"judges available - deepseek:{bool(DEEPSEEK_KEY)} gemini:{bool(GEMINI_KEY)} "
          f"claude:local\n", flush=True)

    pending = await db.list_halachot(review_status="pending_review", limit=5000)
    if args.limit:
        pending = pending[: args.limit]

    NLI = "nli_unsupported"
    DEFECT = {"quote_unverified", "truncated_quote", "thin_restatement", "near_duplicate"}

    def bucket(h):
        flags = set(h.get("quality_flags") or [])
        if not flags:
            return "clean"
        if flags & DEFECT:
            return "defect"  # genuine extraction problem → re-extraction
        if NLI in flags:
            return "nli"     # re-adjudicate entailment
        return "other"

    buckets = defaultdict(list)
    for h in pending:
        buckets[bucket(h)].append(h)
    print("queue:", {k: len(v) for k, v in buckets.items()}, "\n", flush=True)

    sem = asyncio.Semaphore(args.concurrency)
    results = {"clean": [], "nli": []}
    async with httpx.AsyncClient() as client:
        async def run(h, system, user_fn, key, tag):
            async with sem:
                v = await panel_vote(client, system, user_fn(h), key)
            v["_h"] = h
            results[tag].append(v)

        tasks = []
        for h in buckets["clean"]:
            tasks.append(run(h, KEEP_SYSTEM, _keep_user, "keep", "clean"))
        for h in buckets["nli"]:
            tasks.append(run(h, NLI_SYSTEM, _nli_user, "entailed", "nli"))

        # bounded fan-out: one batch of `concurrency` items at a time
        for i in range(0, len(tasks), args.concurrency):
            await asyncio.gather(*tasks[i : i + args.concurrency])
            done = len(results["clean"]) + len(results["nli"])
            print(f"{done}/{len(tasks)} judged", flush=True)

    # ── report ──
    def summarize(rows):
        """Count verdicts (unanimous_yes / unanimous_no / split / incomplete)."""
        return Counter(r["_verdict"] for r in rows)

    print("\n" + "=" * 60)
    print("PANEL DRY-RUN (no DB writes)")
    print("=" * 60)

    clean = results["clean"]
    cc = summarize(clean)
    print(f"\nBUCKET 1: clean, below threshold ({len(clean)}):")
    print(f" ✓ auto-APPROVE (3/3 keep): {cc['unanimous_yes']}")
    print(f" ✗ auto-REJECT (3/3 drop): {cc['unanimous_no']}")
    print(f" → CHAIR (split): {cc['split']}")
    print(f" ? incomplete (judge errors): {cc['incomplete']}")

    nli = results["nli"]
    nc = summarize(nli)
    print(f"\nBUCKET 2: nli_unsupported ({len(nli)}):")
    print(f" ✓ clear-flag + APPROVE (3/3 entailed): {nc['unanimous_yes']}")
    print(f" ✗ confirm-flag (3/3 not-entailed): {nc['unanimous_no']}")
    print(f" → CHAIR (split): {nc['split']}")
    print(f" ? incomplete: {nc['incomplete']}")

    print(f"\nBUCKET 3: extraction defects ({len(buckets['defect'])}): → re-extraction")
    if buckets["other"]:
        print(f"BUCKET 4: other flags ({len(buckets['other'])}): → chair")

    auto = cc["unanimous_yes"] + cc["unanimous_no"] + nc["unanimous_yes"] + nc["unanimous_no"]
    chair = cc["split"] + nc["split"] + cc["incomplete"] + nc["incomplete"] + len(buckets["other"])
    reext = len(buckets["defect"])
    print("\n" + "-" * 60)
    print(f"NET: {len(pending)} pending → panel resolves {auto} automatically, "
          f"{chair} to chair, {reext} to re-extraction")
    print(f" chair queue collapses {len(pending)} → {chair}")

    # Per-item verdicts, minus the full halacha row stored under "_h".
    Path("/tmp/halacha_panel_dryrun.json").write_text(json.dumps(
        [{**{k: v for k, v in r.items() if not k.startswith("_h")},
          "id": str(r["_h"]["id"]), "case": r["_h"].get("case_number"),
          "rule": (r["_h"].get("rule_statement") or "")[:120]}
         for r in clean + nli], ensure_ascii=False, indent=1))
    print("\nper-item verdicts → /tmp/halacha_panel_dryrun.json")

    # ── apply the chair-approved policy (reversible; backup first) ──────────
    # CLEAN  → majority 2/3 (keep→approved, drop→rejected, tie→chair)
    # NLI    → asymmetric: unanimous-entailed → clear nli flag (+approve if clean),
    #          majority not-entailed → rejected, else → chair
    # DEFECT → untouched (needs re-extraction)
    if not args.apply:
        print("\n(dry-run: pass --apply to write the approved policy)")
        return 0

    def majority(v: dict) -> bool | None:
        """Strict majority of the valid votes; None on a tie or fewer than two votes."""
        vs = [v[k] for k in ("claude", "deepseek", "gemini") if v[k] is not None]
        if len(vs) < 2:
            return None
        y, n = sum(vs), len(vs) - sum(vs)
        return True if y > n else (False if n > y else None)

    # Back up the columns we are about to change, before any write.
    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    audit = Path(__file__).resolve().parent.parent / "data" / "audit"
    audit.mkdir(parents=True, exist_ok=True)
    backup = audit / f"halacha-panel-apply-backup-{ts}.csv"
    with backup.open("w", encoding="utf-8", newline="") as f:
        w = csv.writer(f)
        w.writerow(["id", "review_status", "quality_flags"])
        for r in clean + nli:
            h = r["_h"]
            w.writerow([h["id"], h["review_status"], "|".join(h.get("quality_flags") or [])])

    pool = await db.get_pool()
    REV = "panel:opus+deepseek+gemini"  # recorded as reviewer (provenance, G9)
    approved = rejected = cleared = chair = 0

    for r in clean:
        d = majority(r)
        if d is True:
            await pool.execute("UPDATE halachot SET review_status='approved', "
                               "reviewed_at=now(), reviewer=$2, updated_at=now() WHERE id=$1",
                               r["_h"]["id"], REV + " 2/3-keep")
            approved += 1
        elif d is False:
            await pool.execute("UPDATE halachot SET review_status='rejected', "
                               "reviewed_at=now(), reviewer=$2, updated_at=now() WHERE id=$1",
                               r["_h"]["id"], REV + " 2/3-drop")
            rejected += 1
        else:
            chair += 1

    for r in nli:
        vs = [r[k] for k in ("claude", "deepseek", "gemini") if r[k] is not None]
        unanimous_yes = len(vs) == 3 and all(vs)
        maj_no = len(vs) >= 2 and sum(vs) < len(vs) - sum(vs)
        if unanimous_yes:
            rest = [x for x in (r["_h"].get("quality_flags") or []) if x != "nli_unsupported"]
            if rest:  # other flags remain → clear nli but keep in queue
                await pool.execute("UPDATE halachot SET quality_flags=$2, updated_at=now() "
                                   "WHERE id=$1", r["_h"]["id"], rest)
                cleared += 1
                chair += 1
            else:  # nli was the only blocker → clear + approve
                await pool.execute("UPDATE halachot SET quality_flags='{}', "
                                   "review_status='approved', reviewed_at=now(), reviewer=$2, "
                                   "updated_at=now() WHERE id=$1", r["_h"]["id"], REV + " 3/3-entailed")
                approved += 1
                cleared += 1
        elif maj_no:
            await pool.execute("UPDATE halachot SET review_status='rejected', "
                               "reviewed_at=now(), reviewer=$2, updated_at=now() WHERE id=$1",
                               r["_h"]["id"], REV + " maj-not-entailed")
            rejected += 1
        else:
            chair += 1

    print(f"\nAPPLIED (reversible): approved {approved} · rejected {rejected} · "
          f"nli-flag-cleared {cleared} · left to chair {chair + len(buckets['defect'])} "
          f"(incl. {len(buckets['defect'])} defects for re-extraction)")
    print(f"backup → {backup}")
    return 0


if __name__ == "__main__":
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--limit", type=int, default=0)
    ap.add_argument("--concurrency", type=int, default=6)
    ap.add_argument("--apply", action="store_true",
                    help="write the chair-approved policy to the DB "
                         "(backs up review_status + quality_flags first)")
    raise SystemExit(asyncio.run(main(ap.parse_args())))