Legal documents have 3 types of assertions: - claim: from appeal documents (כתב ערר) - response: from original responses (כתב תשובה) - reply: from supplementary responses (תגובה, השלמת טיעון) DB: added claim_type column to claims table Extractor: _infer_claim_type() auto-detects from doc_type + title Updated existing 113 records: 29 claims, 28 responses, 56 replies Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
270 lines
9.3 KiB
Python
270 lines
9.3 KiB
Python
"""חילוץ טענות מכתבי טענות (ערר, תשובה) באמצעות Claude Code session.
|
|
|
|
שתי גישות:
|
|
1. extract_claims_with_ai — חילוץ עם Claude Code headless (לכתבי טענות קלט)
|
|
2. extract_claims_from_block — חילוץ regex (מבלוק ז של החלטות סופיות)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import re
|
|
from uuid import UUID
|
|
|
|
from legal_mcp import config
|
|
from legal_mcp.config import parse_llm_json
|
|
from legal_mcp.services import db, claude_session
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
EXTRACT_CLAIMS_PROMPT = """אתה מנתח מסמכים משפטיים בתחום תכנון ובניה. תפקידך לחלץ טענות מכתב טענות.
|
|
|
|
## כללים חשובים:
|
|
1. **נאמנות למקור** — כל טענה חייבת לשקף את מה שנכתב, לא לפרש או להוסיף.
|
|
2. **טענה = טיעון מובחן אחד** — אם פסקה מכילה 2 טיעונים שונים, פצל לשתי טענות.
|
|
3. **כל טענה חייבת להיות מובנת בפני עצמה** — גם בלי הקשר המסמך המלא.
|
|
4. **שמור על לשון הגוף שלישי** — גם אם המקור בגוף ראשון.
|
|
|
|
## סוג הצד (party_role):
|
|
- appellant — עורר/ת (מי שמגיש את הערר)
|
|
- respondent — משיב/ה (הצד שכנגד, לא הוועדה)
|
|
- committee — ועדה מקומית
|
|
- permit_applicant — מבקש/ת היתר
|
|
|
|
## פלט:
|
|
החזר JSON array בלבד — ללא markdown, ללא הסברים, רק JSON:
|
|
[{"party_role": "appellant", "claim_text": "הטענה בגוף שלישי", "topic": "נושא"}]
|
|
|
|
חשוב:
|
|
- claim_text קצר — עד 150 מילים לכל טענה
|
|
- קבץ טענות דומות לטענה אחת
|
|
- אם אין טענות החזר []
|
|
"""
|
|
|
|
|
|
async def extract_claims_with_ai(
|
|
text: str,
|
|
doc_type: str = "appeal",
|
|
party_hint: str = "",
|
|
) -> list[dict]:
|
|
"""חילוץ טענות מכתב טענות באמצעות Claude.
|
|
|
|
Args:
|
|
text: טקסט המסמך
|
|
doc_type: סוג המסמך (appeal/response)
|
|
party_hint: רמז לזהות הצד (אם ידוע)
|
|
|
|
Returns:
|
|
רשימת טענות עם party_role, claim_text, topic
|
|
"""
|
|
context = f"סוג המסמך: {doc_type}"
|
|
if party_hint:
|
|
context += f"\nהצד המגיש: {party_hint}"
|
|
|
|
# For very long documents, split into chunks and merge results
|
|
max_chars_per_call = 25000
|
|
chunks = []
|
|
if len(text) > max_chars_per_call:
|
|
# Split at paragraph boundaries
|
|
pos = 0
|
|
while pos < len(text):
|
|
end = min(pos + max_chars_per_call, len(text))
|
|
if end < len(text):
|
|
# Find paragraph break near the limit
|
|
break_pos = text.rfind("\n\n", pos, end)
|
|
if break_pos > pos + max_chars_per_call // 2:
|
|
end = break_pos
|
|
chunks.append(text[pos:end])
|
|
pos = end
|
|
logger.info("Document split into %d chunks (%d chars total)", len(chunks), len(text))
|
|
else:
|
|
chunks = [text]
|
|
|
|
all_claims = []
|
|
|
|
for i, chunk in enumerate(chunks):
|
|
chunk_label = f" (חלק {i+1}/{len(chunks)})" if len(chunks) > 1 else ""
|
|
prompt = (
|
|
f"{EXTRACT_CLAIMS_PROMPT}\n\n"
|
|
f"{context}{chunk_label}\n\n"
|
|
f"--- תחילת מסמך ---\n{chunk}\n--- סוף מסמך ---"
|
|
)
|
|
claims = claude_session.query_json(prompt, timeout=120)
|
|
if claims is None:
|
|
logger.warning("Failed to parse claims for chunk %d: %s", i, raw[:200])
|
|
continue
|
|
if isinstance(claims, list):
|
|
all_claims.extend(claims)
|
|
|
|
claims = all_claims
|
|
if not claims:
|
|
return []
|
|
|
|
if not isinstance(claims, list):
|
|
return []
|
|
|
|
# Add claim_index
|
|
for i, claim in enumerate(claims):
|
|
claim["claim_index"] = i
|
|
# Validate required fields
|
|
if "party_role" not in claim or "claim_text" not in claim:
|
|
continue
|
|
|
|
return [c for c in claims if "party_role" in c and "claim_text" in c]
|
|
|
|
|
|
def _infer_claim_type(doc_type: str, source_name: str) -> str:
|
|
"""Determine claim_type from document type and title.
|
|
|
|
- 'claim' = from appeal documents (כתב ערר)
|
|
- 'response' = from original response documents (כתב תשובה)
|
|
- 'reply' = from supplementary responses (תגובה, השלמת טיעון)
|
|
"""
|
|
name_lower = source_name.lower() if source_name else ""
|
|
if doc_type == "appeal" or "כתב ערר" in name_lower:
|
|
return "claim"
|
|
if "כתב תשובה" in name_lower:
|
|
return "response"
|
|
if any(kw in name_lower for kw in ["תגובת", "השלמת טיעון", "תגובה"]):
|
|
return "reply"
|
|
if doc_type == "response":
|
|
return "response"
|
|
return "claim"
|
|
|
|
|
|
# ── Regex-based extraction (from existing decisions) ──────────────
|
|
|
|
PARTY_PATTERNS = [
|
|
(r"טענות\s*העוררי[םן]|טענות\s*העורר\b|טענות\s*המבקש|טענות\s*המערער", "appellant"),
|
|
(r"עמדת\s*הוועדה\s*המקומית|עמדת\s*המשיבה|טענות\s*המשיבה|תגובת\s*המשיבה|הוועדה\s*המקומית$", "committee"),
|
|
(r"עמדת\s*המשיבי[םן]|עמדת\s*המשיב\b|טענות\s*המשיבי[םן]|טענות\s*המשיב\b", "respondent"),
|
|
(r"מבקשי\s*ההיתר|עמדת\s*מבקש|עמדת\s*היזם|מגישי\s*התכנית", "permit_applicant"),
|
|
(r"הבהרות\s*השמא|התייחסות\s*הצדדים", "appraiser"),
|
|
]
|
|
|
|
|
|
def _detect_party_role(line: str) -> str | None:
|
|
for pattern, role in PARTY_PATTERNS:
|
|
if re.search(pattern, line):
|
|
return role
|
|
return None
|
|
|
|
|
|
def extract_claims_from_block(text: str) -> list[dict]:
|
|
"""חילוץ טענות מבלוק ז של החלטה סופית (regex-based).
|
|
|
|
Replicates the logic from scripts/extract-claims.py for use as a service.
|
|
"""
|
|
lines = text.split("\n")
|
|
claims = []
|
|
current_role = "appellant"
|
|
current_claim_lines: list[str] = []
|
|
claim_index = 0
|
|
|
|
for line in lines:
|
|
stripped = line.strip()
|
|
if not stripped:
|
|
continue
|
|
|
|
role = _detect_party_role(stripped) if len(stripped.split()) <= 8 else None
|
|
if role:
|
|
if current_claim_lines:
|
|
claim_text = "\n".join(current_claim_lines).strip()
|
|
if len(claim_text) > 30:
|
|
claims.append({
|
|
"party_role": current_role,
|
|
"claim_text": claim_text,
|
|
"claim_index": claim_index,
|
|
})
|
|
claim_index += 1
|
|
current_claim_lines = []
|
|
current_role = role
|
|
continue
|
|
|
|
# Numbered sub-header starts new claim
|
|
if re.match(r"^\d+\.\s+\S.{3,40}$", stripped):
|
|
if current_claim_lines:
|
|
claim_text = "\n".join(current_claim_lines).strip()
|
|
if len(claim_text) > 30:
|
|
claims.append({
|
|
"party_role": current_role,
|
|
"claim_text": claim_text,
|
|
"claim_index": claim_index,
|
|
})
|
|
claim_index += 1
|
|
current_claim_lines = [stripped]
|
|
continue
|
|
|
|
# Each paragraph is a claim
|
|
if current_claim_lines:
|
|
claim_text = "\n".join(current_claim_lines).strip()
|
|
if len(claim_text) > 30:
|
|
claims.append({
|
|
"party_role": current_role,
|
|
"claim_text": claim_text,
|
|
"claim_index": claim_index,
|
|
})
|
|
claim_index += 1
|
|
current_claim_lines = [stripped]
|
|
|
|
# Last claim
|
|
if current_claim_lines:
|
|
claim_text = "\n".join(current_claim_lines).strip()
|
|
if len(claim_text) > 30:
|
|
claims.append({
|
|
"party_role": current_role,
|
|
"claim_text": claim_text,
|
|
"claim_index": claim_index,
|
|
})
|
|
|
|
return claims
|
|
|
|
|
|
async def extract_and_store_claims(
|
|
case_id: UUID,
|
|
document_id: UUID,
|
|
text: str,
|
|
doc_type: str = "appeal",
|
|
party_hint: str = "",
|
|
) -> dict:
|
|
"""חילוץ טענות ושמירה ב-DB.
|
|
|
|
Args:
|
|
case_id: מזהה התיק
|
|
document_id: מזהה המסמך
|
|
text: טקסט המסמך
|
|
doc_type: סוג (appeal/response)
|
|
party_hint: שם הצד המגיש
|
|
|
|
Returns:
|
|
סיכום: כמה טענות חולצו, לפי צד
|
|
"""
|
|
doc = await db.get_document(document_id)
|
|
source_name = doc["title"] if doc else str(document_id)
|
|
|
|
claims = await extract_claims_with_ai(text, doc_type, party_hint)
|
|
|
|
if not claims:
|
|
return {"status": "no_claims", "total": 0, "source": source_name}
|
|
|
|
# Determine claim_type from document type and title
|
|
claim_type = _infer_claim_type(doc_type, source_name)
|
|
for c in claims:
|
|
c["claim_type"] = claim_type
|
|
|
|
stored = await db.store_claims(case_id, claims, source_document=source_name)
|
|
|
|
# Summarize by role
|
|
role_counts: dict[str, int] = {}
|
|
for c in claims:
|
|
role = c["party_role"]
|
|
role_counts[role] = role_counts.get(role, 0) + 1
|
|
|
|
return {
|
|
"status": "completed",
|
|
"total": stored,
|
|
"by_role": role_counts,
|
|
"source": source_name,
|
|
}
|