Files
legal-ai/mcp-server/src/legal_mcp/services/processor.py
Chaim 81ccf3a888
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 6m33s
feat(retrieval): track page_number on text chunks for multimodal hybrid boost
The legacy chunker did not track which PDF page each chunk came from.
Stored chunks had page_number=NULL, which blocked the multimodal
hybrid retriever's text+image boost — it joins (chunk, image) on
(document_id, page_number) and the join could never fire.

This change:

- extractor.extract_text now returns (text, page_count, page_offsets);
  page_offsets[i] is the start char offset of page (i+1) in the joined
  text. page_offsets is None for non-PDFs.
- chunker.chunk_document accepts an optional page_offsets and tags
  each chunk with the page that contains its first character (uses
  the existing chunker logic; pages assigned post-hoc by content
  search to keep the diff minimal).
- processor.process_document and precedent_library.ingest_precedent
  forward page_offsets through the chunker. New uploads now carry
  accurate page_number on every chunk.
- Other extract_text callers (tools/documents, tools/workflow,
  web/app.py) updated to unpack the third element (ignored).
- scripts/backfill_chunk_pages.py: per-case retrofit. Re-extracts each
  PDF (re-OCRs via Google Vision if needed, ~$0.0015/page), computes
  page_offsets, and updates page_number on every chunk by content
  search. Idempotent; --force re-runs on already-tagged docs.

Forward-only would leave the 419 image embeddings backfilled on
cases 8174-24 + 8137-24 unable to boost their corresponding text
chunks. The retrofit script closes that gap (cost ~$0.60).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 19:49:41 +00:00

204 lines
7.9 KiB
Python

"""Document processing pipeline: extract → chunk → embed → store."""
from __future__ import annotations
import asyncio
import logging
from pathlib import Path
from uuid import UUID
from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor, references_extractor
logger = logging.getLogger(__name__)
async def process_document(document_id: UUID, case_id: UUID) -> dict:
    """Run the full processing pipeline for one document.

    Steps:
        1.   Extract text from the file and persist it on the document row.
        1.5  Classify the document (local rules first, Claude Code headless
             fallback when local confidence is below 0.8). Non-fatal.
        2.   Split the text into chunks (``page_offsets`` lets the chunker
             tag each chunk with a page number).
        3.   Generate embeddings for the chunks.
        4.   Store chunks + embeddings in the DB.
        4.5  When ``config.MULTIMODAL_ENABLED`` is set and the file is a PDF,
             embed page images. Non-fatal.
        5.   Extract legal references (plans, case law, legislation).
             Non-fatal.

    Args:
        document_id: ID of the document row to process.
        case_id: ID of the case the document belongs to; forwarded to the
            storage and reference-extraction calls.

    Returns:
        A processing summary. On success: ``status="completed"`` plus chunk
        count, page count, text length, classification, reference counts and
        the multimodal result. When chunking yields nothing: a short
        ``status="completed"`` summary with ``chunks=0``. On any failure
        inside the pipeline: ``{"status": "failed", "error": <message>}``
        (the document row is also marked ``failed``).

    Raises:
        ValueError: if ``db.get_document`` returns nothing for ``document_id``.
    """
    doc = await db.get_document(document_id)
    if not doc:
        raise ValueError(f"Document {document_id} not found")

    await db.update_document(document_id, extraction_status="processing")

    try:
        # Step 1: Extract text
        logger.info("Extracting text from %s", doc["file_path"])
        text, page_count, page_offsets = await extractor.extract_text(doc["file_path"])
        await db.update_document(
            document_id,
            extracted_text=text,
            page_count=page_count,
        )

        # Save extracted text to documents/extracted/ directory.
        # Creating the directory is part of the same best-effort step, so it
        # lives inside the try: a filesystem problem here must not fail the
        # whole pipeline.
        original_path = Path(doc["file_path"])
        extracted_dir = original_path.parent.parent / "extracted"
        txt_path = extracted_dir / (original_path.stem + ".txt")
        try:
            extracted_dir.mkdir(parents=True, exist_ok=True)
            txt_path.write_text(text, encoding="utf-8")
            logger.info("Saved extracted text to %s", txt_path)
        except Exception as e:
            logger.warning("Failed to save text file (non-fatal): %s", e)

        # Step 1.5: Classify document — local rules first, Claude Code headless fallback
        classification_result = {}
        try:
            # Imported lazily so an import problem in the classifier is
            # handled by the same non-fatal except below.
            from legal_mcp.services import local_classifier

            filename = Path(doc["file_path"]).name
            doc_type, confidence = local_classifier.classify(filename, text)
            if confidence < 0.8:
                doc_type, confidence = local_classifier.classify_with_claude_code(filename, text)
            # Update doc_type if we got a good classification and current type is generic
            if confidence >= 0.5 and doc.get("doc_type") in ("reference", "auto"):
                await db.update_document(document_id, doc_type=doc_type)
                # Separator between filename and doc_type keeps the log readable.
                logger.info(
                    "Auto-classified: %s -> %s (confidence %.2f)",
                    filename, doc_type, confidence,
                )
            classification_result = {
                "classification": {"doc_type": doc_type, "confidence": confidence},
            }
            await db.update_document(document_id, metadata=classification_result)
        except Exception as e:
            logger.warning("Classification failed (non-fatal): %s", e)

        # Step 2: Chunk (page_offsets propagates page_number into chunks)
        logger.info("Chunking document (%d chars)", len(text))
        chunks = chunker.chunk_document(text, page_offsets=page_offsets)

        if not chunks:
            await db.update_document(document_id, extraction_status="completed")
            return {"status": "completed", "chunks": 0, "message": "No text to chunk"}

        # Step 3: Embed
        logger.info("Generating embeddings for %d chunks", len(chunks))
        texts = [c.content for c in chunks]
        embs = await embeddings.embed_texts(texts, input_type="document")

        # Step 4: Store
        # NOTE(review): zip() stops at the shorter input, so a short result
        # from embed_texts would silently drop trailing chunks — confirm the
        # embedder always returns one vector per text.
        chunk_dicts = [
            {
                "content": c.content,
                "section_type": c.section_type,
                "embedding": emb,
                "page_number": c.page_number,
                "chunk_index": c.chunk_index,
            }
            for c, emb in zip(chunks, embs)
        ]
        stored = await db.store_chunks(document_id, case_id, chunk_dicts)

        # Step 4.5: Multimodal page-image embeddings (V9). Gated by
        # MULTIMODAL_ENABLED. Renders each PDF page → embeds via
        # voyage-multimodal-3 → stores per-page row with thumbnail.
        # Non-fatal on failure (text path already succeeded).
        multimodal_result = {"pages_embedded": 0}
        # `(page_count or 0)` guards against a missing page count: comparing
        # None with an int raises TypeError, which would land in the outer
        # except and mark an already-stored document as failed.
        # NOTE(review): whether extract_text can return None here is not
        # visible from this file — confirm.
        if config.MULTIMODAL_ENABLED and (page_count or 0) > 0:
            try:
                pdf_path = Path(doc["file_path"])
                if pdf_path.suffix.lower() == ".pdf":
                    multimodal_result = await _embed_document_pages(
                        document_id, case_id, pdf_path, page_count,
                    )
            except Exception as e:
                logger.warning("Multimodal embedding failed (non-fatal): %s", e)

        # Step 5: Extract references (plans, case law, legislation) — non-fatal
        refs_result = {"plans": 0, "case_law": 0, "case_law_linked": 0, "legislation": 0}
        try:
            logger.info("Extracting legal references")
            refs_result = await references_extractor.extract_and_link_references(
                document_id, case_id, text,
            )
            logger.info(
                "References found: %d plans, %d case law (%d linked), %d legislation",
                refs_result["plans"], refs_result["case_law"],
                refs_result["case_law_linked"], refs_result["legislation"],
            )
        except Exception as e:
            logger.warning("Reference extraction failed (non-fatal): %s", e)

        await db.update_document(document_id, extraction_status="completed")
        logger.info("Document processed: %d chunks stored", stored)

        return {
            "status": "completed",
            "chunks": stored,
            "pages": page_count,
            "text_length": len(text),
            "classification": classification_result,
            "references": {
                "plans": refs_result["plans"],
                "case_law": refs_result["case_law"],
                "legislation": refs_result["legislation"],
            },
            "multimodal": multimodal_result,
        }

    except Exception as e:
        # Top-level boundary: record the failure on the document row and
        # report it to the caller instead of propagating.
        logger.exception("Document processing failed: %s", e)
        await db.update_document(document_id, extraction_status="failed")
        return {"status": "failed", "error": str(e)}
async def _embed_document_pages(
    document_id: UUID,
    case_id: UUID,
    pdf_path: Path,
    page_count: int,
) -> dict:
    """Render the PDF's pages, embed them as images, and store one row per page.

    Rendering runs in a worker thread via ``asyncio.to_thread``; thumbnails
    are written under
    ``data/cases/{case_number}/thumbnails/{document_id}/p{N:03d}.jpg``
    so the UI can show small previews next to image-side search hits.

    Args:
        document_id: Document the page images belong to.
        case_id: Case the document belongs to.
        pdf_path: Path of the PDF to render.
        page_count: Page count, used here only for the progress log line.

    Returns:
        ``{"pages_embedded": <rows stored>, "model": config.MULTIMODAL_MODEL}``.
    """

    def _thumbnail_ref(thumb_file):
        # Store the thumbnail path relative to DATA_DIR when possible;
        # fall back to the path as given when it lives elsewhere.
        if thumb_file is None:
            return None
        try:
            return str(thumb_file.relative_to(config.DATA_DIR))
        except ValueError:
            return str(thumb_file)

    # Layout: data/cases/{case_number}/documents/originals/{file}.pdf
    # → three levels up from the file is the case directory.
    case_root = pdf_path.parent.parent.parent
    thumbnails_root = case_root / "thumbnails" / str(document_id)

    logger.info("Multimodal: rendering %d pages @ %ddpi", page_count, config.MULTIMODAL_DPI)
    page_pairs = await asyncio.to_thread(
        extractor.render_pages_for_multimodal,
        pdf_path,
        config.MULTIMODAL_DPI,
        config.MULTIMODAL_THUMB_DPI,
        thumbnails_root,
    )

    # Each rendered entry is an (image, thumbnail path) pair.
    page_images = [image for image, _thumb in page_pairs]
    thumbnail_files = [thumb for _image, thumb in page_pairs]

    logger.info(
        "Multimodal: embedding %d pages via %s",
        len(page_images),
        config.MULTIMODAL_MODEL,
    )
    image_vectors = await embeddings.embed_images(page_images)

    # Page numbers are 1-based and follow the order of the rendered pages.
    rows = []
    for page_no, (vector, thumb_file) in enumerate(
        zip(image_vectors, thumbnail_files), start=1
    ):
        rows.append({
            "page_number": page_no,
            "embedding": vector,
            "image_thumbnail_path": _thumbnail_ref(thumb_file),
        })

    stored_count = await db.store_document_image_embeddings(
        document_id, case_id, rows,
        model_name=config.MULTIMODAL_MODEL,
    )
    logger.info("Multimodal: stored %d page-image embeddings", stored_count)
    return {"pages_embedded": stored_count, "model": config.MULTIMODAL_MODEL}