"""Multimodal backfill — embed page images for existing case documents. Iterates over documents already in the DB and renders + embeds + stores per-page voyage-multimodal-3 vectors. Skips documents that already have image embeddings (idempotent). Independent of the processor pipeline — does NOT re-extract text or re-chunk; only the multimodal step. Designed to run from inside the FastAPI/MCP container (where /data is mounted and writable). Locally it requires sudo for the thumbnails dir under /home/chaim/legal-ai/data/cases/... Usage:: # In container (Coolify): docker exec -it python -m legal_mcp.cli \\ multimodal_backfill --cases 8174-24 8137-24 # Or as a script (sets MULTIMODAL_ENABLED=true automatically): /opt/api/mcp-server/.venv/bin/python /opt/api/scripts/multimodal_backfill.py 8174-24 8137-24 """ from __future__ import annotations import argparse import asyncio import logging import os import sys import time from pathlib import Path from uuid import UUID def _setup_paths(): """Ensure mcp-server src is on path even when run as a standalone script.""" here = Path(__file__).resolve().parent mcp_src = here.parent / "mcp-server" / "src" if mcp_src.is_dir() and str(mcp_src) not in sys.path: sys.path.insert(0, str(mcp_src)) _setup_paths() # Force the flag on for this run regardless of env — backfill is the # whole point of running this script. The deploy-time default stays off. os.environ["MULTIMODAL_ENABLED"] = "true" from legal_mcp import config # noqa: E402 from legal_mcp.services import db, embeddings, extractor, processor # noqa: E402 logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) logger = logging.getLogger("multimodal_backfill") def _resolve_local_path(db_path: str) -> Path: """Map container path /data/... to host /home/chaim/legal-ai/data/... when running locally; pass-through when already absolute and present.""" p = Path(db_path) if p.is_file(): return p if str(p).startswith("/data/"): local = Path("/home/chaim/legal-ai") / Path(*p.parts[1:]) if local.is_file(): return local return p async def _backfill_document( document_id: UUID, case_id: UUID, title: str, db_file_path: str, skip_if_exists: bool, ) -> dict: pool = await db.get_pool() if skip_if_exists: existing = await pool.fetchval( "SELECT count(*) FROM document_image_embeddings WHERE document_id = $1", document_id, ) if existing and existing > 0: logger.info(" skip (%d rows already): %s", existing, title) return {"status": "skipped", "rows": int(existing)} pdf_path = _resolve_local_path(db_file_path) if not pdf_path.is_file(): logger.warning(" file missing: %s (%s)", pdf_path, title) return {"status": "missing"} if pdf_path.suffix.lower() != ".pdf": logger.info(" not a PDF, skipping: %s", title) return {"status": "not_pdf"} page_count = await pool.fetchval( "SELECT page_count FROM documents WHERE id = $1", document_id, ) if not page_count: # Open to count import fitz d = fitz.open(str(pdf_path)) page_count = len(d) d.close() logger.info(" embedding %s (%d pages)", title, page_count) t0 = time.time() result = await processor._embed_document_pages( document_id, case_id, pdf_path, page_count, ) elapsed = time.time() - t0 logger.info(" done in %.1fs: %s", elapsed, result) return {"status": "ok", "elapsed_sec": round(elapsed, 1), **result} async def backfill_cases(case_numbers: list[str], skip_if_exists: bool = True) -> dict: """Embed page images for every PDF document in the given cases.""" await db.init_schema() # in case schema V9 hasn't been applied pool = await db.get_pool() summary: dict = {} for cn in case_numbers: logger.info("=" * 60) logger.info("Case %s", cn) case = await db.get_case_by_number(cn) if not case: logger.warning("Case not found: %s", cn) summary[cn] = {"status": "case_not_found"} continue case_id = UUID(str(case["id"])) docs = await pool.fetch( "SELECT id, title, file_path FROM documents WHERE case_id = $1 ORDER BY title", case_id, ) logger.info(" %d documents", len(docs)) per_doc: list[dict] = [] for d in docs: doc_id = UUID(str(d["id"])) title = d["title"] r = await _backfill_document( doc_id, case_id, title, d["file_path"], skip_if_exists, ) per_doc.append({"document_id": str(doc_id), "title": title, **r}) summary[cn] = { "documents_total": len(docs), "embedded": sum(1 for r in per_doc if r["status"] == "ok"), "skipped": sum(1 for r in per_doc if r["status"] == "skipped"), "missing": sum(1 for r in per_doc if r["status"] == "missing"), "not_pdf": sum(1 for r in per_doc if r["status"] == "not_pdf"), "documents": per_doc, } return summary def main(): parser = argparse.ArgumentParser(description="Multimodal backfill for case documents") parser.add_argument( "cases", nargs="+", help="Case numbers to backfill (e.g. 8174-24 8137-24)" ) parser.add_argument( "--re-embed", action="store_true", help="Re-embed even if image embeddings already exist (default: skip)", ) args = parser.parse_args() logger.info("MULTIMODAL_MODEL=%s DPI=%d THUMB_DPI=%d", config.MULTIMODAL_MODEL, config.MULTIMODAL_DPI, config.MULTIMODAL_THUMB_DPI) summary = asyncio.run( backfill_cases(args.cases, skip_if_exists=not args.re_embed) ) print() print("=" * 60) print("SUMMARY") print("=" * 60) for cn, s in summary.items(): if s.get("status") == "case_not_found": print(f" {cn}: NOT FOUND") continue print( f" {cn}: {s['documents_total']} docs — " f"embedded {s['embedded']}, skipped {s['skipped']}, " f"missing {s['missing']}, non-pdf {s['not_pdf']}" ) if __name__ == "__main__": main()