Files
legal-ai/scripts/extract-claims.py
Chaim d5ccf03e4c Add docs, scripts, skills, commands, and taskmaster config to repo
Includes:
- docs/: architecture, block-schema, migration-plan, product-specification
- scripts/: bidi_table, decompose-decisions, extract-claims, seed-knowledge, etc.
- skill-legal-decision/: SKILL.md + references + block-schema
- skill-legal-assistant/: SKILL.md
- skill-legal-docx/: SKILL.md + references
- .claude/commands/: bidi-table skill
- .taskmaster/: task config + PRDs
- .gitignore: exclude legacy/, kiryat-yearim/, node_modules/, memory/

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 14:19:17 +00:00

229 lines
8.0 KiB
Python

#!/usr/bin/env python3
"""Extract individual claims from block-zayin of each decision.
Identifies party sub-sections and individual claims (paragraphs).
Stores in the claims table with party_role classification.
"""
import asyncio
import json
import re
import sys
from pathlib import Path
from uuid import UUID
sys.path.insert(0, str(Path(__file__).parent.parent / "mcp-server" / "src"))
from legal_mcp.services.db import get_pool, init_schema, close_pool
# Party-role detection table: (regex, role) pairs.
# Order matters: detect_party_role() returns the role of the first pattern
# that matches, so the local-committee patterns are tried before the general
# respondent patterns.
PARTY_PATTERNS = [
    # Appellants
    (
        r"טענות\s*העוררי[םן]"
        r"|טענות\s*העורר\b"
        r"|טענות\s*המבקש"
        r"|טענות\s*המערער",
        "appellant",
    ),
    # Respondent - local committee
    (
        r"עמדת\s*הוועדה\s*המקומית"
        r"|עמדת\s*המשיבה"
        r"|טענות\s*המשיבה"
        r"|תגובת\s*המשיבה"
        r"|הוועדה\s*המקומית$",
        "committee",
    ),
    # Respondent - general
    (
        r"עמדת\s*המשיבי[םן]"
        r"|עמדת\s*המשיב\b"
        r"|טענות\s*המשיבי[םן]"
        r"|טענות\s*המשיב\b",
        "respondent",
    ),
    # Permit applicant
    (
        r"מבקשי\s*ההיתר"
        r"|עמדת\s*מבקש"
        r"|עמדת\s*היזם"
        r"|מגישי\s*התכנית",
        "permit_applicant",
    ),
    # Appraiser clarifications (betterment-levy cases)
    (
        r"הבהרות\s*השמא"
        r"|התייחסות\s*הצדדים",
        "appraiser",
    ),
]
def detect_party_role(line: str) -> str | None:
    """Return the party role whose header pattern occurs in *line*.

    The PARTY_PATTERNS entries are tried in list order and the role of the
    first regex found anywhere in *line* is returned; None means no pattern
    matched.
    """
    matching_roles = (
        role for pattern, role in PARTY_PATTERNS if re.search(pattern, line)
    )
    return next(matching_roles, None)
def is_section_header(line: str) -> bool:
    """Tell whether *line* is a section/sub-section heading rather than a claim.

    Only non-empty lines shorter than 50 characters (after stripping) can
    qualify. Such a line is a header when it contains a party-role pattern,
    or starts with one of the generic heading forms listed below.
    """
    text = line.strip()
    # Empty lines and anything 50+ characters long are never headers.
    if not text or len(text) >= 50:
        return False

    if detect_party_role(text) is not None:
        return True

    heading_forms = (
        r"^תמצית\s*טענות",
        r"^[א-ת][\.\)]\s*טענות",
        r"^[א-ת][\.\)]\s*כללי",
        r"^\d+\.\s*$",  # just a number
    )
    return any(re.match(form, text) for form in heading_forms)
def is_numbered_sub_header(line: str) -> bool:
    """Tell whether *line* is a numbered topic heading (e.g., '2. שיעור ההפקעה').

    After stripping, the line must be: digits, a dot, at least one whitespace
    character, then a title of 4 to 41 characters that starts with a
    non-whitespace character.
    """
    candidate = line.strip()
    found = re.match(r"^\d+\.\s+\S.{3,40}$", candidate)
    return found is not None
def _append_claim(claims: list[dict], pending_lines: list[str], role: str) -> None:
    """Append *pending_lines* to *claims* as one claim, if it is long enough."""
    claim_text = "\n".join(pending_lines).strip()
    # Fragments of 30 characters or fewer are discarded.
    if len(claim_text) > 30:
        claims.append({
            "party_role": role,
            "claim_text": claim_text,
            # Indices are consecutive from 0 in order of acceptance, so the
            # next index is always the current number of stored claims.
            "claim_index": len(claims),
        })


def extract_claims_from_block(text: str) -> list[dict]:
    """Extract individual claims grouped by party from block-zayin text.

    The text is processed line by line; blank lines are ignored.

    * A line of at most 8 words that matches a party pattern switches the
      current party role and is not stored itself.
    * A line recognised by is_section_header() is dropped.
    * Every other line becomes its own claim, attributed to the current
      role, provided it is longer than 30 characters.

    Claims found before any party header are attributed to "appellant".

    Returns:
        A list of dicts with keys "party_role", "claim_text" and
        "claim_index" (consecutive, starting at 0), in document order.
    """
    claims: list[dict] = []
    current_role = "appellant"  # default if no header found
    pending: list[str] = []

    for line in text.split("\n"):
        stripped = line.strip()
        if not stripped:
            continue

        # Party header — must be a SHORT line (header, not claim content).
        role = detect_party_role(stripped) if len(stripped.split()) <= 8 else None
        if role:
            _append_claim(claims, pending, current_role)
            pending = []
            current_role = role
            continue

        # Generic section headers close the pending claim and are skipped.
        if is_section_header(stripped):
            _append_claim(claims, pending, current_role)
            pending = []
            continue

        # Everything else closes the pending claim and opens a new one.
        # NOTE(review): the previous code had a separate branch for numbered
        # topic sub-headers (is_numbered_sub_header) whose statements were
        # identical to this one, so such a header is stored as a standalone
        # claim (or dropped when <= 30 chars) and never joined to the
        # paragraph after it. Behaviour is kept as-is here; confirm whether
        # the header was meant to prefix the following claim.
        _append_claim(claims, pending, current_role)
        pending = [stripped]

    # Save last claim
    _append_claim(claims, pending, current_role)
    return claims
async def main() -> None:
    """Re-extract claims from every block-zayin and store them in ``claims``.

    For each decision block with block_id 'block-zayin' and a positive
    word_count, the claims are extracted from the block content, the existing
    ``claims`` rows of that case are deleted, and the new claims are inserted
    with source_document 'block-zayin'. A per-case breakdown by party role
    and an overall summary are printed to stdout. The pool is closed on exit,
    including when an error interrupts the run.
    """
    # Hebrew display labels for the per-case breakdown; unknown roles are
    # printed as-is.
    role_labels = {
        "appellant": "עוררים",
        "committee": "ועדה מקומית",
        "respondent": "משיבים",
        "permit_applicant": "מבקשי היתר",
        "appraiser": "שמאי",
    }

    await init_schema()
    pool = await get_pool()
    try:
        async with pool.acquire() as conn:
            # Get all block-zayin with content
            rows = await conn.fetch(
                """SELECT c.id as case_id, c.case_number, c.title,
                          db.content
                   FROM decision_blocks db
                   JOIN decisions d ON d.id = db.decision_id
                   JOIN cases c ON c.id = d.case_id
                   WHERE db.block_id = 'block-zayin' AND db.word_count > 0
                   ORDER BY c.case_number"""
            )

        for row in rows:
            case_id = row["case_id"]
            claims = extract_claims_from_block(row["content"])

            print(f"\n{'='*50}")
            # Separator added: number and title were previously run together.
            print(f"תיק: {row['case_number']} — {row['title']}")
            print(f"{'='*50}")

            role_counts: dict[str, int] = {}
            async with pool.acquire() as conn:
                # NOTE(review): the delete and the inserts are not wrapped in
                # a transaction, so a failure midway leaves the case with
                # partial claims — consider the driver's transaction support.
                # NOTE(review): deletion is per case_id; if one case can have
                # several decisions with a block-zayin, each later row wipes
                # the claims of the earlier one — verify against the schema.
                await conn.execute("DELETE FROM claims WHERE case_id = $1", case_id)
                for claim in claims:
                    role = claim["party_role"]
                    role_counts[role] = role_counts.get(role, 0) + 1
                    await conn.execute(
                        """INSERT INTO claims (case_id, party_role, claim_text, claim_index, source_document)
                           VALUES ($1, $2, $3, $4, $5)""",
                        case_id,
                        claim["party_role"],
                        claim["claim_text"],
                        claim["claim_index"],
                        "block-zayin",
                    )

            for role, count in sorted(role_counts.items()):
                role_heb = role_labels.get(role, role)
                print(f" {role_heb:20s}{count} טענות")
            print(f" סה\"כ: {len(claims)} טענות")

        # Summary — counted from the table itself, not from this run's tally.
        async with pool.acquire() as conn:
            total = await conn.fetchval("SELECT count(*) FROM claims")
            by_role = await conn.fetch(
                "SELECT party_role, count(*) as cnt FROM claims GROUP BY party_role ORDER BY cnt DESC"
            )
        print(f"\n{'='*50}")
        print(f"סיכום כללי — {total} טענות מ-{len(rows)} החלטות")
        for r in by_role:
            print(f" {r['party_role']:20s}{r['cnt']}")
    finally:
        # Release the pool even when extraction or a query raises.
        await close_pool()
# Script entry point: run the async pipeline only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())