Files
legal-ai/mcp-server/src/legal_mcp/services/halacha_quality.py
Chaim fb60dca796 feat(halacha): over-extraction consolidation — fold facets via claude_session (#81.5)
After a precedent finishes extracting, a claude_session pass folds facets of the
SAME legal question (below #82's dedup cosine — the שפר 14-vs-4 / 403-17→89
granularity gap) into one canonical; the rest are marked 'rejected' (reversible:
out of the active corpus AND the review queue, but recoverable). FOLD-ONLY —
never merges distinct legal questions, never invents.

- Engine: claude_session-as-judge (local CLI, zero cost), 'high' effort — folding
  needs careful judgment. One pass per precedent, runs in _extract_impl once all
  chunks are done (the prompt dedups within a chunk; this catches across chunks).
- Pure, unit-tested helpers in halacha_quality: CONSOLIDATE_SYSTEM,
  build_consolidation_prompt, parse_fold_groups (fails SAFE → [] on any malformed
  shape; drops <2-member groups; coerces/dedups indices).
- halacha_extractor._consolidate_precedent picks the canonical per group
  (approved>pending, higher confidence, quote_verified, longer) and rejects the
  rest via the existing update_halachot_batch (#84). Never rejects a canonical.
  Fails OPEN on any error (no CLI / parse fail → 0 folds, data untouched).
- config: HALACHA_CONSOLIDATE_ENABLED/MODEL/EFFORT.

Verified: suite 176 passed (10 new); integration vs dev DB — a 2-facet group
folds to 1 canonical + 1 rejected (tagged), distinct rules untouched, claude
error → 0 folds (fail-open).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 16:26:44 +00:00

268 lines
11 KiB
Python

"""Pure quality validators + dedup helpers for halacha extraction.
These encode the "strict rules" rubric (docs/halacha-strict-rubric.md) that
drove the 2026-06-03 corpus cleanup (1454→534), so that future extraction
comes out clean instead of accumulating duplicates, obiter dicta, truncated
quotes and thin restatements that clog the review queue.
Everything here is a PURE function (no DB, no LLM) so it is fully unit-tested.
The DB-touching dedup-on-insert (uses these helpers) lives in
``db.store_halachot_for_chunk``.
Flags produced by :func:`compute_quality_flags` BLOCK auto-approval (the item
routes to ``pending_review`` regardless of confidence) but never delete — the
chair still sees flagged items, just out of the auto-approved stream.
"""
from __future__ import annotations
import re
# ── Hebrew text normalization (shared with the extractor's quote check) ──
_HEB_QUOTE_VARIANTS = "\"'׳״‘’“”«»„′″"
def normalize_text(text: str) -> str:
"""Collapse whitespace and unify Hebrew quote-mark variants for matching.
Kept dependency-free (the extractor previously routed through
``proofreader._fix_hebrew_quotes``; here we inline a quote-class collapse so
this module stays pure and importable from anywhere).
"""
if not text:
return ""
# Unify the half-dozen quote/gershayim variants to a single ASCII quote.
unified = re.sub(f"[{re.escape(_HEB_QUOTE_VARIANTS)}]", '"', text)
return re.sub(r"\s+", " ", unified).strip()
# ── Non-decision / obiter detection (Wambaugh: the court did not decide) ──
#
# High-precision markers only. Phrases like "לכאורה" / "ניתן להניח" alone are
# too common to flag reliably, so we require the explicit "declined to rule"
# formulations the rubric calibration confirmed on שפר (idx 32: "איני רואה
# לקבוע מסמרות") and on 8027-25 (idx 18-19: "אין צורך להכריע").
NON_DECISION_MARKERS = (
"אין צורך להכריע",
"איני נדרש להכריע",
"איננו נדרשים להכריע",
"אין אנו נדרשים להכריע",
"מתייתר הצורך להכריע",
"אין צורך לקבוע מסמרות",
"מבלי לקבוע מסמרות",
"איני רואה לקבוע מסמרות",
"איננו רואים לקבוע מסמרות",
"אין לקבוע מסמרות",
"אין מקום לקבוע מסמרות",
"לא ראינו לקבוע מסמרות",
"למעלה מן הצורך",
"למעלה מהצורך",
"למעלה מן הדרוש",
"מעבר לנדרש",
"אגב אורחא",
"אגב אורחה",
)
def detect_non_decision(*texts: str) -> str | None:
"""Return the first non-decision marker found across ``texts`` (or None).
Scans rule_statement + reasoning_summary + supporting_quote — the court's
own hedge usually sits in the quote/reasoning, not the abstracted rule.
"""
joined = normalize_text(" ".join(t for t in texts if t))
for marker in NON_DECISION_MARKERS:
if marker in joined:
return marker
return None
# ── Truncated / incomplete supporting-quote detection ──
#
# Conservative: only flag a CLEAR mid-word cut — the quote's last whitespace-
# delimited token is a single Hebrew letter (a dangling construct/prefix such
# as the "...על ה" in 8099-02-17 idx 6). A complete clause ends in a full word,
# so this does not fire on quotes that merely lack a trailing period (the
# calibration showed ~1/3 of valid quotes drop the final period legitimately).
_HEB_LETTER = "א-ת"
def is_quote_truncated(quote: str) -> bool:
norm = normalize_text(quote)
if not norm:
return True
tokens = norm.split(" ")
last = tokens[-1].strip('".,;:)]')
# dangling single Hebrew letter at the end == cut mid-word
if len(last) == 1 and re.match(f"[{_HEB_LETTER}]", last):
return True
return False
# ── Thin restatement: rule_statement adds nothing over the quote ──
#
# Flag when the rule is essentially a copy of the quote: high token overlap AND
# the rule is no longer than the quote. A genuine halacha ABSTRACTS the rule, so
# it introduces wording the verbatim quote lacks and/or generalizes (longer or
# differently phrased).
_THIN_OVERLAP = 0.85
_THIN_LEN_RATIO = 1.10
def _tokens(text: str) -> set[str]:
norm = normalize_text(text)
return {t for t in re.split(r"[^א-ת0-9]+", norm) if len(t) > 1}
def is_thin_restatement(rule_statement: str, supporting_quote: str) -> bool:
rule_t = _tokens(rule_statement)
quote_t = _tokens(supporting_quote)
if not rule_t or not quote_t:
return False
overlap = len(rule_t & quote_t) / len(rule_t)
len_ratio = len(normalize_text(rule_statement)) / max(1, len(normalize_text(supporting_quote)))
return overlap >= _THIN_OVERLAP and len_ratio <= _THIN_LEN_RATIO
# ── Aggregate ──
FLAG_NON_DECISION = "non_decision"
FLAG_TRUNCATED_QUOTE = "truncated_quote"
FLAG_THIN_RESTATEMENT = "thin_restatement"
FLAG_QUOTE_UNVERIFIED = "quote_unverified"
FLAG_NLI_UNSUPPORTED = "nli_unsupported" # rule not entailed by its quote (#81.3)
# ── NLI entailment check (rule_statement ⊨ supporting_quote) — #81.3 ──
#
# Pure prompt-builder + verdict-parser; the LLM call itself runs through
# claude_session in halacha_extractor (local CLI, zero cost). A rule that the
# quote does not actually support (neutral) or contradicts is the model
# over-reaching beyond its source — flag it (blocks auto-approve). EVERYTHING
# here fails OPEN: any parse ambiguity resolves to "entailed" so a flaky judge
# never blocks a genuine halacha.
NLI_SYSTEM = (
"אתה בודק היסק (entailment) משפטי. לכל זוג {כלל, ציטוט} החלט האם **הכלל נובע מהציטוט** — "
"כלומר הציטוט תומך בכלל ואינו מרחיב מעבר למה שנכתב בו. שלוש תוויות בלבד:\n"
"- entailed = הכלל נתמך במלואו בציטוט.\n"
"- neutral = הציטוט אינו תומך בכלל (הכלל מרחיב/מוסיף מעבר לציטוט).\n"
"- contradiction = הכלל סותר את הציטוט.\n"
'החזר JSON array בלבד באורך מספר הזוגות, לדוגמה: ["entailed","neutral",...]. '
"ללא markdown, ללא הסבר."
)
_NLI_LABELS = {"entailed", "neutral", "contradiction"}
def build_nli_prompt(items: list[dict]) -> str:
"""Build the user message: a numbered list of {rule, quote} pairs."""
blocks = []
for i, h in enumerate(items, 1):
rule = (h.get("rule_statement") or "").strip()
quote = (h.get("supporting_quote") or "").strip()
blocks.append(f"### זוג {i}\nכלל: {rule}\nציטוט: {quote}")
return "\n\n".join(blocks)
def parse_nli_verdicts(raw, n: int) -> list[str]:
"""Coerce the judge's output into exactly ``n`` labels — fail-open.
Any shape mismatch / unknown label resolves to 'entailed' so a flaky or
unavailable judge never blocks a halacha.
"""
if not isinstance(raw, list) or len(raw) != n:
return ["entailed"] * n
out: list[str] = []
for item in raw:
v = item.get("verdict") if isinstance(item, dict) else item
v = str(v or "").strip().lower()
out.append(v if v in _NLI_LABELS else "entailed")
return out
# ── Over-extraction consolidation (fold facets of one legal question) — #81.5 ──
#
# #82 dedup-on-insert removes near-EXACT dups (cosine ≥ 0.93). #81.5 handles the
# remaining over-extraction: facets of the SAME legal question, phrased
# differently, that sit BELOW the dedup threshold (the שפר 14-vs-4 / 403-17→89
# granularity gap). A per-precedent claude_session pass groups such facets; the
# extractor keeps one canonical per group and marks the rest rejected (reversible,
# out of the active corpus + review queue). FOLD-ONLY — never merges distinct
# legal questions, never invents. Fails OPEN (parse error → no folds).
CONSOLIDATE_SYSTEM = (
"אתה מאחד פנים-כפולים של הלכות שחולצו מאותו פסק דין. בהינתן רשימה ממוספרת של הלכות, "
"זהה קבוצות של הלכות שהן **אותה שאלה משפטית** בניסוחים או פנים שונים. "
"כללים: (1) אַחֵד רק הלכות שעונות על אותה שאלה משפטית בדיוק; (2) **אל תאַחֵד** הלכות "
"שעונות על שאלות משפטיות שונות (גם אם קרובות בנושא); (3) הלכה ייחודית — אל תכלול בשום קבוצה. "
'החזר JSON array של קבוצות, כל קבוצה = array של מספרי-האינדקס שיש לאַחֵד (לפחות 2 חברים). '
"לדוגמה: [[2,5,9],[14,18]]. אם אין מה לאַחֵד החזר []. ללא markdown, ללא הסבר."
)
def build_consolidation_prompt(items: list[dict]) -> str:
"""Numbered list of a precedent's halachot (index + rule + reasoning)."""
blocks = []
for h in items:
idx = h.get("halacha_index")
rule = (h.get("rule_statement") or "").strip()
reason = (h.get("reasoning_summary") or "").strip()
line = f"[{idx}] {rule}"
if reason:
line += f" (היגיון: {reason})"
blocks.append(line)
return "\n".join(blocks)
def parse_fold_groups(raw) -> list[list[int]]:
"""Coerce judge output into a list of fold-groups (≥2 int indices each).
Fails SAFE: any malformed shape → [] (no folding). Non-int / <2-member
groups are dropped.
"""
if not isinstance(raw, list):
return []
groups: list[list[int]] = []
for g in raw:
if not isinstance(g, list):
continue
members: list[int] = []
for x in g:
try:
members.append(int(x))
except (TypeError, ValueError):
continue
# dedup within group, preserve order
seen: set[int] = set()
members = [m for m in members if not (m in seen or seen.add(m))]
if len(members) >= 2:
groups.append(members)
return groups
def compute_quality_flags(
rule_statement: str,
supporting_quote: str,
reasoning_summary: str = "",
quote_verified: bool = True,
) -> list[str]:
"""Return the list of quality flags for one halacha (empty == clean).
Any non-empty result blocks auto-approval (routes to pending_review).
"""
flags: list[str] = []
if detect_non_decision(rule_statement, reasoning_summary, supporting_quote):
flags.append(FLAG_NON_DECISION)
if is_quote_truncated(supporting_quote):
flags.append(FLAG_TRUNCATED_QUOTE)
if is_thin_restatement(rule_statement, supporting_quote):
flags.append(FLAG_THIN_RESTATEMENT)
if not quote_verified:
flags.append(FLAG_QUOTE_UNVERIFIED)
return flags