Files
legal-ai/mcp-server/src/legal_mcp/services/claims_extractor.py
Chaim bed9d5c7e9 Improve block-zayin: synthesize claims by topic + fix markdown JSON parsing
block_writer: Rewrote block-zayin prompt to require synthesis by topic
instead of listing each claim separately. Now produces 3 organized
sections (appellants 8, committee 6, permit applicants 3+) instead
of 40 scattered paragraphs. Target: 800-1500 words.

claims_extractor: Fix markdown code block stripping (same bug as
qa_validator had). Enables parsing claims from Claude responses
wrapped in ```json blocks.

Tested on Hecht: block-zayin from 40 paragraphs/1049 words to
17 organized paragraphs/1039 words. Structure now matches Dafna's
original (3 parties, grouped by topic).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 12:54:42 +00:00

264 lines
8.6 KiB
Python

"""חילוץ טענות מכתבי טענות (ערר, תשובה) באמצעות Claude API.
שתי גישות:
1. extract_claims_with_ai — חילוץ עם Claude (לכתבי טענות קלט)
2. extract_claims_from_block — חילוץ regex (מבלוק ז של החלטות סופיות)
"""
from __future__ import annotations
import json
import logging
import re
from uuid import UUID
import anthropic
from legal_mcp import config
from legal_mcp.services import db
logger = logging.getLogger(__name__)
_anthropic_client: anthropic.Anthropic | None = None
def _get_anthropic() -> anthropic.Anthropic:
global _anthropic_client
if _anthropic_client is None:
_anthropic_client = anthropic.Anthropic(api_key=config.ANTHROPIC_API_KEY)
return _anthropic_client
EXTRACT_CLAIMS_PROMPT = """אתה מנתח מסמכים משפטיים בתחום תכנון ובניה. תפקידך לחלץ טענות מכתב טענות.
## כללים חשובים:
1. **נאמנות למקור** — כל טענה חייבת לשקף את מה שנכתב, לא לפרש או להוסיף.
2. **טענה = טיעון מובחן אחד** — אם פסקה מכילה 2 טיעונים שונים, פצל לשתי טענות.
3. **כל טענה חייבת להיות מובנת בפני עצמה** — גם בלי הקשר המסמך המלא.
4. **שמור על לשון הגוף שלישי** — גם אם המקור בגוף ראשון.
## סוג הצד (party_role):
- appellant — עורר/ת (מי שמגיש את הערר)
- respondent — משיב/ה (הצד שכנגד, לא הוועדה)
- committee — ועדה מקומית
- permit_applicant — מבקש/ת היתר
## פלט:
החזר JSON array בלבד:
[
{
"party_role": "appellant",
"claim_text": "הטענה בגוף שלישי, בעברית",
"topic": "נושא הטענה בקצרה (3-5 מילים)"
}
]
אם אין טענות — החזר [].
"""
async def extract_claims_with_ai(
text: str,
doc_type: str = "appeal",
party_hint: str = "",
) -> list[dict]:
"""חילוץ טענות מכתב טענות באמצעות Claude.
Args:
text: טקסט המסמך
doc_type: סוג המסמך (appeal/response)
party_hint: רמז לזהות הצד (אם ידוע)
Returns:
רשימת טענות עם party_role, claim_text, topic
"""
# For very long documents, truncate but try to keep complete paragraphs
max_chars = 25000
if len(text) > max_chars:
# Find a paragraph break near the limit
cutoff = text.rfind("\n\n", 0, max_chars)
if cutoff < max_chars // 2:
cutoff = max_chars
sample = text[:cutoff]
logger.info("Document truncated from %d to %d chars", len(text), len(sample))
else:
sample = text
context = f"סוג המסמך: {doc_type}"
if party_hint:
context += f"\nהצד המגיש: {party_hint}"
client = _get_anthropic()
message = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=4096,
messages=[
{
"role": "user",
"content": (
f"{EXTRACT_CLAIMS_PROMPT}\n\n"
f"{context}\n\n"
f"--- תחילת מסמך ---\n{sample}\n--- סוף מסמך ---"
),
}
],
)
raw = message.content[0].text.strip()
# Strip markdown code blocks if present
raw = re.sub(r"^```(?:json)?\s*", "", raw)
raw = re.sub(r"\s*```$", "", raw)
try:
# Extract JSON array from response
json_match = re.search(r"\[.*\]", raw, re.DOTALL)
if json_match:
claims = json.loads(json_match.group())
else:
claims = json.loads(raw)
except json.JSONDecodeError:
logger.warning("Failed to parse claims response: %s", raw[:200])
return []
if not isinstance(claims, list):
return []
# Add claim_index
for i, claim in enumerate(claims):
claim["claim_index"] = i
# Validate required fields
if "party_role" not in claim or "claim_text" not in claim:
continue
return [c for c in claims if "party_role" in c and "claim_text" in c]
# ── Regex-based extraction (from existing decisions) ──────────────
PARTY_PATTERNS = [
(r"טענות\s*העוררי[םן]|טענות\s*העורר\b|טענות\s*המבקש|טענות\s*המערער", "appellant"),
(r"עמדת\s*הוועדה\s*המקומית|עמדת\s*המשיבה|טענות\s*המשיבה|תגובת\s*המשיבה|הוועדה\s*המקומית$", "committee"),
(r"עמדת\s*המשיבי[םן]|עמדת\s*המשיב\b|טענות\s*המשיבי[םן]|טענות\s*המשיב\b", "respondent"),
(r"מבקשי\s*ההיתר|עמדת\s*מבקש|עמדת\s*היזם|מגישי\s*התכנית", "permit_applicant"),
(r"הבהרות\s*השמא|התייחסות\s*הצדדים", "appraiser"),
]
def _detect_party_role(line: str) -> str | None:
for pattern, role in PARTY_PATTERNS:
if re.search(pattern, line):
return role
return None
def extract_claims_from_block(text: str) -> list[dict]:
"""חילוץ טענות מבלוק ז של החלטה סופית (regex-based).
Replicates the logic from scripts/extract-claims.py for use as a service.
"""
lines = text.split("\n")
claims = []
current_role = "appellant"
current_claim_lines: list[str] = []
claim_index = 0
for line in lines:
stripped = line.strip()
if not stripped:
continue
role = _detect_party_role(stripped) if len(stripped.split()) <= 8 else None
if role:
if current_claim_lines:
claim_text = "\n".join(current_claim_lines).strip()
if len(claim_text) > 30:
claims.append({
"party_role": current_role,
"claim_text": claim_text,
"claim_index": claim_index,
})
claim_index += 1
current_claim_lines = []
current_role = role
continue
# Numbered sub-header starts new claim
if re.match(r"^\d+\.\s+\S.{3,40}$", stripped):
if current_claim_lines:
claim_text = "\n".join(current_claim_lines).strip()
if len(claim_text) > 30:
claims.append({
"party_role": current_role,
"claim_text": claim_text,
"claim_index": claim_index,
})
claim_index += 1
current_claim_lines = [stripped]
continue
# Each paragraph is a claim
if current_claim_lines:
claim_text = "\n".join(current_claim_lines).strip()
if len(claim_text) > 30:
claims.append({
"party_role": current_role,
"claim_text": claim_text,
"claim_index": claim_index,
})
claim_index += 1
current_claim_lines = [stripped]
# Last claim
if current_claim_lines:
claim_text = "\n".join(current_claim_lines).strip()
if len(claim_text) > 30:
claims.append({
"party_role": current_role,
"claim_text": claim_text,
"claim_index": claim_index,
})
return claims
async def extract_and_store_claims(
case_id: UUID,
document_id: UUID,
text: str,
doc_type: str = "appeal",
party_hint: str = "",
) -> dict:
"""חילוץ טענות ושמירה ב-DB.
Args:
case_id: מזהה התיק
document_id: מזהה המסמך
text: טקסט המסמך
doc_type: סוג (appeal/response)
party_hint: שם הצד המגיש
Returns:
סיכום: כמה טענות חולצו, לפי צד
"""
doc = await db.get_document(document_id)
source_name = doc["title"] if doc else str(document_id)
claims = await extract_claims_with_ai(text, doc_type, party_hint)
if not claims:
return {"status": "no_claims", "total": 0, "source": source_name}
stored = await db.store_claims(case_id, claims, source_document=source_name)
# Summarize by role
role_counts: dict[str, int] = {}
for c in claims:
role = c["party_role"]
role_counts[role] = role_counts.get(role, 0) + 1
return {
"status": "completed",
"total": stored,
"by_role": role_counts,
"source": source_name,
}