Files
legal-ai/mcp-server/src/legal_mcp/services/docx_retrofit.py
Chaim 36ca713dfa
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 6s
Retrofit: tighten yod-bet pattern, add cover-block fallback
The "על כן" pattern for block-yod-bet was too greedy and matched mid-discussion
transitional sentences (e.g. "על כן, במקום בו..."), which caused forward-scan
to skip block-yod-alef ("סוף דבר") via the pointer advance.

Tightened to require an operative subject (אנו / הערר / הוועדה / ועדת הערר)
so terminal "על כן, אנו מחליטים" still matches but mid-block transitions don't.

Added structural_fallback for cover blocks (alef/bet/gimel/dalet) — these are
template metadata not present in user-edited DOCX bodies. Inject zero-content
anchors so apply_user_edit can still target them later. The frontend toast
distinguishes real content gaps from fallback anchors.

Also expanded heading patterns based on training corpus inspection:
- block-vav: על המקרקעין חלות / במצב התכנוני / התכניות החלות
- block-zayin: טענות העוררת
- block-chet: עיקר תגובת המשיב
- block-tet: הדיון בוועדת הערר

For case 1130-25, this raises detection from 6/12 to 11/12 blocks — only
block-yod-bet remains missing (Daphna's edit ends at "סוף דבר" + numbered
ruling, no terminal "ההחלטה" or "על כן אנו מחליטים" paragraph).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 06:57:41 +00:00

337 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""הזרקת bookmarks רטרואקטיבית ל-DOCX שלא נוצרו ע"י ה-exporter.
כאשר משתמש מעלה `עריכה-v*.docx` שנערך ב-Word מחוץ למערכת, אין בו את ה-
bookmarks שאנו מצפים להם (block-alef ... block-yod-bet). השירות כאן
מזהה את תחילת כל בלוק לפי סימני הפתיחה העבריים (א., ב., ... יב.) ב-
הפסקאות הראשונות שלו, ומזריק bookmarkStart/bookmarkEnd בהתאם.
נעשה בצורה defensive — אם לא מצליחים לזהות בלוק, הוא פשוט לא יקבל
bookmark (`missing_blocks` בתוצאה). השרת אמור להתריע למשתמש.
"""
from __future__ import annotations
import logging
import re
import shutil
import zipfile
from io import BytesIO
from pathlib import Path
from lxml import etree
from legal_mcp.services.docx_reviser import (
NSMAP,
_load_docx_xml,
_save_docx_xml,
_w,
)
logger = logging.getLogger(__name__)
# ── Block identification ──────────────────────────────────────────
# The 12 blocks in order, with their Hebrew letter marker
BLOCK_ORDER = [
("block-alef", "א"),
("block-bet", "ב"),
("block-gimel", "ג"),
("block-dalet", "ד"),
("block-heh", "ה"),
("block-vav", "ו"),
("block-zayin", "ז"),
("block-chet", "ח"),
("block-tet", "ט"),
("block-yod", "י"),
("block-yod-alef", "יא"),
("block-yod-bet", "יב"),
]
# Regex matching a paragraph that begins with a Hebrew block marker
# followed by '.', ')', ' ', or end-of-string. The marker must be followed
# either by whitespace/punctuation or end of text to avoid matching longer
# words that happen to start with these letters.
_BLOCK_MARKERS_BY_LETTER: dict[str, str] = {letter: name for name, letter in BLOCK_ORDER}
# Longer markers (יא, יב) first so regex matches them before falling back to 'י'
_MARKER_ALTERNATION = "|".join(
re.escape(letter)
for letter in sorted(_BLOCK_MARKERS_BY_LETTER, key=len, reverse=True)
)
_BLOCK_MARKER_RE = re.compile(
rf"^\s*({_MARKER_ALTERNATION})\s*[\.\)\-]\s*"
)
# Secondary heuristic: Hebrew section headings that reliably mark the
# start of each block in the Daphna Tamir style (used when markers
# "א.", "ב." etc. are missing — common in user-edited Word files).
#
# Key observations from the 12-block schema:
# block-alef: "בפני: דפנה תמיר" or decision number page
# block-bet: "ערר מספר" line
# block-gimel: appellants vs respondents (parties)
# block-dalet: bold "החלטה" centered
# block-heh: "רקע" / "רקע עובדתי" / "פתח דבר"
# block-vav: "תכניות חלות" / "ההליך שבפנינו" / "ההליכים בפני"
# block-zayin: "תמצית טענות" / "טענות הצדדים"
# block-chet: "תגובת המשיבה" / "עמדת הוועדה"
# block-tet: "ההליכים בפני ועדת הערר" / "הדיון בפנינו"
# block-yod: "דיון והכרעה" / "דיון"
# block-yod-alef: "סוף דבר" / "סיכום"
# block-yod-bet: "ההחלטה" (signature / closing block)
_BLOCK_HEADING_PATTERNS: list[tuple[str, list[str]]] = [
("block-alef", [r"בפני[:\s]", r"ועדת הערר"]),
("block-bet", [r"^ערר\s+מספר", r"^ערר\s+\d"]),
("block-gimel", [r"^נגד\s*$", r"^—\s*נגד\s*—"]),
("block-dalet", [r"^החלטה\s*$"]),
("block-heh", [r"^רקע\s*$", r"^רקע\s+עובדתי", r"^פתח\s+דבר"]),
("block-vav", [
r"^תכניות\s+חלות",
r"^ההליכים?\s+שבפנינו",
r"^ההליכים?\s+בפני\s+הוועדה\s+המקומית",
r"^על\s+המקרקעין\s+חלות",
r"^התכניות?\s+החלות",
r"^במצב\s+התכנוני",
]),
("block-zayin", [
r"^תמצית\s+טענות",
r"^טענות\s+הצדדים",
r"^טענות\s+העוררי",
r"^טענות\s+העוררת",
]),
("block-chet", [
r"^תגובת\s+המשיב",
r"^עמדת\s+הוועדה\s+המקומית",
r"^תשובת",
r"^עיקר\s+תגובת\s+המשיב",
]),
("block-tet", [
r"^ההליכים?\s+בפני\s+ועדת\s+הערר",
r"^הדיון\s+בפנינו",
r"^הדיון\s+בוועדת\s+הערר",
]),
("block-yod", [r"^דיון\s+והכרעה", r"^דיון\s*$", r"^ההכרעה"]),
("block-yod-alef", [r"^סוף\s+דבר", r"^סיכום\s*$"]),
# block-yod-bet "על כן" must be operative — paired with אנו/הערר/הוועדה.
# Loose `^על כן` alone matches mid-discussion transitions ("על כן, במקום בו...")
# and steals the bookmark from block-yod-alef via forward-scan.
("block-yod-bet", [
r"^ההחלטה\s*$",
r"^על\s+כן[,\.\s]+(?:אנו|הערר|הוועדה|ועדת\s+הערר)\b",
]),
]
_COMPILED_HEADING_PATTERNS: list[tuple[str, list[re.Pattern[str]]]] = [
(name, [re.compile(p) for p in patterns])
for name, patterns in _BLOCK_HEADING_PATTERNS
]
def _paragraph_text(p: etree._Element) -> str:
"""Return the full text of a paragraph, joining all w:t nodes."""
return "".join(p.itertext()).strip()
def _detect_block_starts(
paragraphs: list[etree._Element],
) -> dict[str, int]:
"""Return a mapping of block_name → paragraph index (start of that block).
Uses a greedy scan: for each paragraph, if its text starts with an
expected block marker and the block hasn't been assigned yet, assign
this paragraph as the block's start.
"""
found: dict[str, int] = {}
expected_order = [name for name, _ in BLOCK_ORDER]
pointer = 0 # index into expected_order — next expected block
for i, p in enumerate(paragraphs):
text = _paragraph_text(p)
if not text:
continue
matched_name: str | None = None
# Try marker-based (א., ב., ...) first
m = _BLOCK_MARKER_RE.match(text)
if m:
letter = m.group(1)
matched_name = _BLOCK_MARKERS_BY_LETTER.get(letter)
# Fall back to heading-keyword heuristic (Daphna style)
if matched_name is None:
for name, patterns in _COMPILED_HEADING_PATTERNS:
if name in found:
continue
# Only check patterns for blocks we haven't assigned yet
# AND that come at/after the current pointer — to keep the
# greedy forward-scan semantics consistent with markers.
if expected_order.index(name) < pointer:
continue
if any(pat.search(text) for pat in patterns):
matched_name = name
break
if matched_name is None:
continue
if matched_name in found:
continue
if pointer >= len(expected_order):
continue
name_idx_in_order = expected_order.index(matched_name)
if name_idx_in_order >= pointer:
found[matched_name] = i
pointer = name_idx_in_order + 1
return found
def _insert_bookmark_around_range(
body: etree._Element,
paragraphs: list[etree._Element],
start_idx: int,
end_idx: int,
name: str,
bm_id: int,
) -> None:
"""Insert bookmarkStart at the start of paragraph start_idx and
bookmarkEnd at the end of paragraph end_idx."""
start_el = etree.Element(_w("bookmarkStart"))
start_el.set(_w("id"), str(bm_id))
start_el.set(_w("name"), name)
end_el = etree.Element(_w("bookmarkEnd"))
end_el.set(_w("id"), str(bm_id))
start_p = paragraphs[start_idx]
end_p = paragraphs[end_idx]
start_p.insert(0, start_el)
end_p.append(end_el)
def _next_bookmark_id(doc_tree: etree._Element) -> int:
"""Find max existing bookmark id and return next unused."""
max_id = 9999
for el in doc_tree.iterfind(".//w:bookmarkStart", NSMAP):
wid = el.get(_w("id"))
if wid:
try:
max_id = max(max_id, int(wid))
except ValueError:
pass
return max_id + 1
# ── Public API ────────────────────────────────────────────────────
def retrofit_bookmarks(
docx_path: str | Path,
*,
output_path: str | Path | None = None,
backup: bool = True,
) -> dict:
"""Inject block-* bookmarks into an existing DOCX via heuristic detection.
Args:
docx_path: path to DOCX file (modified in place unless output_path set).
output_path: if given, write to this path instead of overwriting.
backup: if True and writing in place, save the original as
`<path>.pre-retrofit.docx` first.
Returns:
{
'bookmarks_added': ['block-alef', ...],
'missing_blocks': ['block-dalet', ...],
'existing_bookmarks': [...] # bookmarks already on the doc
}
"""
docx_path = Path(docx_path)
if not docx_path.exists():
raise FileNotFoundError(str(docx_path))
if output_path is None:
output_path = docx_path
output_path = Path(output_path)
members, doc_tree, settings_tree = _load_docx_xml(docx_path)
# Existing bookmarks
existing_names: list[str] = []
for el in doc_tree.iterfind(".//w:bookmarkStart", NSMAP):
name = el.get(_w("name"))
if name:
existing_names.append(name)
# Collect *top-level* body paragraphs (don't descend into tables etc.
# for now — MVP). The XPath ".//w:p" would include table cells too;
# for retrofitting we only care about the main flow.
body = doc_tree.find(f".//{_w('body')}")
if body is None:
raise ValueError("document has no <w:body>")
paragraphs = [p for p in body if p.tag == _w("p")]
if not paragraphs:
return {
"bookmarks_added": [],
"missing_blocks": [n for n, _ in BLOCK_ORDER],
"existing_bookmarks": existing_names,
}
block_starts = _detect_block_starts(paragraphs)
# Cover-block fallback: alef/bet/gimel/dalet are template metadata
# (judges, case number, parties, "החלטה" title) that don't appear in
# the body of user-edited DOCX files — they live in headers/template.
# Inject zero-content anchors at paragraph 0 so apply_user_edit can
# still target them later.
structural_fallback: list[str] = []
cover_blocks = ["block-alef", "block-bet", "block-gimel", "block-dalet"]
first_detected_idx = min(block_starts.values()) if block_starts else 0
for i, name in enumerate(cover_blocks):
if name not in block_starts:
idx = min(i, max(0, first_detected_idx - 1))
block_starts[name] = idx
structural_fallback.append(name)
# Calculate end_idx for each block = paragraph before the next block's start,
# or last paragraph if this is the last block found.
ordered_found = sorted(block_starts.items(), key=lambda kv: kv[1])
ranges: list[tuple[str, int, int]] = []
for i, (name, start_idx) in enumerate(ordered_found):
if i + 1 < len(ordered_found):
end_idx = ordered_found[i + 1][1] - 1
else:
end_idx = len(paragraphs) - 1
ranges.append((name, start_idx, max(start_idx, end_idx)))
# Backup if overwriting in place
if backup and output_path.resolve() == docx_path.resolve():
backup_path = docx_path.with_suffix(".pre-retrofit.docx")
shutil.copy2(str(docx_path), str(backup_path))
# Inject bookmarks, skipping any that already exist
next_id = _next_bookmark_id(doc_tree)
added: list[str] = []
for name, s, e in ranges:
if name in existing_names:
continue
_insert_bookmark_around_range(body, paragraphs, s, e, name, next_id)
added.append(name)
next_id += 1
_save_docx_xml(members, doc_tree, settings_tree, output_path)
missing = [
n for n, _ in BLOCK_ORDER
if n not in block_starts
and n not in existing_names
]
logger.info("retrofit %s: added=%s missing=%s structural=%s",
docx_path.name, added, missing, structural_fallback)
return {
"bookmarks_added": added,
"missing_blocks": missing,
"structural_fallback": structural_fallback,
"existing_bookmarks": existing_names,
}