Continue the write-site rewiring onto the unified storage layer (INV-STG1):

- services/processor.py: extracted-text .txt → DERIVED bucket (a derived artifact; the DB column is the source of truth per INV-STG5, so the write stays non-fatal).
- services/docx_exporter.py (export_decision): DOCX → DOCUMENTS bucket via BytesIO → put_bytes, with a fallback to a direct disk write when the caller passes an output_path outside DATA_DIR.
- services/analysis_docx_exporter.py (build_analysis_docx): same pattern; out_path is always under DATA_DIR.

Under the default STORAGE_BACKEND=filesystem the bytes land at the exact legacy path (put_bytes → DATA_DIR/key), so behaviour is unchanged. The disk-reading bits that must stay for now (export_dir glob in _next_version) are kept; storage-native versioning is a cutover concern.

Still on disk (sync call-sites, follow-up Phase 2c): docx_reviser (track-changes), docx_retrofit backup, and multimodal thumbnails (rendered in a to_thread). Git-tracked text (case.json/notes/research-md/draft-md) stays on disk by design (INV-STG7).

Tests: 38 storage + docx tests green (incl. test_export_qa_gate / test_docx_exporter_bookmarks, which exercise the real export path); 242 collected, no import breakage.

Keeps G2; advances INV-STG1. Spec: docs/spec/X14-storage-minio.md.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
210 lines
8.2 KiB
Python
210 lines
8.2 KiB
Python
"""Document processing pipeline: extract → chunk → embed → store."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from pathlib import Path
|
|
from uuid import UUID
|
|
|
|
from legal_mcp import config
|
|
from legal_mcp.services import (
|
|
chunker, db, embeddings, extractor, references_extractor, storage,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def _save_extracted_text(file_path: str | Path, text: str) -> None:
    """Write a ``.txt`` copy of the extracted text through the storage layer.

    The copy is a DERIVED artifact: the DB column holds the source of truth
    (INV-STG5) and writes go through the storage layer (INV-STG1). It is a
    convenience copy only, so any failure to write it is logged and swallowed.
    """
    original_path = Path(file_path)
    # "extracted" directory one level above the directory holding the original.
    txt_path = original_path.parent.parent / "extracted" / (original_path.stem + ".txt")
    try:
        await storage.put_bytes(
            # The key is the path relative to DATA_DIR in POSIX form;
            # relative_to() raises ValueError for a file outside DATA_DIR,
            # which the handler below treats as non-fatal.
            txt_path.relative_to(config.DATA_DIR).as_posix(),
            text.encode("utf-8"), bucket=storage.Bucket.DERIVED,
            content_type="text/plain; charset=utf-8",
        )
        logger.info("Saved extracted text to %s", txt_path)
    except Exception as e:
        logger.warning("Failed to save text file (non-fatal): %s", e)


async def _classify_document(document_id: UUID, doc: dict, text: str) -> dict:
    """Classify the document: local rules first, Claude Code headless fallback.

    Returns ``{"classification": {"doc_type": ..., "confidence": ...}}``, or
    ``{}`` when classification fails before a result is produced. Every
    failure in here is logged and swallowed (non-fatal).
    """
    classification_result: dict = {}
    try:
        from legal_mcp.services import local_classifier

        filename = Path(doc["file_path"]).name
        doc_type, confidence = local_classifier.classify(filename, text)
        if confidence < 0.8:
            # NOTE(review): this is a plain synchronous call inside a
            # coroutine; if the headless fallback is slow it holds up the
            # event loop. Consider asyncio.to_thread once its thread-safety
            # is confirmed.
            doc_type, confidence = local_classifier.classify_with_claude_code(filename, text)

        # Update doc_type if we got a good classification and current type is generic
        if confidence >= 0.5 and doc.get("doc_type") in ("reference", "auto"):
            await db.update_document(document_id, doc_type=doc_type)
            logger.info("Auto-classified: %s → %s (confidence %.2f)", filename, doc_type, confidence)

        # Assigned before the metadata write so the caller still receives the
        # result even if persisting it fails.
        classification_result = {"classification": {"doc_type": doc_type, "confidence": confidence}}
        await db.update_document(document_id, metadata=classification_result)
    except Exception as e:
        logger.warning("Classification failed (non-fatal): %s", e)
    return classification_result


async def process_document(document_id: UUID, case_id: UUID) -> dict:
    """Full processing pipeline for a document.

    1. Extract text from file (persisted on the document row, plus a
       non-fatal ``.txt`` copy via the storage layer)
    1.5. Classify the document (non-fatal)
    2. Split into chunks
    3. Generate embeddings
    4. Store chunks + embeddings in DB
    4.5. Multimodal page-image embeddings, when enabled (non-fatal)
    5. Extract and link legal references (non-fatal)

    Args:
        document_id: ID of the document row to process.
        case_id: ID of the case that chunks, page embeddings and references
            are stored under.

    Returns:
        Processing summary. On success: ``status`` ("completed"), ``chunks``,
        ``pages``, ``text_length``, ``classification``, ``references`` and
        ``multimodal``. When the text yields no chunks: ``status``,
        ``chunks`` (0) and ``message``. On any pipeline error:
        ``{"status": "failed", "error": <str>}``.

    Raises:
        ValueError: If no document with ``document_id`` exists.
        Exception: Errors from the initial lookup or from setting the
            "processing" status propagate; everything after that is
            reported through the returned summary.
    """
    doc = await db.get_document(document_id)
    if not doc:
        raise ValueError(f"Document {document_id} not found")

    await db.update_document(document_id, extraction_status="processing")

    try:
        # Step 1: Extract text
        logger.info("Extracting text from %s", doc["file_path"])
        text, page_count, page_offsets = await extractor.extract_text(doc["file_path"])

        await db.update_document(
            document_id,
            extracted_text=text,
            page_count=page_count,
        )

        # Convenience .txt copy; the pipeline itself keeps using `text`.
        await _save_extracted_text(doc["file_path"], text)

        # Step 1.5: Classify document — local rules first, Claude Code headless fallback
        classification_result = await _classify_document(document_id, doc, text)

        # Step 2: Chunk (page_offsets propagates page_number into chunks)
        logger.info("Chunking document (%d chars)", len(text))
        chunks = chunker.chunk_document(text, page_offsets=page_offsets)

        if not chunks:
            await db.update_document(document_id, extraction_status="completed")
            return {"status": "completed", "chunks": 0, "message": "No text to chunk"}

        # Step 3: Embed
        logger.info("Generating embeddings for %d chunks", len(chunks))
        texts = [c.content for c in chunks]
        embs = await embeddings.embed_texts(texts, input_type="document")

        # Step 4: Store
        chunk_dicts = [
            {
                "content": c.content,
                "section_type": c.section_type,
                "embedding": emb,
                "page_number": c.page_number,
                "chunk_index": c.chunk_index,
            }
            for c, emb in zip(chunks, embs)
        ]
        # zip() stops at the shorter input, so a short embeddings response
        # would otherwise drop trailing chunks without any trace.
        if len(chunk_dicts) != len(chunks):
            logger.warning(
                "Embedding count mismatch: %d chunks but only %d embeddings; "
                "trailing chunks will not be stored",
                len(chunks), len(chunk_dicts),
            )

        stored = await db.store_chunks(document_id, case_id, chunk_dicts)

        # Step 4.5: Multimodal page-image embeddings (V9). Gated by
        # MULTIMODAL_ENABLED. Renders each PDF page → embeds via
        # voyage-multimodal-3 → stores per-page row with thumbnail.
        # Non-fatal on failure (text path already succeeded).
        multimodal_result = {"pages_embedded": 0}
        if config.MULTIMODAL_ENABLED and page_count > 0:
            try:
                pdf_path = Path(doc["file_path"])
                if pdf_path.suffix.lower() == ".pdf":
                    multimodal_result = await _embed_document_pages(
                        document_id, case_id, pdf_path, page_count,
                    )
            except Exception as e:
                logger.warning("Multimodal embedding failed (non-fatal): %s", e)

        # Step 5: Extract references (plans, case law, legislation) — non-fatal
        refs_result = {"plans": 0, "case_law": 0, "case_law_linked": 0, "legislation": 0}
        try:
            logger.info("Extracting legal references")
            refs_result = await references_extractor.extract_and_link_references(
                document_id, case_id, text,
            )
            logger.info(
                "References found: %d plans, %d case law (%d linked), %d legislation",
                refs_result["plans"], refs_result["case_law"],
                refs_result["case_law_linked"], refs_result["legislation"],
            )
        except Exception as e:
            logger.warning("Reference extraction failed (non-fatal): %s", e)

        await db.update_document(document_id, extraction_status="completed")

        logger.info("Document processed: %d chunks stored", stored)
        return {
            "status": "completed",
            "chunks": stored,
            "pages": page_count,
            "text_length": len(text),
            "classification": classification_result,
            "references": {
                "plans": refs_result["plans"],
                "case_law": refs_result["case_law"],
                "legislation": refs_result["legislation"],
            },
            "multimodal": multimodal_result,
        }

    except Exception as e:
        logger.exception("Document processing failed: %s", e)
        # The status write can itself fail (e.g. when the DB was the cause of
        # the original error); still hand the caller the failed summary
        # rather than raising out of the handler.
        try:
            await db.update_document(document_id, extraction_status="failed")
        except Exception:
            logger.exception("Could not mark document %s as failed", document_id)
        return {"status": "failed", "error": str(e)}
|
|
|
|
|
|
async def _embed_document_pages(
    document_id: UUID,
    case_id: UUID,
    pdf_path: Path,
    page_count: int,
) -> dict:
    """Render PDF pages → embed via voyage-multimodal → store per-page rows.

    Thumbnails are saved under
    ``data/cases/{case_number}/thumbnails/{document_id}/p{N:03d}.jpg``
    so the UI can show small previews next to image-side search hits.

    Args:
        document_id: Document the page embeddings belong to; also names the
            thumbnail sub-directory.
        case_id: Case the page embeddings are stored under.
        pdf_path: Path to the original PDF; the case directory is derived
            from it (three levels up).
        page_count: Number of pages, used for logging only.

    Returns:
        ``{"pages_embedded": <stored row count>, "model": <model name>}``.
    """
    # Layout: data/cases/{case_number}/documents/originals/{file}.pdf
    #   → case_dir = pdf_path.parent.parent.parent
    case_dir = pdf_path.parent.parent.parent
    thumb_dir = case_dir / "thumbnails" / str(document_id)

    logger.info("Multimodal: rendering %d pages @ %ddpi", page_count, config.MULTIMODAL_DPI)
    # Rendering is a synchronous call, so it runs in a worker thread.
    rendered = await asyncio.to_thread(
        extractor.render_pages_for_multimodal,
        pdf_path,
        config.MULTIMODAL_DPI,
        config.MULTIMODAL_THUMB_DPI,
        thumb_dir,
    )
    # Each rendered item is an (image, thumbnail path or None) pair.
    images = [pil for pil, _ in rendered]
    thumb_paths = [thumb for _, thumb in rendered]

    logger.info("Multimodal: embedding %d pages via %s", len(images), config.MULTIMODAL_MODEL)
    img_embs = await embeddings.embed_images(images)

    page_records = []
    for page_number, (emb, thumb) in enumerate(zip(img_embs, thumb_paths), start=1):
        rel_thumb = None
        if thumb is not None:
            try:
                # Store the path relative to DATA_DIR in POSIX form, the same
                # shape used for storage keys elsewhere in this module.
                rel_thumb = thumb.relative_to(config.DATA_DIR).as_posix()
            except ValueError:
                # Thumbnail lives outside DATA_DIR: keep the path as given.
                rel_thumb = str(thumb)
        page_records.append({
            "page_number": page_number,
            "embedding": emb,
            "image_thumbnail_path": rel_thumb,
        })

    # zip() stops at the shorter input, so a short embeddings response would
    # otherwise leave trailing pages without a row and without any trace.
    if len(page_records) != len(images):
        logger.warning(
            "Multimodal: %d pages rendered but only %d embeddings returned; "
            "trailing pages will not be stored",
            len(images), len(page_records),
        )

    stored = await db.store_document_image_embeddings(
        document_id, case_id, page_records,
        model_name=config.MULTIMODAL_MODEL,
    )
    logger.info("Multimodal: stored %d page-image embeddings", stored)
    return {"pages_embedded": stored, "model": config.MULTIMODAL_MODEL}
|