Files
legal-ai/scripts/decompose-decisions-v2.py
Chaim d5ccf03e4c Add docs, scripts, skills, commands, and taskmaster config to repo
Includes:
- docs/: architecture, block-schema, migration-plan, product-specification
- scripts/: bidi_table, decompose-decisions, extract-claims, seed-knowledge, etc.
- skill-legal-decision/: SKILL.md + references + block-schema
- skill-legal-assistant/: SKILL.md
- skill-legal-docx/: SKILL.md + references
- .claude/commands/: bidi-table skill
- .taskmaster/: task config + PRDs
- .gitignore: exclude legacy/, kiryat-yearim/, node_modules/, memory/

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 14:19:17 +00:00

290 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Decompose final decisions into 12-block structure — V2 calibrated on הכט.
Key insight: DOCX extraction strips header blocks (א-ד). The real content
starts at block ה (opening "לפנינו"). We identify blocks by known section
headers and line-by-line analysis.
"""
import asyncio
import json
import re
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "mcp-server" / "src"))
from legal_mcp.services.db import get_pool, init_schema, close_pool
# The 12 blocks of a decision, listed in document order.
# Each row is (block_id, block_index, title, generation_type); main() unpacks
# the rows in exactly this order when it writes them to decision_blocks.
BLOCK_DEFS: list[tuple[str, int, str, str]] = [
    # Header blocks (indices 1-4).
    ("block-alef", 1, "כותרת מוסדית", "template-fill"),
    ("block-bet", 2, "הרכב הוועדה", "template-fill"),
    ("block-gimel", 3, "צדדים", "template-fill"),
    ("block-dalet", 4, "כותרת החלטה", "template-fill"),
    # Body blocks (indices 5-11).
    ("block-he", 5, "פתיחה", "paraphrase"),
    ("block-vav", 6, "רקע עובדתי", "reproduction"),
    ("block-zayin", 7, "טענות הצדדים", "paraphrase"),
    ("block-chet", 8, "הליכים בפני ועדת הערר", "reproduction"),
    ("block-tet", 9, "תכניות חלות", "guided-synthesis"),
    ("block-yod", 10, "דיון והכרעה", "rhetorical-construction"),
    ("block-yod-alef", 11, "סיכום", "paraphrase"),
    # Closing block (index 12).
    ("block-yod-bet", 12, "חתימות", "template-fill"),
]
def find_line(lines: list[str], pattern: str, start: int = 0) -> int:
    """Return the index of the first line, from ``start`` on, that matches.

    ``pattern`` is compiled as a regular expression and applied with
    ``search``, so it can hit anywhere inside a line unless it is anchored.
    Returns -1 when no line in ``lines[start:]`` matches.
    """
    hits = re.compile(pattern).search
    return next(
        (idx for idx in range(start, len(lines)) if hits(lines[idx])),
        -1,
    )
def slice_text(lines: list[str], start: int, end: int) -> str:
    """Return ``lines[start:end]`` joined with newlines and stripped.

    Yields an empty string when ``start`` is negative or when ``end`` does
    not lie strictly after ``start``.
    """
    if 0 <= start < end:
        chunk = lines[start:end]
        return "\n".join(chunk).strip()
    return ""
def count_words(text: str) -> int:
    """Count whitespace-separated tokens in ``text``; falsy input counts as 0."""
    if not text:
        return 0
    return len(text.split())
def decompose(text: str) -> dict[str, str]:
    """Split a decision's extracted text into its 12 named blocks.

    Section boundaries are found by scanning the text line by line for known
    Hebrew section headers. Two header styles are recognised: descriptive
    headers (e.g. "דיון והכרעה") and lettered headers (e.g. "א. רקע עובדתי").

    Args:
        text: Full extracted text of the decision.

    Returns:
        A dict with one entry per block id ("block-alef" .. "block-yod-bet").
        A block that cannot be located maps to "". "block-bet" and
        "block-gimel" are always "", and "block-dalet" is always the literal
        "החלטה".
    """
    lines = text.split("\n")
    n = len(lines)
    blocks: dict[str, str] = {}

    def nearest_after(anchor: int, *candidates: int) -> int:
        """Return the closest candidate index after ``anchor`` (or ``n``)."""
        return min(x for x in (*candidates, n) if x > anchor)

    # ── Locate section headers (-1 means "not found") ──
    opening = find_line(lines, r"^לפנינו\s|^בפנינו\s|^בפני\s*ועדת|^בפני\s*בקשה")

    claims = find_line(lines, r"תמצית\s*טענות|טענות\s*הצדדים|טענות\s*העוררי")
    if claims == -1:
        # Lettered style, e.g. "ב. טענות העורר".
        claims = find_line(lines, r"^[א-ת][\.\)]\s*טענות")

    background = find_line(lines, r"^[א-ת][\.\)]\s*רקע\s*עובדתי")

    proceedings = find_line(lines, r"ההליכים\s*בפני|הליכים\s*בפני|הדיון\s*בפני\s*ועדת\s*הערר")
    if proceedings == -1:
        # Lettered style: clarification / procedure sections.
        proceedings = find_line(lines, r"^[א-ת][\.\)]\s*הבהרות|^[א-ת][\.\)]\s*ההליך")

    # The unanchored search already covers the lettered variant of this
    # header, so no separate fallback is needed.
    plans = find_line(lines, r"תכניות\s*חלות|המסגרת\s*הנורמטיבית|הוראות\s*התכנית")

    discussion = find_line(lines, r"^דיון\s*והכרעה|^דיון$|^הכרעה$")
    if discussion == -1:
        discussion = find_line(lines, r"^[א-ת][\.\)]\s*דיון\s*והכרעה")

    summary = find_line(lines, r"^סיכום\s*$|^סוף\s*דבר\s*$")
    if summary == -1:
        summary = find_line(lines, r"^[א-ת][\.\)]\s*סיכום")

    signature = find_line(lines, r"^ניתנה?\s*(היום|פה\s*אחד|ביום)")

    # Without an explicit discussion header, fall back to the formula that
    # opens the reasoning.
    if discussion == -1:
        discussion = find_line(lines, r"לאחר\s*שבחנו\s*את\s*טענות")

    # ── Header blocks (alef-dalet): everything before the opening ──
    # slice_text returns "" both when there is no opening (-1) and when the
    # opening is the very first line, so no extra branching is required.
    blocks["block-alef"] = slice_text(lines, 0, opening)
    blocks["block-bet"] = ""  # Usually lost in extraction
    blocks["block-gimel"] = ""
    blocks["block-dalet"] = "החלטה"

    # ── Block he: opening line plus the non-blank lines that follow ──
    # The scan looks at no more than four lines after the opening line and
    # never crosses into the next section.
    opening_end = opening + 1
    if opening >= 0:
        if claims > opening:
            next_section = claims
        elif discussion > opening:
            next_section = discussion
        else:
            next_section = n
        for i in range(opening + 1, min(opening + 5, next_section)):
            if not lines[i].strip():
                break
            opening_end = i + 1
        blocks["block-he"] = slice_text(lines, opening, opening_end)
    else:
        blocks["block-he"] = ""

    # ── Block vav: background ──
    if background >= 0:
        # Explicit lettered background header; the opening may be absent.
        if claims > background:
            bg_end = claims
        elif proceedings > background:
            bg_end = proceedings
        elif discussion > background:
            bg_end = discussion
        else:
            bg_end = n
        blocks["block-vav"] = slice_text(lines, background, bg_end)
    elif opening >= 0 and claims > opening:
        # Descriptive style: background sits between the opening and claims.
        blocks["block-vav"] = slice_text(lines, opening_end, claims)
    elif opening >= 0 and discussion > opening:
        # No claims header: background runs up to the discussion. Start after
        # the opening block so the two blocks do not share lines.
        blocks["block-vav"] = slice_text(lines, opening_end, discussion)
    else:
        blocks["block-vav"] = ""

    # ── Block zayin: claims, up to the next section that follows ──
    # The signature line is a boundary too, so a decision without later
    # headers does not fold its signatures into this block.
    if claims >= 0:
        claims_end = nearest_after(claims, proceedings, plans, discussion, summary, signature)
        blocks["block-zayin"] = slice_text(lines, claims, claims_end)
    else:
        blocks["block-zayin"] = ""

    # ── Block chet: proceedings (optional) ──
    if proceedings >= 0:
        proc_end = nearest_after(proceedings, plans, discussion, summary, signature)
        blocks["block-chet"] = slice_text(lines, proceedings, proc_end)
    else:
        blocks["block-chet"] = ""

    # ── Block tet: plans (optional) ──
    # A plans header found after the discussion header is ignored.
    if plans >= 0 and (discussion == -1 or plans < discussion):
        plans_end = nearest_after(plans, discussion, summary, signature)
        blocks["block-tet"] = slice_text(lines, plans, plans_end)
    else:
        blocks["block-tet"] = ""

    # ── Block yod: discussion ──
    if discussion >= 0:
        if summary > discussion:
            disc_end = summary
        elif signature > discussion:
            disc_end = signature
        else:
            disc_end = n
        blocks["block-yod"] = slice_text(lines, discussion, disc_end)
    else:
        blocks["block-yod"] = ""

    # ── Block yod-alef: summary ──
    if summary >= 0:
        summ_end = signature if signature > summary else n
        blocks["block-yod-alef"] = slice_text(lines, summary, summ_end)
    else:
        blocks["block-yod-alef"] = ""

    # ── Block yod-bet: signatures, through the end of the text ──
    if signature >= 0:
        blocks["block-yod-bet"] = slice_text(lines, signature, n)
    else:
        blocks["block-yod-bet"] = ""

    return blocks
async def main():
    """Decompose every final decision and store its blocks in the database.

    For each row returned by the decisions query this:
      1. runs decompose() on the extracted text,
      2. replaces the decision's rows in decision_blocks with one row per
         entry of BLOCK_DEFS,
      3. updates total_words / total_paragraphs on the decision,
    printing a per-block report along the way. Finally it prints per-block
    averages over all stored blocks that have content.

    The pool is closed even when processing fails part-way.
    """
    await init_schema()
    pool = await get_pool()
    try:
        async with pool.acquire() as conn:
            decisions = await conn.fetch(
                """SELECT d.id as decision_id, c.case_number, c.title,
                          doc.extracted_text
                   FROM decisions d
                   JOIN cases c ON c.id = d.case_id
                   JOIN documents doc ON doc.case_id = d.case_id AND doc.doc_type = 'decision'
                   WHERE d.status = 'final'
                   ORDER BY c.case_number"""
            )

        for dec in decisions:
            decision_id = dec["decision_id"]
            case_number = dec["case_number"]
            text = dec["extracted_text"]
            if text is None:
                # decompose() needs a string; leave any existing blocks for
                # this decision untouched rather than crashing the whole run.
                print(f"\n{case_number}: no extracted text, skipping")
                continue
            total_words = count_words(text)
            print(f"\n{'='*60}")
            print(f"מפרק: {case_number}{dec['title']}")
            print(f"סה\"כ מילים: {total_words}")
            print(f"{'='*60}")
            parsed = decompose(text)

            async with pool.acquire() as conn:
                # Replace the decision's blocks atomically: a failure while
                # inserting must not leave the decision with a partial set.
                # NOTE(review): assumes the pool hands out asyncpg
                # connections (conn.transaction()) — confirm against
                # legal_mcp.services.db.
                async with conn.transaction():
                    # Delete existing blocks
                    await conn.execute(
                        "DELETE FROM decision_blocks WHERE decision_id = $1", decision_id
                    )
                    total_parsed_words = 0
                    for block_id, block_index, title, gen_type in BLOCK_DEFS:
                        content = parsed.get(block_id, "")
                        wc = count_words(content)
                        # Share of the whole document, as a percentage with
                        # one decimal place.
                        weight = round(wc / total_words * 100, 1) if total_words > 0 and wc > 0 else 0
                        status = "final" if wc > 0 else "empty"
                        total_parsed_words += wc
                        await conn.execute(
                            """INSERT INTO decision_blocks
                               (decision_id, block_id, block_index, title, content,
                                word_count, weight_percent, generation_type, status)
                               VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)""",
                            decision_id, block_id, block_index, title,
                            content, wc, weight, gen_type, status,
                        )
                        # NOTE(review): both branches are empty strings here;
                        # status glyphs may have been lost — confirm.
                        marker = "" if wc > 0 else ""
                        print(f" {marker} {block_id:18s} | {title:25s} | {wc:5d} מילים | {weight:5.1f}%")

                    # Update decision totals. The paragraph count covers only
                    # the discussion block: its non-blank lines longer than
                    # 20 characters after stripping.
                    disc_paras = len([
                        p for p in parsed.get("block-yod", "").split("\n")
                        if p.strip() and len(p.strip()) > 20
                    ])
                    await conn.execute(
                        "UPDATE decisions SET total_words = $1, total_paragraphs = $2, updated_at = now() WHERE id = $3",
                        total_words, disc_paras, decision_id,
                    )

            coverage = round(total_parsed_words / total_words * 100, 1) if total_words > 0 else 0
            print(f" --- כיסוי: {total_parsed_words}/{total_words} מילים ({coverage}%)")

        # Summary
        async with pool.acquire() as conn:
            stats = await conn.fetch(
                """SELECT block_id, count(*) as decisions,
                          avg(word_count) as avg_words,
                          avg(weight_percent) as avg_weight
                   FROM decision_blocks
                   WHERE word_count > 0
                   GROUP BY block_id ORDER BY block_id"""
            )
        print(f"\n{'='*60}")
        print("סטטיסטיקה לפי בלוק (רק בלוקים עם תוכן):")
        for s in stats:
            print(f" {s['block_id']:18s} | {s['decisions']} החלטות | ממוצע {s['avg_words']:.0f} מילים | {s['avg_weight']:.1f}%")
    finally:
        # Release the pool whether or not processing succeeded.
        await close_pool()
# Script entry point: run the async pipeline only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())