Files
legal-ai/scripts/halacha_panel_approve.py
Chaim 338a8a947f feat(principles): canonical_statement synthesis service + throttled backfill (Phase E groundwork, #152)
Grounded (INV-AH) multi-instance synthesis with drift guard + chair gate
(pending_review, G10). Single path used by backfill, MCP tool, nightly drain.
HELD from production run pending the principles-redesign (rename+cull, #152).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-19 10:57:48 +00:00

315 lines
15 KiB
Python

#!/usr/bin/env python3
"""Multi-judge panel to triage the halacha approval queue — DRY-RUN by default.
The chair cannot review every pending halacha. We proved (goldset_independent_
judge.py) that the COARSE axis — "is this a genuine, generalizable rule worth
keeping as a citable precedent?" — is reliable ACROSS independent models (92%
cross-model agreement), while the fine sub-type is not. This script turns that
into a triage: THREE independent-lineage judges vote on the coarse question, and
only a UNANIMOUS verdict acts automatically — every split escalates to the chair.
That collapses the queue without removing the human gate (INV-G10).
Three judges, three lineages (diversity is the point):
- claude (Opus via claude_session — local CLI, zero marginal cost) [Anthropic]
- deepseek (api.deepseek.com) [DeepSeek]
- gemini (generativelanguage — gemini-2.5-flash, #1 on LegalBench) [Google]
Three buckets of pending_review:
1. clean, below confidence threshold → panel votes KEEP? unanimous-keep would
auto-approve; split → chair.
2. nli_unsupported (rule maybe over-reaches its quote) → panel RE-ADJUDICATES
entailment; unanimous-entailed would clear the flag + approve; split → chair.
3. other quality flags (quote_unverified/truncated/thin) → genuine extraction
defects → flagged for re-extraction, never auto-approved.
DRY-RUN writes no DECISIONS. --apply acts on the agreed verdicts (clean: 2/3 majority;
nli: unanimous-entailed clears the flag) — reversible, backed up to data/audit/ first.
Splits/defects stay pending_review for the chair. Local-only (claude_session needs CLI).
FU-1 (#133, active-learning): EVERY adjudication — votes AND per-judge rationale — is
persisted to halacha_panel_rounds in BOTH modes (a dry-run analysis is still a learning
datapoint; apply_mode records which). This is capture-only and never touches `halachot`
(the chair gate stays the single source of truth, INV-G10). The learning seed is formed
later by joining a round against the chair's own later decision on the same halacha. Pass
--no-capture to skip.
cd ~/legal-ai/mcp-server
.venv/bin/python ../scripts/halacha_panel_approve.py --limit 12 # smoke
.venv/bin/python ../scripts/halacha_panel_approve.py # full dry-run
"""
from __future__ import annotations
import argparse
import asyncio
import csv
import json
import os
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path
import httpx
from legal_mcp.services import db, panel_judges
# Judges are the shared primitive (G2) — #152 lifted them to services/panel_judges.
from legal_mcp.services.panel_judges import (
DEEPSEEK_KEY,
GEMINI_KEY,
judge_claude,
judge_deepseek,
judge_gemini,
)
_bool = panel_judges.to_bool
# ── the two coarse questions (the reliable axis — NOT the fuzzy sub-type) ──
KEEP_SYSTEM = (
"אתה משפטן בכיר בוועדת ערר לתכנון ובנייה. הוכרע אם 'הלכה' שחולצה מפסיקה ראויה "
"להישמר כתקדים בר-ציטוט. ראויה (keep=true) = עיקרון משפטי בר-הכללה והסתמכות "
"(holding/פרשנות/כלל-פרוצדורלי). לא-ראויה (keep=false) = החלה תלוית-עובדות על "
"התיק הספציפי, סוגיה שלא הוכרעה (אמרת-אגב), או חזרה מילולית על הציטוט ללא הפשטה. "
'החזר JSON בלבד: {"keep": true/false, "reason": "<משפט קצר>"}. ללא markdown.'
)
NLI_SYSTEM = (
"אתה בודק היסק משפטי. בהינתן כלל וציטוט-תומך, הכרע האם הציטוט באמת תומך בכלל "
"ואינו מרחיב מעבר למה שכתוב בו (entailed=true), או שהכלל מרחיב/חורג מהציטוט "
'(entailed=false). החזר JSON בלבד: {"entailed": true/false, "reason": "<משפט קצר>"}. '
"ללא markdown."
)
def _keep_user(h: dict) -> str:
return (
f"ניסוח הכלל:\n{h.get('rule_statement') or ''}\n\n"
f"היגיון:\n{h.get('reasoning_summary') or ''}\n\n"
f"ציטוט תומך:\n{h.get('supporting_quote') or ''}"
)
def _nli_user(h: dict) -> str:
return f"כלל:\n{h.get('rule_statement') or ''}\n\nציטוט:\n{h.get('supporting_quote') or ''}"
async def panel_vote(client, system, user, key) -> dict:
"""Run all three judges; return per-judge bools + the verdict."""
c, ds, gm = await asyncio.gather(
judge_claude(system, user),
judge_deepseek(client, system, user),
judge_gemini(client, system, user),
)
votes = {"claude": _bool(c, key), "deepseek": _bool(ds, key), "gemini": _bool(gm, key)}
valid = [v for v in votes.values() if v is not None]
unanimous_yes = len(valid) == 3 and all(valid)
unanimous_no = len(valid) == 3 and not any(valid)
votes["_verdict"] = ("unanimous_yes" if unanimous_yes else
"unanimous_no" if unanimous_no else
"split" if len(valid) >= 2 else "incomplete")
# keep the raw replies so the per-judge rationale can be persisted (FU-1)
votes["_raw"] = {"claude": c, "deepseek": ds, "gemini": gm}
return votes
async def main(args: argparse.Namespace) -> int:
print(f"judges available — deepseek:{bool(DEEPSEEK_KEY)} gemini:{bool(GEMINI_KEY)} "
f"claude:local\n", flush=True)
pending = await db.list_halachot(review_status="pending_review", limit=5000)
if args.limit:
pending = pending[: args.limit]
NLI = "nli_unsupported"
DEFECT = {"quote_unverified", "truncated_quote", "thin_restatement", "near_duplicate"}
def bucket(h):
flags = set(h.get("quality_flags") or [])
if not flags:
return "clean"
if flags & DEFECT:
return "defect" # genuine extraction problem → re-extraction
if NLI in flags:
return "nli" # re-adjudicate entailment
return "other"
buckets = defaultdict(list)
for h in pending:
buckets[bucket(h)].append(h)
print("queue:", {k: len(v) for k, v in buckets.items()}, "\n", flush=True)
# one stamp shared by the whole run, so a round is reconstructable later (FU-1)
round_ts = datetime.now(timezone.utc)
sem = asyncio.Semaphore(args.concurrency)
results = {"clean": [], "nli": []}
async with httpx.AsyncClient() as client:
async def run(h, system_fn, user_fn, key, tag):
async with sem:
v = await panel_vote(client, system_fn, user_fn(h), key)
v["_h"] = h
results[tag].append(v)
tasks = []
for h in buckets["clean"]:
tasks.append(run(h, KEEP_SYSTEM, _keep_user, "keep", "clean"))
for h in buckets["nli"]:
tasks.append(run(h, NLI_SYSTEM, _nli_user, "entailed", "nli"))
# bounded fan-out
for i in range(0, len(tasks), args.concurrency):
await asyncio.gather(*tasks[i : i + args.concurrency])
done = len(results["clean"]) + len(results["nli"])
print(f"{done}/{len(tasks)} judged", flush=True)
# ── report ──
def summarize(rows, yes_label, no_label):
c = Counter(r["_verdict"] for r in rows)
return c
print("\n" + "=" * 60)
print("PANEL DRY-RUN (no DB writes)")
print("=" * 60)
clean = results["clean"]
cc = summarize(clean, "keep", "drop")
print(f"\nBUCKET 1 — clean, below threshold ({len(clean)}):")
print(f" ✓ auto-APPROVE (3/3 keep): {cc['unanimous_yes']}")
print(f" ✗ auto-REJECT (3/3 drop): {cc['unanimous_no']}")
print(f" → CHAIR (split): {cc['split']}")
print(f" ? incomplete (judge errors): {cc['incomplete']}")
nli = results["nli"]
nc = summarize(nli, "entailed", "not")
print(f"\nBUCKET 2 — nli_unsupported ({len(nli)}):")
print(f" ✓ clear-flag + APPROVE (3/3 entailed): {nc['unanimous_yes']}")
print(f" ✗ confirm-flag (3/3 not-entailed): {nc['unanimous_no']}")
print(f" → CHAIR (split): {nc['split']}")
print(f" ? incomplete: {nc['incomplete']}")
print(f"\nBUCKET 3 — extraction defects ({len(buckets['defect'])}): → re-extraction")
if buckets["other"]:
print(f"BUCKET 4 — other flags ({len(buckets['other'])}): → chair")
auto = cc["unanimous_yes"] + cc["unanimous_no"] + nc["unanimous_yes"] + nc["unanimous_no"]
chair = cc["split"] + nc["split"] + cc["incomplete"] + nc["incomplete"] + len(buckets["other"])
reext = len(buckets["defect"])
print("\n" + "-" * 60)
print(f"NET: {len(pending)} pending → panel resolves {auto} automatically, "
f"{chair} to chair, {reext} to re-extraction")
print(f" chair queue collapses {len(pending)}{chair}")
Path("/tmp/halacha_panel_dryrun.json").write_text(json.dumps(
[{**{k: v for k, v in r.items() if not k.startswith("_h")},
"id": str(r["_h"]["id"]), "case": r["_h"].get("case_number"),
"rule": (r["_h"].get("rule_statement") or "")[:120]}
for r in clean + nli], ensure_ascii=False, indent=1))
print("\nper-item verdicts → /tmp/halacha_panel_dryrun.json")
# ── apply the chair-approved policy (reversible; backup first) ──────────
# CLEAN → majority 2/3 (keep→approved, drop→rejected, tie→chair)
# NLI → asymmetric: unanimous-entailed → clear nli flag (+approve if clean),
# majority not-entailed → rejected, else → chair
# DEFECT → untouched (needs re-extraction)
def majority(v: dict) -> bool | None:
vs = [v[k] for k in ("claude", "deepseek", "gemini") if v[k] is not None]
if len(vs) < 2:
return None
y, n = sum(vs), len(vs) - sum(vs)
return True if y > n else (False if n > y else None)
if args.apply:
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
audit = Path(__file__).resolve().parent.parent / "data" / "audit"
audit.mkdir(parents=True, exist_ok=True)
backup = audit / f"halacha-panel-apply-backup-{ts}.csv"
with backup.open("w", encoding="utf-8", newline="") as f:
w = csv.writer(f)
w.writerow(["id", "review_status", "quality_flags"])
for r in clean + nli:
h = r["_h"]
w.writerow([h["id"], h["review_status"], "|".join(h.get("quality_flags") or [])])
pool = await db.get_pool()
REV = "panel:opus+deepseek+gemini"
approved = rejected = cleared = chair = 0
for r in clean:
d = majority(r)
if d is True:
await pool.execute("UPDATE halachot SET review_status='approved', "
"reviewed_at=now(), reviewer=$2, updated_at=now() WHERE id=$1",
r["_h"]["id"], REV + " 2/3-keep")
approved += 1; r["_action"] = "approved"
elif d is False:
await pool.execute("UPDATE halachot SET review_status='rejected', "
"reviewed_at=now(), reviewer=$2, updated_at=now() WHERE id=$1",
r["_h"]["id"], REV + " 2/3-drop")
rejected += 1; r["_action"] = "rejected"
else:
chair += 1; r["_action"] = "chair"
for r in nli:
vs = [r[k] for k in ("claude", "deepseek", "gemini") if r[k] is not None]
unanimous_yes = len(vs) == 3 and all(vs)
maj_no = len(vs) >= 2 and sum(vs) < len(vs) - sum(vs)
if unanimous_yes:
rest = [x for x in (r["_h"].get("quality_flags") or []) if x != "nli_unsupported"]
if rest: # other flags remain → clear nli but keep in queue
await pool.execute("UPDATE halachot SET quality_flags=$2, updated_at=now() "
"WHERE id=$1", r["_h"]["id"], rest)
cleared += 1; chair += 1; r["_action"] = "nli_cleared"
else: # nli was the only blocker → clear + approve
await pool.execute("UPDATE halachot SET quality_flags='{}', "
"review_status='approved', reviewed_at=now(), reviewer=$2, "
"updated_at=now() WHERE id=$1", r["_h"]["id"], REV + " 3/3-entailed")
approved += 1; cleared += 1; r["_action"] = "approved"
elif maj_no:
await pool.execute("UPDATE halachot SET review_status='rejected', "
"reviewed_at=now(), reviewer=$2, updated_at=now() WHERE id=$1",
r["_h"]["id"], REV + " maj-not-entailed")
rejected += 1; r["_action"] = "rejected"
else:
chair += 1; r["_action"] = "chair"
print(f"\nAPPLIED (reversible): approved {approved} · rejected {rejected} · "
f"nli-flag-cleared {cleared} · left to chair {chair + len(buckets['defect'])} "
f"(incl. {len(buckets['defect'])} defects for re-extraction)")
print(f"backup → {backup}")
else:
print("\n(dry-run — pass --apply to write the approved policy)")
# ── FU-1 (#133): persist EVERY adjudication so active-learning has a signal.
# Capture-only — writes to halacha_panel_rounds, never touches `halachot`
# (chair gate stays the single source of truth, INV-G10). Runs in BOTH modes:
# a dry-run analysis is still a learning datapoint (apply_mode records which).
if not args.no_capture:
captured = errs = 0
for tag, q in (("clean", "keep"), ("nli", "entailed")):
for r in results[tag]:
raw = r.get("_raw") or {}
try:
await db.insert_panel_round(
r["_h"]["id"], round_ts=round_ts, question=q, bucket=tag,
claude=raw.get("claude"), deepseek=raw.get("deepseek"),
gemini=raw.get("gemini"), vote_key=q, verdict=r["_verdict"],
applied_action=r.get("_action", ""), apply_mode=args.apply,
)
captured += 1
except Exception as e:
errs += 1
print(f" capture-error {r['_h']['id']}: {e}", flush=True)
print(f"captured {captured} panel rounds → halacha_panel_rounds "
f"(apply_mode={args.apply}, errors={errs})")
return 0
if __name__ == "__main__":
ap = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
ap.add_argument("--limit", type=int, default=0)
ap.add_argument("--concurrency", type=int, default=6)
ap.add_argument("--apply", action="store_true",
help="write the agreed verdicts (reversible, CSV-backed); default dry-run")
ap.add_argument("--no-capture", action="store_true",
help="skip persisting per-judge votes+reasons to halacha_panel_rounds (FU-1, #133)")
raise SystemExit(asyncio.run(main(ap.parse_args())))