Files
legal-ai/mcp-server/src/legal_mcp/services/panel_extraction.py
Chaim 4ca907b97f
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 4s
Lint — undefined names / undefined-names (pull_request) Successful in 11s
feat(principles): retroactive cull (Phase C) + source-derived terminology (Phase D, #152)
Phase C — scripts/cull_principles.py: re-adjudicates every existing 'original'
principle with the SAME panel regime (panel_keep_score → classify → apply_cap),
reversible (CSV backup + rejected canonical recoverable), usage-throttled.
panel_extraction.panel_keep_score + apply_cap (shared, G2). Dry-run on 3
decisions: 37→15 survive.

Phase D — services/principles.py: source-derived label הלכה (binding court) /
כלל פרשני (committee) / עיקרון (persuasive); umbrella עקרונות משפטיים. Wired into
canonical_halacha_get/list (principle_class+principle_label). UI string changes
deferred to the Claude Design gate. spec INV-LRN7; SCRIPTS.md; 7 new tests; 428 green.

Phase E needs no new code — synthesis already targets pending_synthesis, which the
cull leaves only on survivors (rejected canonicals → 'rejected').

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-19 11:14:59 +00:00

324 lines
15 KiB
Python

"""Tri-model panel extraction regime (legal-principles-redesign, #152).
The shared core (G2) for BOTH the going-forward extractor (Phase B) and the
retroactive cull (Phase C). chaim 2026-06-19:
1. THREE models (Claude local + DeepSeek + Gemini) deep-analyze a decision and
each PROPOSES candidate principles, each with a 0-1 score.
2. Candidates are matched ACROSS models by embedding cosine → a "merged
candidate" carries: votes (# distinct models that proposed it) and score
(mean of the voters' scores).
3. Approval rule:
votes == 3 → approved (even if score < floor)
votes >= 2 AND score >= SCORE_FLOOR → approved
votes == 2 AND score < SCORE_FLOOR → pending_review (chair, G10)
votes <= 1 → rejected (dropped)
4. The CALLER applies the corpus-dedup (V41 link → frees a slot) and the
MAX_NEW cap (top-N approved-new by score). This module is corpus-agnostic
and DB-free so it is unit-testable and reused identically by B and C.
Terminology (#152): a principle from a binding higher court is a הלכה; one from
the appeals committee (internal_committee) is a כלל פרשני (interpretive rule) —
the committee applies law, it does not make binding precedent. The extract prompt
adapts to ``source_kind`` and, for the committee, demands genuine novelty.
"""
from __future__ import annotations
import logging
import math
import httpx
from legal_mcp import config
from legal_mcp.services import embeddings, panel_judges
logger = logging.getLogger(__name__)
_RULE_TYPES = ("holding", "interpretive", "procedural") # citable kinds only
def _extract_system(source_kind: str, is_binding: bool, max_candidates: int) -> str:
if source_kind == "internal_committee":
nature = (
"המקור הוא החלטת ועדת-ערר. ועדת ערר מיישמת דין קיים ואינה יוצרת הלכה מחייבת. "
"חלץ אך ורק כללים פרשניים חדשים לגמרי שהוועדה גיבשה — לא יישום של הלכה ידועה, "
"לא חזרה על דין מוכר, ולא תיאור עובדות. אם אין כלל פרשני חדש אמיתי — החזר []."
)
elif is_binding:
nature = (
"המקור הוא פסק-דין של בית-משפט מחוזי/עליון. חלץ הלכות — כללים משפטיים "
"בני-הכללה והסתמכות שהפסק קובע או מאמץ ומיישם."
)
else:
nature = (
"המקור הוא פסיקה משכנעת (לא-מחייבת). חלץ עקרונות משפטיים בני-הכללה בלבד."
)
return (
"אתה משפטן בכיר בוועדת ערר לתכנון ובנייה, מנתח פסיקה לבסיס-ידע בר-ציטוט. "
f"{nature}\n\n"
"כללי-ברזל:\n"
"• רק עיקרון כללי בר-הכללה והסתמכות — לא החלה תלוית-עובדות/צדדים/סכומים, "
"לא אמרת-אגב (סוגיה שלא הוכרעה), לא חזרה מילולית על הציטוט ללא הפשטה.\n"
"• כל עיקרון חייב עיגון: ציטוט מילולי מהמקור התומך בו (INV-AH).\n"
f"• החזר עד {max_candidates} המועמדים החזקים ביותר בלבד; מוטב מעט ואיכותי.\n\n"
"פלט — JSON array בלבד, ללא markdown:\n"
"[{\n"
' "rule_statement": "<העיקרון, כללי ובלתי-תלוי-תיק>",\n'
' "supporting_quote": "<ציטוט מילולי מהמקור>",\n'
' "reasoning_summary": "<מדוע זה עיקרון בר-הסתמכות>",\n'
' "rule_type": "holding|interpretive|procedural",\n'
' "score": 0.0-1.0\n'
"}]\n"
"אם אין עקרונות ראויים — החזר []."
)
def _coerce_list(reply) -> list[dict]:
"""A judge may return a list, or {"principles":[...]}/{"items":[...]}, or junk."""
if isinstance(reply, list):
items = reply
elif isinstance(reply, dict):
for k in ("principles", "items", "halachot", "results", "candidates"):
if isinstance(reply.get(k), list):
items = reply[k]
break
else:
items = [reply] if reply.get("rule_statement") else []
else:
return []
out = []
for it in items:
if not isinstance(it, dict):
continue
rule = (it.get("rule_statement") or "").strip()
quote = (it.get("supporting_quote") or "").strip()
if not rule or not quote:
continue
rt = (it.get("rule_type") or "interpretive").strip().lower()
try:
score = float(it.get("score", 0.0))
except (TypeError, ValueError):
score = 0.0
out.append({
"rule_statement": rule,
"supporting_quote": quote,
"reasoning_summary": (it.get("reasoning_summary") or "").strip(),
"rule_type": rt if rt in _RULE_TYPES else "interpretive",
"score": max(0.0, min(1.0, score)),
})
return out
def _cosine(a: list[float], b: list[float]) -> float:
dot = sum(x * y for x, y in zip(a, b))
na = math.sqrt(sum(x * x for x in a))
nb = math.sqrt(sum(y * y for y in b))
return 0.0 if na == 0 or nb == 0 else dot / (na * nb)
def classify(votes: int, score: float) -> str:
"""The chair's approval rule → 'approved' | 'pending_review' | 'rejected'."""
floor = config.HALACHA_PANEL_SCORE_FLOOR
if votes >= 3:
return "approved"
if votes == 2:
return "approved" if score >= floor else "pending_review"
return "rejected"
def apply_cap(judged: list[dict], max_new: int | None = None) -> list[dict]:
"""Per-decision cap for the retroactive cull (#152, Phase C).
``judged`` = a decision's principles, each with a panel ``verdict`` + ``score``.
Survivors (approved/pending_review) are ranked by score; those beyond ``max_new``
are downgraded to 'rejected' (over-cap). Already-rejected stay rejected. Returns
a new list with ``final_verdict`` set on each (order preserved). Pure.
"""
max_new = config.HALACHA_PANEL_MAX_NEW if max_new is None else max_new
survivors = [j for j in judged if j.get("verdict") in ("approved", "pending_review")]
survivors.sort(key=lambda j: j.get("score", 0.0), reverse=True)
keep_ids = {id(j) for j in survivors[:max_new]}
out = []
for j in judged:
v = j.get("verdict")
if v in ("approved", "pending_review") and id(j) not in keep_ids:
final = "rejected" # over the cap
else:
final = v
out.append({**j, "final_verdict": final})
return out
def cluster_candidates(
per_model: dict[str, list[dict]], embs: dict[int, list[float]],
) -> list[dict]:
"""Greedy cross-model clustering. ``per_model`` maps judge→its candidate list;
``embs`` maps id(candidate)→embedding. Each cluster merges near-duplicate
proposals: votes = # distinct models present, score = mean of each model's
BEST score in the cluster, representative = highest-scoring member.
Pure (no I/O) given the embeddings — unit-testable.
"""
match = config.HALACHA_PANEL_MATCH_COSINE
clusters: list[dict] = []
# deterministic order: model order, then model-local order
flat: list[tuple[str, dict]] = []
for m in panel_judges.JUDGE_NAMES:
for c in per_model.get(m, []):
flat.append((m, c))
for model, cand in flat:
emb = embs.get(id(cand))
placed = False
if emb is not None:
for cl in clusters:
if cl["_emb"] is not None and _cosine(cl["_emb"], emb) >= match:
cl["members"].append({"model": model, **cand})
prev = cl["per_model_score"].get(model, -1.0)
cl["per_model_score"][model] = max(prev, cand["score"])
if cand["score"] > cl["score_rep"]:
cl["score_rep"] = cand["score"]
cl["rule_statement"] = cand["rule_statement"]
cl["supporting_quote"] = cand["supporting_quote"]
cl["reasoning_summary"] = cand["reasoning_summary"]
cl["rule_type"] = cand["rule_type"]
cl["_emb"] = emb
placed = True
break
if not placed:
clusters.append({
"rule_statement": cand["rule_statement"],
"supporting_quote": cand["supporting_quote"],
"reasoning_summary": cand["reasoning_summary"],
"rule_type": cand["rule_type"],
"members": [{"model": model, **cand}],
"per_model_score": {model: cand["score"]},
"score_rep": cand["score"],
"_emb": emb,
})
out = []
for cl in clusters:
pms = cl["per_model_score"]
votes = len(pms)
score = sum(pms.values()) / votes if votes else 0.0
out.append({
"rule_statement": cl["rule_statement"],
"supporting_quote": cl["supporting_quote"],
"reasoning_summary": cl["reasoning_summary"],
"rule_type": cl["rule_type"],
"votes": votes,
"score": round(score, 4),
"voters": sorted(pms.keys()),
"verdict": classify(votes, score),
"embedding": cl["_emb"],
})
# strongest first
out.sort(key=lambda c: (c["votes"], c["score"]), reverse=True)
return out
def _keep_score_system(source_kind: str, is_binding: bool) -> str:
if source_kind == "internal_committee":
nature = ("המקור הוא החלטת ועדת-ערר (מיישמת דין, אינה יוצרת הלכה). ראוי-לשמירה = "
"כלל פרשני חדש ובר-הכללה שהוועדה גיבשה; לא-ראוי = יישום תלוי-עובדות, "
"חזרה על דין מוכר, אמרת-אגב, או חזרה מילולית על הציטוט.")
else:
nature = ("ראוי-לשמירה = עיקרון משפטי בר-הכללה והסתמכות (הלכה/פרשנות/כלל-פרוצדורלי); "
"לא-ראוי = החלה תלוית-עובדות, אמרת-אגב, או חזרה מילולית על הציטוט.")
return (
"אתה משפטן בכיר בוועדת ערר לתכנון ובנייה. הוכרע אם עיקרון שחולץ מפסיקה ראוי "
f"להישמר כתקדים בר-ציטוט. {nature}\n"
"תן גם ציון-ביטחון 0-1 לכך שזהו עיקרון בר-הסתמכות אמיתי.\n"
'החזר JSON בלבד: {"keep": true/false, "score": 0.0-1.0, "reason": "<משפט קצר>"}. ללא markdown.'
)
async def panel_keep_score(
rule_statement: str,
supporting_quote: str,
reasoning_summary: str = "",
*,
source_kind: str = "external_upload",
is_binding: bool = True,
) -> dict:
"""Run the 3-judge panel on ONE existing principle (Phase C cull, #152).
Each judge votes keep + score; votes = # keepers, score = mean of the keepers'
scores (chaim: "ממוצע המצביעים"), verdict via the shared :func:`classify`.
Returns {votes, score, verdict, voters, per_judge} — per_judge keeps raw
replies for the active-learning round (FU-1). Used by the retroactive cull;
the extractor uses :func:`panel_extract` instead.
"""
import asyncio
system = _keep_score_system(source_kind, is_binding)
user = (f"ניסוח העיקרון:\n{rule_statement}\n\n"
f"היגיון:\n{reasoning_summary}\n\nציטוט תומך:\n{supporting_quote}")
async with httpx.AsyncClient() as client:
c, ds, gm = await asyncio.gather(
panel_judges.judge_claude(system, user, max_tokens=300),
panel_judges.judge_deepseek(client, system, user, max_tokens=300),
panel_judges.judge_gemini(client, system, user, max_tokens=2000),
)
raw = {"claude": c, "deepseek": ds, "gemini": gm}
keepers, scores = [], []
for name, reply in raw.items():
if panel_judges.to_bool(reply, "keep"):
keepers.append(name)
try:
scores.append(max(0.0, min(1.0, float(reply.get("score", 0.0)))))
except (TypeError, ValueError):
scores.append(0.0)
votes = len(keepers)
score = round(sum(scores) / votes, 4) if votes else 0.0
return {"votes": votes, "score": score, "verdict": classify(votes, score),
"voters": sorted(keepers), "per_judge": raw}
async def _run_three(system: str, user: str, max_tokens: int) -> dict[str, object]:
async with httpx.AsyncClient() as client:
import asyncio
c, ds, gm = await asyncio.gather(
panel_judges.judge_claude(system, user, max_tokens=max_tokens),
panel_judges.judge_deepseek(client, system, user, max_tokens=max_tokens),
panel_judges.judge_gemini(client, system, user, max_tokens=max_tokens),
)
return {"claude": c, "deepseek": ds, "gemini": gm}
async def panel_extract(
text: str,
*,
source_kind: str = "external_upload",
is_binding: bool = True,
propose_n: int | None = None,
) -> list[dict]:
"""Run the 3-model panel over a decision's text → merged candidate principles.
Returns clusters (strongest first), each:
{rule_statement, supporting_quote, reasoning_summary, rule_type,
votes, score, voters, verdict, embedding}
Does NOT dedup vs the corpus and does NOT apply the MAX_NEW cap — the caller
(extractor / cull) owns those (they need DB + differ B vs C).
"""
propose_n = propose_n if propose_n is not None else config.HALACHA_PANEL_MAX_NEW + 3
system = _extract_system(source_kind, is_binding, propose_n)
user = f"--- תחילת המקור ---\n{text}\n--- סוף המקור ---"
replies = await _run_three(system, user, max_tokens=8000)
per_model: dict[str, list[dict]] = {}
for name in panel_judges.JUDGE_NAMES:
per_model[name] = _coerce_list(replies.get(name))
if not any(per_model.values()):
logger.warning("panel_extract: all three judges returned no candidates")
return []
# embed every candidate's rule_statement for cross-model matching
flat = [c for m in panel_judges.JUDGE_NAMES for c in per_model[m]]
embs: dict[int, list[float]] = {}
if flat:
vecs = await embeddings.embed_texts([c["rule_statement"] for c in flat])
for c, v in zip(flat, vecs):
embs[id(c)] = list(v)
return cluster_candidates(per_model, embs)