Files
legal-ai/mcp-server/src/legal_mcp/services/extractor.py
Chaim 476c2fc5d1 feat(upload): accept legacy .doc, convert via LibreOffice in container
Legacy Hebrew .doc precedents (e.g. nevo.co.il CP1255 OLE2) can now be
uploaded directly through the precedent-library, missing-precedent, and
training upload paths — the frontend already advertised .doc but the
backend gate rejected it before reaching the extractor.

- web/app.py: add .doc to ALLOWED_EXTENSIONS (covers all paths that share
  the set: precedent library, missing-precedent, training).
- Dockerfile: install libreoffice-writer-nogui (no X11/Java) so the
  extractor's existing _extract_doc LibreOffice conversion works in the
  Coolify container (was missing → would fail at runtime).
- extractor.py: isolate the LibreOffice user profile per call to avoid a
  profile-lock failure on concurrent .doc conversions.

Verified in python:3.12-slim (prod base): .doc→.docx→text yields text
byte-identical to a native Word .docx save (103 paragraphs, 24,341 chars).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 13:47:47 +00:00

381 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Text extraction from PDF, DOCX, DOC, and RTF files.
Primary PDF extraction: PyMuPDF direct text (for born-digital PDFs).
Fallback: Google Cloud Vision OCR (for scanned documents).
DOC files: converted to DOCX via LibreOffice before extraction.
Post-processing: Hebrew abbreviation quote fixer.
"""
from __future__ import annotations
import asyncio
import io
import logging
import re
import subprocess
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
import fitz # PyMuPDF
from PIL import Image
from docx import Document as DocxDocument
from striprtf.striprtf import rtf_to_text
from legal_mcp import config
if TYPE_CHECKING:
from google.cloud import vision
logger = logging.getLogger(__name__)
# ── Google Cloud Vision client (imported lazily — saves ~550ms at MCP startup) ──
_vision_client: "vision.ImageAnnotatorClient | None" = None


def _get_vision_client() -> "vision.ImageAnnotatorClient":
    """Return the module-wide Google Cloud Vision client, creating it on first use.

    The ``google.cloud.vision`` import is deferred until the first call so
    that merely importing this module does not pay for it. The client is
    configured with the API key read from ``config.GOOGLE_CLOUD_VISION_API_KEY``
    and cached in the module-level ``_vision_client`` for later calls.
    """
    global _vision_client
    if _vision_client is not None:
        return _vision_client

    from google.cloud import vision

    api_options = {"api_key": config.GOOGLE_CLOUD_VISION_API_KEY}
    _vision_client = vision.ImageAnnotatorClient(client_options=api_options)
    return _vision_client
# ── Hebrew text quality detection ────────────────────────────────
_HEBREW_RE = re.compile(r'[\u0590-\u05FF]')
_WORD_RE = re.compile(r'\S+')
def _text_quality_ok(text: str) -> bool:
"""Check if extracted text is real content vs broken OCR layer.
Returns True if text appears to be genuine Hebrew legal content.
Broken OCR layers from scanned PDFs often have:
- Very short words / single-character fragments
- Each word on its own line (high words-per-line ratio)
- Non-Hebrew characters mixed in
"""
words = _WORD_RE.findall(text)
if len(words) < 10:
return False
# Average word length — real Hebrew words avg 4-6 chars.
avg_len = sum(len(w) for w in words) / len(words)
if avg_len < 2.5:
return False
# Percentage of single-character "words"
single_char_pct = sum(1 for w in words if len(w) == 1) / len(words)
if single_char_pct > 0.4:
return False
# Words per line — broken OCR puts each word on its own line.
# Real text has 5-15 words per line; broken OCR has ~1-2.
lines = [l for l in text.split("\n") if l.strip()]
if lines:
words_per_line = len(words) / len(lines)
if words_per_line < 3.0:
return False
# Hebrew character ratio among letter characters
letters = re.findall(r'[a-zA-Z\u0590-\u05FF]', text)
if letters:
hebrew_pct = sum(1 for c in letters if _HEBREW_RE.match(c)) / len(letters)
if hebrew_pct < 0.5:
return False
return True
# ── Hebrew abbreviation quote fixer ──────────────────────────────
_HEBREW_ABBREV_FIXES: dict[str, str] = {
'עוהייד': 'עוה"ד',
'עוייד': 'עו"ד',
'הנייל': 'הנ"ל',
'מצייב': 'מצ"ב',
'ביהמייש': 'ביהמ"ש',
'תייז': 'ת"ז',
'עייי': 'ע"י',
'אחייכ': 'אח"כ',
'סייק': 'ס"ק',
'דייר': 'ד"ר',
'כדוייח': 'כדו"ח',
'חווייד': 'חוו"ד',
'מייר': 'מ"ר',
'יחייד': 'יח"ד',
'בייכ': 'ב"כ',
# Patterns where double-yod (יי) substitutes for gershayim (״) in born-digital PDFs
'בליימ': 'בל"מ', # בקשה להארכת מועד — appears in RTL legal docs
'תמייא': 'תמ"א', # תכנית מתאר ארצית
}
_ABBREV_PATTERN = re.compile(
'|'.join(re.escape(k) for k in sorted(_HEBREW_ABBREV_FIXES, key=len, reverse=True))
)
# Matches Hebrew law year abbreviations where gershayim was encoded as double-yod.
# e.g. תשכייה → תשכ"ה, תשנייב → תשנ"ב
_HEBREW_YEAR_RE = re.compile(r'(תש[א-ת]+)יי([א-ת])')
def _fix_hebrew_quotes(text: str) -> str:
"""Fix known Hebrew abbreviation quote replacements.
Applied to both Google Vision OCR output and direct PyMuPDF extraction —
some born-digital PDFs encode gershayim (״) as double-yod (יי), producing
the same corruption patterns as OCR.
"""
text = _ABBREV_PATTERN.sub(lambda m: _HEBREW_ABBREV_FIXES[m.group()], text)
text = _HEBREW_YEAR_RE.sub(r'\1"\2', text)
return text
# ── Extraction ───────────────────────────────────────────────────
# Separator inserted between consecutive pages when per-page text is joined
# (see _join_pages). Kept as a module-level constant so the chunker /
# retrofit can reproduce the join when computing page offsets.
PAGE_SEPARATOR = "\n\n"
async def extract_text(file_path: str) -> tuple[str, int, list[int] | None]:
    """Extract text from a document file, dispatching on its extension.

    Supported extensions (case-insensitive): ``.pdf``, ``.docx``, ``.doc``,
    ``.rtf``, ``.txt`` and ``.md``. Plain-text files are read as UTF-8.

    Args:
        file_path: Path of the document to read.

    Returns:
        ``(text, page_count, page_offsets)`` where:

        - ``text``: concatenated extracted text
        - ``page_count``: number of pages (0 for non-PDF)
        - ``page_offsets``: ``page_offsets[i]`` = char start offset of
          page (i+1) inside ``text``. ``None`` for non-PDFs (where the
          notion of pages doesn't apply). Used by the chunker to assign
          a ``page_number`` to each chunk.

    Raises:
        ValueError: If the file extension is not one of the supported types.
    """
    path = Path(file_path)
    suffix = path.suffix.lower()

    if suffix == ".pdf":
        return await _extract_pdf(path)
    if suffix == ".docx":
        return _extract_docx(path), 0, None
    if suffix == ".doc":
        # The .doc path shells out to LibreOffice and blocks until the
        # conversion finishes; run it in a worker thread so the event loop
        # (and any concurrent uploads) keep being served meanwhile.
        return await asyncio.to_thread(_extract_doc, path), 0, None
    if suffix == ".rtf":
        return _extract_rtf(path), 0, None
    if suffix in (".txt", ".md"):
        return path.read_text(encoding="utf-8"), 0, None
    raise ValueError(f"Unsupported file type: {suffix}")
def _join_pages(pages_text: list[str]) -> tuple[str, list[int]]:
    """Concatenate per-page text and report where each page begins.

    Pages are glued together with ``PAGE_SEPARATOR`` (between pages only, not
    after the last one).

    Returns:
        ``(joined, offsets)`` where ``offsets[i]`` is the character index in
        ``joined`` at which page ``i`` (0-based) starts.
    """
    offsets: list[int] = []
    position = 0
    for page_text in pages_text:
        offsets.append(position)
        # The next page starts after this page's text plus one separator.
        position += len(page_text) + len(PAGE_SEPARATOR)
    return PAGE_SEPARATOR.join(pages_text), offsets
async def _extract_pdf(path: Path) -> tuple[str, int, list[int]]:
    """Extract text from a PDF, page by page.

    Each page is first read with PyMuPDF's direct text extraction. When that
    yields more than 50 characters that pass ``_text_quality_ok``, the text is
    kept (after ``_fix_hebrew_quotes``). Otherwise the page is rendered at
    300 DPI and sent to Google Cloud Vision OCR in a worker thread.

    Args:
        path: Location of the PDF file.

    Returns:
        ``(text, page_count, page_offsets)`` — the joined text, the number of
        pages, and the start offset of every page inside the joined text.

    Raises:
        RuntimeError: Propagated from the OCR helper when Google Vision
            reports an error for a page.
    """
    doc = fitz.open(str(path))
    # try/finally so the document handle is released even when OCR of some
    # page raises part-way through the loop.
    try:
        page_count = len(doc)
        pages_text: list[str] = []

        for page_num in range(page_count):
            page = doc[page_num]
            text = page.get_text().strip()

            if len(text) > 50 and _text_quality_ok(text):
                pages_text.append(_fix_hebrew_quotes(text))
                logger.debug("Page %d: direct extraction (%d chars, quality OK)", page_num + 1, len(text))
            else:
                reason = "insufficient text" if len(text) <= 50 else "low quality OCR layer"
                logger.info("Page %d: Google Vision OCR (%s)", page_num + 1, reason)
                pix = page.get_pixmap(dpi=300)
                img_bytes = pix.tobytes("png")
                # The OCR helper already applies _fix_hebrew_quotes itself.
                ocr_text = await asyncio.to_thread(
                    _ocr_with_google_vision, img_bytes, page_num + 1
                )
                pages_text.append(ocr_text)
    finally:
        doc.close()

    joined, offsets = _join_pages(pages_text)
    return joined, page_count, offsets
def page_at_offset(offset: int, page_offsets: list[int]) -> int:
    """Return the 1-based number of the page that contains ``offset``.

    ``page_offsets[i]`` is where page ``i + 1`` starts in the joined text, so
    the answer is the last page whose start does not exceed ``offset``.
    An empty ``page_offsets`` list, or an offset that precedes the first
    page start, yields page 1.
    """
    if not page_offsets:
        return 1

    # A plain scan is enough here — page_offsets is short (≤ ~200 for our PDFs).
    found = 1
    for number, start in enumerate(page_offsets, start=1):
        if start > offset:
            break
        found = number
    return found
def _ocr_with_google_vision(image_bytes: bytes, page_num: int) -> str:
    """Run Google Cloud Vision document OCR on one rendered page image.

    Args:
        image_bytes: Encoded image of the page.
        page_num: Page number, used only in the error message.

    Returns:
        The recognised text after ``_fix_hebrew_quotes``; an empty string if
        the response carries no full-text annotation.

    Raises:
        RuntimeError: If the Vision response contains an error message.
    """
    from google.cloud import vision  # lazy: keeps MCP startup fast

    ocr_client = _get_vision_client()
    page_image = vision.Image(content=image_bytes)
    # Hint the recogniser that the page is in Hebrew.
    hebrew_context = vision.ImageContext(language_hints=["he"])
    response = ocr_client.document_text_detection(
        image=page_image,
        image_context=hebrew_context,
    )

    if response.error.message:
        raise RuntimeError(
            f"Google Vision error on page {page_num}: {response.error.message}"
        )

    if response.full_text_annotation:
        recognized = response.full_text_annotation.text
    else:
        recognized = ""
    return _fix_hebrew_quotes(recognized)
def _extract_doc(path: Path) -> str:
    """Extract text from a legacy .doc file by converting it to .docx via LibreOffice.

    The conversion runs headless inside a throw-away temporary directory that
    holds both the converted file and a private LibreOffice user profile.

    Args:
        path: Location of the ``.doc`` file.

    Returns:
        The text of the converted document, as produced by ``_extract_docx``.

    Raises:
        RuntimeError: If LibreOffice exits with a non-zero status.
        FileNotFoundError: If LibreOffice exits successfully but the expected
            ``.docx`` output is missing.
        subprocess.TimeoutExpired: If the conversion exceeds 120 seconds.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        work_dir = Path(tmp_dir)
        # Isolate the LibreOffice user profile per call: headless soffice
        # locks a single shared profile, so concurrent .doc conversions would
        # otherwise fail with a profile-lock error.
        # as_uri() percent-encodes the path, so a temp directory containing
        # spaces or non-ASCII characters still yields a valid file:// URL.
        profile_uri = (work_dir / "lo-profile").absolute().as_uri()
        result = subprocess.run(
            [
                "libreoffice",
                f"-env:UserInstallation={profile_uri}",
                "--headless", "--convert-to", "docx", str(path), "--outdir", tmp_dir,
            ],
            # errors="replace": undecodable bytes in LibreOffice's output must
            # not turn into a UnicodeDecodeError that hides the real outcome.
            capture_output=True, text=True, errors="replace", timeout=120,
        )
        if result.returncode != 0:
            raise RuntimeError(f"LibreOffice conversion failed: {result.stderr}")

        docx_path = work_dir / f"{path.stem}.docx"
        if not docx_path.exists():
            # Exit status was 0 but nothing was written: include what
            # LibreOffice printed so the failure can actually be diagnosed.
            raise FileNotFoundError(
                f"Converted file not found: {docx_path} "
                f"(LibreOffice stdout: {result.stdout!r}, stderr: {result.stderr!r})"
            )
        return _extract_docx(docx_path)
def _extract_docx(path: Path) -> str:
    """Return the text of a DOCX file, one blank line between paragraphs.

    Only the document's paragraphs are read; paragraphs that are empty or
    whitespace-only are skipped.
    """
    document = DocxDocument(str(path))
    kept: list[str] = []
    for paragraph in document.paragraphs:
        content = paragraph.text
        if content.strip():
            kept.append(content)
    return "\n\n".join(kept)
def _extract_rtf(path: Path) -> str:
    """Return the plain text of an RTF file.

    The file is decoded as UTF-8 with undecodable bytes replaced, then handed
    to ``rtf_to_text`` to strip the RTF markup.
    """
    with path.open(encoding="utf-8", errors="replace") as handle:
        raw_rtf = handle.read()
    return rtf_to_text(raw_rtf)
# ── Multimodal page rendering (V9) ───────────────────────────────
def _pixmap_to_pil(pix: fitz.Pixmap) -> Image.Image:
    """Build an RGB ``PIL.Image`` straight from a PyMuPDF pixmap's samples.

    Skips the PNG encode/decode round-trip of ``tobytes('png')`` →
    ``Image.open()``, which makes it faster.
    """
    # Drop alpha channel — voyage multimodal expects RGB.
    rgb_pix = fitz.Pixmap(pix, 0) if pix.alpha else pix
    dimensions = (rgb_pix.width, rgb_pix.height)
    return Image.frombytes("RGB", dimensions, rgb_pix.samples)
def render_pages_for_multimodal(
    pdf_path: str | Path,
    embed_dpi: int,
    thumb_dpi: int | None = None,
    thumbnail_dir: Path | None = None,
) -> list[tuple[Image.Image, Path | None]]:
    """Render every page of a PDF to a ``PIL.Image`` for the multimodal embedder.

    Each page is rendered once at ``embed_dpi``. When both ``thumbnail_dir``
    and a non-zero ``thumb_dpi`` are given, a JPEG thumbnail scaled down to
    ``thumb_dpi`` is also written to ``thumbnail_dir`` as ``pNNN.jpg`` for UI
    preview. ``thumbnail_dir`` is created if it does not exist.

    Returns:
        ``[(pil_image, thumb_path_or_None), ...]`` in page order. The
        full-DPI image is kept in memory only; just the thumbnail is saved.

    Raises:
        FileNotFoundError: If ``pdf_path`` is not an existing file.
    """
    source_pdf = Path(pdf_path)
    if not source_pdf.is_file():
        raise FileNotFoundError(f"PDF not found: {source_pdf}")
    if thumbnail_dir is not None:
        thumbnail_dir.mkdir(parents=True, exist_ok=True)

    want_thumbnails = thumbnail_dir is not None and bool(thumb_dpi)
    rendered: list[tuple[Image.Image, Path | None]] = []

    pdf = fitz.open(str(source_pdf))
    try:
        for page_number, page in enumerate(pdf, start=1):
            full_image = _pixmap_to_pil(page.get_pixmap(dpi=embed_dpi))
            saved_thumb: Path | None = None

            if want_thumbnails:
                saved_thumb = thumbnail_dir / f"p{page_number:03d}.jpg"
                # Shrink the image we already have instead of asking PyMuPDF
                # for a second render — far faster.
                scale = thumb_dpi / embed_dpi
                small_width = max(1, int(full_image.width * scale))
                small_height = max(1, int(full_image.height * scale))
                preview = full_image.resize(
                    (small_width, small_height), Image.Resampling.LANCZOS
                )
                preview.save(saved_thumb, "JPEG", quality=75, optimize=True)

            rendered.append((full_image, saved_thumb))
    finally:
        pdf.close()

    return rendered
# ── Nevo preamble stripping ──────────────────────────────────────
# Headings whose presence near the top of a text signals a Nevo preamble.
_NEVO_MARKERS = ("ספרות:", "חקיקה שאוזכרה:", "מיני-רציו:", "פסקי דין שאוזכרו:",
                 "כתבי עת:", "הועתק מנבו")
# Phrases that open the decision proper, matched at the start of a line.
_DECISION_START = re.compile(
    r"^(בפנינו|לפנינו|הערר שבנדון|ועדת הערר לתכנון|רקע עובדתי|עסקינן)",
    re.MULTILINE,
)


def strip_nevo_preamble(text: str) -> str:
    """Cut the Nevo database preamble (bibliography, legislation, mini-ratio) off a decision.

    A preamble is assumed only when one of ``_NEVO_MARKERS`` occurs within the
    first 400 characters. The text is then cut at the first line that starts
    with a ``_DECISION_START`` phrase, provided that line begins more than 50
    characters into the text. In every other case the input is returned
    unchanged.
    """
    opening = text[:400]
    marker_found = False
    for marker in _NEVO_MARKERS:
        if marker in opening:
            marker_found = True
            break
    if not marker_found:
        return text

    decision = _DECISION_START.search(text)
    if decision is None or decision.start() <= 50:
        return text

    preamble_length = decision.start()
    body = text[preamble_length:]
    logger.debug("Stripped %d chars of Nevo preamble", preamble_length)
    return body