feat(retrieval): track page_number on text chunks for multimodal hybrid boost
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 6m33s

The legacy chunker did not track which PDF page each chunk came from.
Stored chunks had page_number=NULL, which blocked the multimodal
hybrid retriever's text+image boost — it joins (chunk, image) on
(document_id, page_number) and the join could never fire.

This change:

- extractor.extract_text now returns (text, page_count, page_offsets);
  page_offsets[i] is the start char offset of page (i+1) in the joined
  text. None for non-PDFs.
- chunker.chunk_document accepts an optional page_offsets and tags
  each chunk with the page that contains its first character (uses
  the existing chunker logic; pages assigned post-hoc by content
  search to keep the diff minimal).
- processor.process_document and precedent_library.ingest_precedent
  forward page_offsets through the chunker. New uploads now carry
  accurate page_number on every chunk.
- Other extract_text callers (tools/documents, tools/workflow,
  web/app.py) updated to unpack the third element (ignored).
- scripts/backfill_chunk_pages.py: per-case retrofit. Re-extracts each
  PDF (re-OCRs via Google Vision if needed, ~$0.0015/page), computes
  page_offsets, and updates page_number on every chunk by content
  search. Idempotent; --force re-runs on already-tagged docs.

Forward-only would leave the 419 image embeddings backfilled on
cases 8174-24 + 8137-24 unable to boost their corresponding text
chunks. The retrofit script closes that gap (cost ~$0.60).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-03 19:49:41 +00:00
parent 5724ed8e5b
commit 81ccf3a888
9 changed files with 301 additions and 18 deletions

View File

@@ -33,8 +33,15 @@ def chunk_document(
text: str, text: str,
chunk_size: int = config.CHUNK_SIZE_TOKENS, chunk_size: int = config.CHUNK_SIZE_TOKENS,
overlap: int = config.CHUNK_OVERLAP_TOKENS, overlap: int = config.CHUNK_OVERLAP_TOKENS,
page_offsets: list[int] | None = None,
) -> list[Chunk]: ) -> list[Chunk]:
"""Split a legal document into chunks, respecting section boundaries.""" """Split a legal document into chunks, respecting section boundaries.
When ``page_offsets`` is supplied (from a PDF extraction), each chunk
is tagged with the page number of its first character — used by the
multimodal hybrid retriever to join (text chunk, image at same page)
and surface text+image matches.
"""
if not text.strip(): if not text.strip():
return [] return []
@@ -52,9 +59,34 @@ def chunk_document(
)) ))
idx += 1 idx += 1
if page_offsets:
_assign_pages(chunks, text, page_offsets)
return chunks return chunks
def _assign_pages(chunks: list[Chunk], text: str, page_offsets: list[int]) -> None:
"""Locate each chunk's first character in ``text`` and tag with the
page that contains that offset. Mutates chunks in-place.
Chunks have overlap so we search forward from a position slightly
past the previous chunk's start. Falls back to a global search if
the forward scan misses (rare — happens only when overlap is bigger
than the advance distance below).
"""
from legal_mcp.services.extractor import page_at_offset
pos = 0
for c in chunks:
idx = text.find(c.content, pos)
if idx < 0:
idx = text.find(c.content)
if idx < 0:
continue
c.page_number = page_at_offset(idx, page_offsets)
# advance past the chunk's halfway point — overlap is < 50% so
# the next chunk's starting point will be after this cursor.
pos = idx + max(1, len(c.content) // 2)
def _split_into_sections(text: str) -> list[tuple[str, str]]: def _split_into_sections(text: str) -> list[tuple[str, str]]:
"""Split text into (section_type, text) pairs based on Hebrew headers.""" """Split text into (section_type, text) pairs based on Hebrew headers."""
# Find all section headers and their positions # Find all section headers and their positions

View File

@@ -120,12 +120,22 @@ def _fix_hebrew_quotes(text: str) -> str:
# ── Extraction ─────────────────────────────────────────────────── # ── Extraction ───────────────────────────────────────────────────
async def extract_text(file_path: str) -> tuple[str, int]: # Separator used when joining per-page text. Constant so chunker /
# retrofit can reproduce the join when computing page offsets.
PAGE_SEPARATOR = "\n\n"
async def extract_text(file_path: str) -> tuple[str, int, list[int] | None]:
"""Extract text from a document file. """Extract text from a document file.
Returns: Returns:
Tuple of (extracted_text, page_count). ``(text, page_count, page_offsets)`` where:
page_count is 0 for non-PDF files. - ``text``: concatenated extracted text
- ``page_count``: number of pages (0 for non-PDF)
- ``page_offsets``: ``page_offsets[i]`` = char start offset of
page (i+1) inside ``text``. ``None`` for non-PDFs (where the
notion of pages doesn't apply). Used by the chunker to assign
a ``page_number`` to each chunk.
""" """
path = Path(file_path) path = Path(file_path)
suffix = path.suffix.lower() suffix = path.suffix.lower()
@@ -133,18 +143,34 @@ async def extract_text(file_path: str) -> tuple[str, int]:
if suffix == ".pdf": if suffix == ".pdf":
return await _extract_pdf(path) return await _extract_pdf(path)
elif suffix == ".docx": elif suffix == ".docx":
return _extract_docx(path), 0 return _extract_docx(path), 0, None
elif suffix == ".doc": elif suffix == ".doc":
return _extract_doc(path), 0 return _extract_doc(path), 0, None
elif suffix == ".rtf": elif suffix == ".rtf":
return _extract_rtf(path), 0 return _extract_rtf(path), 0, None
elif suffix in (".txt", ".md"): elif suffix in (".txt", ".md"):
return path.read_text(encoding="utf-8"), 0 return path.read_text(encoding="utf-8"), 0, None
else: else:
raise ValueError(f"Unsupported file type: {suffix}") raise ValueError(f"Unsupported file type: {suffix}")
async def _extract_pdf(path: Path) -> tuple[str, int]: def _join_pages(pages_text: list[str]) -> tuple[str, list[int]]:
"""Join per-page text with PAGE_SEPARATOR while recording the start
offset of each page in the joined output."""
offsets: list[int] = []
parts: list[str] = []
cursor = 0
for i, pg in enumerate(pages_text):
offsets.append(cursor)
parts.append(pg)
cursor += len(pg)
if i < len(pages_text) - 1:
parts.append(PAGE_SEPARATOR)
cursor += len(PAGE_SEPARATOR)
return "".join(parts), offsets
async def _extract_pdf(path: Path) -> tuple[str, int, list[int]]:
"""Extract text from PDF. """Extract text from PDF.
Try direct text first, fall back to Google Cloud Vision for scanned Try direct text first, fall back to Google Cloud Vision for scanned
@@ -172,7 +198,27 @@ async def _extract_pdf(path: Path) -> tuple[str, int]:
pages_text.append(ocr_text) pages_text.append(ocr_text)
doc.close() doc.close()
return "\n\n".join(pages_text), page_count joined, offsets = _join_pages(pages_text)
return joined, page_count, offsets
def page_at_offset(offset: int, page_offsets: list[int]) -> int:
"""Look up the page number containing a given char offset.
page_offsets[i] is the start of page (i+1) in the joined text;
a chunk starting at ``offset`` belongs to the highest-indexed page
whose start is ``<= offset``. Returns 1-based page number.
"""
if not page_offsets:
return 1
# Linear scan is fine — page_offsets is short (≤ ~200 for our PDFs).
page = 1
for i, start in enumerate(page_offsets):
if start <= offset:
page = i + 1
else:
break
return page
def _ocr_with_google_vision(image_bytes: bytes, page_num: int) -> str: def _ocr_with_google_vision(image_bytes: bytes, page_num: int) -> str:

View File

@@ -127,7 +127,7 @@ async def ingest_precedent(
await progress("extracting", 15, "מחלץ טקסט מהקובץ") await progress("extracting", 15, "מחלץ טקסט מהקובץ")
try: try:
text, page_count = await extractor.extract_text(str(staged)) text, page_count, page_offsets = await extractor.extract_text(str(staged))
except Exception as e: except Exception as e:
await progress("failed", 100, f"כשל בחילוץ טקסט: {e}") await progress("failed", 100, f"כשל בחילוץ טקסט: {e}")
raise raise
@@ -161,7 +161,7 @@ async def ingest_precedent(
try: try:
await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')") await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')")
chunks = chunker.chunk_document(text) chunks = chunker.chunk_document(text, page_offsets=page_offsets)
if not chunks: if not chunks:
await db.set_case_law_extraction_status(case_law_id, "completed") await db.set_case_law_extraction_status(case_law_id, "completed")
await db.set_case_law_halacha_status(case_law_id, "completed") await db.set_case_law_halacha_status(case_law_id, "completed")

View File

@@ -32,7 +32,7 @@ async def process_document(document_id: UUID, case_id: UUID) -> dict:
try: try:
# Step 1: Extract text # Step 1: Extract text
logger.info("Extracting text from %s", doc["file_path"]) logger.info("Extracting text from %s", doc["file_path"])
text, page_count = await extractor.extract_text(doc["file_path"]) text, page_count, page_offsets = await extractor.extract_text(doc["file_path"])
await db.update_document( await db.update_document(
document_id, document_id,
@@ -70,9 +70,9 @@ async def process_document(document_id: UUID, case_id: UUID) -> dict:
except Exception as e: except Exception as e:
logger.warning("Classification failed (non-fatal): %s", e) logger.warning("Classification failed (non-fatal): %s", e)
# Step 2: Chunk # Step 2: Chunk (page_offsets propagates page_number into chunks)
logger.info("Chunking document (%d chars)", len(text)) logger.info("Chunking document (%d chars)", len(text))
chunks = chunker.chunk_document(text) chunks = chunker.chunk_document(text, page_offsets=page_offsets)
if not chunks: if not chunks:
await db.update_document(document_id, extraction_status="completed") await db.update_document(document_id, extraction_status="completed")

View File

@@ -144,7 +144,7 @@ async def document_upload_training(
shutil.copy2(str(source), str(dest)) shutil.copy2(str(source), str(dest))
# Extract text and strip Nevo preamble # Extract text and strip Nevo preamble
text, page_count = await extractor.extract_text(str(dest)) text, page_count, _ = await extractor.extract_text(str(dest))
text = extractor.strip_nevo_preamble(text) text = extractor.strip_nevo_preamble(text)
# Parse date # Parse date

View File

@@ -308,7 +308,7 @@ async def ingest_final_version(
# Extract text from file if provided # Extract text from file if provided
if file_path and not final_text: if file_path and not final_text:
from legal_mcp.services import extractor from legal_mcp.services import extractor
final_text, _ = await extractor.extract_text(file_path) final_text, _, _ = await extractor.extract_text(file_path)
if not final_text: if not final_text:
return "לא סופק טקסט — יש לספק file_path או final_text." return "לא סופק טקסט — יש לספק file_path או final_text."

View File

@@ -23,6 +23,7 @@
| `voyage_rerank_judge_poc.py` | python | POC #4 — voyage-3 vs rerank-2 vs context-3 על אהרון ברק, 18 שאילתות, claude-haiku-4-5 כ-judge. הכרעה: rerank-2 ניצח עם +9% mean@3 | בנצ'מרק חד-פעמי | | `voyage_rerank_judge_poc.py` | python | POC #4 — voyage-3 vs rerank-2 vs context-3 על אהרון ברק, 18 שאילתות, claude-haiku-4-5 כ-judge. הכרעה: rerank-2 ניצח עם +9% mean@3 | בנצ'מרק חד-פעמי |
| `voyage_rerank_corpus_poc.py` | python | POC #5 — voyage-3 vs rerank-2 על קורפוס מלא (785 docs). הכרעה: +4.5% mean@3 כללי, +11.6% על P queries (practical) | בנצ'מרק חד-פעמי, אישר את שלב B | | `voyage_rerank_corpus_poc.py` | python | POC #5 — voyage-3 vs rerank-2 על קורפוס מלא (785 docs). הכרעה: +4.5% mean@3 כללי, +11.6% על P queries (practical) | בנצ'מרק חד-פעמי, אישר את שלב B |
| `multimodal_backfill.py` | python | Backfill voyage-multimodal-3 page embeddings על מסמכי תיקים קיימים. idempotent (skips by default), forces `MULTIMODAL_ENABLED=true` ל-run, רץ מהקונטיינר. שלב C — ראה `docs/voyage-upgrades-plan.md` | ידני per-case (`python multimodal_backfill.py 8174-24 8137-24`) | | `multimodal_backfill.py` | python | Backfill voyage-multimodal-3 page embeddings על מסמכי תיקים קיימים. idempotent (skips by default), forces `MULTIMODAL_ENABLED=true` ל-run, רץ מהקונטיינר. שלב C — ראה `docs/voyage-upgrades-plan.md` | ידני per-case (`python multimodal_backfill.py 8174-24 8137-24`) |
| `backfill_chunk_pages.py` | python | Backfill `page_number` ב-`document_chunks` קיימים. legacy chunker לא tracked עמודים → `page_number=NULL` חוסם boost של multimodal hybrid (text+image join על אותו עמוד). re-extracts כל PDF (re-OCR אם צריך, ~$0.0015/page), מחשב page_offsets, ומעדכן chunks. idempotent | ידני per-case (`python backfill_chunk_pages.py 8174-24 8137-24`) |
## תיקיית `.archive/` — סקריפטים שהושלמו ## תיקיית `.archive/` — סקריפטים שהושלמו

View File

@@ -0,0 +1,204 @@
"""Backfill page_number on existing document_chunks.
Why this exists: the legacy chunker did not track which page each chunk
came from. After the page-tracking fix, new uploads carry page_number
correctly, but existing chunks have ``page_number=NULL`` in the DB.
That blocks the multimodal hybrid retriever's text+image boost (it
joins (chunk, image) on (document_id, page_number)).
What it does (per case):
1. List every document in the case
2. For each document with NULL page_number chunks:
a. Re-extract via extractor.extract_text (re-runs OCR if needed —
~$0.0015/page on Google Vision; idempotent on the DB side)
b. Compute page_offsets from the re-extracted text
c. For every chunk row (sorted by chunk_index), search its
content in the re-extracted text → look up page → UPDATE
3. Skip documents whose chunks already have non-null page_number
Idempotent: a second run with no --force is a no-op.
Designed to run from inside the FastAPI/MCP container (where /data
is mounted and Google Vision creds are present). Locally it requires
GOOGLE_CLOUD_VISION_API_KEY in ~/.env.
Usage:
docker exec -it <legal-ai-container> python /tmp/backfill_chunk_pages.py 8174-24 8137-24
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import os
import sys
import time
from pathlib import Path
from uuid import UUID
def _setup_paths():
here = Path(__file__).resolve().parent
mcp_src = here.parent / "mcp-server" / "src"
if mcp_src.is_dir() and str(mcp_src) not in sys.path:
sys.path.insert(0, str(mcp_src))
_setup_paths()
from legal_mcp.services import db, extractor # noqa: E402
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("backfill_chunk_pages")
def _resolve_local_path(db_path: str) -> Path:
p = Path(db_path)
if p.is_file():
return p
if str(p).startswith("/data/"):
local = Path("/home/chaim/legal-ai") / Path(*p.parts[1:])
if local.is_file():
return local
return p
async def _backfill_document(
document_id: UUID,
title: str,
db_file_path: str,
force: bool,
) -> dict:
pool = await db.get_pool()
# Fetch chunks for this document
chunks = await pool.fetch(
"SELECT id, chunk_index, content, page_number FROM document_chunks "
"WHERE document_id = $1 ORDER BY chunk_index",
document_id,
)
if not chunks:
return {"status": "no_chunks"}
n_null = sum(1 for c in chunks if c["page_number"] is None)
if not force and n_null == 0:
logger.info(" skip (all %d chunks already tagged): %s", len(chunks), title)
return {"status": "skipped", "chunks": len(chunks)}
pdf_path = _resolve_local_path(db_file_path)
if not pdf_path.is_file():
logger.warning(" file missing: %s (%s)", pdf_path, title)
return {"status": "missing"}
if pdf_path.suffix.lower() != ".pdf":
return {"status": "not_pdf"}
logger.info(" re-extracting %s (%d chunks, %d need page)",
title, len(chunks), n_null)
t0 = time.time()
text, page_count, page_offsets = await extractor.extract_text(str(pdf_path))
elapsed = time.time() - t0
if not page_offsets:
return {"status": "no_offsets"}
# Walk chunks, find each in the re-extracted text, assign page
pos = 0
updated = 0
not_found = 0
for c in chunks:
content = c["content"]
if not content:
continue
idx = text.find(content, pos)
if idx < 0:
idx = text.find(content) # global fallback
if idx < 0:
not_found += 1
continue
page = extractor.page_at_offset(idx, page_offsets)
await pool.execute(
"UPDATE document_chunks SET page_number = $1 WHERE id = $2",
page, c["id"],
)
updated += 1
# advance roughly past midpoint — chunks have overlap
pos = idx + max(1, len(content) // 2)
logger.info(
" done in %.1fs: extracted %d pages, updated %d/%d chunks, "
"%d not found", elapsed, page_count, updated, len(chunks), not_found,
)
return {
"status": "ok",
"elapsed_sec": round(elapsed, 1),
"pages": page_count,
"chunks_total": len(chunks),
"chunks_updated": updated,
"chunks_not_found": not_found,
}
async def backfill_cases(case_numbers: list[str], force: bool) -> dict:
pool = await db.get_pool()
summary: dict = {}
for cn in case_numbers:
logger.info("=" * 60)
logger.info("Case %s", cn)
case = await db.get_case_by_number(cn)
if not case:
logger.warning("Case not found: %s", cn)
summary[cn] = {"status": "case_not_found"}
continue
case_id = UUID(str(case["id"]))
docs = await pool.fetch(
"SELECT id, title, file_path FROM documents WHERE case_id = $1 ORDER BY title",
case_id,
)
logger.info(" %d documents", len(docs))
per_doc: list[dict] = []
for d in docs:
r = await _backfill_document(
UUID(str(d["id"])), d["title"], d["file_path"], force,
)
per_doc.append({"document_id": str(d["id"]), "title": d["title"], **r})
summary[cn] = {
"documents_total": len(docs),
"ok": sum(1 for r in per_doc if r["status"] == "ok"),
"skipped": sum(1 for r in per_doc if r["status"] == "skipped"),
"missing": sum(1 for r in per_doc if r["status"] == "missing"),
"no_chunks": sum(1 for r in per_doc if r["status"] == "no_chunks"),
"no_offsets": sum(1 for r in per_doc if r["status"] == "no_offsets"),
"chunks_updated": sum(r.get("chunks_updated", 0) for r in per_doc),
"documents": per_doc,
}
return summary
def main():
parser = argparse.ArgumentParser(description="Backfill page_number on existing chunks")
parser.add_argument("cases", nargs="+", help="Case numbers (e.g. 8174-24 8137-24)")
parser.add_argument(
"--force", action="store_true",
help="Re-extract even if all chunks already have page_number (default: skip)",
)
args = parser.parse_args()
summary = asyncio.run(backfill_cases(args.cases, force=args.force))
print()
print("=" * 60)
print("SUMMARY")
print("=" * 60)
for cn, s in summary.items():
if s.get("status") == "case_not_found":
print(f" {cn}: NOT FOUND")
continue
print(
f" {cn}: {s['documents_total']} docs — "
f"ok {s['ok']}, skipped {s['skipped']}, missing {s['missing']}, "
f"chunks_updated {s['chunks_updated']}"
)
if __name__ == "__main__":
main()

View File

@@ -3500,7 +3500,7 @@ async def _process_training_document(task_id: str, source: Path, req: ClassifyRe
# Extract text # Extract text
await _progress.set(task_id, {"status": "processing", "filename": req.filename, "step": "extracting"}) await _progress.set(task_id, {"status": "processing", "filename": req.filename, "step": "extracting"})
text, page_count = await extractor.extract_text(str(dest)) text, page_count, _ = await extractor.extract_text(str(dest))
# Parse date # Parse date
d_date = None d_date = None