All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m26s
When Paperclip wakes the CEO and the model issues an mcp__legal-ai__*
call within ~10s of session init, Claude Code sometimes returns
"No such tool available" because the legal-ai MCP server hasn't
finished bringing up its tool catalog yet. Observed twice today on
CMPA precedent-extraction wakeups (sessions 9989fbaf and a9c61801);
the agent fell back to bash + .venv/bin/python and finished the work,
but the race needed fixing on the server side.
Three changes that close the window:
1. Lazy schema init (services/db.py + server.py)
`init_schema()` was awaited inside the FastMCP lifespan, blocking
the `initialize`/`tools/list` handshake until ~10 CREATE TABLE IF
NOT EXISTS statements ran. Under contention (two CEOs waking at
once for different companies) this stretched. Now the lifespan
returns immediately and `get_pool()` runs the schema migrations
exactly once on first DB access, guarded by an asyncio.Lock.
tools/list is answered in milliseconds regardless of DB state.
2. Lazy heavy imports
- services/embeddings.py: voyageai (~450ms) loaded only inside
_get_client()
- services/extractor.py: google.cloud.vision (~550ms) loaded only
inside _get_vision_client() and _ocr_with_google_vision()
These two were being imported at module top from
legal_mcp.tools.documents -> services.processor -> services.{
extractor,embeddings}, so the FastMCP server couldn't even start
responding until both finished. Cold start dropped from 2.7s to
1.17s end-to-end (init + tools/list response).
3. Agent-side warmup + retry guidance (.claude/agents/legal-ceo.md)
Even with a fast server, the model can still race on the very
first call. The precedent-extraction section now tells the CEO
to call workflow_status as a warmup probe and to retry after a
short sleep if it sees "No such tool available", before falling
back to the python bypass.
Also expanded the precedent-tool whitelists on the sub-agents that
delegate halacha/library work (commits 4a9a6b7 + 7ee90dc added the
tools to the MCP server but only the CEO got them in its allowed
list). Added to: legal-researcher (full extraction set), legal-analyst
(library_get/list + halacha review), legal-writer (library lookups +
halacha_review), legal-qa (library_get + halacha_review), and the two
that the CEO was already missing (halacha_review, halachot_pending).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
360 lines
12 KiB
Python
360 lines
12 KiB
Python
"""Text extraction from PDF, DOCX, DOC, and RTF files.
|
||
|
||
Primary PDF extraction: PyMuPDF direct text (for born-digital PDFs).
|
||
Fallback: Google Cloud Vision OCR (for scanned documents).
|
||
DOC files: converted to DOCX via LibreOffice before extraction.
|
||
Post-processing: Hebrew abbreviation quote fixer.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import io
|
||
import logging
|
||
import re
|
||
import subprocess
|
||
import tempfile
|
||
from pathlib import Path
|
||
from typing import TYPE_CHECKING
|
||
|
||
import fitz # PyMuPDF
|
||
from PIL import Image
|
||
from docx import Document as DocxDocument
|
||
from striprtf.striprtf import rtf_to_text
|
||
|
||
from legal_mcp import config
|
||
|
||
if TYPE_CHECKING:
|
||
from google.cloud import vision
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# ── Google Cloud Vision client (imported lazily — saves ~550ms at MCP startup) ──
|
||
|
||
_vision_client: "vision.ImageAnnotatorClient | None" = None
|
||
|
||
|
||
def _get_vision_client() -> "vision.ImageAnnotatorClient":
|
||
global _vision_client
|
||
if _vision_client is None:
|
||
from google.cloud import vision
|
||
_vision_client = vision.ImageAnnotatorClient(
|
||
client_options={"api_key": config.GOOGLE_CLOUD_VISION_API_KEY}
|
||
)
|
||
return _vision_client
|
||
|
||
|
||
# ── Hebrew text quality detection ────────────────────────────────
|
||
|
||
_HEBREW_RE = re.compile(r'[\u0590-\u05FF]')
|
||
_WORD_RE = re.compile(r'\S+')
|
||
|
||
|
||
def _text_quality_ok(text: str) -> bool:
|
||
"""Check if extracted text is real content vs broken OCR layer.
|
||
|
||
Returns True if text appears to be genuine Hebrew legal content.
|
||
Broken OCR layers from scanned PDFs often have:
|
||
- Very short words / single-character fragments
|
||
- Each word on its own line (high words-per-line ratio)
|
||
- Non-Hebrew characters mixed in
|
||
"""
|
||
words = _WORD_RE.findall(text)
|
||
if len(words) < 10:
|
||
return False
|
||
|
||
# Average word length — real Hebrew words avg 4-6 chars.
|
||
avg_len = sum(len(w) for w in words) / len(words)
|
||
if avg_len < 2.5:
|
||
return False
|
||
|
||
# Percentage of single-character "words"
|
||
single_char_pct = sum(1 for w in words if len(w) == 1) / len(words)
|
||
if single_char_pct > 0.4:
|
||
return False
|
||
|
||
# Words per line — broken OCR puts each word on its own line.
|
||
# Real text has 5-15 words per line; broken OCR has ~1-2.
|
||
lines = [l for l in text.split("\n") if l.strip()]
|
||
if lines:
|
||
words_per_line = len(words) / len(lines)
|
||
if words_per_line < 3.0:
|
||
return False
|
||
|
||
# Hebrew character ratio among letter characters
|
||
letters = re.findall(r'[a-zA-Z\u0590-\u05FF]', text)
|
||
if letters:
|
||
hebrew_pct = sum(1 for c in letters if _HEBREW_RE.match(c)) / len(letters)
|
||
if hebrew_pct < 0.5:
|
||
return False
|
||
|
||
return True
|
||
|
||
|
||
# ── Hebrew abbreviation quote fixer ──────────────────────────────
|
||
|
||
_HEBREW_ABBREV_FIXES: dict[str, str] = {
|
||
'עוהייד': 'עוה"ד',
|
||
'עוייד': 'עו"ד',
|
||
'הנייל': 'הנ"ל',
|
||
'מצייב': 'מצ"ב',
|
||
'ביהמייש': 'ביהמ"ש',
|
||
'תייז': 'ת"ז',
|
||
'עייי': 'ע"י',
|
||
'אחייכ': 'אח"כ',
|
||
'סייק': 'ס"ק',
|
||
'דייר': 'ד"ר',
|
||
'כדוייח': 'כדו"ח',
|
||
'חווייד': 'חוו"ד',
|
||
'מייר': 'מ"ר',
|
||
'יחייד': 'יח"ד',
|
||
'בייכ': 'ב"כ',
|
||
}
|
||
|
||
_ABBREV_PATTERN = re.compile(
|
||
'|'.join(re.escape(k) for k in sorted(_HEBREW_ABBREV_FIXES, key=len, reverse=True))
|
||
)
|
||
|
||
|
||
def _fix_hebrew_quotes(text: str) -> str:
|
||
"""Fix known Hebrew abbreviation quote replacements from Google Vision OCR."""
|
||
return _ABBREV_PATTERN.sub(lambda m: _HEBREW_ABBREV_FIXES[m.group()], text)
|
||
|
||
|
||
# ── Extraction ───────────────────────────────────────────────────
|
||
|
||
|
||
# Separator used when joining per-page text. Constant so chunker /
|
||
# retrofit can reproduce the join when computing page offsets.
|
||
PAGE_SEPARATOR = "\n\n"
|
||
|
||
|
||
async def extract_text(file_path: str) -> tuple[str, int, list[int] | None]:
|
||
"""Extract text from a document file.
|
||
|
||
Returns:
|
||
``(text, page_count, page_offsets)`` where:
|
||
- ``text``: concatenated extracted text
|
||
- ``page_count``: number of pages (0 for non-PDF)
|
||
- ``page_offsets``: ``page_offsets[i]`` = char start offset of
|
||
page (i+1) inside ``text``. ``None`` for non-PDFs (where the
|
||
notion of pages doesn't apply). Used by the chunker to assign
|
||
a ``page_number`` to each chunk.
|
||
"""
|
||
path = Path(file_path)
|
||
suffix = path.suffix.lower()
|
||
|
||
if suffix == ".pdf":
|
||
return await _extract_pdf(path)
|
||
elif suffix == ".docx":
|
||
return _extract_docx(path), 0, None
|
||
elif suffix == ".doc":
|
||
return _extract_doc(path), 0, None
|
||
elif suffix == ".rtf":
|
||
return _extract_rtf(path), 0, None
|
||
elif suffix in (".txt", ".md"):
|
||
return path.read_text(encoding="utf-8"), 0, None
|
||
else:
|
||
raise ValueError(f"Unsupported file type: {suffix}")
|
||
|
||
|
||
def _join_pages(pages_text: list[str]) -> tuple[str, list[int]]:
|
||
"""Join per-page text with PAGE_SEPARATOR while recording the start
|
||
offset of each page in the joined output."""
|
||
offsets: list[int] = []
|
||
parts: list[str] = []
|
||
cursor = 0
|
||
for i, pg in enumerate(pages_text):
|
||
offsets.append(cursor)
|
||
parts.append(pg)
|
||
cursor += len(pg)
|
||
if i < len(pages_text) - 1:
|
||
parts.append(PAGE_SEPARATOR)
|
||
cursor += len(PAGE_SEPARATOR)
|
||
return "".join(parts), offsets
|
||
|
||
|
||
async def _extract_pdf(path: Path) -> tuple[str, int, list[int]]:
|
||
"""Extract text from PDF.
|
||
|
||
Try direct text first, fall back to Google Cloud Vision for scanned
|
||
or broken-OCR pages.
|
||
"""
|
||
doc = fitz.open(str(path))
|
||
page_count = len(doc)
|
||
pages_text: list[str] = []
|
||
|
||
for page_num in range(page_count):
|
||
page = doc[page_num]
|
||
text = page.get_text().strip()
|
||
|
||
if len(text) > 50 and _text_quality_ok(text):
|
||
pages_text.append(text)
|
||
logger.debug("Page %d: direct extraction (%d chars, quality OK)", page_num + 1, len(text))
|
||
else:
|
||
reason = "insufficient text" if len(text) <= 50 else "low quality OCR layer"
|
||
logger.info("Page %d: Google Vision OCR (%s)", page_num + 1, reason)
|
||
pix = page.get_pixmap(dpi=300)
|
||
img_bytes = pix.tobytes("png")
|
||
ocr_text = await asyncio.to_thread(
|
||
_ocr_with_google_vision, img_bytes, page_num + 1
|
||
)
|
||
pages_text.append(ocr_text)
|
||
|
||
doc.close()
|
||
joined, offsets = _join_pages(pages_text)
|
||
return joined, page_count, offsets
|
||
|
||
|
||
def page_at_offset(offset: int, page_offsets: list[int]) -> int:
|
||
"""Look up the page number containing a given char offset.
|
||
|
||
page_offsets[i] is the start of page (i+1) in the joined text;
|
||
a chunk starting at ``offset`` belongs to the highest-indexed page
|
||
whose start is ``<= offset``. Returns 1-based page number.
|
||
"""
|
||
if not page_offsets:
|
||
return 1
|
||
# Linear scan is fine — page_offsets is short (≤ ~200 for our PDFs).
|
||
page = 1
|
||
for i, start in enumerate(page_offsets):
|
||
if start <= offset:
|
||
page = i + 1
|
||
else:
|
||
break
|
||
return page
|
||
|
||
|
||
def _ocr_with_google_vision(image_bytes: bytes, page_num: int) -> str:
|
||
"""OCR a single page image using Google Cloud Vision API."""
|
||
from google.cloud import vision # lazy: keeps MCP startup fast
|
||
client = _get_vision_client()
|
||
image = vision.Image(content=image_bytes)
|
||
|
||
response = client.document_text_detection(
|
||
image=image,
|
||
image_context=vision.ImageContext(language_hints=["he"]),
|
||
)
|
||
|
||
if response.error.message:
|
||
raise RuntimeError(
|
||
f"Google Vision error on page {page_num}: {response.error.message}"
|
||
)
|
||
|
||
text = response.full_text_annotation.text if response.full_text_annotation else ""
|
||
return _fix_hebrew_quotes(text)
|
||
|
||
|
||
def _extract_doc(path: Path) -> str:
|
||
"""Extract text from legacy .doc file by converting to .docx via LibreOffice."""
|
||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||
result = subprocess.run(
|
||
["libreoffice", "--headless", "--convert-to", "docx", str(path), "--outdir", tmp_dir],
|
||
capture_output=True, text=True, timeout=120,
|
||
)
|
||
if result.returncode != 0:
|
||
raise RuntimeError(f"LibreOffice conversion failed: {result.stderr}")
|
||
docx_path = Path(tmp_dir) / f"{path.stem}.docx"
|
||
if not docx_path.exists():
|
||
raise FileNotFoundError(f"Converted file not found: {docx_path}")
|
||
return _extract_docx(docx_path)
|
||
|
||
|
||
def _extract_docx(path: Path) -> str:
|
||
"""Extract text from DOCX file."""
|
||
doc = DocxDocument(str(path))
|
||
paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
|
||
return "\n\n".join(paragraphs)
|
||
|
||
|
||
def _extract_rtf(path: Path) -> str:
|
||
"""Extract text from RTF file."""
|
||
rtf_content = path.read_text(encoding="utf-8", errors="replace")
|
||
return rtf_to_text(rtf_content)
|
||
|
||
|
||
# ── Multimodal page rendering (V9) ───────────────────────────────
|
||
|
||
|
||
def _pixmap_to_pil(pix: fitz.Pixmap) -> Image.Image:
|
||
"""Convert a PyMuPDF pixmap to PIL.Image (RGB) without going through
|
||
PNG bytes. Faster than tobytes('png') → Image.open()."""
|
||
if pix.alpha:
|
||
# Drop alpha channel — voyage multimodal expects RGB.
|
||
pix = fitz.Pixmap(pix, 0)
|
||
return Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
|
||
|
||
|
||
def render_pages_for_multimodal(
|
||
pdf_path: str | Path,
|
||
embed_dpi: int,
|
||
thumb_dpi: int | None = None,
|
||
thumbnail_dir: Path | None = None,
|
||
) -> list[tuple[Image.Image, Path | None]]:
|
||
"""Render each PDF page as PIL.Image at ``embed_dpi`` for the
|
||
multimodal embedder, and optionally save a smaller JPEG thumbnail
|
||
at ``thumb_dpi`` to ``thumbnail_dir`` for UI preview.
|
||
|
||
Returns ``[(pil_image, thumb_path_or_None), ...]`` in page order.
|
||
The full-DPI image stays in memory only — only the thumbnail is
|
||
persisted to disk.
|
||
"""
|
||
src = Path(pdf_path)
|
||
if not src.is_file():
|
||
raise FileNotFoundError(f"PDF not found: {src}")
|
||
if thumbnail_dir is not None:
|
||
thumbnail_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
out: list[tuple[Image.Image, Path | None]] = []
|
||
doc = fitz.open(str(src))
|
||
try:
|
||
for page_idx, page in enumerate(doc):
|
||
page_num = page_idx + 1
|
||
pix = page.get_pixmap(dpi=embed_dpi)
|
||
img = _pixmap_to_pil(pix)
|
||
|
||
thumb_path: Path | None = None
|
||
if thumbnail_dir is not None and thumb_dpi:
|
||
thumb_path = thumbnail_dir / f"p{page_num:03d}.jpg"
|
||
# Downsample the same render rather than re-rendering
|
||
# with PyMuPDF — far faster.
|
||
ratio = thumb_dpi / embed_dpi
|
||
thumb_size = (
|
||
max(1, int(img.width * ratio)),
|
||
max(1, int(img.height * ratio)),
|
||
)
|
||
thumb = img.resize(thumb_size, Image.Resampling.LANCZOS)
|
||
thumb.save(thumb_path, "JPEG", quality=75, optimize=True)
|
||
|
||
out.append((img, thumb_path))
|
||
finally:
|
||
doc.close()
|
||
return out
|
||
|
||
|
||
# ── Nevo preamble stripping ──────────────────────────────────────
|
||
|
||
_NEVO_MARKERS = ("ספרות:", "חקיקה שאוזכרה:", "מיני-רציו:", "פסקי דין שאוזכרו:",
|
||
"כתבי עת:", "הועתק מנבו")
|
||
|
||
_DECISION_START = re.compile(
|
||
r"^(בפנינו|לפנינו|הערר שבנדון|ועדת הערר לתכנון|רקע עובדתי|עסקינן)",
|
||
re.MULTILINE,
|
||
)
|
||
|
||
|
||
def strip_nevo_preamble(text: str) -> str:
|
||
"""Remove Nevo database preamble (bibliography, legislation, mini-ratio) from decision text.
|
||
|
||
Returns the original text unchanged if no preamble is detected.
|
||
"""
|
||
head = text[:400]
|
||
if not any(marker in head for marker in _NEVO_MARKERS):
|
||
return text
|
||
m = _DECISION_START.search(text)
|
||
if m and m.start() > 50:
|
||
stripped = text[m.start():]
|
||
logger.debug("Stripped %d chars of Nevo preamble", m.start())
|
||
return stripped
|
||
return text
|