Files
legal-ai/scripts/halacha_panel_approve.py
Chaim f79c46a352 feat(learning): מסלול נקי להעלאת החלטה סופית + פאנל-סגנון דו-סוכני (DeepSeek+Gemini)
מוסיף מסלול ייעודי לקליטת ההחלטה החתומה של היו"ר, ומפעיל אותו דרך שני
שלבים אוטומטיים מדורגים עם פאנלי-סוכנים (אוטו-אישור + אסקלציה ליו"ר).

Backend (web/):
- POST /api/cases/{case}/final/upload — קליטת final חיצוני: שמירה קנונית
  (סופי-{case}.docx + עותק קורפוס-סגנון תחת case_number מלא כדי שבל"מ לא
  יתנגש עם ערר באותו מספר), פתיחת draft_final_pairs (final_received). לא נוגע
  ב-active_draft ולא מריץ retrofit (נבדל מ-exports/upload ו-mark-final → לא G2).
- POST .../final/run-learning + .../final/run-halacha — שלבים מדורגים שמעירים
  worker מקומי (claude/DeepSeek/Gemini מקומיים בלבד) דרך הרחבת
  wake_curator_for_final עם param task=learning|halacha.

פאנל-סגנון חדש (scripts/style_lesson_panel.py): שני שופטים (DeepSeek+Gemini)
על-גבי דיסטילציית-ה-Opus; הסכמה 2/2-keep → decision_lesson
(source=panel:deepseek+gemini); substance מדולג (INV-LRN5); הפיך + גיבוי CSV.
פאנל-הלכות: docstring/SCRIPTS.md עודכנו (--apply מחווט).

Frontend (web-ui/): כפתור "העלאת החלטה סופית של היו"ר" + שני כפתורים מדורגים
"הרץ למידת-קול"/"הרץ אימות-הלכות" ב-drafts-panel; כל התוויות בעברית
(badge מקור-לקח: "פאנל: דיפסיק+גמיני", "הרמס (סקירה)"...).

Spec: docs/spec/07-learning.md §0.6. Invariants: INV-LRN1/LRN4/LRN5, G10
(שער-יו"ר ידני להטמעה ל-SKILL.md/lessons.md — הפאנלים יוצרים הצעות בלבד);
G2 (מסלול-סופי הוא יכולת חסרה, לא מסלול-מקביל).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 09:03:26 +00:00

340 lines
15 KiB
Python

#!/usr/bin/env python3
"""Multi-judge panel to triage the halacha approval queue — DRY-RUN by default.
The chair cannot review every pending halacha. We proved (goldset_independent_
judge.py) that the COARSE axis — "is this a genuine, generalizable rule worth
keeping as a citable precedent?" — is reliable ACROSS independent models (92%
cross-model agreement), while the fine sub-type is not. This script turns that
into a triage: THREE independent-lineage judges vote on the coarse question, and
only a UNANIMOUS verdict acts automatically — every split escalates to the chair.
That collapses the queue without removing the human gate (INV-G10).
Three judges, three lineages (diversity is the point):
- claude (Opus via claude_session — local CLI, zero marginal cost) [Anthropic]
- deepseek (api.deepseek.com) [DeepSeek]
- gemini (generativelanguage — gemini-2.5-flash, #1 on LegalBench) [Google]
Three buckets of pending_review:
1. clean, below confidence threshold → panel votes KEEP? unanimous-keep would
auto-approve; split → chair.
2. nli_unsupported (rule maybe over-reaches its quote) → panel RE-ADJUDICATES
entailment; unanimous-entailed would clear the flag + approve; split → chair.
3. other quality flags (quote_unverified/truncated/thin) → genuine extraction
defects → flagged for re-extraction, never auto-approved.
DRY-RUN writes NOTHING. --apply acts on the agreed verdicts (clean: 2/3 majority;
nli: unanimous-entailed clears the flag) — reversible, backed up to data/audit/ first.
Splits/defects stay pending_review for the chair. Local-only (claude_session needs CLI).
cd ~/legal-ai/mcp-server
.venv/bin/python ../scripts/halacha_panel_approve.py --limit 12 # smoke
.venv/bin/python ../scripts/halacha_panel_approve.py # full dry-run
"""
from __future__ import annotations
import argparse
import asyncio
import csv
import json
import os
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path
import httpx
from legal_mcp.services import claude_session, db
# ── keys (local files, same pattern as the other local judges) ──
def _env_key(name: str, *files: str) -> str:
for f in files:
p = Path(f).expanduser()
if p.exists():
for line in p.read_text().splitlines():
if line.startswith(name + "="):
return line.split("=", 1)[1].strip()
return os.environ.get(name, "")
DEEPSEEK_KEY = _env_key("DEEPSEEK_API_KEY", "~/.hermes/profiles/deepseek/.env", "~/.env")
# canonical Infisical name is GOOGLE_GEMINI_API_KEY (/external-apis/gemini); accept
# the bare GEMINI_API_KEY too for back-compat.
GEMINI_KEY = _env_key("GOOGLE_GEMINI_API_KEY", "~/.env") or _env_key("GEMINI_API_KEY", "~/.env")
# ── the two coarse questions (the reliable axis — NOT the fuzzy sub-type) ──
KEEP_SYSTEM = (
"אתה משפטן בכיר בוועדת ערר לתכנון ובנייה. הוכרע אם 'הלכה' שחולצה מפסיקה ראויה "
"להישמר כתקדים בר-ציטוט. ראויה (keep=true) = עיקרון משפטי בר-הכללה והסתמכות "
"(holding/פרשנות/כלל-פרוצדורלי). לא-ראויה (keep=false) = החלה תלוית-עובדות על "
"התיק הספציפי, סוגיה שלא הוכרעה (אמרת-אגב), או חזרה מילולית על הציטוט ללא הפשטה. "
'החזר JSON בלבד: {"keep": true/false, "reason": "<משפט קצר>"}. ללא markdown.'
)
NLI_SYSTEM = (
"אתה בודק היסק משפטי. בהינתן כלל וציטוט-תומך, הכרע האם הציטוט באמת תומך בכלל "
"ואינו מרחיב מעבר למה שכתוב בו (entailed=true), או שהכלל מרחיב/חורג מהציטוט "
'(entailed=false). החזר JSON בלבד: {"entailed": true/false}. ללא markdown, ללא הסבר.'
)
def _keep_user(h: dict) -> str:
return (
f"ניסוח הכלל:\n{h.get('rule_statement') or ''}\n\n"
f"היגיון:\n{h.get('reasoning_summary') or ''}\n\n"
f"ציטוט תומך:\n{h.get('supporting_quote') or ''}"
)
def _nli_user(h: dict) -> str:
return f"כלל:\n{h.get('rule_statement') or ''}\n\nציטוט:\n{h.get('supporting_quote') or ''}"
# ── three judges, one signature: (system, user) -> dict|None ──
async def judge_claude(system: str, user: str) -> dict | None:
try:
return await claude_session.query_json(user, system=system)
except Exception:
return None
async def judge_deepseek(client: httpx.AsyncClient, system: str, user: str) -> dict | None:
if not DEEPSEEK_KEY:
return None
try:
r = await client.post(
"https://api.deepseek.com/v1/chat/completions",
headers={"Authorization": f"Bearer {DEEPSEEK_KEY}", "Content-Type": "application/json"},
json={"model": "deepseek-chat", "temperature": 0, "max_tokens": 120,
"response_format": {"type": "json_object"},
"messages": [{"role": "system", "content": system},
{"role": "user", "content": user}]},
timeout=90,
)
r.raise_for_status()
return json.loads(r.json()["choices"][0]["message"]["content"])
except Exception:
return None
async def judge_gemini(client: httpx.AsyncClient, system: str, user: str) -> dict | None:
if not GEMINI_KEY:
return None
try:
r = await client.post(
f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent?key={GEMINI_KEY}",
headers={"Content-Type": "application/json"},
json={"system_instruction": {"parts": [{"text": system}]},
"contents": [{"parts": [{"text": user}]}],
"generationConfig": {"temperature": 0, "maxOutputTokens": 4000,
"responseMimeType": "application/json"}},
timeout=90,
)
r.raise_for_status()
return json.loads(r.json()["candidates"][0]["content"]["parts"][0]["text"])
except Exception:
return None
def _bool(d: dict | None, key: str) -> bool | None:
if not isinstance(d, dict) or key not in d:
return None
v = d[key]
if isinstance(v, bool):
return v
return str(v).strip().lower() in ("true", "1", "yes", "כן")
async def panel_vote(client, system, user, key) -> dict:
"""Run all three judges; return per-judge bools + the verdict."""
c, ds, gm = await asyncio.gather(
judge_claude(system, user),
judge_deepseek(client, system, user),
judge_gemini(client, system, user),
)
votes = {"claude": _bool(c, key), "deepseek": _bool(ds, key), "gemini": _bool(gm, key)}
valid = [v for v in votes.values() if v is not None]
unanimous_yes = len(valid) == 3 and all(valid)
unanimous_no = len(valid) == 3 and not any(valid)
votes["_verdict"] = ("unanimous_yes" if unanimous_yes else
"unanimous_no" if unanimous_no else
"split" if len(valid) >= 2 else "incomplete")
return votes
async def main(args: argparse.Namespace) -> int:
print(f"judges available — deepseek:{bool(DEEPSEEK_KEY)} gemini:{bool(GEMINI_KEY)} "
f"claude:local\n", flush=True)
pending = await db.list_halachot(review_status="pending_review", limit=5000)
if args.limit:
pending = pending[: args.limit]
NLI = "nli_unsupported"
DEFECT = {"quote_unverified", "truncated_quote", "thin_restatement", "near_duplicate"}
def bucket(h):
flags = set(h.get("quality_flags") or [])
if not flags:
return "clean"
if flags & DEFECT:
return "defect" # genuine extraction problem → re-extraction
if NLI in flags:
return "nli" # re-adjudicate entailment
return "other"
buckets = defaultdict(list)
for h in pending:
buckets[bucket(h)].append(h)
print("queue:", {k: len(v) for k, v in buckets.items()}, "\n", flush=True)
sem = asyncio.Semaphore(args.concurrency)
results = {"clean": [], "nli": []}
async with httpx.AsyncClient() as client:
async def run(h, system_fn, user_fn, key, tag):
async with sem:
v = await panel_vote(client, system_fn, user_fn(h), key)
v["_h"] = h
results[tag].append(v)
tasks = []
for h in buckets["clean"]:
tasks.append(run(h, KEEP_SYSTEM, _keep_user, "keep", "clean"))
for h in buckets["nli"]:
tasks.append(run(h, NLI_SYSTEM, _nli_user, "entailed", "nli"))
# bounded fan-out
for i in range(0, len(tasks), args.concurrency):
await asyncio.gather(*tasks[i : i + args.concurrency])
done = len(results["clean"]) + len(results["nli"])
print(f"{done}/{len(tasks)} judged", flush=True)
# ── report ──
def summarize(rows, yes_label, no_label):
c = Counter(r["_verdict"] for r in rows)
return c
print("\n" + "=" * 60)
print("PANEL DRY-RUN (no DB writes)")
print("=" * 60)
clean = results["clean"]
cc = summarize(clean, "keep", "drop")
print(f"\nBUCKET 1 — clean, below threshold ({len(clean)}):")
print(f" ✓ auto-APPROVE (3/3 keep): {cc['unanimous_yes']}")
print(f" ✗ auto-REJECT (3/3 drop): {cc['unanimous_no']}")
print(f" → CHAIR (split): {cc['split']}")
print(f" ? incomplete (judge errors): {cc['incomplete']}")
nli = results["nli"]
nc = summarize(nli, "entailed", "not")
print(f"\nBUCKET 2 — nli_unsupported ({len(nli)}):")
print(f" ✓ clear-flag + APPROVE (3/3 entailed): {nc['unanimous_yes']}")
print(f" ✗ confirm-flag (3/3 not-entailed): {nc['unanimous_no']}")
print(f" → CHAIR (split): {nc['split']}")
print(f" ? incomplete: {nc['incomplete']}")
print(f"\nBUCKET 3 — extraction defects ({len(buckets['defect'])}): → re-extraction")
if buckets["other"]:
print(f"BUCKET 4 — other flags ({len(buckets['other'])}): → chair")
auto = cc["unanimous_yes"] + cc["unanimous_no"] + nc["unanimous_yes"] + nc["unanimous_no"]
chair = cc["split"] + nc["split"] + cc["incomplete"] + nc["incomplete"] + len(buckets["other"])
reext = len(buckets["defect"])
print("\n" + "-" * 60)
print(f"NET: {len(pending)} pending → panel resolves {auto} automatically, "
f"{chair} to chair, {reext} to re-extraction")
print(f" chair queue collapses {len(pending)}{chair}")
Path("/tmp/halacha_panel_dryrun.json").write_text(json.dumps(
[{**{k: v for k, v in r.items() if not k.startswith("_h")},
"id": str(r["_h"]["id"]), "case": r["_h"].get("case_number"),
"rule": (r["_h"].get("rule_statement") or "")[:120]}
for r in clean + nli], ensure_ascii=False, indent=1))
print("\nper-item verdicts → /tmp/halacha_panel_dryrun.json")
# ── apply the chair-approved policy (reversible; backup first) ──────────
# CLEAN → majority 2/3 (keep→approved, drop→rejected, tie→chair)
# NLI → asymmetric: unanimous-entailed → clear nli flag (+approve if clean),
# majority not-entailed → rejected, else → chair
# DEFECT → untouched (needs re-extraction)
if not args.apply:
print("\n(dry-run — pass --apply to write the approved policy)")
return 0
def majority(v: dict) -> bool | None:
vs = [v[k] for k in ("claude", "deepseek", "gemini") if v[k] is not None]
if len(vs) < 2:
return None
y, n = sum(vs), len(vs) - sum(vs)
return True if y > n else (False if n > y else None)
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
audit = Path(__file__).resolve().parent.parent / "data" / "audit"
audit.mkdir(parents=True, exist_ok=True)
backup = audit / f"halacha-panel-apply-backup-{ts}.csv"
with backup.open("w", encoding="utf-8", newline="") as f:
w = csv.writer(f)
w.writerow(["id", "review_status", "quality_flags"])
for r in clean + nli:
h = r["_h"]
w.writerow([h["id"], h["review_status"], "|".join(h.get("quality_flags") or [])])
pool = await db.get_pool()
REV = "panel:opus+deepseek+gemini"
approved = rejected = cleared = chair = 0
for r in clean:
d = majority(r)
if d is True:
await pool.execute("UPDATE halachot SET review_status='approved', "
"reviewed_at=now(), reviewer=$2, updated_at=now() WHERE id=$1",
r["_h"]["id"], REV + " 2/3-keep")
approved += 1
elif d is False:
await pool.execute("UPDATE halachot SET review_status='rejected', "
"reviewed_at=now(), reviewer=$2, updated_at=now() WHERE id=$1",
r["_h"]["id"], REV + " 2/3-drop")
rejected += 1
else:
chair += 1
for r in nli:
vs = [r[k] for k in ("claude", "deepseek", "gemini") if r[k] is not None]
unanimous_yes = len(vs) == 3 and all(vs)
maj_no = len(vs) >= 2 and sum(vs) < len(vs) - sum(vs)
if unanimous_yes:
rest = [x for x in (r["_h"].get("quality_flags") or []) if x != "nli_unsupported"]
if rest: # other flags remain → clear nli but keep in queue
await pool.execute("UPDATE halachot SET quality_flags=$2, updated_at=now() "
"WHERE id=$1", r["_h"]["id"], rest)
cleared += 1; chair += 1
else: # nli was the only blocker → clear + approve
await pool.execute("UPDATE halachot SET quality_flags='{}', "
"review_status='approved', reviewed_at=now(), reviewer=$2, "
"updated_at=now() WHERE id=$1", r["_h"]["id"], REV + " 3/3-entailed")
approved += 1; cleared += 1
elif maj_no:
await pool.execute("UPDATE halachot SET review_status='rejected', "
"reviewed_at=now(), reviewer=$2, updated_at=now() WHERE id=$1",
r["_h"]["id"], REV + " maj-not-entailed")
rejected += 1
else:
chair += 1
print(f"\nAPPLIED (reversible): approved {approved} · rejected {rejected} · "
f"nli-flag-cleared {cleared} · left to chair {chair + len(buckets['defect'])} "
f"(incl. {len(buckets['defect'])} defects for re-extraction)")
print(f"backup → {backup}")
return 0
if __name__ == "__main__":
ap = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
ap.add_argument("--limit", type=int, default=0)
ap.add_argument("--concurrency", type=int, default=6)
ap.add_argument("--apply", action="store_true", help="(not yet wired — dry-run only)")
raise SystemExit(asyncio.run(main(ap.parse_args())))