"""חילוץ טענות מכתבי טענות (ערר, תשובה) באמצעות Claude API. שתי גישות: 1. extract_claims_with_ai — חילוץ עם Claude (לכתבי טענות קלט) 2. extract_claims_from_block — חילוץ regex (מבלוק ז של החלטות סופיות) """ from __future__ import annotations import logging import re from uuid import UUID import anthropic from legal_mcp import config from legal_mcp.config import parse_llm_json from legal_mcp.services import db logger = logging.getLogger(__name__) _anthropic_client: anthropic.Anthropic | None = None def _get_anthropic() -> anthropic.Anthropic: global _anthropic_client if _anthropic_client is None: _anthropic_client = anthropic.Anthropic(api_key=config.ANTHROPIC_API_KEY) return _anthropic_client EXTRACT_CLAIMS_PROMPT = """אתה מנתח מסמכים משפטיים בתחום תכנון ובניה. תפקידך לחלץ טענות מכתב טענות. ## כללים חשובים: 1. **נאמנות למקור** — כל טענה חייבת לשקף את מה שנכתב, לא לפרש או להוסיף. 2. **טענה = טיעון מובחן אחד** — אם פסקה מכילה 2 טיעונים שונים, פצל לשתי טענות. 3. **כל טענה חייבת להיות מובנת בפני עצמה** — גם בלי הקשר המסמך המלא. 4. **שמור על לשון הגוף שלישי** — גם אם המקור בגוף ראשון. ## סוג הצד (party_role): - appellant — עורר/ת (מי שמגיש את הערר) - respondent — משיב/ה (הצד שכנגד, לא הוועדה) - committee — ועדה מקומית - permit_applicant — מבקש/ת היתר ## פלט: החזר JSON array בלבד — ללא markdown, ללא הסברים, רק JSON: [{"party_role": "appellant", "claim_text": "הטענה בגוף שלישי", "topic": "נושא"}] חשוב: - claim_text קצר — עד 150 מילים לכל טענה - קבץ טענות דומות לטענה אחת - אם אין טענות החזר [] """ async def extract_claims_with_ai( text: str, doc_type: str = "appeal", party_hint: str = "", ) -> list[dict]: """חילוץ טענות מכתב טענות באמצעות Claude. Args: text: טקסט המסמך doc_type: סוג המסמך (appeal/response) party_hint: רמז לזהות הצד (אם ידוע) Returns: רשימת טענות עם party_role, claim_text, topic """ context = f"סוג המסמך: {doc_type}" if party_hint: context += f"\nהצד המגיש: {party_hint}" # For very long documents, split into chunks and merge results max_chars_per_call = 25000 chunks = [] if len(text) > max_chars_per_call: # Split at paragraph boundaries pos = 0 while pos < len(text): end = min(pos + max_chars_per_call, len(text)) if end < len(text): # Find paragraph break near the limit break_pos = text.rfind("\n\n", pos, end) if break_pos > pos + max_chars_per_call // 2: end = break_pos chunks.append(text[pos:end]) pos = end logger.info("Document split into %d chunks (%d chars total)", len(chunks), len(text)) else: chunks = [text] all_claims = [] client = _get_anthropic() for i, chunk in enumerate(chunks): chunk_label = f" (חלק {i+1}/{len(chunks)})" if len(chunks) > 1 else "" message = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=8192, messages=[ { "role": "user", "content": ( f"{EXTRACT_CLAIMS_PROMPT}\n\n" f"{context}{chunk_label}\n\n" f"--- תחילת מסמך ---\n{chunk}\n--- סוף מסמך ---" ), } ], ) raw = message.content[0].text.strip() claims = parse_llm_json(raw) if claims is None: logger.warning("Failed to parse claims for chunk %d: %s", i, raw[:200]) continue if isinstance(claims, list): all_claims.extend(claims) claims = all_claims if not claims: return [] if not isinstance(claims, list): return [] # Add claim_index for i, claim in enumerate(claims): claim["claim_index"] = i # Validate required fields if "party_role" not in claim or "claim_text" not in claim: continue return [c for c in claims if "party_role" in c and "claim_text" in c] # ── Regex-based extraction (from existing decisions) ────────────── PARTY_PATTERNS = [ (r"טענות\s*העוררי[םן]|טענות\s*העורר\b|טענות\s*המבקש|טענות\s*המערער", "appellant"), (r"עמדת\s*הוועדה\s*המקומית|עמדת\s*המשיבה|טענות\s*המשיבה|תגובת\s*המשיבה|הוועדה\s*המקומית$", "committee"), (r"עמדת\s*המשיבי[םן]|עמדת\s*המשיב\b|טענות\s*המשיבי[םן]|טענות\s*המשיב\b", "respondent"), (r"מבקשי\s*ההיתר|עמדת\s*מבקש|עמדת\s*היזם|מגישי\s*התכנית", "permit_applicant"), (r"הבהרות\s*השמא|התייחסות\s*הצדדים", "appraiser"), ] def _detect_party_role(line: str) -> str | None: for pattern, role in PARTY_PATTERNS: if re.search(pattern, line): return role return None def extract_claims_from_block(text: str) -> list[dict]: """חילוץ טענות מבלוק ז של החלטה סופית (regex-based). Replicates the logic from scripts/extract-claims.py for use as a service. """ lines = text.split("\n") claims = [] current_role = "appellant" current_claim_lines: list[str] = [] claim_index = 0 for line in lines: stripped = line.strip() if not stripped: continue role = _detect_party_role(stripped) if len(stripped.split()) <= 8 else None if role: if current_claim_lines: claim_text = "\n".join(current_claim_lines).strip() if len(claim_text) > 30: claims.append({ "party_role": current_role, "claim_text": claim_text, "claim_index": claim_index, }) claim_index += 1 current_claim_lines = [] current_role = role continue # Numbered sub-header starts new claim if re.match(r"^\d+\.\s+\S.{3,40}$", stripped): if current_claim_lines: claim_text = "\n".join(current_claim_lines).strip() if len(claim_text) > 30: claims.append({ "party_role": current_role, "claim_text": claim_text, "claim_index": claim_index, }) claim_index += 1 current_claim_lines = [stripped] continue # Each paragraph is a claim if current_claim_lines: claim_text = "\n".join(current_claim_lines).strip() if len(claim_text) > 30: claims.append({ "party_role": current_role, "claim_text": claim_text, "claim_index": claim_index, }) claim_index += 1 current_claim_lines = [stripped] # Last claim if current_claim_lines: claim_text = "\n".join(current_claim_lines).strip() if len(claim_text) > 30: claims.append({ "party_role": current_role, "claim_text": claim_text, "claim_index": claim_index, }) return claims async def extract_and_store_claims( case_id: UUID, document_id: UUID, text: str, doc_type: str = "appeal", party_hint: str = "", ) -> dict: """חילוץ טענות ושמירה ב-DB. Args: case_id: מזהה התיק document_id: מזהה המסמך text: טקסט המסמך doc_type: סוג (appeal/response) party_hint: שם הצד המגיש Returns: סיכום: כמה טענות חולצו, לפי צד """ doc = await db.get_document(document_id) source_name = doc["title"] if doc else str(document_id) claims = await extract_claims_with_ai(text, doc_type, party_hint) if not claims: return {"status": "no_claims", "total": 0, "source": source_name} stored = await db.store_claims(case_id, claims, source_document=source_name) # Summarize by role role_counts: dict[str, int] = {} for c in claims: role = c["party_role"] role_counts[role] = role_counts.get(role, 0) + 1 return { "status": "completed", "total": stored, "by_role": role_counts, "source": source_name, }