legal-ai/mcp-server/src/legal_mcp/services/extractor.py
Chaim fb51a0e869 feat(nevo): backfill leaked preamble + ratio gold-set benchmark (#86)
#86.2 backfill + #86.3 benchmark, plus a #86.1 over-strip fix found en route.

extractor.py
- extract_nevo_ratio(): capture Nevo's מיני-רציו block (editorial holdings
  summary) before it is stripped — a free professional gold-set (#86.3).
- _DECISION_START hardening (#86.2): the merged #86.1 regex over-stripped.
  (a) פסק-דין headers are markdown-wrapped (**פסק  דין**); the old anchor
      required the keyword to be the first character of the line, followed
      by exactly one separator, so it missed the header and matched a
      citation 32K deep (עמ"נ 50567-07-21, losing 45% of the body). It now
      tolerates leading markdown and 0-3 separators, and accepts both the
      final-nun form דין and the suffixed form דינו.
  (b) bare השופט/הנשיא matched CITATIONS ("השופט מ' חשין, פסקה 23"). The
      authoring-judge line ends with a colon; we now require it.
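
The two fixes above can be checked in isolation. The sketch below copies the
hardened pattern from extractor.py and compares it with the old header anchor
as quoted in the code comments (^פסק[- ]דין). The name OLD_HEADER_ANCHOR is
introduced here for illustration only, and the old pattern's flags are assumed
to have been re.MULTILINE.

```python
import re

# Old header anchor, as quoted in the extractor.py comments (flags assumed).
OLD_HEADER_ANCHOR = re.compile(r"^פסק[- ]דין", re.MULTILINE)

# Hardened pattern, copied from extractor.py.
_DECISION_START = re.compile(
    r"^[ \t>*_#]{0,6}(?:"
    r"בפנינו|לפנינו|לפניי|הערר שבנדון|ועדת הערר לתכנון|רקע עובדתי|עסקינן|"
    r"פסק[ \t\-]{0,3}די(?:ן|נו)|"
    r"(?:כב(?:וד)?['׳\"]?\s*)?(?:ה?שופט[ת]?|ה?נשיא[ה]?|המשנה לנשיא)\s+[^\n,]{1,40}:"
    r")",
    re.MULTILINE,
)

markdown_header = "**פסק  דין**"          # markdown-wrapped, two separators
citation = "השופט מ' חשין, פסקה 23"      # citation: a comma, no colon
author_line = "השופט י' עמית:"            # authoring-judge line: ends with a colon

header_old = OLD_HEADER_ANCHOR.search(markdown_header)
header_new = _DECISION_START.search(markdown_header)
citation_hit = _DECISION_START.search(citation)
author_hit = _DECISION_START.search(author_line)
```

With these inputs the old anchor misses the wrapped header while the hardened
pattern finds it, and the citation is rejected while the author line is kept.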

ingest.py
- capture the ratio before stripping and store it on the row (best-effort,
  non-fatal); also strip the preamble on the text-upload path (previously
  only file uploads were stripped).
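
The ordering described above (capture first, strip second, never fail the
ingest because of the capture) might look roughly like this. It is a sketch,
not the ingest.py code: capture_then_strip is a hypothetical helper, and the
two callables stand in for extract_nevo_ratio and strip_nevo_preamble.

```python
from __future__ import annotations

import logging
from typing import Callable

logger = logging.getLogger(__name__)


def capture_then_strip(
    raw_text: str,
    extract_ratio: Callable[[str], str],
    strip_preamble: Callable[[str], str],
) -> tuple[str, str | None]:
    """Return (stripped_text, ratio_or_None); hypothetical helper."""
    ratio: str | None = None
    try:
        # Capture must run on the raw text: stripping discards the ratio.
        ratio = extract_ratio(raw_text) or None
    except Exception:
        # Best-effort: a failed capture is logged, the ingest continues.
        logger.warning("ratio capture failed; continuing", exc_info=True)
    return strip_preamble(raw_text), ratio
```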

db.py
- add case_law.nevo_ratio column (additive); allow it in update_case_law.

scripts/backfill_nevo_preamble.py (#86.2) — dry-run-by-default data migration:
finds historically-leaked rulings, captures ratio→nevo_ratio, rewrites
full_text (+content_hash), reindexes, and FLAGS (never deletes) halachot whose
quote lives in the removed preamble (review_status=pending_review +
nevo_preamble_leak flag). Safety guard: rows whose keep% falls below
--min-keep (60) are excluded from --apply as suspected over-strip.
--apply writes backup+manifest
to data/audit/ first. Chair-gated — NOT applied here.
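
A minimal sketch of the safety guard, assuming keep% is the character-length
ratio of the stripped text to the original (the script may measure it
differently). keep_percent and eligible_for_apply are hypothetical names, not
taken from backfill_nevo_preamble.py.

```python
def keep_percent(original: str, stripped: str) -> float:
    """Share of the original text that survives stripping, in percent."""
    if not original:
        return 100.0
    return 100.0 * len(stripped) / len(original)


def eligible_for_apply(original: str, stripped: str, min_keep: float = 60.0) -> bool:
    """Rows below min_keep are held back as suspected over-strip."""
    return keep_percent(original, stripped) >= min_keep


# A ruling that loses 45% of its body keeps 55% and is excluded at min_keep=60.
over_stripped = eligible_for_apply("x" * 1000, "x" * 550)
# A ruling that keeps 75% passes the guard.
healthy = eligible_for_apply("x" * 1000, "x" * 750)
```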

scripts/nevo_ratio_benchmark.py (#86.3) — LLM-as-judge (local claude_session,
zero cost) measures recall/precision/granularity of our halachot vs the Nevo
ratio. Works pre- and post-backfill (reads nevo_ratio, falls back to full_text).
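
One plausible way to turn the judge's verdicts into the three numbers is
sketched below. The metric definitions are assumptions, not read from
nevo_ratio_benchmark.py, and JudgeCounts and score are hypothetical names. The
example counts are illustrative only, chosen so the arithmetic is easy to
follow; they are not the benchmark's real tallies.

```python
from dataclasses import dataclass


@dataclass
class JudgeCounts:
    """Hypothetical tallies from an LLM judge for one ruling."""
    gold_total: int      # holdings listed in the Nevo ratio
    gold_covered: int    # gold holdings matched by at least one halacha
    ours_total: int      # halachot produced by our extractor
    ours_supported: int  # halachot the judge accepted as correct


def score(c: JudgeCounts) -> dict[str, float]:
    return {
        # Assumed: share of gold holdings we recovered.
        "recall": c.gold_covered / c.gold_total,
        # Assumed: share of our halachot that the judge accepted.
        "precision": c.ours_supported / c.ours_total,
        # Assumed: how many halachot we emit per gold holding.
        "granularity": c.ours_total / c.gold_total,
    }


example = score(JudgeCounts(gold_total=8, gold_covered=7, ours_total=14, ours_supported=14))
```

Under these definitions a granularity above 1 means our halachot split the
holdings more finely than the Nevo headnote does.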

Verified:
- pytest tests/test_nevo_preamble.py — 12 passed (incl. citation/markdown
  over-strip regressions).
- backfill dry-run: 19 leaked rulings, 27 contaminated halachot, all ≥75%
  keep (the 32K over-strip is gone).
- benchmark on בג"ץ 1764/05: recall=0.875 precision=1.0 granularity=1.75x.

Invariants: G1 (normalize at source — strip/capture at ingest, not at read);
no silent swallow (contaminated halachot flagged + reported, not dropped);
data-migration is dry-run-default with backup+manifest, chair-gated.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 19:45:43 +00:00

441 lines
16 KiB
Python
"""Text extraction from PDF, DOCX, DOC, and RTF files.
Primary PDF extraction: PyMuPDF direct text (for born-digital PDFs).
Fallback: Google Cloud Vision OCR (for scanned documents).
DOC files: converted to DOCX via LibreOffice before extraction.
Post-processing: Hebrew abbreviation quote fixer.
"""
from __future__ import annotations

import asyncio
import io
import logging
import re
import subprocess
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING

import fitz  # PyMuPDF
from PIL import Image
from docx import Document as DocxDocument
from striprtf.striprtf import rtf_to_text

from legal_mcp import config

if TYPE_CHECKING:
    from google.cloud import vision

logger = logging.getLogger(__name__)

# ── Google Cloud Vision client (imported lazily; saves ~550ms at MCP startup) ──
_vision_client: "vision.ImageAnnotatorClient | None" = None


def _get_vision_client() -> "vision.ImageAnnotatorClient":
    global _vision_client
    if _vision_client is None:
        from google.cloud import vision

        _vision_client = vision.ImageAnnotatorClient(
            client_options={"api_key": config.GOOGLE_CLOUD_VISION_API_KEY}
        )
    return _vision_client
# ── Hebrew text quality detection ────────────────────────────────
_HEBREW_RE = re.compile(r'[\u0590-\u05FF]')
_WORD_RE = re.compile(r'\S+')
def _text_quality_ok(text: str) -> bool:
    """Check if extracted text is real content vs broken OCR layer.

    Returns True if text appears to be genuine Hebrew legal content.
    Broken OCR layers from scanned PDFs often have:
    - Very short words / single-character fragments
    - Each word on its own line (low words-per-line ratio)
    - Non-Hebrew characters mixed in
    """
    words = _WORD_RE.findall(text)
    if len(words) < 10:
        return False
    # Average word length: real Hebrew words avg 4-6 chars.
    avg_len = sum(len(w) for w in words) / len(words)
    if avg_len < 2.5:
        return False
    # Percentage of single-character "words"
    single_char_pct = sum(1 for w in words if len(w) == 1) / len(words)
    if single_char_pct > 0.4:
        return False
    # Words per line: broken OCR puts each word on its own line.
    # Real text has 5-15 words per line; broken OCR has ~1-2.
    lines = [line for line in text.split("\n") if line.strip()]
    if lines:
        words_per_line = len(words) / len(lines)
        if words_per_line < 3.0:
            return False
    # Hebrew character ratio among letter characters
    letters = re.findall(r'[a-zA-Z\u0590-\u05FF]', text)
    if letters:
        hebrew_pct = sum(1 for c in letters if _HEBREW_RE.match(c)) / len(letters)
        if hebrew_pct < 0.5:
            return False
    return True
# ── Hebrew abbreviation quote fixer ──────────────────────────────
_HEBREW_ABBREV_FIXES: dict[str, str] = {
    'עוהייד': 'עוה"ד',
    'עוייד': 'עו"ד',
    'הנייל': 'הנ"ל',
    'מצייב': 'מצ"ב',
    'ביהמייש': 'ביהמ"ש',
    'תייז': 'ת"ז',
    'עייי': 'ע"י',
    'אחייכ': 'אח"כ',
    'סייק': 'ס"ק',
    'דייר': 'ד"ר',
    'כדוייח': 'כדו"ח',
    'חווייד': 'חוו"ד',
    'מייר': 'מ"ר',
    'יחייד': 'יח"ד',
    'בייכ': 'ב"כ',
    # Patterns where double-yod (יי) substitutes for gershayim (״) in born-digital PDFs
    'בליימ': 'בל"מ',  # בקשה להארכת מועד (request for extension of time); appears in RTL legal docs
    'תמייא': 'תמ"א',  # תכנית מתאר ארצית (national outline plan)
}
# Longest keys first, so a longer abbreviation wins over a shorter one it contains.
_ABBREV_PATTERN = re.compile(
    '|'.join(re.escape(k) for k in sorted(_HEBREW_ABBREV_FIXES, key=len, reverse=True))
)
# Matches Hebrew law year abbreviations where gershayim was encoded as double-yod.
# e.g. תשכייה → תשכ"ה, תשנייב → תשנ"ב
_HEBREW_YEAR_RE = re.compile(r'(תש[א-ת]+)יי([א-ת])')
def _fix_hebrew_quotes(text: str) -> str:
    """Fix known Hebrew abbreviation quote replacements.

    Applied to both Google Vision OCR output and direct PyMuPDF extraction:
    some born-digital PDFs encode gershayim (״) as double-yod (יי), producing
    the same corruption patterns as OCR.
    """
    text = _ABBREV_PATTERN.sub(lambda m: _HEBREW_ABBREV_FIXES[m.group()], text)
    text = _HEBREW_YEAR_RE.sub(r'\1"\2', text)
    return text
# ── Extraction ───────────────────────────────────────────────────
# Separator used when joining per-page text. Constant so chunker /
# retrofit can reproduce the join when computing page offsets.
PAGE_SEPARATOR = "\n\n"
async def extract_text(file_path: str) -> tuple[str, int, list[int] | None]:
    """Extract text from a document file.

    Returns:
        ``(text, page_count, page_offsets)`` where:
        - ``text``: concatenated extracted text
        - ``page_count``: number of pages (0 for non-PDF)
        - ``page_offsets``: ``page_offsets[i]`` = char start offset of
          page (i+1) inside ``text``. ``None`` for non-PDFs (where the
          notion of pages doesn't apply). Used by the chunker to assign
          a ``page_number`` to each chunk.
    """
    path = Path(file_path)
    suffix = path.suffix.lower()
    if suffix == ".pdf":
        return await _extract_pdf(path)
    elif suffix == ".docx":
        return _extract_docx(path), 0, None
    elif suffix == ".doc":
        return _extract_doc(path), 0, None
    elif suffix == ".rtf":
        return _extract_rtf(path), 0, None
    elif suffix in (".txt", ".md"):
        return path.read_text(encoding="utf-8"), 0, None
    else:
        raise ValueError(f"Unsupported file type: {suffix}")
def _join_pages(pages_text: list[str]) -> tuple[str, list[int]]:
    """Join per-page text with PAGE_SEPARATOR while recording the start
    offset of each page in the joined output."""
    offsets: list[int] = []
    parts: list[str] = []
    cursor = 0
    for i, pg in enumerate(pages_text):
        offsets.append(cursor)
        parts.append(pg)
        cursor += len(pg)
        if i < len(pages_text) - 1:
            parts.append(PAGE_SEPARATOR)
            cursor += len(PAGE_SEPARATOR)
    return "".join(parts), offsets
async def _extract_pdf(path: Path) -> tuple[str, int, list[int]]:
    """Extract text from PDF.

    Try direct text first, fall back to Google Cloud Vision for scanned
    or broken-OCR pages.
    """
    doc = fitz.open(str(path))
    try:
        page_count = len(doc)
        pages_text: list[str] = []
        for page_num in range(page_count):
            page = doc[page_num]
            text = page.get_text().strip()
            if len(text) > 50 and _text_quality_ok(text):
                pages_text.append(_fix_hebrew_quotes(text))
                logger.debug("Page %d: direct extraction (%d chars, quality OK)", page_num + 1, len(text))
            else:
                reason = "insufficient text" if len(text) <= 50 else "low quality OCR layer"
                logger.info("Page %d: Google Vision OCR (%s)", page_num + 1, reason)
                pix = page.get_pixmap(dpi=300)
                img_bytes = pix.tobytes("png")
                ocr_text = await asyncio.to_thread(
                    _ocr_with_google_vision, img_bytes, page_num + 1
                )
                pages_text.append(ocr_text)
    finally:
        # Close the document even if OCR raises partway through.
        doc.close()
    joined, offsets = _join_pages(pages_text)
    return joined, page_count, offsets
def page_at_offset(offset: int, page_offsets: list[int]) -> int:
    """Look up the page number containing a given char offset.

    page_offsets[i] is the start of page (i+1) in the joined text;
    a chunk starting at ``offset`` belongs to the highest-indexed page
    whose start is ``<= offset``. Returns 1-based page number.
    """
    if not page_offsets:
        return 1
    # Linear scan is fine: page_offsets is short (≤ ~200 for our PDFs).
    page = 1
    for i, start in enumerate(page_offsets):
        if start <= offset:
            page = i + 1
        else:
            break
    return page
def _ocr_with_google_vision(image_bytes: bytes, page_num: int) -> str:
    """OCR a single page image using Google Cloud Vision API."""
    from google.cloud import vision  # lazy: keeps MCP startup fast

    client = _get_vision_client()
    image = vision.Image(content=image_bytes)
    response = client.document_text_detection(
        image=image,
        image_context=vision.ImageContext(language_hints=["he"]),
    )
    if response.error.message:
        raise RuntimeError(
            f"Google Vision error on page {page_num}: {response.error.message}"
        )
    text = response.full_text_annotation.text if response.full_text_annotation else ""
    return _fix_hebrew_quotes(text)
def _extract_doc(path: Path) -> str:
    """Extract text from legacy .doc file by converting to .docx via LibreOffice."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Isolate the LibreOffice user profile per call: headless soffice
        # locks a single shared profile, so concurrent .doc conversions would
        # otherwise fail with a profile-lock error.
        result = subprocess.run(
            [
                "libreoffice",
                f"-env:UserInstallation=file://{tmp_dir}/lo-profile",
                "--headless", "--convert-to", "docx", str(path), "--outdir", tmp_dir,
            ],
            capture_output=True, text=True, timeout=120,
        )
        if result.returncode != 0:
            raise RuntimeError(f"LibreOffice conversion failed: {result.stderr}")
        docx_path = Path(tmp_dir) / f"{path.stem}.docx"
        if not docx_path.exists():
            raise FileNotFoundError(f"Converted file not found: {docx_path}")
        # Read the result before the temporary directory is removed.
        return _extract_docx(docx_path)
def _extract_docx(path: Path) -> str:
    """Extract text from DOCX file."""
    doc = DocxDocument(str(path))
    paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
    return "\n\n".join(paragraphs)


def _extract_rtf(path: Path) -> str:
    """Extract text from RTF file."""
    rtf_content = path.read_text(encoding="utf-8", errors="replace")
    return rtf_to_text(rtf_content)
# ── Multimodal page rendering (V9) ───────────────────────────────
def _pixmap_to_pil(pix: fitz.Pixmap) -> Image.Image:
    """Convert a PyMuPDF pixmap to PIL.Image (RGB) without going through
    PNG bytes. Faster than tobytes('png') → Image.open()."""
    if pix.alpha:
        # Drop alpha channel: voyage multimodal expects RGB.
        pix = fitz.Pixmap(pix, 0)
    return Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
def render_pages_for_multimodal(
    pdf_path: str | Path,
    embed_dpi: int,
    thumb_dpi: int | None = None,
    thumbnail_dir: Path | None = None,
) -> list[tuple[Image.Image, Path | None]]:
    """Render each PDF page as PIL.Image at ``embed_dpi`` for the
    multimodal embedder, and optionally save a smaller JPEG thumbnail
    at ``thumb_dpi`` to ``thumbnail_dir`` for UI preview.

    Returns ``[(pil_image, thumb_path_or_None), ...]`` in page order.
    The full-DPI image stays in memory only; only the thumbnail is
    persisted to disk.
    """
    src = Path(pdf_path)
    if not src.is_file():
        raise FileNotFoundError(f"PDF not found: {src}")
    if thumbnail_dir is not None:
        thumbnail_dir.mkdir(parents=True, exist_ok=True)
    out: list[tuple[Image.Image, Path | None]] = []
    doc = fitz.open(str(src))
    try:
        for page_idx, page in enumerate(doc):
            page_num = page_idx + 1
            pix = page.get_pixmap(dpi=embed_dpi)
            img = _pixmap_to_pil(pix)
            thumb_path: Path | None = None
            if thumbnail_dir is not None and thumb_dpi:
                thumb_path = thumbnail_dir / f"p{page_num:03d}.jpg"
                # Downsample the same render rather than re-rendering
                # with PyMuPDF, which is far faster.
                ratio = thumb_dpi / embed_dpi
                thumb_size = (
                    max(1, int(img.width * ratio)),
                    max(1, int(img.height * ratio)),
                )
                thumb = img.resize(thumb_size, Image.Resampling.LANCZOS)
                thumb.save(thumb_path, "JPEG", quality=75, optimize=True)
            out.append((img, thumb_path))
    finally:
        doc.close()
    return out
# ── Nevo preamble stripping ──────────────────────────────────────
_NEVO_MARKERS = ("ספרות:", "חקיקה שאוזכרה:", "מיני-רציו:", "פסקי דין שאוזכרו:",
"כתבי עת:", "הועתק מנבו")
# Markers for where the actual decision body begins (everything before is Nevo
# preamble: bibliography + מיני-רציו). Two families:
# - ועדת ערר / district openings (בפנינו / הערר שבנדון / ...)
# - COURT-RULING openings (#86.1): a פסק-דין header or the authoring judge's
# line. Without these, Nevo court judgments — exactly the ones carrying a
# מיני-רציו — slipped through unstripped (e.g. בג"ץ 1764/05).
#
# #86.2 hardening — two over-strip bugs found while backfilling:
# 1. ``פסק-דין`` headers are often markdown-wrapped (``**פסק דין**``); the old
# ``^פסק[- ]דין`` required the keyword to be the very first char of the line
# and allowed only one separator, so it missed the header and fell through
# to a citation 32K deep (עמ"נ 50567-07-21). We now tolerate leading
# markdown/whitespace and 0-3 separators.
# 2. Bare ``השופט``/``הנשיא`` matched *citations* ("השופט מ' חשין, פסקה 23"),
# stripping real decision body. The authoring-judge line ends with a COLON
# ("השופט י' עמית:"); citations use a comma. We now require the colon.
_DECISION_START = re.compile(
r"^[ \t>*_#]{0,6}(?:"
r"בפנינו|לפנינו|לפניי|הערר שבנדון|ועדת הערר לתכנון|רקע עובדתי|עסקינן|"
r"פסק[ \t\-]{0,3}די(?:ן|נו)|" # פסק-דין / פסק דין / **פסק דין** header (final-nun ן vs דינו)
r"(?:כב(?:וד)?['׳\"]?\s*)?(?:ה?שופט[ת]?|ה?נשיא[ה]?|המשנה לנשיא)\s+[^\n,]{1,40}:" # author line → colon
r")",
re.MULTILINE,
)
def strip_nevo_preamble(text: str) -> str:
    """Remove Nevo database preamble (bibliography, legislation, mini-ratio) from decision text.

    Returns the original text unchanged if no preamble is detected.
    """
    # Window wide enough to catch the Nevo markers even when a long court/parties
    # header precedes them (court rulings push חקיקה שאוזכרה:/מיני-רציו: down).
    head = text[:1500]
    if not any(marker in head for marker in _NEVO_MARKERS):
        return text
    m = _DECISION_START.search(text)
    if m and m.start() > 50:
        stripped = text[m.start():]
        logger.debug("Stripped %d chars of Nevo preamble", m.start())
        return stripped
    return text
_RATIO_MARKER = "מיני-רציו:"
def extract_nevo_ratio(text: str) -> str:
    """Return the Nevo מיני-רציו block (editorial holdings summary), or ''.

    The mini-ratio is Nevo's own headnote: a concise, professionally-written
    list of the holdings. We capture it *before* :func:`strip_nevo_preamble`
    discards it, to serve as a free gold-set for benchmarking how well our
    halacha extractor covers the real holdings (#86.3).

    The block runs from the ``מיני-רציו:`` marker to whichever comes first:
    the decision body (``_DECISION_START``) or the next preamble marker
    (bibliography / legislation). Returns '' when there is no mini-ratio.
    """
    if not text:
        return ""
    start = text.find(_RATIO_MARKER)
    if start == -1:
        return ""
    body = text[start + len(_RATIO_MARKER):]
    # End at the earliest of: decision body start, or a following preamble
    # marker (ספרות: / חקיקה שאוזכרה: / ...). Both are measured relative to
    # the ratio body so we never run past it into the judgment itself.
    end = len(body)
    dm = _DECISION_START.search(body)
    if dm:
        end = min(end, dm.start())
    for marker in _NEVO_MARKERS:
        if marker == _RATIO_MARKER:
            continue
        pos = body.find(marker)
        if pos != -1:
            end = min(end, pos)
    return body[:end].strip()