"""Proofread training corpus: strip Nevo additions from DOCX/PDF, output clean Markdown. Nevo DOCX additions: Front: ספרות / חקיקה שאוזכרה / מיני-רציו / topic tags / Nevo summary paragraphs Back: 5129371512937154678313 / "בעניין עריכה ושינויים" link / "54678313-..." / "נוסח מסמך זה כפוף" Nevo PDF additions: "עמוד X מתוך Y" header on every page PDF text extraction uses Google Cloud Vision OCR — PyMuPDF fragments Hebrew RTL text unusably (words split mid-word, reading order broken). OCR gives clean output. """ from __future__ import annotations import io import os import re import sys import time from pathlib import Path import fitz from docx import Document # Load GOOGLE_CLOUD_VISION_API_KEY from ~/.env if not already set if not os.environ.get("GOOGLE_CLOUD_VISION_API_KEY"): env_path = Path.home() / ".env" if env_path.exists(): for line in env_path.read_text().splitlines(): if line.startswith("GOOGLE_CLOUD_VISION_API_KEY="): os.environ["GOOGLE_CLOUD_VISION_API_KEY"] = line.split("=", 1)[1].strip().strip('"').strip("'") break from google.cloud import vision # noqa: E402 TRAINING_DIR = Path("/home/chaim/legal-ai/data/training") OUTPUT_DIR = TRAINING_DIR / "proofread" RAW_DIR = TRAINING_DIR / "raw" # ── Nevo pattern detection ──────────────────────────────────────── NEVO_PREAMBLE_HEADERS = ( "ספרות:", "חקיקה שאוזכרה:", "מיני-רציו:", ) # Strong decision-opening patterns — highly distinctive first words of real decision # body. These rarely appear inside Nevo's own summary block, so first match wins. DECISION_OPENING = re.compile( r"^(עניינו\s|ענייננו\s|עסקינן\s|בפנינו\s|לפנינו\s|בערר\s+שלפנינו|זהו\s+ערר)" ) # Section headers that definitively mark decision body start. DECISION_SECTION_HEADERS = { "רקע", "פתח דבר", "תמצית טענות הצדדים", "העובדות", "הרקע העובדתי", "מבוא", } # Nevo postamble markers — everything from first match onwards is stripped. 
NEVO_POSTAMBLE_MARKERS = (
    "5129371512937154678313",
    "בעניין עריכה ושינויים במסמכי פסיקה",
    "נוסח מסמך זה כפוף לשינויי ניסוח ועריכה",
)

# Nevo inline watermark codes — appear as prefixes embedded in real paragraphs
# (e.g. "5129371ניתנה פה אחד" or "054678313האם ההיתר..."). These must be
# stripped from paragraph content, not used as postamble boundaries.
NEVO_INLINE_CODE_RE = re.compile(r"^0?(5129371|54678313)\d*")

# Nevo PDF page header: "עמוד X מתוך Y" or "עמוד X בן Y" (Hebrew variants)
PDF_PAGE_HEADER_RE = re.compile(
    r"\s*עמוד\s*\n?\s*\d+\s*\n?\s*(?:מתוך|בן)\s*\n?\s*\d+\s*"
)

# Short orphan lines starting with "עמוד" — OCR artifacts from merged footer text
# (e.g. "עמודירבי", "עמוד :", "עמודי", "עמוד ר"). Conservative: up to 12 chars.
PDF_PAGE_ORPHAN_RE = re.compile(r"(?m)^עמוד[^\n]{0,12}$")

# "עמוד" followed by number (with optional garbled Nevo URL line after).
# NOTE(review): not referenced by any function visible in this file.
PDF_PAGE_BLOCK_RE = re.compile(
    r"(?m)^\s*עמוד\s*\n\s*\d+[·.]?\s*\n[^\n]*\n", re.UNICODE
)

# Standalone "עמוד N" at line start
PDF_PAGE_NUM_LINE_RE = re.compile(r"(?m)^\s*עמוד\s*\n?\s*\d+[·.]?\s*$")

# Nevo watermark URL (and common OCR-garbled variants)
NEVO_URL_RE = re.compile(
    r"(nevo\.co\.il|neto\.co\.il|netocoal|neetocoal|nevocoal|nevo\.co|rawo\.co\.il)",
    re.IGNORECASE,
)


def find_decision_start(paragraphs: list[str]) -> int:
    """Find index of first real decision paragraph, skipping Nevo preamble.

    Strategy:
      1. If no Nevo header appears in the first 10 paragraphs → start at 0.
      2. Otherwise return the first paragraph that equals one of
         DECISION_SECTION_HEADERS or matches the DECISION_OPENING regex.
      3. Fallback: after the first paragraph containing "קבעה כלהלן" /
         "קבעה את הדברים הבאים", return the first of the next 14 paragraphs
         that is longer than 80 characters and does not start with "*".
      4. Last resort: skip the first 10 paragraphs (clamped to the last index).

    Args:
        paragraphs: Non-empty paragraph texts in document order.

    Returns:
        Index of the first paragraph to keep.
    """
    # lstrip so a header preceded by stray whitespace is still recognised.
    has_nevo_preamble = any(
        p.lstrip().startswith(NEVO_PREAMBLE_HEADERS) for p in paragraphs[:10]
    )
    if not has_nevo_preamble:
        return 0

    # Scan for strong decision-opening markers.
    for i, p in enumerate(paragraphs):
        stripped = p.strip()
        if stripped in DECISION_SECTION_HEADERS or DECISION_OPENING.match(stripped):
            return i

    # Fallback: locate the summary lead-in and take the first long paragraph after it.
    for i, p in enumerate(paragraphs):
        if "קבעה כלהלן" in p or "קבעה את הדברים הבאים" in p:
            # Skip summary paragraphs (Nevo typically has 3-8 of these).
            for j in range(i + 1, min(i + 15, len(paragraphs))):
                candidate = paragraphs[j]
                if len(candidate) > 80 and not candidate.strip().startswith("*"):
                    return j
            # Only the first lead-in paragraph is considered.
            break

    # Last resort: strip only the first 10 paragraphs of preamble.
    return min(10, len(paragraphs) - 1)


def find_decision_end(paragraphs: list[str]) -> int:
    """Find exclusive end index: first paragraph that is a Nevo postamble marker.

    Returns ``len(paragraphs)`` when no paragraph contains any marker.
    """
    for i, p in enumerate(paragraphs):
        if any(marker in p for marker in NEVO_POSTAMBLE_MARKERS):
            return i
    return len(paragraphs)


# ── DOCX proofreading ─────────────────────────────────────────────

def _strip_inline_nevo_codes(paragraphs: list[str]) -> list[str]:
    """Remove Nevo inline watermark codes from paragraph prefixes; drop pure-code paras.

    Each paragraph is whitespace-stripped before the prefix regex is applied, so a
    code preceded by stray whitespace is removed as well. Paragraphs that end up
    empty are omitted from the result.
    """
    out: list[str] = []
    for p in paragraphs:
        stripped = NEVO_INLINE_CODE_RE.sub("", p.strip()).strip()
        if stripped:
            out.append(stripped)
    return out


def proofread_docx(path: Path) -> tuple[str, dict]:
    """Extract clean decision text from Nevo DOCX.

    Args:
        path: Location of the ``.docx`` file.

    Returns:
        ``(markdown, stats)`` — the kept paragraphs joined by blank lines, and a
        dict with ``total_paragraphs``, ``preamble_stripped``,
        ``postamble_stripped`` and ``clean_paragraphs`` counts.
    """
    doc = Document(str(path))
    paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]

    start = find_decision_start(paragraphs)
    end = find_decision_end(paragraphs)
    clean = _strip_inline_nevo_codes(paragraphs[start:end])

    md = "\n\n".join(clean)
    return md, {
        "total_paragraphs": len(paragraphs),
        "preamble_stripped": start,
        "postamble_stripped": len(paragraphs) - end,
        "clean_paragraphs": len(clean),
    }


# ── PDF proofreading (Google Vision OCR) ──────────────────────────

# Lazily created client, shared by every OCR call in this process.
_vision_client: "vision.ImageAnnotatorClient | None" = None


def _get_vision_client() -> "vision.ImageAnnotatorClient":
    """Return the shared Vision client, creating it on first use.

    Raises:
        RuntimeError: If GOOGLE_CLOUD_VISION_API_KEY is missing or empty.
    """
    global _vision_client
    if _vision_client is None:
        api_key = os.environ.get("GOOGLE_CLOUD_VISION_API_KEY")
        if not api_key:
            raise RuntimeError("GOOGLE_CLOUD_VISION_API_KEY not set")
        _vision_client = vision.ImageAnnotatorClient(
            client_options={"api_key": api_key}
        )
    return _vision_client


# Hebrew abbreviation quote fixes — Google Vision renders ״ as 'יי'
_HEBREW_ABBREV_FIXES: dict[str, str] = {
    "עוהייד": 'עוה"ד',
    "עוייד": 'עו"ד',
    "הנייל": 'הנ"ל',
    "מצייב": 'מצ"ב',
    "ביהמייש": 'ביהמ"ש',
    "תייז": 'ת"ז',
    "עייי": 'ע"י',
    "אחייכ": 'אח"כ',
    "סייק": 'ס"ק',
    "דייר": 'ד"ר',
    "חווייד": 'חוו"ד',
    "מייר": 'מ"ר',
    "יחייד": 'יח"ד',
    "בייכ": 'ב"כ',
    "בייה": 'ב"ה',
    "שייח": 'ש"ח',
    "יוייר": 'יו"ר',
    "בליימ": 'בל"מ',
    "תבייע": 'תב"ע',
    "תמייא": 'תמ"א',
    "סייה": 'ס"ה',
    "שייפ": 'ש"פ',
    "שצייפ": 'שצ"פ',
    "שבייצ": 'שב"צ',
    "עסיים": 'עס"ם',
    "הייה": 'ה"ה',
    "פסייד": 'פס"ד',
    "תיידא": 'תיד"א',
    "בגייץ": 'בג"ץ',
    "עתיים": 'עת"ם',
    "עעיים": 'עע"ם',
    # Hebrew calendar day prefixes (כ"א .. כ"ט etc.)
    "כייא": 'כ"א',
    "כייב": 'כ"ב',
    "כייג": 'כ"ג',
    "כייד": 'כ"ד',
    "כייה": 'כ"ה',
    "כייו": 'כ"ו',
    "כייז": 'כ"ז',
    "כייח": 'כ"ח',
    "כייט": 'כ"ט',
    "לייא": 'ל"א',
    "יייא": 'י"א',
    "יייב": 'י"ב',
    "יייג": 'י"ג',
    "יייד": 'י"ד',
    "טייו": 'ט"ו',
    "טייז": 'ט"ז',
    "יייז": 'י"ז',
    "יייח": 'י"ח',
    "יייט": 'י"ט',
    # Hebrew calendar years (תשפ"ה, תשפ"ד...)
    "תשפייא": 'תשפ"א',
    "תשפייב": 'תשפ"ב',
    "תשפייג": 'תשפ"ג',
    "תשפייד": 'תשפ"ד',
    "תשפייה": 'תשפ"ה',
    "תשפייו": 'תשפ"ו',
    "תשפיין": 'תשפ"ן',
}

# Longest keys first so a longer abbreviation wins over a shorter one at the
# same position. Matching is plain substring matching (no word boundaries).
_ABBREV_PATTERN = re.compile(
    "|".join(re.escape(k) for k in sorted(_HEBREW_ABBREV_FIXES, key=len, reverse=True))
)


def _fix_hebrew_quotes(text: str) -> str:
    """Replace every known 'יי'-garbled abbreviation in *text* with its quoted form."""
    return _ABBREV_PATTERN.sub(lambda m: _HEBREW_ABBREV_FIXES[m.group()], text)


def _ocr_page_image(image_bytes: bytes, page_num: int) -> str:
    """OCR one rendered page image with a Hebrew language hint.

    Args:
        image_bytes: Encoded image content sent to the Vision API.
        page_num: Page number, used only in the error message.

    Returns:
        The recognised text after abbreviation fixes, or "" when the response
        carries no full-text annotation.

    Raises:
        RuntimeError: If the API key is missing or the response reports an error.
    """
    client = _get_vision_client()
    image = vision.Image(content=image_bytes)
    response = client.document_text_detection(
        image=image,
        image_context=vision.ImageContext(language_hints=["he"]),
    )
    if response.error.message:
        raise RuntimeError(f"Vision error page {page_num}: {response.error.message}")
    text = response.full_text_annotation.text if response.full_text_annotation else ""
    return _fix_hebrew_quotes(text)


# A whole (stripped) line that is safe to drop from the bottom of a page.
_FOOTER_JUNK_RE = re.compile(
    r"^("
    r"\s*|"  # blank
    r"[-·*.\"\'׳״]+|"  # stray punctuation
    r"\d{1,3}[\s\-·*.\"\'׳״]*|"  # page number with any stray char
    r"עמוד[\s\d\-·*.\"\'׳״]*|"  # "עמוד" / "עמוד N" w/ trailing noise
    r"[-·*\s\"\'׳״]*[a-zA-Z][a-zA-Z0-9 .\-·*_]{0,30}"  # garbled latin (nevo URL variants)
    r")$"
)


def _clean_page_text(text: str) -> str:
    """Strip Nevo page headers, footers and watermarks from a single page's OCR text.

    Nevo footer on each page looks like:
        עמוד
        N               (or "N·", "N*")
        nevo.co.il      (or OCR-garbled: "new coal", "neto coal", etc.)
        -               (optional stray dash)

    Google Vision OCRs this block at the end of each page's text.
    """
    # 1. Strip top header "עמוד X מתוך Y" anywhere.
    text = PDF_PAGE_HEADER_RE.sub("\n", text)

    # 2. Walk back from end, dropping footer junk lines.
    lines = text.split("\n")
    while lines and _FOOTER_JUNK_RE.match(lines[-1].strip()):
        lines.pop()
    text = "\n".join(lines)

    # 3. Final pass: strip any leftover Nevo URLs mid-text and orphan "עמוד X" lines.
    text = NEVO_URL_RE.sub("", text)
    text = PDF_PAGE_NUM_LINE_RE.sub("", text)
    text = PDF_PAGE_ORPHAN_RE.sub("", text)

    return text.strip()


def _truncate_at_postamble(body: str) -> str:
    """Cut *body* at the earliest Nevo postamble marker; return it unchanged if none.

    The earliest position in the text is used, not the first marker in tuple
    order, so no postamble text survives before the cut point. This matches
    what find_decision_end does for DOCX paragraphs.
    """
    hits = [
        pos
        for pos in (body.find(marker) for marker in NEVO_POSTAMBLE_MARKERS)
        if pos != -1
    ]
    if not hits:
        return body
    return body[: min(hits)].rstrip()


def _assemble_pdf_body(pages: list[str]) -> str:
    """Join cleaned page texts into one body and drop the Nevo postamble.

    Empty pages are skipped. Trailing spaces/tabs are removed before blank-line
    runs are collapsed; doing it in the other order lets whitespace-only lines
    leave three or more consecutive newlines behind.
    """
    body = "\n\n".join(p for p in pages if p)
    body = re.sub(r"[ \t]+\n", "\n", body)
    body = re.sub(r"\n{3,}", "\n\n", body)
    return _truncate_at_postamble(body)


def proofread_pdf(path: Path) -> tuple[str, dict]:
    """Extract clean decision text from Nevo PDF via Google Vision OCR.

    Each page is rendered at 300 DPI to PNG, OCR'd and cleaned individually.

    Args:
        path: Location of the ``.pdf`` file.

    Returns:
        ``(body, stats)`` where stats holds the ``pages`` and ``chars`` counts.

    Raises:
        RuntimeError: Propagated from the OCR step (missing key / Vision error).
    """
    doc = fitz.open(str(path))
    pages: list[str] = []
    try:
        for i, page in enumerate(doc):
            pix = page.get_pixmap(dpi=300)
            img_bytes = pix.tobytes("png")
            text = _ocr_page_image(img_bytes, i + 1)
            pages.append(_clean_page_text(text))
            # Small delay between API calls to be safe.
            time.sleep(0.1)
    finally:
        # Release the document even when OCR fails part-way through.
        doc.close()

    body = _assemble_pdf_body(pages)
    return body, {
        "pages": len(pages),
        "chars": len(body),
    }


# ── Orchestration ─────────────────────────────────────────────────

# Files in TRAINING_DIR that are never processed.
SKIP_FILES = {
    "הכנת שאלות מחקר.docx",
    "סוכן_מנתח_ומחקר_משפטי_Paperclip_מדריך.docx",
    "README.md",
}


def output_filename(src: Path) -> str:
    """Build the output filename: the source stem, unchanged, with a ``.md`` suffix."""
    return f"{src.stem}.md"


def main(argv: list[str]) -> int:
    """Proofread every DOCX/PDF directly inside TRAINING_DIR into OUTPUT_DIR.

    Args:
        argv: Command-line vector; any entries after the first restrict the run
            to files with exactly those names.

    Returns:
        Always 0. A failure on one file is printed and does not stop the run.
    """
    OUTPUT_DIR.mkdir(exist_ok=True)
    RAW_DIR.mkdir(exist_ok=True)

    # Filter files.
    only = argv[1:] if len(argv) > 1 else None
    files: list[Path] = []
    for p in sorted(TRAINING_DIR.iterdir()):
        if p.is_dir() or p.name.startswith("."):
            continue
        if p.name in SKIP_FILES:
            continue
        if p.suffix.lower() not in (".docx", ".pdf"):
            continue
        if only and p.name not in only:
            continue
        files.append(p)

    print(f"Processing {len(files)} files...\n")

    for path in files:
        # Deliberate best-effort boundary: report the failure, keep going.
        try:
            if path.suffix.lower() == ".docx":
                md, stats = proofread_docx(path)
            else:
                md, stats = proofread_pdf(path)

            out_path = OUTPUT_DIR / output_filename(path)
            out_path.write_text(md, encoding="utf-8")
            print(f"✓ {path.name}")
            print(f"  → {out_path.name} ({len(md):,} chars) {stats}")
        except Exception as e:
            print(f"✗ {path.name}: {e}")

    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))