"""Nevo proofreading service for training corpus.

Strips Nevo editorial additions (front matter, back matter, page headers,
watermarks, inline watermark codes) from legal decision DOCX/PDF/MD files.
Also extracts metadata (decision number, date, subject categories) via
heuristics on cleaned text.

Used by:
  * CLI script: scripts/proofread_training_corpus.py
  * Web API: /api/training/analyze
"""

from __future__ import annotations

import asyncio
import re
import time
from datetime import date as date_type
from pathlib import Path
from typing import Any

import fitz
from docx import Document
from google.cloud import vision

from legal_mcp import config

# ── Nevo pattern detection ────────────────────────────────────────

# Paragraph prefixes that identify the Nevo editorial preamble.
NEVO_PREAMBLE_HEADERS = (
    "ספרות:",
    "חקיקה שאוזכרה:",
    "מיני-רציו:",
)

# Phrases that typically open the body of a decision.
DECISION_OPENING = re.compile(
    r"^(עניינו\s|ענייננו\s|עסקינן\s|בפנינו\s|לפנינו\s|בערר\s+שלפנינו|זהו\s+ערר)"
)

# Standalone section titles treated as the start of the decision body.
DECISION_SECTION_HEADERS = {
    "רקע",
    "פתח דבר",
    "תמצית טענות הצדדים",
    "העובדות",
    "הרקע העובדתי",
    "מבוא",
}

# Substrings marking the beginning of the Nevo back matter.
NEVO_POSTAMBLE_MARKERS = (
    "5129371512937154678313",
    "בעניין עריכה ושינויים במסמכי פסיקה",
    "נוסח מסמך זה כפוף לשינויי ניסוח ועריכה",
)

# Numeric watermark code at the very start of a paragraph.
NEVO_INLINE_CODE_RE = re.compile(r"^0?(5129371|54678313)\d*")

# "page N of M" headers, tolerant of line breaks between the tokens.
PDF_PAGE_HEADER_RE = re.compile(
    r"\s*עמוד\s*\n?\s*\d+\s*\n?\s*(?:מתוך|בן)\s*\n?\s*\d+\s*"
)
# A short line that starts with the word "page" (leftover header fragment).
PDF_PAGE_ORPHAN_RE = re.compile(r"(?m)^עמוד[^\n]{0,12}$")
# A "page N" line without the "of M" part.
PDF_PAGE_NUM_LINE_RE = re.compile(r"(?m)^\s*עמוد\s*\n?\s*\d+[·.*]?\s*$".replace("עמוد", "עמוד"))

# Nevo site URL, including spellings seen in OCR output.
NEVO_URL_RE = re.compile(
    r"(nevo\.co\.il|neto\.co\.il|netocoal|neetocoal|nevocoal|nevo\.co|rawo\.co\.il)",
    re.IGNORECASE,
)

# A whole line that is footer noise: blank, punctuation only, a bare page
# number, a "page" fragment, or a short Latin-letter token.
_FOOTER_JUNK_RE = re.compile(
    r"^("
    r"\s*|"
    r"[-·*.\"\'׳״]+|"
    r"\d{1,3}[\s\-·*.\"\'׳״]*|"
    r"עמוד[\s\d\-·*.\"\'׳״]*|"
    r"[-·*\s\"\'׳״]*[a-zA-Z][a-zA-Z0-9 .\-·*_]{0,30}"
    r")$"
)

# Hebrew abbreviation quote fixes — Google Vision renders ״ as 'יי'
_HEBREW_ABBREV_FIXES: dict[str, str] = {
    "עוהייד": 'עוה"ד',
    "עוייד": 'עו"ד',
    "הנייל": 'הנ"ל',
    "מצייב": 'מצ"ב',
    "ביהמייש": 'ביהמ"ש',
    "תייז": 'ת"ז',
    "עייי": 'ע"י',
    "אחייכ": 'אח"כ',
    "סייק": 'ס"ק',
    "דייר": 'ד"ר',
    "חווייד": 'חוו"ד',
    "מייר": 'מ"ר',
    "יחייד": 'יח"ד',
    "בייכ": 'ב"כ',
    "בייה": 'ב"ה',
    "שייח": 'ש"ח',
    "יוייר": 'יו"ר',
    "בליימ": 'בל"מ',
    "תבייע": 'תב"ע',
    "תמייא": 'תמ"א',
    "סייה": 'ס"ה',
    "שייפ": 'ש"פ',
    "שצייפ": 'שצ"פ',
    "שבייצ": 'שב"צ',
    "עסיים": 'עס"ם',
    "הייה": 'ה"ה',
    "פסייד": 'פס"ד',
    "תיידא": 'תיד"א',
    "בגייץ": 'בג"ץ',
    "עתיים": 'עת"ם',
    "עעיים": 'עע"ם',
    "כייא": 'כ"א',
    "כייב": 'כ"ב',
    "כייג": 'כ"ג',
    "כייד": 'כ"ד',
    "כייה": 'כ"ה',
    "כייו": 'כ"ו',
    "כייז": 'כ"ז',
    "כייח": 'כ"ח',
    "כייט": 'כ"ט',
    "לייא": 'ל"א',
    "יייא": 'י"א',
    "יייב": 'י"ב',
    "יייג": 'י"ג',
    "יייד": 'י"ד',
    "טייו": 'ט"ו',
    "טייז": 'ט"ז',
    "יייז": 'י"ז',
    "יייח": 'י"ח',
    "יייט": 'י"ט',
    "תשפייא": 'תשפ"א',
    "תשפייב": 'תשפ"ב',
    "תשפייג": 'תשפ"ג',
    "תשפייד": 'תשפ"ד',
    "תשפייה": 'תשפ"ה',
    "תשפייו": 'תשפ"ו',
    "תשפיין": 'תשפ"ן',
}

# Longest keys first so a longer abbreviation wins over a shorter one that
# could match at the same position.
# NOTE(review): matching is substring-based with no word boundaries, so an
# ordinary word that contains a key (e.g. "דיירים" contains "דייר") is
# rewritten as well — confirm this is acceptable for the corpus.
_ABBREV_PATTERN = re.compile(
    "|".join(re.escape(k) for k in sorted(_HEBREW_ABBREV_FIXES, key=len, reverse=True))
)


def _fix_hebrew_quotes(text: str) -> str:
    """Replace every known 'יי' mis-rendering in *text* with its quoted form."""
    return _ABBREV_PATTERN.sub(lambda m: _HEBREW_ABBREV_FIXES[m.group()], text)


# ── Google Vision OCR ────────────────────────────────────────────

_vision_client: vision.ImageAnnotatorClient | None = None


def _get_vision_client() -> vision.ImageAnnotatorClient:
    """Return the module-wide Vision client, creating it on first use.

    Raises:
        RuntimeError: if ``config.GOOGLE_CLOUD_VISION_API_KEY`` is not set.
    """
    global _vision_client
    if _vision_client is None:
        if not config.GOOGLE_CLOUD_VISION_API_KEY:
            raise RuntimeError("GOOGLE_CLOUD_VISION_API_KEY not set")
        _vision_client = vision.ImageAnnotatorClient(
            client_options={"api_key": config.GOOGLE_CLOUD_VISION_API_KEY}
        )
    return _vision_client


def _ocr_page_image(image_bytes: bytes, page_num: int) -> str:
    """OCR one page image with a Hebrew language hint.

    Args:
        image_bytes: encoded image content sent to Vision.
        page_num: page number, used only in the error message.

    Returns:
        The recognised text with Hebrew abbreviation quotes repaired, or an
        empty string when the response carries no full-text annotation.

    Raises:
        RuntimeError: if the Vision response reports an error message.
    """
    client = _get_vision_client()
    image = vision.Image(content=image_bytes)
    response = client.document_text_detection(
        image=image,
        image_context=vision.ImageContext(language_hints=["he"]),
    )
    if response.error.message:
        raise RuntimeError(f"Vision error page {page_num}: {response.error.message}")
    text = response.full_text_annotation.text if response.full_text_annotation else ""
    return _fix_hebrew_quotes(text)


# ── DOCX proofreading ────────────────────────────────────────────


def _find_decision_start(paragraphs: list[str]) -> int:
    """Find first real decision paragraph, skipping Nevo preamble.

    Returns 0 when none of the first ten paragraphs starts with a Nevo
    preamble header. Otherwise tries, in order:

    1. the first paragraph that is a known section header or matches a
       decision-opening phrase;
    2. after the first paragraph containing a "ruled as follows" phrase, the
       first of the next (up to 14) paragraphs that is longer than 80
       characters and does not start with ``*``;
    3. a fallback of index 10, clamped to the last paragraph.
    """
    # Leading whitespace is ignored here, as it is by the checks below, so an
    # indented header paragraph still counts as a preamble marker.
    has_nevo_preamble = any(
        p.lstrip().startswith(NEVO_PREAMBLE_HEADERS) for p in paragraphs[:10]
    )
    if not has_nevo_preamble:
        return 0

    for i, p in enumerate(paragraphs):
        stripped = p.strip()
        if stripped in DECISION_SECTION_HEADERS or DECISION_OPENING.match(stripped):
            return i

    for i, p in enumerate(paragraphs):
        if "קבעה כלהלן" in p or "קבעה את הדברים הבאים" in p:
            for j in range(i + 1, min(i + 15, len(paragraphs))):
                candidate = paragraphs[j]
                if len(candidate) > 80 and not candidate.strip().startswith("*"):
                    return j
            # Only the first such phrase is considered.
            break

    return min(10, len(paragraphs) - 1)


def _find_decision_end(paragraphs: list[str]) -> int:
    """First paragraph that is a Nevo postamble marker (exclusive end).

    Returns ``len(paragraphs)`` when no paragraph contains a marker.
    """
    for i, p in enumerate(paragraphs):
        if any(marker in p for marker in NEVO_POSTAMBLE_MARKERS):
            return i
    return len(paragraphs)


def _strip_inline_nevo_codes(paragraphs: list[str]) -> list[str]:
    """Remove a leading Nevo watermark code from each paragraph.

    Each result is whitespace-stripped; paragraphs that end up empty are
    dropped.
    """
    cleaned = (NEVO_INLINE_CODE_RE.sub("", p).strip() for p in paragraphs)
    return [p for p in cleaned if p]


def proofread_docx(path: Path) -> tuple[str, dict]:
    """Extract clean decision text from Nevo DOCX. Returns (markdown, stats).

    Non-blank paragraphs are read from the document, the preamble and
    postamble are cut off, inline watermark codes are removed, and the
    remaining paragraphs are joined with blank lines.
    """
    doc = Document(str(path))
    paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
    start = _find_decision_start(paragraphs)
    end = _find_decision_end(paragraphs)
    clean = _strip_inline_nevo_codes(paragraphs[start:end])
    md = "\n\n".join(clean)
    return md, {
        "source_type": "docx",
        "total_paragraphs": len(paragraphs),
        "preamble_stripped": start,
        "postamble_stripped": len(paragraphs) - end,
        "clean_paragraphs": len(clean),
    }


# ── PDF proofreading ─────────────────────────────────────────────


def _clean_page_text(text: str) -> str:
    """Strip page headers, trailing footer junk and Nevo URLs from one page."""
    text = PDF_PAGE_HEADER_RE.sub("\n", text)

    # Drop junk lines from the bottom of the page until a real line is hit.
    lines = text.split("\n")
    while lines and _FOOTER_JUNK_RE.match(lines[-1].strip()):
        lines.pop()
    text = "\n".join(lines)

    text = NEVO_URL_RE.sub("", text)
    text = PDF_PAGE_NUM_LINE_RE.sub("", text)
    text = PDF_PAGE_ORPHAN_RE.sub("", text)
    return text.strip()


async def proofread_pdf(path: Path) -> tuple[str, dict]:
    """Extract clean decision text from Nevo PDF via Google Vision OCR.

    Each page is rendered at 300 dpi, OCR'd in a worker thread and cleaned.
    Non-empty pages are joined with blank lines, whitespace is normalised and
    everything from the earliest Nevo postamble marker onwards is removed.

    Returns:
        ``(body, stats)`` where stats holds the source type, the number of
        pages processed and the length of the final text.

    Raises:
        RuntimeError: propagated from the OCR step.
    """
    doc = fitz.open(str(path))
    pages: list[str] = []
    try:
        for i, page in enumerate(doc):
            pix = page.get_pixmap(dpi=300)
            img_bytes = pix.tobytes("png")
            text = await asyncio.to_thread(_ocr_page_image, img_bytes, i + 1)
            pages.append(_clean_page_text(text))
            # Short pause between OCR requests.
            await asyncio.sleep(0.1)
    finally:
        # Release the document even when rendering or OCR fails.
        doc.close()

    body = "\n\n".join(p for p in pages if p)
    body = re.sub(r"\n{3,}", "\n\n", body)
    body = re.sub(r"[ \t]+\n", "\n", body)

    # Cut at the earliest marker in the text (not the first one in tuple
    # order), so no back matter survives when several markers are present.
    # This matches what _find_decision_end does for DOCX input.
    marker_positions = [
        pos
        for pos in (body.find(marker) for marker in NEVO_POSTAMBLE_MARKERS)
        if pos != -1
    ]
    if marker_positions:
        body = body[: min(marker_positions)].rstrip()

    return body, {
        "source_type": "pdf",
        "pages": len(pages),
        "chars": len(body),
    }


# ── MD/TXT passthrough ───────────────────────────────────────────


def proofread_md(path: Path) -> tuple[str, dict]:
    """Plain text passthrough for already-clean .md/.txt files."""
    text = path.read_text(encoding="utf-8")
    return text, {"source_type": "md", "chars": len(text)}


async def proofread(path: Path) -> tuple[str, dict]:
    """Proofread a file based on its extension. Returns (clean_text, stats).

    Raises:
        ValueError: if the (case-insensitive) suffix is not .docx, .pdf,
            .md or .txt.
    """
    suffix = path.suffix.lower()
    if suffix == ".docx":
        return proofread_docx(path)
    if suffix == ".pdf":
        return await proofread_pdf(path)
    if suffix in (".md", ".txt"):
        return proofread_md(path)
    raise ValueError(f"Unsupported file type: {suffix}")


# ── Metadata extraction ──────────────────────────────────────────

FILENAME_NUMBER_PATTERNS = [
    re.compile(r"^ARAR-(\d{2})-(\d{3,4})"),
    re.compile(r"^ערר\s+(\d{3,4})-(\d{2})"),
    re.compile(r"^ערר\s+(\d{3,4})\s*-"),
]
LEGACY_MULTI_PATTERN = re.compile(r"(\d{3,4})\+(\d{3,4})")


def decision_number_from_filename(stem: str) -> str | None:
    """Extract NUMBER/YY from a filename stem.

    The year part is ``??`` when the filename carries only the number (or a
    legacy ``N+M`` pair). Returns ``None`` when no pattern matches.
    """
    year_first, number_first, number_only = FILENAME_NUMBER_PATTERNS

    m = year_first.match(stem)
    if m:
        # ARAR-YY-NUMBER: the year comes first in the filename.
        return f"{m.group(2)}/{m.group(1)}"
    m = number_first.match(stem)
    if m:
        return f"{m.group(1)}/{m.group(2)}"
    m = number_only.match(stem)
    if m:
        return f"{m.group(1)}/??"
    m = LEGACY_MULTI_PATTERN.search(stem)
    if m:
        return f"{m.group(1)}+{m.group(2)}/??"
    return None


# Month names, with and without the "ב" prefix, mapped to month numbers.
HEBREW_MONTHS = {
    "ינואר": 1, "בינואר": 1,
    "פברואר": 2, "בפברואר": 2,
    "מרץ": 3, "מרס": 3, "במרץ": 3, "במרס": 3,
    "אפריל": 4, "באפריל": 4,
    "מאי": 5, "במאי": 5,
    "יוני": 6, "ביוני": 6,
    "יולי": 7, "ביולי": 7,
    "אוגוסט": 8, "באוגוסט": 8,
    "ספטמבר": 9, "בספטמבר": 9,
    "אוקטובר": 10, "באוקטובר": 10,
    "נובמבר": 11, "בנובמבר": 11,
    "דצמבר": 12, "בדצמבר": 12,
}

# day, month name, four-digit year
DATE_RE = re.compile(
    r"(\d{1,2})\s+(ב?(?:ינואר|פברואר|מרץ|מרס|אפריל|מאי|יוני|יולי|אוגוסט|ספטמבר|אוקטובר|נובמבר|דצמבר))\s*[,.]?\s*(\d{4})"
)
# The "given ..." phrase used to anchor the date search.
NITNA_RE = re.compile(r"ניתנ[הו]?\s+(?:פה\s+אחד|בדעת\s+רוב|היום)?")


def decision_date_from_text(text: str) -> str | None:
    """Return the decision date found near the end of *text* as ISO ``YYYY-MM-DD``.

    Only the last 2500 characters are searched. A date following the "given"
    phrase is preferred; otherwise the first date in that tail is used.
    Returns ``None`` when no date is found or it is not a valid calendar date.
    """
    tail = text[-2500:]
    nitna_match = NITNA_RE.search(tail)
    search_text = tail[nitna_match.start():] if nitna_match else tail

    m = DATE_RE.search(search_text) or DATE_RE.search(tail)
    if not m:
        return None

    day = int(m.group(1))
    month = HEBREW_MONTHS.get(m.group(2))
    year = int(m.group(3))
    if not month:
        return None
    try:
        return date_type(year, month, day).isoformat()
    except ValueError:
        # e.g. day 31 in a 30-day month
        return None


def finalize_decision_number(number: str | None, date_iso: str | None) -> str:
    """Fill a missing year in *number* from the two-digit year of *date_iso*.

    * No number: ``??/YY`` when a date is known, otherwise ``""``.
    * Number ending in ``/??``: the placeholder becomes ``/YY`` when a date
      is known, otherwise it is removed.
    * Anything else is returned unchanged.
    """
    if not number:
        return f"??/{date_iso[2:4]}" if date_iso else ""
    if not number.endswith("/??"):
        return number
    replacement = f"/{date_iso[2:4]}" if date_iso else ""
    return number.replace("/??", replacement)


def categorize(text: str) -> list[str]:
    """Heuristic subject category detection based on opening + repetition.

    The "opening" is the first 2000 characters. Most categories are assigned
    when their phrase appears in the opening or is repeated often enough in
    the whole text. Returns at least one category; the fallback is the
    general construction category.
    """
    opening = text[:2000]
    cats: list[str] = []

    if re.search(r'תמ[״"\']?א\s*38|תמא\s*38', text):
        cats.append('תמ"א 38')

    if len(re.findall(r"היטל(?:י)?\s+השבחה", text)) >= 3 or re.search(r"היטל(?:י)?\s+השבחה", opening):
        cats.append("היטל השבחה")

    p197_re = r"פיצויים\s+לפי\s+(?:ס(?:עיף|')\s*)?197|סעיף\s*197|ס['\"]?\s*197"
    if len(re.findall(p197_re, text)) >= 2 or re.search(p197_re, opening):
        cats.append("פיצויים 197")

    if text.count("שימוש חורג") >= 3 or "שימוש חורג" in opening:
        cats.append("שימוש חורג")

    # Unlike the categories above, this one needs both the repetition and an
    # occurrence in the opening.
    if len(re.findall(r"\bהקלה\b|\bהקלות\b", text)) >= 3 and re.search(r"\bהקלה\b|\bהקלות\b", opening):
        cats.append("הקלה")

    if re.search(r"איחוד\s+וחלוקה|חלוקה\s+חדשה|תכנית\s+לחלוקה", text):
        cats.append("חלוקה")

    if re.search(
        r"הפקדת\s+ה?תכנית|אישור\s+ה?תכנית|המלצה\s+להפקיד|"
        r"להפקיד\s+את\s+ה?תכנית|לדון\s+בתכנית|דנה\s+בתכנית|"
        r"החלטה\s+לאשר\s+ה?תכנית",
        opening,
    ):
        cats.append("תכנית")

    if re.search(r"בקשה\s+להיתר|היתר\s+בני(?:י)?ה", opening):
        cats.append("היתר")

    # Permit-type subjects also imply the general construction category.
    has_permit_subject = "היתר" in cats or "הקלה" in cats or 'תמ"א 38' in cats
    if has_permit_subject and "בנייה" not in cats:
        cats.append("בנייה")

    return cats or ["בנייה"]


async def analyze_file(path: Path) -> dict[str, Any]:
    """Proofread a file and extract metadata for review.

    Returns a dict suitable for UI preview with: clean text, metadata,
    stats, and a short text preview (first 500 characters) for visual
    verification. The decision number comes from the filename, with its
    year completed from the date found in the text where needed.
    """
    clean_text, stats = await proofread(path)
    num_raw = decision_number_from_filename(path.stem)
    d_iso = decision_date_from_text(clean_text)
    number = finalize_decision_number(num_raw, d_iso)
    cats = categorize(clean_text)
    return {
        "filename": path.name,
        "clean_text": clean_text,
        "preview": clean_text[:500],
        "decision_number": number,
        "decision_date": d_iso or "",
        "subject_categories": cats,
        "stats": stats,
        "chars": len(clean_text),
    }