Files
legal-ai/mcp-server/src/legal_mcp/services/claims_extractor.py
Chaim d05c1e3fce
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 5s
fix(extractors): disable tools on text→JSON claude_session calls (no error_max_turns)
כל קריאות text→JSON ב-9 המחלצים העבירו את ברירת-המחדל של ה-CLI (כל הכלים
פעילים). המודל פלט מדי פעם stop_reason:"tool_use", מה שמפיל את --max-turns 1
ל-error_max_turns ומאלץ retry — ~$0.12-0.16 לניסיון, × 3. נצפה ב-drain
חילוץ-ההלכות (legal-halacha-drain, ‎15 כשלי error_max_turns ב-error.log).

התשתית כבר קיימת: claude_session.query מקבל tools=""‎ לנטרול כל הכלים, ושני
מחלצים (digest_metadata_extractor, bulletin_splitter) כבר משתמשים בו. כאן רק
מיישרים את שאר המחלצים לאותו מסלול קנוני — אף קריאת חילוץ/שיפוט/סיווג טהורה
לא צריכה כלי.

מתוקנים (11 קריאות, 9 קבצים): halacha_extractor (×3: extract/NLI/consolidate),
corroboration, claims_extractor, argument_aggregator, appraiser_facts_extractor,
learning_loop, qa_validator, brainstorm, style_metadata_extractor.

Invariants: מקיים INV-G2 (מסלול קנוני יחיד; סימטריה בין מחלצים-אחים) — לא מסלול
מקביל חדש אלא שימוש עקבי בפרמטר הקיים. אין בליעה שקטה (§6) — נתיבי הכשל/retry
נשמרים. ללא שינוי-ספ.

בדיקות: 60/60 ב-tests/test_halacha_coerce.py + test_halacha_quality.py עוברות;
py_compile נקי על כל 9 הקבצים.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-11 11:49:35 +00:00

377 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""חילוץ טענות מכתבי טענות (ערר, תשובה) באמצעות Claude Code session.
שתי גישות:
1. extract_claims_with_ai — חילוץ עם Claude Code headless (לכתבי טענות קלט)
2. extract_claims_from_block — חילוץ regex (מבלוק ז של החלטות סופיות)
"""
from __future__ import annotations
import asyncio
import logging
import re
from uuid import UUID
from legal_mcp import config
from legal_mcp.config import parse_llm_json
from legal_mcp.services import db, claude_session
logger = logging.getLogger(__name__)

# Size target for one extraction chunk, in characters (≈3K tokens of Hebrew).
# It was previously 25K and was lowered because:
#   • a single ``claude -p`` call on a 25K-char Hebrew prompt with a cold
#     cache routinely took ~150-180s, whereas 12K chunks finish in ~60-90s;
#   • retrying one failed chunk costs less when chunks are smaller;
#   • smaller chunks get more out of running in parallel (CHUNK_CONCURRENCY).
CHUNK_TARGET_CHARS = 12_000

# Maximum number of chunks sent to Claude at the same time. Each subprocess
# holds ~300 MB RSS plus its own MCP stack; 3 keeps the box usable.
CHUNK_CONCURRENCY = 3

# Extra attempts for a failed chunk: each chunk is tried 1 + this many times.
CHUNK_RETRY_ATTEMPTS = 1
# Instruction header that _extract_chunk places before every chunk prompt.
# This is runtime text sent to the model (in Hebrew), so edit it with care.
# In summary, it asks the model to:
#   - extract claims from a planning-and-building pleading;
#   - stay faithful to the source;
#   - split distinct arguments into separate claims;
#   - write each claim in the third person;
#   - label each claim with party_role in {appellant, respondent, committee,
#     permit_applicant};
#   - return ONLY a JSON array of {"party_role", "claim_text", "topic"}
#     objects (claim_text up to 150 words, similar claims grouped, and []
#     when there are none).
# Downstream, extract_claims_with_ai keeps only dict entries that have both
# party_role and claim_text; "topic" is requested but not validated there.
EXTRACT_CLAIMS_PROMPT = """אתה מנתח מסמכים משפטיים בתחום תכנון ובניה. תפקידך לחלץ טענות מכתב טענות.
## כללים חשובים:
1. **נאמנות למקור** — כל טענה חייבת לשקף את מה שנכתב, לא לפרש או להוסיף.
2. **טענה = טיעון מובחן אחד** — אם פסקה מכילה 2 טיעונים שונים, פצל לשתי טענות.
3. **כל טענה חייבת להיות מובנת בפני עצמה** — גם בלי הקשר המסמך המלא.
4. **שמור על לשון הגוף שלישי** — גם אם המקור בגוף ראשון.
## סוג הצד (party_role):
- appellant — עורר/ת (מי שמגיש את הערר)
- respondent — משיב/ה (הצד שכנגד, לא הוועדה)
- committee — ועדה מקומית
- permit_applicant — מבקש/ת היתר
## פלט:
החזר JSON array בלבד — ללא markdown, ללא הסברים, רק JSON:
[{"party_role": "appellant", "claim_text": "הטענה בגוף שלישי", "topic": "נושא"}]
חשוב:
- claim_text קצר — עד 150 מילים לכל טענה
- קבץ טענות דומות לטענה אחת
- אם אין טענות החזר []
"""
# Section markers treated as natural chunk boundaries when present.
# Hebrew legal briefs almost always use numbered sections ("10. ...") or
# Hebrew-letter sections ("א. ..."). Splitting between sections keeps every
# chunk a self-contained argumentative unit.
#
# Every alternative must be preceded by a newline. The following ``\s*`` may
# span further blank lines, so ``m.start()`` is the offset of the first
# newline in the whitespace run before the heading. _split_by_sections uses
# that offset as a cut point. A heading on the very first line of the text
# (no preceding newline) is therefore never matched.
_SECTION_BOUNDARY_RE = re.compile(
    r"\n\s*("
    r"\d+\.\s+\S"  # numbered section: "10. טענות"
    r"|[א-ת]\.\s+\S"  # Hebrew letter section: "א. רקע"
    r"|##\s+\S"  # markdown heading
    r"|פרק\s+\S"  # "פרק" (chapter) headings
    r")"
)
def _split_by_sections(text: str, target: int = CHUNK_TARGET_CHARS) -> list[str]:
    """Cut ``text`` into chunks of roughly ``target`` characters, preferring
    section boundaries found by ``_SECTION_BOUNDARY_RE``.

    - Text no longer than ``target`` is returned unchanged as a single chunk.
    - Otherwise sections are accumulated until the group spans ``target``
      characters or more, and the group is cut at that boundary.
    - If the group is longer than ``1.5 * target``, the cut moves back to the
      last blank line (``"\\n\\n"``) within the first ``target`` characters,
      but only when that blank line lies past the halfway mark.
    - Any chunk still longer than ``1.5 * target`` is hard-sliced into
      ``target``-sized pieces.
    - Chunks are stripped, and empty ones are dropped.

    NOTE(review): the cut is taken at the first boundary *at or past*
    ``target``, so the section that crosses the target is included and chunks
    typically overshoot ``target`` (up to 1.5×) rather than stay under it.
    """
    if len(text) <= target:
        return [text]

    # Candidate cut points: every section heading, then end-of-text.
    stops = [match.start() for match in _SECTION_BOUNDARY_RE.finditer(text)]
    stops.append(len(text))

    grouped: list[str] = []
    begin = 0
    for stop in stops:
        span = stop - begin
        if span < target:
            # Keep growing the current group.
            continue
        finish = stop
        if span > target * 1.5:
            # Group is far too large: fall back to a paragraph break inside
            # the first ``target`` characters, if one exists past the middle.
            para = text.rfind("\n\n", begin, begin + target)
            if para > begin + target // 2:
                finish = para
        grouped.append(text[begin:finish].strip())
        begin = finish
    if begin < len(text):
        grouped.append(text[begin:].strip())

    # Last resort for text with no usable markers: fixed-width slices.
    pieces: list[str] = []
    for group in grouped:
        if len(group) > target * 1.5:
            pieces.extend(group[pos:pos + target] for pos in range(0, len(group), target))
        else:
            pieces.append(group)
    return [piece for piece in pieces if piece.strip()]
async def _extract_chunk(
    chunk: str,
    chunk_index: int,
    chunk_total: int,
    context: str,
) -> tuple[int, list[dict] | None]:
    """Run claim extraction on one chunk, retrying on failure.

    The prompt is ``EXTRACT_CLAIMS_PROMPT``, followed by ``context`` (with a
    "part i/n" label when there is more than one chunk), followed by the chunk
    between document markers. It is sent to ``claude_session.query_json``
    with ``tools=""`` so the model cannot answer with tool_use.

    The call is made up to ``CHUNK_RETRY_ATTEMPTS + 1`` times. An attempt
    fails if it raises, or if the parsed JSON is not a list.

    Args:
        chunk: The document slice to extract claims from.
        chunk_index: 0-based position of this chunk in the document.
        chunk_total: Total number of chunks in the document.
        context: Document-level context lines (doc type, filing party).

    Returns:
        ``(chunk_index, claims)``. ``claims`` is the list returned by the
        model, or ``None`` when every attempt failed (the caller uses this to
        treat the overall result as partial).
    """
    chunk_label = f" (חלק {chunk_index + 1}/{chunk_total})" if chunk_total > 1 else ""
    prompt = (
        f"{EXTRACT_CLAIMS_PROMPT}\n\n"
        f"{context}{chunk_label}\n\n"
        f"--- תחילת מסמך ---\n{chunk}\n--- סוף מסמך ---"
    )
    attempts = CHUNK_RETRY_ATTEMPTS + 1
    # Reason for the MOST RECENT failed attempt, either the exception or a
    # description of the non-list response. The final error log must report
    # what went wrong on the last try, not None or an older exception.
    last_failure: object = None
    for attempt in range(attempts):
        try:
            claims = await claude_session.query_json(prompt, tools="")  # no tool_use → no error_max_turns
        except Exception as e:
            # Broad on purpose: any failure of this chunk is retried and, if
            # it persists, surfaced as a partial result by the caller.
            last_failure = e
            logger.warning(
                "extract_claims chunk %d/%d attempt %d raised: %s",
                chunk_index + 1, chunk_total, attempt + 1, e,
            )
            continue
        if isinstance(claims, list):
            return chunk_index, claims
        last_failure = f"non-list response ({type(claims).__name__})"
        logger.warning(
            "extract_claims chunk %d/%d attempt %d returned non-list (%s)",
            chunk_index + 1, chunk_total, attempt + 1, type(claims).__name__,
        )
    logger.error(
        "extract_claims chunk %d/%d failed after %d attempts: %s",
        chunk_index + 1, chunk_total, attempts, last_failure,
    )
    return chunk_index, None
async def extract_claims_with_ai(
    text: str,
    doc_type: str = "appeal",
    party_hint: str = "",
) -> list[dict]:
    """Extract claims from a pleading using Claude.

    Processing steps:
    - Split ``text`` at section boundaries.
    - Run every chunk through Claude in parallel, bounded by
      ``CHUNK_CONCURRENCY``.
    - Retry each failed chunk up to ``CHUNK_RETRY_ATTEMPTS`` times.
    - Merge the results in original document order.

    Failed chunks are logged but do not block the overall extraction: what
    succeeded is returned, and the gap is surfaced via the logs.

    Args:
        text: Document text. Empty or whitespace-only text returns ``[]``
            without calling Claude.
        doc_type: Document type (appeal/response), included in the prompt.
        party_hint: Hint about the filing party, if known.

    Returns:
        Claim dicts as produced by the model (``party_role``, ``claim_text``,
        ``topic``), each with a ``claim_index`` added. Entries that are not
        dicts, or that lack ``party_role``/``claim_text``, are dropped.
        ``claim_index`` runs contiguously 0..n-1 over the kept claims.
    """
    if not text or not text.strip():
        # Nothing to extract; avoid a paid model call on an empty document.
        return []

    context = f"סוג המסמך: {doc_type}"
    if party_hint:
        context += f"\nהצד המגיש: {party_hint}"

    chunks = _split_by_sections(text)
    if len(chunks) > 1:
        logger.info(
            "extract_claims: split %d chars into %d chunks (target=%d, concurrency=%d)",
            len(text), len(chunks), CHUNK_TARGET_CHARS, CHUNK_CONCURRENCY,
        )

    sem = asyncio.Semaphore(CHUNK_CONCURRENCY)

    async def _bounded(idx: int, c: str) -> tuple[int, list[dict] | None]:
        async with sem:
            return await _extract_chunk(c, idx, len(chunks), context)

    results = await asyncio.gather(*[_bounded(i, c) for i, c in enumerate(chunks)])

    failed = [i for i, r in results if r is None]
    if failed:
        logger.warning(
            "extract_claims: %d/%d chunks failed (indices=%s) — returning partial result",
            len(failed), len(chunks), failed,
        )

    # Merge in original order, skipping failed chunks and malformed entries.
    cleaned: list[dict] = []
    for _, claims in sorted(results, key=lambda item: item[0]):
        for claim in claims or ():
            if not isinstance(claim, dict):
                continue
            if "party_role" not in claim or "claim_text" not in claim:
                continue
            # Number only the claims we keep, so indices have no gaps where
            # malformed entries were dropped.
            claim["claim_index"] = len(cleaned)
            cleaned.append(claim)
    return cleaned
def _infer_claim_type(doc_type: str, source_name: str) -> str:
"""Determine claim_type from document type and title.
- 'claim' = from appeal documents (כתב ערר)
- 'response' = from original response documents (כתב תשובה)
- 'reply' = from supplementary responses (תגובה, השלמת טיעון)
"""
name_lower = source_name.lower() if source_name else ""
if doc_type == "appeal" or "כתב ערר" in name_lower:
return "claim"
if "כתב תשובה" in name_lower:
return "response"
if any(kw in name_lower for kw in ["תגובת", "השלמת טיעון", "תגובה"]):
return "reply"
if doc_type == "response":
return "response"
return "claim"
# ── Regex-based extraction (from existing decisions) ──────────────
PARTY_PATTERNS = [
(r"טענות\s*העוררי[םן]|טענות\s*העורר\b|טענות\s*המבקש|טענות\s*המערער", "appellant"),
(r"עמדת\s*הוועדה\s*המקומית|עמדת\s*המשיבה|טענות\s*המשיבה|תגובת\s*המשיבה|הוועדה\s*המקומית$", "committee"),
(r"עמדת\s*המשיבי[םן]|עמדת\s*המשיב\b|טענות\s*המשיבי[םן]|טענות\s*המשיב\b", "respondent"),
(r"מבקשי\s*ההיתר|עמדת\s*מבקש|עמדת\s*היזם|מגישי\s*התכנית", "permit_applicant"),
(r"הבהרות\s*השמא|התייחסות\s*הצדדים", "appraiser"),
]
def _detect_party_role(line: str) -> str | None:
for pattern, role in PARTY_PATTERNS:
if re.search(pattern, line):
return role
return None
def extract_claims_from_block(text: str) -> list[dict]:
    """Extract claims from block ז of a final decision (regex-based).

    Replicates the logic from scripts/extract-claims.py for use as a service.

    The text is processed line by line:
    - Lines are stripped, and blank lines are skipped.
    - A line of at most 8 words that matches a party header (see
      ``_detect_party_role``) switches the current role and is not itself a
      claim. The role starts as "appellant".
    - Every other line is its own claim candidate. It is kept only if it is
      longer than 30 characters.
    - Numbered sub-headers ("3. ...") are handled like any other line, so a
      heading is not merged with the paragraph that follows it.
      NOTE(review): confirm this matches scripts/extract-claims.py.

    Returns:
        Dicts with ``party_role``, ``claim_text`` and a contiguous 0-based
        ``claim_index``.
    """
    claims: list[dict] = []
    role = "appellant"
    pending = ""  # the single line buffered as the next claim candidate

    def flush() -> None:
        # Emit the buffered line under the role in effect when it was read.
        if len(pending) > 30:
            claims.append({
                "party_role": role,
                "claim_text": pending,
                "claim_index": len(claims),
            })

    for raw_line in text.split("\n"):
        line = raw_line.strip()
        if not line:
            continue
        # Only short lines can be party headers.
        header = _detect_party_role(line) if len(line.split()) <= 8 else None
        flush()
        if header:
            pending = ""
            role = header
        else:
            pending = line

    flush()
    return claims
async def extract_and_store_claims(
    case_id: UUID,
    document_id: UUID,
    text: str,
    doc_type: str = "appeal",
    party_hint: str = "",
) -> dict:
    """Extract claims from a document with Claude and persist them.

    The claim source is the document's ``title`` as returned by
    ``db.get_document``; when the lookup returns nothing, ``str(document_id)``
    is used. Every extracted claim is tagged with a ``claim_type`` inferred
    from ``doc_type`` and that source name, then passed to ``db.store_claims``.

    Args:
        case_id: Case identifier.
        document_id: Document identifier.
        text: Document text.
        doc_type: Document type (appeal/response).
        party_hint: Name of the filing party, if known.

    Returns:
        ``{"status": "no_claims", "total": 0, "source": ...}`` when nothing
        was extracted. Otherwise ``{"status": "completed", "total": <value
        returned by db.store_claims>, "by_role": {party_role: count},
        "source": ...}``.
    """
    document = await db.get_document(document_id)
    if document:
        source_name = document["title"]
    else:
        source_name = str(document_id)

    claims = await extract_claims_with_ai(text, doc_type, party_hint)
    if not claims:
        return {"status": "no_claims", "total": 0, "source": source_name}

    claim_type = _infer_claim_type(doc_type, source_name)
    for claim in claims:
        claim["claim_type"] = claim_type

    stored = await db.store_claims(case_id, claims, source_document=source_name)

    # Per-role tally, in first-seen order.
    by_role: dict[str, int] = {}
    for party_role in (claim["party_role"] for claim in claims):
        by_role[party_role] = by_role.get(party_role, 0) + 1

    return {
        "status": "completed",
        "total": stored,
        "by_role": by_role,
        "source": source_name,
    }