"""עריכת DOCX עם Track Changes אמיתיים של Word. השירות מיועד לקבל DOCX קיים (עם bookmarks שזיהו אנקורים) ולהחיל עליו עריכות מסומנות כ-w:ins / w:del, שבאים לידי ביטוי ב-Word כ-Track Changes שהמשתמש יכול Accept/Reject. אסטרטגיית אנקורים: bookmarks בשמות כגון 'block-yod', 'block-yod-para-3' שמוכנסים בזמן הייצוא הראשוני (docx_exporter.py) או רטרואקטיבית (docx_retrofit.py). """ from __future__ import annotations import logging import shutil import zipfile from dataclasses import dataclass, field from datetime import datetime, timezone from io import BytesIO from pathlib import Path from typing import Literal from lxml import etree logger = logging.getLogger(__name__) # ── XML namespaces ───────────────────────────────────────────────── W_NS = "http://schemas.openxmlformats.org/wordprocessingml/2006/main" NSMAP = {"w": W_NS} def _w(tag: str) -> str: """Build a fully qualified tag name in the w: namespace.""" return f"{{{W_NS}}}{tag}" # ── Data models ──────────────────────────────────────────────────── RevisionType = Literal["insert_after", "insert_before", "replace", "delete"] StyleType = Literal["body", "quote", "heading", "bold"] @dataclass class Revision: """A single tracked change to apply to the DOCX.""" id: str type: RevisionType anchor_bookmark: str content: str = "" style: StyleType = "body" reason: str = "" anchor_position: Literal["start", "end"] = "end" @dataclass class RevisionResult: """Result of applying a single revision.""" id: str status: Literal["applied", "failed"] error: str | None = None ins_id: int | None = None @dataclass class RevisionBatchResult: """Aggregate result of applying a revision batch.""" applied: int = 0 failed: int = 0 results: list[RevisionResult] = field(default_factory=list) output_path: str = "" # ── XML helpers ──────────────────────────────────────────────────── def _load_docx_xml(docx_path: Path) -> tuple[dict[str, bytes], etree._Element, etree._Element]: """Load a DOCX as a dict of zip members + parsed document/settings trees.""" members: dict[str, bytes] = {} with zipfile.ZipFile(docx_path, "r") as zf: for name in zf.namelist(): members[name] = zf.read(name) if "word/document.xml" not in members: raise ValueError(f"{docx_path}: missing word/document.xml") document_tree = etree.fromstring(members["word/document.xml"]) settings_bytes = members.get("word/settings.xml") if settings_bytes: settings_tree = etree.fromstring(settings_bytes) else: settings_tree = etree.Element(_w("settings"), nsmap=NSMAP) return members, document_tree, settings_tree def _save_docx_xml( members: dict[str, bytes], document_tree: etree._Element, settings_tree: etree._Element, output_path: Path, ) -> None: """Write a DOCX back to disk with updated document/settings XML.""" members = dict(members) members["word/document.xml"] = etree.tostring( document_tree, xml_declaration=True, encoding="UTF-8", standalone=True ) members["word/settings.xml"] = etree.tostring( settings_tree, xml_declaration=True, encoding="UTF-8", standalone=True ) output_path.parent.mkdir(parents=True, exist_ok=True) buffer = BytesIO() with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf: for name, data in members.items(): zf.writestr(name, data) output_path.write_bytes(buffer.getvalue()) def _ensure_track_revisions(settings_tree: etree._Element) -> None: """Ensure is present in settings.xml. Note: This enables *display* of track changes — actual w:ins/w:del nodes are rendered as tracked regardless. Word respects trackRevisions for recording further user edits too. """ existing = settings_tree.find(_w("trackRevisions")) if existing is None: el = etree.SubElement(settings_tree, _w("trackRevisions")) el.set(_w("val"), "true") def _next_revision_id(document_tree: etree._Element) -> int: """Find max existing w:id on w:ins/w:del/w:bookmarkStart and return next.""" max_id = 0 for xpath in ( ".//w:ins", ".//w:del", ".//w:bookmarkStart", ".//w:bookmarkEnd", ".//w:commentRangeStart", ".//w:comment", ): for el in document_tree.iterfind(xpath, NSMAP): val = el.get(_w("id")) if val: try: max_id = max(max_id, int(val)) except ValueError: pass return max_id + 1 def _find_bookmark( document_tree: etree._Element, name: str ) -> tuple[etree._Element | None, etree._Element | None]: """Find w:bookmarkStart and w:bookmarkEnd elements by bookmark name.""" start = None end = None for el in document_tree.iterfind(".//w:bookmarkStart", NSMAP): if el.get(_w("name")) == name: start = el break if start is None: return None, None bm_id = start.get(_w("id")) for el in document_tree.iterfind(".//w:bookmarkEnd", NSMAP): if el.get(_w("id")) == bm_id: end = el break return start, end def _find_enclosing_paragraph(element: etree._Element) -> etree._Element | None: """Walk up from an element to find its enclosing w:p.""" cur = element while cur is not None: if cur.tag == _w("p"): return cur cur = cur.getparent() return None # ── Paragraph builders ───────────────────────────────────────────── def _build_run(text: str, *, bold: bool = False, italic: bool = False, font: str = "David", size_half_pt: int | None = None) -> etree._Element: """Build a w:r (run) element with RTL/David defaults and given text.""" r = etree.Element(_w("r")) rPr = etree.SubElement(r, _w("rPr")) rFonts = etree.SubElement(rPr, _w("rFonts")) rFonts.set(_w("ascii"), font) rFonts.set(_w("hAnsi"), font) rFonts.set(_w("cs"), font) rFonts.set(_w("hint"), "cs") if size_half_pt is not None: sz = etree.SubElement(rPr, _w("sz")) sz.set(_w("val"), str(size_half_pt)) szCs = etree.SubElement(rPr, _w("szCs")) szCs.set(_w("val"), str(size_half_pt)) if bold: etree.SubElement(rPr, _w("b")) etree.SubElement(rPr, _w("bCs")) if italic: etree.SubElement(rPr, _w("i")) etree.SubElement(rPr, _w("iCs")) etree.SubElement(rPr, _w("rtl")) t = etree.SubElement(r, _w("t")) t.set("{http://www.w3.org/XML/1998/namespace}space", "preserve") t.text = text return r def _build_paragraph(text: str, *, style: StyleType = "body") -> etree._Element: """Build a w:p (paragraph) with RTL + David + given text.""" p = etree.Element(_w("p")) pPr = etree.SubElement(p, _w("pPr")) bidi = etree.SubElement(pPr, _w("bidi")) bidi.set(_w("val"), "1") # Right alignment for body/RTL jc = etree.SubElement(pPr, _w("jc")) jc.set(_w("val"), "right") rPr_p = etree.SubElement(pPr, _w("rPr")) etree.SubElement(rPr_p, _w("rtl")) bold = style in ("heading", "bold") italic = style == "quote" size = None if style == "heading": size = 28 # 14pt elif style == "quote": size = 22 # 11pt run = _build_run(text, bold=bold, italic=italic, size_half_pt=size) p.append(run) return p def _wrap_in_ins(elements: list[etree._Element], *, ins_id: int, author: str, date_iso: str) -> etree._Element: """Wrap a list of *run-level* elements in a single .""" ins = etree.Element(_w("ins")) ins.set(_w("id"), str(ins_id)) ins.set(_w("author"), author) ins.set(_w("date"), date_iso) for el in elements: ins.append(el) return ins def _make_tracked_paragraph_insert( text: str, *, style: StyleType, ins_id: int, author: str, date_iso: str, mark_id: int | None = None, ) -> etree._Element: """Build a whole tracked-inserted paragraph. DOCX convention for a fully-inserted paragraph: 1. All runs are wrapped in a single (own id). 2. The paragraph's pPr/rPr gets an marker for the paragraph mark itself (pilcrow) — this uses its *own* id. """ if mark_id is None: mark_id = ins_id p = _build_paragraph(text, style=style) pPr = p.find(_w("pPr")) assert pPr is not None rPr = pPr.find(_w("rPr")) if rPr is None: rPr = etree.SubElement(pPr, _w("rPr")) ins_mark = etree.SubElement(rPr, _w("ins")) ins_mark.set(_w("id"), str(mark_id)) ins_mark.set(_w("author"), author) ins_mark.set(_w("date"), date_iso) runs = [child for child in list(p) if child.tag == _w("r")] if runs: for r in runs: p.remove(r) ins = _wrap_in_ins(runs, ins_id=ins_id, author=author, date_iso=date_iso) p.append(ins) return p def _mark_runs_as_deleted(paragraph: etree._Element, *, del_id: int, author: str, date_iso: str) -> None: """Convert all in a paragraph to -wrapped runs. Within a , must become . """ runs = [child for child in list(paragraph) if child.tag == _w("r")] if not runs: return # Convert inside each run for r in runs: for t in r.findall(_w("t")): t.tag = _w("delText") paragraph.remove(r) wrapper = etree.Element(_w("del")) wrapper.set(_w("id"), str(del_id)) wrapper.set(_w("author"), author) wrapper.set(_w("date"), date_iso) for r in runs: wrapper.append(r) paragraph.append(wrapper) # ── Revision application ─────────────────────────────────────────── def _apply_insert( document_tree: etree._Element, revision: Revision, *, ins_id: int, author: str, date_iso: str, ) -> RevisionResult: """Apply insert_after / insert_before relative to a bookmark.""" start, end = _find_bookmark(document_tree, revision.anchor_bookmark) if start is None: return RevisionResult(id=revision.id, status="failed", error=f"bookmark '{revision.anchor_bookmark}' not found") # Pick anchor element based on position if revision.type == "insert_before": anchor = start else: # insert_after — default anchor = end if end is not None else start enclosing_p = _find_enclosing_paragraph(anchor) if enclosing_p is None: return RevisionResult(id=revision.id, status="failed", error="anchor has no enclosing paragraph") # Build new tracked paragraph. ins_id for run wrapper, ins_id+1 for mark. new_p = _make_tracked_paragraph_insert( revision.content, style=revision.style, ins_id=ins_id, mark_id=ins_id + 1, author=author, date_iso=date_iso, ) parent = enclosing_p.getparent() if parent is None: return RevisionResult(id=revision.id, status="failed", error="enclosing paragraph has no parent") idx = list(parent).index(enclosing_p) insert_idx = idx if revision.type == "insert_before" else idx + 1 parent.insert(insert_idx, new_p) return RevisionResult(id=revision.id, status="applied", ins_id=ins_id) def _apply_delete( document_tree: etree._Element, revision: Revision, *, del_id: int, author: str, date_iso: str, ) -> RevisionResult: """Mark the paragraph enclosed by a bookmark as deleted.""" start, end = _find_bookmark(document_tree, revision.anchor_bookmark) if start is None: return RevisionResult(id=revision.id, status="failed", error=f"bookmark '{revision.anchor_bookmark}' not found") enclosing_p = _find_enclosing_paragraph(start) if enclosing_p is None: return RevisionResult(id=revision.id, status="failed", error="anchor has no enclosing paragraph") _mark_runs_as_deleted(enclosing_p, del_id=del_id, author=author, date_iso=date_iso) return RevisionResult(id=revision.id, status="applied", ins_id=del_id) def _apply_replace( document_tree: etree._Element, revision: Revision, *, ins_id: int, del_id: int, author: str, date_iso: str, ) -> RevisionResult: """Replace = delete the existing paragraph + insert new one after it.""" start, end = _find_bookmark(document_tree, revision.anchor_bookmark) if start is None: return RevisionResult(id=revision.id, status="failed", error=f"bookmark '{revision.anchor_bookmark}' not found") enclosing_p = _find_enclosing_paragraph(start) if enclosing_p is None: return RevisionResult(id=revision.id, status="failed", error="anchor has no enclosing paragraph") parent = enclosing_p.getparent() if parent is None: return RevisionResult(id=revision.id, status="failed", error="enclosing paragraph has no parent") new_p = _make_tracked_paragraph_insert( revision.content, style=revision.style, ins_id=ins_id, mark_id=ins_id + 1, author=author, date_iso=date_iso, ) idx = list(parent).index(enclosing_p) parent.insert(idx + 1, new_p) _mark_runs_as_deleted(enclosing_p, del_id=del_id, author=author, date_iso=date_iso) return RevisionResult(id=revision.id, status="applied", ins_id=ins_id) # ── Public API ───────────────────────────────────────────────────── def apply_tracked_revisions( source_path: str | Path, output_path: str | Path, revisions: list[Revision], *, author: str = "מערכת AI", date: datetime | None = None, ) -> RevisionBatchResult: """Apply a batch of tracked revisions to a DOCX, producing a new DOCX. The source file is never mutated. Output is a new DOCX with / markers that Word renders as Track Changes (Accept/Reject). Args: source_path: existing DOCX (e.g. עריכה-v1.docx) — retains user edits. output_path: where to write the revised DOCX (e.g. טיוטה-v6.docx). revisions: list of Revision objects. Anchors are bookmark names. author: displayed as the revision author in Word. date: revision timestamp (defaults to now, UTC). Returns: RevisionBatchResult with per-revision status. """ source_path = Path(source_path) output_path = Path(output_path) if date is None: date = datetime.now(timezone.utc) date_iso = date.strftime("%Y-%m-%dT%H:%M:%SZ") members, doc_tree, settings_tree = _load_docx_xml(source_path) _ensure_track_revisions(settings_tree) next_id = _next_revision_id(doc_tree) batch = RevisionBatchResult() for rev in revisions: try: if rev.type in ("insert_after", "insert_before"): result = _apply_insert(doc_tree, rev, ins_id=next_id, author=author, date_iso=date_iso) # insert consumes 2 IDs: run-wrapper + paragraph-mark next_id += 2 elif rev.type == "delete": result = _apply_delete(doc_tree, rev, del_id=next_id, author=author, date_iso=date_iso) next_id += 1 elif rev.type == "replace": result = _apply_replace(doc_tree, rev, ins_id=next_id, del_id=next_id + 2, author=author, date_iso=date_iso) # replace consumes 3 IDs: ins-run, ins-mark, del next_id += 3 else: result = RevisionResult(id=rev.id, status="failed", error=f"unknown type: {rev.type}") except Exception as e: # pragma: no cover - defensive logger.exception("revision %s failed", rev.id) result = RevisionResult(id=rev.id, status="failed", error=str(e)) batch.results.append(result) if result.status == "applied": batch.applied += 1 else: batch.failed += 1 _save_docx_xml(members, doc_tree, settings_tree, output_path) batch.output_path = str(output_path) logger.info("applied %d revisions (failed %d) → %s", batch.applied, batch.failed, output_path) return batch def list_bookmarks(docx_path: str | Path) -> list[str]: """Return bookmark names present in the DOCX (excluding '_' internal ones).""" docx_path = Path(docx_path) members, doc_tree, _ = _load_docx_xml(docx_path) names: list[str] = [] for el in doc_tree.iterfind(".//w:bookmarkStart", NSMAP): name = el.get(_w("name")) if name and not name.startswith("_"): names.append(name) return names def copy_with_revisions( source_path: str | Path, output_path: str | Path, ) -> None: """Copy source → output unchanged (used when revisions list is empty).""" shutil.copy2(str(source_path), str(output_path))