Files
legal-ai/scripts/decompose-decisions.py
Chaim d5ccf03e4c Add docs, scripts, skills, commands, and taskmaster config to repo
Includes:
- docs/: architecture, block-schema, migration-plan, product-specification
- scripts/: bidi_table, decompose-decisions, extract-claims, seed-knowledge, etc.
- skill-legal-decision/: SKILL.md + references + block-schema
- skill-legal-assistant/: SKILL.md
- skill-legal-docx/: SKILL.md + references
- .claude/commands/: bidi-table skill
- .taskmaster/: task config + PRDs
- .gitignore: exclude legacy/, kiryat-yearim/, node_modules/, memory/

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 14:19:17 +00:00

324 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Decompose 6 final decisions into 12-block structure.
Uses heuristic parsing based on known section headers in Dafna's decisions.
"""
import asyncio
import json
import re
import sys
from pathlib import Path
from uuid import UUID
sys.path.insert(0, str(Path(__file__).parent.parent / "mcp-server" / "src"))
from legal_mcp.services.db import get_pool, init_schema, close_pool
# ═══════════════════════════════════════════════════════════════════
# Block definitions with detection patterns
# ═══════════════════════════════════════════════════════════════════
# One row per block: (block_id, Hebrew title, generation_type).
# Row order is significant: it determines the 1-based block_index below.
_BLOCK_ROWS = (
    ("block-alef", "כותרת מוסדית", "template-fill"),
    ("block-bet", "הרכב הוועדה", "template-fill"),
    ("block-gimel", "צדדים", "template-fill"),
    ("block-dalet", "כותרת החלטה", "template-fill"),
    ("block-he", "פתיחה", "paraphrase"),
    ("block-vav", "רקע עובדתי", "reproduction"),
    ("block-zayin", "טענות הצדדים", "paraphrase"),
    ("block-chet", "הליכים בפני ועדת הערר", "reproduction"),
    ("block-tet", "תכניות חלות", "guided-synthesis"),
    ("block-yod", "דיון והכרעה", "rhetorical-construction"),
    ("block-yod-alef", "סיכום", "paraphrase"),
    ("block-yod-bet", "חתימות", "template-fill"),
)

# Metadata for the 12 blocks of a decision, in document order.
BLOCKS = [
    {
        "block_id": row_id,
        "block_index": position,
        "title": row_title,
        "generation_type": row_type,
    }
    for position, (row_id, row_title, row_type) in enumerate(_BLOCK_ROWS, start=1)
]
# Section header patterns (Hebrew).
# Each value is a compiled regex; callers use the start offset of the first
# match as the beginning of the corresponding section.
SECTION_PATTERNS = {
    "claims": re.compile(r"תמצית\s*טענות\s*הצדדים|טענות\s*הצדדים|טענות\s*העוררי"),
    "proceedings": re.compile(r"ההליכים\s*בפני\s*ועדת\s*הערר|הליכים\s*בפני\s*הוועדה|הדיון\s*בפני\s*ועדת\s*הערר"),
    "plans": re.compile(r"תכניות\s*חלות|המסגרת\s*התכנונית|הוראות\s*התכנית"),
    # The bare "דיון" alternative must not fire inside the proceedings header
    # "הדיון בפני ועדת הערר" (matched by "proceedings" above); otherwise the
    # discussion would start one character after the proceedings section.
    "discussion": re.compile(r"דיון\s*והכרעה|דיון(?!\s*בפני)|הכרעה"),
    "summary": re.compile(r"^סיכום$|^סוף\s*דבר$", re.MULTILINE),
    "appellant_claims": re.compile(r"טענות\s*העוררי|טענות\s*העורר"),
    "respondent_claims": re.compile(r"עמדת\s*הוועדה\s*המקומית|תגובת\s*המשיבה|עמדת\s*המשיב"),
    "permit_applicant": re.compile(r"עמדת\s*מבקש|עמדת\s*מגיש|עמדת\s*היזם"),
    "panel": re.compile(r"בפני[:\s]|יו\"ר"),
    "parties_vs": re.compile(r"\s*נגד\s*"),
    "decision_title": re.compile(r"^החלטה$", re.MULTILINE),
    # re.MULTILINE is required: the opening sentence follows the header, so
    # "^" has to match at the start of any line, not only at offset 0.
    "opening": re.compile(r"^לפנינו\s|^בפנינו\s", re.MULTILINE),
    "signature": re.compile(r"ניתנה?\s*(היום|פה\s*אחד|ביום)|חתימ"),
}
def find_section_start(text: str, pattern: re.Pattern) -> int:
    """Return the offset of the first match of *pattern* in *text*.

    Returns -1 when the pattern does not occur in the text.
    """
    hit = pattern.search(text)
    if hit is None:
        return -1
    return hit.start()
# Output order of the blocks returned by decompose_decision().
_BLOCK_ORDER = (
    "block-alef", "block-bet", "block-gimel", "block-dalet",
    "block-he", "block-vav", "block-zayin", "block-chet",
    "block-tet", "block-yod", "block-yod-alef", "block-yod-bet",
)


def _search_from(text: str, pattern: re.Pattern, start: int) -> int:
    """Return the offset of the first match at or after *start*, or -1 if none.

    A negative *start* (the "not found" sentinel of an earlier lookup) is
    treated as 0, so the whole text is searched.
    """
    match = pattern.search(text, max(start, 0))
    return match.start() if match else -1


def _first_after(start: int, candidates: tuple, default: int) -> int:
    """Return the first candidate, in priority order, that lies after *start*.

    Candidates equal to -1 (not found) can never qualify because *start* is
    always >= 0 when this is called. Falls back to *default*.
    """
    for pos in candidates:
        if pos > start:
            return pos
    return default


def decompose_decision(text: str) -> list[dict]:
    """Parse decision text into blocks based on section headers.

    Section boundaries are the start offsets of the first match of the
    corresponding SECTION_PATTERNS entry; -1 means "not found" and offset 0 is
    a valid position. The summary is searched from the discussion header
    onward and the signature from the later of discussion/summary, so an
    earlier incidental match cannot pull a trailing block to the top of the
    document.

    Args:
        text: Full extracted text of one decision.

    Returns:
        Exactly 12 dicts of the form {"block_id": ..., "content": ...}, in the
        fixed order block-alef .. block-yod-bet. Sections that were not
        detected have empty content; block-dalet is always the literal
        decision title.
    """
    total_len = len(text)

    # Locate the key section boundaries.
    pos_claims = find_section_start(text, SECTION_PATTERNS["claims"])
    pos_proceedings = find_section_start(text, SECTION_PATTERNS["proceedings"])
    pos_plans = find_section_start(text, SECTION_PATTERNS["plans"])
    pos_discussion = find_section_start(text, SECTION_PATTERNS["discussion"])
    pos_opening = find_section_start(text, SECTION_PATTERNS["opening"])
    pos_decision_title = find_section_start(text, SECTION_PATTERNS["decision_title"])
    pos_panel = find_section_start(text, SECTION_PATTERNS["panel"])
    pos_parties = find_section_start(text, SECTION_PATTERNS["parties_vs"])
    # Trailing sections: only accept matches at or after the preceding section.
    pos_summary = _search_from(text, SECTION_PATTERNS["summary"], pos_discussion)
    pos_signature = _search_from(
        text, SECTION_PATTERNS["signature"], max(pos_discussion, pos_summary)
    )

    content = dict.fromkeys(_BLOCK_ORDER, "")
    # The decision title block is the same fixed literal on every path.
    content["block-dalet"] = "החלטה"

    # Blocks alef-dalet: header area, ending where the opening sentence (or,
    # failing that, the claims section) begins.
    if pos_opening >= 0:
        header_end = pos_opening
    elif pos_claims >= 0:
        header_end = pos_claims
    else:
        # No body marker found: fall back to the first 500 characters.
        header_end = 500

    # Split the header into institutional header / panel / parties. Each
    # marker is only used when it appears in order inside the header;
    # otherwise the remainder stays with the previous block instead of
    # producing an empty or reversed slice.
    if 0 <= pos_panel < header_end:
        content["block-alef"] = text[:pos_panel].strip()
        if pos_panel < pos_parties < header_end:
            content["block-bet"] = text[pos_panel:pos_parties].strip()
            if pos_parties < pos_decision_title < header_end:
                parties_end = pos_decision_title
            else:
                parties_end = header_end
            content["block-gimel"] = text[pos_parties:parties_end].strip()
        else:
            content["block-bet"] = text[pos_panel:header_end].strip()
    else:
        # Can't split: the whole header goes into alef.
        content["block-alef"] = text[:header_end].strip()

    # Block he: opening, from the opening sentence towards the claims section.
    if pos_opening >= 0:
        opening_end = _first_after(pos_opening, (pos_claims, pos_discussion), total_len)
        # The opening is capped at 1000 characters ...
        opening_text = text[pos_opening:min(pos_opening + 1000, opening_end)].strip()
        # ... and cut at the second line break found beyond character 50.
        para_breaks = [i for i, ch in enumerate(opening_text) if ch == "\n" and i > 50]
        if len(para_breaks) >= 2:
            opening_text = opening_text[:para_breaks[1]].strip()
        content["block-he"] = opening_text

        # Block vav: background, from the end of the opening up to the claims.
        if pos_claims > pos_opening:
            # The opening pattern starts on a letter, so strip() only removed
            # trailing whitespace and the length still gives the right offset.
            bg_start = pos_opening + len(opening_text)
            content["block-vav"] = text[bg_start:pos_claims].strip()

    # Block zayin: claims.
    if pos_claims >= 0:
        claims_end = _first_after(
            pos_claims, (pos_proceedings, pos_discussion, pos_summary), total_len
        )
        content["block-zayin"] = text[pos_claims:claims_end].strip()

    # Block chet: proceedings (optional).
    if pos_proceedings >= 0:
        proc_end = _first_after(
            pos_proceedings, (pos_plans, pos_discussion, pos_summary), total_len
        )
        content["block-chet"] = text[pos_proceedings:proc_end].strip()

    # Block tet: plans (optional); only when located before the discussion.
    plans_limit = pos_discussion if pos_discussion >= 0 else total_len
    if 0 <= pos_plans < plans_limit:
        plans_end = _first_after(pos_plans, (pos_discussion, pos_summary), total_len)
        content["block-tet"] = text[pos_plans:plans_end].strip()

    # Block yod: discussion.
    if pos_discussion >= 0:
        disc_end = _first_after(pos_discussion, (pos_summary, pos_signature), total_len)
        content["block-yod"] = text[pos_discussion:disc_end].strip()

    # Block yod-alef: summary.
    if pos_summary >= 0:
        summ_end = _first_after(pos_summary, (pos_signature,), total_len)
        content["block-yod-alef"] = text[pos_summary:summ_end].strip()

    # Block yod-bet: signatures, running to the end of the document.
    if pos_signature >= 0:
        content["block-yod-bet"] = text[pos_signature:].strip()

    return [{"block_id": block_id, "content": content[block_id]} for block_id in _BLOCK_ORDER]
async def main():
    """Decompose every final decision and store its blocks in the database.

    For each row returned by the decisions query the extracted text is split
    with decompose_decision(), merged with the BLOCKS metadata, printed as a
    per-block summary and written to decision_blocks (replacing any existing
    rows for that decision). The number of discussion lines longer than 20
    characters is stored as decisions.total_paragraphs. Decisions without
    extracted text are skipped. The pool is closed even when processing fails.

    NOTE(review): if a case has several documents with doc_type = 'decision',
    the join yields one row per document and the last one processed wins —
    confirm this cannot happen.
    """
    await init_schema()
    pool = await get_pool()
    try:
        async with pool.acquire() as conn:
            decisions = await conn.fetch(
                """SELECT d.id as decision_id, c.case_number, c.title, d.total_words,
                          doc.extracted_text
                   FROM decisions d
                   JOIN cases c ON c.id = d.case_id
                   JOIN documents doc ON doc.case_id = d.case_id AND doc.doc_type = 'decision'
                   WHERE d.status = 'final'
                   ORDER BY c.case_number"""
            )

        for dec in decisions:
            decision_id = dec["decision_id"]
            case_number = dec["case_number"]
            text = dec["extracted_text"]

            print(f"\n{'='*60}")
            print(f"מפרק: {case_number}{dec['title']}")
            print(f"{'='*60}")

            # A NULL or empty extracted_text cannot be decomposed; skipping it
            # also leaves any previously stored blocks untouched.
            if not text:
                print(" אין טקסט מחולץ — מדלג")
                continue

            total_words = len(text.split())

            # Decompose and index the detected content by block id.
            found = {}
            for block in decompose_decision(text):
                # Keep the first entry per block id.
                found.setdefault(block["block_id"], block["content"])

            # Merge with block metadata.
            block_data = []
            for block_def in BLOCKS:
                content = found.get(block_def["block_id"], "")
                word_count = len(content.split()) if content else 0
                if total_words > 0 and word_count > 0:
                    weight = round(word_count / total_words * 100, 2)
                else:
                    weight = 0
                block_data.append({
                    **block_def,
                    "content": content,
                    "word_count": word_count,
                    "weight_percent": weight,
                    "status": "final" if content else "empty",
                })

            # Print summary. Both branches of the marker were the empty string
            # before, so the column carried no information; use distinct marks.
            for b in block_data:
                status = "✅" if b["word_count"] > 0 else "⬜"
                print(f" {status} {b['block_id']:18s} | {b['title']:25s} | {b['word_count']:5d} מילים | {b['weight_percent']:5.1f}%")

            discussion = next(b for b in block_data if b["block_id"] == "block-yod")

            # Store in DB. The delete, the inserts and the paragraph count run
            # in one transaction so a failure cannot leave a decision with
            # only some of its blocks.
            # NOTE(review): conn.transaction() assumes the pool hands out
            # asyncpg connections (the $1 placeholders suggest so) — confirm.
            async with pool.acquire() as conn:
                async with conn.transaction():
                    # Delete existing blocks for this decision.
                    await conn.execute(
                        "DELETE FROM decision_blocks WHERE decision_id = $1", decision_id
                    )
                    for b in block_data:
                        await conn.execute(
                            """INSERT INTO decision_blocks
                                   (decision_id, block_id, block_index, title, content,
                                    word_count, weight_percent, generation_type, status)
                               VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)""",
                            decision_id,
                            b["block_id"], b["block_index"], b["title"],
                            b["content"], b["word_count"], b["weight_percent"],
                            b["generation_type"], b["status"],
                        )

                    # Count paragraphs in the discussion block: lines whose
                    # stripped length exceeds 20 characters.
                    if discussion["content"]:
                        paragraph_count = sum(
                            1
                            for line in discussion["content"].split("\n")
                            if len(line.strip()) > 20
                        )
                        await conn.execute(
                            "UPDATE decisions SET total_paragraphs = $1 WHERE id = $2",
                            paragraph_count, decision_id,
                        )

        # Final summary.
        async with pool.acquire() as conn:
            block_count = await conn.fetchval("SELECT count(*) FROM decision_blocks")
            non_empty = await conn.fetchval("SELECT count(*) FROM decision_blocks WHERE status = 'final'")
    finally:
        # Release the pool on success and on failure alike.
        await close_pool()

    print(f"\n{'='*60}")
    print(f"✅ סה\"כ בלוקים: {block_count} ({non_empty} עם תוכן)")
# Script entry point: run the async pipeline only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())