diff --git a/scripts/backfill_chunk_pages.py b/scripts/backfill_chunk_pages.py index eb1cd85..e94d90b 100644 --- a/scripts/backfill_chunk_pages.py +++ b/scripts/backfill_chunk_pages.py @@ -1,36 +1,42 @@ -"""Backfill page_number on existing document_chunks. +"""Backfill page_number on existing document_chunks (no re-OCR). Why this exists: the legacy chunker did not track which page each chunk came from. After the page-tracking fix, new uploads carry page_number correctly, but existing chunks have ``page_number=NULL`` in the DB. -That blocks the multimodal hybrid retriever's text+image boost (it +That blocks the multimodal hybrid retriever's text+image boost (which joins (chunk, image) on (document_id, page_number)). -What it does (per case): - 1. List every document in the case - 2. For each document with NULL page_number chunks: - a. Re-extract via extractor.extract_text (re-runs OCR if needed — - ~$0.0015/page on Google Vision; idempotent on the DB side) - b. Compute page_offsets from the re-extracted text - c. For every chunk row (sorted by chunk_index), search its - content in the re-extracted text → look up page → UPDATE - 3. Skip documents whose chunks already have non-null page_number +What it does (per case, per document): + + 1. Load stored ``documents.extracted_text`` from the DB. This is + the exact text that was used to produce the existing chunks — + so chunk content lookups against it match verbatim. + 2. Open the PDF with PyMuPDF and call ``page.get_text()`` on each + page (cheap, no OCR). For pages with usable direct text we get + a clean snippet; for fully-scanned pages we get little/nothing. + 3. Anchor: for each page with a usable snippet, search the snippet + in ``extracted_text`` to recover that page's start offset. + 4. Interpolate: for OCR-only pages with no anchor, position is + linearly interpolated between the nearest anchored neighbors + (or uniformly when no anchors exist at all). + 5. For every chunk row (sorted by chunk_index), find the chunk's + content in ``extracted_text`` (verbatim match), look up the + page from the offsets, and ``UPDATE document_chunks SET + page_number = ?``. Idempotent: a second run with no --force is a no-op. -Designed to run from inside the FastAPI/MCP container (where /data -is mounted and Google Vision creds are present). Locally it requires -GOOGLE_CLOUD_VISION_API_KEY in ~/.env. +Cost: zero. Runs in seconds even for the 89-page appraisal report. Usage: - docker exec -it python /tmp/backfill_chunk_pages.py 8174-24 8137-24 + docker cp scripts/backfill_chunk_pages.py :/tmp/ + docker exec python /tmp/backfill_chunk_pages.py 8174-24 8137-24 """ from __future__ import annotations import argparse import asyncio import logging -import os import sys import time from pathlib import Path @@ -45,7 +51,8 @@ def _setup_paths(): _setup_paths() -from legal_mcp.services import db, extractor # noqa: E402 +import fitz # PyMuPDF # noqa: E402 +from legal_mcp.services import db # noqa: E402 logging.basicConfig( level=logging.INFO, @@ -54,6 +61,14 @@ logging.basicConfig( logger = logging.getLogger("backfill_chunk_pages") +# Snippet length for page anchoring. Long enough to be unique, short +# enough to survive minor whitespace variation between PyMuPDF direct +# extraction and the stored OCR text. +ANCHOR_SNIPPET_LEN = 80 +# Minimum direct-text length on a page to attempt anchoring at all. +MIN_DIRECT_LEN = 60 + + def _resolve_local_path(db_path: str) -> Path: p = Path(db_path) if p.is_file(): @@ -65,6 +80,123 @@ def _resolve_local_path(db_path: str) -> Path: return p +def _norm_whitespace(s: str) -> str: + """Collapse runs of whitespace; helps cross-source matching where + PyMuPDF direct extraction may differ from the stored OCR text in + line-break placement.""" + return " ".join(s.split()) + + +def _find_anchored_snippet( + extracted_text: str, snippet: str, search_start: int = 0, +) -> int: + """Search for ``snippet`` in ``extracted_text``, tolerant to + whitespace differences. Returns the offset in the original + extracted_text, or -1.""" + # Direct match first — fastest path + idx = extracted_text.find(snippet, search_start) + if idx >= 0: + return idx + # Whitespace-normalized fallback + norm_text = _norm_whitespace(extracted_text) + norm_snip = _norm_whitespace(snippet) + if not norm_snip: + return -1 + norm_idx = norm_text.find(norm_snip) + if norm_idx < 0: + return -1 + # Map norm offset back to original — count chars until we've passed + # `norm_idx` non-collapsed characters in the original. + orig_pos = 0 + norm_pos = 0 + in_ws = False + for ch in extracted_text: + if norm_pos == norm_idx: + return orig_pos + if ch.isspace(): + if not in_ws: + norm_pos += 1 + in_ws = True + else: + in_ws = False + norm_pos += 1 + orig_pos += 1 + return -1 + + +def _compute_page_offsets(pdf_path: Path, extracted_text: str) -> list[int]: + """Return ``page_offsets`` (start char offset of each page in + ``extracted_text``), using direct PyMuPDF reads for anchoring and + linear interpolation for OCR-only pages.""" + doc = fitz.open(str(pdf_path)) + n_pages = len(doc) + anchors: list[int | None] = [None] * n_pages + + last_pos = 0 + for i, page in enumerate(doc): + direct = page.get_text().strip() + if len(direct) < MIN_DIRECT_LEN: + continue + # Take the first ANCHOR_SNIPPET_LEN chars after stripping + snippet = direct[:ANCHOR_SNIPPET_LEN] + pos = _find_anchored_snippet(extracted_text, snippet, last_pos) + if pos < 0: + # try a global search before giving up + pos = _find_anchored_snippet(extracted_text, snippet, 0) + if pos >= 0: + anchors[i] = pos + last_pos = pos + doc.close() + + # Force first page to start at 0 if not already anchored + if anchors[0] is None: + anchors[0] = 0 + + # Fill gaps via linear interpolation between the nearest anchors; + # extrapolate beyond the last anchor by the average page length. + page_offsets: list[int] = [0] * n_pages + for i in range(n_pages): + if anchors[i] is not None: + page_offsets[i] = anchors[i] + continue + # Find prev anchored + prev_i = i - 1 + while prev_i >= 0 and anchors[prev_i] is None: + prev_i -= 1 + # Find next anchored + next_i = i + 1 + while next_i < n_pages and anchors[next_i] is None: + next_i += 1 + prev_pos = anchors[prev_i] if prev_i >= 0 else 0 + if next_i < n_pages: + next_pos = anchors[next_i] + ratio = (i - prev_i) / (next_i - prev_i) + page_offsets[i] = int(prev_pos + ratio * (next_pos - prev_pos)) + else: + # Extrapolate: assume uniform distribution beyond last anchor + # using page-density inferred from prior anchors (or fall + # back to total_text/n_pages). + avg = len(extracted_text) / max(1, n_pages) + page_offsets[i] = int(prev_pos + avg * (i - prev_i)) + # Monotone-clip just in case interpolation ever goes backwards + for i in range(1, n_pages): + if page_offsets[i] < page_offsets[i - 1]: + page_offsets[i] = page_offsets[i - 1] + return page_offsets + + +def _page_at_offset(offset: int, page_offsets: list[int]) -> int: + if not page_offsets: + return 1 + page = 1 + for i, start in enumerate(page_offsets): + if start <= offset: + page = i + 1 + else: + break + return page + + async def _backfill_document( document_id: UUID, title: str, @@ -73,7 +205,6 @@ async def _backfill_document( ) -> dict: pool = await db.get_pool() - # Fetch chunks for this document chunks = await pool.fetch( "SELECT id, chunk_index, content, page_number FROM document_chunks " "WHERE document_id = $1 ORDER BY chunk_index", @@ -94,15 +225,21 @@ async def _backfill_document( if pdf_path.suffix.lower() != ".pdf": return {"status": "not_pdf"} - logger.info(" re-extracting %s (%d chunks, %d need page)", - title, len(chunks), n_null) - t0 = time.time() - text, page_count, page_offsets = await extractor.extract_text(str(pdf_path)) - elapsed = time.time() - t0 - if not page_offsets: - return {"status": "no_offsets"} + doc_row = await pool.fetchrow( + "SELECT extracted_text FROM documents WHERE id = $1", document_id, + ) + extracted_text = doc_row["extracted_text"] if doc_row else None + if not extracted_text: + return {"status": "no_extracted_text"} - # Walk chunks, find each in the re-extracted text, assign page + t0 = time.time() + page_offsets = _compute_page_offsets(pdf_path, extracted_text) + n_anchored = sum(1 for i in range(len(page_offsets)) if i == 0 or page_offsets[i] > page_offsets[i - 1]) + + # The chunker joins paragraphs with single `\n` while extracted_text + # has `\n\n` between pages, so verbatim search misses cross-page + # chunks. Use the whitespace-tolerant helper that returns an offset + # in the *original* text. pos = 0 updated = 0 not_found = 0 @@ -110,29 +247,34 @@ async def _backfill_document( content = c["content"] if not content: continue - idx = text.find(content, pos) + # Use a unique slice from the chunk to anchor in extracted_text + # — anchoring on the chunk's first ~120 chars is enough to + # disambiguate across the document. + snippet = content[: min(len(content), 120)] + idx = _find_anchored_snippet(extracted_text, snippet, pos) if idx < 0: - idx = text.find(content) # global fallback + idx = _find_anchored_snippet(extracted_text, snippet, 0) if idx < 0: not_found += 1 continue - page = extractor.page_at_offset(idx, page_offsets) + page = _page_at_offset(idx, page_offsets) await pool.execute( "UPDATE document_chunks SET page_number = $1 WHERE id = $2", page, c["id"], ) updated += 1 - # advance roughly past midpoint — chunks have overlap pos = idx + max(1, len(content) // 2) + elapsed = time.time() - t0 logger.info( - " done in %.1fs: extracted %d pages, updated %d/%d chunks, " - "%d not found", elapsed, page_count, updated, len(chunks), not_found, + " %s — %d pages, %d anchors, updated %d/%d chunks (%d not found) in %.2fs", + title, len(page_offsets), n_anchored, updated, len(chunks), not_found, elapsed, ) return { "status": "ok", - "elapsed_sec": round(elapsed, 1), - "pages": page_count, + "elapsed_sec": round(elapsed, 2), + "pages": len(page_offsets), + "anchors": n_anchored, "chunks_total": len(chunks), "chunks_updated": updated, "chunks_not_found": not_found, @@ -168,7 +310,7 @@ async def backfill_cases(case_numbers: list[str], force: bool) -> dict: "skipped": sum(1 for r in per_doc if r["status"] == "skipped"), "missing": sum(1 for r in per_doc if r["status"] == "missing"), "no_chunks": sum(1 for r in per_doc if r["status"] == "no_chunks"), - "no_offsets": sum(1 for r in per_doc if r["status"] == "no_offsets"), + "no_extracted_text": sum(1 for r in per_doc if r["status"] == "no_extracted_text"), "chunks_updated": sum(r.get("chunks_updated", 0) for r in per_doc), "documents": per_doc, } @@ -176,11 +318,11 @@ async def backfill_cases(case_numbers: list[str], force: bool) -> dict: def main(): - parser = argparse.ArgumentParser(description="Backfill page_number on existing chunks") + parser = argparse.ArgumentParser(description="Backfill page_number on existing chunks (no OCR)") parser.add_argument("cases", nargs="+", help="Case numbers (e.g. 8174-24 8137-24)") parser.add_argument( "--force", action="store_true", - help="Re-extract even if all chunks already have page_number (default: skip)", + help="Re-process even if all chunks already have page_number (default: skip)", ) args = parser.parse_args() @@ -195,8 +337,8 @@ def main(): continue print( f" {cn}: {s['documents_total']} docs — " - f"ok {s['ok']}, skipped {s['skipped']}, missing {s['missing']}, " - f"chunks_updated {s['chunks_updated']}" + f"ok {s['ok']}, skipped {s['skipped']}, " + f"missing {s['missing']}, chunks_updated {s['chunks_updated']}" )