Add training corpus UI with Nevo proofreading pipeline

- New proofreader service strips Nevo editorial additions (front matter,
  postamble, page headers, watermarks, inline codes) from DOCX/PDF/MD
- PDF pages use Google Vision OCR for clean Hebrew RTL extraction
- New training page at #/training with drag-and-drop upload, automatic
  metadata extraction (decision number, date, categories), reviewable
  preview, and style pattern report grouped by type
- API endpoints: /api/training/{analyze,upload,corpus,patterns,
  analyze-style,analyze-style/status}
- Fix claude_session.query to pipe prompt via stdin, avoiding ARG_MAX
  overflow when analyzing 900K+ char corpus
- CLI scripts for batch proofreading and corpus upload

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-11 11:04:58 +00:00
parent ecda95d610
commit 32f18de049
6 changed files with 1960 additions and 3 deletions

View File

@@ -0,0 +1,349 @@
"""Batch upload proofread training corpus to style DB.
Two-phase workflow:
--preview Extract metadata from all .md files, print review table, don't upload
--upload Actually upload all files (with optional --only FILE to run one)
Metadata extraction:
* decision_number: from filename (ARAR-YY-NNNN / ערר NNNN-YY) or decision date year
* decision_date: from "ניתנה ... <day> ב<Hebrew month> <YYYY>" near end of text
* categories: keyword heuristics on body text
"""
from __future__ import annotations
import argparse
import asyncio
import os
import re
import sys
from pathlib import Path
PROOFREAD_DIR = Path("/home/chaim/legal-ai/data/training/proofread")
# Manual metadata overrides for files where auto-extraction can't determine values.
METADATA_OVERRIDES: dict[str, dict] = {
"ARAR-25-1067 - יחיעם יפה ואח׳.md": {
"decision_date": "2025-11-27", # no "ניתנה" signature in file; user-provided
},
}
# Files to skip — already in style_corpus from legacy ingestion
# (verified by exact character-count match with existing DB rows).
SKIP_FILES = {
"תמא 38-בית הכרם-1126+1141-החלטה.md", # → corpus: 1126/1141
"היתר בניה-בית שמש-1180+1181-החלטה.md", # → corpus: 1180/1181
"היתר בניה-הראל-1043+1054-החלטה.md", # → corpus: 1043/1054
"היתר בניה-הראל-1071+1077-החלטה.md", # → corpus: 1071/1077
}
# Load env vars needed by mcp-server
ENV_FILE = Path.home() / ".env"
if ENV_FILE.exists():
for line in ENV_FILE.read_text().splitlines():
if "=" in line and not line.startswith("#"):
k, v = line.split("=", 1)
os.environ.setdefault(k.strip(), v.strip().strip('"').strip("'"))
# Make mcp-server package importable
sys.path.insert(0, "/home/chaim/legal-ai/mcp-server/src")
# ── Decision number extraction ───────────────────────────────────
FILENAME_NUMBER_PATTERNS = [
# ARAR-YY-NNNN[-X] - title.md
re.compile(r"^ARAR-(\d{2})-(\d{3,4})"),
# ערר NNNN-YY title.md or ערר NNNN-YY title
re.compile(r"^ערר\s+(\d{3,4})-(\d{2})"),
# ערר NNNN - title (no year in filename — needs date lookup)
re.compile(r"^ערר\s+(\d{3,4})\s*-"),
]
LEGACY_MULTI_PATTERN = re.compile(r"(\d{3,4})\+(\d{3,4})")
def decision_number_from_filename(stem: str) -> tuple[str | None, str | None]:
"""Return (number, year_short) or (multi_number, None) or (None, None).
year_short is YY (last 2 digits) if extractable from filename.
For legacy files with 'NNNN+NNNN' or no year, returns partial info
that must be completed from decision date.
"""
# ARAR-YY-NNNN
m = FILENAME_NUMBER_PATTERNS[0].match(stem)
if m:
year, num = m.group(1), m.group(2)
return f"{num}/{year}", year
# ערר NNNN-YY
m = FILENAME_NUMBER_PATTERNS[1].match(stem)
if m:
num, year = m.group(1), m.group(2)
return f"{num}/{year}", year
# ערר NNNN - title (no year)
m = FILENAME_NUMBER_PATTERNS[2].match(stem)
if m:
num = m.group(1)
return f"{num}/??", None
# Legacy: "NNNN+NNNN" merged decisions
m = LEGACY_MULTI_PATTERN.search(stem)
if m:
return f"{m.group(1)}+{m.group(2)}/??", None
return None, None
# ── Decision date extraction ─────────────────────────────────────
HEBREW_MONTHS = {
"ינואר": 1, "בינואר": 1,
"פברואר": 2, "בפברואר": 2,
"מרץ": 3, "מרס": 3, "במרץ": 3, "במרס": 3,
"אפריל": 4, "באפריל": 4,
"מאי": 5, "במאי": 5,
"יוני": 6, "ביוני": 6,
"יולי": 7, "ביולי": 7,
"אוגוסט": 8, "באוגוסט": 8,
"ספטמבר": 9, "בספטמבר": 9,
"אוקטובר": 10, "באוקטובר": 10,
"נובמבר": 11, "בנובמבר": 11,
"דצמבר": 12, "בדצמבר": 12,
}
# Matches "<day> ב<month>, <year>" or "<day> <month>, <year>" (with optional commas)
DATE_RE = re.compile(
r"(\d{1,2})\s+(ב?(?:ינואר|פברואר|מרץ|מרס|אפריל|מאי|יוני|יולי|אוגוסט|ספטמבר|אוקטובר|נובמבר|דצמבר))\s*[,.]?\s*(\d{4})"
)
NITNA_RE = re.compile(r"ניתנ[הו]?\s+(?:פה\s+אחד|בדעת\s+רוב|היום)?")
def decision_date_from_text(text: str) -> str | None:
"""Extract decision date in YYYY-MM-DD format from 'ניתנה... DATE' section.
Searches the last ~2000 chars where the signing block lives.
"""
tail = text[-2500:] if len(text) > 2500 else text
# Prefer dates near "ניתנה" marker
nitna_match = NITNA_RE.search(tail)
search_text = tail[nitna_match.start():] if nitna_match else tail
m = DATE_RE.search(search_text)
if not m:
# Fall back: search whole tail
m = DATE_RE.search(tail)
if not m:
return None
day = int(m.group(1))
month = HEBREW_MONTHS.get(m.group(2))
year = int(m.group(3))
if not month:
return None
try:
from datetime import date
return date(year, month, day).isoformat()
except ValueError:
return None
# ── Subject category extraction ──────────────────────────────────
# Categories as defined in the tool signature.
ALL_CATEGORIES = [
"בנייה", "שימוש חורג", "תכנית", "היתר", "הקלה",
"חלוקה", 'תמ"א 38', "היטל השבחה", "פיצויים 197",
]
def categorize(text: str) -> list[str]:
"""Heuristic category detection based on subject matter, not incidental mentions.
Strategy: the real subject is established in the opening 2000 chars
(first decision-opening paragraph). Secondary signal is repetition count
— casual mentions in law citations don't repeat.
"""
opening = text[:2000] # subject is stated up front
t = text
cats: list[str] = []
# תמ"א 38 — very specific marker, single mention is fine
if re.search(r'תמ[״"\']?א\s*38|תמא\s*38', t):
cats.append('תמ"א 38')
# היטל השבחה — require real engagement: must appear in opening OR 3+ times
hsbacha_count = len(re.findall(r"היטל(?:י)?\s+השבחה", t))
if hsbacha_count >= 3 or re.search(r"היטל(?:י)?\s+השבחה", opening):
cats.append("היטל השבחה")
# פיצויים 197 — require multiple mentions OR in opening
p197_re = r"פיצויים\s+לפי\s+(?:ס(?:עיף|')\s*)?197|סעיף\s*197|ס['\"]?\s*197"
p197_count = len(re.findall(p197_re, t))
if p197_count >= 2 or re.search(p197_re, opening):
cats.append("פיצויים 197")
# שימוש חורג — must appear in opening OR 3+ times (avoids law-quote false positives)
shimush_count = t.count("שימוש חורג")
if shimush_count >= 3 or "שימוש חורג" in opening:
cats.append("שימוש חורג")
# הקלה — real subject if 3+ mentions AND appears in opening
hakala_count = len(re.findall(r"\bהקלה\b|\bהקלות\b", t))
if hakala_count >= 3 and re.search(r"\bהקלה\b|\bהקלות\b", opening):
cats.append("הקלה")
# חלוקה — "איחוד וחלוקה" or "חלוקה חדשה" (specific phrases)
if re.search(r"איחוד\s+וחלוקה|חלוקה\s+חדשה|תכנית\s+לחלוקה", t):
cats.append("חלוקה")
# תכנית — plan-level appeal (primary subject). Allow ה/ב/ל prefixes on תכנית.
tochnit_opening = bool(re.search(
r"הפקדת\s+ה?תכנית|"
r"אישור\s+ה?תכנית|"
r"המלצה\s+להפקיד|"
r"להפקיד\s+את\s+ה?תכנית|"
r"לדון\s+בתכנית|"
r"דנה\s+בתכנית|"
r"החלטה\s+לאשר\s+ה?תכנית",
opening,
))
if tochnit_opening:
cats.append("תכנית")
# היתר — "בקשה להיתר" or "היתר בניה" as subject in opening
if re.search(r"בקשה\s+להיתר|היתר\s+בני(?:י)?ה", opening):
cats.append("היתר")
# בנייה — default/fallback for building-permit cases
# (not for plan-level תכנית-only cases)
has_permit_subject = "היתר" in cats or "הקלה" in cats or 'תמ"א 38' in cats
if has_permit_subject and "בנייה" not in cats:
cats.append("בנייה")
# If nothing matched, default to בנייה
return cats or ["בנייה"]
# ── Year fallback from date ──────────────────────────────────────
def finalize_decision_number(number: str | None, date_iso: str | None) -> str:
"""If filename number is missing year, fill it from decision date."""
if not number:
if date_iso:
# Extract last 2 digits of Hebrew year via Gregorian year
return f"??/{date_iso[2:4]}"
return ""
if number.endswith("/??"):
if date_iso:
yy = date_iso[2:4]
return number.replace("/??", f"/{yy}")
return number.replace("/??", "")
return number
# ── Main metadata extraction ─────────────────────────────────────
def extract_metadata(path: Path) -> dict:
text = path.read_text(encoding="utf-8")
num_from_name, _ = decision_number_from_filename(path.stem)
date_iso = decision_date_from_text(text)
decision_number = finalize_decision_number(num_from_name, date_iso)
cats = categorize(text)
meta = {
"file": path.name,
"decision_number": decision_number,
"decision_date": date_iso or "??",
"categories": cats,
"chars": len(text),
}
# Apply manual overrides
if path.name in METADATA_OVERRIDES:
meta.update(METADATA_OVERRIDES[path.name])
return meta
def print_preview(results: list[dict]) -> None:
"""Print review table of metadata for all files."""
print(f"\n{'#':<3} {'FILE':<55} {'NUMBER':<15} {'DATE':<12} {'CATEGORIES'}")
print("-" * 130)
for i, r in enumerate(results, 1):
file_short = r["file"] if len(r["file"]) <= 53 else r["file"][:50] + "..."
cats = ", ".join(r["categories"])
print(f"{i:<3} {file_short:<55} {r['decision_number']:<15} {r['decision_date']:<12} {cats}")
print()
# Highlight issues
issues = [r for r in results if r["decision_date"] == "??" or not r["decision_number"] or "??" in r["decision_number"]]
if issues:
print(f"⚠️ {len(issues)} files with incomplete metadata:")
for r in issues:
print(f" - {r['file']} → number={r['decision_number']!r} date={r['decision_date']!r}")
# ── Upload ───────────────────────────────────────────────────────
async def upload_one(meta: dict) -> dict:
from legal_mcp.tools.documents import document_upload_training
path = PROOFREAD_DIR / meta["file"]
result = await document_upload_training(
file_path=str(path),
decision_number=meta["decision_number"],
decision_date=meta["decision_date"] if meta["decision_date"] != "??" else "",
subject_categories=meta["categories"],
title=path.stem,
)
return {"file": meta["file"], "result": result}
async def upload_all(results: list[dict]) -> None:
for i, meta in enumerate(results, 1):
try:
r = await upload_one(meta)
print(f"[{i}/{len(results)}] ✓ {meta['file']}")
print(f" {r['result'][:200]}")
except Exception as e:
print(f"[{i}/{len(results)}] ✗ {meta['file']}: {e}")
# ── CLI ──────────────────────────────────────────────────────────
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--preview", action="store_true", help="Show metadata table without uploading")
ap.add_argument("--upload", action="store_true", help="Upload all files to style corpus")
ap.add_argument("--only", help="Only process this specific filename")
args = ap.parse_args()
files = sorted(PROOFREAD_DIR.glob("*.md"))
files = [f for f in files if f.name not in SKIP_FILES]
if args.only:
files = [f for f in files if f.name == args.only]
if not files:
print(f"File not found: {args.only}")
return 1
results = [extract_metadata(f) for f in files]
if args.preview or not args.upload:
print_preview(results)
if not args.upload:
return 0
if args.upload:
print(f"\n>>> Uploading {len(results)} files to style corpus...\n")
asyncio.run(upload_all(results))
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,382 @@
"""Proofread training corpus: strip Nevo additions from DOCX/PDF, output clean Markdown.
Nevo DOCX additions:
Front: ספרות / חקיקה שאוזכרה / מיני-רציו / topic tags / Nevo summary paragraphs
Back: 5129371512937154678313 / "בעניין עריכה ושינויים" link / "54678313-..." / "נוסח מסמך זה כפוף"
Nevo PDF additions:
"עמוד X מתוך Y" header on every page
PDF text extraction uses Google Cloud Vision OCR — PyMuPDF fragments Hebrew RTL
text unusably (words split mid-word, reading order broken). OCR gives clean output.
"""
from __future__ import annotations
import io
import os
import re
import sys
import time
from pathlib import Path
import fitz
from docx import Document
# Load GOOGLE_CLOUD_VISION_API_KEY from ~/.env if not already set
if not os.environ.get("GOOGLE_CLOUD_VISION_API_KEY"):
env_path = Path.home() / ".env"
if env_path.exists():
for line in env_path.read_text().splitlines():
if line.startswith("GOOGLE_CLOUD_VISION_API_KEY="):
os.environ["GOOGLE_CLOUD_VISION_API_KEY"] = line.split("=", 1)[1].strip().strip('"').strip("'")
break
from google.cloud import vision # noqa: E402
TRAINING_DIR = Path("/home/chaim/legal-ai/data/training")
OUTPUT_DIR = TRAINING_DIR / "proofread"
RAW_DIR = TRAINING_DIR / "raw"
# ── Nevo pattern detection ────────────────────────────────────────
NEVO_PREAMBLE_HEADERS = (
"ספרות:",
"חקיקה שאוזכרה:",
"מיני-רציו:",
)
# Strong decision-opening patterns — highly distinctive first words of real decision
# body. These rarely appear inside Nevo's own summary block, so first match wins.
DECISION_OPENING = re.compile(
r"^(עניינו\s|ענייננו\s|עסקינן\s|בפנינו\s|לפנינו\s|בערר\s+שלפנינו|זהו\s+ערר)"
)
# Section headers that definitively mark decision body start.
DECISION_SECTION_HEADERS = {
"רקע",
"פתח דבר",
"תמצית טענות הצדדים",
"העובדות",
"הרקע העובדתי",
"מבוא",
}
# Nevo postamble markers — everything from first match onwards is stripped.
NEVO_POSTAMBLE_MARKERS = (
"5129371512937154678313",
"בעניין עריכה ושינויים במסמכי פסיקה",
"נוסח מסמך זה כפוף לשינויי ניסוח ועריכה",
)
# Nevo inline watermark codes — appear as prefixes embedded in real paragraphs
# (e.g. "5129371ניתנה פה אחד" or "054678313האם ההיתר..."). These must be
# stripped from paragraph content, not used as postamble boundaries.
NEVO_INLINE_CODE_RE = re.compile(r"^0?(5129371|54678313)\d*")
# Nevo PDF page header: "עמוד X מתוך Y" or "עמוד X בן Y" (Hebrew variants)
PDF_PAGE_HEADER_RE = re.compile(
r"\s*עמוד\s*\n?\s*\d+\s*\n?\s*(?:מתוך|בן)\s*\n?\s*\d+\s*"
)
# Short orphan lines starting with "עמוד" — OCR artifacts from merged footer text
# (e.g. "עמודירבי", "עמוד :", "עמודי", "עמוד ר"). Conservative: up to 12 chars.
PDF_PAGE_ORPHAN_RE = re.compile(r"(?m)^עמוד[^\n]{0,12}$")
# "עמוד" followed by number (with optional garbled Nevo URL line after)
PDF_PAGE_BLOCK_RE = re.compile(
r"(?m)^\s*עמוד\s*\n\s*\d+[·.]?\s*\n[^\n]*\n", re.UNICODE
)
# Standalone "עמוד N" at line start
PDF_PAGE_NUM_LINE_RE = re.compile(r"(?m)^\s*עמוד\s*\n?\s*\d+[·.]?\s*$")
# Nevo watermark URL (and common OCR-garbled variants)
NEVO_URL_RE = re.compile(
r"(nevo\.co\.il|neto\.co\.il|netocoal|neetocoal|nevocoal|nevo\.co|rawo\.co\.il)",
re.IGNORECASE,
)
def find_decision_start(paragraphs: list[str]) -> int:
"""Find index of first real decision paragraph, skipping Nevo preamble.
Strategy:
1. If no Nevo headers present → start at 0.
2. Otherwise, scan past Nevo headers; look for first paragraph matching
DECISION_OPENING regex or DECISION_SECTION_HEADERS.
3. Fallback: first paragraph after "ועדת הערר ... קבעה כלהלן:" bullet block
that doesn't look like summary (heuristic: longer, has proper sentence).
"""
has_nevo_preamble = any(
any(p.startswith(h) for h in NEVO_PREAMBLE_HEADERS) for p in paragraphs[:10]
)
if not has_nevo_preamble:
return 0
# Scan for strong decision-opening markers
for i, p in enumerate(paragraphs):
stripped = p.strip()
if stripped in DECISION_SECTION_HEADERS:
return i
if DECISION_OPENING.match(stripped):
return i
# Fallback: find "ועדת הערר ... קבעה כלהלן" and take first long para after bullets
for i, p in enumerate(paragraphs):
if "קבעה כלהלן" in p or "קבעה את הדברים הבאים" in p:
# Skip summary paragraphs (Nevo typically has 3-8 of these)
for j in range(i + 1, min(i + 15, len(paragraphs))):
if len(paragraphs[j]) > 80 and not paragraphs[j].strip().startswith("*"):
# Check if this looks like real decision content
return j
break
# Last resort: strip only the first 10 paragraphs of preamble
return min(10, len(paragraphs) - 1)
def find_decision_end(paragraphs: list[str]) -> int:
"""Find exclusive end index: first paragraph that is a Nevo postamble marker."""
for i, p in enumerate(paragraphs):
for marker in NEVO_POSTAMBLE_MARKERS:
if marker in p:
return i
return len(paragraphs)
# ── DOCX proofreading ─────────────────────────────────────────────
def _strip_inline_nevo_codes(paragraphs: list[str]) -> list[str]:
"""Remove Nevo inline watermark codes from paragraph prefixes; drop pure-code paras."""
out: list[str] = []
for p in paragraphs:
stripped = NEVO_INLINE_CODE_RE.sub("", p).strip()
if stripped:
out.append(stripped)
return out
def proofread_docx(path: Path) -> tuple[str, dict]:
"""Extract clean decision text from Nevo DOCX. Returns (markdown, stats)."""
doc = Document(str(path))
paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
start = find_decision_start(paragraphs)
end = find_decision_end(paragraphs)
clean = _strip_inline_nevo_codes(paragraphs[start:end])
md = "\n\n".join(clean)
return md, {
"total_paragraphs": len(paragraphs),
"preamble_stripped": start,
"postamble_stripped": len(paragraphs) - end,
"clean_paragraphs": len(clean),
}
# ── PDF proofreading (Google Vision OCR) ──────────────────────────
_vision_client: vision.ImageAnnotatorClient | None = None
def _get_vision_client() -> vision.ImageAnnotatorClient:
global _vision_client
if _vision_client is None:
api_key = os.environ.get("GOOGLE_CLOUD_VISION_API_KEY")
if not api_key:
raise RuntimeError("GOOGLE_CLOUD_VISION_API_KEY not set")
_vision_client = vision.ImageAnnotatorClient(
client_options={"api_key": api_key}
)
return _vision_client
# Hebrew abbreviation quote fixes — Google Vision renders ״ as 'יי'
_HEBREW_ABBREV_FIXES: dict[str, str] = {
"עוהייד": 'עוה"ד',
"עוייד": 'עו"ד',
"הנייל": 'הנ"ל',
"מצייב": 'מצ"ב',
"ביהמייש": 'ביהמ"ש',
"תייז": 'ת"ז',
"עייי": 'ע"י',
"אחייכ": 'אח"כ',
"סייק": 'ס"ק',
"דייר": 'ד"ר',
"חווייד": 'חוו"ד',
"מייר": 'מ"ר',
"יחייד": 'יח"ד',
"בייכ": 'ב"כ',
"בייה": 'ב"ה',
"שייח": 'ש"ח',
"יוייר": 'יו"ר',
"בליימ": 'בל"מ',
"תבייע": 'תב"ע',
"תמייא": 'תמ"א',
"סייה": 'ס"ה',
"שייפ": 'ש"פ',
"שצייפ": 'שצ"פ',
"שבייצ": 'שב"צ',
"עסיים": 'עס"ם',
"הייה": 'ה"ה',
"פסייד": 'פס"ד',
"תיידא": 'תיד"א',
"בגייץ": 'בג"ץ',
"עתיים": 'עת"ם',
"עעיים": 'עע"ם',
# Hebrew calendar day prefixes (כ"א .. כ"ט etc.)
"כייא": 'כ"א', "כייב": 'כ"ב', "כייג": 'כ"ג', "כייד": 'כ"ד',
"כייה": 'כ"ה', "כייו": 'כ"ו', "כייז": 'כ"ז', "כייח": 'כ"ח', "כייט": 'כ"ט',
"לייא": 'ל"א',
"יייא": 'י"א', "יייב": 'י"ב', "יייג": 'י"ג', "יייד": 'י"ד',
"טייו": 'ט"ו', "טייז": 'ט"ז', "יייז": 'י"ז', "יייח": 'י"ח', "יייט": 'י"ט',
# Hebrew calendar years (תשפ"ה, תשפ"ד...)
"תשפייא": 'תשפ"א', "תשפייב": 'תשפ"ב', "תשפייג": 'תשפ"ג',
"תשפייד": 'תשפ"ד', "תשפייה": 'תשפ"ה', "תשפייו": 'תשפ"ו',
"תשפיין": 'תשפ"ן',
}
_ABBREV_PATTERN = re.compile(
"|".join(re.escape(k) for k in sorted(_HEBREW_ABBREV_FIXES, key=len, reverse=True))
)
def _fix_hebrew_quotes(text: str) -> str:
return _ABBREV_PATTERN.sub(lambda m: _HEBREW_ABBREV_FIXES[m.group()], text)
def _ocr_page_image(image_bytes: bytes, page_num: int) -> str:
client = _get_vision_client()
image = vision.Image(content=image_bytes)
response = client.document_text_detection(
image=image,
image_context=vision.ImageContext(language_hints=["he"]),
)
if response.error.message:
raise RuntimeError(f"Vision error page {page_num}: {response.error.message}")
text = response.full_text_annotation.text if response.full_text_annotation else ""
return _fix_hebrew_quotes(text)
_FOOTER_JUNK_RE = re.compile(
r"^("
r"\s*|" # blank
r"[-·*.\"\'׳״]+|" # stray punctuation
r"\d{1,3}[\s\-·*.\"\'׳״]*|" # page number with any stray char
r"עמוד[\s\d\-·*.\"\'׳״]*|" # "עמוד" / "עמוד N" w/ trailing noise
r"[-·*\s\"\'׳״]*[a-zA-Z][a-zA-Z0-9 .\-·*_]{0,30}" # garbled latin (nevo URL variants)
r")$"
)
def _clean_page_text(text: str) -> str:
"""Strip Nevo page headers, footers and watermarks from a single page's OCR text.
Nevo footer on each page looks like:
עמוד
N (or "", "N*")
nevo.co.il (or OCR-garbled: "new coal", "neto coal", etc.)
- (optional stray dash)
Google Vision OCRs this block at the end of each page's text.
"""
# 1. Strip top header "עמוד X מתוך Y" anywhere
text = PDF_PAGE_HEADER_RE.sub("\n", text)
# 2. Walk back from end, dropping footer junk lines
lines = text.split("\n")
while lines and _FOOTER_JUNK_RE.match(lines[-1].strip()):
lines.pop()
text = "\n".join(lines)
# 3. Final pass: strip any leftover Nevo URLs mid-text and orphan "עמוד X" lines
text = NEVO_URL_RE.sub("", text)
text = PDF_PAGE_NUM_LINE_RE.sub("", text)
text = PDF_PAGE_ORPHAN_RE.sub("", text)
return text.strip()
def proofread_pdf(path: Path) -> tuple[str, dict]:
"""Extract clean decision text from Nevo PDF via Google Vision OCR."""
doc = fitz.open(str(path))
pages: list[str] = []
for i, page in enumerate(doc):
pix = page.get_pixmap(dpi=300)
img_bytes = pix.tobytes("png")
text = _ocr_page_image(img_bytes, i + 1)
pages.append(_clean_page_text(text))
# Small delay between API calls to be safe
time.sleep(0.1)
doc.close()
body = "\n\n".join(p for p in pages if p)
body = re.sub(r"\n{3,}", "\n\n", body)
body = re.sub(r"[ \t]+\n", "\n", body)
for marker in NEVO_POSTAMBLE_MARKERS:
idx = body.find(marker)
if idx != -1:
body = body[:idx].rstrip()
break
return body, {
"pages": len(pages),
"chars": len(body),
}
# ── Orchestration ─────────────────────────────────────────────────
SKIP_FILES = {
"הכנת שאלות מחקר.docx",
"סוכן_מנתח_ומחקר_משפטי_Paperclip_מדריך.docx",
"README.md",
}
def output_filename(src: Path) -> str:
"""Build clean output filename preserving case identifier."""
stem = src.stem
# Normalize: replace spaces with - where helpful, but keep Hebrew intact
return f"{stem}.md"
def main(argv: list[str]) -> int:
OUTPUT_DIR.mkdir(exist_ok=True)
RAW_DIR.mkdir(exist_ok=True)
# Filter files
only = argv[1:] if len(argv) > 1 else None
files: list[Path] = []
for p in sorted(TRAINING_DIR.iterdir()):
if p.is_dir() or p.name.startswith("."):
continue
if p.name in SKIP_FILES:
continue
if p.suffix.lower() not in (".docx", ".pdf"):
continue
if only and p.name not in only:
continue
files.append(p)
print(f"Processing {len(files)} files...\n")
for path in files:
try:
if path.suffix.lower() == ".docx":
md, stats = proofread_docx(path)
else:
md, stats = proofread_pdf(path)
out_path = OUTPUT_DIR / output_filename(path)
out_path.write_text(md, encoding="utf-8")
print(f"{path.name}")
print(f"{out_path.name} ({len(md):,} chars) {stats}")
except Exception as e:
print(f"{path.name}: {e}")
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv))