#!/usr/bin/env python3 """Extract individual claims from block-zayin of each decision. Identifies party sub-sections and individual claims (paragraphs). Stores in the claims table with party_role classification. """ import asyncio import json import re import sys from pathlib import Path from uuid import UUID sys.path.insert(0, str(Path(__file__).parent.parent / "mcp-server" / "src")) from legal_mcp.services.db import get_pool, init_schema, close_pool # Party role detection patterns PARTY_PATTERNS = [ # Appellants (r"טענות\s*העוררי[םן]|טענות\s*העורר\b|טענות\s*המבקש|טענות\s*המערער", "appellant"), # Respondent - local committee (r"עמדת\s*הוועדה\s*המקומית|עמדת\s*המשיבה|טענות\s*המשיבה|תגובת\s*המשיבה|הוועדה\s*המקומית$", "committee"), # Respondent - general (r"עמדת\s*המשיבי[םן]|עמדת\s*המשיב\b|טענות\s*המשיבי[םן]|טענות\s*המשיב\b", "respondent"), # Permit applicant (r"מבקשי\s*ההיתר|עמדת\s*מבקש|עמדת\s*היזם|מגישי\s*התכנית", "permit_applicant"), # Appraiser clarifications (היטל השבחה) (r"הבהרות\s*השמא|התייחסות\s*הצדדים", "appraiser"), ] def detect_party_role(line: str) -> str | None: """Detect if a line is a party section header. Returns role or None.""" for pattern, role in PARTY_PATTERNS: if re.search(pattern, line): return role return None def is_section_header(line: str) -> bool: """Check if line is a section/sub-section header (not a claim).""" line = line.strip() if not line: return False # Very short lines that are headers if len(line) < 50 and ( detect_party_role(line) is not None or re.match(r"^תמצית\s*טענות", line) or re.match(r"^[א-ת][\.\)]\s*טענות", line) or re.match(r"^[א-ת][\.\)]\s*כללי", line) or re.match(r"^\d+\.\s*$", line) # just a number ): return True return False def is_numbered_sub_header(line: str) -> bool: """Check if line is a numbered topic header within claims (e.g., '2. שיעור ההפקעה').""" return bool(re.match(r"^\d+\.\s+\S.{3,40}$", line.strip())) def extract_claims_from_block(text: str) -> list[dict]: """Extract individual claims grouped by party from block-zayin text.""" lines = text.split("\n") claims = [] current_role = "appellant" # default if no header found current_claim_lines = [] claim_index = 0 for line in lines: stripped = line.strip() if not stripped: continue # Check for party header — must be a SHORT line (header, not claim content) role = detect_party_role(stripped) if len(stripped.split()) <= 8 else None if role: # Save accumulated claim if current_claim_lines: claim_text = "\n".join(current_claim_lines).strip() if len(claim_text) > 30: claims.append({ "party_role": current_role, "claim_text": claim_text, "claim_index": claim_index, }) claim_index += 1 current_claim_lines = [] current_role = role continue # Skip generic section headers if is_section_header(stripped): # Save accumulated claim before skipping header if current_claim_lines: claim_text = "\n".join(current_claim_lines).strip() if len(claim_text) > 30: claims.append({ "party_role": current_role, "claim_text": claim_text, "claim_index": claim_index, }) claim_index += 1 current_claim_lines = [] continue # Numbered sub-header in היטל השבחה style (e.g., "2. שיעור ההפקעה") # starts a new claim if is_numbered_sub_header(stripped): if current_claim_lines: claim_text = "\n".join(current_claim_lines).strip() if len(claim_text) > 30: claims.append({ "party_role": current_role, "claim_text": claim_text, "claim_index": claim_index, }) claim_index += 1 current_claim_lines = [stripped] continue # Each substantial paragraph is a separate claim # Save previous accumulated claim first if current_claim_lines: claim_text = "\n".join(current_claim_lines).strip() if len(claim_text) > 30: claims.append({ "party_role": current_role, "claim_text": claim_text, "claim_index": claim_index, }) claim_index += 1 current_claim_lines = [stripped] # Save last claim if current_claim_lines: claim_text = "\n".join(current_claim_lines).strip() if len(claim_text) > 30: claims.append({ "party_role": current_role, "claim_text": claim_text, "claim_index": claim_index, }) return claims async def main(): await init_schema() pool = await get_pool() async with pool.acquire() as conn: # Get all block-zayin with content rows = await conn.fetch( """SELECT c.id as case_id, c.case_number, c.title, db.content FROM decision_blocks db JOIN decisions d ON d.id = db.decision_id JOIN cases c ON c.id = d.case_id WHERE db.block_id = 'block-zayin' AND db.word_count > 0 ORDER BY c.case_number""" ) total_claims = 0 for row in rows: case_id = row["case_id"] case_number = row["case_number"] text = row["content"] claims = extract_claims_from_block(text) print(f"\n{'='*50}") print(f"תיק: {case_number} — {row['title']}") print(f"{'='*50}") async with pool.acquire() as conn: # Delete existing claims for this case await conn.execute("DELETE FROM claims WHERE case_id = $1", case_id) role_counts = {} for claim in claims: role = claim["party_role"] role_counts[role] = role_counts.get(role, 0) + 1 await conn.execute( """INSERT INTO claims (case_id, party_role, claim_text, claim_index, source_document) VALUES ($1, $2, $3, $4, $5)""", case_id, claim["party_role"], claim["claim_text"], claim["claim_index"], "block-zayin", ) for role, count in sorted(role_counts.items()): role_heb = { "appellant": "עוררים", "committee": "ועדה מקומית", "respondent": "משיבים", "permit_applicant": "מבקשי היתר", "appraiser": "שמאי", }.get(role, role) print(f" {role_heb:20s} — {count} טענות") total_claims += len(claims) print(f" סה\"כ: {len(claims)} טענות") # Summary async with pool.acquire() as conn: total = await conn.fetchval("SELECT count(*) FROM claims") by_role = await conn.fetch( "SELECT party_role, count(*) as cnt FROM claims GROUP BY party_role ORDER BY cnt DESC" ) print(f"\n{'='*50}") print(f"סיכום כללי — {total} טענות מ-{len(rows)} החלטות") for r in by_role: print(f" {r['party_role']:20s} — {r['cnt']}") await close_pool() if __name__ == "__main__": asyncio.run(main())