"""Multimodal backfill for precedent library — fills voyage-multimodal-3 page embeddings for case_law rows (external_upload + internal_committee) that don't have them yet. Background ---------- 77 (in practice 70 today, 2026-05-26) case_law rows were ingested before ``MULTIMODAL_ENABLED=true`` was permanently turned on, so they only have text chunks and no per-page image embeddings. The retrieval blend is hybrid (text + image), so the image side of the blend silently degrades for these rows. Strategy -------- Most rows have no PDF (they were ingested via text or are MD-only). The script: 1. Lists every case_law row with ``source_kind in (external_upload, internal_committee)`` that is missing image embeddings. 2. Tries to find a staged file by matching token-rich substrings of the case_number against filenames under ``data/precedent-library/`` and ``data/internal-decisions/``. 3. If the file is a PDF or DOCX (both renderable by PyMuPDF/fitz), renders pages at ``MULTIMODAL_DPI``, embeds via voyage-multimodal-3 in batches of 50, and stores rows into ``precedent_image_embeddings``. 4. Skips rows whose only candidate file is .md (PyMuPDF can't render markdown) or rows with no staged file. Designed to run inside the FastAPI/MCP container (where ``/data/...`` exists and Voyage env vars are present). Locally, it falls back to ``/home/chaim/legal-ai/data/...`` via ``_resolve_local_path``. Usage:: # Inside container (Coolify): docker exec -it /opt/api/.venv/bin/python \\ /opt/api/scripts/backfill_multimodal_precedents.py --dry-run # then: docker exec -it /opt/api/.venv/bin/python \\ /opt/api/scripts/backfill_multimodal_precedents.py --apply Notes ----- - Token cost: voyage-multimodal-3 averages ~3-4K tokens per dense legal page. 70 rows * ~30 pages avg = ~2,100 pages = ~7M tokens ≈ $0.70. - Estimate-only mode (``--dry-run``) prints the matched files and page counts without calling Voyage or touching the DB. 
- Idempotent: per-record DELETE+INSERT inside ``store_precedent_image_embeddings``, but the outer loop also skips rows that already have rows in ``precedent_image_embeddings``. """ from __future__ import annotations import argparse import asyncio import logging import os import re import sys import time from pathlib import Path from uuid import UUID import fitz # PyMuPDF def _setup_paths(): """Ensure mcp-server src is on path even when run as a standalone script. Works both from host (``/home/chaim/legal-ai/scripts/...``) and from inside the container (``/app/mcp-server/src``). """ here = Path(__file__).resolve().parent candidates = [ here.parent / "mcp-server" / "src", # host Path("/app/mcp-server/src"), # container ] for c in candidates: if c.is_dir() and str(c) not in sys.path: sys.path.insert(0, str(c)) _setup_paths() # Force multimodal on for this script regardless of env — backfill is # the entire point. The deploy-time default stays whatever Coolify sets. os.environ["MULTIMODAL_ENABLED"] = "true" from legal_mcp import config # noqa: E402 from legal_mcp.services import db, embeddings, extractor # noqa: E402 logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) logger = logging.getLogger("backfill_multimodal_precedents") # ───────────────────────── file matching ───────────────────────── # Roots to search for staged precedent files. Both paths are tried; the # first that exists wins. ``/data/`` is the in-container mount; # ``/home/chaim/legal-ai/data/`` is the host path. SEARCH_ROOTS = [ Path("/data/precedent-library"), Path("/data/internal-decisions"), Path("/home/chaim/legal-ai/data/precedent-library"), Path("/home/chaim/legal-ai/data/internal-decisions"), ] # Extensions we can render with PyMuPDF (fitz). MD and TXT cannot be # rendered as page images, so we skip them. 
RENDERABLE_EXTS = {".pdf", ".docx"}

# Text-only staged formats. They cannot be rendered as page images. A case
# whose only staged file has one of these extensions is reported as
# ``skipped_md_only`` instead of ``no_match``, so the two situations can be
# told apart in the summary.
TEXT_ONLY_EXTS = {".md"}

# Token-extraction regex: only digit runs joined by "/" or "-" (real
# case-number kernels like "8064/20" or "25226-04-25"). Bare years such as
# "2011" and dotted dates such as "5.4.2011" never match. This avoids
# false-positive matches against unrelated filenames that happen to
# contain the same year.
_NUMBER_TOKEN = re.compile(r"\d+[-/]\d+(?:[-/]\d+)*")


def _extract_number_tokens(case_number: str) -> list[str]:
    """Pull case-number kernels out of a Hebrew ``case_number`` string.

    Only tokens made of digit runs joined by ``/`` or ``-`` are returned.
    Bare years ("2011") and dotted dates ("5.4.2011") are therefore ignored.
    An empty or ``None`` input yields an empty list.

    >>> _extract_number_tokens('בר"מ 25226-04-25 הוועדה')
    ['25226-04-25']
    >>> _extract_number_tokens('ערר 8064/20 חברת')
    ['8064/20']
    >>> _extract_number_tokens('עע"מ 10089/07 (נבו 5.4.2011)')
    ['10089/07']
    """
    if not case_number:
        return []
    # The separator class is [-/], so a dotted date can never appear here.
    return _NUMBER_TOKEN.findall(case_number)


def _normalize_for_match(s: str) -> str:
    """Lowercase ``s`` and drop whitespace, ``/``, ``_`` and ``-``.

    Other punctuation (dots, parentheses, quotes) is kept, so the extension
    dot of a filename survives.

    >>> _normalize_for_match("MM-23-09 14306_967.DOCX")
    'mm230914306967.docx'
    """
    return re.sub(r"[\s/_-]+", "", s.lower())


def _build_file_index(
    extensions: set[str] | frozenset[str] | None = None,
) -> dict[str, list[Path]]:
    """Walk every existing root in ``SEARCH_ROOTS`` and index files by name.

    Args:
        extensions: Lower-case suffixes (with the leading dot) to include.
            Defaults to ``RENDERABLE_EXTS``.

    Returns:
        ``{normalized_filename: [paths]}``. Files anywhere under a
        ``thumbnails`` directory are excluded.
    """
    wanted = RENDERABLE_EXTS if extensions is None else extensions
    idx: dict[str, list[Path]] = {}
    for root in SEARCH_ROOTS:
        if not root.is_dir():
            continue
        for p in root.rglob("*"):
            # Cheap string checks first, filesystem stat last.
            if p.suffix.lower() not in wanted or "thumbnails" in p.parts:
                continue
            if not p.is_file():
                continue
            idx.setdefault(_normalize_for_match(p.name), []).append(p)
    return idx


def _digit_parts(token: str) -> list[str]:
    """Split a token like '14306-09-23' into ['14306', '09', '23']."""
    return [p for p in re.split(r"[-/]", token) if p]


def _find_file_for_case_number(
    case_number: str, file_index: dict[str, list[Path]]
) -> Path | None:
    """Best-effort match a case_number to a staged file path.

    Two strategies are tried for every kernel token of ``case_number``:

    1. **Direct contiguous match**: the normalized token (e.g. "8064/20"
       becomes "806420") appears as a substring of the normalized filename.
       Normalization strips both "/" and "-", so "8064/20" and "8064-20"
       are equivalent. Matching is plain substring matching, so it is not
       digit-boundary aware ("806420" also matches inside "1806420").
    2. **Parts-match**: every digit part of the token appears somewhere in
       the filename. This handles reordered formats such as case_number
       "14306-09-23" matched to "MM-23-09-14306-967.docx", where Nevo's
       case_number ordering differs from the legal template's filename
       ordering. It is only accepted when the longest part has at least 4
       digits, which filters out matches where only short pieces (year
       fragments) overlap.

    Returns:
        The first PDF among the matches if there is one, otherwise the
        first match in discovery order, or ``None`` when nothing matches.
    """
    tokens = _extract_number_tokens(case_number)
    if not tokens:
        return None

    candidates: list[Path] = []
    for token in tokens:
        normalized_token = _normalize_for_match(token)
        parts = _digit_parts(token)
        parts_allowed = max((len(p) for p in parts), default=0) >= 4
        for normalized_name, paths in file_index.items():
            if normalized_token in normalized_name or (
                parts_allowed and all(p in normalized_name for p in parts)
            ):
                candidates.extend(paths)

    # Dedupe while preserving discovery order.
    unique = list(dict.fromkeys(candidates))
    if not unique:
        return None

    # Prefer PDFs over DOCX (PDF rendering is more reliable for embedded
    # fonts/images).
    pdf = next((p for p in unique if p.suffix.lower() == ".pdf"), None)
    return pdf if pdf is not None else unique[0]


# ───────────────────────── backfill core ─────────────────────────

PRECEDENT_LIBRARY_THUMBNAILS = Path(config.DATA_DIR) / "precedent-library" / "thumbnails"

# Dry-run cost-estimate assumptions (see module Notes). These are rough
# averages, not values read from Voyage.
_EST_TOKENS_PER_PAGE = 3500
_EST_USD_PER_MILLION_TOKENS = 0.12


def _relative_thumb_path(thumb: Path | None) -> str | None:
    """Return ``thumb`` relative to ``config.DATA_DIR`` when possible.

    Falls back to the unmodified path string when the thumbnail is not under
    ``DATA_DIR``, and returns ``None`` when no thumbnail was produced.
    """
    if thumb is None:
        return None
    try:
        return str(thumb.relative_to(config.DATA_DIR))
    except ValueError:
        return str(thumb)


async def _embed_one_precedent(case_law_id: UUID, src_path: Path) -> dict:
    """Render, embed, and store page-image embeddings for one precedent.

    Mirrors ``precedent_library._embed_precedent_pages``, but accepts any
    file that ``extractor.render_pages_for_multimodal`` can render.

    Returns:
        ``{"pages_embedded": <count returned by the DB layer>, "status": "ok"}``,
        or ``{"pages_embedded": 0, "status": "no_pages"}`` when rendering
        produced nothing.

    Raises:
        RuntimeError: The embedding call returned a different number of
            vectors than pages were rendered. Otherwise ``zip`` would
            silently drop or misnumber pages.
    """
    thumb_dir = PRECEDENT_LIBRARY_THUMBNAILS / str(case_law_id)

    # Use the same renderer as the live pipeline for consistency, run in a
    # worker thread so the event loop is not blocked.
    # NOTE(review): open-source PyMuPDF documents Office formats (DOCX) as a
    # PyMuPDF Pro feature. Confirm the deployed build can open DOCX.
    rendered = await asyncio.to_thread(
        extractor.render_pages_for_multimodal,
        src_path,
        config.MULTIMODAL_DPI,
        config.MULTIMODAL_THUMB_DPI,
        thumb_dir,
    )
    if not rendered:
        return {"pages_embedded": 0, "status": "no_pages"}

    images = [pil for pil, _ in rendered]
    thumbs = [thumb for _, thumb in rendered]
    img_embs = list(await embeddings.embed_images(images))
    if len(img_embs) != len(images):
        raise RuntimeError(
            f"embed_images returned {len(img_embs)} embeddings "
            f"for {len(images)} rendered pages"
        )

    page_records = [
        {
            "page_number": page_number,
            "embedding": emb,
            "image_thumbnail_path": _relative_thumb_path(thumb),
        }
        for page_number, (emb, thumb) in enumerate(zip(img_embs, thumbs), start=1)
    ]

    stored = await db.store_precedent_image_embeddings(
        case_law_id,
        page_records,
        model_name=config.MULTIMODAL_MODEL,
    )
    return {"pages_embedded": stored, "status": "ok"}


async def _scan_missing_records() -> list[dict]:
    """List target ``case_law`` rows that have no image embeddings yet.

    Only ``external_upload`` and ``internal_committee`` rows are returned,
    ordered by ``source_kind`` then ``case_number``. Each dict has the keys
    ``id`` (UUID), ``case_number``, ``source_kind`` and ``text_len``.
    """
    pool = await db.get_pool()
    rows = await pool.fetch(
        """
        SELECT id, case_number, source_kind, length(full_text) AS text_len
        FROM case_law cl
        WHERE NOT EXISTS (
            SELECT 1 FROM precedent_image_embeddings ppi
            WHERE ppi.case_law_id = cl.id
        )
          AND cl.source_kind IN ('external_upload', 'internal_committee')
        ORDER BY cl.source_kind, cl.case_number
        """
    )
    return [
        {
            "id": UUID(str(r["id"])),
            "case_number": r["case_number"],
            "source_kind": r["source_kind"],
            "text_len": r["text_len"],
        }
        for r in rows
    ]


async def backfill_all(
    *,
    dry_run: bool,
    limit: int | None = None,
    only_source_kind: str | None = None,
) -> dict:
    """Main entrypoint: scan, match, render, embed, store.

    Args:
        dry_run: When true, only match files and probe page counts. Voyage
            is not called and no embeddings are written.
        limit: Maximum number of records to process, applied after the
            ``only_source_kind`` filter. ``None`` means no limit; ``0`` (or a
            negative value) processes nothing.
        only_source_kind: Restrict processing to a single ``source_kind``.

    Returns:
        A summary dict of counters (``scanned``, ``matched``, ``no_match``,
        ``embedded``, ``skipped_md_only``, ``errors``, ``total_pages``) plus
        a per-record ``details`` list.
    """
    await db.init_schema()
    records = await _scan_missing_records()
    if only_source_kind:
        records = [r for r in records if r["source_kind"] == only_source_kind]
    if limit is not None:
        # A truthiness check would treat --limit 0 as "no limit" and process
        # every record, which is costly in --apply mode.
        records = records[: max(limit, 0)]

    file_index = _build_file_index()
    text_only_index = _build_file_index(TEXT_ONLY_EXTS)
    logger.info(
        "Indexed %d renderable files (+%d text-only) under %s",
        sum(len(v) for v in file_index.values()),
        sum(len(v) for v in text_only_index.values()),
        ", ".join(str(r) for r in SEARCH_ROOTS if r.is_dir()),
    )

    summary = {
        "scanned": len(records),
        "matched": 0,
        "no_match": 0,
        "embedded": 0,
        "skipped_md_only": 0,
        "errors": 0,
        "total_pages": 0,
        "details": [],
    }

    for rec in records:
        case_law_id = rec["id"]
        # case_number may be NULL in the DB; treat it as "no kernels".
        case_number = rec["case_number"] or ""
        base = {"case_law_id": str(case_law_id), "case_number": case_number}

        src = _find_file_for_case_number(case_number, file_index)
        if src is None:
            text_src = _find_file_for_case_number(case_number, text_only_index)
            if text_src is not None:
                summary["skipped_md_only"] += 1
                summary["details"].append({
                    **base,
                    "source_kind": rec["source_kind"],
                    "matched_file": str(text_src),
                    "status": "skipped_md_only",
                })
                logger.info("  MD ONLY: %s -> %s", case_number[:60], text_src.name)
                continue

            summary["no_match"] += 1
            summary["details"].append({
                **base,
                "source_kind": rec["source_kind"],
                "status": "no_match",
            })
            logger.info("  NO MATCH: %s", case_number[:80])
            continue

        # Probe page count without rendering (cheap). The context manager
        # closes the document even if len() raises.
        try:
            with fitz.open(str(src)) as doc:
                page_count = len(doc)
        except Exception as e:
            summary["errors"] += 1
            summary["details"].append({
                **base,
                "matched_file": str(src),
                "status": "open_error",
                "error": str(e),
            })
            logger.warning("  OPEN ERROR for %s: %s", case_number[:60], e)
            continue

        summary["matched"] += 1
        summary["total_pages"] += page_count
        logger.info("  MATCHED: %s -> %s (%d pages)", case_number[:60], src.name, page_count)

        if dry_run:
            summary["details"].append({
                **base,
                "matched_file": str(src),
                "pages": page_count,
                "status": "would_embed",
            })
            continue

        # Actually embed + store. Failures are isolated per record so one bad
        # file doesn't abort the whole backfill.
        t0 = time.perf_counter()
        try:
            result = await _embed_one_precedent(case_law_id, src)
        except Exception as e:
            summary["errors"] += 1
            summary["details"].append({
                **base,
                "matched_file": str(src),
                "status": "embed_error",
                "error": str(e),
            })
            logger.exception("  EMBED ERROR for %s", case_number[:60])
            continue

        elapsed = time.perf_counter() - t0
        summary["details"].append({
            **base,
            "matched_file": str(src),
            "pages": page_count,
            "elapsed_sec": round(elapsed, 1),
            "status": "ok",
            **result,  # result["status"] overrides, e.g. "no_pages"
        })
        if result["status"] == "ok":
            summary["embedded"] += 1
            logger.info("  EMBEDDED %d pages in %.1fs", result["pages_embedded"], elapsed)
        else:
            logger.warning(
                "  NO PAGES rendered for %s (%s)", case_number[:60], src.name
            )

    return summary


# ───────────────────────── CLI ─────────────────────────


def main() -> None:
    """Parse CLI flags, run the backfill, and print a summary.

    Without ``--apply`` the script runs in dry-run mode, for safety.
    """
    parser = argparse.ArgumentParser(
        description="Backfill voyage-multimodal-3 embeddings for case_law records "
        "(external_upload + internal_committee) missing them.",
    )
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument(
        "--dry-run",
        action="store_true",
        help="Only scan + match; do not call Voyage or write to DB.",
    )
    mode.add_argument(
        "--apply",
        action="store_true",
        help="Render, embed, and store. Mutually exclusive with --dry-run.",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Max number of records to process (debugging).",
    )
    parser.add_argument(
        "--only",
        choices=["external_upload", "internal_committee"],
        default=None,
        help="Restrict to a single source_kind.",
    )
    args = parser.parse_args()

    # Default to dry-run for safety: only an explicit --apply writes.
    dry_run = not args.apply

    logger.info(
        "Mode=%s MULTIMODAL_MODEL=%s DPI=%d THUMB_DPI=%d",
        "DRY-RUN" if dry_run else "APPLY",
        config.MULTIMODAL_MODEL,
        config.MULTIMODAL_DPI,
        config.MULTIMODAL_THUMB_DPI,
    )

    summary = asyncio.run(
        backfill_all(
            dry_run=dry_run,
            limit=args.limit,
            only_source_kind=args.only,
        )
    )

    print()
    print("=" * 60)
    print("BACKFILL SUMMARY")
    print("=" * 60)
    print(f"  scanned:         {summary['scanned']}")
    print(f"  matched:         {summary['matched']}")
    print(f"  no_match:        {summary['no_match']}")
    print(f"  skipped_md_only: {summary['skipped_md_only']}")
    print(f"  total pages:     {summary['total_pages']}")
    if dry_run:
        est_tokens = summary["total_pages"] * _EST_TOKENS_PER_PAGE
        est_cost = est_tokens / 1_000_000 * _EST_USD_PER_MILLION_TOKENS
        print(f"  est. tokens:     ~{est_tokens:,} (~${est_cost:.2f})")
    else:
        print(f"  embedded:        {summary['embedded']}")
    # Open errors can happen in dry-run too, so always report them.
    print(f"  errors:          {summary['errors']}")


if __name__ == "__main__":
    main()