Files
legal-ai/mcp-server/src/legal_mcp/services/docx_reviser.py
Chaim b2ea0c28dd feat(storage): X14 Phase 2c — route remaining sync write-sites through storage.py
Completes the write-side rewiring (INV-STG1) for the call-sites that run in
synchronous contexts, via a new blocking facade in storage.py
(put_bytes_sync / put_file_sync — asyncio.run, or a worker thread when a loop
is already running):
- services/extractor.py: multimodal thumbnail JPEGs → DERIVED (rendered in a
  to_thread worker)
- services/docx_reviser.py: track-changes save (_save_docx_xml) + empty-diff
  copy (copy_with_revisions) → DOCUMENTS
- services/docx_retrofit.py: in-place retrofit backup → DOCUMENTS

Each site keeps a fallback to a direct disk write when the target path is
outside DATA_DIR (caller-provided). Under the default STORAGE_BACKEND=
filesystem the bytes land exactly where they did before — zero behaviour
change.

Also: mcp_env_catalog MINIO_ENDPOINT default updated to the durable
container-name endpoint (http://minio-bx2ykvw94xbutsex41hz4vv8:9000), matching
the Coolify "Connect to Predefined Network" change made for network durability.

All binary write-sites now flow through storage.py. git-tracked text
(case.json/notes/research-md/draft-md) stays on disk by design (INV-STG7);
court-fetch temp files are ephemeral.

tests: +2 (thumbnail renderer routes through storage; put_bytes_sync
round-trip); 55 storage/docx/track-changes green; 244 collected, no import
breakage.

Keeps G2; completes INV-STG1 write coverage. Spec: docs/spec/X14-storage-minio.md.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 08:26:09 +00:00

540 lines
19 KiB
Python

"""עריכת DOCX עם Track Changes אמיתיים של Word.
השירות מיועד לקבל DOCX קיים (עם bookmarks שזיהו אנקורים) ולהחיל עליו
עריכות מסומנות כ-w:ins / w:del, שבאים לידי ביטוי ב-Word כ-Track Changes
שהמשתמש יכול Accept/Reject.
אסטרטגיית אנקורים: bookmarks בשמות כגון 'block-yod', 'block-yod-para-3'
שמוכנסים בזמן הייצוא הראשוני (docx_exporter.py) או רטרואקטיבית
(docx_retrofit.py).
"""
from __future__ import annotations
import logging
import shutil
from legal_mcp import config
from legal_mcp.services import storage
import zipfile
from dataclasses import dataclass, field
from datetime import datetime, timezone
from io import BytesIO
from pathlib import Path
from typing import Literal
from lxml import etree
logger = logging.getLogger(__name__)
# ── XML namespaces ─────────────────────────────────────────────────
W_NS = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
NSMAP = {"w": W_NS}
def _w(tag: str) -> str:
"""Build a fully qualified tag name in the w: namespace."""
return f"{{{W_NS}}}{tag}"
# ── Data models ────────────────────────────────────────────────────
# Operations that apply_tracked_revisions() knows how to dispatch.
RevisionType = Literal["insert_after", "insert_before", "replace", "delete"]
# Formatting presets interpreted by _build_paragraph(): "heading" and "bold"
# produce bold text, "quote" produces italic text; "heading" is 14pt and
# "quote" is 11pt, the others leave the size unset.
StyleType = Literal["body", "quote", "heading", "bold"]


@dataclass
class Revision:
    """A single tracked change to apply to the DOCX."""

    # Caller-chosen identifier; echoed back on the matching RevisionResult.
    id: str
    # Which operation to perform (see RevisionType).
    type: RevisionType
    # Name of the w:bookmarkStart that locates the target paragraph.
    anchor_bookmark: str
    # Text of the paragraph to insert (used by insert_* and replace).
    content: str = ""
    # Formatting preset for the inserted paragraph.
    style: StyleType = "body"
    # Free-text rationale; not read by any code visible in this module.
    reason: str = ""
    # NOTE(review): not read by any code visible in this module — the insert
    # side is chosen from `type` instead. Confirm whether it is still needed.
    anchor_position: Literal["start", "end"] = "end"
@dataclass
class RevisionResult:
    """Result of applying a single revision."""

    # Identifier copied from the Revision this result describes.
    id: str
    # "applied" when the tree was modified, "failed" otherwise.
    status: Literal["applied", "failed"]
    # Human-readable failure description; None when the revision applied.
    error: str | None = None
    # Revision id used for the change: the w:ins id for insert/replace, and the
    # w:del id for delete (despite the field name). None on failure.
    ins_id: int | None = None
@dataclass
class RevisionBatchResult:
    """Aggregate result of applying a revision batch."""

    # Number of revisions whose status is "applied".
    applied: int = 0
    # Number of revisions whose status is "failed".
    failed: int = 0
    # Per-revision outcomes, in the order the revisions were processed.
    results: list[RevisionResult] = field(default_factory=list)
    # String form of the output path, filled in after the DOCX is saved.
    output_path: str = ""
# ── XML helpers ────────────────────────────────────────────────────
def _load_docx_xml(docx_path: Path) -> tuple[dict[str, bytes], etree._Element, etree._Element]:
    """Read a DOCX package into memory and parse its two editable parts.

    Returns:
        ``(members, document_tree, settings_tree)`` where *members* maps every
        zip entry name to its raw bytes, *document_tree* is the parsed
        ``word/document.xml`` root and *settings_tree* is the parsed
        ``word/settings.xml`` root (a fresh empty ``w:settings`` element when
        that part is absent or empty).

    Raises:
        ValueError: the archive has no ``word/document.xml`` entry.
    """
    with zipfile.ZipFile(docx_path, "r") as archive:
        members: dict[str, bytes] = {
            entry: archive.read(entry) for entry in archive.namelist()
        }

    if "word/document.xml" not in members:
        raise ValueError(f"{docx_path}: missing word/document.xml")
    document_tree = etree.fromstring(members["word/document.xml"])

    raw_settings = members.get("word/settings.xml")
    if not raw_settings:
        # No (or empty) settings part: start from a bare <w:settings/> root.
        settings_tree = etree.Element(_w("settings"), nsmap=NSMAP)
    else:
        settings_tree = etree.fromstring(raw_settings)

    return members, document_tree, settings_tree
_DOCX_CTYPE = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"


def _persist_docx_sync(output_path: Path, data: bytes) -> None:
    """Persist DOCX bytes through the storage layer (INV-STG1).

    The storage key is *output_path* expressed relative to ``config.DATA_DIR``
    (both resolved). When *output_path* lies outside ``DATA_DIR`` (a
    caller-provided location) the bytes are written straight to disk instead,
    creating parent directories as needed.

    Args:
        output_path: destination path of the DOCX.
        data: complete DOCX file contents.

    Raises:
        Whatever ``storage.put_bytes_sync`` raises. Storage failures are not
        redirected to the disk fallback.
    """
    out = Path(output_path)
    # Only the key derivation is guarded: relative_to() raises ValueError for
    # paths outside DATA_DIR. Keeping the storage call out of the try block
    # stops a ValueError raised by the storage layer from being mistaken for
    # "outside DATA_DIR" and silently diverted to a local write.
    try:
        key = out.resolve().relative_to(Path(config.DATA_DIR).resolve()).as_posix()
    except ValueError:
        out.parent.mkdir(parents=True, exist_ok=True)
        out.write_bytes(data)
        return
    storage.put_bytes_sync(key, data, bucket=storage.Bucket.DOCUMENTS,
                           content_type=_DOCX_CTYPE)
def _save_docx_xml(
    members: dict[str, bytes],
    document_tree: etree._Element,
    settings_tree: etree._Element,
    output_path: Path,
) -> None:
    """Re-zip a DOCX with freshly serialized document/settings XML and persist it.

    The caller's *members* mapping is not mutated; a copy receives the two
    re-serialized parts, the package is assembled in memory, and the resulting
    bytes are handed to ``_persist_docx_sync``.
    """
    package = dict(members)
    for part_name, tree in (
        ("word/document.xml", document_tree),
        ("word/settings.xml", settings_tree),
    ):
        package[part_name] = etree.tostring(
            tree, xml_declaration=True, encoding="UTF-8", standalone=True
        )

    payload = BytesIO()
    with zipfile.ZipFile(payload, "w", zipfile.ZIP_DEFLATED) as archive:
        for part_name, blob in package.items():
            archive.writestr(part_name, blob)

    # Read the buffer only after the archive is closed, so the zip's central
    # directory has been written.
    _persist_docx_sync(output_path, payload.getvalue())
def _ensure_track_revisions(settings_tree: etree._Element) -> None:
    """Ensure an enabled <w:trackRevisions/> is present in settings.xml.

    Note: This enables *display* of track changes — actual w:ins/w:del nodes
    are rendered as tracked regardless. Word respects trackRevisions for
    recording further user edits too.

    A missing element is created with ``w:val="true"``. An existing element
    that is explicitly switched off (``w:val`` of ``false``/``0``/``off``) is
    flipped to ``true``; previously such an element was left untouched, so
    tracking stayed disabled. An existing element with no ``w:val`` (which
    already means "on") or with an "on" value is left as is.

    NOTE(review): a newly created element is appended as the last child of
    w:settings; confirm that this position is acceptable to the consumers of
    the file.
    """
    val_attr = _w("val")
    flag = settings_tree.find(_w("trackRevisions"))
    if flag is None:
        flag = etree.SubElement(settings_tree, _w("trackRevisions"))
        flag.set(val_attr, "true")
        return

    current = flag.get(val_attr)
    if current is not None and current.strip().lower() in ("false", "0", "off"):
        flag.set(val_attr, "true")
def _next_revision_id(document_tree: etree._Element) -> int:
    """Return one more than the largest integer ``w:id`` found in the document.

    Every node carrying a ``w:id`` attribute is considered, not just a fixed
    list of tag names. This covers w:ins / w:del / bookmarks / comment ranges
    as before, and also revision markup such as w:moveFrom, w:moveTo,
    w:rPrChange and w:pPrChange, so ids handed out for new revisions cannot
    collide with ids already present in a user-edited source document.

    Non-numeric and empty ``w:id`` values are ignored. Returns 1 when the tree
    contains no numeric ``w:id`` greater than zero.
    """
    id_attr = _w("id")
    highest = 0
    # A single pass over the tree; nodes without a w:id attribute yield None
    # from .get() and are skipped.
    for node in document_tree.iter():
        raw = node.get(id_attr)
        if not raw:
            continue
        try:
            highest = max(highest, int(raw))
        except ValueError:
            # Not an integer id — irrelevant for allocating numeric ids.
            continue
    return highest + 1
def _find_bookmark(
    document_tree: etree._Element, name: str
) -> tuple[etree._Element | None, etree._Element | None]:
    """Locate a bookmark's start and end markers by bookmark name.

    Returns ``(start, end)``: the first w:bookmarkStart whose ``w:name`` equals
    *name*, and the first w:bookmarkEnd sharing that start's ``w:id``. Returns
    ``(None, None)`` when no such start exists; *end* alone is ``None`` when
    the start has no matching end marker.
    """
    name_attr = _w("name")
    id_attr = _w("id")

    start = next(
        (
            marker
            for marker in document_tree.iterfind(".//w:bookmarkStart", NSMAP)
            if marker.get(name_attr) == name
        ),
        None,
    )
    if start is None:
        return None, None

    wanted_id = start.get(id_attr)
    end = next(
        (
            marker
            for marker in document_tree.iterfind(".//w:bookmarkEnd", NSMAP)
            if marker.get(id_attr) == wanted_id
        ),
        None,
    )
    return start, end
def _find_enclosing_paragraph(element: etree._Element) -> etree._Element | None:
    """Return the nearest w:p at or above *element*, or None if there is none."""
    paragraph_tag = _w("p")
    node = element
    # Climb until a paragraph is reached or the ancestors run out.
    while node is not None and node.tag != paragraph_tag:
        node = node.getparent()
    return node
# ── Paragraph builders ─────────────────────────────────────────────
def _build_run(text: str, *, bold: bool = False, italic: bool = False,
               font: str = "David", size_half_pt: int | None = None) -> etree._Element:
    """Create a w:r holding *text*, marked right-to-left with the given font.

    Args:
        text: run text, stored in a w:t with ``xml:space="preserve"``.
        bold: add w:b / w:bCs.
        italic: add w:i / w:iCs.
        font: typeface written to the ascii, hAnsi and cs slots of w:rFonts.
        size_half_pt: font size in half-points for w:sz / w:szCs; omitted
            entirely when None.
    """
    run = etree.Element(_w("r"))
    props = etree.SubElement(run, _w("rPr"))

    fonts = etree.SubElement(props, _w("rFonts"))
    for slot in ("ascii", "hAnsi", "cs"):
        fonts.set(_w(slot), font)
    fonts.set(_w("hint"), "cs")

    if size_half_pt is not None:
        size_text = str(size_half_pt)
        for size_tag in ("sz", "szCs"):
            etree.SubElement(props, _w(size_tag)).set(_w("val"), size_text)

    # Toggle properties, each paired with its complex-script twin; w:rtl is
    # always emitted last.
    toggles: list[str] = []
    if bold:
        toggles += ["b", "bCs"]
    if italic:
        toggles += ["i", "iCs"]
    toggles.append("rtl")
    for toggle in toggles:
        etree.SubElement(props, _w(toggle))

    text_el = etree.SubElement(run, _w("t"))
    text_el.set("{http://www.w3.org/XML/1998/namespace}space", "preserve")
    text_el.text = text
    return run
def _build_paragraph(text: str, *, style: StyleType = "body") -> etree._Element:
    """Create a bidi w:p containing a single run with *text*.

    Style presets: "heading" is bold 14pt, "bold" is bold, "quote" is italic
    11pt, and "body" is plain with no explicit size.
    """
    paragraph = etree.Element(_w("p"))
    para_props = etree.SubElement(paragraph, _w("pPr"))
    etree.SubElement(para_props, _w("bidi")).set(_w("val"), "1")
    # NOTE(review): with w:bidi set, Word may interpret jc left/right relative
    # to the paragraph direction — confirm "right" renders as intended.
    etree.SubElement(para_props, _w("jc")).set(_w("val"), "right")
    # Paragraph-mark run properties: the mark itself is flagged RTL.
    mark_props = etree.SubElement(para_props, _w("rPr"))
    etree.SubElement(mark_props, _w("rtl"))

    # Sizes are in half-points: 28 → 14pt, 22 → 11pt.
    half_points = 28 if style == "heading" else 22 if style == "quote" else None
    paragraph.append(
        _build_run(
            text,
            bold=style in ("heading", "bold"),
            italic=style == "quote",
            size_half_pt=half_points,
        )
    )
    return paragraph
def _wrap_in_ins(elements: list[etree._Element], *, ins_id: int,
                 author: str, date_iso: str) -> etree._Element:
    """Return a new <w:ins> that contains the given *run-level* elements, in order."""
    wrapper = etree.Element(_w("ins"))
    for attr, value in (("id", str(ins_id)), ("author", author), ("date", date_iso)):
        wrapper.set(_w(attr), value)
    # Appending re-parents each element under the wrapper.
    wrapper.extend(elements)
    return wrapper
def _make_tracked_paragraph_insert(
    text: str, *, style: StyleType, ins_id: int, author: str, date_iso: str,
    mark_id: int | None = None,
) -> etree._Element:
    """Build a paragraph that is, as a whole, a tracked insertion.

    Two pieces of markup make a paragraph fully "inserted":
      1. its runs sit inside one <w:ins> carrying *ins_id*;
      2. its pPr/rPr holds an <w:ins> marker for the paragraph mark (pilcrow),
         carrying *mark_id*, which falls back to *ins_id* when not given.
    """
    effective_mark_id = ins_id if mark_id is None else mark_id

    paragraph = _build_paragraph(text, style=style)
    para_props = paragraph.find(_w("pPr"))
    assert para_props is not None
    mark_props = para_props.find(_w("rPr"))
    if mark_props is None:
        mark_props = etree.SubElement(para_props, _w("rPr"))

    # Tracked-insert marker for the paragraph mark itself.
    marker = etree.SubElement(mark_props, _w("ins"))
    for attr, value in (
        ("id", str(effective_mark_id)),
        ("author", author),
        ("date", date_iso),
    ):
        marker.set(_w(attr), value)

    # Move the paragraph's direct runs into a single <w:ins> appended at the end.
    run_tag = _w("r")
    direct_runs = [node for node in paragraph if node.tag == run_tag]
    if direct_runs:
        for run in direct_runs:
            paragraph.remove(run)
        paragraph.append(
            _wrap_in_ins(direct_runs, ins_id=ins_id, author=author, date_iso=date_iso)
        )
    return paragraph
def _mark_runs_as_deleted(paragraph: etree._Element, *, del_id: int,
                          author: str, date_iso: str) -> None:
    """Wrap all direct <w:r> children of a paragraph in a single <w:del>.

    Within a <w:del>, <w:t> must become <w:delText>, so each run's direct
    w:t children are retagged. Paragraphs with no direct runs are left
    untouched.

    The wrapper is placed where the first run used to be instead of being
    appended to the end of the paragraph. Appending pushed the deleted text
    past trailing siblings such as w:bookmarkEnd, so the text dropped out of
    the bookmark that anchors it. Runs that were separated by other elements
    are still gathered into the one wrapper, since only a single *del_id* is
    available.

    Args:
        paragraph: the w:p whose runs are marked as deleted (mutated in place).
        del_id: value for the wrapper's w:id.
        author: value for the wrapper's w:author.
        date_iso: value for the wrapper's w:date.
    """
    run_tag = _w("r")
    runs = [child for child in paragraph if child.tag == run_tag]
    if not runs:
        return

    wrapper = etree.Element(_w("del"))
    wrapper.set(_w("id"), str(del_id))
    wrapper.set(_w("author"), author)
    wrapper.set(_w("date"), date_iso)

    # Insert the wrapper before the first run so the deleted text keeps its
    # position relative to the paragraph's other children.
    first_run_pos = list(paragraph).index(runs[0])
    paragraph.insert(first_run_pos, wrapper)

    text_tag = _w("t")
    deleted_text_tag = _w("delText")
    for run in runs:
        for text_el in run.findall(text_tag):
            text_el.tag = deleted_text_tag
        paragraph.remove(run)
        wrapper.append(run)
# ── Revision application ───────────────────────────────────────────
def _apply_insert(
    document_tree: etree._Element,
    revision: Revision,
    *,
    ins_id: int,
    author: str,
    date_iso: str,
) -> RevisionResult:
    """Insert a tracked paragraph before or after the bookmarked paragraph.

    ``insert_before`` anchors on the bookmark start; any other type anchors on
    the bookmark end, falling back to the start when there is no end marker.
    The new paragraph becomes a sibling of the paragraph enclosing the anchor.
    Uses *ins_id* for the run wrapper and ``ins_id + 1`` for the paragraph mark.
    """
    def failure(message: str) -> RevisionResult:
        return RevisionResult(id=revision.id, status="failed", error=message)

    start, end = _find_bookmark(document_tree, revision.anchor_bookmark)
    if start is None:
        return failure(f"bookmark '{revision.anchor_bookmark}' not found")

    inserting_before = revision.type == "insert_before"
    if inserting_before or end is None:
        anchor = start
    else:
        anchor = end

    enclosing_p = _find_enclosing_paragraph(anchor)
    if enclosing_p is None:
        return failure("anchor has no enclosing paragraph")

    tracked_p = _make_tracked_paragraph_insert(
        revision.content,
        style=revision.style,
        ins_id=ins_id,
        mark_id=ins_id + 1,
        author=author,
        date_iso=date_iso,
    )

    parent = enclosing_p.getparent()
    if parent is None:
        return failure("enclosing paragraph has no parent")

    position = list(parent).index(enclosing_p)
    if not inserting_before:
        position += 1
    parent.insert(position, tracked_p)
    return RevisionResult(id=revision.id, status="applied", ins_id=ins_id)
def _apply_delete(
    document_tree: etree._Element,
    revision: Revision,
    *,
    del_id: int,
    author: str,
    date_iso: str,
) -> RevisionResult:
    """Mark the runs of the paragraph holding the bookmark start as deleted.

    On success the result's ``ins_id`` field carries *del_id*.
    """
    start, _end = _find_bookmark(document_tree, revision.anchor_bookmark)
    if start is None:
        return RevisionResult(
            id=revision.id,
            status="failed",
            error=f"bookmark '{revision.anchor_bookmark}' not found",
        )

    target_p = _find_enclosing_paragraph(start)
    if target_p is None:
        return RevisionResult(
            id=revision.id,
            status="failed",
            error="anchor has no enclosing paragraph",
        )

    _mark_runs_as_deleted(target_p, del_id=del_id, author=author, date_iso=date_iso)
    return RevisionResult(id=revision.id, status="applied", ins_id=del_id)
def _apply_replace(
    document_tree: etree._Element,
    revision: Revision,
    *,
    ins_id: int,
    del_id: int,
    author: str,
    date_iso: str,
) -> RevisionResult:
    """Replace a paragraph: add a tracked insert after it, then mark its runs deleted.

    The target is the paragraph enclosing the bookmark start. The new paragraph
    uses *ins_id* (runs) and ``ins_id + 1`` (paragraph mark); the deletion
    wrapper uses *del_id*.
    """
    def failure(message: str) -> RevisionResult:
        return RevisionResult(id=revision.id, status="failed", error=message)

    start, _end = _find_bookmark(document_tree, revision.anchor_bookmark)
    if start is None:
        return failure(f"bookmark '{revision.anchor_bookmark}' not found")

    old_p = _find_enclosing_paragraph(start)
    if old_p is None:
        return failure("anchor has no enclosing paragraph")

    parent = old_p.getparent()
    if parent is None:
        return failure("enclosing paragraph has no parent")

    replacement_p = _make_tracked_paragraph_insert(
        revision.content,
        style=revision.style,
        ins_id=ins_id,
        mark_id=ins_id + 1,
        author=author,
        date_iso=date_iso,
    )
    # The replacement goes directly after the paragraph being struck out.
    parent.insert(list(parent).index(old_p) + 1, replacement_p)
    _mark_runs_as_deleted(old_p, del_id=del_id, author=author, date_iso=date_iso)
    return RevisionResult(id=revision.id, status="applied", ins_id=ins_id)
# ── Public API ─────────────────────────────────────────────────────
def apply_tracked_revisions(
    source_path: str | Path,
    output_path: str | Path,
    revisions: list[Revision],
    *,
    author: str = "מערכת AI",
    date: datetime | None = None,
) -> RevisionBatchResult:
    """Apply a batch of tracked revisions to a DOCX, producing a new DOCX.

    The source is loaded into memory and the result is saved to *output_path*.
    Output is a new DOCX with <w:ins> / <w:del> markers that Word renders as
    Track Changes (Accept/Reject). The output is saved even when some or all
    revisions fail; check the returned per-revision statuses.

    Args:
        source_path: existing DOCX to revise (e.g. a user-edited copy).
        output_path: where to write the revised DOCX.
        revisions: list of Revision objects. Anchors are bookmark names.
        author: displayed as the revision author in Word.
        date: revision timestamp (defaults to now, UTC). A timezone-aware
            value is converted to UTC before formatting; a naive value is
            formatted unchanged, i.e. it is taken to be UTC already.

    Returns:
        RevisionBatchResult with per-revision status.

    Raises:
        ValueError: the source DOCX has no word/document.xml (raised by the
            loader). Errors from saving the output also propagate.
    """
    source_path = Path(source_path)
    output_path = Path(output_path)
    if date is None:
        date = datetime.now(timezone.utc)
    elif date.tzinfo is not None and date.utcoffset() is not None:
        # The timestamp is written with a literal "Z" suffix, so an aware
        # datetime in another zone must be normalized to UTC first; otherwise
        # its local wall-clock time would be mislabelled as UTC.
        date = date.astimezone(timezone.utc)
    date_iso = date.strftime("%Y-%m-%dT%H:%M:%SZ")

    members, doc_tree, settings_tree = _load_docx_xml(source_path)
    _ensure_track_revisions(settings_tree)
    next_id = _next_revision_id(doc_tree)

    batch = RevisionBatchResult()
    for rev in revisions:
        try:
            if rev.type in ("insert_after", "insert_before"):
                result = _apply_insert(doc_tree, rev, ins_id=next_id,
                                       author=author, date_iso=date_iso)
                # insert consumes 2 IDs: run-wrapper + paragraph-mark
                next_id += 2
            elif rev.type == "delete":
                result = _apply_delete(doc_tree, rev, del_id=next_id,
                                       author=author, date_iso=date_iso)
                next_id += 1
            elif rev.type == "replace":
                result = _apply_replace(doc_tree, rev,
                                        ins_id=next_id, del_id=next_id + 2,
                                        author=author, date_iso=date_iso)
                # replace consumes 3 IDs: ins-run, ins-mark, del
                next_id += 3
            else:
                result = RevisionResult(id=rev.id, status="failed",
                                        error=f"unknown type: {rev.type}")
        except Exception as e:  # pragma: no cover - defensive
            # One bad revision must not abort the rest of the batch.
            logger.exception("revision %s failed", rev.id)
            result = RevisionResult(id=rev.id, status="failed", error=str(e))

        batch.results.append(result)
        if result.status == "applied":
            batch.applied += 1
        else:
            batch.failed += 1

    _save_docx_xml(members, doc_tree, settings_tree, output_path)
    batch.output_path = str(output_path)
    logger.info("applied %d revisions (failed %d) → %s",
                batch.applied, batch.failed, output_path)
    return batch
def list_bookmarks(docx_path: str | Path) -> list[str]:
    """List the DOCX's bookmark names in document order, skipping '_'-prefixed ones."""
    _members, document_tree, _settings = _load_docx_xml(Path(docx_path))
    name_attr = _w("name")
    labels = (
        marker.get(name_attr)
        for marker in document_tree.iterfind(".//w:bookmarkStart", NSMAP)
    )
    # Drop markers without a name as well as internal ('_...') bookmarks.
    return [label for label in labels if label and not label.startswith("_")]
def copy_with_revisions(
    source_path: str | Path, output_path: str | Path,
) -> None:
    """Copy source → output unchanged (used when revisions list is empty).

    When *output_path* resolves to a location under ``config.DATA_DIR`` the
    file is stored through the storage layer, keyed by its path relative to
    ``DATA_DIR``. Otherwise (caller-provided location) it is copied directly
    on disk, creating parent directories as needed.

    Raises:
        Whatever ``storage.put_file_sync`` raises. Storage failures are not
        redirected to the disk fallback.
    """
    out = Path(output_path)
    # Guard only the key derivation: relative_to() raises ValueError for paths
    # outside DATA_DIR. With the storage call outside the try block, a
    # ValueError raised by the storage layer is no longer mistaken for
    # "outside DATA_DIR" and silently turned into a local copy.
    try:
        key = out.resolve().relative_to(Path(config.DATA_DIR).resolve()).as_posix()
    except ValueError:
        out.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(str(source_path), str(out))
        return
    storage.put_file_sync(source_path, key, bucket=storage.Bucket.DOCUMENTS,
                          content_type=_DOCX_CTYPE)