Add Track Changes architecture for draft revisions (CMP + CMPA)
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m29s

Fixes critical bug in 1033-25: user-uploaded עריכה-*.docx files were
orphaned on disk while exports kept rebuilding from stale DB blocks.

New architecture:
- User-uploaded DOCX becomes the source of truth (cases.active_draft_path)
- System edits via XML surgery with real Word <w:ins>/<w:del> revisions
- User can Accept/Reject each change from within Word

Components:
- docx_reviser.py: XML surgery for Track Changes (15 tests)
- docx_retrofit.py: retroactive bookmark injection with Hebrew marker
  detection + heading heuristic (9 tests)
- docx_exporter.py: emits bookmarks around each of the 12 blocks
- 3 new MCP tools: apply_user_edit, list_bookmarks, revise_draft
- 5 new/updated endpoints: upload (auto-registers active draft),
  /exports/revise, /exports/bookmarks, /exports/{filename}/retrofit,
  /active-draft
- DB migration: cases.active_draft_path column
- UI: correct banner using real v-numbers, "מקור האמת" badge,
  detailed upload toast with bookmarks_added/missing_blocks
- agents: legal-exporter (3 export modes), legal-ceo (stage G for
  revision handling), legal-writer (revision mode)

Multi-tenancy:
- Works for both CMP (1xxx cases) and CMPA (8xxx/9xxx cases)
- New revise-draft skill added to both companies
- deploy-track-changes.sh syncs skills CMP ↔ CMPA
- retrofit_case.py: one-off retrofit of existing files

Tests: 34 passing (15 reviser + 9 retrofit + 4 exporter bookmarks + 6 e2e)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-16 18:49:30 +00:00
parent 28daff58be
commit 726498126d
20 changed files with 2419 additions and 23 deletions

View File

@@ -161,6 +161,11 @@ ALTER TABLE decisions ADD COLUMN IF NOT EXISTS outcome_reasoning TEXT DEFAULT ''
ALTER TABLE cases ADD COLUMN IF NOT EXISTS appeal_type TEXT DEFAULT '';
ALTER TABLE cases ADD COLUMN IF NOT EXISTS practice_area TEXT DEFAULT 'appeals_committee';
ALTER TABLE cases ADD COLUMN IF NOT EXISTS appeal_subtype TEXT DEFAULT '';
-- active_draft_path = path to the DOCX that is the current source of truth
-- for this case's decision text. Set to the latest טיוטה-v*.docx after export,
-- or the latest עריכה-v*.docx after user upload. Used by revise_draft to know
-- what file to base Track Changes revisions on.
ALTER TABLE cases ADD COLUMN IF NOT EXISTS active_draft_path TEXT;
-- הרחבת style_corpus עם practice_area / appeal_subtype
ALTER TABLE style_corpus ADD COLUMN IF NOT EXISTS practice_area TEXT DEFAULT 'appeals_committee';
@@ -520,6 +525,25 @@ async def get_case(case_id: UUID) -> dict | None:
return _row_to_case(row)
async def set_active_draft_path(case_id: UUID, path: str | None) -> None:
    """Store which DOCX is the source of truth for a case.

    Writes ``path`` into ``cases.active_draft_path`` for the row whose id is
    ``case_id`` and sets ``updated_at`` to the database's ``now()``.

    Args:
        case_id: primary key of the case row to update.
        path: new value for the column; ``None`` is passed through to the
            query as-is.
    """
    statement = "UPDATE cases SET active_draft_path = $1, updated_at = now() WHERE id = $2"
    db_pool = await get_pool()
    async with db_pool.acquire() as connection:
        await connection.execute(statement, path, case_id)
async def get_active_draft_path(case_id: UUID) -> str | None:
    """Fetch the ``active_draft_path`` column for a case.

    Args:
        case_id: primary key of the case row to read.

    Returns:
        The stored column value, or ``None`` when no row matches ``case_id``
        (the column value itself may also be ``None``).
    """
    db_pool = await get_pool()
    async with db_pool.acquire() as connection:
        record = await connection.fetchrow(
            "SELECT active_draft_path FROM cases WHERE id = $1", case_id,
        )
        if not record:
            return None
        return record["active_draft_path"]
async def get_case_by_number(case_number: str) -> dict | None:
pool = await get_pool()
async with pool.acquire() as conn:

View File

@@ -58,6 +58,57 @@ def _set_rtl_section(section) -> None:
sectPr.append(bidi)
# ── Bookmark helpers ──────────────────────────────────────────────
# Seed for the per-document bookmark id counter. Bookmarks must have unique
# ids across the whole document; we start from a high value to avoid
# collisions with whatever Word's default template already assigned.
# The counter is incremented *before* each use (see
# _wrap_block_with_bookmarks), so the first bookmark written gets
# _BOOKMARK_ID_START + 1.
_BOOKMARK_ID_START = 10000
def _insert_bookmark_start(paragraph, name: str, bm_id: int) -> None:
    """Insert a <w:bookmarkStart> at the beginning of a paragraph's content.

    The marker is placed directly after the paragraph's <w:pPr> when one
    exists, because the WordprocessingML schema requires <w:pPr> to be the
    first child of <w:p>. Only when the paragraph has no properties element
    does the marker become the first child.

    Args:
        paragraph: python-docx paragraph whose underlying ``_p`` element
            receives the marker.
        name: bookmark name written to ``w:name``.
        bm_id: bookmark id written to ``w:id``; the matching
            <w:bookmarkEnd> must carry the same id.
    """
    el = OxmlElement("w:bookmarkStart")
    el.set(qn("w:id"), str(bm_id))
    el.set(qn("w:name"), name)
    p = paragraph._p
    pPr = p.find(qn("w:pPr"))
    if pPr is not None:
        # Keep <w:pPr> first; the bookmark goes right behind it.
        pPr.addnext(el)
    else:
        p.insert(0, el)
def _insert_bookmark_end(paragraph, bm_id: int) -> None:
    """Append a <w:bookmarkEnd> as the last child of a paragraph.

    Args:
        paragraph: python-docx paragraph whose underlying ``_p`` element
            receives the marker.
        bm_id: id of the bookmark being closed (written to ``w:id``).
    """
    closing_marker = OxmlElement("w:bookmarkEnd")
    closing_marker.set(qn("w:id"), str(bm_id))
    paragraph._p.append(closing_marker)
def _wrap_block_with_bookmarks(doc, block_name: str,
                               write_block_fn, bm_counter: list[int]) -> None:
    """Run ``write_block_fn`` and bookmark the paragraphs it produced.

    The number of top-level <w:p> children of the document body is sampled
    before and after the callback. If the count grew, a bookmarkStart is put
    on the first new paragraph and a bookmarkEnd on the last new one.

    Args:
        doc: python-docx document being written.
        block_name: bookmark name to assign.
        write_block_fn: zero-argument callable that appends the block's
            content to ``doc``.
        bm_counter: one-element list used as a mutable id counter shared
            across calls; it is incremented before the id is used.
    """
    body = doc.element.body
    paragraph_tag = qn("w:p")

    def _body_paragraph_count() -> int:
        return len([child for child in body if child.tag == paragraph_tag])

    count_before = _body_paragraph_count()
    write_block_fn()
    count_after = _body_paragraph_count()

    # Nothing was added at paragraph level, so there is nothing to wrap.
    if count_after == count_before:
        return

    opening_paragraph = doc.paragraphs[count_before]
    closing_paragraph = doc.paragraphs[count_after - 1]

    bm_counter[0] += 1
    new_id = bm_counter[0]
    _insert_bookmark_start(opening_paragraph, block_name, new_id)
    _insert_bookmark_end(closing_paragraph, new_id)
def _add_paragraph(doc, text: str, style: str = "Normal",
bold: bool = False, font_size=None,
alignment=None, space_after: Pt | None = None) -> None:
@@ -160,14 +211,22 @@ async def export_decision(case_id: UUID, output_path: str | None = None) -> str:
section.right_margin = PAGE_MARGIN
_set_rtl_section(section)
# Write blocks
# Write blocks with bookmarks wrapping each block (anchors for revisions)
bm_counter = [_BOOKMARK_ID_START]
for block in blocks:
block_id = block["block_id"]
content = block["content"] or ""
if not content.strip():
continue
_write_block_to_docx(doc, block_id, block["title"], content)
_wrap_block_with_bookmarks(
doc,
f"block-{block_id}",
lambda b=block, bid=block_id, c=content: _write_block_to_docx(
doc, bid, b["title"], c,
),
bm_counter,
)
# Determine output path — versioned under cases/{case_number}/exports/
if not output_path:

View File

@@ -0,0 +1,290 @@
"""הזרקת bookmarks רטרואקטיבית ל-DOCX שלא נוצרו ע"י ה-exporter.
כאשר משתמש מעלה `עריכה-v*.docx` שנערך ב-Word מחוץ למערכת, אין בו את ה-
bookmarks שאנו מצפים להם (block-alef ... block-yod-bet). השירות כאן
מזהה את תחילת כל בלוק לפי סימני הפתיחה העבריים (א., ב., ... יב.) ב-
הפסקאות הראשונות שלו, ומזריק bookmarkStart/bookmarkEnd בהתאם.
נעשה בצורה defensive — אם לא מצליחים לזהות בלוק, הוא פשוט לא יקבל
bookmark (`missing_blocks` בתוצאה). השרת אמור להתריע למשתמש.
"""
from __future__ import annotations
import logging
import re
import shutil
import zipfile
from io import BytesIO
from pathlib import Path
from lxml import etree
from legal_mcp.services.docx_reviser import (
NSMAP,
_load_docx_xml,
_save_docx_xml,
_w,
)
logger = logging.getLogger(__name__)
# ── Block identification ──────────────────────────────────────────
# The 12 blocks in document order, each paired with its Hebrew letter marker.
# The list order is significant: _detect_block_starts only scans forward
# through it, and retrofit_bookmarks reports missing blocks in this order.
BLOCK_ORDER = [
    ("block-alef", "א"),
    ("block-bet", "ב"),
    ("block-gimel", "ג"),
    ("block-dalet", "ד"),
    ("block-heh", "ה"),
    ("block-vav", "ו"),
    ("block-zayin", "ז"),
    ("block-chet", "ח"),
    ("block-tet", "ט"),
    ("block-yod", "י"),
    ("block-yod-alef", "יא"),
    ("block-yod-bet", "יב"),
]
# Lookup from Hebrew marker letter(s) to block name, derived from BLOCK_ORDER.
_BLOCK_MARKERS_BY_LETTER: dict[str, str] = {letter: name for name, letter in BLOCK_ORDER}
# Longer markers (יא, יב) first so regex matches them before falling back to 'י'
_MARKER_ALTERNATION = "|".join(
    re.escape(letter)
    for letter in sorted(_BLOCK_MARKERS_BY_LETTER, key=len, reverse=True)
)
# Regex matching a paragraph that begins (after optional whitespace) with a
# Hebrew block marker, followed by optional whitespace and then exactly one
# of '.', ')' or '-'. The punctuation is mandatory: it keeps ordinary words
# that merely start with these letters from matching. A marker followed only
# by a space or by end-of-text does NOT match.
_BLOCK_MARKER_RE = re.compile(
    rf"^\s*({_MARKER_ALTERNATION})\s*[\.\)\-]\s*"
)
# Secondary heuristic: Hebrew section headings that reliably mark the
# start of each block in the Daphna Tamir style (used when markers
# "א.", "ב." etc. are missing — common in user-edited Word files).
#
# Key observations from the 12-block schema:
# block-alef: "בפני: דפנה תמיר" or decision number page
# block-bet: "ערר מספר" line
# block-gimel: appellants vs respondents (parties)
# block-dalet: bold "החלטה" centered
# block-heh: "רקע" / "רקע עובדתי" / "פתח דבר"
# block-vav: "תכניות חלות" / "ההליך שבפנינו" / "ההליכים בפני"
# block-zayin: "תמצית טענות" / "טענות הצדדים"
# block-chet: "תגובת המשיבה" / "עמדת הוועדה"
# block-tet: "ההליכים בפני ועדת הערר" / "הדיון בפנינו"
# block-yod: "דיון והכרעה" / "דיון"
# block-yod-alef: "סוף דבר" / "סיכום"
# block-yod-bet: "ההחלטה" (signature / closing block)
#
# The table above is descriptive only; the authoritative patterns are the
# regexes below. They are applied with ``search`` on the stripped paragraph
# text, so a pattern without a leading '^' matches anywhere in the paragraph.
# Entries are tried in list order and the first block with a matching
# pattern wins.
_BLOCK_HEADING_PATTERNS: list[tuple[str, list[str]]] = [
    ("block-alef", [r"בפני[:\s]", r"ועדת הערר"]),
    ("block-bet", [r"^ערר\s+מספר", r"^ערר\s+\d"]),
    ("block-gimel", [r"^נגד\s*$", r"^—\s*נגד\s*—"]),
    ("block-dalet", [r"^החלטה\s*$"]),
    ("block-heh", [r"^רקע\s*$", r"^רקע\s+עובדתי", r"^פתח\s+דבר"]),
    ("block-vav", [r"^תכניות\s+חלות", r"^ההליכים?\s+שבפנינו", r"^ההליכים?\s+בפני\s+הוועדה\s+המקומית"]),
    ("block-zayin", [r"^תמצית\s+טענות", r"^טענות\s+הצדדים", r"^טענות\s+העוררי"]),
    ("block-chet", [r"^תגובת\s+המשיב", r"^עמדת\s+הוועדה\s+המקומית", r"^תשובת"]),
    ("block-tet", [r"^ההליכים?\s+בפני\s+ועדת\s+הערר", r"^הדיון\s+בפנינו"]),
    ("block-yod", [r"^דיון\s+והכרעה", r"^דיון\s*$", r"^ההכרעה"]),
    ("block-yod-alef", [r"^סוף\s+דבר", r"^סיכום\s*$"]),
    ("block-yod-bet", [r"^ההחלטה\s*$", r"^על\s+כן[,\.]?"]),
]
# Same structure as _BLOCK_HEADING_PATTERNS with every regex pre-compiled
# once at import time.
_COMPILED_HEADING_PATTERNS: list[tuple[str, list[re.Pattern[str]]]] = [
    (name, [re.compile(p) for p in patterns])
    for name, patterns in _BLOCK_HEADING_PATTERNS
]
def _paragraph_text(p: etree._Element) -> str:
    """Return the text of a paragraph, joining all w:t nodes.

    Only <w:t> descendants contribute. Text held in other elements, such as
    <w:delText> (tracked deletions) or <w:instrText> (field instructions),
    is deliberately excluded so that it cannot be mistaken for a block
    marker or heading.

    Args:
        p: a <w:p> element.

    Returns:
        The concatenated <w:t> text with leading/trailing whitespace removed.
    """
    return "".join(t.text or "" for t in p.iter(_w("t"))).strip()
def _detect_block_starts(
    paragraphs: list[etree._Element],
) -> dict[str, int]:
    """Map each detected block name to the index of its first paragraph.

    Paragraphs are scanned once, front to back. For each non-empty paragraph
    the letter-marker regex is tried first; only if it yields no block name
    is the heading-keyword heuristic consulted. A block is recorded only if
    it has not been seen yet and does not come before the most recently
    recorded block in BLOCK_ORDER, so assignments always move forward.

    Args:
        paragraphs: candidate <w:p> elements in document order.

    Returns:
        ``{block_name: paragraph_index}`` for every block that was located.
    """
    rank = {name: position for position, (name, _) in enumerate(BLOCK_ORDER)}
    total_blocks = len(BLOCK_ORDER)
    starts: dict[str, int] = {}
    next_rank = 0  # lowest BLOCK_ORDER position that may still be assigned

    for para_idx, para in enumerate(paragraphs):
        text = _paragraph_text(para)
        if not text:
            continue

        # 1) explicit letter marker (א., ב., ...)
        candidate: str | None = None
        marker = _BLOCK_MARKER_RE.match(text)
        if marker is not None:
            candidate = _BLOCK_MARKERS_BY_LETTER.get(marker.group(1))

        # 2) heading keywords, restricted to blocks that are still open and
        #    not behind the forward pointer
        if candidate is None:
            candidate = next(
                (
                    name
                    for name, patterns in _COMPILED_HEADING_PATTERNS
                    if name not in starts
                    and rank[name] >= next_rank
                    and any(pattern.search(text) for pattern in patterns)
                ),
                None,
            )

        if candidate is None or candidate in starts or next_rank >= total_blocks:
            continue

        candidate_rank = rank[candidate]
        if candidate_rank >= next_rank:
            starts[candidate] = para_idx
            next_rank = candidate_rank + 1

    return starts
def _insert_bookmark_around_range(
    body: etree._Element,
    paragraphs: list[etree._Element],
    start_idx: int,
    end_idx: int,
    name: str,
    bm_id: int,
) -> None:
    """Bookmark the paragraph range ``start_idx..end_idx`` (inclusive).

    A <w:bookmarkStart> is placed at the beginning of the content of
    ``paragraphs[start_idx]`` and a <w:bookmarkEnd> is appended to
    ``paragraphs[end_idx]``. The start marker goes directly after <w:pPr>
    when the paragraph has one, because the WordprocessingML schema requires
    <w:pPr> to be the first child of <w:p>.

    Args:
        body: the <w:body> element (kept for interface compatibility; not
            needed for the insertion itself).
        paragraphs: the paragraph list that the indices refer to.
        start_idx: index of the first paragraph of the range.
        end_idx: index of the last paragraph of the range.
        name: bookmark name.
        bm_id: bookmark id shared by the start and end markers.
    """
    start_el = etree.Element(_w("bookmarkStart"))
    start_el.set(_w("id"), str(bm_id))
    start_el.set(_w("name"), name)
    end_el = etree.Element(_w("bookmarkEnd"))
    end_el.set(_w("id"), str(bm_id))

    start_p = paragraphs[start_idx]
    end_p = paragraphs[end_idx]

    pPr = start_p.find(_w("pPr"))
    if pPr is not None:
        # Keep <w:pPr> first; the bookmark goes right behind it.
        pPr.addnext(start_el)
    else:
        start_p.insert(0, start_el)
    end_p.append(end_el)
def _next_bookmark_id(doc_tree: etree._Element) -> int:
    """Return an id one above the highest <w:bookmarkStart> id in the tree.

    The result is never lower than 10000. Ids that are missing or not
    parseable as integers are ignored.
    """
    highest = 9999
    id_attr = _w("id")
    for bookmark in doc_tree.iterfind(".//w:bookmarkStart", NSMAP):
        raw = bookmark.get(id_attr)
        if not raw:
            continue
        try:
            parsed = int(raw)
        except ValueError:
            continue
        if parsed > highest:
            highest = parsed
    return highest + 1
# ── Public API ────────────────────────────────────────────────────
def retrofit_bookmarks(
    docx_path: str | Path,
    *,
    output_path: str | Path | None = None,
    backup: bool = True,
) -> dict:
    """Inject block-* bookmarks into an existing DOCX via heuristic detection.

    Only top-level paragraphs of the body are considered (paragraphs inside
    tables are not). Each detected block runs from its first paragraph up to
    the paragraph before the next detected block, or to the last paragraph.
    Blocks whose bookmark name already exists in the document are skipped.

    Args:
        docx_path: DOCX to read (overwritten unless ``output_path`` is set).
        output_path: destination file; defaults to ``docx_path``.
        backup: when the destination resolves to the source file, first copy
            the source to the path produced by replacing its final suffix
            with ``.pre-retrofit.docx``.

    Returns:
        A dict with ``bookmarks_added`` (names injected by this call),
        ``missing_blocks`` (blocks neither detected nor already bookmarked)
        and ``existing_bookmarks`` (bookmark names found before injection).

    Raises:
        FileNotFoundError: ``docx_path`` does not exist.
        ValueError: the document has no <w:body>.
    """
    source = Path(docx_path)
    if not source.exists():
        raise FileNotFoundError(str(source))
    target = source if output_path is None else Path(output_path)

    members, doc_tree, settings_tree = _load_docx_xml(source)

    # Names of bookmarks already present in the document.
    name_attr = _w("name")
    existing_names: list[str] = [
        bm_name
        for bm_name in (
            el.get(name_attr)
            for el in doc_tree.iterfind(".//w:bookmarkStart", NSMAP)
        )
        if bm_name
    ]

    # Direct children of <w:body> only; table cells etc. are not visited.
    body = doc_tree.find(f".//{_w('body')}")
    if body is None:
        raise ValueError("document has no <w:body>")
    paragraph_tag = _w("p")
    paragraphs = [child for child in body if child.tag == paragraph_tag]
    if not paragraphs:
        return {
            "bookmarks_added": [],
            "missing_blocks": [n for n, _ in BLOCK_ORDER],
            "existing_bookmarks": existing_names,
        }

    block_starts = _detect_block_starts(paragraphs)

    # Turn start indices into inclusive (name, start, end) ranges.
    by_position = sorted(block_starts.items(), key=lambda kv: kv[1])
    final_paragraph = len(paragraphs) - 1
    ranges: list[tuple[str, int, int]] = []
    for pos, (block_name, first) in enumerate(by_position):
        is_last = pos + 1 >= len(by_position)
        last = final_paragraph if is_last else by_position[pos + 1][1] - 1
        ranges.append((block_name, first, max(first, last)))

    # Keep a copy of the untouched file when overwriting in place.
    if backup and target.resolve() == source.resolve():
        backup_path = source.with_suffix(".pre-retrofit.docx")
        shutil.copy2(str(source), str(backup_path))

    bm_id = _next_bookmark_id(doc_tree)
    added: list[str] = []
    for block_name, first, last in ranges:
        if block_name in existing_names:
            continue
        _insert_bookmark_around_range(body, paragraphs, first, last, block_name, bm_id)
        added.append(block_name)
        bm_id += 1

    _save_docx_xml(members, doc_tree, settings_tree, target)

    missing = [
        n for n, _ in BLOCK_ORDER
        if n not in block_starts and n not in existing_names
    ]
    logger.info("retrofit %s: added=%s missing=%s",
                source.name, added, missing)
    return {
        "bookmarks_added": added,
        "missing_blocks": missing,
        "existing_bookmarks": existing_names,
    }

View File

@@ -0,0 +1,514 @@
"""עריכת DOCX עם Track Changes אמיתיים של Word.
השירות מיועד לקבל DOCX קיים (עם bookmarks שזיהו אנקורים) ולהחיל עליו
עריכות מסומנות כ-w:ins / w:del, שבאים לידי ביטוי ב-Word כ-Track Changes
שהמשתמש יכול Accept/Reject.
אסטרטגיית אנקורים: bookmarks בשמות כגון 'block-yod', 'block-yod-para-3'
שמוכנסים בזמן הייצוא הראשוני (docx_exporter.py) או רטרואקטיבית
(docx_retrofit.py).
"""
from __future__ import annotations
import logging
import shutil
import zipfile
from dataclasses import dataclass, field
from datetime import datetime, timezone
from io import BytesIO
from pathlib import Path
from typing import Literal
from lxml import etree
logger = logging.getLogger(__name__)
# ── XML namespaces ─────────────────────────────────────────────────
# Namespace URI of the WordprocessingML main schema (the "w:" prefix).
W_NS = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
# Prefix map passed to lxml find/iterfind calls that use "w:"-prefixed paths.
NSMAP = {"w": W_NS}
def _w(tag: str) -> str:
    """Return ``tag`` qualified with the w: namespace as ``{uri}tag``."""
    return "{" + W_NS + "}" + tag
# ── Data models ────────────────────────────────────────────────────
# Kinds of edit that apply_tracked_revisions dispatches on.
RevisionType = Literal["insert_after", "insert_before", "replace", "delete"]
# Formatting presets understood by _build_paragraph: "heading" is bold 14pt,
# "bold" is bold, "quote" is italic 11pt, "body" leaves size/weight unset.
StyleType = Literal["body", "quote", "heading", "bold"]
@dataclass
class Revision:
    """One tracked change requested against a DOCX.

    ``id`` is the caller's identifier and is echoed back in the result;
    ``type`` selects the operation; ``anchor_bookmark`` names the bookmark
    the operation is positioned against.
    """

    id: str
    type: RevisionType
    anchor_bookmark: str
    # Text of the paragraph to insert (used by insert_* and replace).
    content: str = ""
    # Formatting preset applied to the inserted paragraph.
    style: StyleType = "body"
    # Free-text justification; not read by the apply functions in this module.
    reason: str = ""
    # Not consulted by the apply functions in this module, which choose the
    # anchor side from ``type`` instead.
    anchor_position: Literal["start", "end"] = "end"
@dataclass
class RevisionResult:
    """Outcome of applying one Revision."""

    # Echo of Revision.id.
    id: str
    status: Literal["applied", "failed"]
    # Human-readable failure description; None when applied.
    error: str | None = None
    # Revision id used for the change. For delete operations this carries
    # the id given to the <w:del> wrapper.
    ins_id: int | None = None
@dataclass
class RevisionBatchResult:
    """Summary of a whole apply_tracked_revisions run."""

    # Number of revisions whose status is "applied".
    applied: int = 0
    # Number of revisions with any other status.
    failed: int = 0
    # Per-revision outcomes, in the order the revisions were processed.
    results: list[RevisionResult] = field(default_factory=list)
    # Path the revised DOCX was written to (string form).
    output_path: str = ""
# ── XML helpers ────────────────────────────────────────────────────
def _load_docx_xml(docx_path: Path) -> tuple[dict[str, bytes], etree._Element, etree._Element]:
    """Read a DOCX into memory and parse its document and settings parts.

    Args:
        docx_path: path of the DOCX (zip) file to read.

    Returns:
        ``(members, document_tree, settings_tree)`` where ``members`` maps
        every zip entry name to its raw bytes. When the archive has no
        (or an empty) ``word/settings.xml`` an empty <w:settings> element is
        returned instead.

    Raises:
        ValueError: the archive has no ``word/document.xml``.
    """
    with zipfile.ZipFile(docx_path, "r") as archive:
        members: dict[str, bytes] = {
            entry: archive.read(entry) for entry in archive.namelist()
        }

    if "word/document.xml" not in members:
        raise ValueError(f"{docx_path}: missing word/document.xml")
    document_tree = etree.fromstring(members["word/document.xml"])

    raw_settings = members.get("word/settings.xml")
    settings_tree = (
        etree.fromstring(raw_settings)
        if raw_settings
        else etree.Element(_w("settings"), nsmap=NSMAP)
    )
    return members, document_tree, settings_tree
def _save_docx_xml(
    members: dict[str, bytes],
    document_tree: etree._Element,
    settings_tree: etree._Element,
    output_path: Path,
) -> None:
    """Serialize the trees back into a DOCX archive at ``output_path``.

    The caller's ``members`` dict is not modified. ``word/document.xml`` and
    ``word/settings.xml`` are replaced by (or added from) the given trees;
    all other entries are written unchanged. Parent directories of
    ``output_path`` are created as needed.
    """
    def _serialize(tree: etree._Element) -> bytes:
        return etree.tostring(
            tree, xml_declaration=True, encoding="UTF-8", standalone=True
        )

    # Existing keys keep their position; a new settings entry lands last.
    parts = {
        **members,
        "word/document.xml": _serialize(document_tree),
        "word/settings.xml": _serialize(settings_tree),
    }

    output_path.parent.mkdir(parents=True, exist_ok=True)
    payload = BytesIO()
    with zipfile.ZipFile(payload, "w", zipfile.ZIP_DEFLATED) as archive:
        for part_name, part_bytes in parts.items():
            archive.writestr(part_name, part_bytes)
    output_path.write_bytes(payload.getvalue())
def _ensure_track_revisions(settings_tree: etree._Element) -> None:
    """Ensure <w:trackRevisions w:val="true"/> is set in settings.xml.

    An existing <w:trackRevisions> element is reused and its ``w:val`` is
    forced to "true", so a document that had the setting explicitly switched
    off ends up with it switched on. If the element is absent it is added.

    Note: existing w:ins/w:del nodes are tracked changes regardless of this
    setting; the setting governs whether Word records *further* edits as
    tracked changes.

    Args:
        settings_tree: root <w:settings> element; modified in place.
    """
    el = settings_tree.find(_w("trackRevisions"))
    if el is None:
        el = etree.SubElement(settings_tree, _w("trackRevisions"))
    # Set unconditionally so an explicit val="false"/"0" is overridden.
    el.set(_w("val"), "true")
def _next_revision_id(document_tree: etree._Element) -> int:
    """Return an id one above the highest ``w:id`` found anywhere in the tree.

    Every element carrying a ``w:id`` attribute is considered, not just
    w:ins / w:del / bookmarks / comment ranges. This also covers other
    annotation kinds such as formatting changes and moves, so newly issued
    ids cannot collide with ids already present in a user-edited file.
    Non-numeric ids are ignored.

    Args:
        document_tree: parsed ``word/document.xml`` root.

    Returns:
        The next unused integer id (at least 1).
    """
    max_id = 0
    id_attr = _w("id")
    # iter("*") yields elements only (no comments / processing instructions).
    for el in document_tree.iter("*"):
        val = el.get(id_attr)
        if not val:
            continue
        try:
            max_id = max(max_id, int(val))
        except ValueError:
            pass
    return max_id + 1
def _find_bookmark(
    document_tree: etree._Element, name: str
) -> tuple[etree._Element | None, etree._Element | None]:
    """Locate a bookmark's start and end markers by bookmark name.

    Returns:
        ``(start, end)``. Both are ``None`` when no <w:bookmarkStart> has
        the given name; ``end`` alone is ``None`` when no <w:bookmarkEnd>
        shares the start marker's id. The first match in document order is
        used in each case.
    """
    name_attr = _w("name")
    id_attr = _w("id")

    start = next(
        (
            candidate
            for candidate in document_tree.iterfind(".//w:bookmarkStart", NSMAP)
            if candidate.get(name_attr) == name
        ),
        None,
    )
    if start is None:
        return None, None

    wanted_id = start.get(id_attr)
    end = next(
        (
            candidate
            for candidate in document_tree.iterfind(".//w:bookmarkEnd", NSMAP)
            if candidate.get(id_attr) == wanted_id
        ),
        None,
    )
    return start, end
def _find_enclosing_paragraph(element: etree._Element) -> etree._Element | None:
    """Return the nearest <w:p> at or above ``element``, or None if there is none."""
    paragraph_tag = _w("p")
    node = element
    while node is not None and node.tag != paragraph_tag:
        node = node.getparent()
    return node
# ── Paragraph builders ─────────────────────────────────────────────
def _build_run(text: str, *, bold: bool = False, italic: bool = False,
               font: str = "David", size_half_pt: int | None = None) -> etree._Element:
    """Create a <w:r> run holding ``text`` with RTL run properties.

    Args:
        text: run text, stored in a single <w:t> with xml:space="preserve".
        bold: add <w:b/> and <w:bCs/>.
        italic: add <w:i/> and <w:iCs/>.
        font: font name set for the ascii, hAnsi and cs slots.
        size_half_pt: font size in half-points for <w:sz>/<w:szCs>; omitted
            when None.

    Returns:
        The new, unattached <w:r> element.
    """
    run = etree.Element(_w("r"))
    props = etree.SubElement(run, _w("rPr"))

    fonts = etree.SubElement(props, _w("rFonts"))
    for slot in ("ascii", "hAnsi", "cs"):
        fonts.set(_w(slot), font)
    fonts.set(_w("hint"), "cs")

    if size_half_pt is not None:
        for size_tag in ("sz", "szCs"):
            etree.SubElement(props, _w(size_tag)).set(_w("val"), str(size_half_pt))

    # Toggle properties, emitted in a fixed order with <w:rtl/> always last.
    toggles: list[str] = []
    if bold:
        toggles += ["b", "bCs"]
    if italic:
        toggles += ["i", "iCs"]
    toggles.append("rtl")
    for toggle in toggles:
        etree.SubElement(props, _w(toggle))

    text_el = etree.SubElement(run, _w("t"))
    text_el.set("{http://www.w3.org/XML/1998/namespace}space", "preserve")
    text_el.text = text
    return run
def _build_paragraph(text: str, *, style: StyleType = "body") -> etree._Element:
    """Create a <w:p> containing one run with ``text``.

    Paragraph properties are <w:bidi w:val="1"/>, <w:jc w:val="right"/> and
    a paragraph-mark <w:rPr> holding <w:rtl/>. The run formatting follows
    ``style``: "heading" is bold at 28 half-points, "bold" is bold, "quote"
    is italic at 22 half-points, anything else uses no explicit size/weight.
    """
    para = etree.Element(_w("p"))
    para_props = etree.SubElement(para, _w("pPr"))
    etree.SubElement(para_props, _w("bidi")).set(_w("val"), "1")
    etree.SubElement(para_props, _w("jc")).set(_w("val"), "right")
    mark_props = etree.SubElement(para_props, _w("rPr"))
    etree.SubElement(mark_props, _w("rtl"))

    # Half-point sizes: 28 = 14pt, 22 = 11pt.
    size_by_style = {"heading": 28, "quote": 22}
    para.append(
        _build_run(
            text,
            bold=style in ("heading", "bold"),
            italic=style == "quote",
            size_half_pt=size_by_style.get(style),
        )
    )
    return para
def _wrap_in_ins(elements: list[etree._Element], *, ins_id: int,
                 author: str, date_iso: str) -> etree._Element:
    """Return a new <w:ins> holding ``elements`` (run-level nodes) in order.

    The wrapper carries ``w:id``, ``w:author`` and ``w:date`` attributes
    taken from the keyword arguments.
    """
    wrapper = etree.Element(_w("ins"))
    for attr, value in (("id", str(ins_id)), ("author", author), ("date", date_iso)):
        wrapper.set(_w(attr), value)
    wrapper.extend(elements)
    return wrapper
def _make_tracked_paragraph_insert(
    text: str, *, style: StyleType, ins_id: int, author: str, date_iso: str,
    mark_id: int | None = None,
) -> etree._Element:
    """Build a whole tracked-inserted paragraph.

    DOCX convention for a fully-inserted paragraph:
      1. All <w:r> runs are wrapped in a single <w:ins> (id ``ins_id``).
      2. The paragraph's pPr/rPr gets an <w:ins> marker for the paragraph
         mark itself (pilcrow), with id ``mark_id``.

    The paragraph-mark <w:ins> is inserted as the FIRST child of pPr/rPr:
    in the schema for paragraph-mark run properties the track-change
    markers precede the ordinary run properties (such as <w:rtl/>).

    Args:
        text: paragraph text.
        style: formatting preset forwarded to _build_paragraph.
        ins_id: revision id for the run wrapper.
        author: revision author.
        date_iso: revision timestamp string.
        mark_id: revision id for the paragraph mark; defaults to ``ins_id``.

    Returns:
        The new, unattached <w:p> element.
    """
    if mark_id is None:
        mark_id = ins_id
    p = _build_paragraph(text, style=style)

    # _build_paragraph always creates pPr; recreate defensively rather than
    # relying on an assert that disappears under -O.
    pPr = p.find(_w("pPr"))
    if pPr is None:
        pPr = etree.Element(_w("pPr"))
        p.insert(0, pPr)
    rPr = pPr.find(_w("rPr"))
    if rPr is None:
        rPr = etree.SubElement(pPr, _w("rPr"))

    ins_mark = etree.Element(_w("ins"))
    ins_mark.set(_w("id"), str(mark_id))
    ins_mark.set(_w("author"), author)
    ins_mark.set(_w("date"), date_iso)
    rPr.insert(0, ins_mark)

    runs = [child for child in p if child.tag == _w("r")]
    if runs:
        # Appending to the wrapper moves each run out of the paragraph.
        ins = _wrap_in_ins(runs, ins_id=ins_id, author=author, date_iso=date_iso)
        p.append(ins)
    return p
def _mark_runs_as_deleted(paragraph: etree._Element, *, del_id: int,
                          author: str, date_iso: str) -> None:
    """Wrap all direct <w:r> children of a paragraph in one <w:del>.

    Within a <w:del>, <w:t> must become <w:delText>, so the text elements of
    each run are renamed. The wrapper is placed where the first run used to
    be, rather than at the end of the paragraph, so the deleted text stays
    in front of trailing markers such as a <w:bookmarkEnd>. Runs nested
    inside other elements are not touched. A paragraph without direct runs
    is left unchanged.

    Args:
        paragraph: the <w:p> to modify in place.
        del_id: revision id for the <w:del> wrapper.
        author: revision author.
        date_iso: revision timestamp string.
    """
    runs = [child for child in paragraph if child.tag == _w("r")]
    if not runs:
        return

    wrapper = etree.Element(_w("del"))
    wrapper.set(_w("id"), str(del_id))
    wrapper.set(_w("author"), author)
    wrapper.set(_w("date"), date_iso)

    # Anchor the wrapper at the first run's position before moving anything.
    runs[0].addprevious(wrapper)
    for r in runs:
        for t in r.findall(_w("t")):
            t.tag = _w("delText")
        # append() relocates the run from the paragraph into the wrapper.
        wrapper.append(r)
# ── Revision application ───────────────────────────────────────────
def _apply_insert(
    document_tree: etree._Element,
    revision: Revision,
    *,
    ins_id: int,
    author: str,
    date_iso: str,
) -> RevisionResult:
    """Insert a tracked paragraph before or after a bookmarked location.

    For ``insert_before`` the new paragraph goes in front of the paragraph
    containing the bookmark's start marker. For any other type it goes
    behind the paragraph containing the end marker (or the start marker if
    no end marker was found). Two revision ids are used: ``ins_id`` for the
    run wrapper and ``ins_id + 1`` for the paragraph mark.

    Returns:
        A RevisionResult with status "applied" (and ``ins_id``) or "failed"
        with an explanatory ``error``.
    """
    def _fail(message: str) -> RevisionResult:
        return RevisionResult(id=revision.id, status="failed", error=message)

    bm_start, bm_end = _find_bookmark(document_tree, revision.anchor_bookmark)
    if bm_start is None:
        return _fail(f"bookmark '{revision.anchor_bookmark}' not found")

    before = revision.type == "insert_before"
    if before:
        anchor = bm_start
    elif bm_end is not None:
        anchor = bm_end
    else:
        anchor = bm_start

    anchor_paragraph = _find_enclosing_paragraph(anchor)
    if anchor_paragraph is None:
        return _fail("anchor has no enclosing paragraph")

    tracked_paragraph = _make_tracked_paragraph_insert(
        revision.content, style=revision.style,
        ins_id=ins_id, mark_id=ins_id + 1,
        author=author, date_iso=date_iso,
    )

    container = anchor_paragraph.getparent()
    if container is None:
        return _fail("enclosing paragraph has no parent")

    position = list(container).index(anchor_paragraph)
    if not before:
        position += 1
    container.insert(position, tracked_paragraph)
    return RevisionResult(id=revision.id, status="applied", ins_id=ins_id)
def _apply_delete(
    document_tree: etree._Element,
    revision: Revision,
    *,
    del_id: int,
    author: str,
    date_iso: str,
) -> RevisionResult:
    """Mark the paragraph that holds a bookmark's start marker as deleted.

    Only the single paragraph containing the <w:bookmarkStart> is affected;
    if the bookmark spans further paragraphs, those are left untouched.

    Returns:
        A RevisionResult; on success its ``ins_id`` field carries ``del_id``.
    """
    bm_start, _bm_end = _find_bookmark(document_tree, revision.anchor_bookmark)
    if bm_start is None:
        return RevisionResult(
            id=revision.id, status="failed",
            error=f"bookmark '{revision.anchor_bookmark}' not found",
        )

    target_paragraph = _find_enclosing_paragraph(bm_start)
    if target_paragraph is None:
        return RevisionResult(
            id=revision.id, status="failed",
            error="anchor has no enclosing paragraph",
        )

    _mark_runs_as_deleted(
        target_paragraph, del_id=del_id, author=author, date_iso=date_iso,
    )
    return RevisionResult(id=revision.id, status="applied", ins_id=del_id)
def _apply_replace(
    document_tree: etree._Element,
    revision: Revision,
    *,
    ins_id: int,
    del_id: int,
    author: str,
    date_iso: str,
) -> RevisionResult:
    """Replace a bookmarked paragraph: tracked insert plus tracked delete.

    The paragraph containing the bookmark's start marker has its runs marked
    as deleted, and a new tracked paragraph with ``revision.content`` is
    inserted directly after it. Ids used: ``ins_id`` (run wrapper),
    ``ins_id + 1`` (paragraph mark) and ``del_id`` (deletion).

    Returns:
        A RevisionResult with status "applied" (and ``ins_id``) or "failed".
    """
    def _fail(message: str) -> RevisionResult:
        return RevisionResult(id=revision.id, status="failed", error=message)

    bm_start, _bm_end = _find_bookmark(document_tree, revision.anchor_bookmark)
    if bm_start is None:
        return _fail(f"bookmark '{revision.anchor_bookmark}' not found")

    old_paragraph = _find_enclosing_paragraph(bm_start)
    if old_paragraph is None:
        return _fail("anchor has no enclosing paragraph")

    container = old_paragraph.getparent()
    if container is None:
        return _fail("enclosing paragraph has no parent")

    replacement = _make_tracked_paragraph_insert(
        revision.content, style=revision.style,
        ins_id=ins_id, mark_id=ins_id + 1,
        author=author, date_iso=date_iso,
    )
    # New text first, placed right behind the paragraph it replaces ...
    container.insert(list(container).index(old_paragraph) + 1, replacement)
    # ... then strike the old text.
    _mark_runs_as_deleted(
        old_paragraph, del_id=del_id, author=author, date_iso=date_iso,
    )
    return RevisionResult(id=revision.id, status="applied", ins_id=ins_id)
# ── Public API ─────────────────────────────────────────────────────
def apply_tracked_revisions(
    source_path: str | Path,
    output_path: str | Path,
    revisions: list[Revision],
    *,
    author: str = "מערכת AI",
    date: datetime | None = None,
) -> RevisionBatchResult:
    """Apply a batch of tracked revisions to a DOCX and write the result.

    The source is only read; the revised document, containing <w:ins> /
    <w:del> markers that Word presents as Track Changes, is written to
    ``output_path``. A revision that fails (or raises) is recorded as
    failed and does not stop the batch.

    Args:
        source_path: existing DOCX to base the revisions on.
        output_path: where the revised DOCX is written.
        revisions: changes to apply, in order; anchors are bookmark names.
        author: revision author recorded on every change.
        date: revision timestamp; defaults to the current time in UTC.

    Returns:
        RevisionBatchResult with counts, per-revision results and the
        output path.
    """
    source = Path(source_path)
    target = Path(output_path)
    when = datetime.now(timezone.utc) if date is None else date
    stamp = when.strftime("%Y-%m-%dT%H:%M:%SZ")

    members, doc_tree, settings_tree = _load_docx_xml(source)
    _ensure_track_revisions(settings_tree)

    id_cursor = _next_revision_id(doc_tree)
    batch = RevisionBatchResult()

    for rev in revisions:
        try:
            kind = rev.type
            if kind == "insert_after" or kind == "insert_before":
                outcome = _apply_insert(doc_tree, rev, ins_id=id_cursor,
                                        author=author, date_iso=stamp)
                # two ids: run wrapper + paragraph mark
                id_cursor += 2
            elif kind == "delete":
                outcome = _apply_delete(doc_tree, rev, del_id=id_cursor,
                                        author=author, date_iso=stamp)
                id_cursor += 1
            elif kind == "replace":
                outcome = _apply_replace(doc_tree, rev,
                                         ins_id=id_cursor, del_id=id_cursor + 2,
                                         author=author, date_iso=stamp)
                # three ids: inserted runs, inserted mark, deletion
                id_cursor += 3
            else:
                outcome = RevisionResult(id=rev.id, status="failed",
                                         error=f"unknown type: {rev.type}")
        except Exception as exc:  # pragma: no cover - defensive
            logger.exception("revision %s failed", rev.id)
            outcome = RevisionResult(id=rev.id, status="failed", error=str(exc))

        batch.results.append(outcome)
        if outcome.status == "applied":
            batch.applied += 1
        else:
            batch.failed += 1

    _save_docx_xml(members, doc_tree, settings_tree, target)
    batch.output_path = str(target)
    logger.info("applied %d revisions (failed %d) → %s",
                batch.applied, batch.failed, target)
    return batch
def list_bookmarks(docx_path: str | Path) -> list[str]:
    """List the bookmark names in a DOCX, in document order.

    Bookmarks without a name and those whose name starts with '_' are
    left out.
    """
    _members, doc_tree, _settings = _load_docx_xml(Path(docx_path))
    name_attr = _w("name")
    candidates = (
        el.get(name_attr)
        for el in doc_tree.iterfind(".//w:bookmarkStart", NSMAP)
    )
    return [bm for bm in candidates if bm and not bm.startswith("_")]
def copy_with_revisions(
    source_path: str | Path, output_path: str | Path,
) -> None:
    """Copy source → output unchanged (used when revisions list is empty).

    Missing parent directories of ``output_path`` are created first, the
    same way _save_docx_xml does, so this path and the revision path behave
    alike when the export directory does not exist yet.

    Args:
        source_path: file to copy.
        output_path: destination file path.
    """
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(str(source_path), str(output_path))